In [None]:
# ───────────────────────────────────────────────
# 0. Стартуем Spark с доступом к MinIO
# ───────────────────────────────────────────────
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import (
    StructType, StructField, IntegerType, StringType, DoubleType
)
import socket

POD_IP = socket.gethostbyname(socket.gethostname())
print("driver.host =", POD_IP)

spark = (
    SparkSession.builder
      .master("spark://10.233.51.227:7077")
      .appName("MinIO integration")
      .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.2")
      .config("spark.driver.host", POD_IP)
      # → S3A / MinIO
      .config("spark.hadoop.fs.s3a.endpoint",          "http://192.168.31.201:9000")
      .config("spark.hadoop.fs.s3a.access.key",        "admin")
      .config("spark.hadoop.fs.s3a.secret.key",        "password")
      .config("spark.hadoop.fs.s3a.path.style.access", "true")
      .config("spark.hadoop.fs.s3a.impl",              "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .config("spark.hadoop.fs.s3a.aws.credentials.provider",
              "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
      .getOrCreate()
)

# ───────────────────────────────────────────────
# 1. Читаем ВСЕ .txt  (sep=',', header=False) + явная схема
# ───────────────────────────────────────────────
input_path = "s3a://otus/*.txt"

schema = StructType([
    StructField("transaction_id",    IntegerType(), True),
    StructField("tx_datetime",       StringType(),  True),
    StructField("customer_id",       IntegerType(), True),
    StructField("terminal_id",       IntegerType(), True),
    StructField("tx_amount",         DoubleType(),  True),
    StructField("tx_time_seconds",   IntegerType(), True),
    StructField("tx_time_days",      IntegerType(), True),
    StructField("tx_fraud",          IntegerType(), True),
    StructField("tx_fraud_scenario", IntegerType(), True),
])

raw_df = (spark.read
           .csv(input_path,
                header=False,       # шапка «плохая» – игнорируем
                sep=",",
                schema=schema,
                mode="DROPMALFORMED")  # пропускаем совсем битые строки
)

# Убираем первую строку‑шапку (transaction_id == null)
df = raw_df.filter(raw_df.transaction_id.isNotNull())

print("Схема после чтения:")
df.printSchema()
df.show(5)

# ───────────────────────────────────────────────
# 2. Drop NA Удаляем строки с пропущенными значениями
# ───────────────────────────────────────────────
rows_before = df.count()
df_clean    = df.dropna()
print(f"DropNA: {rows_before} → {df_clean.count()}")

# ───────────────────────────────────────────────
# 3. Удаляем выбросы (3 σ) по всем числовым колонкам
# ───────────────────────────────────────────────
from pyspark.sql.types import NumericType
from functools import reduce

numeric_cols = [f.name for f in df_clean.schema.fields
                if isinstance(f.dataType, NumericType)]

stats_row = (df_clean
             .select(*[F.mean(c).alias(f"{c}_mean") for c in numeric_cols],
                     *[F.stddev(c).alias(f"{c}_std") for c in numeric_cols])
             .collect()[0])

conds = []
for c in numeric_cols:
    mu = stats_row[f"{c}_mean"]; sigma = stats_row[f"{c}_std"]
    if sigma:  # если σ не None
        low, hi = mu - 3*sigma, mu + 3*sigma
        conds.append((F.col(c) >= low) & (F.col(c) <= hi))

df_no_outliers = df_clean.filter(reduce(lambda a, b: a & b, conds)
                                 if conds else F.lit(True))
print(f"После выбросов: {df_no_outliers.count()} строк")

# ───────────────────────────────────────────────
# 4. Логические фильтры
# ───────────────────────────────────────────────
df_valid = df_no_outliers
if "tx_amount" in df_valid.columns:
    df_valid = df_valid.filter(F.col("tx_amount") >= 0)
if "tx_time_seconds" in df_valid.columns:
    df_valid = df_valid.filter(
        (F.col("tx_time_seconds") >= 0) & (F.col("tx_time_seconds") <= 86400)
    )
if "customer_id" in df_valid.columns:
    df_valid = df_valid.filter(F.col("customer_id").isNotNull())
if "terminal_id" in df_valid.columns:
    df_valid = df_valid.filter(F.col("terminal_id").isNotNull())

print(f"Логические фильтры: {df_no_outliers.count()} → {df_valid.count()}")

# ───────────────────────────────────────────────
# 5. Сохраняем корректный Parquet
# ───────────────────────────────────────────────
output_path = "s3a://otus/clean/fraud_transactions_fixed_new.parquet"

(df_valid
   .write
   .mode("overwrite")
   .parquet(output_path))

print(f"✅ Новый Parquet сохранён: {output_path}")


In [None]:
# Старая версия

from pyspark.sql import SparkSession
import socket

# --- 0. Сессия Spark ------------------------------------------
POD_IP = socket.gethostbyname(socket.gethostname())
print("driver.host =", POD_IP)

spark = (
    SparkSession.builder
      .master("spark://10.233.51.227:7077")
      .appName("MinIO integration")
      .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:3.3.2")
      .config("spark.driver.host", POD_IP)

      # доступ к MinIO
      .config("spark.hadoop.fs.s3a.endpoint",  "http://192.168.31.201:9000")
      .config("spark.hadoop.fs.s3a.access.key","admin") # в будущем в отдельный файл или секрет сервиса
      .config("spark.hadoop.fs.s3a.secret.key","password")
      .config("spark.hadoop.fs.s3a.path.style.access", "true")
      .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")

      # таймауты + провайдер
      .config("spark.hadoop.fs.s3a.threads.keepalivetime",        "60000")
      .config("spark.hadoop.fs.s3a.connection.establish.timeout", "30000")
      .config("spark.hadoop.fs.s3a.connection.timeout",           "200000")
      .config("spark.hadoop.fs.s3a.connection.request.timeout",   "200000")
      .config("spark.hadoop.fs.s3a.retry.sleep.max",              "30000")
      .config("spark.hadoop.fs.s3a.aws.credentials.provider",
              "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
      .config("spark.hadoop.fs.s3a.multipart.purge.age",          "86400")
      .getOrCreate()
)

# --- 1. Загружаем исходный датасет ------------------------------------------

input_path = "s3a://otus/*.txt"        # все 40 .txt в бакете otus
df = (spark.read
       .csv(input_path, header=True, inferSchema=True, sep=",")  # поправь sep, если нужно
)

# # --- 1. Загружаем ТОЛЬКО 5 файлов для теста ---------------------------------
# # input_path = "s3a://otus/*.txt"        # полный датасет — пока закомментирован
# test_files = [
#     "s3a://otus/2019-08-22.txt",
#     # "s3a://otus/2019-09-21.txt",
#     # "s3a://otus/2019-10-21.txt",
#     # "s3a://otus/2019-11-20.txt",
#     # "s3a://otus/2019-12-20.txt",
# ]
# df = (spark.read
#       .csv(test_files, header=True, inferSchema=True, sep=",")
# )

print("Схема датафрейма:")
df.printSchema()

# --- 2. Удаляем строки с пропущенными значениями ----------------------------
rows_before = df.count()
df_clean = df.dropna()
rows_after = df_clean.count()

print(f"Строк до очистки : {rows_before}")
print(f"Строк после dropna: {rows_after}")


# --- 3. Удаляем выбросы для КАЖДОЙ числовой колонки ------------------------

from pyspark.sql.types import NumericType
from functools import reduce
import pyspark.sql.functions as F

# а) собираем список числовых столбцов
numeric_cols = [f.name for f in df_clean.schema.fields
                if isinstance(f.dataType, NumericType)]

# б) собираем μ и σ сразу по всем числовым колонкам
stats_row = (df_clean
             .select(*[F.mean(c).alias(f"{c}_mean") for c in numeric_cols],
                     *[F.stddev(c).alias(f"{c}_std") for c in numeric_cols])
             .collect()[0])

# в) строим список условий вида  col BETWEEN (μ-3σ) AND (μ+3σ)
conditions = []
for c in numeric_cols:
    mu = stats_row[f"{c}_mean"]
    sigma = stats_row[f"{c}_std"]
    if sigma is None:          # бывает, если все значения одинаковые
        continue
    low, high = mu - 3 * sigma, mu + 3 * sigma
    conditions.append((F.col(c) >= low) & (F.col(c) <= high))

# г) применяем AND‑фильтр по всем условиям
df_no_outliers = df_clean.filter(reduce(lambda a, b: a & b, conditions)
                                 if conditions else F.lit(True))

print(f"Строк после фильтра выбросов: {df_no_outliers.count()}")

# --- 4. Логические фильтры "неадекватных" значений -------------------------
# Здесь мы оставляем строки, которые выполняют адекватные условия.
# Фильтры применяем ТОЛЬКО если колонка реально существует.

from pyspark.sql import functions as F

rows_before_logic = df_no_outliers.count()
df_valid = df_no_outliers   # начинаем с того, что уже отфильтровали по выбросам

# 4.1 Положительная сумма транзакции
if "tx_amount" in df_valid.columns:
    df_valid = df_valid.filter(F.col("tx_amount") >= 0)

# 4.2 Время в секундах должно лежать в пределах суток
if "tx_time_seconds" in df_valid.columns:
    df_valid = df_valid.filter(
        (F.col("tx_time_seconds") >= 0) & (F.col("tx_time_seconds") <= 86400)
    )

# 4.3 Не пустые идентификаторы клиента и терминала
if "customer_id" in df_valid.columns:
    df_valid = df_valid.filter(F.col("customer_id").isNotNull())
if "terminal_id" in df_valid.columns:
    df_valid = df_valid.filter(F.col("terminal_id").isNotNull())

rows_after_logic = df_valid.count()
print(f"Строк после логических фильтров: {rows_before_logic} → {rows_after_logic}")

# --- 5. Сохраняем очищенный датасет в формате Parquet ---------------------

output_path = "s3a://otus/clean/fraud_transactions_clean.parquet"

(
    df_valid
      .write
      .mode("overwrite")  # перезаписывает целиком; смени на "append", если нужно дописывать
      .parquet(output_path)
)

print(f"✅ Паркет сохранён: {output_path}")