Installing dependencies

In [1]:
!pip install findspark pyspark

Defaulting to user installation because normal site-packages is not writeable
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Collecting pyspark
  Downloading pyspark-3.5.6.tar.gz (317.4 MB)
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.6-py2.py3-none-any.whl size=317895818 sha256=8864011f58ef7a33de85bc859c6c17632c2add696c40481873e5d97ae9ec352a
  Stored in directory: /home/ubuntu/.cache/pip/wheels/59/22/7b/02883022a5a50e8e1a403bc1bacc4e9e8eb34b1699c09cd539
Successfully built pyspark
Installing collected packages: findspark, py4j, pyspark
Successfully installed findspark-2.0.1 py4j-0.10.9.7 pyspark-3.5.6


Run configuration variables

In [1]:
# Placeholder Object Storage credentials: substitute real keys before running
accesskey = "accesskey"
secretkey = "secretkey"
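Hardcoded keys in a notebook are easy to leak. A minimal sketch, assuming the keys are exported as environment variables, in the same spirit as the `SOURCE_BUCKET` lookup further down; the variable names `S3_ACCESS_KEY` / `S3_SECRET_KEY` and the helper `load_s3_credentials` are hypothetical.

```python
import os


def load_s3_credentials(env=os.environ):
    """Return (access_key, secret_key) read from environment variables.

    The names S3_ACCESS_KEY / S3_SECRET_KEY are hypothetical;
    adapt them to whatever your environment actually exports.
    """
    names = ("S3_ACCESS_KEY", "S3_SECRET_KEY")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return env["S3_ACCESS_KEY"], env["S3_SECRET_KEY"]


# Usage in the notebook (works only once the variables are exported):
# accesskey, secretkey = load_s3_credentials()
```

Failing early with the missing variable names is usually easier to debug than an authentication error from S3A later on.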

Initializing the Spark session

In [2]:
import findspark
findspark.init()  # locate the Spark installation (SPARK_HOME) and make pyspark importable

In [3]:
import os
# Read the bucket name from an environment variable
SOURCE_BUCKET = os.getenv("SOURCE_BUCKET", "otus-bucket-default")
print(f"Using SOURCE_BUCKET={SOURCE_BUCKET}")

Using SOURCE_BUCKET=otus-bucket-b1g1p055ep53fi91o22r


In [4]:
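from pyspark.sql import SparkSession

# Build (or reuse) a Spark session configured for Yandex Object Storage over S3A
spark = (
    SparkSession.builder
    .appName("TransactionValidation")
    # S3A filesystem for s3a:// paths (requires the hadoop-aws jar on the classpath)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Credentials from the configuration cell above
    .config("spark.hadoop.fs.s3a.access.key", accesskey)
    .config("spark.hadoop.fs.s3a.secret.key", secretkey)
    # Object Storage endpoint, addressed with path-style URLs (endpoint/bucket/key)
    .config("spark.hadoop.fs.s3a.endpoint", "storage.yandexcloud.net")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # Render a DataFrame as a table when it is the last expression in a cell
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)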
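The S3A settings can also be kept in one place. A sketch with a hypothetical helper `s3a_options`; the `spark.jars.packages` entry is an addition not in the original cell, assuming the environment does not already provide the `hadoop-aws` jar (a pip-installed PySpark does not bundle it), and the default version `3.3.4` is an assumption that must match the Hadoop version your Spark build ships with.

```python
def s3a_options(access_key, secret_key,
                endpoint="storage.yandexcloud.net",
                hadoop_aws_version="3.3.4"):
    """Return the S3A-related Spark settings as a plain dict.

    hadoop_aws_version is an assumption: it should match the Hadoop
    version bundled with the installed PySpark.
    """
    return {
        # Resolve the S3A connector (and its dependencies) from Maven at startup
        "spark.jars.packages": f"org.apache.hadoop:hadoop-aws:{hadoop_aws_version}",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        "spark.hadoop.fs.s3a.path.style.access": "true",
    }
```

The entries can be applied in a loop: start from `builder = SparkSession.builder.appName("TransactionValidation")`, then `for key, value in s3a_options(accesskey, secretkey).items(): builder = builder.config(key, value)`. Note that `spark.jars.packages` only takes effect when the JVM is first started, not when `getOrCreate()` returns an already running session.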

Specifying the data path:

In [5]:
data_path = "s3a://data_path/"  # placeholder for the source data location in Object Storage

Reading the data from the cloud:

In [6]:
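# Lines starting with '#' are skipped as comments. timestampFormat applies only to TIMESTAMP columns,
# so it has no effect here: tx_datetime is read as STRING and parsed later with to_timestamp
df = spark.read.option("comment", "#").option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \
    .schema("transaction_id LONG, tx_datetime STRING, customer_id INT, terminal_id INT, tx_amount DOUBLE, tx_time_seconds LONG, tx_time_days LONG, tx_fraud INT, tx_fraud_scenario INT") \
    .csv('/user/ubuntu/data')  # no URI scheme: resolved against the default filesystem (fs.defaultFS)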
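To illustrate what the reader options do, here is a plain-Python model (not Spark's implementation): lines starting with `#` are skipped, each field is cast according to the schema, and a field that cannot be cast or is missing becomes `None`, roughly the way the CSV reader's default `PERMISSIVE` mode yields nulls. The helper `parse_permissive` and the sample lines are hypothetical.

```python
import csv
import io

# Column names and Python casts mirroring the DDL schema passed to .schema(...)
SCHEMA = [
    ("transaction_id", int), ("tx_datetime", str), ("customer_id", int),
    ("terminal_id", int), ("tx_amount", float), ("tx_time_seconds", int),
    ("tx_time_days", int), ("tx_fraud", int), ("tx_fraud_scenario", int),
]


def parse_permissive(text, comment="#"):
    """Parse CSV text into dicts; uncastable, empty or missing fields become None."""
    lines = [line for line in io.StringIO(text) if not line.startswith(comment)]
    rows = []
    for record in csv.reader(lines):
        if not record:  # skip blank lines
            continue
        row = {}
        for i, (name, cast) in enumerate(SCHEMA):
            raw = record[i] if i < len(record) else None
            try:
                row[name] = cast(raw) if raw not in (None, "") else None
            except ValueError:
                row[name] = None  # comparable to a null in PERMISSIVE mode
        rows.append(row)
    return rows


# Hypothetical input: a comment line, one valid record, one with a bad customer_id
sample = (
    "# hypothetical header line\n"
    "1,2019-08-22 06:51:03,0,711,70.91,24663,0,0,0\n"
    "2,2019-08-22 06:52:10,abc,12,15.0,24730,0,0,0\n"
)
rows = parse_permissive(sample)
```

Here `rows[1]["customer_id"]` is `None`, which is exactly the kind of value the later `>= 0` validation removes.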

In [8]:
# First, reduce the number of partitions
df_optimized = df.coalesce(100)  # merge down to at most 100 partitions (coalesce never increases the count)
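`coalesce(n)` merges existing partitions rather than redistributing rows with a full shuffle, which is why it is cheap but can only lower the partition count. A simplified plain-Python model of that idea (Spark's real grouping also considers data locality); the helper `coalesce_partitions` is hypothetical.

```python
def coalesce_partitions(partitions, n):
    """Merge adjacent partitions into at most n groups (simplified model of coalesce).

    If there are already n or fewer partitions, they are returned unchanged.
    """
    if n <= 0:
        raise ValueError("n must be positive")
    if len(partitions) <= n:
        return [list(p) for p in partitions]
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        # Map partition i onto one of n contiguous, roughly equal ranges
        merged[i * n // len(partitions)].extend(part)
    return merged


parts = [[1], [2, 3], [4], [5], [6, 7]]
print(coalesce_partitions(parts, 2))   # [[1, 2, 3, 4], [5, 6, 7]]
print(coalesce_partitions(parts, 10))  # unchanged: 5 partitions
```

Because rows are only concatenated, no row moves between the merged groups; by contrast, `repartition(n)` performs a full shuffle and can also increase the count.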

In [None]:
# Then run a quick duplicate check on the coalesced DataFrame:
# transaction IDs that occur more than once, most frequent first
duplicate_stats = df_optimized.groupBy("transaction_id") \
                             .count() \
                             .filter("count > 1") \
                             .orderBy("count", ascending=False)

print("Duplicate statistics:")
duplicate_stats.show()

Duplicate statistics:
+--------------+-----+
|transaction_id|count|
+--------------+-----+
|    1590802232|    2|
|      42955533|    2|
|    1024754394|    2|
|    1057587324|    2|
|    1692849430|    2|
|    1549598224|    2|
|    1553487795|    2|
|     723885933|    2|
|     659780807|    2|
|     430593489|    2|
|      26908859|    2|
|     439107232|    2|
|    1631616097|    2|
|    1073430309|    2|
|    1074286843|    2|
|     811472993|    2|
|     449649283|    2|
|     683075166|    2|
|     151617421|    2|
|     432881584|    2|
+--------------+-----+
only showing top 20 rows
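The `groupBy` / `count` / `filter` / `orderBy` chain is a key-frequency count. The same logic in plain Python, as a sketch on a hypothetical list of IDs (the helper name `duplicate_stats` mirrors the notebook variable but is illustrative):

```python
from collections import Counter


def duplicate_stats(ids):
    """Return (id, count) pairs for IDs seen more than once, most frequent first."""
    counts = Counter(ids)
    dups = [(key, n) for key, n in counts.items() if n > 1]
    return sorted(dups, key=lambda pair: pair[1], reverse=True)


print(duplicate_stats([7, 3, 7, 5, 3, 7]))  # [(7, 3), (3, 2)]
```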



In [9]:
# Data cleaning: keep one row per transaction_id (which duplicate survives is not specified)
df_clean = df_optimized.dropDuplicates(['transaction_id'])
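`dropDuplicates` does not specify which of the duplicate rows is kept. If a reproducible choice matters, one option (an assumed rule, not what the notebook does) is to keep the row with the earliest `tx_datetime` per ID; in Spark this is usually done with a `Window.partitionBy("transaction_id").orderBy("tx_datetime")` and `row_number()`, keeping rows where it equals 1. A plain-Python sketch of that rule, with a hypothetical helper `keep_first_per_key`:

```python
def keep_first_per_key(rows, key, order_by):
    """Keep one row per key value: the one with the smallest order_by value.

    Timestamps in 'yyyy-MM-dd HH:mm:ss' form compare correctly as strings.
    """
    best = {}
    for row in rows:
        k = row[key]
        if k not in best or row[order_by] < best[k][order_by]:
            best[k] = row
    return list(best.values())


rows = [
    {"transaction_id": 1, "tx_datetime": "2019-08-22 06:51:03", "v": "a"},
    {"transaction_id": 1, "tx_datetime": "2019-08-22 06:00:00", "v": "b"},
    {"transaction_id": 2, "tx_datetime": "2019-08-23 00:00:00", "v": "c"},
]
print(keep_first_per_key(rows, "transaction_id", "tx_datetime"))
```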

In [10]:
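# Validation: all numeric fields must be >= 0
# (a row with null in any of these fields is dropped too: null >= 0 is not true, and filter keeps only true rows)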
from pyspark.sql.functions import col
df_clean = df_clean.filter(
    (col("transaction_id") >= 0) &
    (col("customer_id") >= 0) &
    (col("terminal_id") >= 0) &
    (col("tx_amount") >= 0) &
    (col("tx_time_seconds") >= 0) &
    (col("tx_time_days") >= 0) &
    (col("tx_fraud") >= 0) &
    (col("tx_fraud_scenario") >= 0)
)
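In Spark SQL, `null >= 0` evaluates to null, and `filter` keeps only rows whose condition is true, so a null in any checked column drops the row as well. A plain-Python model of that rule, with `None` standing in for null (the helper `passes_validation` is hypothetical):

```python
NUMERIC_COLUMNS = [
    "transaction_id", "customer_id", "terminal_id", "tx_amount",
    "tx_time_seconds", "tx_time_days", "tx_fraud", "tx_fraud_scenario",
]


def passes_validation(row, columns=NUMERIC_COLUMNS):
    """True only if every column is present, non-null and >= 0."""
    return all(row.get(c) is not None and row[c] >= 0 for c in columns)


good = dict.fromkeys(NUMERIC_COLUMNS, 0)
print(passes_validation(good))                            # True
print(passes_validation(dict(good, tx_amount=-1.0)))      # False: negative amount
print(passes_validation(dict(good, customer_id=None)))    # False: null is dropped
```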

In [11]:
from pyspark.sql.functions import to_timestamp, col
# Convert the column to a timestamp and drop values that fail to parse
# (with ANSI mode off, the Spark 3.5 default, to_timestamp returns null for them)
df_clean = df_clean.withColumn("tx_datetime", to_timestamp("tx_datetime", "yyyy-MM-dd HH:mm:ss")) \
                   .filter(col("tx_datetime").isNotNull())
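The Spark pattern `yyyy-MM-dd HH:mm:ss` corresponds to Python's `%Y-%m-%d %H:%M:%S`. A plain-Python sketch of the parse-then-filter step, where a value that does not match becomes `None` and is then dropped (the helper names are hypothetical):

```python
from datetime import datetime


def parse_tx_datetime(value, fmt="%Y-%m-%d %H:%M:%S"):
    """Parse a timestamp string; return None if it is missing or malformed."""
    try:
        return datetime.strptime(value, fmt)
    except (TypeError, ValueError):
        return None


def keep_valid_timestamps(values):
    """Parse every value and drop the ones that failed (the isNotNull filter)."""
    parsed = (parse_tx_datetime(v) for v in values)
    return [ts for ts in parsed if ts is not None]


print(keep_valid_timestamps(["2019-08-22 06:51:03", "2019-02-30 00:00:00", None]))
```

Only the first value survives: February 30 is not a valid date, and `None` cannot be parsed at all.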

In [None]:
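# Remove outliers (IQR), version 1: columns are filtered one after another,
# so each column's quartiles are computed on rows already filtered by the previous columns
def remove_outliers_iqr(df, columns):
    for col_name in columns:
        # Approximate quartiles with relative error 0.05
        q1, q3 = df.approxQuantile(col_name, [0.25, 0.75], 0.05)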
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        df = df.filter((col(col_name) >= lower_bound) & (col(col_name) <= upper_bound))
    return df

df_clean = remove_outliers_iqr(df_clean, ["tx_amount", "tx_time_seconds", "tx_time_days"])
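For a single column the rule keeps values in [Q1 - 1.5·IQR, Q3 + 1.5·IQR], where IQR = Q3 - Q1. A plain-Python worked example using exact quartiles from `statistics.quantiles`, which interpolates between data points; `approxQuantile` with relative error 0.05 returns approximate values instead, so bounds on real data may differ somewhat. The helpers `iqr_bounds` and `remove_outliers` are hypothetical.

```python
import statistics


def iqr_bounds(values, k=1.5):
    """Return (lower, upper) Tukey fences for a list of numbers."""
    q1, _, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


def remove_outliers(values, k=1.5):
    lower, upper = iqr_bounds(values, k)
    return [v for v in values if lower <= v <= upper]


data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 100]
print(iqr_bounds(data))       # (-3.5, 14.5): Q1 = 3.25, Q3 = 7.75, IQR = 4.5
print(remove_outliers(data))  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
```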

In [None]:
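# Remove outliers (IQR), version 2: all bounds are computed on the same input and applied in one filter
# (an alternative to the previous cell; run only one of them)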
def remove_outliers_iqr(df, columns):
    bounds = {}
    for col_name in columns:
        q1, q3 = df.approxQuantile(col_name, [0.25, 0.75], 0.05)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        bounds[col_name] = (lower_bound, upper_bound)

    # Combine all range conditions into a single filter
    filter_condition = None
    for col_name, (low, high) in bounds.items():
        cond = (col(col_name) >= low) & (col(col_name) <= high)
        filter_condition = cond if filter_condition is None else filter_condition & cond

    # No columns given: nothing to filter (df.filter(None) would raise an error)
    if filter_condition is None:
        return df
    return df.filter(filter_condition)

df_clean = remove_outliers_iqr(df_clean, ["tx_amount", "tx_time_seconds", "tx_time_days"])
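The second version of `remove_outliers_iqr` folds the per-column range checks into one condition. The same fold can be written with `functools.reduce` and a neutral start value, which also covers an empty column list (in PySpark one could seed it with `F.lit(True)`). A plain-Python sketch of that version's semantics, with rows as dicts and all bounds computed on the same unfiltered input; the helper names are hypothetical.

```python
import statistics
from functools import reduce


def column_bounds(rows, column, k=1.5):
    """Tukey fences for one column, computed on the full input."""
    values = [r[column] for r in rows]
    q1, _, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


def remove_outliers_all_at_once(rows, columns, k=1.5):
    # Every column's bounds come from the same, unfiltered rows
    bounds = {c: column_bounds(rows, c, k) for c in columns}
    checks = [
        (lambda row, c=c, lo=lo, hi=hi: lo <= row[c] <= hi)
        for c, (lo, hi) in bounds.items()
    ]

    def keep(row):
        # AND all range checks together; True is the neutral start value
        return reduce(lambda ok, check: ok and check(row), checks, True)

    return [r for r in rows if keep(r)]


rows = [{"a": a, "b": b} for a, b in zip([1, 2, 3, 4, 5, 6, 7, 8, 9, 100],
                                         [100, 1, 2, 3, 4, 5, 6, 7, 8, 9])]
print([r["a"] for r in remove_outliers_all_at_once(rows, ["a", "b"])])
```

The first and last rows each contain an outlier in one column, so both are removed and the `a` values 2 to 9 remain.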


In [None]:
# View the outliers: rows of the raw df that the IQR filter on these columns would remove
df.exceptAll(remove_outliers_iqr(df, ["customer_id", "terminal_id", "tx_time_seconds", "transaction_id", "tx_time_days"]))
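`exceptAll` is a multiset difference: each row on the right cancels one matching row on the left, so duplicates survive (unlike `subtract`, which behaves like SQL `EXCEPT DISTINCT`). A plain-Python sketch of those semantics with tuples as rows (the helper `except_all` is hypothetical):

```python
from collections import Counter


def except_all(left, right):
    """Rows of left not matched by right, keeping duplicates (multiset difference)."""
    remaining = Counter(right)
    out = []
    for row in left:
        if remaining[row] > 0:
            remaining[row] -= 1  # this occurrence is cancelled by one from the right
        else:
            out.append(row)
    return out


print(except_all([(1,), (1,), (2,), (3,)], [(1,), (3,)]))  # [(1,), (2,)]
```

A set difference of the same inputs would return only `(2,)`.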

In [12]:
df_clean = df_clean.orderBy("transaction_id")

Writing to the bucket:

In [18]:
data_write_path = f"s3a://{SOURCE_BUCKET}/spark_output"

# Write the first 40 rows of the cleaned, sorted data as Parquet, replacing any previous output
df_clean.limit(40) \
  .write \
  .mode("overwrite") \
  .parquet(data_write_path)


Stopping the Spark session:

In [19]:
spark.stop()