In [1]:
# Imports

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, DateType

from delta import *

In [None]:
# Criando a SparkSession 

spark = ( 
    SparkSession
    .builder
    .master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate() 
)

In [4]:
# Criando uma lista de tuplas que contem os dados da tabela cliente que vao ser utilizados neste caso

data_clientes = [
    ("1", "MARIA","12345678901"),
    ("2", "LUANA","99999999999"),
    ("3", "PAULO","11122233345")
]

# Criando o schema da tabela clientes

schema_clientes = (
    StructType([
        StructField("CLIENTE_ID", StringType(), False), # false indica que nao pode ser nulo
        StructField("NOME", StringType(), True),
        StructField("CPF", StringType(), True) # true indica que pode ser nulo
    ])
)

# Criando um dataframe a partir dos dados e do schema

df_clientes = spark.createDataFrame(data=data_clientes,schema=schema_clientes)

# Mostrando o dataframe criado

df_clientes.show(truncate=False)

+----------+-----+-----------+
|CLIENTE_ID|NOME |CPF        |
+----------+-----+-----------+
|1         |MARIA|12345678901|
|2         |LUANA|99999999999|
|3         |PAULO|11122233345|
+----------+-----+-----------+



In [None]:
# Salvando a delta table localmente

( 
    df_clientes
    .write
    .format("delta")
    .mode('overwrite')
    .save("./RAW/CLIENTES")
)

In [6]:
# Simulando a atualizacao de registros no data lakehouse

new_data_clientes = [
    ("1", "MARIA LUIZA","12345678901")  
]

df_new_clientes = spark.createDataFrame(data=new_data_clientes, schema=schema_clientes)

df_new_clientes.show()

+----------+-----------+-----------+
|CLIENTE_ID|       NOME|        CPF|
+----------+-----------+-----------+
|         1|MARIA LUIZA|12345678901|
+----------+-----------+-----------+



In [7]:
# Upsert/Merge dos dados atuais com os novos dados

deltaTable = DeltaTable.forPath(spark, "./RAW/CLIENTES")

(
    deltaTable.alias("dados_atuais")
    .merge(
        df_new_clientes.alias("novos_dados"),
        "dados_atuais.CLIENTE_ID = novos_dados.CLIENTE_ID"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

                                                                                

In [18]:
# Utilizamos esse comando para conferir se o conteudo foi alterado corretamente

deltaTable.toDF().show()



+----------+-----------+-----------+
|CLIENTE_ID|       NOME|        CPF|
+----------+-----------+-----------+
|         1|MARIA LUIZA|12345678901|
|         2|      LUANA|99999999999|
|         3|      PAULO|11122233345|
+----------+-----------+-----------+



                                                                                

In [11]:
# Simulando o delete de registro do DeltaTable

deltaTable.delete("CPF = 12345678901")

                                                                                

In [13]:
# Acessando o historico de mudancas que ocorrem no delta table
(
    deltaTable
    .history()
    .select("version", "timestamp", "operation", "operationMetrics")
    .show()
)

+-------+--------------------+---------+--------------------+
|version|           timestamp|operation|    operationMetrics|
+-------+--------------------+---------+--------------------+
|      2|2024-10-02 19:47:...|   DELETE|{numRemovedFiles ...|
|      1|2024-10-02 19:42:...|    MERGE|{numTargetRowsCop...|
|      0|2024-10-02 19:34:...|    WRITE|{numFiles -> 2, n...|
+-------+--------------------+---------+--------------------+



In [16]:
# Lendo dados da primeira versao da tabela

(
    spark
    .read
    .format('delta')
    .option("versionAsOf", 0)
    .load('./RAW/CLIENTES')
    .show()
)

                                                                                

+----------+-----+-----------+
|CLIENTE_ID| NOME|        CPF|
+----------+-----+-----------+
|         1|MARIA|12345678901|
|         2|LUANA|99999999999|
|         3|PAULO|11122233345|
+----------+-----+-----------+



In [17]:
# Restaurando uma versao antiga, nesse caso a versao 1, antes de excluirmos o registro com id 1

deltaTable.restoreToVersion(1)

                                                                                

DataFrame[table_size_after_restore: bigint, num_of_files_after_restore: bigint, num_removed_files: bigint, num_restored_files: bigint, removed_files_size: bigint, restored_files_size: bigint]