#### Загрузка библиотек

In [1]:
!pip install findspark==2.0.1
!pip install pandas==2.0.0
!pip install numpy==1.24.2
!pip install catboost==1.2.2

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


## Разведочный анализ датасета

In [2]:
import os

import findspark


findspark.init()
findspark.find()


from pyspark.sql import SparkSession
from pyspark import SparkConf
from catboost import CatBoostClassifier

In [3]:
conf = (
    SparkConf().setMaster("yarn").setAppName("EDA")
        .set("spark.executor.memory", "2g")
        .set("spark.driver.memory", "4g")
        .set("spark.sql.execution.arrow.pyspark.enabled", "true")
)


spark = SparkSession.builder.config(conf=conf).getOrCreate()

Указываем типы данных для столбцов и корректные значения столбцов

In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

s3_filepath = "s3a://amamylov-mlops/2019-08-22.txt"

columns = ['transaction_id',
 'tx_datetime',
 'customer_id',
 'terminal_id',
 'tx_amount',
 'tx_time_seconds',
 'tx_time_days',
 'tx_fraud',
 'tx_fraud_scenario']

#задаем типы данных для фичей
schema = StructType([
    StructField("transaction_id", IntegerType(), True),
    StructField("tx_datetime", TimestampType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("terminal_id", IntegerType(), True),
    StructField("tx_amount", DoubleType(), True),
    StructField("tx_time_seconds", IntegerType(), True),
    StructField("tx_time_days", IntegerType(), True),
    StructField("tx_fraud", IntegerType(), True),
    StructField("tx_fraud_scenario", IntegerType(), True)
])

sdf = spark.read. \
        option("sep", ","). \
        option("comment", "#"). \
        schema(schema). \
        csv(s3_filepath, header=False).toDF(*columns)

sdf.show()

+--------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|transaction_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+--------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|             0|2019-08-22 06:51:03|          0|        711|    70.91|          24663|           0|       0|                0|
|             1|2019-08-22 05:10:37|          0|          0|    90.55|          18637|           0|       0|                0|
|             2|2019-08-22 19:05:33|          0|        753|    35.38|          68733|           0|       0|                0|
|             3|2019-08-22 07:21:33|          0|          0|    80.41|          26493|           0|       0|                0|
|             4|2019-08-22 09:06:17|          1|        981|   102.83|          32777|           0|       0|   

In [5]:
sdf.printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- tx_datetime: timestamp (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- terminal_id: integer (nullable = true)
 |-- tx_amount: double (nullable = true)
 |-- tx_time_seconds: integer (nullable = true)
 |-- tx_time_days: integer (nullable = true)
 |-- tx_fraud: integer (nullable = true)
 |-- tx_fraud_scenario: integer (nullable = true)



Давайте рассмотрим значения каждой колонки (фичи) в предложенной таблице данных:
1. transaction_id - Уникальный идентификатор транзакции. Это числовое значение, которое служит для идентификации каждой отдельной транзакции в датасете.
2. tx_datetime - Дата и время проведения транзакции. Это строка или объект datetime, который указывает точное время, когда транзакция была выполнена.
3. customer_id - Идентификатор клиента, который совершил транзакцию. Это числовой идентификатор, присвоенный каждому клиенту, и он используется для отслеживания транзакций, совершенных одним и тем же клиентом.
4. terminal_id - Идентификатор терминала, на котором была проведена транзакция. Это число, которое идентифицирует физическое устройство или место, где транзакция была выполнена (например, банкомат или терминал в магазине).
5. tx_amount - Сумма транзакции. Это числовое значение, которое указывает на общую сумму денег, переданных в ходе транзакции.
6. tx_time_seconds - Время транзакции в секундах с начала дня. Это числовое значение, показывающее, сколько секунд прошло с полуночи до момента совершения транзакции.
7. tx_time_days - Количество дней с начала наблюдения до даты транзакции. Это число указывает, на какой день после начала сбора данных произошла транзакция.
8. tx_fraud - Индикатор мошенничества. Это бинарное значение (0 или 1), где 1 указывает на то, что транзакция была мошеннической, а 0 — что транзакция была законной.
9. tx_fraud_scenario - Сценарий мошенничества. Это числовое значение, которое описывает тип мошенничества, если оно имело место (если tx_fraud = 1). Различные числа могут соответствовать разным сценариям мошенничества, например, кража карты, фишинг и т.д.

Сортируем датафрейм по времени, так как исходный не отсортирован

In [6]:
from pyspark.sql.functions import col

sdf = sdf.orderBy(col("tx_datetime"))
sdf.show()

+--------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|transaction_id|tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+--------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|      15100839|       null|     643855|        287|    42.93|         864000|           9|       0|                0|
|      18342252|       null|     713099|        936|   101.43|        1036800|          11|       0|                0|
|      20002662|       null|     773093|        343|     9.71|        1123200|          12|       1|                2|
|      19354697|       null|     359525|        616|     9.58|        1123200|          12|       0|                0|
|      16438382|       null|     498350|        615|   140.26|         950400|          10|       0|                0|
|      22571066|       null|     412353|        

Проверяем количество null в столбце времени. Если их немного можем удалить.

In [7]:
from pyspark.sql.functions import col

total_count = sdf.count()
# Фильтрация строк, где tx_datetime является null
null_count = sdf.filter(col("tx_datetime").isNull()).count()
null_percentage = (null_count / total_count) * 100

print(f"Общее количество строк: {total_count}")
print(f"Количество строк с null в tx_datetime: {null_count}")
print(f"Процент строк с null в tx_datetime: {null_percentage:.2f}%")

# Удаление строк, где tx_datetime равно null
sdf = sdf.na.drop(subset=["tx_datetime"])

Общее количество строк: 46988418
Количество строк с null в tx_datetime: 100
Процент строк с null в tx_datetime: 0.00%


Проверяем transaction_id на уникальность. Если есть записи больше одной, то удаляем следующие за ней.

In [8]:
from pyspark.sql.functions import col

# Группировка по transaction_id и подсчет количества каждого идентификатора
duplicate_counts = sdf.groupBy("transaction_id").count()

# Фильтрация результатов, чтобы найти те, где количество больше 1
duplicates = duplicate_counts.filter(col("count") > 1).orderBy(col("count").desc())

duplicate_count = duplicates.count()
if duplicate_count > 0:
    print(f"Есть повторяющиеся transaction_id. Количество повторений: {duplicate_count}")
    duplicates.show()
else:
    print("Повторяющихся transaction_id нет.")
    
sdf = sdf.dropDuplicates(['transaction_id'])

Есть повторяющиеся transaction_id. Количество повторений: 181
+--------------+-----+
|transaction_id|count|
+--------------+-----+
|       5133913|    2|
|      13463161|    2|
|      11403446|    2|
|       7052242|    2|
|      17098584|    2|
|      17517624|    2|
|      35909184|    2|
|       7365855|    2|
|      15879594|    2|
|      13885299|    2|
|       5881543|    2|
|       5349542|    2|
|      35502726|    2|
|      38290377|    2|
|      24822180|    2|
|      18933927|    2|
|      13075936|    2|
|      24319383|    2|
|      40445669|    2|
|      23385554|    2|
+--------------+-----+
only showing top 20 rows



Проверим есть ли в строках полные дубликаты. Проверять будем по всем колонкам кроме времени

In [9]:
# Список всех столбцов, кроме 'tx_datetime'
columns_to_check = [col_name for col_name in sdf.columns if col_name != 'tx_datetime']
unique_df = sdf.dropDuplicates(columns_to_check)

print(f"Количество строк до удаления дубликатов: {sdf.count()}")
print(f"Количество строк после удаления дубликатов: {unique_df.count()}")

Количество строк до удаления дубликатов: 46988137
Количество строк после удаления дубликатов: 46988137


Дубликатов больше нет, все хорошо! Теперь проверим есть ли аномалии в соответствиях фрода и его сценария. Все легитимные операции должны быть со сценарием 0. А фродовые операциями со сценариями 1,2,3

In [10]:
from pyspark.sql.functions import col

# Группировка данных по столбцам tx_fraud и tx_fraud_scenario и подсчет количества записей для каждой комбинации
fraud_report = sdf.groupBy("tx_fraud", "tx_fraud_scenario").count()
fraud_report.show()

+--------+-----------------+--------+
|tx_fraud|tx_fraud_scenario|   count|
+--------+-----------------+--------+
|       1|                2| 2435433|
|       1|                1|   25653|
|       1|                3|   65895|
|       0|                0|44461156|
+--------+-----------------+--------+



Распределение меток правильное, все хорошо! Теперь проверим id, чтобы не было отрицательных значений.

In [11]:
id_columns = ['transaction_id', 'customer_id', 'terminal_id']

# Фильтрация строк с отрицательными значениями в любой из колонок
negative_values = sdf.filter(
    (col('transaction_id') < 0) |
    (col('customer_id') < 0) |
    (col('terminal_id') < 0)
)

if negative_values.count() > 0:
    print("Есть строки с отрицательными значениями ID.")
    negative_values.show()
else:
    print("Отрицательные значения в ID не найдены.")

Есть строки с отрицательными значениями ID.
+--------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|transaction_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+--------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|       8652632|2019-08-27 03:52:56|    -999999|        561|    74.97|         445976|           5|       0|                0|
|      30318659|2019-09-10 09:28:44|    -999999|         81|    55.97|        1675724|          19|       0|                0|
|      13931349|2019-08-30 17:41:54|    -999999|        936|    88.35|         754914|           8|       0|                0|
|      11061761|2019-08-29 16:15:09|    -999999|        473|     47.9|         663309|           7|       0|                0|
|      44445421|2019-09-19 06:59:51|    -999999|        740|   183.

In [12]:
# Создание нового DataFrame без строк, где ID отрицательные
sdf = sdf.filter(
    (col('transaction_id') >= 0) &
    (col('customer_id') >= 0) &
    (col('terminal_id') >= 0)
)

Сохранение обработанного датафрейма в s3

In [13]:
spark_to_s3 = SparkSession.builder \
    .appName("Write DataFrame to S3") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", "ACCESS_KEY") \
    .config("spark.hadoop.fs.s3a.secret.key", "SECRET_KEY") \
    .config("spark.hadoop.fs.s3a.endpoint", "storage.yandexcloud.net") \
    .getOrCreate()

In [14]:
output_path = "s3a://amamylov-mlops/2019-08-22.parquet"
sdf.write.parquet(output_path, mode="overwrite")

In [15]:
spark_to_s3.stop()
spark.stop()