In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, IntegerType, StringType, FloatType
from pyspark.sql.functions import udf, lit, coalesce, current_date
from datetime import datetime
import time
import logging

# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SparkETLProcessor:
    def __init__(self):
        self.spark = SparkSession.builder \
            .appName("BankDataETL") \
            .config("spark.sql.parquet.compression.codec", "snappy") \
            .getOrCreate()
        
        # Определение схем
        self.schema_md_account_d = StructType([
            StructField("data_actual_date", DateType(), nullable=False),
            StructField("data_actual_end_date", DateType(), nullable=False),
            StructField("account_rk", IntegerType(), nullable=False),
            StructField("account_number", StringType(), nullable=False),
            StructField("char_type", StringType(), nullable=False),
            StructField("currency_rk", IntegerType(), nullable=False),
            StructField("currency_code", StringType(), nullable=False)
        ])
        
        self.schema_md_currency_d = StructType([
            StructField("currency_rk", IntegerType(), nullable=False),
            StructField("data_actual_date", DateType(), nullable=False),
            StructField("data_actual_end_date", DateType()),
            StructField("currency_code", StringType()),
            StructField("code_iso_char", StringType())
        ])
        
        self.schema_ft_posting_f = StructType([
            StructField("oper_date", DateType(), nullable=False),
            StructField("credit_account_rk", IntegerType(), nullable=False),
            StructField("debet_account_rk", IntegerType(), nullable=False),
            StructField("credit_amount", FloatType()),
            StructField("debet_amount", FloatType())
        ])
        
        self.schema_ft_balance_f = StructType([
            StructField("on_date", DateType(), nullable=False),
            StructField("account_rk", IntegerType(), nullable=False),
            StructField("currency_rk", IntegerType(), nullable=False),
            StructField("balance_out", FloatType(), nullable=False)
        ])
        
        self.schema_md_exchange_rate_d = StructType([
            StructField("data_actual_date", DateType(), nullable=False),
            StructField("currency_rk", IntegerType(), nullable=False),
            StructField("reduced_rate", FloatType()),
            StructField("code_iso_num", StringType())
        ])
        
        self.schema_md_ledger_account_s = StructType([
            StructField("ledger_account", StringType(), nullable=False),
            StructField("start_date", DateType(), nullable=False),
            StructField("end_date", DateType()),
            StructField("chapter", StringType()),
            StructField("account_name", StringType())
        ])

    def log_process_start(self, process_name):
        """Логирование начала процесса"""
        start_time = datetime.now()
        logger.info(f"Начало процесса {process_name} в {start_time}")
        return start_time

    def log_process_end(self, start_time, process_name, status, rows_processed=None, error_message=None):
        """Логирование завершения процесса"""
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        logger.info(f"Завершение процесса {process_name}, статус: {status}, продолжительность: {duration:.2f} сек.")

    def parse_date(self, date_str):
        """Функция для парсинга дат в UDF"""
        from dateutil.parser import parse
        try:
            return parse(date_str).date()
        except:
            return None

    def load_csv_with_schema(self, file_path, schema, date_columns=None):
        """Загрузка CSV с применением схемы"""
        try:
            # Регистрируем UDF для парсинга дат
            parse_date_udf = udf(self.parse_date, DateType())
            
            # Читаем CSV с inferSchema, затем применяем нашу схему
            df = self.spark.read.option("header", "true").option("sep", ";").option("inferSchema", "true").csv(file_path)
            
            # Приводим типы данных к целевой схеме
            for field in schema.fields:
                col_name = field.name
                if col_name in df.columns:
                    if isinstance(field.dataType, DateType):
                        df = df.withColumn(col_name, parse_date_udf(df[col_name]))
                    elif str(df.schema[col_name].dataType) != str(field.dataType):
                        df = df.withColumn(col_name, df[col_name].cast(field.dataType))
            
            # Применяем схему (проверяем обязательные поля)
            for field in schema.fields:
                if field.nullable == False:
                    df = df.filter(df[field.name].isNotNull())
            
            return df
        except Exception as e:
            logger.error(f"Ошибка загрузки CSV {file_path}: {e}")
            raise

    def save_as_parquet(self, df, output_path, partition_cols=None, mode="overwrite"):
        """Сохранение DataFrame в Parquet"""
        try:
            writer = df.write.mode(mode)
            if partition_cols:
                writer = writer.partitionBy(partition_cols)
            writer.parquet(output_path)
            logger.info(f"Данные сохранены в {output_path}")
        except Exception as e:
            logger.error(f"Ошибка сохранения Parquet: {e}")
            raise

    def update_dimension_table(self, new_data_df, table_name, output_path, primary_keys):
        """Общая стратегия обновления для таблиц измерений"""
        try:
            # Проверяем существование таблицы
            existing_df = None
            try:
                existing_df = self.spark.read.parquet(output_path)
                logger.info(f"Обнаружена существующая таблица {table_name}")
            except:
                logger.info(f"Таблица {table_name} не существует, будет создана новая")
                existing_df = None
            
            if existing_df is None:
                # Если таблицы нет, просто сохраняем новые данные
                self.save_as_parquet(new_data_df, output_path)
                return new_data_df.count()
            
            # Определяем ключи для join
            join_condition = None
            for key in primary_keys:
                if join_condition is None:
                    join_condition = (new_data_df[key] == existing_df[key])
                else:
                    join_condition = join_condition & (new_data_df[key] == existing_df[key])
            
            # Находим записи для обновления (существующие записи)
            updates_df = new_data_df.join(existing_df, join_condition, "inner")
            
            if updates_df.count() > 0:
                # Обновляем data_actual_end_date для старых записей
                old_records_to_update = existing_df.join(
                    new_data_df.select(primary_keys[0]), 
                    primary_keys[0], 
                    "inner"
                )
                
                old_records_updated = old_records_to_update.withColumn(
                    "data_actual_end_date", 
                    current_date()
                )
                
                # Оставляем старые записи, которые не обновляются
                old_records_unchanged = existing_df.join(
                    new_data_df.select(primary_keys[0]), 
                    primary_keys[0], 
                    "left_anti"
                )
                
                # Объединяем все данные
                final_df = old_records_unchanged.unionByName(old_records_updated).unionByName(new_data_df)
            else:
                # Если нет обновлений, просто добавляем новые записи
                final_df = existing_df.unionByName(new_data_df)
            
            # Сохраняем результат
            self.save_as_parquet(final_df, output_path)
            return final_df.count()
            
        except Exception as e:
            logger.error(f"Ошибка обновления таблицы {table_name}: {e}")
            raise

    def process_table(self, file_path, schema, output_path, process_name, table_type, primary_keys=None):
        """Основной метод обработки таблицы с учетом типа таблицы"""
        start_time = None
        try:
            # Логируем начало процесса
            start_time = self.log_process_start(process_name)

            # Искусственная пауза для демонстрации
            time.sleep(5)
            
            # Загружаем данные с применением схемы
            df = self.load_csv_with_schema(file_path, schema)
            
            # Применяем соответствующую стратегию обновления
            if table_type == "FACT" and "FT_POSTING_F" in output_path:
                # Для FT_POSTING_F - полная перезагрузка
                self.save_as_parquet(df, output_path, mode="overwrite")
                rows_processed = df.count()
            elif table_type == "FACT":
                # Для других фактов - обновление по ключам
                if not primary_keys:
                    raise ValueError(f"Для таблицы фактов {output_path} необходимо указать первичные ключи")
                
                # Проверяем существование таблицы
                try:
                    existing_df = self.spark.read.parquet(output_path)
                    
                    # Удаляем старые записи, которые будут обновлены
                    existing_df = existing_df.join(
                        df.select(primary_keys), 
                        primary_keys, 
                        "left_anti"
                    )
                    
                    # Объединяем старые и новые записи
                    final_df = existing_df.unionByName(df)
                    self.save_as_parquet(final_df, output_path)
                    rows_processed = final_df.count()
                except:
                    # Если таблицы нет, просто сохраняем
                    self.save_as_parquet(df, output_path)
                    rows_processed = df.count()
            elif table_type == "DIMENSION":
                # Для измерений - сложное обновление с историей
                if not primary_keys:
                    raise ValueError(f"Для таблицы измерения {output_path} необходимо указать первичные ключи")
                rows_processed = self.update_dimension_table(df, process_name, output_path, primary_keys)
            else:
                raise ValueError(f"Неизвестный тип таблицы: {table_type}")
            
            # Логируем успешное завершение
            self.log_process_end(
                start_time, 
                process_name, 
                'completed', 
                rows_processed=rows_processed
            )
            
            return rows_processed
            
        except Exception as e:
            # Логируем ошибку
            if start_time:
                self.log_process_end(
                    start_time, 
                    process_name, 
                    'failed', 
                    error_message=str(e)
                )
            logger.error(f"Ошибка обработки: {e}")
            raise

# Пример использования
if __name__ == "__main__":
    etl = None
    try:
        etl = SparkETLProcessor()
        
        # Обработка таблиц с соответствующими стратегиями обновления
        
        # 1. FT_POSTING_F - полная перезагрузка
        etl.process_table(
            file_path="data/ft_posting_f.csv",
            schema=etl.schema_ft_posting_f,
            output_path="data/parquet/FT_POSTING_F",
            process_name="Загрузка FT_POSTING_F",
            table_type="FACT"
        )
        
        # 2. FT_BALANCE_F - обновление по ключу (ON_DATE, ACCOUNT_RK)
        etl.process_table(
            file_path="data/ft_balance_f.csv",
            schema=etl.schema_ft_balance_f,
            output_path="data/parquet/FT_BALANCE_F",
            process_name="Загрузка FT_BALANCE_F",
            table_type="FACT",
            primary_keys=["on_date", "account_rk"]
        )
        
        # 3. MD_ACCOUNT_D - обновление измерения (DATA_ACTUAL_DATE, ACCOUNT_RK)
        etl.process_table(
            file_path="data/md_account_d.csv",
            schema=etl.schema_md_account_d,
            output_path="data/parquet/MD_ACCOUNT_D",
            process_name="Загрузка MD_ACCOUNT_D",
            table_type="DIMENSION",
            primary_keys=["data_actual_date", "account_rk"]
        )
        
        # 4. MD_CURRENCY_D - обновление измерения (CURRENCY_RK, DATA_ACTUAL_DATE)
        etl.process_table(
            file_path="data/md_currency_d.csv",
            schema=etl.schema_md_currency_d,
            output_path="data/parquet/MD_CURRENCY_D",
            process_name="Загрузка MD_CURRENCY_D",
            table_type="DIMENSION",
            primary_keys=["data_actual_date","currency_rk"] 
        )

        # 5. MD_EXCHANGE_RATE_D - обновление измерения (DATA_ACTUAL_DATE, CURRENCY_RK)
        etl.process_table(
            file_path="data/md_exchange_rate_d.csv",
            schema=etl.schema_md_exchange_rate_d,
            output_path="data/parquet/MD_EXCHANGE_RATE_D",
            process_name="Загрузка MD_EXCHANGE_RATE_D",
            table_type="DIMENSION",
            primary_keys=["data_actual_date", "currency_rk"]
        )
        
        # 6. MD_LEDGER_ACCOUNT_S - обновление измерения (LEDGER_ACCOUNT, START_DATE)
        etl.process_table(
            file_path="data/md_ledger_account_s.csv",
            schema=etl.schema_md_ledger_account_s,
            output_path="data/parquet/MD_LEDGER_ACCOUNT_S",
            process_name="Загрузка MD_LEDGER_ACCOUNT_S",
            table_type="DIMENSION",
            primary_keys=["ledger_account", "start_date"]
        )
        
    except Exception as e:
        logger.error(f"Ошибка в основном процессе: {e}")
    finally:
        if etl is not None:
            etl.spark.stop()

INFO:__main__:Начало процесса Загрузка FT_POSTING_F в 2025-04-22 09:32:22.565309
INFO:__main__:Данные сохранены в data/parquet/FT_POSTING_F
INFO:__main__:Завершение процесса Загрузка FT_POSTING_F, статус: completed, продолжительность: 16.98 сек.
INFO:__main__:Начало процесса Загрузка FT_BALANCE_F в 2025-04-22 09:32:39.544506
INFO:__main__:Данные сохранены в data/parquet/FT_BALANCE_F
INFO:__main__:Завершение процесса Загрузка FT_BALANCE_F, статус: completed, продолжительность: 6.17 сек.
INFO:__main__:Начало процесса Загрузка MD_ACCOUNT_D в 2025-04-22 09:32:45.715924
INFO:__main__:Таблица Загрузка MD_ACCOUNT_D не существует, будет создана новая
INFO:__main__:Данные сохранены в data/parquet/MD_ACCOUNT_D
INFO:__main__:Завершение процесса Загрузка MD_ACCOUNT_D, статус: completed, продолжительность: 6.34 сек.
INFO:__main__:Начало процесса Загрузка MD_CURRENCY_D в 2025-04-22 09:32:52.059622
INFO:__main__:Таблица Загрузка MD_CURRENCY_D не существует, будет создана новая
INFO:__main__:Данные со

In [41]:
from pyspark.sql import SparkSession

def show_parquet_head(spark, table_paths):
    """Выводит первые 5 строк каждой Parquet таблицы"""
    for table_name, path in table_paths.items():
        try:
            print(f"\n=== {table_name} ===")
            df = spark.read.parquet(path)
            df.show(5, vertical=True)
            print(f"Всего строк: {df.count()}")
            print("Схема:")
            df.printSchema()
        except Exception as e:
            print(f"Ошибка при чтении таблицы {table_name}: {str(e)}")

if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("ShowParquetHead") \
        .getOrCreate()

    # Пути к Parquet-таблицам
    table_paths = {
        "FT_BALANCE_F": "data/parquet/FT_BALANCE_F",
        "FT_POSTING_F": "data/parquet/FT_POSTING_F",
        "MD_ACCOUNT_D": "data/parquet/MD_ACCOUNT_D",
        "MD_CURRENCY_D": "data/parquet/MD_CURRENCY_D",
        "MD_EXCHANGE_RATE_D": "data/parquet/MD_EXCHANGE_RATE_D",
        "MD_LEDGER_ACCOUNT_S": "data/parquet/MD_LEDGER_ACCOUNT_S"
    }

    show_parquet_head(spark, table_paths)
    spark.stop()


=== FT_BALANCE_F ===
-RECORD 0-----------------
 ON_DATE     | 31.12.2017 
 ACCOUNT_RK  | 36237725   
 CURRENCY_RK | 35         
 BALANCE_OUT | 38318.13   
-RECORD 1-----------------
 ON_DATE     | 31.12.2017 
 ACCOUNT_RK  | 24656      
 CURRENCY_RK | 35         
 BALANCE_OUT | 80533.62   
-RECORD 2-----------------
 ON_DATE     | 31.12.2017 
 ACCOUNT_RK  | 18849846   
 CURRENCY_RK | 34         
 BALANCE_OUT | 63891.96   
-RECORD 3-----------------
 ON_DATE     | 31.12.2017 
 ACCOUNT_RK  | 1972647    
 CURRENCY_RK | 34         
 BALANCE_OUT | 5087732.1  
-RECORD 4-----------------
 ON_DATE     | 31.12.2017 
 ACCOUNT_RK  | 34157174   
 CURRENCY_RK | 34         
 BALANCE_OUT | 7097806.9  
only showing top 5 rows

Всего строк: 114
Схема:
root
 |-- ON_DATE: string (nullable = true)
 |-- ACCOUNT_RK: integer (nullable = true)
 |-- CURRENCY_RK: integer (nullable = true)
 |-- BALANCE_OUT: double (nullable = true)


=== FT_POSTING_F ===
-RECORD 0-----------------------
 OPER_DATE         | 09-