In [None]:
import os
from pyspark.sql import SparkSession, functions as F, types as T
import subprocess

#Конфигурация 
HDFS_NAMENODE = "rc1a-dataproc-m-j8aqvcff64464qn8.mdb.yandexcloud.net:8020"
HDFS_INPUT_DIR = "/user/ubuntu/data"
HDFS_OUTPUT_DIR = "/user/ubuntu/clean/transactions_parquet"
S3_OUTPUT_PATH = "s3a://otus-bucket-20251311-b1ghiv85eubrk846dis6/clean/transactions_parquet"
S3_ACCESS_KEY = ""
S3_SECRET_KEY = ""

# Инициализация Spark с подавлением спилл предупреждений
spark = SparkSession.builder \
    .appName("clean_transactions") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) \
    .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) \
    .config("spark.hadoop.fs.s3a.endpoint", "storage.yandexcloud.net") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "400") \
    .config("spark.driver.memory", "8g") \
    .config("spark.driver.maxResultSize", "1g") \
    .config("spark.executor.memory", "10g") \
    .config("spark.executor.memoryOverhead", "3g") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.1") \
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.adaptive.localShuffleReader.enabled", "true") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
    .config("spark.sql.adaptive.maxRecordsPerFile", "500000") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "1024m") \
    .config("spark.sql.sources.bucketing.enabled", "false") \
    .config("spark.sql.execution.sortBeforeRepartition", "false") \
    .config("spark.sql.execution.useObjectHashAggregateExec", "false") \
    .config("spark.sql.execution.fastHashAggregateRowMaxCapacityBit", "16") \
    .getOrCreate()


spark.sparkContext.setLogLevel("ERROR")

# Схема данных
schema = T.StructType([
    T.StructField("transaction_id", T.IntegerType(), nullable=False),
    T.StructField("tx_datetime", T.StringType(), nullable=False),
    T.StructField("customer_id", T.IntegerType(), nullable=False),
    T.StructField("terminal_id", T.IntegerType(), nullable=False),
    T.StructField("tx_amount", T.DoubleType(), nullable=False),
    T.StructField("tx_time_seconds", T.IntegerType(), nullable=False),
    T.StructField("tx_time_days", T.IntegerType(), nullable=False),
    T.StructField("tx_fraud", T.IntegerType(), nullable=False),
    T.StructField("tx_fraud_scenario", T.IntegerType(), nullable=False),
])

# Получаем список файлов в HDFS INPUT_DIR — будем обрабатывать файлы по одному
ls_res = subprocess.run(["hdfs", "dfs", "-ls", HDFS_INPUT_DIR], capture_output=True, text=True)
input_files = []
if ls_res.returncode == 0:
    for ln in ls_res.stdout.strip().splitlines():
        parts = ln.split()
        if len(parts) >= 8:
            p = parts[-1]
            # normalize path to absolute from HDFS NN if needed
            if not p.startswith("/") and not p.startswith("hdfs://"):
                p = os.path.join(HDFS_INPUT_DIR, p)
            input_files.append(p)
else:
    print('Не удалось получить список файлов в HDFS INPUT_DIR:', ls_res.stderr[:200])

print(f"→ Найдено файлов для обработки: {len(input_files)}")


→ Найдено файлов для обработки: 1


In [2]:
# 2. Анализ и очистка данных
print(" АНАЛИЗ ПРОБЛЕМ КАЧЕСТВА ")

# Функция очистки, применяемая к DataFrame
def clean_df(df_in):
    df_cleaned = df_in.withColumn(
        'tx_datetime',
        F.to_timestamp(
            F.regexp_replace('tx_datetime', ' 24:00:00$', ' 23:59:59'),
            'yyyy-MM-dd HH:mm:ss'
        )
    ).filter(
        (F.col('tx_fraud').isNotNull()) &
        (F.col('tx_amount') > 0) & (F.col('tx_amount') < 1e7) &
        (F.col('tx_time_seconds') >= 0) &
        (F.col('tx_time_days') >= 0) &
        (F.col('tx_datetime').isNotNull())
    ).dropDuplicates(['transaction_id'])
    return df_cleaned


print('Функция очистки готова: clean_df(df)')


 АНАЛИЗ ПРОБЛЕМ КАЧЕСТВА 
Функция очистки готова: clean_df(df)


In [3]:
# === 3. Сохранение в HDFS: последовательная обработка исходных файлов и удаление ===
import subprocess

print(f"\n=== СОХРАНЕНИЕ (HDFS, per-file processing) ===")

print(f"СТАТИСТИКА ПРЕДВАРИТЕЛЬНАЯ:")
print(f"  Файлов для обработки: {len(input_files)}")

hdfs_output_full = f"hdfs://{HDFS_NAMENODE}{HDFS_OUTPUT_DIR}"

# Принудительно очищаем выходной каталог, если нужно (чтобы итог был заменой)
print('→ Удаляем старую директорию результата (если есть)')
rm_out = subprocess.run(["hdfs", "dfs", "-rm", "-r", "-skipTrash", HDFS_OUTPUT_DIR], capture_output=True, text=True)
if rm_out.returncode == 0:
    print('✓ Удалена старая выходная директория HDFS (или её не было)')
else:
    print(f'Отклик удаления выходной директории (может быть не проблема): {rm_out.stderr[:200]}')

# Счетчики для итоговой статистики
processed_files = 0
total_rows_raw = 0
total_rows_clean = 0
total_rows_removed = 0

for src_path in input_files:
    try:
        print(f"\n→ Обработка файла: {src_path}")
        # Формируем path для чтения Spark (с hdfs://NAMENODE/... если путь начинается с /)
        if src_path.startswith('/'):
            spark_path = f"hdfs://{HDFS_NAMENODE}{src_path}"
        elif src_path.startswith('hdfs://'):
            spark_path = src_path
        else:
            spark_path = src_path

        # Читаем файл
        df_file = spark.read.option('header', 'false').option('sep', ',').schema(schema).csv(spark_path)
        raw_cnt = df_file.count()
        print(f'  raw rows: {raw_cnt:,}')

        # Очищаем
        df_file_clean = clean_df(df_file)
        clean_cnt = df_file_clean.count()
        removed_cnt = raw_cnt - clean_cnt
        print(f'  clean rows: {clean_cnt:,} (removed {removed_cnt:,})')

        # Записываем в выходной HDFS каталог (append) — последовательная сборка финального набора (замена после предварительного удаления выходной директории)
        try:
            df_file_clean.coalesce(10).write.format('parquet').mode('append').option('compression', 'snappy').save(hdfs_output_full)
            print(' Запись в HDFS выполнена (append)')
        except Exception as we:
            print('  Ошибка записи в HDFS:', str(we)[:400])
            raise

        # Удаляем исходный файл из INPUT_DIR только после успешной записи
        rm = subprocess.run(["hdfs", "dfs", "-rm", src_path], capture_output=True, text=True)
        if rm.returncode == 0:
            print(f'  Удалён исходный файл: {src_path}')
        else:
            print(f' Не удалось удалить исходный файл: {src_path} — stderr: {rm.stderr[:200]}')

        # Обновляем глобальные счетчики
        processed_files += 1
        total_rows_raw += raw_cnt
        total_rows_clean += clean_cnt
        total_rows_removed += removed_cnt

    except Exception as ex:
        print(f'Ошибка при обработке файла {src_path}: {str(ex)[:400]}')
        print('  → Пропускаем файл (не удаляем исходный) и переходим к следующему')
        continue

# Итог
print('\n=== ИТОГ ===')
print(f'  Обработано файлов: {processed_files}/{len(input_files)}')
print(f'  Исходных строк: {total_rows_raw:,}')
print(f'  Очищенных строк: {total_rows_clean:,}')
print(f'  Удалено строк: {total_rows_removed:,}')



=== СОХРАНЕНИЕ (HDFS, per-file processing) ===
СТАТИСТИКА ПРЕДВАРИТЕЛЬНАЯ:
  Файлов для обработки: 40
→ Удаляем старую директорию результата (если есть)
Отклик удаления выходной директории (может быть не проблема): rm: `/user/ubuntu/clean/transactions_parquet': No such file or directory


→ Обработка файла: /user/ubuntu/data/2019-08-22.txt
Отклик удаления выходной директории (может быть не проблема): rm: `/user/ubuntu/clean/transactions_parquet': No such file or directory


→ Обработка файла: /user/ubuntu/data/2019-08-22.txt


                                                                                

  raw rows: 46,988,419


                                                                                

  clean rows: 46,987,353 (removed 1,066)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2019-08-22.txt

→ Обработка файла: /user/ubuntu/data/2019-09-21.txt
  Удалён исходный файл: /user/ubuntu/data/2019-08-22.txt

→ Обработка файла: /user/ubuntu/data/2019-09-21.txt


                                                                                

  raw rows: 46,994,587


                                                                                

  clean rows: 46,993,666 (removed 921)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2019-09-21.txt

→ Обработка файла: /user/ubuntu/data/2019-10-21.txt
  Удалён исходный файл: /user/ubuntu/data/2019-09-21.txt

→ Обработка файла: /user/ubuntu/data/2019-10-21.txt


                                                                                

  raw rows: 46,994,433


                                                                                

  clean rows: 46,993,489 (removed 944)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2019-10-21.txt

→ Обработка файла: /user/ubuntu/data/2019-11-20.txt
  Удалён исходный файл: /user/ubuntu/data/2019-10-21.txt

→ Обработка файла: /user/ubuntu/data/2019-11-20.txt


                                                                                

  raw rows: 46,992,240


                                                                                

  clean rows: 46,991,258 (removed 982)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2019-11-20.txt

→ Обработка файла: /user/ubuntu/data/2019-12-20.txt
  Удалён исходный файл: /user/ubuntu/data/2019-11-20.txt

→ Обработка файла: /user/ubuntu/data/2019-12-20.txt


                                                                                

  raw rows: 46,994,938


                                                                                

  clean rows: 46,994,074 (removed 864)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2019-12-20.txt

→ Обработка файла: /user/ubuntu/data/2020-01-19.txt
  Удалён исходный файл: /user/ubuntu/data/2019-12-20.txt

→ Обработка файла: /user/ubuntu/data/2020-01-19.txt


                                                                                

  raw rows: 46,986,198


                                                                                

  clean rows: 46,985,282 (removed 916)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2020-01-19.txt

→ Обработка файла: /user/ubuntu/data/2020-02-18.txt
  Удалён исходный файл: /user/ubuntu/data/2020-01-19.txt

→ Обработка файла: /user/ubuntu/data/2020-02-18.txt


                                                                                

  raw rows: 46,994,272


                                                                                

  clean rows: 46,993,408 (removed 864)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2020-02-18.txt

→ Обработка файла: /user/ubuntu/data/2020-03-19.txt
  Удалён исходный файл: /user/ubuntu/data/2020-02-18.txt

→ Обработка файла: /user/ubuntu/data/2020-03-19.txt


                                                                                

  raw rows: 46,990,425


                                                                                

  clean rows: 46,989,546 (removed 879)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2020-03-19.txt

→ Обработка файла: /user/ubuntu/data/2020-04-18.txt
  Удалён исходный файл: /user/ubuntu/data/2020-03-19.txt

→ Обработка файла: /user/ubuntu/data/2020-04-18.txt


                                                                                

  raw rows: 47,001,236


                                                                                

  clean rows: 47,000,295 (removed 941)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2020-04-18.txt

→ Обработка файла: /user/ubuntu/data/2020-05-18.txt
  Удалён исходный файл: /user/ubuntu/data/2020-04-18.txt

→ Обработка файла: /user/ubuntu/data/2020-05-18.txt


                                                                                

  raw rows: 46,998,003


                                                                                

  clean rows: 46,996,951 (removed 1,052)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2020-05-18.txt

→ Обработка файла: /user/ubuntu/data/2020-06-17.txt
  Удалён исходный файл: /user/ubuntu/data/2020-05-18.txt

→ Обработка файла: /user/ubuntu/data/2020-06-17.txt


                                                                                

  raw rows: 46,983,064


                                                                                

  clean rows: 46,982,136 (removed 928)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2020-06-17.txt

→ Обработка файла: /user/ubuntu/data/2020-07-17.txt
  Удалён исходный файл: /user/ubuntu/data/2020-06-17.txt

→ Обработка файла: /user/ubuntu/data/2020-07-17.txt


                                                                                

  raw rows: 47,000,293


                                                                                

  clean rows: 46,999,424 (removed 869)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2020-07-17.txt

→ Обработка файла: /user/ubuntu/data/2020-08-16.txt
  Удалён исходный файл: /user/ubuntu/data/2020-07-17.txt

→ Обработка файла: /user/ubuntu/data/2020-08-16.txt


                                                                                

  raw rows: 47,003,160


                                                                                

  clean rows: 47,002,344 (removed 816)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2020-08-16.txt

→ Обработка файла: /user/ubuntu/data/2020-09-15.txt
  Удалён исходный файл: /user/ubuntu/data/2020-08-16.txt

→ Обработка файла: /user/ubuntu/data/2020-09-15.txt


                                                                                

  raw rows: 46,999,866


                                                                                

  clean rows: 46,998,973 (removed 893)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2020-09-15.txt

→ Обработка файла: /user/ubuntu/data/2020-10-15.txt
  Удалён исходный файл: /user/ubuntu/data/2020-09-15.txt

→ Обработка файла: /user/ubuntu/data/2020-10-15.txt


                                                                                

  raw rows: 47,001,239


                                                                                

  clean rows: 47,000,241 (removed 998)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2020-10-15.txt

→ Обработка файла: /user/ubuntu/data/2020-11-14.txt
  Удалён исходный файл: /user/ubuntu/data/2020-10-15.txt

→ Обработка файла: /user/ubuntu/data/2020-11-14.txt


                                                                                

  raw rows: 46,995,021


                                                                                

  clean rows: 46,993,930 (removed 1,091)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2020-11-14.txt

→ Обработка файла: /user/ubuntu/data/2020-12-14.txt
  Удалён исходный файл: /user/ubuntu/data/2020-11-14.txt

→ Обработка файла: /user/ubuntu/data/2020-12-14.txt


                                                                                

  raw rows: 46,983,906


                                                                                

  clean rows: 46,983,010 (removed 896)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2020-12-14.txt

→ Обработка файла: /user/ubuntu/data/2021-01-13.txt
  Удалён исходный файл: /user/ubuntu/data/2020-12-14.txt

→ Обработка файла: /user/ubuntu/data/2021-01-13.txt


                                                                                

  raw rows: 46,993,756


                                                                                

  clean rows: 46,992,783 (removed 973)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2021-01-13.txt

→ Обработка файла: /user/ubuntu/data/2021-02-12.txt
  Удалён исходный файл: /user/ubuntu/data/2021-01-13.txt

→ Обработка файла: /user/ubuntu/data/2021-02-12.txt


                                                                                

  raw rows: 46,997,041


                                                                                

  clean rows: 46,996,101 (removed 940)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2021-02-12.txt

→ Обработка файла: /user/ubuntu/data/2021-03-14.txt
  Удалён исходный файл: /user/ubuntu/data/2021-02-12.txt

→ Обработка файла: /user/ubuntu/data/2021-03-14.txt


                                                                                

  raw rows: 46,990,555


                                                                                

  clean rows: 46,989,671 (removed 884)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2021-03-14.txt

→ Обработка файла: /user/ubuntu/data/2021-04-13.txt
  Удалён исходный файл: /user/ubuntu/data/2021-03-14.txt

→ Обработка файла: /user/ubuntu/data/2021-04-13.txt


                                                                                

  raw rows: 46,994,571


                                                                                

  clean rows: 46,993,678 (removed 893)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2021-04-13.txt

→ Обработка файла: /user/ubuntu/data/2021-05-13.txt
  Удалён исходный файл: /user/ubuntu/data/2021-04-13.txt

→ Обработка файла: /user/ubuntu/data/2021-05-13.txt


                                                                                

  raw rows: 46,992,295


                                                                                

  clean rows: 46,991,345 (removed 950)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2021-05-13.txt

→ Обработка файла: /user/ubuntu/data/2021-06-12.txt
  Удалён исходный файл: /user/ubuntu/data/2021-05-13.txt

→ Обработка файла: /user/ubuntu/data/2021-06-12.txt


                                                                                

  raw rows: 46,998,584


                                                                                

  clean rows: 46,997,524 (removed 1,060)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2021-06-12.txt

→ Обработка файла: /user/ubuntu/data/2021-07-12.txt
  Удалён исходный файл: /user/ubuntu/data/2021-06-12.txt

→ Обработка файла: /user/ubuntu/data/2021-07-12.txt


                                                                                

  raw rows: 46,987,372


                                                                                

  clean rows: 46,986,527 (removed 845)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2021-07-12.txt

→ Обработка файла: /user/ubuntu/data/2021-08-11.txt
  Удалён исходный файл: /user/ubuntu/data/2021-07-12.txt

→ Обработка файла: /user/ubuntu/data/2021-08-11.txt


                                                                                

  raw rows: 46,998,056


                                                                                

  clean rows: 46,997,178 (removed 878)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2021-08-11.txt

→ Обработка файла: /user/ubuntu/data/2021-09-10.txt
  Удалён исходный файл: /user/ubuntu/data/2021-08-11.txt

→ Обработка файла: /user/ubuntu/data/2021-09-10.txt


                                                                                

  raw rows: 46,994,608


                                                                                

  clean rows: 46,993,677 (removed 931)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2021-09-10.txt

→ Обработка файла: /user/ubuntu/data/2021-10-10.txt
  Удалён исходный файл: /user/ubuntu/data/2021-09-10.txt

→ Обработка файла: /user/ubuntu/data/2021-10-10.txt


                                                                                

  raw rows: 46,994,094


                                                                                

  clean rows: 46,993,219 (removed 875)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2021-10-10.txt

→ Обработка файла: /user/ubuntu/data/2021-11-09.txt
  Удалён исходный файл: /user/ubuntu/data/2021-10-10.txt

→ Обработка файла: /user/ubuntu/data/2021-11-09.txt


                                                                                

  raw rows: 46,993,019


                                                                                

  clean rows: 46,992,109 (removed 910)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2021-11-09.txt

→ Обработка файла: /user/ubuntu/data/2021-12-09.txt
  Удалён исходный файл: /user/ubuntu/data/2021-11-09.txt

→ Обработка файла: /user/ubuntu/data/2021-12-09.txt


                                                                                

  raw rows: 47,001,970


                                                                                

  clean rows: 47,001,084 (removed 886)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2021-12-09.txt

→ Обработка файла: /user/ubuntu/data/2022-01-08.txt
  Удалён исходный файл: /user/ubuntu/data/2021-12-09.txt

→ Обработка файла: /user/ubuntu/data/2022-01-08.txt


                                                                                

  raw rows: 46,997,816


                                                                                

  clean rows: 46,996,783 (removed 1,033)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2022-01-08.txt

→ Обработка файла: /user/ubuntu/data/2022-02-07.txt
  Удалён исходный файл: /user/ubuntu/data/2022-01-08.txt

→ Обработка файла: /user/ubuntu/data/2022-02-07.txt


                                                                                

  raw rows: 47,005,088


                                                                                

  clean rows: 47,004,180 (removed 908)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2022-02-07.txt

→ Обработка файла: /user/ubuntu/data/2022-03-09.txt
  Удалён исходный файл: /user/ubuntu/data/2022-02-07.txt

→ Обработка файла: /user/ubuntu/data/2022-03-09.txt


                                                                                

  raw rows: 46,992,442


                                                                                

  clean rows: 46,991,371 (removed 1,071)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2022-03-09.txt

→ Обработка файла: /user/ubuntu/data/2022-04-08.txt
  Удалён исходный файл: /user/ubuntu/data/2022-03-09.txt

→ Обработка файла: /user/ubuntu/data/2022-04-08.txt


                                                                                

  raw rows: 46,987,224


                                                                                

  clean rows: 46,986,180 (removed 1,044)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2022-04-08.txt

→ Обработка файла: /user/ubuntu/data/2022-05-08.txt
  Удалён исходный файл: /user/ubuntu/data/2022-04-08.txt

→ Обработка файла: /user/ubuntu/data/2022-05-08.txt


                                                                                

  raw rows: 46,993,170


                                                                                

  clean rows: 46,992,070 (removed 1,100)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2022-05-08.txt

→ Обработка файла: /user/ubuntu/data/2022-06-07.txt
  Удалён исходный файл: /user/ubuntu/data/2022-05-08.txt

→ Обработка файла: /user/ubuntu/data/2022-06-07.txt


                                                                                

  raw rows: 46,992,462


                                                                                

  clean rows: 46,991,447 (removed 1,015)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2022-06-07.txt

→ Обработка файла: /user/ubuntu/data/2022-07-07.txt
  Удалён исходный файл: /user/ubuntu/data/2022-06-07.txt

→ Обработка файла: /user/ubuntu/data/2022-07-07.txt


                                                                                

  raw rows: 46,996,445


                                                                                

  clean rows: 46,995,600 (removed 845)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2022-07-07.txt

→ Обработка файла: /user/ubuntu/data/2022-08-06.txt
  Удалён исходный файл: /user/ubuntu/data/2022-07-07.txt

→ Обработка файла: /user/ubuntu/data/2022-08-06.txt


                                                                                

  raw rows: 47,002,233


                                                                                

  clean rows: 47,001,142 (removed 1,091)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2022-08-06.txt

→ Обработка файла: /user/ubuntu/data/2022-09-05.txt
  Удалён исходный файл: /user/ubuntu/data/2022-08-06.txt

→ Обработка файла: /user/ubuntu/data/2022-09-05.txt


                                                                                

  raw rows: 46,993,905


                                                                                

  clean rows: 46,992,970 (removed 935)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2022-09-05.txt

→ Обработка файла: /user/ubuntu/data/2022-10-05.txt
  Удалён исходный файл: /user/ubuntu/data/2022-09-05.txt

→ Обработка файла: /user/ubuntu/data/2022-10-05.txt


                                                                                

  raw rows: 46,997,188


                                                                                

  clean rows: 46,996,267 (removed 921)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2022-10-05.txt

→ Обработка файла: /user/ubuntu/data/2022-11-04.txt
  Удалён исходный файл: /user/ubuntu/data/2022-10-05.txt

→ Обработка файла: /user/ubuntu/data/2022-11-04.txt


                                                                                

  raw rows: 46,998,984


                                                                                

  clean rows: 46,998,039 (removed 945)


                                                                                

 Запись в HDFS выполнена (append)
  Удалён исходный файл: /user/ubuntu/data/2022-11-04.txt

=== ИТОГ ===
  Обработано файлов: 40/40
  Исходных строк: 1,879,794,178
  Очищенных строк: 1,879,756,325
  Удалено строк: 37,853
  Удалён исходный файл: /user/ubuntu/data/2022-11-04.txt

=== ИТОГ ===
  Обработано файлов: 40/40
  Исходных строк: 1,879,794,178
  Очищенных строк: 1,879,756,325
  Удалено строк: 37,853


In [5]:
# Копирование в S3
print(f"→ Копируем в S3...")
result = subprocess.run(
    ["hadoop", "distcp", "-overwrite", "-m", "32", HDFS_OUTPUT_DIR, S3_OUTPUT_PATH],
    capture_output=True,
    text=True,
    timeout=7200
)
if result.returncode == 0:
    print("✓ S3: готово")
else:
    print(f"S3 копирование: {result.stderr[:200]}")

→ Копируем в S3...


→ Копируем в S3...


KeyboardInterrupt: 

In [None]:
# копирование с S3 в HDFS
import subprocess

cmd = [
    'hadoop', 'distcp', '-overwrite', '-m', '32', S3_OUTPUT_PATH, HDFS_OUTPUT_DIR
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=1800)
if result.returncode == 0:
    print('✓ Обратное копирование: S3 -> HDFS выполнено успешно')
else:
    print(f"Ошибка обратного копирования (S3 -> HDFS): {result.stderr[:500]}")

In [4]:
# === 5. Создание standalone скрипта (composed: init, clean, HDFS write, distcp in S3) ===
script_path = "/home/ubuntu/clean_transactions.py"

# Исправленный шаблон скрипта — обычная строка (не f-string), корректные тройные кавычки внутри
script_template = '''#!/usr/bin/env python3
"""
Скрипт очистки транзакционных данных и выгрузки в S3.
Шаги:
  1) Инициализация Spark (с локальными JAR'ами в SPARK_JARS)
  2) Получение списка файлов в HDFS_INPUT_DIR
  3) По одному: читаем, чистим, append в HDFS_OUTPUT_DIR, удаляем исходный файл
  4) Финальное hadoop distcp HDFS_OUTPUT_DIR -> S3_OUTPUT_PATH
"""
import os
import sys
import subprocess
from pyspark.sql import SparkSession, functions as F, types as T

SPARK_JARS = "/usr/lib/spark/jars/hadoop-aws-3.3.4.jar,/usr/lib/spark/jars/aws-java-sdk-bundle-1.11.1026.jar"
# SPARK_PACKAGES removed — используем локальные JAR'ы, скачанные Ansible'ом

def clean_df(df_in):
    return df_in.withColumn(
        'tx_datetime',
        F.to_timestamp(
            F.regexp_replace('tx_datetime', ' 24:00:00$', ' 23:59:59'),
            'yyyy-MM-dd HH:mm:ss'
        )
    ).filter(
        (F.col('tx_fraud').isNotNull()) &
        (F.col('tx_amount') > 0) & (F.col('tx_amount') < 1e7) &
        (F.col('tx_time_seconds') >= 0) &
        (F.col('tx_time_days') >= 0) &
        (F.col('tx_datetime').isNotNull())
    ).dropDuplicates(['transaction_id'])

def list_hdfs_files(hdfs_input_dir):
    r = subprocess.run(['hdfs', 'dfs', '-ls', hdfs_input_dir], capture_output=True, text=True)
    files = []
    if r.returncode != 0:
        print('Не удалось получить список файлов в HDFS:', r.stderr[:300], file=sys.stderr)
        return files
    for ln in r.stdout.strip().splitlines():
        parts = ln.split()
        if len(parts) >= 8:
            files.append(parts[-1])
    return files

def main():
    # Параметры (подставляются .replace)
    HDFS_NAMENODE = '{HDFS_NAMENODE}'
    HDFS_INPUT_DIR = '{HDFS_INPUT_DIR}'
    HDFS_OUTPUT_DIR = '{HDFS_OUTPUT_DIR}'
    S3_OUTPUT_PATH = '{S3_OUTPUT_PATH}'
    S3_ACCESS_KEY = '{S3_ACCESS_KEY}'
    S3_SECRET_KEY = '{S3_SECRET_KEY}'

    spark = SparkSession.builder \
        .appName('clean_transactions') \
        .config('spark.jars', SPARK_JARS) \
        .config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem') \
        .config('spark.hadoop.fs.s3a.access.key', S3_ACCESS_KEY) \
        .config('spark.hadoop.fs.s3a.secret.key', S3_SECRET_KEY) \
        .config('spark.hadoop.fs.s3a.endpoint', 'storage.yandexcloud.net') \
        .config('spark.hadoop.fs.s3a.path.style.access', 'true') \
        .getOrCreate()

    spark.sparkContext.setLogLevel('ERROR')

    schema = T.StructType([
        T.StructField('transaction_id', T.IntegerType(), nullable=False),
        T.StructField('tx_datetime', T.StringType(), nullable=False),
        T.StructField('customer_id', T.IntegerType(), nullable=False),
        T.StructField('terminal_id', T.IntegerType(), nullable=False),
        T.StructField('tx_amount', T.DoubleType(), nullable=False),
        T.StructField('tx_time_seconds', T.IntegerType(), nullable=False),
        T.StructField('tx_time_days', T.IntegerType(), nullable=False),
        T.StructField('tx_fraud', T.IntegerType(), nullable=False),
        T.StructField('tx_fraud_scenario', T.IntegerType(), nullable=False),
    ])

    files = list_hdfs_files(HDFS_INPUT_DIR)
    print(f'Files to process: {len(files)}')
    if not files:
        print('No files found in HDFS input dir, exiting')
        spark.stop()
        return

    # Очистим выходной каталог заранее (чтобы итог был заменой)
    subprocess.run(['hdfs','dfs','-rm','-r','-skipTrash', HDFS_OUTPUT_DIR], capture_output=True)

    processed = 0
    total_raw = 0
    total_clean = 0
    for src in files:
        try:
            print('Processing', src)
            if src.startswith('/'):
                reader = f'hdfs://{HDFS_NAMENODE}' + src
            else:
                reader = src
            df_raw = spark.read.option('header','false').option('sep',',').schema(schema).csv(reader)
            cnt_raw = df_raw.count()
            df_clean = clean_df(df_raw)
            cnt_clean = df_clean.count()
            df_clean.coalesce(10).write.format('parquet').mode('append').option('compression','snappy').save(f'hdfs://{HDFS_NAMENODE}{HDFS_OUTPUT_DIR}')
            # Только после успешной записи удаляем исходный файл
            subprocess.run(['hdfs','dfs','-rm', src], capture_output=True)
            print(f'{src}: {cnt_raw:,} -> {cnt_clean:,} processed')
            processed += 1
            total_raw += cnt_raw
            total_clean += cnt_clean
        except Exception as e:
            print(f'Error processing {src}: {str(e)[:400]}', file=sys.stderr)
            # не удаляем исходник при ошибке; переходим к следующему
            continue

    print('Processing complete')
    print(f'Files processed: {processed}/{len(files)}')
    print(f'Rows: {total_raw:,} -> {total_clean:,}')

    # Финальное distcp в S3
    try:
        print('Starting distcp to S3...')
        rc = subprocess.run(['hadoop','distcp','-overwrite','-m','64', f'hdfs://{HDFS_NAMENODE}{HDFS_OUTPUT_DIR}', S3_OUTPUT_PATH], capture_output=True, text=True, timeout=7200)
        if rc.returncode == 0:
            print('✓ distcp to S3 completed successfully')
        else:
            print('⚠ distcp failed:', rc.stderr[:400])
    except Exception as e:
        print('⚠ distcp exception:', str(e)[:400])

    spark.stop()

if __name__ == '__main__':
    main()
'''

# подменяем плейсхолдеры
script_content = script_template.replace('{HDFS_NAMENODE}', HDFS_NAMENODE) \
    .replace('{HDFS_INPUT_DIR}', HDFS_INPUT_DIR) \
    .replace('{HDFS_OUTPUT_DIR}', HDFS_OUTPUT_DIR) \
    .replace('{S3_OUTPUT_PATH}', S3_OUTPUT_PATH) \
    .replace('{S3_ACCESS_KEY}', S3_ACCESS_KEY) \
    .replace('{S3_SECRET_KEY}', S3_SECRET_KEY)

with open(script_path, 'w') as f:
    f.write(script_content)

import subprocess
subprocess.run(['chmod', '+x', script_path])

print(f"Скрипт создан: {script_path}")
print(f"Запуск: spark-submit {script_path}")

Скрипт создан: /home/ubuntu/clean_transactions.py
Запуск: spark-submit /home/ubuntu/clean_transactions.py
