## Install neccessary packages

In [1]:
!pip install delta-spark==3.2.0



## Work with DeltaLake (CDF)

### Import neccessary packages

In [55]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from delta import *

### Initialize and set up spark to work with DeltaLake (CDF)

In [3]:
builder = SparkSession.builder.appName("Delta Lake Implementation Lab") \
                              .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
                              .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

### Create schema of data and put first data to DeltaLake

In [40]:
schema = StructType([
    # id маршруту
    StructField("route_id", IntegerType(), nullable=False),
    # Версія маршруту, яка буде братися з іншої гіпотетичної таблиці, де буде детально розписано всі зупинки маршруту
    StructField("route_version", IntegerType(), nullable=False),
    # Кількість транспорту на маршруті наразі - теж можна вважати статистичним полем
    StructField("count_vehicles", IntegerType(), nullable=False),
    # Тип транспорту -> автобус, трамвай, тролейбус тощо. Буде складником складеного ключа, оскільки ID маршрутів можуть бути одними тими самими для різних типів тр.
    StructField("vehicle_type", StringType(), nullable=False),
    # id розкладу, яка буде силатися на іншу таблицю, де буде міститься детальна інформація про розклад
    StructField("timetables_id", IntegerType(), nullable=False),
    # Сума часу затримки, яка буде враховувати всі виїзди транспорту на маршрут (буде мати статистику за певний день). Буде оновлюватися по прибуттю транспорту
    # на кожну зупинку. В секундах
    StructField("sum_delay", FloatType(), nullable=False),
    # Середня затримка, яка буде рахуватися відношенням між сумарною затримкою та кількістю виїздів транспорту на маршрут (буде мати статистику за певний день).
    # Буде оновлюватися по прибуттю транспорту на кожну зупинку. В секундах
    StructField("mean_delay", FloatType(), nullable=False)
])

# Дана схема буде зберігати статистику про затримки певного маршруту, та буде оновлюватися при прибуттю на кожну зупинку (наприклад, за допомогою GPS-трекерів,
# які активно впроваджуються в міський транспорт в місті Києві та можна переглядати місцеперебування певного транспорту за допомогою такої системи як EWay),
# також кожного дня статистичні дані будуть обнулятися (наприклад, при першому виїзді транспорту кожного дня будуть обнулятися колонки sum_delay та mean_delay). 
# Тоді можна буде аналізувати затримки в розрізі днів, годин тощо. Або обʼєднавши дані з таблицею розкладу, аналізувати затримки між певними зупинками транспорту

# Зімітуємо початок робочого дня, коли починають виходити перші транспорти на маршрути, тобто статистичні дані будуть містити нулі
initial_data = [
    (1, 1, 0, "автобус", 101, 0.0, 0.0),
    (1, 1, 0, "тролейбус", 102, 0.0, 0.0),
    (2, 1, 0, "трамвай", 103, 0.0, 0.0),
    (3, 1, 0, "тролейбус", 104, 0.0, 0.0)
]

df = spark.createDataFrame(initial_data, schema=schema)
df.show()

+--------+-------------+--------------+------------+-------------+---------+----------+
|route_id|route_version|count_vehicles|vehicle_type|timetables_id|sum_delay|mean_delay|
+--------+-------------+--------------+------------+-------------+---------+----------+
|       1|            1|             0|     автобус|          101|      0.0|       0.0|
|       1|            1|             0|   тролейбус|          102|      0.0|       0.0|
|       2|            1|             0|     трамвай|          103|      0.0|       0.0|
|       3|            1|             0|   тролейбус|          104|      0.0|       0.0|
+--------+-------------+--------------+------------+-------------+---------+----------+



In [41]:
# Створюємо Delta таблицю по раніше створеній схемі та включимо CDF
DeltaTable.createIfNotExists(spark) \
          .tableName("vehicle_monitoring") \
          .property("delta.enableChangeDataFeed", "true") \
          .addColumns(schema) \
          .execute()

<delta.tables.DeltaTable at 0xffff50f35a10>

In [42]:
# Запишемо перші дані в Delta таблицю
df.write.format("delta").mode("overwrite").saveAsTable("vehicle_monitoring")

In [43]:
# Прочитаємо записані дані
data_df = spark.read.format("delta").table("vehicle_monitoring")
data_df.show()

+--------+-------------+--------------+------------+-------------+---------+----------+
|route_id|route_version|count_vehicles|vehicle_type|timetables_id|sum_delay|mean_delay|
+--------+-------------+--------------+------------+-------------+---------+----------+
|       3|            1|             0|   тролейбус|          104|      0.0|       0.0|
|       1|            1|             0|   тролейбус|          102|      0.0|       0.0|
|       2|            1|             0|     трамвай|          103|      0.0|       0.0|
|       1|            1|             0|     автобус|          101|      0.0|       0.0|
+--------+-------------+--------------+------------+-------------+---------+----------+



In [44]:
# Створимо новий DataFrame (виявили, що змінилась версія маршруту для першого маршруту автобусів та оновився графік) та додамо новий маршрут
updated_data = [
    (1, 2, 0, "автобус", 105, 0.0, 0.0),
    (28, 1, 0, "автобус", 110, 0.0, 0.0),
]

updated_df = spark.createDataFrame(updated_data, schema=schema)
updated_df.show()

+--------+-------------+--------------+------------+-------------+---------+----------+
|route_id|route_version|count_vehicles|vehicle_type|timetables_id|sum_delay|mean_delay|
+--------+-------------+--------------+------------+-------------+---------+----------+
|       1|            2|             0|     автобус|          105|      0.0|       0.0|
|      28|            1|             0|     автобус|          110|      0.0|       0.0|
+--------+-------------+--------------+------------+-------------+---------+----------+



### Update information into Delta Table

In [45]:
# Отримаємо Delta таблицю (створеної раніше)
delta_table = DeltaTable.forName(spark, "vehicle_monitoring")

# Виконання merge (upsert) операції
delta_table.alias("old_data") \
           .merge(updated_df.alias("updated_data"),
                  "old_data.route_id = updated_data.route_id AND old_data.vehicle_type = updated_data.vehicle_type") \
           .whenMatchedUpdate(set={
               "route_version": "updated_data.route_version",
               "count_vehicles": "updated_data.count_vehicles",
               "timetables_id": "updated_data.timetables_id",
               "sum_delay": "updated_data.sum_delay",
               "mean_delay": "updated_data.mean_delay"}) \
           .whenNotMatchedInsertAll() \
           .execute()

In [46]:
# Перегляд оновленої інформації
delta_table.toDF().show()

+--------+-------------+--------------+------------+-------------+---------+----------+
|route_id|route_version|count_vehicles|vehicle_type|timetables_id|sum_delay|mean_delay|
+--------+-------------+--------------+------------+-------------+---------+----------+
|       1|            2|             0|     автобус|          105|      0.0|       0.0|
|      28|            1|             0|     автобус|          110|      0.0|       0.0|
|       3|            1|             0|   тролейбус|          104|      0.0|       0.0|
|       1|            1|             0|   тролейбус|          102|      0.0|       0.0|
|       2|            1|             0|     трамвай|          103|      0.0|       0.0|
+--------+-------------+--------------+------------+-------------+---------+----------+



In [47]:
# Оновимо інформацію по кількості транспорту на маршруті та значення затримок на маршрутах після першої зупинки
data_first_station = [
    (1, 2, 2, "автобус", 105, 15.0, 7.5),
    (28, 1, 2, "автобус", 110, 10.0, 5.0),
    (1, 1, 1, "тролейбус", 102, 7.0, 7.0),
    (2, 1, 1, "трамвай", 103, 0.0, 0.0),
    (3, 1, 1, "тролейбус", 104, 20.0, 20.0)
]

data_first_station_df = spark.createDataFrame(data_first_station, schema=schema)
data_first_station_df.show()

+--------+-------------+--------------+------------+-------------+---------+----------+
|route_id|route_version|count_vehicles|vehicle_type|timetables_id|sum_delay|mean_delay|
+--------+-------------+--------------+------------+-------------+---------+----------+
|       1|            2|             2|     автобус|          105|     15.0|       7.5|
|      28|            1|             2|     автобус|          110|     10.0|       5.0|
|       1|            1|             1|   тролейбус|          102|      7.0|       7.0|
|       2|            1|             1|     трамвай|          103|      0.0|       0.0|
|       3|            1|             1|   тролейбус|          104|     20.0|      20.0|
+--------+-------------+--------------+------------+-------------+---------+----------+



In [48]:
# Оновимо дані в Delta таблиці (використаємо раніше створений обʼєкт Delta таблиці)
delta_table.alias("old_data") \
           .merge(data_first_station_df.alias("updated_data"),
                  "old_data.route_id = updated_data.route_id AND old_data.vehicle_type = updated_data.vehicle_type") \
           .whenMatchedUpdate(set={
               "route_version": "updated_data.route_version",
               "count_vehicles": "updated_data.count_vehicles",
               "timetables_id": "updated_data.timetables_id",
               "sum_delay": "updated_data.sum_delay",
               "mean_delay": "updated_data.mean_delay"}) \
           .whenNotMatchedInsertAll() \
           .execute()

In [49]:
# Перегляд оновленої інформації
delta_table.toDF().show()

+--------+-------------+--------------+------------+-------------+---------+----------+
|route_id|route_version|count_vehicles|vehicle_type|timetables_id|sum_delay|mean_delay|
+--------+-------------+--------------+------------+-------------+---------+----------+
|       1|            2|             2|     автобус|          105|     15.0|       7.5|
|       1|            1|             1|   тролейбус|          102|      7.0|       7.0|
|       2|            1|             1|     трамвай|          103|      0.0|       0.0|
|       3|            1|             1|   тролейбус|          104|     20.0|      20.0|
|      28|            1|             2|     автобус|          110|     10.0|       5.0|
+--------+-------------+--------------+------------+-------------+---------+----------+



### Check history of data

In [50]:
# Подивимось всі версії даних в Delta таблиці
delta_table.history().show()

+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      3|2025-01-05 18:53:...|  NULL|    NULL|               MERGE|{predicate -> ["(...|NULL|    NULL|     NULL|          2|  Serializable|        false|{numTargetRowsCop...|        NULL|Apache-Spark/3.5....|
|      2|2025-01-05 18:53:...|  NULL|    NULL|               MERGE|{predicate -> ["(...|NULL|    NULL|     NULL|          1|  Serializable|        false|{numTargetR

In [52]:
# Повернемось до попередньої версії Delta таблиці (Застосування: наприклад, для перегляду інформації про затримки попереднього дня)
previous_version_df = spark.read.format("delta").option("versionAsOf", 2).table("vehicle_monitoring")
previous_version_df.show()

+--------+-------------+--------------+------------+-------------+---------+----------+
|route_id|route_version|count_vehicles|vehicle_type|timetables_id|sum_delay|mean_delay|
+--------+-------------+--------------+------------+-------------+---------+----------+
|       1|            2|             0|     автобус|          105|      0.0|       0.0|
|      28|            1|             0|     автобус|          110|      0.0|       0.0|
|       3|            1|             0|   тролейбус|          104|      0.0|       0.0|
|       1|            1|             0|   тролейбус|          102|      0.0|       0.0|
|       2|            1|             0|     трамвай|          103|      0.0|       0.0|
+--------+-------------+--------------+------------+-------------+---------+----------+



In [54]:
# Для перегляду змін використаємо CDF (Застосування, для перегляду змін в інформації про затримки певного дня)
history = spark.read.format("delta") \
                .option("readChangeFeed", "true") \
                .option("startingVersion", "0") \
                .table("vehicle_monitoring")

history.show(truncate=False)

+--------+-------------+--------------+------------+-------------+---------+----------+----------------+---------------+-----------------------+
|route_id|route_version|count_vehicles|vehicle_type|timetables_id|sum_delay|mean_delay|_change_type    |_commit_version|_commit_timestamp      |
+--------+-------------+--------------+------------+-------------+---------+----------+----------------+---------------+-----------------------+
|1       |2            |0             |автобус     |105          |0.0      |0.0       |update_preimage |3              |2025-01-05 18:53:24.436|
|1       |2            |2             |автобус     |105          |15.0     |7.5       |update_postimage|3              |2025-01-05 18:53:24.436|
|1       |1            |0             |тролейбус   |102          |0.0      |0.0       |update_preimage |3              |2025-01-05 18:53:24.436|
|1       |1            |1             |тролейбус   |102          |7.0      |7.0       |update_postimage|3              |2025-01-05

In [None]:
spark

In [39]:
spark.sql(f"DROP TABLE IF EXISTS vehicle_monitoring")

DataFrame[]