<a href="https://colab.research.google.com/github/RubenCarty/Master-Artificial-Intelligence-UNIR/blob/main/delta_lake_Ruben.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup

In [None]:
!pip install -q pyspark

!pip install -q delta-spark

In [None]:
def _create_delta_spark(app_name="KeyFeaturesDeltaLake", extra_packages=None):
  """Build (or reuse) a SparkSession with Delta Lake enabled.

  Registers Delta's SQL extension and catalog on the builder. It then lets
  ``configure_spark_with_delta_pip`` put the Delta Maven artifact that
  matches the installed ``delta-spark`` pip package on the classpath.

  Args:
    app_name: Spark application name. It is ignored if a session already
      exists, because ``getOrCreate`` returns that session.
    extra_packages: optional list of additional Maven coordinates to load
      alongside Delta. Pass them here rather than setting
      ``spark.jars.packages`` directly.

  Returns:
    The active SparkSession.
  """
  from pyspark.sql import SparkSession
  from delta import configure_spark_with_delta_pip

  builder = (
      SparkSession.builder.appName(app_name)
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
              "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  )
  # Do not set spark.jars.packages by hand. configure_spark_with_delta_pip
  # overwrites that key, so a hard-coded coordinate (previously delta-core
  # 2.0.0, which does not match the pip-installed delta-spark) was dead config.
  return configure_spark_with_delta_pip(
      builder, extra_packages=extra_packages).getOrCreate()


spark = _create_delta_spark()

In [None]:
# Bare last expression: Jupyter renders the active SparkSession object.
# Its summary includes details such as the Spark version and app name,
# so this is a quick check that the session started.
spark

# Create Delta Table

In [None]:
import shutil
from pathlib import Path
from urllib.parse import urlparse

# Start from a clean slate so the notebook survives Restart & Run All.
# Later cells time-travel to versions 0 and 2, so the table must start at
# version 0. Without this, saveAsTable (default error-if-exists mode)
# fails on any re-run.
spark.sql("DROP TABLE IF EXISTS my_delta_table")
# The builder above does not enable Hive support, so the catalog is
# presumably in-memory. A kernel restart then forgets the table but leaves
# its files in the warehouse directory, and Spark refuses to create a
# managed table over that non-empty location. Remove only this table's
# directory.
warehouse_dir = urlparse(spark.conf.get("spark.sql.warehouse.dir")).path
shutil.rmtree(Path(warehouse_dir) / "my_delta_table", ignore_errors=True)

df1 = spark.createDataFrame(
        data=[(1, "A"), (2, "B"), (3, "C")],
        schema=["col1", "col2"]
    )

df1.write \
  .format("delta") \
  .saveAsTable("my_delta_table")

query = "SELECT * FROM my_delta_table ORDER BY col1"

spark.sql(query).show()

+----+----+
|col1|col2|
+----+----+
|   1|   A|
|   2|   B|
|   3|   C|
+----+----+



# Key Features

## 1. Schema Evolution / Enforcement

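Schema enforcement rejects writes whose schema does not match the table, which prevents bad records from being ingested. Schema evolution (opt-in, e.g. via the `mergeSchema` write option) lets the table schema grow to accept new columns.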

In [None]:
# New DataFrame with a different schema
df2 = spark.createDataFrame(
        data=[(4, "D", "X"), (5, "E", "Y")],
        schema=["col1", "col2", "col3"]
    )

df2.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   4|   D|   X|
|   5|   E|   Y|
+----+----+----+



In [None]:
# Schema mismatch error
try:
    df2.write \
      .format("delta") \
      .mode("append") \
      .saveAsTable("my_delta_table")
except Exception as e:
    print(e)

[_LEGACY_ERROR_TEMP_DELTA_0007] A schema mismatch detected when writing to the Delta table (Table ID: 1c322c97-480f-463d-a40b-ae680d16d421).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- col1: long (nullable = true)
-- col2: string (nullable = true)


Data schema:
root
-- col1: long (nullable = true)
-- col2: string (nullable = true)
-- col3: string (nullable = true)

         


In [None]:
# Automatic schema evolution with mergeSchema
df2.write \
  .format("delta") \
  .mode("append") \
  .option("mergeSchema", "true") \
  .saveAsTable("my_delta_table")

query = "SELECT * FROM my_delta_table ORDER BY col1"

spark.sql(query).show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   A|NULL|
|   2|   B|NULL|
|   3|   C|NULL|
|   4|   D|   X|
|   5|   E|   Y|
+----+----+----+



## 2. Audit History

Delta Lake logs the details of every change to the table, providing a full audit trail.

In [None]:
# One row per commit, newest first. Keep just the columns that tell the story.
query = "DESCRIBE HISTORY my_delta_table"

(
    spark.sql(query)
    .select("version", "timestamp", "operation")
    .show(truncate=False)
)

+-------+-----------------------+----------------------+
|version|timestamp              |operation             |
+-------+-----------------------+----------------------+
|1      |2024-12-19 13:34:14.1  |WRITE                 |
|0      |2024-12-19 13:33:49.539|CREATE TABLE AS SELECT|
+-------+-----------------------+----------------------+



## 3. DML Operations

Supports `MERGE`, `UPDATE` and `DELETE` operations (`UPDATE` and `DELETE` are demonstrated below).

### UPDATE

In [None]:
# Fill the NULL col3 values introduced by the schema evolution step.
# The UPDATE runs when spark.sql() is called. .show() displays the
# num_affected_rows result (like the DELETE cell below) instead of the
# bare DataFrame repr.
query = "UPDATE my_delta_table SET col3 = 'Z' WHERE col3 IS NULL"

spark.sql(query).show()

DataFrame[num_affected_rows: bigint]

In [None]:
# Verify the UPDATE: rows whose col3 was NULL should now hold 'Z'.
query = "SELECT * FROM my_delta_table ORDER BY col1"
spark.sql(sqlQuery=query).show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   A|   Z|
|   2|   B|   Z|
|   3|   C|   Z|
|   4|   D|   X|
|   5|   E|   Y|
+----+----+----+



### DELETE

In [None]:
# Remove two rows. The result reports how many rows were deleted.
query = "DELETE FROM my_delta_table WHERE col1 IN (2, 4)"
spark.sql(sqlQuery=query).show()

+-----------------+
|num_affected_rows|
+-----------------+
|                2|
+-----------------+



In [None]:
# Verify the DELETE: col1 values 2 and 4 should be gone.
query = "SELECT * FROM my_delta_table ORDER BY col1"
(
    spark.sql(query)
    .show()
)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   A|   Z|
|   3|   C|   Z|
|   5|   E|   Y|
+----+----+----+



### DESCRIBE HISTORY

In [None]:
# The history now also records the UPDATE and DELETE commits.
query = "DESCRIBE HISTORY my_delta_table"

(
    spark.sql(sqlQuery=query)
    .select("version", "timestamp", "operation")
    .show(truncate=False)
)

+-------+-----------------------+----------------------+
|version|timestamp              |operation             |
+-------+-----------------------+----------------------+
|3      |2024-12-19 13:34:41.337|DELETE                |
|2      |2024-12-19 13:34:29.151|UPDATE                |
|1      |2024-12-19 13:34:14.1  |WRITE                 |
|0      |2024-12-19 13:33:49.539|CREATE TABLE AS SELECT|
+-------+-----------------------+----------------------+



## 4. Time travel

Query or restore earlier versions of the data for audits, rollbacks, or reproducing past results.

In [None]:
# Time travel to version 0, the initial CREATE TABLE AS SELECT commit
# (before col3 existed).
query = (
    "SELECT * FROM my_delta_table "
    "VERSION AS OF 0 "
    "ORDER BY col1"
)
spark.sql(query).show()

+----+----+
|col1|col2|
+----+----+
|   1|   A|
|   2|   B|
|   3|   C|
+----+----+



In [None]:
# Time travel to version 2, the UPDATE commit: all five rows still present,
# before the DELETE.
query = (
    "SELECT * FROM my_delta_table "
    "VERSION AS OF 2 "
    "ORDER BY col1"
)
spark.sql(query).show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   A|   Z|
|   2|   B|   Z|
|   3|   C|   Z|
|   4|   D|   X|
|   5|   E|   Y|
+----+----+----+



## 5. Unified Batch/Streaming

A Delta Lake table is both a batch table and a streaming source and sink. The cells below read the same table in batch mode and in streaming mode.

In [None]:
# Batch read: an ordinary (non-streaming) DataFrame over the table.
df3 = spark.read.table(tableName="my_delta_table")
df3.isStreaming

False

In [None]:
# Streaming read of the same table, which acts as a streaming source.
# Only the DataFrame is defined here; no streaming query is started.
df3 = spark.readStream.table(tableName="my_delta_table")
df3.isStreaming

True