In [1]:
#!pip install findspark

import findspark
findspark.init()
import warnings
import pyspark
from pyspark.sql.types import IntegerType, FloatType, DateType, StringType
import pyspark.sql.functions as F

In [2]:
warnings.filterwarnings('ignore')
spark_ui_port = 4040
app_name = "Otus_DZ_3"

In [3]:
spark = (
    pyspark.sql.SparkSession
        .builder
        .appName(app_name)
        .config("spark.driver.memory", "4g") 
        .config("spark.executor.memory", "4g")   # 8 ГБ для executor
        .config("spark.executor.cores", "2")    # 2 ядра на executor
        .config("spark.executor.instances", "6")   # 2 executor
        .getOrCreate()
)
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)  # to pretty print pyspark.DataFrame in jupyter

In [4]:
spark

In [5]:
data_text = spark.read.format("text").load("data/*.txt")

In [6]:
data_text.show(5)

+--------------------+
|               value|
+--------------------+
|# tranaction_id |...|
|0,2019-08-22 06:5...|
|1,2019-08-22 05:1...|
|2,2019-08-22 19:0...|
|3,2019-08-22 07:2...|
+--------------------+
only showing top 5 rows



In [7]:
# Извлечение заголовков
header = data_text.filter(data_text.value.startswith("#")).first()[0]
columns = header[2:].split(" | ")

In [8]:
columns

['tranaction_id',
 'tx_datetime',
 'customer_id',
 'terminal_id',
 'tx_amount',
 'tx_time_seconds',
 'tx_time_days',
 'tx_fraud',
 'tx_fraud_scenario']

In [9]:
# Фильтрация данных
data = data_text.filter(~data_text.value.startswith("#"))

# Преобразование данных в DataFrame
df = data.selectExpr(
    f"split(value, ',')[0] as {columns[0]}",
    f"split(value, ',')[1] as {columns[1]}",
    f"split(value, ',')[2] as {columns[2]}",
    f"split(value, ',')[3] as {columns[3]}",
    f"split(value, ',')[4] as {columns[4]}",
    f"split(value, ',')[5] as {columns[5]}",
    f"split(value, ',')[6] as {columns[6]}",
    f"split(value, ',')[7] as {columns[7]}",
    f"split(value, ',')[8] as {columns[8]}"
)

# Преобразование типов данных
df = df.withColumn(columns[0], df[columns[0]].cast(IntegerType())) \
       .withColumn(columns[1], df[columns[1]].cast(DateType())) \
       .withColumn(columns[2], df[columns[2]].cast(IntegerType())) \
       .withColumn(columns[3], df[columns[3]].cast(IntegerType())) \
       .withColumn(columns[4], df[columns[4]].cast(FloatType())) \
       .withColumn(columns[5], df[columns[5]].cast(IntegerType())) \
       .withColumn(columns[6], df[columns[6]].cast(IntegerType())) \
       .withColumn(columns[7], df[columns[7]].cast(IntegerType())) \
       .withColumn(columns[8], df[columns[8]].cast(IntegerType())) 

In [10]:
df.show()

+-------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|tranaction_id|tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+-------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|            0| 2019-08-22|          0|        711|    70.91|          24663|           0|       0|                0|
|            1| 2019-08-22|          0|          0|    90.55|          18637|           0|       0|                0|
|            2| 2019-08-22|          0|        753|    35.38|          68733|           0|       0|                0|
|            3| 2019-08-22|          0|          0|    80.41|          26493|           0|       0|                0|
|            4| 2019-08-22|          1|        981|   102.83|          32777|           0|       0|                0|
|            5| 2019-08-22|          3|        205|     

In [11]:
dt = df.dtypes
for r in dt:
    print(f"{r[0]:>25}\t{r[1]}")

            tranaction_id	int
              tx_datetime	date
              customer_id	int
              terminal_id	int
                tx_amount	float
          tx_time_seconds	int
             tx_time_days	int
                 tx_fraud	int
        tx_fraud_scenario	int


In [12]:
# Подсчет пропусков
null_counts = df.agg(*[
    F.sum(F.col(c).isNull().cast("int")).alias(c) 
    for c in df.columns
]).collect()[0]

# Подсчет процента пропусков
total_rows = df.count()
null_percentages = {col: (null_counts[col] / total_rows * 100) for col in df.columns}

# Вывод результатов
for col in df.columns:
    print(f"{col}: {null_counts[col]} пропусков, {null_percentages[col]:.2f}%")

tranaction_id: 0 пропусков, 0.00%
tx_datetime: 0 пропусков, 0.00%
customer_id: 0 пропусков, 0.00%
terminal_id: 36643 пропусков, 0.00%
tx_amount: 0 пропусков, 0.00%
tx_time_seconds: 0 пропусков, 0.00%
tx_time_days: 0 пропусков, 0.00%
tx_fraud: 0 пропусков, 0.00%
tx_fraud_scenario: 0 пропусков, 0.00%


In [13]:
# dataset shape
print("Всего записей в датачете:", total_rows)

Всего записей в датачете: 1691815322


In [14]:
# предобработа данных

df = df.withColumn("tx_amount", F.col("tx_amount").cast(FloatType()))
df = df.na.drop("any")
df = df.withColumnRenamed('tranaction_id', 'transaction_id')
df = df.dropDuplicates(["transaction_id"])

In [None]:
# Сохранение данных
df.repartition(50).write.parquet("s3a://otus-bucket-b1g7tkp18f4l17b9fn7d/data_prepare/cleaned_fraud_dataset.parquet")

# Остановка Spark
# spark.stop()