In [None]:
! pip install pyspark==3.0.3

In [1]:
import warnings
warnings.filterwarnings('ignore')
import pyspark
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col, sum

In [2]:
lOCAL_RUN = False

In [3]:
from pyspark.sql import SparkSession

# Если сессия уже существует, убиваем её
if 'spark' in locals() or 'spark' in globals():
    spark.stop()

if lOCAL_RUN:
    # для локального применения свежей версии
    spark = (
        SparkSession.builder
            .appName("Spark ML Research")
            # 1. Используем все ядра (16), но оставляем 1-2 для системы
            .master("local[14]") 

            # 2. Память Драйвера (в локальном режиме это основная настройка)
            # Выделяем 16-20 ГБ, чтобы спокойно делать .toPandas() и обучать модели
            .config("spark.driver.memory", "18g")

            # 3. Лимит на размер объектов, собираемых на драйвере (увеличиваем для тяжелых операций)
            .config("spark.driver.maxResultSize", "8g")

            # 4. Включаем современные оптимизации 2025 года (Adaptive Query Execution)
            .config("spark.sql.adaptive.enabled", "true")

            # 5. Оптимизация работы с памятью при передаче данных в Pandas
            .config("spark.sql.execution.arrow.pyspark.enabled", "true")

            .getOrCreate()
    )
else:
    spark = (
        SparkSession.builder
            .appName("Spark ML Clean Data")
            #.master(f"spark://{MASTER_CONN}") 
            #.config("spark.executor.instances", "3")
            #.config("spark.executor.cores", "3")
            #.config("spark.executor.memory", "10g")
            #.config("spark.executor.memoryOverhead", "1500m")
        
            .config("spark.driver.memory", "6g")
            .config("spark.driver.cores", "1")
            #.config("spark.driver.maxResultSize", "2g")

            #.config("spark.sql.shuffle.partitions", "150")
            #.config("spark.default.parallelism", "150")
            #.config("spark.sql.files.maxPartitionBytes", "128m")  # 1GB # 134217728 128Mb
          
            #.config("spark.memory.fraction", "0.8") 
            #.config("spark.network.timeout", "800s")
        
            .config("spark.sql.adaptive.enabled", "true")
            .config("spark.sql.execution.arrow.pyspark.enabled", "true")
            .getOrCreate()
    )
    
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)  # to pretty print pyspark.DataFrame in jupyter

sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [4]:
print(spark.sparkContext.getConf().get("spark.driver.memory"))

6g


In [5]:
#path = r"/media/rk/500гб/Обучение/MLOps/16 Валидация данных/2022-11-04.txt"
#path = "s3a://otus-bucket2-b1gukkncvsp3tvci7gp3/*.txt"
path = "/user/ubuntu/data/2022-11-0*.txt"

df = spark.read.csv(
    path, 
    header=False, 
    comment='#', 
    inferSchema=True
)

columns = [
    "transaction_id", "tx_datetime", "customer_id", "terminal_id", 
    "tx_amount", "tx_time_seconds", "tx_time_days", "tx_fraud", "tx_fraud_scenario"
]

df = df.toDF(*columns)

df.limit(5)

transaction_id,tx_datetime,customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario
1836815734,2022-11-06 08:58:16,569011,793,95.21,101293096,1172,0,0
1836815735,2022-11-06 02:12:13,569011,0,148.72,101268733,1172,0,0
1836815736,2022-11-06 16:01:39,569014,894,19.42,101318499,1172,0,0
1836815737,2022-11-06 04:03:46,569014,460,10.84,101275426,1172,0,0
1836815738,2022-11-06 13:45:52,569014,460,31.03,101310352,1172,0,0


In [6]:
df_fraud = df.filter((df["tx_fraud"] == 1))
df_fraud.limit(5)

transaction_id,tx_datetime,customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario
1848882860,2022-11-14 09:03:26,270267,883,14.38,101984606,1180,1,2
1848882874,2022-11-14 07:51:01,270280,606,84.81,101980261,1180,1,2
1848882890,2022-11-14 14:15:50,270292,38,21.16,102003350,1180,1,2
1848882963,2022-11-14 17:19:39,270333,846,45.82,102014379,1180,1,2
1848882964,2022-11-14 20:31:11,270333,846,98.38,102025871,1180,1,2


# EDA

In [7]:
df.printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- tx_datetime: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- terminal_id: string (nullable = true)
 |-- tx_amount: double (nullable = true)
 |-- tx_time_seconds: integer (nullable = true)
 |-- tx_time_days: integer (nullable = true)
 |-- tx_fraud: integer (nullable = true)
 |-- tx_fraud_scenario: integer (nullable = true)



In [8]:
#количество записей
df.count()

46998983

In [10]:
#выводим общие статистики по всему датасету
df.describe()

summary,transaction_id,tx_datetime,customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario
count,46998983.0,46998983,46998983.0,46998898,46998983.0,46998983.0,46998983.0,46998983.0,46998983.0
mean,1856292096.624556,,500437.420257349,28085.477290004605,54.17185979556221,102384002.46815348,1184.5000234154004,0.0299187537738848,0.0603637104232659
stddev,13567435.381965691,,288558.3518378499,1571112.2498451143,40.93842760395922,748058.0447052687,8.655529240607235,0.1703632077769138,0.3460899863248696
min,1832792610.0,2022-11-04 00:00:00,-999999.0,0,0.0,101088000.0,1170.0,0.0,0.0
max,1879791584.0,2022-12-03 24:00:00,999999.0,Err,4754.5,103680000.0,1199.0,1.0,3.0


In [12]:
numeric_cols = ["tx_amount",'tx_time_seconds','tx_time_days']
category_cols = ['transaction_id','customer_id','terminal_id']
date_cols = ['tx_datetime']
targets = ['tx_fraud','tx_fraud_scenario']

In [13]:
for c in category_cols + targets:
    count_uniq = df.select(c).distinct().count()
    print(f"Колонка {c}: {count_uniq} уникальных значений")
    df.select(c).distinct().limit(5).show()

Колонка transaction_id: 46998975 уникальных значений
+--------------+
|transaction_id|
+--------------+
|    1832792740|
|    1832792774|
|    1832792785|
|    1832792881|
|    1832792987|
+--------------+

Колонка customer_id: 988635 уникальных значений
+-----------+
|customer_id|
+-----------+
|     270553|
|     270981|
|     271086|
|     271109|
|     271213|
+-----------+

Колонка terminal_id: 1007 уникальных значений
+-----------+
|terminal_id|
+-----------+
|        829|
|        467|
|        296|
|        675|
|        691|
+-----------+

Колонка tx_fraud: 2 уникальных значений
+--------+
|tx_fraud|
+--------+
|       1|
|       0|
+--------+

Колонка tx_fraud_scenario: 4 уникальных значений
+-----------------+
|tx_fraud_scenario|
+-----------------+
|                1|
|                0|
|                2|
|                3|
+-----------------+



## Дубли, пропуски данных

In [14]:
#дубли
duble_transactions = df.select('transaction_id')\
    .groupby('transaction_id')\
    .count()\
    .filter(col("count") > 1).toPandas()

In [15]:
duble_transactions.shape[0]

8

In [None]:
df.filter( (df["transaction_id"].isin(duble_transactions['transaction_id'].to_list()) ))

In [17]:
# Считаем количество null в каждой колонке
null_counts = df.select([
    sum((col(c).isNull()).cast("int")).alias(c) 
    for c in df.columns
])
null_counts.show()

+--------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|transaction_id|tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+--------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|             0|          0|          0|         85|        0|              0|           0|       0|                0|
+--------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+



In [None]:
Есть проблемы:
    transaction_id - есть дубли (можно удалять, так как дубли полные)
    customer_id - есть отрицательные значения, по идее их быть не должно
    terminal_id - должно быть поле типа integer, но оно string, т.е. есть какие то странные значения, а так же null данные
    
Замечания:
    tx_amount - есть нулевые транзакции, пока будем считать, что это нормально
    tx_time_seconds - отсчёт от начала датасета в секундах
    tx_time_days - количество дней, прошедших с начала отсчёта 
    tx_fraud - 1 or 0
    tx_fraud_scenario - [0,1,2,3] 
    tx_datetime - надо проверять формат, возможно это местная дата с терминала (transaction_id  ),
        соответственно можно будет делать признаки по времени суток.
            

In [18]:
#смотрим некорректные записи customer_id
neg_customer_id = df.filter(df["customer_id"]< 0)
neg_customer_id.count()

96

In [19]:
neg_customer_df = neg_customer_id.toPandas()
neg_customer_df

Unnamed: 0,transaction_id,tx_datetime,customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario
0,1834231246,2022-11-04 06:09:28,-999999,168,77.45,101110168,1170,0,0
1,1834432304,2022-11-05 19:13:44,-999999,951,53.97,101243624,1171,0,0
2,1834802463,2022-11-05 16:19:28,-999999,716,3.89,101233168,1171,0,0
3,1835071499,2022-11-05 08:27:38,-999999,433,66.67,101204858,1171,0,0
4,1835640422,2022-11-05 20:11:08,-999999,331,20.15,101247068,1171,0,0
...,...,...,...,...,...,...,...,...,...
91,1876582158,2022-12-01 05:09:29,-999999,325,8.57,103439369,1197,0,0
92,1877488970,2022-12-02 13:54:59,-999999,423,27.07,103557299,1198,0,0
93,1877731728,2022-12-02 06:03:14,-999999,968,7.83,103528994,1198,0,0
94,1878420467,2022-12-03 06:13:26,-999999,347,13.90,103616006,1199,0,0


In [20]:
neg_customer_df[['customer_id','tx_fraud']].nunique()

customer_id    1
tx_fraud       2
dtype: int64

In [26]:
#смотрим некорректные записи terminal_id
null_term = df.filter((df["terminal_id"].isNull()) | (df["terminal_id"].cast("int").isNull()))
nc = null_term.count()
nc

2298

In [27]:
null_term.sample(20*100/nc,3)

transaction_id,tx_datetime,customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario
1832799975,2022-11-04 13:13:25,4776,Err,5.21,101135605,1170,1,2
1832820886,2022-11-04 08:46:02,18099,Err,96.56,101119562,1170,0,0
1832891819,2022-11-04 11:49:27,63351,Err,60.33,101130567,1170,0,0
1832913565,2022-11-04 01:56:23,77298,Err,31.01,101094983,1170,0,0
1832937980,2022-11-04 02:27:39,93024,Err,139.17,101096859,1170,0,0
1832939697,2022-11-04 02:00:37,94121,Err,73.07,101095237,1170,0,0
1832994312,2022-11-04 06:25:14,129119,Err,42.08,101111114,1170,0,0
1833017888,2022-11-04 09:53:51,144162,Err,34.84,101123631,1170,0,0
1833056393,2022-11-04 06:15:27,168962,Err,70.42,101110527,1170,0,0
1833121221,2022-11-04 05:44:18,210666,Err,97.69,101108658,1170,0,0


In [28]:
for c in ['customer_id','tx_fraud','tx_fraud_scenario']:
    count_uniq = null_term.select(c).distinct().count()
    print(f"Колонка {c}: {count_uniq} уникальных значений")
    null_term.select(c).distinct().limit(5).show()

Колонка customer_id: 2291 уникальных значений
+-----------+
|customer_id|
+-----------+
|     344076|
|     668308|
|     971787|
|     347393|
|     626423|
+-----------+

Колонка tx_fraud: 2 уникальных значений
+--------+
|tx_fraud|
+--------+
|       0|
|       1|
+--------+

Колонка tx_fraud_scenario: 4 уникальных значений
+-----------------+
|tx_fraud_scenario|
+-----------------+
|                3|
|                1|
|                0|
|                2|
+-----------------+



In [30]:
#смотрим некорректные записи tx_datetime
null_dt = df.filter((df["tx_datetime"].isNull()) | (df["tx_datetime"].cast("timestamp").isNull()))
nc = null_dt.count()
nc

92

In [31]:
null_dt.limit(10)

transaction_id,tx_datetime,customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario
1832907535,2022-11-04 24:00:00,73447,894,22.43,101174400,1170,0,0
1833166751,2022-11-04 24:00:00,239687,544,56.1,101174400,1170,0,0
1833439501,2022-11-04 24:00:00,413494,114,113.01,101174400,1170,0,0
1833859309,2022-11-04 24:00:00,681479,664,94.01,101174400,1170,0,0
1834587620,2022-11-05 24:00:00,146776,919,20.26,101260800,1171,0,0
1841162239,2022-11-09 24:00:00,343429,686,26.6,101606400,1175,0,0
1841892068,2022-11-09 24:00:00,808595,435,23.74,101606400,1175,0,0
1842057515,2022-11-09 24:00:00,914596,151,78.57,101606400,1175,0,0
1842115840,2022-11-09 24:00:00,951653,145,81.74,101606400,1175,0,0
1842824163,2022-11-10 24:00:00,404520,591,28.26,101692800,1176,0,0


In [None]:
Итого, обрабатывать будем так:
    transaction_id    int :  удаляем дубли; int,>=0; delete row
    tx_datetime       string :  заменяем "24:00:00" -> "23:59:59"; try_cast('timestamp'); delete row ? 
    customer_id	      int :  int,>=0;  определяем в новый id (например -1)
    terminal_id       int :  int,>=0;  определяем в новый id (например -1)
    tx_amount	      double: notnull,>=0; delete row 
    tx_time_seconds   int :  int,>=0; определяем в новый id (например -1)
    tx_time_days	  int :  int,>=0; определяем в новый id (например -1)
    tx_fraud	      bool :  int,in [0,1];  delete row
    tx_fraud_scenario int :  int,>=0;  определяем в новый id (например -1)