In [2]:
#pip install delta-spark

In [3]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql import SparkSession
from delta import DeltaTable

In [4]:
# Build Spark Session with Delta Lake and MinIO configurations
spark = SparkSession.builder \
    .appName("DeltaLake-Jupyter") \
    .master("local[*]") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1,io.delta:delta-spark_2.12:3.2.0") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://host.docker.internal:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "MinioAdmin123") \
    .config("spark.hadoop.fs.s3a.secret.key", "MinioAdmin123") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


In [5]:
spark

In [7]:
df = ( 
    spark
    .read
    .parquet("s3a://bronze-zone/dataway/sap/clients")
)

In [9]:
(
    df
    .write
    .format("delta")
    .mode("overwrite")
    .save("s3a://silver-zone/dataway/clients")    
)

# Explain Delta

In [None]:
# DeltaLog

In [10]:
# Read Delta
deltaTable = DeltaTable.forPath(spark, "s3a://silver-zone/dataway/clients")
deltaTable.toDF().show()

+--------------------+--------------------+--------------------+--------------------+------+--------------------+-------------+
|             address|               email|                name|        phone_number|system|        processed_at|  nationality|
+--------------------+--------------------+--------------------+--------------------+------+--------------------+-------------+
|6510 Williams Cur...|zachary35@example...|         Angela Ward|          4695162782|   sap|2025-01-11 19:31:...|NOT_BRAZILIAN|
|PSC 3948, Box 662...|browntammy@exampl...|        Donald Owens|        898.341.9392|   sap|2025-01-11 19:31:...|NOT_BRAZILIAN|
|807 Anderson Vill...|jessica82@example...|    Michele Williams|          7846921499|   sap|2025-01-11 19:31:...|NOT_BRAZILIAN|
|90749 Paula Mount...|smithchristopher@...|        Cathy Curtis|       (429)360-1800|   sap|2025-01-11 19:31:...|NOT_BRAZILIAN|
|425 Brittney Mews...|   hlowe@example.org|       Benjamin Kemp|    241.451.7586x618|   sap|2025-01-11 1

In [11]:
# Upsert
df_protheus = ( 
    spark
    .read
    .parquet("s3a://bronze-zone/dataway/protheus/clients")
)
df_protheus = df.dropDuplicates(["email"])
df_protheus.printSchema()

root
 |-- address: string (nullable = true)
 |-- email: string (nullable = true)
 |-- name: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- system: string (nullable = true)
 |-- processed_at: timestamp (nullable = true)
 |-- nationality: string (nullable = true)



# UPSERT

In [28]:
#spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled","true")
(
    deltaTable.alias("dados_atuais")
    .merge(
        df_protheus.alias("novos_dados"),
        "dados_atuais.email = novos_dados.email"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

# Optimization

In [19]:
# Optimize
deltaTable.optimize().executeCompaction()

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,totalClusterPar

In [20]:
# Vaccum
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled","false")

spark.sql(f"VACUUM delta.`{delta_table_path}` RETAIN 0 HOURS")

DataFrame[path: string]

# History

In [31]:
(
    deltaTable
    .history()
    .select("version", "timestamp", "operation")
    .show(truncate=False, vertical=False)
)

+-------+-------------------+---------+
|version|timestamp          |operation|
+-------+-------------------+---------+
|6      |2025-01-11 21:50:44|RESTORE  |
|5      |2025-01-11 21:50:24|MERGE    |
|4      |2025-01-11 21:42:34|MERGE    |
|3      |2025-01-11 21:42:26|MERGE    |
|2      |2025-01-11 21:31:00|MERGE    |
|1      |2025-01-11 21:20:07|WRITE    |
|0      |2025-01-11 21:15:08|WRITE    |
+-------+-------------------+---------+



# TimeTravel

In [30]:
deltaTable.restoreToVersion(4)

DataFrame[table_size_after_restore: bigint, num_of_files_after_restore: bigint, num_removed_files: bigint, num_restored_files: bigint, removed_files_size: bigint, restored_files_size: bigint]