In [1]:
# !pip install findspark pandas matplotlib

In [1]:
import findspark
findspark.init()

In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

from pyspark.sql.functions import col, sum
from pyspark.sql.window import Window
from pyspark.sql.functions import last
from pyspark.sql import functions as F
from pyspark.sql.functions import col, coalesce, lag, lit, expr, when

In [3]:
spark = (SparkSession
    .builder
    .appName("Dataset Practice")
    .master("local[*]")
    .getOrCreate()
)

# Проверка версии Spark
print(f"Spark Version: {spark.version}")

Spark Version: 3.0.3


In [1]:
!hdfs dfs -ls data

Found 40 items
-rw-r--r--   1 ubuntu hadoop 2807409271 2025-04-05 13:32 data/2019-08-22.txt
-rw-r--r--   1 ubuntu hadoop 2854479008 2025-04-05 13:44 data/2019-09-21.txt
-rw-r--r--   1 ubuntu hadoop 2895460543 2025-04-05 13:47 data/2019-10-21.txt
-rw-r--r--   1 ubuntu hadoop 2939120942 2025-04-05 13:47 data/2019-11-20.txt
-rw-r--r--   1 ubuntu hadoop 2995462277 2025-04-05 13:42 data/2019-12-20.txt
-rw-r--r--   1 ubuntu hadoop 2994906767 2025-04-05 13:42 data/2020-01-19.txt
-rw-r--r--   1 ubuntu hadoop 2995431240 2025-04-05 13:30 data/2020-02-18.txt
-rw-r--r--   1 ubuntu hadoop 2995176166 2025-04-05 13:40 data/2020-03-19.txt
-rw-r--r--   1 ubuntu hadoop 2996034632 2025-04-05 13:46 data/2020-04-18.txt
-rw-r--r--   1 ubuntu hadoop 2995666965 2025-04-05 13:43 data/2020-05-18.txt
-rw-r--r--   1 ubuntu hadoop 2994699401 2025-04-05 13:31 data/2020-06-17.txt
-rw-r--r--   1 ubuntu hadoop 2995810010 2025-04-05 13:41 data/2020-07-17.txt
-rw-r--r--   1 ubuntu hadoop 2995995152 2025-04-05 13:46 data

In [5]:
!hadoop fs -cat data/2019-08-22.txt | head

# tranaction_id | tx_datetime | customer_id | terminal_id | tx_amount | tx_time_seconds | tx_time_days | tx_fraud | tx_fraud_scenario
0,2019-08-22 06:51:03,0,711,70.91,24663,0,0,0
1,2019-08-22 05:10:37,0,0,90.55,18637,0,0,0
2,2019-08-22 19:05:33,0,753,35.38,68733,0,0,0
3,2019-08-22 07:21:33,0,0,80.41,26493,0,0,0
4,2019-08-22 09:06:17,1,981,102.83,32777,0,0,0
5,2019-08-22 18:41:25,3,205,34.20,67285,0,0,0
6,2019-08-22 03:12:21,3,0,47.20,11541,0,0,0
7,2019-08-22 22:36:40,6,809,139.39,81400,0,0,0
8,2019-08-22 17:23:29,7,184,87.24,62609,0,0,0
cat: Unable to write to output stream.


In [6]:
hadoop_fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(
    spark._jsc.hadoopConfiguration()
)

path = "hdfs:/user/ubuntu/data"
file_statuses = hadoop_fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(path))
file_list = [file.getPath().getName() for file in file_statuses]
print(file_list)

['2019-08-22.txt', '2019-09-21.txt', '2019-10-21.txt', '2019-11-20.txt', '2019-12-20.txt', '2020-01-19.txt', '2020-02-18.txt', '2020-03-19.txt', '2020-04-18.txt', '2020-05-18.txt', '2020-06-17.txt', '2020-07-17.txt', '2020-08-16.txt', '2020-09-15.txt', '2020-10-15.txt', '2020-11-14.txt', '2020-12-14.txt', '2021-01-13.txt', '2021-02-12.txt', '2021-03-14.txt', '2021-04-13.txt', '2021-05-13.txt', '2021-06-12.txt', '2021-07-12.txt', '2021-08-11.txt', '2021-09-10.txt', '2021-10-10.txt', '2021-11-09.txt', '2021-12-09.txt', '2022-01-08.txt', '2022-02-07.txt', '2022-03-09.txt', '2022-04-08.txt', '2022-05-08.txt', '2022-06-07.txt', '2022-07-07.txt', '2022-08-06.txt', '2022-09-05.txt', '2022-10-05.txt', '2022-11-04.txt']


In [15]:
schema = StructType([
    StructField("tranaction_id", IntegerType()),
    StructField("tx_datetime", TimestampType()),
    StructField("customer_id", IntegerType()),
    StructField("terminal_id", IntegerType()),
    StructField("tx_amount", DoubleType()),
    StructField("tx_time_seconds", IntegerType()),
    StructField("tx_time_days", IntegerType()),
    StructField("tx_fraud", IntegerType()),
    StructField("tx_fraud_scenario", IntegerType())
])

df = spark.read \
    .option("sep", ",") \
    .option("header", "true") \
    .option("comment", "#") \
    .schema(schema) \
    .csv(f"hdfs:/user/ubuntu/data/{file_list[1]}")

In [16]:
df.printSchema()
df.show(5, truncate=False)

root
 |-- tranaction_id: integer (nullable = true)
 |-- tx_datetime: timestamp (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- terminal_id: integer (nullable = true)
 |-- tx_amount: double (nullable = true)
 |-- tx_time_seconds: integer (nullable = true)
 |-- tx_time_days: integer (nullable = true)
 |-- tx_fraud: integer (nullable = true)
 |-- tx_fraud_scenario: integer (nullable = true)

+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|tranaction_id|tx_datetime        |customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|46988238     |2019-09-21 19:33:01|2          |660        |22.15    |2662381        |30          |0       |0                |
|46988239     |2019-09-21 18:06:19|3          |732        |36.83    |2657179        |

### Проверка пропущенных значений

In [17]:
missing_values = df.select([
    sum(col(c).isNull().cast("int")).alias(c) 
    for c in df.columns
])
missing_values.show()

+-------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|tranaction_id|tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+-------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|            0|        103|          0|        778|        0|              0|           0|       0|                0|
+-------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+



In [28]:
print("\n=== Статистика пропущенных значений до обработки ===")
missing_stats = df.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c) 
    for c in df.columns
]).collect()[0]

for col_name, null_count in zip(df.columns, missing_stats):
    print(f"{col_name}: {null_count} пропущенных значений ({null_count/df.count()*100:.2f}%)")


=== Статистика пропущенных значений до обработки ===
tranaction_id: 0 пропущенных значений (0.00%)
tx_datetime: 103 пропущенных значений (0.00%)
customer_id: 0 пропущенных значений (0.00%)
terminal_id: 778 пропущенных значений (0.00%)
tx_amount: 0 пропущенных значений (0.00%)
tx_time_seconds: 0 пропущенных значений (0.00%)
tx_time_days: 0 пропущенных значений (0.00%)
tx_fraud: 0 пропущенных значений (0.00%)
tx_fraud_scenario: 0 пропущенных значений (0.00%)


In [11]:
# window = Window.partitionBy("customer_id").orderBy("tranaction_id")
# clean_df = df.withColumn("tx_datetime", last("tx_datetime", ignorenulls=True).over(window))

In [12]:
# missing_values_clean = clean_df.select([
#     sum(col(c).isNull().cast("int")).alias(c) 
#     for c in clean_df.columns
# ])
# missing_values_clean.show()

In [24]:
window_spec = Window.orderBy("tranaction_id")

# Обрабатываем пропуски
df_clean = (df
    # Флаг пропущенных дат
    .withColumn("is_datetime_imputed", 
                col("tx_datetime").isNull())
    
    # Заполняем пропуски в дате: предыдущая дата + 1 секунда
    .withColumn("tx_datetime",
                coalesce(
                    col("tx_datetime"),
                    lag("tx_datetime", 1).over(window_spec) + expr("INTERVAL 1 SECOND")
                ))
    
    # Флаг пропущенных terminal_id
    .withColumn("is_terminal_imputed",
                col("terminal_id").isNull())
    
    # Заполняем пропуски в terminal_id специальным значением
    .withColumn("terminal_id",
                coalesce(col("terminal_id"), lit(-999)))
    
    # Флаг пропущенных terminal_id
    .withColumn("is_terminal_imputed",
                col("terminal_id").isNull())
    
    # Заполняем пропуски в customer_id специальным значением
    .withColumn("customer_id",
                coalesce(col("customer_id"), lit(-999)))
    
    # Удаляем строки, где не удалось заполнить дату (нет предыдущего значения)
    .filter(col("tx_datetime").isNotNull())
)

df_clean.show(5, truncate=False)

+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+-------------------+-------------------+-------------------+------------------+
|tranaction_id|tx_datetime        |customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|is_datetime_imputed|tx_datetime_filled |is_terminal_imputed|terminal_id_filled|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+-------------------+-------------------+-------------------+------------------+
|46988238     |2019-09-21 19:33:01|2          |660        |22.15    |2662381        |30          |0       |0                |false              |2019-09-21 19:33:01|false              |660               |
|46988239     |2019-09-21 18:06:19|3          |732        |36.83    |2657179        |30          |0       |0                |false              |2019-09-21 18:06:19|false          

### Проверка дубликатов

In [14]:
duplicate_rows = df.groupBy(df.columns).count().filter("count > 1")

print(f"Найдено полных дубликатов: {duplicate_rows.count()}")
duplicate_rows.show(5)

Найдено полных дубликатов: 181
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+-----+
|tranaction_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|count|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+-----+
|     35909184|2019-09-13 20:10:14|     927611|        298|    78.84|        1973414|          22|       1|                2|    2|
|     34610209|2019-09-13 19:25:18|      98166|        249|     82.9|        1970718|          22|       0|                0|    2|
|     43227844|2019-09-18 14:29:41|     599043|        150|    62.01|        2384981|          27|       0|                0|    2|
|     15974647|2019-09-01 09:06:30|     202272|        248|     79.0|         896790|          10|       0|                0|    2|
|     24595200|2019-09-06 08:27:15|     70312

In [15]:
# df = clean_df.dropDuplicates()

In [16]:
# duplicate_rows = df.groupBy(df.columns).count().filter("count > 1")

# print(f"Найдено полных дубликатов: {duplicate_rows.count()}")
# duplicate_rows.show(5)

In [17]:
# Отрицательные суммы
df.filter(col("tx_amount") < 0).count()

0

In [22]:
# Пропуски в ID
window = Window.orderBy("tranaction_id")
df.withColumn("id_diff", col("tranaction_id") - lag("tranaction_id", 1).over(window)) \
  .filter(col("id_diff") > 1).count()

0

In [None]:
from pyspark.sql.functions import isnan, when, count, col

# Проверка колонок, которые должны быть числами, но содержат текст
df.select([count(when(col(c).cast("float").isNull() & col(c).isNotNull(), c)).alias(c) 
          for c in df.columns]).show()

In [9]:
from pyspark.sql.functions import mean, stddev

stats = df.select(
     mean("tx_amount").alias("avg"),
     stddev("tx_amount").alias("std")
).collect()[0]

upper_bound = stats["avg"] + 3*stats["std"]
df.filter(col("tx_amount") > upper_bound).show()

+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|tranaction_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|          328|2019-08-22 17:40:13|        226|        101|   187.29|          63613|           0|       0|                0|
|          367|2019-08-22 14:43:37|        256|        138|   262.02|          53017|           0|       1|                3|
|          589|2019-08-22 06:47:40|        383|        312|   186.88|          24460|           0|       0|                0|
|          630|2019-08-22 03:19:16|        410|          0|   203.01|          11956|           0|       0|                0|
|          911|2019-08-22 15:52:24|        577|          0|    183.8|          57144|           0|       0|           

In [9]:
df.withColumn("calc_days", (col("tx_time_seconds")/86400).cast("int")) \
  .filter(col("calc_days") != col("tx_time_days")).show()

+-------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+---------+
|tranaction_id|tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|calc_days|
+-------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+---------+
|       933817|       null|     597125|        611|    62.83|          86400|           0|       0|                0|        1|
|      1164488|       null|     743659|          0|     7.25|          86400|           0|       0|                0|        1|
|      1205236|       null|     769896|         44|     15.1|          86400|           0|       0|                0|        1|
|      1543099|       null|     985197|        967|    85.18|          86400|           0|       0|                0|        1|
|      1670385|       null|      66533|        956|    21.39|         172800|           1|       0|     

In [10]:
df.filter(col("customer_id") == 0).count()

65

In [11]:
df.filter(col("terminal_id") == 0).count()

2041671

In [None]:
# Расчет границ через IQR
q1, q3 = df.approxQuantile("tx_amount", [0.25, 0.75], 0.01)
iqr = q3 - q1
lower_bound = q1 - 1.5*iqr
upper_bound = q3 + 1.5*iqr

# Замена выбросов на граничные значения
df_clean = df.withColumn(
    "tx_amount",
    when(col("tx_amount") < lower_bound, lower_bound)
    .when(col("tx_amount") > upper_bound, upper_bound)
    .otherwise(col("tx_amount"))
)

In [27]:
df_clean.show(5, truncate=False)

+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+---------------+
|tranaction_id|tx_datetime        |customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|tx_amount_clean|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+---------------+
|46988238     |2019-09-21 19:33:01|2          |660        |22.15    |2662381        |30          |0       |0                |22.15          |
|46988239     |2019-09-21 18:06:19|3          |732        |36.83    |2657179        |30          |0       |0                |36.83          |
|46988240     |2019-09-21 16:56:01|10         |663        |19.3     |2652961        |30          |0       |0                |19.3           |
|46988241     |2019-09-21 05:34:26|10         |145        |106.51   |2612066        |30          |0       |0                |106.51         |
|46988