# Анализ качества и очистка датасета мошеннических финансовых операций

Данный ноутбук содержит:
1. Инициализацию Spark
2. Загрузку и первичный анализ данных
3. Выявление проблем с качеством данных
4. Очистку данных
5. Сохранение очищенного датасета

In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Spark initialisation: getOrCreate() reuses an already running session if
# there is one; otherwise it starts a new one with the settings below.
# NOTE(review): spark.driver.memory only takes effect when this call actually
# launches the driver JVM — it cannot resize an already running driver.
spark = (
    SparkSession.builder
    .appName("TransactionDataCleaning")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

In [3]:
hdfs_path = "/user/ubuntu/data/2022-11-04.txt"

# Explicit schema; column order must match the comma-separated file, which is
# read without a header row.
schema = StructType([
    StructField("transaction_id", LongType(), True),
    StructField("tx_datetime", StringType(), True),
    StructField("customer_id", LongType(), True),
    StructField("terminal_id", LongType(), True),
    StructField("tx_amount", DoubleType(), True),
    StructField("tx_time_seconds", LongType(), True),
    StructField("tx_time_days", LongType(), True),
    StructField("tx_fraud", IntegerType(), True),
    StructField("tx_fraud_scenario", IntegerType(), True)
])

# The `spark` session created in the first cell is reused. A second
# `SparkSession.builder...getOrCreate()` here would only return that same
# session, so any different appName/master set on it would be silently ignored.

# Read the file from `hdfs_path` with the explicit schema. In Spark's default
# PERMISSIVE mode, fields that cannot be parsed into the schema become null.
# NOTE(review): the sample output shows one all-null row, which looks like a
# header/comment line in the file — confirm and drop it during cleaning.
df = (
    spark.read.format("csv")
    .option("header", False)
    .option("delimiter", ",")
    .schema(schema)
    .load(hdfs_path)
)

# Verify the data loaded correctly
print("Schema:")
df.printSchema()

print("\nSample data:")
df.show(5, truncate=False)

# Basic statistics
print("\nRow count:", df.count())

# Sample analysis
print("\nTransactions per fraud status:")
df.groupBy("tx_fraud").count().show()


Schema:
root
 |-- transaction_id: long (nullable = true)
 |-- tx_datetime: string (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- terminal_id: long (nullable = true)
 |-- tx_amount: double (nullable = true)
 |-- tx_time_seconds: long (nullable = true)
 |-- tx_time_days: long (nullable = true)
 |-- tx_fraud: integer (nullable = true)
 |-- tx_fraud_scenario: integer (nullable = true)


Sample data:
+--------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|transaction_id|tx_datetime        |customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+--------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|null          |null               |null       |null       |null     |null           |null        |null    |null             |
|1832792610    |2022-11-04 14:22:18|0          |53         |63.58    |101139

## 1. Анализ пропущенных значений

In [None]:
def analyze_nulls(df):
    """
    Count null values in every column of a Spark DataFrame and print a summary.

    All per-column null counts are computed by a single aggregation job and
    collected once; percentages are derived on the driver.

    Args:
        df: Spark DataFrame to analyze.
    Returns:
        Spark DataFrame with columns ``column_name`` (string), ``null_count``
        (long) and ``null_percentage`` (double, rounded to 2 decimals), sorted
        by ``null_percentage`` descending. Note that small shares round to
        0.0, so check ``null_count`` as well. Built with the module-level
        ``spark`` session.
    """
    total_rows = df.count()
    columns = df.columns

    # One aggregation for all columns. F.count skips nulls, so counting
    # F.when(isNull, 1) yields the number of null cells, and it returns 0
    # (not null, as F.sum would) when the DataFrame is empty.
    counts_row = df.select([
        F.count(F.when(F.col(col_name).isNull(), 1)).alias(f"{col_name}_null_count")
        for col_name in columns
    ]).first()
    # Read values positionally; fall back to zeros if nothing was returned.
    null_counts = list(counts_row) if counts_row is not None else [0] * len(columns)

    result_rows = []
    for col_name, null_count in zip(columns, null_counts):
        # Guard against division by zero for an empty DataFrame.
        percentage = round(null_count / total_rows * 100, 2) if total_rows else 0.0
        result_rows.append((col_name, int(null_count), float(percentage)))

    # Create result DataFrame
    result_schema = StructType([
        StructField("column_name", StringType(), False),
        # LongType: a null count is bounded only by the row count, which can
        # exceed the 32-bit integer range on large datasets.
        StructField("null_count", LongType(), False),
        StructField("null_percentage", DoubleType(), False)
    ])

    result_df = spark.createDataFrame(result_rows, result_schema)

    # Show results sorted by null percentage in descending order
    result_df = result_df.orderBy(F.col("null_percentage").desc())

    # Display results
    print("\nNull Value Analysis:")
    print(f"Total rows in dataset: {total_rows:,}")
    print("\nNull statistics by column:")
    result_df.show(truncate=False)

    return result_df

null_analysis_df = analyze_nulls(df)



Null Value Analysis:
Total rows in dataset: 46,998,984

Null statistics by column:
+-----------------+----------+---------------+
|column_name      |null_count|null_percentage|
+-----------------+----------+---------------+
|tx_fraud         |1         |0.0            |
|transaction_id   |1         |0.0            |
|tx_amount        |1         |0.0            |
|tx_time_seconds  |1         |0.0            |
|tx_datetime      |1         |0.0            |
|tx_time_days     |1         |0.0            |
|customer_id      |1         |0.0            |
|terminal_id      |2299      |0.0            |
|tx_fraud_scenario|1         |0.0            |
+-----------------+----------+---------------+



## 2. Анализ дубликатов

In [None]:
# Full-row duplicates: rows identical in every column.
total_duplicates = df.count() - df.dropDuplicates().count()
print(f"Total duplicate records found: {total_duplicates}")

# transaction_id values that occur more than once. Rows with a null
# transaction_id (e.g. lines that failed to parse) are excluded: null is not a
# real ID, and such rows could never be matched by the join below because
# null never equals null in a join condition.
transaction_duplicates = (
    df.filter(F.col("transaction_id").isNotNull())
    .groupBy("transaction_id")
    .agg(F.count("*").alias("count"))
    .filter(F.col("count") > 1)
)

# Get count of duplicate transactions
duplicate_count = transaction_duplicates.count()
print(f"Found {duplicate_count} transactions with duplicate IDs")

# Optional: Show detailed information about duplicates
if duplicate_count > 0:
    print("\nDetailed view of duplicate transactions:")
    transaction_duplicates.show(truncate=False)

    # Show full records for duplicated transaction_ids, grouped by ID so the
    # copies of each transaction appear next to each other.
    duplicate_ids = transaction_duplicates.select("transaction_id")
    df.join(duplicate_ids, "transaction_id") \
        .orderBy("transaction_id") \
        .show(truncate=False)

Total duplicate records found: 8
Found 8 transactions with duplicate IDs

Detailed view of duplicate transactions:
+--------------+-----+
|transaction_id|count|
+--------------+-----+
|1862497332    |2    |
|1862070073    |2    |
|1871745328    |2    |
|1843366603    |2    |
|1838630470    |2    |
|1839244215    |2    |
|1865868785    |2    |
|1849268479    |2    |
+--------------+-----+

+--------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|transaction_id|tx_datetime        |customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+--------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|1838630470    |2022-11-07 18:41:37|726029     |145        |19.78    |101414497      |1173        |0       |0                |
|1838630470    |2022-11-07 18:41:37|726029     |145        |19.78    |101414497      |1173        |0

## 3. Анализ выбросов в числовых полях

In [6]:
def analyze_outliers(df, numeric_cols, iqr_factor=1.5, relative_error=0.01):
    """
    Print per-column outlier counts using the Tukey IQR rule.

    A value is an outlier when it lies outside
    [Q1 - iqr_factor * IQR, Q3 + iqr_factor * IQR], where Q1 and Q3 are the
    approximate 25th and 75th percentiles from ``DataFrame.approxQuantile``.

    Args:
        df: Spark DataFrame to analyze.
        numeric_cols: names of numeric columns to check.
        iqr_factor: IQR multiplier for the fences (1.5 is the classic rule).
        relative_error: relative error passed to ``approxQuantile``.
    """
    # The row count does not depend on the column: scan for it only once.
    total_rows = df.count()

    for col in numeric_cols:
        quantiles = df.approxQuantile(col, [0.25, 0.75], relative_error)

        print(f"\nКолонка: {col}")

        # approxQuantile ignores nulls and returns [] for an all-null column.
        if not quantiles:
            print("Нет непустых значений, выбросы не вычислялись")
            continue

        q1, q3 = quantiles
        iqr = q3 - q1

        # Outlier fences
        lower_bound = q1 - iqr_factor * iqr
        upper_bound = q3 + iqr_factor * iqr

        outliers = df.filter(
            (F.col(col) < lower_bound) | (F.col(col) > upper_bound)
        ).count()
        # Guard against division by zero for an empty DataFrame.
        outlier_share = (outliers / total_rows) * 100 if total_rows else 0.0

        print(f"Количество выбросов: {outliers}")
        print(f"Процент выбросов: {outlier_share:.2f}%")

        if iqr == 0:
            # Degenerate fences: every value that differs from Q1 counts as an
            # outlier, e.g. the positive class of a mostly-zero 0/1 label column.
            print("Внимание: IQR = 0, выбросом считается любое значение, отличное от Q1")

# Numeric columns to check. Only Integer/Double/Float columns are selected, so
# LongType columns (the IDs and tx_time_* columns of the schema) are skipped.
numeric_columns = [field.name for field in df.schema.fields
                  if isinstance(field.dataType, (IntegerType, DoubleType, FloatType))]

analyze_outliers(df, numeric_columns)


Колонка: tx_amount
Количество выбросов: 724473
Процент выбросов: 1.54%

Колонка: tx_fraud
Количество выбросов: 1406151
Процент выбросов: 2.99%

Колонка: tx_fraud_scenario
Количество выбросов: 1406151
Процент выбросов: 2.99%


## 4. Анализ категориальных данных

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def analyze_categorical(df, categorical_cols, top_n=10,
                        special_chars_pattern=r'[^a-zA-Z0-9\s]'):
    """
    Print value-distribution and basic text-quality stats for string columns.

    For each column, prints:
    - the number of distinct values (null counts as its own group);
    - the number of blank strings (empty after trimming);
    - the number of values matching ``special_chars_pattern``;
    - the ``top_n`` most frequent values.

    Args:
        df: Spark DataFrame to analyze.
        categorical_cols: names of string columns to check.
        top_n: how many of the most frequent values to show.
        special_chars_pattern: Java regex used by ``rlike`` to flag values.
            The default flags any character that is not a Latin letter, a
            digit or whitespace. It therefore matches every formatted datetime
            (because of '-' and ':'), so pass a stricter pattern for such
            columns.
    """
    for col in categorical_cols:
        # Frequency of every distinct value, most frequent first.
        value_counts = df.groupBy(col).count().orderBy('count', ascending=False)

        # Both quality checks in a single aggregation instead of two scans.
        # F.count skips the null that F.when yields when the condition is false
        # or null, so null cells are counted in neither metric.
        column = F.col(col)
        checks = df.agg(
            F.count(F.when(F.trim(column) == "", 1)).alias("empty_strings"),
            F.count(F.when(column.rlike(special_chars_pattern), 1)).alias("special_chars"),
        ).first()
        empty_strings = checks["empty_strings"]
        special_chars = checks["special_chars"]

        print(f"\nКолонка: {col}")
        print(f"Количество уникальных значений: {value_counts.count()}")
        print(f"Пустые строки: {empty_strings}")
        print(f"Записи со спец. символами: {special_chars}")

        # Most frequent values
        print(f"\nТоп-{top_n} значений:")
        value_counts.show(top_n, truncate=False)

# String-typed columns are treated as categorical.
categorical_columns = [field.name for field in df.schema.fields
                      if isinstance(field.dataType, StringType)]

analyze_categorical(df, categorical_columns)


Колонка: tx_datetime
Количество уникальных значений: 2585594
Пустые строки: 0
Записи со спец. символами: 46998983

Топ-10 значений:
+-------------------+-----+
|tx_datetime        |count|
+-------------------+-----+
|2022-11-13 14:02:36|62   |
|2022-11-19 10:52:56|61   |
|2022-11-10 12:09:02|59   |
|2022-11-29 12:28:29|59   |
|2022-11-09 12:13:32|58   |
|2022-11-23 11:12:13|58   |
|2022-11-05 10:16:19|57   |
|2022-11-21 10:54:05|57   |
|2022-11-06 10:22:27|57   |
|2022-11-24 11:47:16|57   |
+-------------------+-----+
only showing top 10 rows

