In [1]:
pip install delta-spark==1.2.0

Collecting delta-spark==1.2.0
  Downloading delta_spark-1.2.0-py3-none-any.whl (19 kB)
Collecting pyspark<3.3.0,>=3.2.0 (from delta-spark==1.2.0)
  Downloading pyspark-3.2.4.tar.gz (281.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.5/281.5 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5 (from pyspark<3.3.0,>=3.2.0->delta-spark==1.2.0)
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m23.3 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.4-py2.py3-none-any.whl size=282040915 sha256=150074e26b7b6ded19c229076ef2fde8730917dfe4b114b3c53f517480c4c772
  Stored in directory: /root/.cache/pip/wheels/e7/e3/c8/c358dac750f2b6a4b03328d10e05a5c69501664bd65

In [2]:
import os
import csv
from datetime import datetime as dt
import pandas as pd

In [3]:
import pyspark
from delta import *

# Session builder with the Delta Lake SQL extension and the Delta catalog registered.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip comes from the `delta` star import above.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [4]:
# Find the last completed session recorded in the log file
def get_last_session_id(path_to_log_file):
    """Return the ID of the most recent completed session in the CSV log.

    Args:
        path_to_log_file: Path to a CSV file with a 'Session ID' column and,
            normally, a 'Status' column.

    Returns:
        The 'Session ID' of the last row whose 'Status' is 'END', as a string.
        When the log has no 'Status' column the last row is used as before.
        An empty string is returned when the file is empty, contains only the
        header, or has no completed session yet.
    """
    try:
        # Read every column as text: numeric inference would rewrite IDs
        # (for example '0100' would come back as '100').
        logs_dataframe = pd.read_csv(path_to_log_file, dtype=str)
    except pd.errors.EmptyDataError:
        # A zero-byte log file has no header to parse.
        return ''

    if 'Status' in logs_dataframe.columns:
        # A session that logged START but never END did not finish, so it
        # must not be reported as the last processed one.
        logs_dataframe = logs_dataframe[logs_dataframe['Status'] == 'END']

    if logs_dataframe.empty:
        return ''

    return str(logs_dataframe.iloc[-1]['Session ID'])

In [5]:
# Append a record about a processed delta to a CSV log file that has a header row
def log_processed_delta(session_id, table_name, time, status, log_file_path='logs.csv'):
    """Append one row describing a delta-processing event to the CSV log.

    Args:
        session_id: Value written to the 'Session ID' column.
        table_name: Value written to the 'Table Name' column.
        time: Value written to the 'Time' column.
        status: Value written to the 'Status' column.
        log_file_path: Path of the CSV log; defaults to 'logs.csv'.

    The header row is written when the file does not exist yet or exists but
    is empty, so the log always starts with the column names.
    """
    fieldnames = ['Session ID', 'Table Name', 'Time', 'Status']

    # An existing zero-byte file still needs the header.
    needs_header = (
        not os.path.exists(log_file_path)
        or os.path.getsize(log_file_path) == 0
    )

    # Explicit UTF-8 keeps the file readable by pandas.read_csv (UTF-8 by default)
    # regardless of the platform's locale encoding.
    with open(log_file_path, 'a', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

        if needs_header:
            writer.writeheader()

        writer.writerow({'Session ID': session_id, 'Table Name': table_name, 'Time': time, 'Status': status})


# log_processed_delta(session_id, table_name, time, status)

# Preprocessing

In [6]:
# Configuration
# Table being mirrored and the column used as its primary key in the merge.
table_name = 'md_account_d'
pk_column_name = 'ACCOUNT_RK'

# Directory that contains the delta sub-directories.
delta_path = '/content/drive/MyDrive/data_deltas'

# A single delta file; it is read below to obtain the schema for the mirror.
filepath = '/content/drive/MyDrive/data_deltas/1000/md_account_d.csv'

In [7]:
# Read the sample delta file (semicolon-separated, with a header row) and let
# Spark infer the column types; the resulting schema is printed for inspection.
temp = spark.read.csv(
    filepath,
    sep=';',
    header=True,
    inferSchema=True,
)
temp.printSchema()

root
 |-- DATA_ACTUAL_DATE: string (nullable = true)
 |-- DATA_ACTUAL_END_DATE: string (nullable = true)
 |-- ACCOUNT_RK: integer (nullable = true)
 |-- ACCOUNT_NUMBER: decimal(20,0) (nullable = true)
 |-- CHAR_TYPE: string (nullable = true)
 |-- CURRENCY_RK: integer (nullable = true)
 |-- CURRENCY_CODE: integer (nullable = true)
 |-- CLIENT_ID: integer (nullable = true)
 |-- BRANCH_ID: integer (nullable = true)
 |-- OPEN_IN_INTERNET: string (nullable = true)



In [8]:
# Create the DataFrame for the mirror (it does not exist yet): it has no rows
# and reuses the schema of `temp`.
mirror = spark.createDataFrame([], schema=temp.schema)

In [9]:
# Save the mirror DataFrame in Delta format, overwriting whatever is already at this path.
(
    mirror.write
    .format("delta")
    .mode("overwrite")
    .save("/content/temp")
)

In [10]:
# Open the Delta table stored at /content/temp as a DeltaTable handle;
# process_delta() merges incoming deltas into this handle.
delta_mirror = DeltaTable.forPath(spark, "/content/temp")

In [11]:
# Display the current contents of the mirror table.
delta_mirror.toDF().show()

+----------------+--------------------+----------+--------------+---------+-----------+-------------+---------+---------+----------------+
|DATA_ACTUAL_DATE|DATA_ACTUAL_END_DATE|ACCOUNT_RK|ACCOUNT_NUMBER|CHAR_TYPE|CURRENCY_RK|CURRENCY_CODE|CLIENT_ID|BRANCH_ID|OPEN_IN_INTERNET|
+----------------+--------------------+----------+--------------+---------+-----------+-------------+---------+---------+----------------+
+----------------+--------------------+----------+--------------+---------+-----------+-------------+---------+---------+----------------+



# Processing

In [12]:
def _delta_order_key(delta_id):
    """Sort key that orders purely numeric delta ids by value, ahead of any other ids."""
    text = str(delta_id)
    if text.isdecimal():
        return (0, int(text), text)
    return (1, 0, text)


def _find_delta_file(delta_dir, table_name):
    """Return '<table_name>.csv' inside delta_dir, or else its first visible file by name."""
    preferred = os.path.join(delta_dir, f"{table_name}.csv")
    if os.path.isfile(preferred):
        return preferred

    visible_files = sorted(
        name for name in os.listdir(delta_dir)
        if not name.startswith('.') and os.path.isfile(os.path.join(delta_dir, name))
    )
    if not visible_files:
        raise FileNotFoundError(f"No data file found in delta directory: {delta_dir}")
    return os.path.join(delta_dir, visible_files[0])


def process_delta(delta_path, table_name, pk_column_name):
    """Merge every not-yet-processed delta under delta_path into the mirror table.

    Relies on the module-level `spark` session, the `delta_mirror` DeltaTable and
    the helpers `get_last_session_id` / `log_processed_delta`.

    Args:
        delta_path: Directory whose visible sub-directories are the deltas; the
            sub-directory name is used as the delta (session) id.
        table_name: Table name written to the log; '<table_name>.csv' is the
            preferred file inside each delta directory.
        pk_column_name: Column used to match mirror rows with delta rows.

    Raises:
        FileNotFoundError: If a delta directory contains no visible file.
    """
    # Visible sub-directories only (stray files cannot be listed as deltas),
    # in ascending id order so deltas are applied oldest first.
    delta_dirs = sorted(
        (
            name for name in os.listdir(delta_path)
            if not name.startswith('.') and os.path.isdir(os.path.join(delta_path, name))
        ),
        key=_delta_order_key,
    )

    # Keep only the deltas that come after the last logged session.
    if os.path.exists('logs.csv'):
        last_id = get_last_session_id('logs.csv')
        if last_id:
            # Compare numerically where possible: as plain strings '999' > '1000'.
            last_key = _delta_order_key(last_id)
            delta_dirs = [
                delta_id for delta_id in delta_dirs
                if _delta_order_key(delta_id) > last_key
            ]

    for delta_id in delta_dirs:
        # Locate and read the delta file.
        deltafilepath = _find_delta_file(os.path.join(delta_path, delta_id), table_name)
        updates_df = spark.read.csv(deltafilepath, header=True, sep=';', inferSchema=True)

        log_processed_delta(delta_id, table_name, dt.now().strftime("%Y-%m-%d %H:%M:%S"), 'START')

        # Build the column mapping from the mirror's own columns instead of a
        # hard-coded list, so the function works for any table. The key column
        # is only inserted, never updated.
        insert_values = {
            column: f"updates.{column}" for column in delta_mirror.toDF().columns
        }
        update_set = {
            column: expression
            for column, expression in insert_values.items()
            if column != pk_column_name
        }

        merge_builder = delta_mirror.alias("mirror").merge(
            source=updates_df.alias("updates"),
            condition=f"mirror.{pk_column_name} = updates.{pk_column_name}",
        )
        if update_set:
            # A table made of the key column alone has nothing to update.
            merge_builder = merge_builder.whenMatchedUpdate(set=update_set)
        merge_builder.whenNotMatchedInsert(values=insert_values).execute()

        log_processed_delta(delta_id, table_name, dt.now().strftime("%Y-%m-%d %H:%M:%S"), 'END')
        delta_mirror.toDF().show()

In [14]:

# Run the delta processing with delta_path, table_name and pk_column_name
# as set in the configuration cell above.

process_delta(delta_path, table_name, pk_column_name)

+----------------+--------------------+----------+--------------------+---------+-----------+-------------+---------+---------+----------------+
|DATA_ACTUAL_DATE|DATA_ACTUAL_END_DATE|ACCOUNT_RK|      ACCOUNT_NUMBER|CHAR_TYPE|CURRENCY_RK|CURRENCY_CODE|CLIENT_ID|BRANCH_ID|OPEN_IN_INTERNET|
+----------------+--------------------+----------+--------------------+---------+-----------+-------------+---------+---------+----------------+
|      15.02.2018|          31.12.2050|     13560|30110810300000008001|        A|         34|          643|       20|      105|               Y|
|      21.04.2018|          31.12.2050|     13630|30102810900000002185|        A|         34|          643|       21|      107|            null|
|      21.04.2018|          31.12.2050|     13811|30221978100000008100|        A|         44|          978|       33|      201|            null|
|      21.04.2018|          31.12.2050|     13871|30222978200000004100|        P|         44|          978|       63|      105|   

# Запись в итоговый файл

In [15]:
# Export the mirror's current contents as CSV with a header row,
# overwriting any previous export at the same location.
(
    delta_mirror.toDF()
    .write
    .format("csv")
    .option("header", "true")
    .mode("overwrite")
    .save('/content/mirr_md_account_d')
)