In [1]:
!pip install findspark

Defaulting to user installation because normal site-packages is not writeable


In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import time
from datetime import datetime

In [None]:
# # 1. Инициализация Spark
# spark = SparkSession.builder \
#     .appName("Batch_TXT_to_Parquet_Conversion") \
#     .config("spark.hadoop.fs.s3a.access.key", "YOUR_ACCESS_KEY") \
#     .config("spark.hadoop.fs.s3a.secret.key", "YOUR_SECRET_KEY") \
#     .config("spark.hadoop.fs.s3a.endpoint", "storage.yandexcloud.net") \
#     .config("spark.sql.parquet.compression.codec", "snappy") \
#     .getOrCreate()


In [None]:
# Инициализация Spark с оптимизацией для больших файлов    .config("spark.sql.parquet.compression.codec", "zstd") \ 
spark = SparkSession.builder \
    .appName("Massive_TXT_to_Parquet_Conversion") \
    .config("spark.hadoop.fs.s3a.access.key", "access_key") \
    .config("spark.hadoop.fs.s3a.secret.key", "secret_key") \
    .config("spark.hadoop.fs.s3a.endpoint", "storage.yandexcloud.net") \
    .config("spark.sql.parquet.compression.codec", "zstd") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.yarn.executor.memoryOverhead", "2g") \
    .getOrCreate()


In [5]:
# Схема данных
transaction_schema = StructType([
    StructField("transaction_id", IntegerType()),
    StructField("tx_datetime", TimestampType()),
    StructField("customer_id", IntegerType()),
    StructField("terminal_id", IntegerType()),
    StructField("tx_amount", DoubleType()),
    StructField("tx_time_seconds", IntegerType()),
    StructField("tx_time_days", IntegerType()),
    StructField("tx_fraud", IntegerType()),
    StructField("tx_fraud_scenario", IntegerType())
])

In [6]:
# Пути
input_bucket = "s3a://otus-mlops-data17/"
output_bucket = "s3a://fraud-detection-data-otus-2025/parquet/"

In [None]:


# 4. Список файлов для конвертации
all_files = [
    "2019-08-22.txt",
    "2019-09-21.txt",
    "2019-10-21.txt",
    "2019-11-20.txt",
    "2019-12-20.txt"
]


In [7]:
# 4. Список файлов для конвертации
all_files = [ 
    "2020-02-18.txt",
    "2020-04-18.txt",
    "2020-07-17.txt",
    "2020-08-16.txt",
    "2020-10-15.txt",
    "2020-11-14.txt",
    "2020-12-14.txt"
]    

In [13]:
# Получаем список всех файлов динамически
all_files = [f.name for f in spark.sparkContext._jvm.org.apache.hadoop.fs.FileSystem.get(
    spark.sparkContext._jsc.hadoopConfiguration()
).listStatus(
    spark.sparkContext._jvm.org.apache.hadoop.fs.Path(input_bucket)
) if f.getPath().getName().endswith('.txt')]

IllegalArgumentException: Wrong FS: s3a://otus-mlops-data17/, expected: hdfs://rc1a-dataproc-m-aq3b14utvqh59ufq.mdb.yandexcloud.net

In [8]:




# Сортируем по дате в имени файла
all_files_sorted = sorted(all_files, key=lambda x: datetime.strptime(x.split('.')[0], "%Y-%m-%d"))

In [9]:
# Функция для конвертации с прогресс-баром
def convert_with_progress(files):
    from tqdm import tqdm
    success_count = 0
    
    for file in tqdm(files, desc="Конвертация файлов"):
        try:
            start_time = time.time()
            df = spark.read \
                .option("header", "false") \
                .option("delimiter", ",") \
                .schema(transaction_schema) \
                .csv(f"{input_bucket}{file}")
            
            # Оптимизированная запись с партиционированием
            df.repartition(8).write \
                .mode("overwrite") \
                .option("compression", "zstd") \
                .parquet(f"{output_bucket}{file.replace('.txt', '')}")
            
            elapsed = time.time() - start_time
            tqdm.write(f"{file} -> {file.replace('.txt', '')} | {df.count():,} строк | {elapsed:.1f} сек")
            success_count += 1
            
        except Exception as e:
            tqdm.write(f"Ошибка в {file}: {str(e)}")
    
    return success_count

In [None]:


# Запуск конвертации
print(f"Начало конвертации {len(all_files_sorted)} файлов...")
success = convert_with_progress(all_files_sorted[:40])  # Ограничение 40 файлов
print(f"Успешно сконвертировано {success} из {len(all_files_sorted[:40])} файлов")


Начало конвертации 7 файлов...



Конвертация файлов:   0%|          | 0/7 [00:00<?, ?it/s]

In [None]:

spark.stop()