In [1]:
from pyspark.sql import SparkSession
from datetime import datetime
from delta import *
import os
from delta.tables import DeltaTable

In [2]:
# Session builder with the Delta Lake SQL extension and the Delta catalog
# enabled; the session itself is created in the next cell.
builder = (
    SparkSession.builder
    .appName("BuildMirrors")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

In [3]:
# configure_spark_with_delta_pip adds the delta-spark package to the builder
# (its Ivy resolution, io.delta#delta-spark_2.12, is shown in the output below);
# getOrCreate then starts the session, or returns an already running one.
spark = (
    configure_spark_with_delta_pip(builder)
    .getOrCreate()
)

:: loading settings :: url = jar:file:/home/kate/spark-3.5.1-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/kate/.ivy2/cache
The jars for the packages stored in: /home/kate/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d757f48b-63f0-4bc9-9ccd-5de47c934dc3;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.0 in central
	found io.delta#delta-storage;3.2.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 2164ms :: artifacts dl 113ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.2.0 from central in [default]
	io.delta#delta-storage;3.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0  

In [4]:
def delta_logging(start_loading, table_name, id_delta, end_loading, log_path='logs'):
    """Append one record about a loaded delta to the load log.

        The record is written in append mode as a single CSV part file with a
        header into the ``log_path`` directory.

        :param start_loading: time the delta load started
        :type start_loading: datetime
        :param table_name: name of the table being updated
        :type table_name: str
        :param id_delta: identifier of the delta (its directory name)
        :type id_delta: str
        :param end_loading: time the delta load finished
        :type end_loading: datetime
        :param log_path: directory of the CSV load log (default ``'logs'``)
        :type log_path: str
    """
    log_schema = "start_loading timestamp, table_name string, id_delta string, end_loading timestamp"

    log_df = spark.createDataFrame(
        [(start_loading, table_name, id_delta, end_loading)],
        log_schema,
    )

    # coalesce(1): the single record goes into a single part file.
    log_df.coalesce(1) \
        .write \
        .mode('append') \
        .format('csv') \
        .option('header', True) \
        .save(log_path)

In [5]:
def get_processed_deltas(log_path='logs'):
    """Return the ids of all deltas recorded in the load log.

        Reads the CSV log that ``delta_logging`` appends to. Records of every
        table are returned.

        :param log_path: directory of the CSV load log (default ``'logs'``)
        :type log_path: str
        :rtype: list
        :return: list of processed delta ids (strings); empty when the log
            directory does not exist or contains no log files yet
    """
    if not os.path.isdir(log_path):
        return []

    # Explicit schema (same columns/order as written by delta_logging):
    # skips the inference pass and lets an empty log directory (e.g. left by an
    # interrupted first write) read as an empty DataFrame instead of raising
    # "Unable to infer schema for CSV".
    log_schema = "start_loading timestamp, table_name string, id_delta string, end_loading timestamp"
    logs_df = spark \
        .read \
        .format('csv') \
        .option('header', True) \
        .schema(log_schema) \
        .load(log_path)

    return [row.id_delta for row in logs_df.select('id_delta').collect()]

In [6]:
def delta_sort_key(delta_dir):
    """Sort key that puts delta directory names in load order.

        Purely numeric names (like ``'1004'``) are compared by integer value,
        so ``'999' < '1001' < '10000'``; a plain string sort would load
        ``'10000'`` before ``'1001'``. Non-numeric names come after the
        numeric ones, in lexicographic order.

        :param delta_dir: delta directory name
        :type delta_dir: str
        :rtype: tuple
    """
    if delta_dir.isdecimal():
        return (0, int(delta_dir), delta_dir)
    return (1, 0, delta_dir)


def processed_deltas_for_table(table_name, log_path='logs'):
    """Ids of the deltas already loaded into one particular table.

        Reads the CSV load log written by ``delta_logging`` and keeps only the
        records of ``table_name``. A delta id that was loaded into a different
        table is therefore not skipped for this one.

        :param table_name: table whose loaded deltas are wanted
        :type table_name: str
        :param log_path: directory of the CSV load log
        :type log_path: str
        :rtype: set
        :return: set of processed delta ids; empty when there is no log yet
    """
    if not os.path.isdir(log_path):
        return set()

    # Explicit schema: no inference pass, and an empty log directory reads as
    # an empty DataFrame instead of failing with "Unable to infer schema".
    logs_df = spark \
        .read \
        .format('csv') \
        .option('header', True) \
        .schema('start_loading timestamp, table_name string, id_delta string, end_loading timestamp') \
        .load(log_path)

    rows = logs_df.where(logs_df.table_name == table_name).select('id_delta').collect()
    return {row.id_delta for row in rows}


def build_mirrior(delta_path, table_name, unique_keys):
    """Build or update a Delta table from CSV deltas and export it as a CSV mirror.

        Every entry of ``delta_path`` not yet logged for ``table_name`` is read
        as a ';'-separated CSV with a header, in delta-id order. The first one
        creates the table with ``saveAsTable``. Each later one is merged into
        it on ``unique_keys``: matching rows are updated and the rest are
        inserted. Each load is recorded with ``delta_logging``. Finally the
        whole table is written, in overwrite mode, as one CSV file with a
        header into ``mirr_<table_name>``.

        :param delta_path: directory that holds the deltas
        :type delta_path: str
        :param table_name: name of the target table
        :type table_name: str
        :param unique_keys: columns that together form the unique key
        :type unique_keys: list
        :raises ValueError: if ``unique_keys`` is empty
    """
    if not unique_keys:
        raise ValueError('unique_keys must contain at least one column')

    # NOTE(review): assumes the default spark.sql.warehouse.dir ('spark-warehouse'
    # under the working directory), which is where saveAsTable is expected to
    # place the managed table; confirm if the warehouse dir is ever configured.
    table_path = f'spark-warehouse/{table_name}'
    merge_condition = ' AND '.join(f'target.{key} = source.{key}' for key in unique_keys)
    processed_deltas = processed_deltas_for_table(table_name)

    # Skip hidden/system entries such as .ipynb_checkpoints or _SUCCESS, and
    # order numeric delta ids numerically so later deltas are merged last.
    delta_directories = sorted(
        (name for name in os.listdir(delta_path) if not name.startswith(('.', '_'))),
        key=delta_sort_key,
    )

    for delta_dir in delta_directories:
        if delta_dir in processed_deltas:
            continue

        start_loading = datetime.now()
        delta_df = spark \
            .read \
            .format('csv') \
            .option('header', True) \
            .option('sep', ';') \
            .load(os.path.join(delta_path, delta_dir))

        if not DeltaTable.isDeltaTable(spark, table_path):
            # First delta: create the table from it.
            delta_df.write.format('delta').saveAsTable(table_name)
        else:
            # Upsert: update rows whose key matches, insert the others.
            DeltaTable.forPath(spark, table_path).alias('target') \
                .merge(delta_df.alias('source'), merge_condition) \
                .whenMatchedUpdateAll() \
                .whenNotMatchedInsertAll() \
                .execute()
        end_loading = datetime.now()

        delta_logging(start_loading, table_name, delta_dir, end_loading)

    res_mirror = spark.read.format('delta').load(table_path)
    res_mirror.coalesce(1).write \
        .format('csv') \
        .mode('overwrite') \
        .option('header', True) \
        .save(f'mirr_{table_name}')
            

In [7]:
# Parameters of the mirror build
unique_keys = ['ACCOUNT_RK']    # merge key column(s) of the table
table_name = 'md_account_d'     # Delta table / mirror name
delta_path = 'data_deltas'      # directory whose entries are the deltas

In [14]:
# Load every not-yet-processed delta into the table and rewrite the CSV mirror.
build_mirrior(
    delta_path=delta_path,
    table_name=table_name,
    unique_keys=unique_keys,
)

                                                                                

In [9]:
# Commit history of the Delta table: one row per version with its operation
# (e.g. MERGE), parameters and metrics.
delta_table = DeltaTable.forPath(spark, f'spark-warehouse/{table_name}')
delta_table.history().show(n=20, truncate=True)

                                                                                

+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      3|2024-07-17 13:41:...|  NULL|    NULL|               MERGE|{predicate -> ["(...|NULL|    NULL|     NULL|          2|  Serializable|        false|{numTargetRowsCop...|        NULL|Apache-Spark/3.5....|
|      2|2024-07-17 13:41:...|  NULL|    NULL|               MERGE|{predicate -> ["(...|NULL|    NULL|     NULL|          1|  Serializable|        false|{numTargetR

In [12]:
# Time travel: read the table as it was at Delta version 3.
# NOTE(review): despite its name, dfv2 holds version 3, not version 2.
dfv2 = (
    spark.read
    .format('delta')
    .option('versionAsOf', 3)
    .load(f'spark-warehouse/{table_name}')
)
dfv2.show()

                                                                                

+----------------+--------------------+----------+--------------------+---------+-----------+-------------+---------+---------+----------------+
|DATA_ACTUAL_DATE|DATA_ACTUAL_END_DATE|ACCOUNT_RK|      ACCOUNT_NUMBER|CHAR_TYPE|CURRENCY_RK|CURRENCY_CODE|CLIENT_ID|BRANCH_ID|OPEN_IN_INTERNET|
+----------------+--------------------+----------+--------------------+---------+-----------+-------------+---------+---------+----------------+
|      15.02.2018|          31.12.2050|     13560|30110810300000008001|        A|         34|          643|       20|      105|               Y|
|      21.04.2018|          31.12.2050|     13630|30102810900000002185|        A|         34|          643|       21|      107|            NULL|
|      21.04.2018|          31.12.2050|     13811|30221978100000008100|        A|         44|          978|       33|      201|            NULL|
|      21.04.2018|          31.12.2050|     13871|30222978200000004100|        P|         44|          978|       63|      105|   

In [13]:
# Synthetic delta with one new account (ACCOUNT_RK '24656'), written as a
# ';'-separated CSV with a header into a new delta directory so that the next
# build_mirrior run picks it up.
# NOTE(review): no .mode() is set, so Spark's default save mode applies;
# re-running this cell likely fails once f'{delta_path}/1004/' exists.
new_delta_df = spark.createDataFrame(
    data=[(
        '01.01.2018', '31.12.2050',       # DATA_ACTUAL_DATE, DATA_ACTUAL_END_DATE
        '24656', '30114840700000770002',  # ACCOUNT_RK, ACCOUNT_NUMBER
        'A', '35', '840',                 # CHAR_TYPE, CURRENCY_RK, CURRENCY_CODE
        '100', '107', None,               # CLIENT_ID, BRANCH_ID, OPEN_IN_INTERNET
    )],
    schema='DATA_ACTUAL_DATE string, DATA_ACTUAL_END_DATE string, \
    ACCOUNT_RK string, ACCOUNT_NUMBER string, CHAR_TYPE string, \
    CURRENCY_RK string, CURRENCY_CODE string, CLIENT_ID string, \
    BRANCH_ID string, OPEN_IN_INTERNET string',
)

(
    new_delta_df.coalesce(1)
    .write
    .format('csv')
    .option('header', True)
    .option('sep', ';')
    .save(f'{delta_path}/1004/')
)

                                                                                