**Delta Lake basic commands**

This notebook demonstrates basic Delta Lake commands in PySpark.  
Two GitHub repositories helped me put it together, and credit goes to them:  

<a href="https://github.com/jaumpedro214/posts/tree/main/delta_lake_fundamentals">jaumpedro214</a>   
<a href= "https://github.com/handreassa/delta-docker/blob/main/notebooks/example.ipynb">handreassa</a> 

In [42]:
# Import libraries and start a Spark session with Delta Lake enabled

import pyspark
from delta import *
from pyspark.sql.functions import lit
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType

builder = pyspark.sql.SparkSession.builder.appName("LocalDelta") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [3]:
# Create the schema

SCHEMA = StructType(
    [
        StructField('id', StringType(), True), 
        StructField('data_inversa', StringType(), True), 
        StructField('dia_semana', StringType(), True), 
        StructField('horario', StringType(), True), 
        StructField('uf', StringType(), True), 
        StructField('br', StringType(), True), 
        StructField('km', StringType(), True), 
        StructField('municipio', StringType(), True), 
        StructField('causa_acidente', StringType(), True), 
        StructField('tipo_acidente', StringType(), True), 
        StructField('classificacao_acidente', StringType(), True), 
        StructField('fase_dia', StringType(), True), 
        StructField('sentido_via', StringType(), True), 
        StructField('condicao_metereologica', StringType(), True), 
        StructField('tipo_pista', StringType(), True), 
        StructField('tracado_via', StringType(), True), 
        StructField('uso_solo', StringType(), True), 
        StructField('pessoas', IntegerType(), True), 
        StructField('mortos', IntegerType(), True), 
        StructField('feridos_leves', IntegerType(), True), 
        StructField('feridos_graves', IntegerType(), True), 
        StructField('ilesos', IntegerType(), True), 
        StructField('ignorados', IntegerType(), True), 
        StructField('feridos', IntegerType(), True), 
        StructField('veiculos', StringType(), True), 
        StructField('latitude', DoubleType(), True), 
        StructField('longitude', DoubleType(), True), 
        StructField('regional', StringType(), True), 
        StructField('delegacia', StringType(), True), 
        StructField('uop', StringType(), True)
    ]
)

In [4]:
# Load data from source

df_acidentes = (
    spark
    .read.format("csv")
    .option("delimiter", ";")
    .option("header", "true")
    .option("encoding", "ISO-8859-1")
    .schema(SCHEMA)    
    .load("./../data/acidentes/datatran2020.csv")
)

df_acidentes.show(5)

23/09/24 16:30:51 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+------+------------+------------+--------+---+---+-----+--------------------+--------------------+--------------------+----------------------+---------+-----------+----------------------+----------+-----------+--------+-------+------+-------------+--------------+------+---------+-------+--------+------------+------------+--------+---------+--------------+
|    id|data_inversa|  dia_semana| horario| uf| br|   km|           municipio|      causa_acidente|       tipo_acidente|classificacao_acidente| fase_dia|sentido_via|condicao_metereologica|tipo_pista|tracado_via|uso_solo|pessoas|mortos|feridos_leves|feridos_graves|ilesos|ignorados|feridos|veiculos|    latitude|   longitude|regional|delegacia|           uop|
+------+------------+------------+--------+---+---+-----+--------------------+--------------------+--------------------+----------------------+---------+-----------+----------------------+----------+-----------+--------+-------+------+-------------+--------------+------+---------+-

##### 1. Write a table in Delta format

In [5]:
path = "./../data/delta/acidentes/"

(df_acidentes.write
             .format("delta")
             .mode("overwrite")
             .save(path))

                                                                                

##### 2. Read from a delta table

In [6]:
df_acidentes_delta = (
    spark
    .read.format("delta")
    .load(path)
)

In [7]:
# The Delta table can now be queried like any other DataFrame

df_acidentes_delta.select(["id", "data_inversa", "dia_semana", "horario", "uf"]).show(5)


                                                                                

+------+------------+----------+--------+---+
|    id|data_inversa|dia_semana| horario| uf|
+------+------------+----------+--------+---+
|324581|  2020-11-29|   domingo|12:30:00| SC|
|324585|  2020-11-29|   domingo|12:30:00| PE|
|324586|  2020-11-29|   domingo|12:10:00| RJ|
|324587|  2020-11-29|   domingo|13:00:00| SC|
|324588|  2020-11-29|   domingo|11:50:00| SC|
+------+------------+----------+--------+---+
only showing top 5 rows



In [8]:
df_acidentes_delta.count()

63576

##### 3. Add new data to the table

In [9]:
# Append the 2019 data to the Delta table; the syntax is the same as for a Parquet table

df_acidentes_2019 = (
    spark
    .read.format("csv")
    .option("delimiter", ";")
    .option("header", "true")
    .option("encoding", "ISO-8859-1")  # assumed to match the encoding of the other yearly files
    .schema(SCHEMA)    
    .load("./../data/acidentes/datatran2019.csv")
)

In [10]:
df_acidentes_2019.write.format("delta").mode("append").save(path)
                 

                                                                                

In [11]:
df_acidentes_delta.count()

131132

##### 4. History logs of the table

The `history()` method lists every commit to the table in reverse chronological order, much like a version-control log.  
To read it, we use the `DeltaTable` Python class.

In [12]:
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, path)

delta_table.history().show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      7|2023-09-24 16:42:...|  null|    null|    WRITE|{mode -> Append, ...|null|    null|     null|          6|  Serializable|         true|{numFiles -> 5, n...|        null|Apache-Spark/3.4....|
|      6|2023-09-24 16:31:...|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|          5|  Serializable|        false|{numFiles -> 5, n...|        null|Apache-Spark/3.4....|
|      5|2

In [13]:
delta_table.history().select("version", "timestamp", "operation", "operationParameters").show(10, False)

+-------+-----------------------+---------+--------------------------------------+
|version|timestamp              |operation|operationParameters                   |
+-------+-----------------------+---------+--------------------------------------+
|7      |2023-09-24 16:42:12.885|WRITE    |{mode -> Append, partitionBy -> []}   |
|6      |2023-09-24 16:31:04.086|WRITE    |{mode -> Overwrite, partitionBy -> []}|
|5      |2023-09-23 18:08:26.662|WRITE    |{mode -> Overwrite, partitionBy -> []}|
|4      |2023-09-23 18:08:22.151|WRITE    |{mode -> Overwrite, partitionBy -> []}|
|3      |2023-09-23 16:57:12.399|WRITE    |{mode -> Overwrite, partitionBy -> []}|
|2      |2023-09-23 16:56:28.117|WRITE    |{mode -> Overwrite, partitionBy -> []}|
|1      |2023-09-23 16:03:00.077|WRITE    |{mode -> Overwrite, partitionBy -> []}|
|0      |2023-09-22 22:57:38.058|WRITE    |{mode -> Overwrite, partitionBy -> []}|
+-------+-----------------------+---------+--------------------------------------+
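The `history()` output comes from the table's transaction log. This is a `_delta_log/` directory inside the table path that holds one JSON file per commit. Each file is named by its zero-padded version number and contains one JSON action per line (`commitInfo`, `add`, `remove`, ...). The standard-library sketch below is a rough illustration: it reads those files directly and recovers the version and operation columns. It assumes a local filesystem path such as the `path` used above, and `read_delta_history` is a hypothetical helper, not part of the Delta API. Checkpoint files are ignored, and older commit files that Delta has already cleaned up will not appear.

```python
import json
import tempfile
from pathlib import Path


def read_delta_history(table_path):
    """Return (version, operation) pairs, newest first, from the JSON commit files.

    Hypothetical helper: it reads _delta_log/ directly instead of using DeltaTable.
    """
    log_dir = Path(table_path) / "_delta_log"
    history = []
    for commit_file in log_dir.glob("*.json"):
        if not commit_file.stem.isdigit():  # skip anything that is not a plain commit file
            continue
        operation = None
        with commit_file.open(encoding="utf-8") as fh:
            for line in fh:  # each non-empty line is one JSON action
                if line.strip():
                    action = json.loads(line)
                    if "commitInfo" in action:
                        operation = action["commitInfo"].get("operation")
        history.append((int(commit_file.stem), operation))
    return sorted(history, reverse=True)


# Self-contained demo on a fake two-commit log; on a real table call read_delta_history(path)
with tempfile.TemporaryDirectory() as tmp:
    log = Path(tmp) / "_delta_log"
    log.mkdir()
    (log / "00000000000000000000.json").write_text(
        json.dumps({"commitInfo": {"operation": "WRITE"}}) + "\n"
        + json.dumps({"add": {"path": "part-0000.parquet"}}) + "\n"
    )
    (log / "00000000000000000001.json").write_text(
        json.dumps({"commitInfo": {"operation": "RESTORE"}}) + "\n"
    )
    demo_history = read_delta_history(tmp)

print(demo_history)  # [(1, 'RESTORE'), (0, 'WRITE')]
```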



In [16]:
# By default, Spark reads the latest version of the table

df_acidentes_latest = (
    spark
    .read.format("delta")
    .load(path)
)

df_acidentes_latest.count()

131132

In [15]:
# The versionAsOf option reads a specific version of the table (time travel)

df_acidentes_version_0 = (
    spark
    .read.format("delta")
    .option("versionAsOf", 0)
    .load(path)
)

df_acidentes_version_0.count()

                                                                                

63576
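Besides `versionAsOf`, the Delta reader also accepts a `timestampAsOf` option. It resolves a timestamp to the latest version committed at or before that moment. The sketch below models only that basic lookup rule in plain Python, using the commit timestamps from the history output above. `version_as_of_timestamp` is a hypothetical helper, not part of the Delta API.

```python
from bisect import bisect_right
from datetime import datetime

# (version, commit timestamp) pairs copied from the history output above
HISTORY = [
    (0, "2023-09-22 22:57:38.058"),
    (1, "2023-09-23 16:03:00.077"),
    (2, "2023-09-23 16:56:28.117"),
    (3, "2023-09-23 16:57:12.399"),
    (4, "2023-09-23 18:08:22.151"),
    (5, "2023-09-23 18:08:26.662"),
    (6, "2023-09-24 16:31:04.086"),
    (7, "2023-09-24 16:42:12.885"),
]


def version_as_of_timestamp(history, timestamp):
    """Toy model: return the latest version whose commit timestamp is <= timestamp."""
    commits = sorted((datetime.fromisoformat(ts), version) for version, ts in history)
    commit_times = [ts for ts, _ in commits]
    idx = bisect_right(commit_times, datetime.fromisoformat(timestamp))
    if idx == 0:
        raise ValueError(f"{timestamp} is before the first commit")
    return commits[idx - 1][1]


print(version_as_of_timestamp(HISTORY, "2023-09-24 16:35:00"))  # 6
```

In the notebook, the same lookup would be requested by adding `.option("timestampAsOf", "2023-09-24 16:35:00")` to `spark.read.format("delta")`.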

In [17]:
# Restore the table to version 0; this commits a new version at the table's original path
delta_table.restoreToVersion(0)

                                                                                

DataFrame[table_size_after_restore: bigint, num_of_files_after_restore: bigint, num_removed_files: bigint, num_restored_files: bigint, removed_files_size: bigint, restored_files_size: bigint]

In [18]:
# The latest version no longer contains the 2019 data.
# Note that df_acidentes_latest did not have to be re-read to reflect the restored table.
df_acidentes_latest.count()

                                                                                

63576

In [19]:
# The restore to version 0 was recorded in the table history as a new version (8).
delta_table.history().select("version", "timestamp", "operation", "operationParameters").show(10, False)

+-------+-----------------------+---------+--------------------------------------+
|version|timestamp              |operation|operationParameters                   |
+-------+-----------------------+---------+--------------------------------------+
|8      |2023-09-24 17:17:49.697|RESTORE  |{version -> 0, timestamp -> null}     |
|7      |2023-09-24 16:42:12.885|WRITE    |{mode -> Append, partitionBy -> []}   |
|6      |2023-09-24 16:31:04.086|WRITE    |{mode -> Overwrite, partitionBy -> []}|
|5      |2023-09-23 18:08:26.662|WRITE    |{mode -> Overwrite, partitionBy -> []}|
|4      |2023-09-23 18:08:22.151|WRITE    |{mode -> Overwrite, partitionBy -> []}|
|3      |2023-09-23 16:57:12.399|WRITE    |{mode -> Overwrite, partitionBy -> []}|
|2      |2023-09-23 16:56:28.117|WRITE    |{mode -> Overwrite, partitionBy -> []}|
|1      |2023-09-23 16:03:00.077|WRITE    |{mode -> Overwrite, partitionBy -> []}|
|0      |2023-09-22 22:57:38.058|WRITE    |{mode -> Overwrite, partitionBy -> []}|
+---

In [20]:
# No information is lost: we can restore the version that contains both the 2020 and 2019 data.
delta_table.restoreToVersion(7)

                                                                                

DataFrame[table_size_after_restore: bigint, num_of_files_after_restore: bigint, num_removed_files: bigint, num_restored_files: bigint, removed_files_size: bigint, restored_files_size: bigint]

In [21]:
df_acidentes_latest.count()

                                                                                

131132
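The restore behaviour above can be summarised with a toy model. Every commit produces a new immutable version, and restoring copies an old snapshot into a *new* version instead of deleting anything. The class below is a hypothetical in-memory illustration. Real Delta records which data files each commit adds and removes; it does not copy rows.

```python
from dataclasses import dataclass, field


@dataclass
class ToyDeltaTable:
    """Hypothetical in-memory model: each version is an immutable tuple of rows."""

    versions: list = field(default_factory=list)
    history: list = field(default_factory=list)

    def _commit(self, snapshot, operation):
        self.versions.append(tuple(snapshot))
        self.history.append((len(self.versions) - 1, operation))

    def write(self, rows, mode):
        if mode == "overwrite":
            self._commit(rows, "WRITE (Overwrite)")
        elif mode == "append":
            previous = self.versions[-1] if self.versions else ()
            self._commit(previous + tuple(rows), "WRITE (Append)")
        else:
            raise ValueError(f"unsupported mode: {mode}")

    def read(self, version_as_of=None):
        return self.versions[-1 if version_as_of is None else version_as_of]

    def restore_to_version(self, version):
        # History is never rewritten: the old snapshot becomes a NEW version
        self._commit(self.versions[version], f"RESTORE (version {version})")


table = ToyDeltaTable()
table.write(["row_2020"] * 3, "overwrite")  # version 0
table.write(["row_2019"] * 2, "append")     # version 1
table.restore_to_version(0)                 # version 2, same rows as version 0
table.restore_to_version(1)                 # version 3, same rows as version 1
print(len(table.read()), [v for v, _ in table.history])  # 5 [0, 1, 2, 3]
```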

##### 5. Update
Rows can be updated in place with the SQL `UPDATE` statement; the `DeltaTable` object also offers an equivalent `update` method.

In [22]:
df_acidentes_2016 = (
    spark
    .read.format("csv")
    .option("delimiter", ";")
    .option("header", "true")
    .option("encoding", "ISO-8859-1")
    .schema(SCHEMA)
    .load("./../data/acidentes/datatran2016.csv")
)

df_acidentes_2016.select("data_inversa").show(5)

+------------+
|data_inversa|
+------------+
|    10/06/16|
|    01/01/16|
|    01/01/16|
|    01/01/16|
|    01/01/16|
+------------+
only showing top 5 rows



In [24]:
df_acidentes_2016.write.format("delta").mode("append").save(path)

23/09/24 17:49:35 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 25, schema size: 30
CSV file: file:///home/alipio/Data-Engineering/delta_lake_commands/data/acidentes/datatran2016.csv
                                                                                

In [30]:
df_acidentes_latest.count()

                                                                                

227495

In [34]:
df_acidentes_latest.createOrReplaceTempView("acidentes_latest")

spark.sql(
    """
    UPDATE acidentes_latest
    SET data_inversa = CAST( TO_DATE(data_inversa, 'dd/MM/yy') AS STRING)
    WHERE data_inversa LIKE '%/16'
    """
)

                                                                                

DataFrame[num_affected_rows: bigint]
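The `SET` expression parses strings like `10/06/16` with the `dd/MM/yy` pattern and casts the resulting date back to a `yyyy-MM-dd` string. The `WHERE` clause, recorded in the table history as an `EndsWith` predicate, limits the change to values ending in `/16`. A plain-Python equivalent of that row-level logic is sketched below and can be used to check expected values by hand. The helper names are hypothetical.

```python
from datetime import datetime


def to_iso_date(value):
    """Parse a 'dd/MM/yy' string such as '10/06/16' and return 'yyyy-MM-dd'."""
    # %y maps two-digit years 00-68 to 2000-2068, so '16' becomes 2016
    return datetime.strptime(value, "%d/%m/%y").date().isoformat()


def apply_update(values):
    """Mirror the UPDATE: rewrite only values that end with '/16'."""
    return [to_iso_date(v) if v.endswith("/16") else v for v in values]


print(apply_update(["10/06/16", "01/01/16", "2020-11-29"]))
# ['2016-06-10', '2016-01-01', '2020-11-29']
```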

In [37]:
df_acidentes_latest.filter("data_inversa LIKE '%/16'").count()

0

In [38]:
delta_table.history().select("version", "timestamp", "operation", "operationParameters").show(10, False)

+-------+-----------------------+---------+---------------------------------------------------+
|version|timestamp              |operation|operationParameters                                |
+-------+-----------------------+---------+---------------------------------------------------+
|11     |2023-09-24 18:05:32.664|UPDATE   |{predicate -> ["EndsWith(data_inversa#5099, /16)"]}|
|10     |2023-09-24 17:49:37.462|WRITE    |{mode -> Append, partitionBy -> []}                |
|9      |2023-09-24 17:30:46.723|RESTORE  |{version -> 7, timestamp -> null}                  |
|8      |2023-09-24 17:17:49.697|RESTORE  |{version -> 0, timestamp -> null}                  |
|7      |2023-09-24 16:42:12.885|WRITE    |{mode -> Append, partitionBy -> []}                |
|6      |2023-09-24 16:31:04.086|WRITE    |{mode -> Overwrite, partitionBy -> []}             |
|5      |2023-09-23 18:08:26.662|WRITE    |{mode -> Overwrite, partitionBy -> []}             |
|4      |2023-09-23 18:08:22.151|WRITE  

In [39]:
df_acidentes_latest.filter("data_inversa LIKE '2016-%'").count()

96363
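The counts printed so far line up. Subtracting consecutive totals gives the number of rows each append contributed. The 2016 append added exactly as many rows as now match `2016-%`, which is consistent with every 2016 date having been rewritten by the `UPDATE`.

```python
# Row counts returned by the count() cells above, in the order they were run
counts = [
    ("2020 overwrite", 63576),
    ("2019 append", 131132),
    ("2016 append", 227495),
]

# Rows contributed by each append = difference between consecutive counts
rows_added = {
    label: total - prev_total
    for (label, total), (_, prev_total) in zip(counts[1:], counts[:-1])
}
print(rows_added)  # {'2019 append': 67556, '2016 append': 96363}
```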

In [41]:
df_acidentes_latest.select("data_inversa").filter("data_inversa LIKE '2016-%'").show(5)

+------------+
|data_inversa|
+------------+
|  2016-03-18|
|  2016-03-17|
|  2016-03-18|
|  2016-03-18|
|  2016-03-18|
+------------+
only showing top 5 rows



##### 6. Merge


In [49]:
df_acidentes_2018 = (
    spark
    .read.format("csv")
    .option("delimiter", ";")
    .option("header", "true")
    .option("encoding", "ISO-8859-1")
    .schema(SCHEMA)
    .load("./../data/acidentes/datatran2018.csv")
)

# withColumn replaces the existing 'pessoas' column, here setting every value to 0
df_acidentes_2018_zero = df_acidentes_2018.withColumn("pessoas", lit(0))

In [56]:
df_acidentes_2018.filter("data_inversa LIKE '2018-%'").select(["data_inversa","id","pessoas"]).show(5)

+------------+------+-------+
|data_inversa|    id|pessoas|
+------------+------+-------+
|  2018-01-01|100027|      2|
|  2018-01-01|100044|      2|
|  2018-01-01|100046|      2|
|  2018-01-01|100052|      2|
|  2018-01-01|100053|      1|
+------------+------+-------+
only showing top 5 rows




In [51]:
df_acidentes_2018_zero.write.format("delta").mode("append").save(path)

                                                                                

In [52]:
df_acidentes_latest.count()

296827

In [53]:
df_acidentes_latest.filter("data_inversa LIKE '2018-%'").select(["data_inversa", "pessoas"]).show(5)


+------------+-------+
|data_inversa|pessoas|
+------------+-------+
|  2018-01-01|      0|
|  2018-01-01|      0|
|  2018-01-01|      0|
|  2018-01-01|      0|
|  2018-01-01|      0|
+------------+-------+
only showing top 5 rows



In [65]:
df_acidentes_2018.filter("data_inversa LIKE '2018-%' AND id = '100023'").select(["data_inversa","id","pessoas"]).show(5)

+------------+------+-------+
|data_inversa|    id|pessoas|
+------------+------+-------+
|  2018-01-01|100023|      4|
+------------+------+-------+



In [66]:
df_acidentes_2018_zero.filter("data_inversa LIKE '2018-%' AND id = '100023'").select(["data_inversa","id","pessoas"]).show(5)

+------------+------+-------+
|data_inversa|    id|pessoas|
+------------+------+-------+
|  2018-01-01|100023|      0|
+------------+------+-------+



In [57]:
df_acidentes_latest.createOrReplaceTempView("acidentes_latest")
df_acidentes_2018.createOrReplaceTempView("acidentes_2018_new_counts")

spark.sql(
    """
    MERGE INTO acidentes_latest
    USING acidentes_2018_new_counts

    ON acidentes_latest.id = acidentes_2018_new_counts.id
    AND acidentes_latest.data_inversa = acidentes_2018_new_counts.data_inversa
    
    WHEN MATCHED THEN
        UPDATE SET pessoas = acidentes_latest.pessoas + acidentes_2018_new_counts.pessoas
    
    WHEN NOT MATCHED THEN
        INSERT *
    """
)

23/09/24 18:58:27 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]
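The `MERGE` above matches rows on `(id, data_inversa)`. On a match it adds the source `pessoas` to the target value, and it inserts unmatched source rows unchanged. That is why id `100023`, which was appended with `pessoas = 0`, ends up with `0 + 4 = 4`. Below is a hypothetical plain-Python model of those two clauses. It also raises an error when several source rows match one target row, which Delta's `MERGE` likewise rejects. The second source row is made up for illustration.

```python
def merge_add_counts(target, source):
    """Toy MERGE on (id, data_inversa) mirroring the SQL statement above."""
    source_by_key = {}
    for row in source:
        source_by_key.setdefault((row["id"], row["data_inversa"]), []).append(row)

    result, matched_keys = [], set()
    for row in target:
        key = (row["id"], row["data_inversa"])
        matches = source_by_key.get(key, [])
        if len(matches) > 1:
            raise ValueError(f"multiple source rows match target row {key}")
        if matches:  # WHEN MATCHED THEN UPDATE SET pessoas = target + source
            row = {**row, "pessoas": row["pessoas"] + matches[0]["pessoas"]}
            matched_keys.add(key)
        result.append(row)

    for key, rows in source_by_key.items():  # WHEN NOT MATCHED THEN INSERT *
        if key not in matched_keys:
            result.extend(dict(r) for r in rows)
    return result


target = [{"id": "100023", "data_inversa": "2018-01-01", "pessoas": 0}]  # zeroed 2018 row
source = [
    {"id": "100023", "data_inversa": "2018-01-01", "pessoas": 4},  # original 2018 count
    {"id": "999999", "data_inversa": "2018-01-02", "pessoas": 3},  # hypothetical new row
]
merged = merge_add_counts(target, source)
print([(r["id"], r["pessoas"]) for r in merged])  # [('100023', 4), ('999999', 3)]
```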

In [59]:
df_acidentes_latest.filter("data_inversa LIKE '2018-%'").select(["data_inversa","id","pessoas"]).show(5)


+------------+------+-------+
|data_inversa|    id|pessoas|
+------------+------+-------+
|  2018-01-01|100002|      2|
|  2018-01-01|100023|      4|
|  2018-01-01|100028|      2|
|  2018-01-01|100031|      2|
|  2018-01-01|100035|      1|
+------------+------+-------+
only showing top 5 rows



In [62]:
df_acidentes_latest.filter("data_inversa LIKE '2018-%' AND id = '100053'").select(["data_inversa","id","pessoas"]).show(5)

+------------+------+-------+
|data_inversa|    id|pessoas|
+------------+------+-------+
|  2018-01-01|100053|      1|
+------------+------+-------+

