In [None]:
from pyspark.sql import SparkSession #Import para sessão spark
from pyspark.sql.types import StructType, StructField, StringType, FloatType #Imports para estrutura e tipos

from delta import *

import logging

logging.getLogger("py4j").setLevel(logging.DEBUG)

In [None]:
#Confiraçã0 da Sessão spark
spark = (
    SparkSession
    .builder
    .master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

In [None]:
spark # Sessão

In [None]:
#Simulação base de clientes
data = [
    ("ID001", "CLIENTE_X","SP","ATIVO",   250000.00),
    ("ID002", "CLIENTE_Y","SC","INATIVO", 400000.00),
    ("ID003", "CLIENTE_Z","DF","ATIVO",   1000000.00)
]

schema = (
    StructType([
        StructField("ID_CLIENTE",     StringType(),True), #True = Pode ser nulo
        StructField("NOME_CLIENTE",   StringType(),True),
        StructField("UF",             StringType(),True),
        StructField("STATUS",         StringType(),True),
        StructField("LIMITE_CREDITO", FloatType(), True)
    ])
)

df = spark.createDataFrame(data=data,schema=schema) #Dataframe

df.show(truncate=False) #Permite visualizar os dados sem quebrar

# Grava Delta Table

In [6]:
# Gera uma delta table a partir do dataframe

(
    df
    .write
    .format("delta")
    .mode("overwrite")
    .save("./RAW/CLIENTES")
)

                                                                                

# Simulação de cenário (NOVOS DADOS)

In [8]:
new_data = [
    ("ID001","CLIENTE_X","SP","INATIVO", 0.00),
    ("ID002","CLIENTE_Y","SC","ATIVO",   400000.00),
    ("ID004","CLIENTE_Z","DF","ATIVO",   5000000.00)
] # Novo lote da partição do data lake

df_new = spark.createDataFrame(data=new_data, schema=schema) # Cria um dataframe com os novo dados

df_new.show() # Mostra o dataframe

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID001|   CLIENTE_X| SP|INATIVO|           0.0|
|     ID002|   CLIENTE_Y| SC|  ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF|  ATIVO|     5000000.0|
+----------+------------+---+-------+--------------+



# UPSERT / MERGE 

### OBS: Não ler delta tables com read parquet

In [10]:
#Criação da delta table
delta_table= DeltaTable.forPath(spark, "./RAW/CLIENTES")

In [11]:
delta_table.toDF().show() #Transforma a leitura em um dataframe e da output

                                                                                

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID002|   CLIENTE_Y| SC|INATIVO|      400000.0|
|     ID003|   CLIENTE_Z| DF|  ATIVO|     1000000.0|
|     ID001|   CLIENTE_X| SP|  ATIVO|      250000.0|
+----------+------------+---+-------+--------------+



In [12]:
# MERGE
(
    delta_table.alias("dados_atuais")
    .merge(
        df_new.alias("novos_dados"),
        "dados_atuais.ID_CLIENTE = novos_dados.ID_CLIENTE" # Relacionamento de chaves
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll() # All se refere a todos os dados, caso queira especificar, é possível remover o all colocar apenas o desejado.
    .execute() # executa
)

                                                                                

In [13]:
delta_table.toDF().show()

                                                                                

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID001|   CLIENTE_X| SP|INATIVO|           0.0|
|     ID002|   CLIENTE_Y| SC|  ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF|  ATIVO|     5000000.0|
|     ID003|   CLIENTE_Z| DF|  ATIVO|     1000000.0|
+----------+------------+---+-------+--------------+



In [14]:
# Delete
delta_table.delete("LIMITE_CREDITO < 400000.0") #Condição (Não exclui fisicamente)

                                                                                

In [15]:
delta_table.toDF().show()

+----------+------------+---+------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF|STATUS|LIMITE_CREDITO|
+----------+------------+---+------+--------------+
|     ID002|   CLIENTE_Y| SC| ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF| ATIVO|     5000000.0|
|     ID003|   CLIENTE_Z| DF| ATIVO|     1000000.0|
+----------+------------+---+------+--------------+



# Operações com delta table (CRUD)

# CRIAÇÃO DE TABELA

In [17]:
# Criação da tabela cliente_delta
spark.sql(
     """
     CREATE TABLE IF NOT EXISTS cliente_delta (
         id INT,
         nome_cliente STRING,
         uf CHAR(2),
         status BOOLEAN,
         limite_credito FLOAT
     ) USING delta
     """
)

DataFrame[]

In [18]:
# Relaiza um select na tabela criada e faz o output
spark.sql(
    """
    SELECT * from cliente_delta
    """
).show()

+---+------------+---+------+--------------+
| id|nome_cliente| uf|status|limite_credito|
+---+------------+---+------+--------------+
+---+------------+---+------+--------------+



                                                                                

In [27]:
# Criação da delta table para cliente delta
cliente = DeltaTable.forPath(spark, "./spark-warehouse/cliente_delta") 

In [28]:
# mostra o histórico
cliente.history().show()

                                                                                

+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|   operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      3|2025-04-22 17:31:...|  NULL|    NULL|      UPDATE|   {predicate -> []}|NULL|    NULL|     NULL|          2|  Serializable|        false|{numRemovedFiles ...|        NULL|Apache-Spark/3.5....|
|      2|2025-04-22 17:27:...|  NULL|    NULL| ADD COLUMNS|{columns -> [{"co...|NULL|    NULL|     NULL|          1|  Serializable|         true|                  {}|        NULL|Apache-Spark/3.5.

# INSERT

In [20]:
#INSERT
spark.sql(
    """
        INSERT INTO cliente_delta VALUES 
        (1, 'Jean Guichard²', 'RS', 'true', '10000000.0'), 
        (2, 'Lucas da Rosa', 'SC', 'true', '1000000.0'), 
        (3, 'Matheus Daminelli', 'SC', 'true', '4000000.0')
    """
)

                                                                                

DataFrame[]

In [21]:
spark.sql(
    """
    SELECT * from cliente_delta
    """
).show()

+---+-----------------+---+------+--------------+
| id|     nome_cliente| uf|status|limite_credito|
+---+-----------------+---+------+--------------+
|  3|Matheus Daminelli| SC|  true|     4000000.0|
|  1|   Jean Guichard²| RS|  true|         1.0E7|
|  2|    Lucas da Rosa| SC|  true|     1000000.0|
+---+-----------------+---+------+--------------+



In [29]:
# Mostra o histórico sem quebrar dados
cliente.history().show(truncate=False)

+-------+-----------------------+------+--------+------------+-----------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation   |operationParameters                                                                      |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                                                                                                                                      

# ALTERANDO TABELA

In [23]:
# Adicionando coluna
spark.sql(
    """
    alter table cliente_delta add column cidade STRING
    """
)

                                                                                

DataFrame[]

In [24]:
spark.sql(
    """
    SELECT * from cliente_delta
    """
).show()

                                                                                

+---+-----------------+---+------+--------------+------+
| id|     nome_cliente| uf|status|limite_credito|cidade|
+---+-----------------+---+------+--------------+------+
|  3|Matheus Daminelli| SC|  true|     4000000.0|  NULL|
|  1|   Jean Guichard²| RS|  true|         1.0E7|  NULL|
|  2|    Lucas da Rosa| SC|  true|     1000000.0|  NULL|
+---+-----------------+---+------+--------------+------+



# UPDATE

In [25]:
# Update
spark.sql("""
    UPDATE cliente_delta
    SET cidade = CASE
        WHEN id = 1 THEN 'Torres RS'
        WHEN id = 2 THEN 'Tubarão SC'
        WHEN id = 3 THEN 'Criciúma SC'
        ELSE cidade
    END
""")

                                                                                

DataFrame[num_affected_rows: bigint]

In [26]:
spark.sql(
    """
    SELECT * from cliente_delta
    """
).show()

+---+-----------------+---+------+--------------+-----------+
| id|     nome_cliente| uf|status|limite_credito|     cidade|
+---+-----------------+---+------+--------------+-----------+
|  3|Matheus Daminelli| SC|  true|     4000000.0|Criciúma SC|
|  2|    Lucas da Rosa| SC|  true|     1000000.0| Tubarão SC|
|  1|   Jean Guichard²| RS|  true|         1.0E7|  Torres RS|
+---+-----------------+---+------+--------------+-----------+



### Verifica se é uma delta table

In [30]:
DeltaTable.isDeltaTable(spark, "spark-warehouse/cliente_delta")

True

# DELETE

In [None]:
# Delete
spark.sql(
    """
        DELETE FROM cliente_delta
        WHERE uf = 'SC' AND status = false
    """
)

In [None]:
spark.sql(
    """
    SELECT * from cliente_delta
    """
).show()

# Exibição em Display

In [31]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [32]:
spark.sql('describe HISTORY cliente_delta').show(truncate=False);

+-------+-----------------------+------+--------+------------+-----------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation   |operationParameters                                                                      |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                                                                                                                                      