In [37]:
#Configurações do Delta

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

from delta import *


spark = (
    SparkSession.builder.master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .appName("DeltaLakeSetup")
    .getOrCreate()
)

print("Spark Session com Delta Lake configurada com sucesso!")

Spark Session com Delta Lake configurada com sucesso!


In [36]:
#Dados da Delta table

data = [
    ("Joao","Pessoa Fisica","ATIVO",5000.00),
    ("Maria","Pessoa Fisica","ATIVO",8000.00),
    ("Empresa X","Pessoa Juridica","ATIVO",10000.00),
    ("Empresa Y","Pessoa Juridica","INATIVO",20000.00)
]

schema = StructType([
    StructField("NOME", StringType(), True),
    StructField("TIPO_PESSOA", StringType(), True),
    StructField("STATUS", StringType(), True),
    StructField("LIMITE_CREDITO", FloatType(), True)
])

df = spark.createDataFrame(data=data, schema=schema)
df.show(truncate=False)

+---------+---------------+-------+--------------+
|NOME     |TIPO_PESSOA    |STATUS |LIMITE_CREDITO|
+---------+---------------+-------+--------------+
|Joao     |Pessoa Fisica  |ATIVO  |5000.0        |
|Maria    |Pessoa Fisica  |ATIVO  |8000.0        |
|Empresa X|Pessoa Juridica|ATIVO  |10000.0       |
|Empresa Y|Pessoa Juridica|INATIVO|20000.0       |
+---------+---------------+-------+--------------+



In [35]:
#Criando a Delta table

( 
    df
    .write
    .format("delta")
    .mode('overwrite')
    .save("./RAW")
)

                                                                                

In [20]:
data_update = [
    ("Maria","Pessoa Fisica", "ATIVO", 15000.00),
    ("Empresa Z","Pessoa Juridica", "ATIVO", 30000.00)
]

from pyspark.sql.types import StructType, StructField, StringType, FloatType

schema = StructType([
    StructField("NOME", StringType(), True),
    StructField("TIPO_PESSOA", StringType(), True),
    StructField("STATUS", StringType(), True),
    StructField("LIMITE_CREDITO", FloatType(), True)
])

df_update = spark.createDataFrame(data=data_update, schema=schema)

print("Dados que serão atualizados/inseridos:")
df_update.show(truncate=False)

Dados que serão atualizados/inseridos:
+---------+---------------+------+--------------+
|NOME     |TIPO_PESSOA    |STATUS|LIMITE_CREDITO|
+---------+---------------+------+--------------+
|Maria    |Pessoa Fisica  |ATIVO |15000.0       |
|Empresa Z|Pessoa Juridica|ATIVO |30000.0       |
+---------+---------------+------+--------------+



In [32]:
#Upsert - Onde ocorre o update/insert na tabela

from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "./RAW")

deltaTable.alias("target") \
    .merge(
        df_update.alias("source"),
        "target.NOME = source.NOME"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

print("Upsert concluído com sucesso!")

                                                                                

Upsert concluído com sucesso!


In [22]:
print("Dados após o merge")
df_result = spark.read.format("delta").load("./RAW")
df_result.orderBy("NOME").show(truncate=False)

Dados após o merge
+---------+---------------+-------+--------------+
|NOME     |TIPO_PESSOA    |STATUS |LIMITE_CREDITO|
+---------+---------------+-------+--------------+
|Empresa X|Pessoa Juridica|ATIVO  |10000.0       |
|Empresa Y|Pessoa Juridica|INATIVO|20000.0       |
|Empresa Z|Pessoa Juridica|ATIVO  |30000.0       |
|Joao     |Pessoa Fisica  |ATIVO  |5000.0        |
|Maria    |Pessoa Fisica  |ATIVO  |15000.0       |
+---------+---------------+-------+--------------+



In [24]:
deltaTable.delete("LIMITE_CREDITO < 6000.0")

In [25]:
#History - Log de transformações que ocorreram na delta table criada.

(
    deltaTable
    .history()
    .select("version", "timestamp", "operation", "operationMetrics")
    .show()
)

+-------+--------------------+---------+--------------------+
|version|           timestamp|operation|    operationMetrics|
+-------+--------------------+---------+--------------------+
|      2|2025-11-10 23:57:...|   DELETE|{numRemovedFiles ...|
|      1|2025-11-10 23:55:...|    MERGE|{numTargetRowsCop...|
|      0|2025-11-10 23:50:...|    WRITE|{numFiles -> 5, n...|
+-------+--------------------+---------+--------------------+



In [30]:
#Time Travel - Ler dados de versões anteriores da tabela, antes dos dados serem manipulados

(
    spark
    .read
    .format('delta')
    .option("versionAsOf", 1)
    .load('./RAW')
    .show()
)

+---------+---------------+-------+--------------+
|     NOME|    TIPO_PESSOA| STATUS|LIMITE_CREDITO|
+---------+---------------+-------+--------------+
|Empresa Y|Pessoa Juridica|INATIVO|       20000.0|
|Empresa X|Pessoa Juridica|  ATIVO|       10000.0|
|Empresa Z|Pessoa Juridica|  ATIVO|       30000.0|
|    Maria|  Pessoa Fisica|  ATIVO|       15000.0|
|     Joao|  Pessoa Fisica|  ATIVO|        5000.0|
+---------+---------------+-------+--------------+

