In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

from delta import *

In [4]:
# Criar uma sessão spark
spark = ( 
    SparkSession
    .builder
    .master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate() 
)

In [5]:
data = [
    ("ID001", "CLIENTE_X","SP","ATIVO",   250000.00),
    ("ID002", "CLIENTE_Y","SC","INATIVO", 400000.00),
    ("ID003", "CLIENTE_Z","DF","ATIVO",   1000000.00)
]

schema = (
    StructType([
        StructField("ID_CLIENTE",     StringType(),True),
        StructField("NOME_CLIENTE",   StringType(),True),
        StructField("UF",             StringType(),True),
        StructField("STATUS",         StringType(),True),
        StructField("LIMITE_CREDITO", FloatType(), True)
    ])
)

df = spark.createDataFrame(data=data,schema=schema)

df.show(truncate=False)

                                                                                

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE|UF |STATUS |LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|ID001     |CLIENTE_X   |SP |ATIVO  |250000.0      |
|ID002     |CLIENTE_Y   |SC |INATIVO|400000.0      |
|ID003     |CLIENTE_Z   |DF |ATIVO  |1000000.0     |
+----------+------------+---+-------+--------------+



# Como gravar Delta Table

In [6]:
( 
    df
    .write
    .format("delta")
    .mode('overwrite') 
    .save("./RAW/CLIENTES")
)

24/04/23 21:43:23 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

# Novos Dados

In [7]:
new_data = [
    ("ID001","CLIENTE_X","SP","INATIVO", 0.00),
    ("ID002","CLIENTE_Y","SC","ATIVO",   400000.00),
    ("ID004","CLIENTE_Z","DF","ATIVO",   5000000.00)
]

df_new = spark.createDataFrame(data=new_data, schema=schema)

df_new.show()

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID001|   CLIENTE_X| SP|INATIVO|           0.0|
|     ID002|   CLIENTE_Y| SC|  ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF|  ATIVO|     5000000.0|
+----------+------------+---+-------+--------------+



# UPSERT / MERGE

In [None]:
# upsert/merge é uma mistura de update e insert

In [12]:
# ler deltatable atual
deltaTable = DeltaTable.forPath(spark, "./RAW/CLIENTES") ## normalmente caminho cloud.
deltaTable.toDF().show()

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID002|   CLIENTE_Y| SC|INATIVO|      400000.0|
|     ID003|   CLIENTE_Z| DF|  ATIVO|     1000000.0|
|     ID001|   CLIENTE_X| SP|  ATIVO|      250000.0|
+----------+------------+---+-------+--------------+



In [14]:
# UPSERT / MERGE OCORRENDO AQUI

(
    deltaTable.alias("dados_atuais") ## Dados que ja existem, usando alias para facilitar identificaçao e desenvolvimento
    .merge(
        df_new.alias("novos_dados"), ## Dados novos, usando alias para facilitar identificaçao e desenvolvimento
        "dados_atuais.ID_CLIENTE = novos_dados.ID_CLIENTE"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


                                                                                

In [16]:
deltaTable.toDF().show()

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID001|   CLIENTE_X| SP|INATIVO|           0.0|
|     ID002|   CLIENTE_Y| SC|  ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF|  ATIVO|     5000000.0|
|     ID003|   CLIENTE_Z| DF|  ATIVO|     1000000.0|
+----------+------------+---+-------+--------------+



## DELETE

In [17]:
deltaTable.delete("LIMITE_CREDITO < 400000.0")

                                                                                

In [18]:
deltaTable.toDF().show()

+----------+------------+---+------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF|STATUS|LIMITE_CREDITO|
+----------+------------+---+------+--------------+
|     ID002|   CLIENTE_Y| SC| ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF| ATIVO|     5000000.0|
|     ID003|   CLIENTE_Z| DF| ATIVO|     1000000.0|
+----------+------------+---+------+--------------+



# HISTORY

In [19]:
## É possível acessarmos todo o histórico de transformações que acontecem em uma delta table, 
## fazendo o uso dos metadados armazenados da pasta _delta_log.
## Isso é possível pois por padrão, os dados nunca serão deletados fisicamente de uma delta table, 
## a menos que uma rotina de vacuum seja configurada.

In [21]:
(
    deltaTable
    .history()
    .show()
)

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      3|2024-04-23 22:01:...|  null|    null|   DELETE|{predicate -> ["(...|null|    null|     null|          2|  Serializable|        false|{numRemovedFiles ...|        null|Apache-Spark/3.4....|
|      2|2024-04-23 21:57:...|  null|    null|    MERGE|{predicate -> ["(...|null|    null|     null|          1|  Serializable|        false|{numTargetRowsCop...|        null|Apache-Spark/3.4....|
|      1|2

In [22]:
## Selecionar algumas colunas do histórico.
(
    deltaTable
    .history()
    .select("version", "timestamp", "operation", "operationMetrics")
    .show()
)

+-------+--------------------+---------+--------------------+
|version|           timestamp|operation|    operationMetrics|
+-------+--------------------+---------+--------------------+
|      3|2024-04-23 22:01:...|   DELETE|{numRemovedFiles ...|
|      2|2024-04-23 21:57:...|    MERGE|{numTargetRowsCop...|
|      1|2024-04-23 21:53:...|    MERGE|{numTargetRowsCop...|
|      0|2024-04-23 21:43:...|    WRITE|{numFiles -> 4, n...|
+-------+--------------------+---------+--------------------+



# Time Travel

In [23]:
# ler os dados antigos do seguinte modo:
(
    spark
    .read
    .format('delta')
    .option("versionAsOf", 2)
    .load('./RAW/CLIENTES')
    .show()
)

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID001|   CLIENTE_X| SP|INATIVO|           0.0|
|     ID002|   CLIENTE_Y| SC|  ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF|  ATIVO|     5000000.0|
|     ID003|   CLIENTE_Z| DF|  ATIVO|     1000000.0|
+----------+------------+---+-------+--------------+



In [30]:
# restaurar uma versão antiga:
deltaTable.restoreToVersion(1)

DataFrame[table_size_after_restore: bigint, num_of_files_after_restore: bigint, num_removed_files: bigint, num_restored_files: bigint, removed_files_size: bigint, restored_files_size: bigint]

In [31]:
deltaTable.toDF().show()

+----------+------------+---+-------+--------------+
|ID_CLIENTE|NOME_CLIENTE| UF| STATUS|LIMITE_CREDITO|
+----------+------------+---+-------+--------------+
|     ID001|   CLIENTE_X| SP|INATIVO|           0.0|
|     ID002|   CLIENTE_Y| SC|  ATIVO|      400000.0|
|     ID004|   CLIENTE_Z| DF|  ATIVO|     5000000.0|
|     ID003|   CLIENTE_Z| DF|  ATIVO|     1000000.0|
+----------+------------+---+-------+--------------+

