In [14]:
# CÉLULA 1 - INSTALAÇÃO DAS BIBLIOTECAS

!pip install -q pyspark==3.5.1 delta-spark==3.2.0


In [15]:
# CÉLULA 2 - CONFIGURAÇÃO DO SPARK + DELTA
# Nesta etapa criamos a SparkSession já habilitando o suporte ao Delta Lake,
# para conseguirmos usar as operações ACID e o formato open data na camada final.

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Builder padrão do Spark, mas com as extensões do Delta ativadas.
# Essas configs fazem o Spark enxergar o Delta como catálogo padrão.
builder = (
    SparkSession.builder
    .appName("MBA_DataLakehouse_Delta")  # nome da aplicação no cluster
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Aqui o Delta ajusta a sessão Spark com as dependências necessárias.
spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.sparkContext.setLogLevel("WARN")  # reduz o ruído de log para focar no pipeline

print("Spark inicializado")
print("Versão do Spark:", spark.version)


Spark inicializado
Versão do Spark: 3.5.1


In [21]:
# CÉLULA 3 - COLETA/AQUISIÇÃO (CAMADA RAW - PARQUET)
# Aqui simulamos a fonte transacional (logs de pedidos) e persistimos a camada RAW
# em formato Parquet, conforme sugestão da zona bruta do lake.

from pyspark.sql.functions import col
from datetime import datetime, timedelta
import random
import shutil

# Caminho da camada RAW no filesystem do ambiente
path_raw = "/tmp/vendas_raw"
# Limpeza da pasta a cada execução para evitar lixo de runs anteriores
shutil.rmtree(path_raw, ignore_errors=True)

# Geração de 5000 pedidos simulados em memória
# A ideia é imitar um fluxo de transações de e‑commerce/banking.
random.seed(42)
base_date = datetime(2024, 10, 1)
rows = []

for i in range(1, 5001):
    # Aproximadamente 5% dos customer_id ficam nulos para simular problema de qualidade
    customer_id = f"CUST{random.randint(1, 100)}" if random.random() > 0.05 else None
    # Aproximadamente 3% dos amounts nulos
    amount = round(random.uniform(5, 1000), 2) if random.random() > 0.03 else None
    # Aproximadamente 2% das datas nulas
    order_dt = base_date + timedelta(seconds=random.randint(0, 86400 * 60))
    order_date = order_dt.strftime("%Y-%m-%d %H:%M:%S") if random.random() > 0.02 else None
    # Status transacional típico
    status = random.choice(["PENDING", "SHIPPED", "DELIVERED", "CANCELLED"])

    rows.append((f"ORD{i:05d}", customer_id, amount, order_date, status))

# Crio o DataFrame diretamente da lista de tuplas (sem depender de Pandas)
df_raw = spark.createDataFrame(
    rows,
    ["order_id", "customer_id", "amount", "order_date", "status"]
)

print("Registros totais gerados:", df_raw.count())

# Persisto a camada RAW usando Parquet, que é um formato coluna aberto e otimizado
df_raw.write.mode("overwrite").parquet(path_raw)
print("Camada RAW salva em:", path_raw)

# Leitura de validação da RAW para garantir que os dados foram gravados corretamente
df_check = spark.read.parquet(path_raw)
print("Registros lidos da camada RAW:", df_check.count())
df_check.show(5, truncate=False)


Registros totais gerados: 5000
Camada RAW salva em: /tmp/vendas_raw
Registros lidos da camada RAW: 5000
+--------+-----------+------+-------------------+---------+
|order_id|customer_id|amount|order_date         |status   |
+--------+-----------+------+-------------------+---------+
|ORD02049|CUST37     |668.69|2024-10-25 03:05:23|PENDING  |
|ORD02050|CUST4      |857.88|2024-10-27 15:00:19|DELIVERED|
|ORD02051|CUST94     |163.69|2024-10-06 04:45:07|SHIPPED  |
|ORD02052|CUST95     |80.72 |2024-11-10 12:48:55|DELIVERED|
|ORD02053|CUST16     |266.41|2024-10-20 20:54:22|PENDING  |
+--------+-----------+------+-------------------+---------+
only showing top 5 rows



In [22]:
# CÉLULA 4 - PRÉ-PROCESSAMENTO (3 TRANSFORMAÇÕES) + CAMADA CURATED (DELTA)
# Nesta etapa parte da camada RAW e aplicamos as três transformações de qualidade
# combinadas em aula: limpeza de nulos, ajuste de tipos e desduplicação.

from pyspark.sql.functions import col, to_timestamp, row_number
from pyspark.sql.window import Window
import shutil

path_raw = "/tmp/vendas_raw"
path_curated_delta = "/tmp/vendas_curated_delta"
# Limpo a pasta da camada tratada para não misturar versões antigas
shutil.rmtree(path_curated_delta, ignore_errors=True)

# 0) Ler camada RAW
df = spark.read.parquet(path_raw)
print("Registros lidos da RAW:", df.count())

# 1) Limpeza de nulos: remover registros com amount OU order_date nulos
#    Aqui eu trato qualidade: se valor financeiro ou data da transação estão faltando,
#    o registro é considerado inválido para análises.
antes = df.count()
df = df.dropna(subset=["amount", "order_date"])
depois = df.count()
print("\n[1] Limpeza de nulos")
print("   Antes :", antes)
print("   Depois:", depois)
print("   Removidos:", antes - depois)

# 2) Casting de tipos: amount DECIMAL(10,2), order_date TIMESTAMP
#    Transformo os tipos para formatos mais adequados:
#    - amount como DECIMAL para cálculos financeiros precisos
#    - order_date como TIMESTAMP para facilitar filtros e agregações por tempo
df = df.withColumn("amount", col("amount").cast("DECIMAL(10,2)"))
df = df.withColumn("order_date", to_timestamp("order_date", "yyyy-MM-dd HH:mm:ss"))

print("\n[2] Casting de tipos")
df.printSchema()

# 3) Desduplicação: manter registro mais recente por order_id
#    Uso uma Window Function para ordenar por data dentro de cada order_id
#    e fico apenas com a linha mais recente (row_number == 1).
window_spec = Window.partitionBy("order_id").orderBy(col("order_date").desc())
antes_dedup = df.count()
df = df.withColumn("rn", row_number().over(window_spec)) \
       .filter(col("rn") == 1) \
       .drop("rn")
depois_dedup = df.count()

print("\n[3] Desduplicação")
print("   Antes :", antes_dedup)
print("   Depois:", depois_dedup)
print("   Removidos (duplicados):", antes_dedup - depois_dedup)

print("\nAmostra após as 3 transformações:")
df.show(5, truncate=False)

# Salvar em Delta Lake (camada CURATED)
# A partir daqui a tabela já está em um Open Table Format com suporte a ACID.
df.write.format("delta").mode("overwrite").save(path_curated_delta)
print("\nCamada CURATED salva em Delta:", path_curated_delta)
print("Total de registros na CURATED:", df.count())


Registros lidos da RAW: 5000

[1] Limpeza de nulos
   Antes : 5000
   Depois: 4737
   Removidos: 263

[2] Casting de tipos
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- amount: decimal(10,2) (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- status: string (nullable = true)


[3] Desduplicação
   Antes : 4737
   Depois: 4737
   Removidos (duplicados): 0

Amostra após as 3 transformações:
+--------+-----------+------+-------------------+---------+
|order_id|customer_id|amount|order_date         |status   |
+--------+-----------+------+-------------------+---------+
|ORD00001|CUST4      |248.67|2024-10-14 13:08:48|PENDING  |
|ORD00003|CUST92     |547.22|2024-10-22 09:39:49|DELIVERED|
|ORD00004|CUST1      |163.86|2024-11-11 00:45:46|SHIPPED  |
|ORD00005|CUST98     |97.28 |2024-10-10 09:21:55|DELIVERED|
|ORD00006|CUST6      |538.55|2024-11-06 18:02:56|DELIVERED|
+--------+-----------+------+-------------------+---------+
only 

In [24]:
# CÉLULA 5 - ANÁLISES DE NEGÓCIO + COMPARATIVO PARQUET vs DELTA
# Nesta etapa exploramos a camada CURATED em Delta Lake com alguns KPIs
# e, em seguida, comparamos o tempo de execução da mesma query
#  rodando sobre a RAW em Parquet e sobre a CURATED em Delta.

from pyspark.sql.functions import round as _round
import time

path_raw = "/tmp/vendas_raw"
path_curated_delta = "/tmp/vendas_curated_delta"

# -----------------------------
# 5.1 Ler CURATED (Delta) e calcular KPIs
# -----------------------------
df_curated = spark.read.format("delta").load(path_curated_delta)
df_curated.createOrReplaceTempView("vendas_curated")

print("Total de registros na CURATED (Delta):", df_curated.count())

# KPI 1: visão geral do financeiro na tabela tratada em Delta
print("\n[KPI 1] Resumo financeiro geral (Delta)")
kpi1_delta = spark.sql("""
    SELECT
        COUNT(*) AS total_pedidos,
        ROUND(SUM(amount), 2) AS valor_total,
        ROUND(AVG(amount), 2) AS ticket_medio,
        ROUND(MIN(amount), 2) AS menor_compra,
        ROUND(MAX(amount), 2) AS maior_compra
    FROM vendas_curated
""")
kpi1_delta.show(truncate=False)

# KPI 2: distribuição de valor por status de pedido
print("\n[KPI 2] Valor total por status (Delta)")
kpi2_delta = spark.sql("""
    SELECT
        status,
        COUNT(*) AS qtd_pedidos,
        ROUND(SUM(amount), 2) AS valor_total,
        ROUND(AVG(amount), 2) AS ticket_medio
    FROM vendas_curated
    GROUP BY status
    ORDER BY qtd_pedidos DESC
""")
kpi2_delta.show(truncate=False)

# -----------------------------
# 5.2 Comparativo simples: Parquet (RAW) x Delta (CURATED)
#     Mesma query, medindo tempo de execução
# -----------------------------

print("\n================ COMPARATIVO PARQUET x DELTA ================")

# Ler RAW em Parquet (camada bruta)
df_raw = spark.read.parquet(path_raw)
df_raw.createOrReplaceTempView("vendas_raw")

# Query de resumo financeiro (equivalente ao KPI 1),
# parametrizada para eu poder trocar só o nome da tabela.
query_resumo = """
    SELECT
        COUNT(*) AS total_pedidos,
        ROUND(SUM(amount), 2) AS valor_total,
        ROUND(AVG(amount), 2) AS ticket_medio
    FROM {tabela}
"""

# Tempo no PARQUET (RAW)
inicio_parquet = time.time()
res_parquet = spark.sql(query_resumo.format(tabela="vendas_raw"))
tempo_parquet = time.time() - inicio_parquet

print("\n[PARQUET - RAW] Resumo financeiro")
res_parquet.show(truncate=False)
print(f"Tempo PARQUET: {tempo_parquet:.4f} segundos")

# Tempo no DELTA (CURATED)
inicio_delta = time.time()
res_delta = spark.sql(query_resumo.format(tabela="vendas_curated"))
tempo_delta = time.time() - inicio_delta

print("\n[DELTA - CURATED] Resumo financeiro")
res_delta.show(truncate=False)
print(f"Tempo DELTA:   {tempo_delta:.4f} segundos")

print("\nObservação: aqui não buscamos benchmark rigoroso,")
print("mas evidenciar a diferença entre consultar a camada RAW em Parquet")
print("e a camada tratada em Delta Lake, que além do desempenho oferece ACID")
print("e melhor gerenciamento de metadados para o Lakehouse.")


Total de registros na CURATED (Delta): 4737

[KPI 1] Resumo financeiro geral (Delta)
+-------------+-----------+------------+------------+------------+
|total_pedidos|valor_total|ticket_medio|menor_compra|maior_compra|
+-------------+-----------+------------+------------+------------+
|4737         |2406360.10 |507.99      |5.25        |999.99      |
+-------------+-----------+------------+------------+------------+


[KPI 2] Valor total por status (Delta)
+---------+-----------+-----------+------------+
|status   |qtd_pedidos|valor_total|ticket_medio|
+---------+-----------+-----------+------------+
|CANCELLED|1213       |617289.39  |508.89      |
|PENDING  |1200       |613652.57  |511.38      |
|SHIPPED  |1175       |605346.53  |515.19      |
|DELIVERED|1149       |570071.61  |496.15      |
+---------+-----------+-----------+------------+



[PARQUET - RAW] Resumo financeiro
+-------------+-----------+------------+
|total_pedidos|valor_total|ticket_medio|
+-------------+-----------+-

In [28]:
# CÉLULA 6 - OPERAÇÕES ACID (INSERT, DELETE, UPSERT) + TIME TRAVEL

from delta.tables import DeltaTable
from pyspark.sql.functions import to_timestamp, col
from datetime import datetime

path_curated_delta = "/tmp/vendas_curated_delta"

# Carregar a tabela Delta como DeltaTable (necessário para usar MERGE/DELETE)
delta_table = DeltaTable.forPath(spark, path_curated_delta)

print("Estado inicial (5 primeiros registros):")
delta_table.toDF().select("order_id", "customer_id", "amount", "order_date", "status") \
    .show(5, truncate=False)

# Guardar a versão INICIAL (primeira escrita da tabela) para usar no Time Travel.
# Assim eu garanto que é uma versão realmente anterior às operações ACID.
history_df = delta_table.history()
versao_inicial = history_df.select("version").orderBy(col("version").asc()).limit(1).collect()[0]["version"]
print(f"\nVersão inicial da tabela (antes das operações ACID desta célula): {versao_inicial}")

# 6.1 INSERT - inserir um novo pedido (ORD99999) se não existir
print("\n[ACID] INSERT - Novo pedido ORD99999")

df_insert = spark.createDataFrame(
    [
        ("ORD99999", "CUST101", 999.99, "2024-12-01 10:00:00", "PENDING")
    ],
    ["order_id", "customer_id", "amount", "order_date", "status"]
)

# Ajusto os tipos para ficar consistente com a camada CURATED
df_insert = (
    df_insert
    .withColumn("amount", col("amount").cast("DECIMAL(10,2)"))
    .withColumn("order_date", to_timestamp("order_date", "yyyy-MM-dd HH:mm:ss"))
)

# MERGE para tratar o INSERT de forma ACID: se não encontrar a chave, insere.
delta_table.alias("t").merge(
    df_insert.alias("s"),
    "t.order_id = s.order_id"
).whenNotMatchedInsertAll().execute()

print("Registro ORD99999 após INSERT:")
delta_table.toDF().filter(col("order_id") == "ORD99999").show(truncate=False)

# 6.2 DELETE - remover um pedido específico (ORD00003)
print("\n[ACID] DELETE - Remover pedido ORD00003")
delta_table.delete("order_id = 'ORD00003'")

print("Verificando ORD00003 (deve estar vazio):")
delta_table.toDF().filter(col("order_id") == "ORD00003").show(truncate=False)

# 6.3 UPSERT - atualizar pedido existente ORD00001 (ou inserir se não existir)
print("\n[ACID] UPSERT - Atualizar pedido ORD00001")

df_upsert = spark.createDataFrame(
    [
        ("ORD00001", "CUST001", 1500.00, "2024-11-24 15:30:00", "DELIVERED")
    ],
    ["order_id", "customer_id", "amount", "order_date", "status"]
)

df_upsert = (
    df_upsert
    .withColumn("amount", col("amount").cast("DECIMAL(10,2)"))
    .withColumn("order_date", to_timestamp("order_date", "yyyy-MM-dd HH:mm:ss"))
)

# MERGE como UPSERT:
# - quando encontra a chave, atualiza (UPDATE)
# - se não encontrar, insere (INSERT).
delta_table.alias("t").merge(
    df_upsert.alias("s"),
    "t.order_id = s.order_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

print("Registro ORD00001 após UPSERT:")
delta_table.toDF().filter(col("order_id") == "ORD00001").show(truncate=False)

# 6.4 TIME TRAVEL - consultar a versão anterior da tabela (versão inicial)

print("\n================ TIME TRAVEL (VERSÃO INICIAL) ================")

df_historico = (
    spark.read.format("delta")
    .option("versionAsOf", versao_inicial)
    .load(path_curated_delta)
)

print(f"\nNa versão {versao_inicial}, antes das operações ACID desta célula, o pedido ORD00003 existia?")
df_historico.filter(col("order_id") == "ORD00003") \
    .select("order_id", "customer_id", "amount", "order_date", "status") \
    .show(truncate=False)

print(f"\nNa versão {versao_inicial}, o pedido ORD99999 ainda não existia:")
df_historico.filter(col("order_id") == "ORD99999") \
    .select("order_id", "customer_id", "amount", "order_date", "status") \
    .show(truncate=False)

print("\nVersão atual (após operações ACID) - registros afetados:")
delta_table.toDF().filter(
    col("order_id").isin("ORD00001", "ORD00003", "ORD99999")
).select("order_id", "customer_id", "amount", "order_date", "status") \
 .orderBy("order_id").show(truncate=False)

print("\nTime Travel demonstra que o Delta Lake mantém o histórico de versões,")
print("permitindo recuperar o estado anterior da tabela mesmo após DELETE e UPSERT.")


Estado inicial (5 primeiros registros):
+--------+-----------+-------+-------------------+---------+
|order_id|customer_id|amount |order_date         |status   |
+--------+-----------+-------+-------------------+---------+
|ORD00001|CUST001    |1500.00|2024-11-24 15:30:00|DELIVERED|
|ORD00004|CUST1      |163.86 |2024-11-11 00:45:46|SHIPPED  |
|ORD00005|CUST98     |97.28  |2024-10-10 09:21:55|DELIVERED|
|ORD00006|CUST6      |538.55 |2024-11-06 18:02:56|DELIVERED|
|ORD00007|CUST80     |364.83 |2024-10-19 16:03:53|PENDING  |
+--------+-----------+-------+-------------------+---------+
only showing top 5 rows


Versão inicial da tabela (antes das operações ACID desta célula): 0

[ACID] INSERT - Novo pedido ORD99999
Registro ORD99999 após INSERT:
+--------+-----------+------+-------------------+-------+
|order_id|customer_id|amount|order_date         |status |
+--------+-----------+------+-------------------+-------+
|ORD99999|CUST101    |999.99|2024-12-01 10:00:00|PENDING|
+--------+------

In [32]:
# CÉLULA 7 - RESUMO DO PIPELINE

print("=" * 70)
print("RESUMO DO PIPELINE - DATA LAKEHOUSE TRANSACIONAL")
print("=" * 70)

print("\n1) Coleta / Aquisição (Camada RAW)")
print("   - Geração de 5000 pedidos simulados (order_id, customer_id, amount, order_date, status).")
print("   - Introdução de nulos propositalmente em customer_id, amount e order_date.")
print("   - Persistência em formato aberto Parquet na camada RAW: /tmp/vendas_raw.")

print("\n2) Pré-processamento (Qualidade de Dados) - 3 Transformações")
print("   [1] Limpeza de nulos: remoção de registros com amount ou order_date nulos.")
print("   [2] Casting de tipos: amount -> DECIMAL(10,2), order_date -> TIMESTAMP.")
print("   [3] Desduplicação: uso de Window + row_number para manter o registro mais recente por order_id.")

print("\n3) Camada CURATED em Delta Lake (Open Data Format)")
print("   - Dados tratados gravados em Delta Lake: /tmp/vendas_curated_delta.")
print("   - Delta Lake garante ACID, time travel e log de transações (_delta_log).")

print("\n4) Análises de Negócio (KPIs)")
print("   - KPI 1: resumo financeiro geral (total de pedidos, valor total, ticket médio, menor e maior compra).")
print("   - KPI 2: valor total, quantidade de pedidos e ticket médio por status (PENDING, SHIPPED, DELIVERED, CANCELLED).")

print("\n5) Operações ACID em Delta Lake (Requisito Avançado)")
print("   - INSERT: inclusão do pedido ORD99999 via MERGE quando não correspondido.")
print("   - DELETE: remoção lógica/física do pedido ORD00003 na tabela Delta.")
print("   - UPSERT: atualização do pedido ORD00001 (valor e status) via MERGE (whenMatchedUpdateAll).")

print("\n6) Conclusão")
print("   - Pipeline completo de Data Lakehouse Transacional implementado em PySpark + Delta Lake.")
print("   - Atende aos requisitos: coleta, 3 transformações de qualidade, uso de formato Open Data (Delta),")
print("     e demonstração de operações ACID (INSERT, DELETE, UPSERT) na tabela final.")

print("\n7) Alunos")
print("   - Arthur Peres            - 10310501")
print("   - João Vitor Camargo      - 10739829")
print("   - Ricardo Betteloni Lopes - 10742658")
print("=" * 70)


RESUMO DO PIPELINE - DATA LAKEHOUSE TRANSACIONAL

1) Coleta / Aquisição (Camada RAW)
   - Geração de 5000 pedidos simulados (order_id, customer_id, amount, order_date, status).
   - Introdução de nulos propositalmente em customer_id, amount e order_date.
   - Persistência em formato aberto Parquet na camada RAW: /tmp/vendas_raw.

2) Pré-processamento (Qualidade de Dados) - 3 Transformações
   [1] Limpeza de nulos: remoção de registros com amount ou order_date nulos.
   [2] Casting de tipos: amount -> DECIMAL(10,2), order_date -> TIMESTAMP.
   [3] Desduplicação: uso de Window + row_number para manter o registro mais recente por order_id.

3) Camada CURATED em Delta Lake (Open Data Format)
   - Dados tratados gravados em Delta Lake: /tmp/vendas_curated_delta.
   - Delta Lake garante ACID, time travel e log de transações (_delta_log).

4) Análises de Negócio (KPIs)
   - KPI 1: resumo financeiro geral (total de pedidos, valor total, ticket médio, menor e maior compra).
   - KPI 2: valor to