In [None]:
# 🔹 Cria e configura a sessão do Spark — ponto de entrada para usar PySpark
from pyspark.sql import SparkSession

# 🔹 Configura o Spark para suportar Delta Lake usando o pacote Delta via pip (não Spark JARs)
from delta import configure_spark_with_delta_pip

# 🔹 Permite acessar e manipular tabelas Delta com operações como merge, update, delete, etc.
from delta.tables import DeltaTable

# 🔹 Funções do PySpark para transformação de dados:
#    - col: acessa colunas dinamicamente
#    - to_json: transforma struct em string JSON
#    - lit: cria colunas com valores fixos
#    - collect_list: agrega valores em listas
#    - size: retorna o tamanho de arrays/listas
from pyspark.sql.functions import col, to_json, lit, collect_list, size

# 🔹 Define esquemas explícitos para DataFrames:
#    - StructType e StructField criam a estrutura
#    - IntegerType e StringType definem os tipos das colunas
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [None]:
# Configuração do Spark com Delta Lake

# 🔹appName -  Define o nome da aplicação Spark (útil para logs e UI)
# 🔹spark.sql.extensions -  Ativa a extensão do Delta Lake no Spark SQL — necessária para habilitar comandos Delta (ex: MERGE, VACUUM, etc.)
# 🔹spark.sql.catalog.spark_catalog -  Substitui o catálogo padrão do Spark pelo DeltaCatalog — faz com que tabelas gerenciadas sejam Delta por padrão

builder = SparkSession.builder \
    .appName("LabDemo") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.access.key","datalake") \
    .config("spark.hadoop.fs.s3a.secret.key","datalake") \
    .config("spark.hadoop.fs.s3a.endpoint","http://minio:9000") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")


In [None]:
# Configurações para a Session
#Cria uma nova SparkSession ou retorna a existente, se já estiver ativa.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [8]:
# Define o nível de log
spark.sparkContext.setLogLevel("ERROR")

In [None]:
# Ela cria um DataFrame Spark em memória.

df_initial = spark.createDataFrame([
    (1, "Bruno", "SP"),
    (2, "Maria", "RJ"),
    (3, "Victor", "MG"),
    (4, "Tiago", "RJ")
], ["id", "nome", "estado"])

In [None]:
#tipo do objeto
type(df_initial)

In [None]:
# Salva o arquivo no formato Delta
df_initial.write.format("delta").mode("overwrite").save("file:///util/delta_clientes")

In [None]:
# READ - Ler os dados
spark.read.format("delta").load("file:///util/delta_clientes").show()

In [None]:
# UPDATE - Atualizar o nome do Aluno
delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")
delta_table.update(
    condition = "id = 1",
    set = {"nome": "'Bruno da Silva'"}
)
print("Após atualização:")
delta_table.toDF().show(truncate=False)

In [None]:
# DELETE - Remover registros
delta_table.delete(condition = "estado == 'RJ'")
print("Após remoção")
delta_table.toDF().show(truncate=False)

In [None]:
# Criando um DataFrame para alteração
df_update = spark.createDataFrame([
    (2, "Maria Silva", "RJ"),
    (3, "Pedro", "MG"),
    (4, "Paulo", "MG"),
    (5, "José", "MG")
], ["id", "nome", "estado"])

In [None]:
# Buscando os dados e fazendo o Merge
delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")
delta_table.alias("existingData").merge(
    df_update.alias("newData"),
    "existingData.id = newData.id").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

In [None]:
delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")
print("Após inserção de novos registros:")
delta_table.toDF().show(truncate=False)

In [None]:
# Novos DataFrame
df_update = spark.createDataFrame([
    (2, "Maria Silva - Alterada", "RJ"),
    (6, "Paulo", "MG")
], ["id", "nome", "estado"])

In [None]:
# Merge com campos especificos

delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")
delta_table.alias("oldData").merge(
    df_update.alias("upsertData"),
    "oldData.id = upsertData.id").whenMatchedUpdate(set={
    "oldData.nome": "upsertData.nome",
    "oldData.estado": "upsertData.estado"
}).whenNotMatchedInsert(values={
    "id": "upsertData.id",
    "nome": "upsertData.nome",
    "estado": "upsertData.estado"
}).execute()

In [None]:
delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")
print("Após inserção de novos registros:")
delta_table.toDF().show(truncate=False)

## Filtros

In [None]:
# Where simples
delta_table.toDF().filter("id = 1").show(truncate=False)

In [None]:
# Agrupar por estado
delta_table.toDF().groupBy("estado").count().show(truncate=False)

In [None]:
# Agrupar por estado e filtrando seu total
delta_table.toDF() \
    .groupBy("estado") \
    .count() \
    .filter(col("count") == 4) \
    .show(truncate=False)

## 🕒 Time Travel - Visualizar versões antigas

In [60]:
delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")

In [61]:
# Obter o histórico completo
history_df = delta_table.history()

In [None]:
# Contar o número de versões
num_versions = history_df.count()
print(f"A tabela tem {num_versions} versões.")

In [None]:
# Versões disponíveis
delta_table.history().show()

In [None]:
# Acessar a versão mais antiga da tabela (versão 0)
print("Versão inicial da tabela:")
version_0 = spark.read.format("delta").option("versionAsOf", 0).load("file:///util/delta_clientes")
version_0.show(truncate=False)

In [None]:
print("Versão 1 da tabela:")
version_1 = spark.read.format("delta").option("versionAsOf", 1).load("file:///util/delta_clientes")
version_1.show(truncate=False)

In [69]:
# Adicionar uma coluna que identifica a versão
version_0 = version_0.withColumn("versao", lit(0))
version_1 = version_1.withColumn("versao", lit(1))

# Unir as duas versões
changes = version_0.union(version_1)

In [None]:
changes.show(truncate=False)

In [None]:
# Pega a diferença
changes.groupBy("id", "nome") \
       .agg(collect_list("versao").alias("versoes")) \
       .filter(size("versoes") == 1) \
       .show(truncate=False)

In [None]:
# Carrega a tabela delta
version_1 = spark.read.format("delta").option("versionAsOf", 1).load("file:///util/delta_clientes")

# Carregar o histórico de alterações da tabela Delta
history = delta_table.history()

# Selecionar apenas as colunas relevantes
formatted_history = history.select(
    col("version").alias("Versão"),
    col("operation").alias("Operação"),
    col("operationMetrics").alias("Métricas"),
    col("userMetadata").alias("Metadados do Usuário")
)

# Mostrar as alterações 
formatted_history.show(truncate=False)

In [None]:
# Consultar uma versão antiga (versão 2)
spark.read.format("delta").option("versionAsOf", 2).load("file:///util/delta_clientes").show(truncate=False)

In [None]:
# Atualizar com uma versão antiga
old_version = spark.read.format("delta").option("versionAsOf", 2).load("file:///util/delta_clientes")

In [75]:
# Sobrescrever a tabela principal com a versão 2
# Isso vai gerar uma cópia da versão 2 que será agora a versão principal. 
old_version.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("file:///util/delta_clientes")

In [None]:
spark.read.format("delta").load("file:///util/delta_clientes").show(truncate=False)

In [None]:
# Caminho para a tabela Delta
delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")

# Obter o histórico completo
history_df = delta_table.history()

# Contar o número de versões
num_versions = history_df.count()
print(f"A tabela tem {num_versions} versões.")

## 🧹 Vacuum - Remoção de arquivos obsoletos

Por padrão, o Delta Lake define um período mínimo de retenção de 7 dias. Essa regra existe para garantir a integridade de operações como time travel e evitar que dados importantes para transações sejam removidos acidentalmente. Se você quiser diminuir esse tempo, será necessário ajustar a configuração de retenção.

Após esse período, versões anteriores dos dados não poderão mais ser acessadas.

In [None]:
spark.sql("""
ALTER TABLE delta.`file:///util/delta_clientes`
SET TBLPROPERTIES ('delta.deletedFileRetentionDuration' = '1 day')
""")

In [None]:
delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")
delta_table.detail().select("location", "properties").show(truncate=False)

## 🧬 Evolução de Schema

In [105]:
df_novo = spark.createDataFrame([
    (4, "Ana", "BA", 29)
], ["id", "nome", "estado", "idade"])

df_novo.write.format("delta").mode("append") \
    .option("mergeSchema", "true") \
    .save("file:///util/delta_clientes")

In [None]:
delta_table = DeltaTable.forPath(spark, "file:///util/delta_clientes")
print("Após inserção de novos registros:")
delta_table.toDF().show(truncate=False)

## 🗂️ Particionamento

In [None]:
## Criando um particionamento por estado
df_partition = spark.createDataFrame([
    (10, "Lucas", "SP"),
    (11, "Carla", "SP"),
    (12, "Rafa", "BA")
], ["id", "nome", "estado"])

df_partition.write.format("delta").mode("overwrite").partitionBy("estado").save("file:///util/particionados")

In [None]:
df_partition = DeltaTable.forPath(spark, "file:///util/delta_clientes/particionados")
print("Após inserção de novos registros:")
df_partition.toDF().show(truncate=False)

In [None]:
# Definir o esquema 
esquema = StructType([
    StructField("id", IntegerType(), True),
    StructField("nome", StringType(), True),
    StructField("estado", StringType(), True),
    StructField("idade", IntegerType(), True)
])

# Ler os dados e ajustar o esquema, se necessário
delta_table = spark.read.format("delta").load("file:///util/delta_clientes")
delta_table = spark.createDataFrame(delta_table.rdd, esquema)
delta_table.show()

In [121]:
# Finaliza a sessão Spark
spark.stop()