# üè¶ Data Masters: Pipeline H√≠brido (End-to-End)

Este notebook demonstra a execu√ß√£o do pipeline **Data Masters** em ambiente Cloud (Databricks).
Ele replica a l√≥gica exata dos scripts locais, provando a portabilidade da arquitetura.

### üèóÔ∏è Arquitetura Medallion
1. **Bronze:** Ingest√£o de dados sint√©ticos (Simulando API).
2. **Silver:** Limpeza, **Data Quality** e Anonimiza√ß√£o (**LGPD**).
3. **Gold:** Agrega√ß√£o de KPIs de vendas por Estado.

###Setup e Data Quality

In [0]:
# Instala biblioteca de dados falsos
%pip install faker

dbutils.library.restartPython()

from pyspark.sql.functions import col, regexp_replace, sum, count, desc, round, current_timestamp
from pyspark.sql.types import *
from faker import Faker
import random
from datetime import datetime

class DataQuality:
    def __init__(self):
        print("üõ°Ô∏è Guardi√£o de Qualidade Inicializado")

    def check_nulls(self, df, columns):
        """Verifica nulos em colunas cr√≠ticas"""
        print(f"   üîç Verificando nulos em: {columns}")
        for c in columns:
            null_count = df.filter(col(c).isNull() | (col(c) == "")).count()
            if null_count > 0:
                print(f"   ‚ö†Ô∏è ALERTA: Coluna '{c}' tem {null_count} nulos!")
            else:
                print(f"   ‚úÖ Coluna '{c}' est√° √≠ntegra.")
    
    def check_positive_values(self, df, columns):
        """Verifica se valores num√©ricos s√£o positivos"""
        print(f"   üîç Verificando valores negativos em: {columns}")
        for c in columns:
            neg_count = df.filter(col(c) < 0).count()
            if neg_count > 0:
                print(f"   üö® ERRO: Coluna '{c}' possui valores negativos!")
            else:
                print(f"   ‚úÖ Coluna '{c}' cont√©m apenas valores positivos.")

dq = DataQuality()
fake = Faker('pt_BR')
print("‚úÖ Ambiente Configurado!")

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
üõ°Ô∏è Guardi√£o de Qualidade Inicializado
‚úÖ Ambiente Configurado!


###Bronze

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from faker import Faker
import random
from datetime import datetime
import uuid

print("üöÄ [BRONZE] Iniciando Ingest√£o de Dados...")

try:
    fake = Faker('pt_BR')
except NameError:
    fake = Faker('pt_BR')

def gerar_dados_fake(qtd=1000):
    dados = []
    estados = ["SP", "RJ", "MG", "RS", "SC", "BA", "PE", "AM", "DF", "GO"]
    
    for _ in range(qtd):
        dados.append((
            str(uuid.uuid4()),
            fake.date_time_this_year().isoformat(),
            round(random.uniform(10.0, 5000.0), 2),
            fake.name(),
            fake.cpf(),
            fake.credit_card_number(),
            fake.city(),
            random.choice(estados)
        ))
    return dados

schema = StructType([
    StructField("id_transacao", StringType(), True),
    StructField("data_evento", StringType(), True),
    StructField("valor", DoubleType(), True),
    StructField("cliente_nome", StringType(), True),
    StructField("cliente_cpf", StringType(), True),
    StructField("cartao", StringType(), True),
    StructField("cidade", StringType(), True),
    StructField("estado", StringType(), True)
])

dados = gerar_dados_fake(500)
df_bronze = spark.createDataFrame(dados, schema)

df_bronze.createOrReplaceTempView("v_bronze_raw")

print("‚úÖ View 'v_bronze_raw' criada na mem√≥ria (Substitui o arquivo JSON).")
display(spark.sql("SELECT * FROM v_bronze_raw LIMIT 50"))

üöÄ [BRONZE] Iniciando Ingest√£o de Dados...
‚úÖ View 'v_bronze_raw' criada na mem√≥ria (Substitui o arquivo JSON).


id_transacao,data_evento,valor,cliente_nome,cliente_cpf,cartao,cidade,estado
e1a706ab-2610-4d14-b336-facac05e0f65,2026-01-15T09:25:47.244298,4980.66,Henry Castro,620.479.813-87,4374474967926483332,Moraes do Amparo,SP
03b0b7c6-6691-4bab-a0f3-c38f59f7b21c,2026-01-05T21:56:59.039920,1463.99,Josu√© Arag√£o,158.734.260-07,4047196184248083639,Teixeira,DF
7e1a0607-b423-4103-a9ea-2425ca098704,2026-01-07T22:05:33.281282,497.38,Dr. Nicolas da Mota,694.715.038-84,4069107433728765500,Nogueira do Galho,PE
96f04868-f028-4cda-8b75-2703d65ad959,2026-01-14T14:30:53.617060,3910.06,Rebeca Nascimento,136.972.845-09,372446333851595,Ribeiro,RJ
21db8161-4cca-4298-9cf3-cd549f168d2e,2026-01-16T19:10:20.101264,4886.77,Otto Ferreira,398.754.012-50,4665655254308504,Sampaio,SC
13bf0ced-441e-4f0f-aeb2-7e42aa7755dc,2026-01-16T16:08:19.834032,1357.83,Sr. Theo da Paz,268.941.570-49,4085744597088413029,Vargas de Santos,PE
0a3a8309-3c24-4cc8-8d3d-5def87b2a3d1,2026-01-18T02:33:38.247764,2391.73,Srta. Anna Liz Campos,781.526.409-30,4538699129529,Sousa,DF
e1dd6427-bcb5-45eb-9af1-af20f8013cc2,2026-01-12T12:47:38.678184,3250.67,Daniela Farias,962.578.403-92,4792501207623,Moreira Paulista,BA
9b31aedb-1389-45a3-9faa-523753ecba1b,2026-01-06T17:28:42.883682,1349.04,Josu√© Abreu,218.036.954-98,3553289328041920,Montenegro do Amparo,GO
22121ef9-9672-43e5-805f-7cb028f8c50a,2026-01-21T11:19:57.002027,3445.8,Laura Pimenta,592.134.860-06,6516847182609584,Peixoto do Campo,DF


###Silver

In [0]:
from pyspark.sql.functions import col, regexp_replace

print("üöÄ [SILVER] Iniciando Tratamento e Anonimiza√ß√£o...")

try:
    df = spark.table("v_bronze_raw")
except Exception:
    print("‚ùå ERRO: A View 'v_bronze_raw' n√£o foi encontrada. Rode a c√©lula Bronze anterior!")
    dbutils.notebook.exit("1")

try:
    print("--- üïµÔ∏è Executando Auditoria de Qualidade ---")
    dq.check_nulls(df, ["id_transacao", "cliente_nome"])
    dq.check_positive_values(df, ["valor"])
    print("------------------------------------------")
except NameError:
    print("‚ö†Ô∏è Aviso: Classe DataQuality n√£o encontrada. Rode a C√©lula 1 de Setup.")

df_silver = df.withColumn(
    "cpf_mascarado", 
    regexp_replace(col("cliente_cpf"), r"\d{3}\.\d{3}\.\d{3}", "***.***.***")
).withColumn(
    "cartao_tokenizado",
    regexp_replace(col("cartao"), r"^.*(\d{4})$", "**** **** **** $1")
).drop("cliente_cpf", "cartao") 

df_silver.createOrReplaceTempView("v_silver_trusted")

print("‚úÖ View 'v_silver_trusted' criada na mem√≥ria (Dados anonimizados).")
display(spark.sql("SELECT id_transacao, valor, cpf_mascarado, cartao_tokenizado, estado FROM v_silver_trusted LIMIT 50"))

üöÄ [SILVER] Iniciando Tratamento e Anonimiza√ß√£o...
--- üïµÔ∏è Executando Auditoria de Qualidade ---
‚ö†Ô∏è Aviso: Classe DataQuality n√£o encontrada. Rode a C√©lula 1 de Setup.
‚úÖ View 'v_silver_trusted' criada na mem√≥ria (Dados anonimizados).


id_transacao,valor,cpf_mascarado,cartao_tokenizado,estado
e1a706ab-2610-4d14-b336-facac05e0f65,4980.66,***.***.***-87,**** **** **** 3332,SP
03b0b7c6-6691-4bab-a0f3-c38f59f7b21c,1463.99,***.***.***-07,**** **** **** 3639,DF
7e1a0607-b423-4103-a9ea-2425ca098704,497.38,***.***.***-84,**** **** **** 5500,PE
96f04868-f028-4cda-8b75-2703d65ad959,3910.06,***.***.***-09,**** **** **** 1595,RJ
21db8161-4cca-4298-9cf3-cd549f168d2e,4886.77,***.***.***-50,**** **** **** 8504,SC
13bf0ced-441e-4f0f-aeb2-7e42aa7755dc,1357.83,***.***.***-49,**** **** **** 3029,PE
0a3a8309-3c24-4cc8-8d3d-5def87b2a3d1,2391.73,***.***.***-30,**** **** **** 9529,DF
e1dd6427-bcb5-45eb-9af1-af20f8013cc2,3250.67,***.***.***-92,**** **** **** 7623,BA
9b31aedb-1389-45a3-9faa-523753ecba1b,1349.04,***.***.***-98,**** **** **** 1920,GO
22121ef9-9672-43e5-805f-7cb028f8c50a,3445.8,***.***.***-06,**** **** **** 9584,DF


###Gold

In [0]:
from pyspark.sql.functions import col, sum, count, desc, round

print("üöÄ [GOLD] Calculando KPIs de Vendas por Estado...")

try:
    df_trusted = spark.table("v_silver_trusted")
except:
    print("‚ùå ERRO: View Silver n√£o encontrada. Rode a c√©lula anterior.")

df_gold = df_trusted.groupBy("estado") \
    .agg(
        sum("valor").alias("total_vendas"),
        count("id_transacao").alias("qtd_transacoes")
    ) \
    .withColumn("total_vendas", round(col("total_vendas"), 2)) \
    .orderBy(desc("total_vendas"))

df_gold.createOrReplaceTempView("v_gold_kpi")

print("‚úÖ View 'v_gold_kpi' criada. Ranking de vendas pronto:")
display(spark.sql("SELECT * FROM v_gold_kpi"))

üöÄ [GOLD] Calculando KPIs de Vendas por Estado...
‚úÖ View 'v_gold_kpi' criada. Ranking de vendas pronto:


estado,total_vendas,qtd_transacoes
GO,140020.68,59
SC,137817.61,52
RS,133707.29,50
BA,128113.43,51
MG,126498.22,50
AM,117723.33,53
SP,112558.73,44
PE,110712.25,53
DF,104137.17,45
RJ,96908.71,43


###Data Observability

In [0]:
from pyspark.sql.functions import sum

print("üîç Iniciando Auditoria de Integridade...")

try:
    qtd_bronze = spark.table("v_bronze_raw").count()
    qtd_silver = spark.table("v_silver_trusted").count()
    
    qtd_gold_recuperada = spark.table("v_gold_kpi").agg(sum("qtd_transacoes")).collect()[0][0]

    print(f"\nüìä RELAT√ìRIO DE VOLUMETRIA:")
    print(f"   üîπ Bronze (Raw):     {qtd_bronze} registros")
    print(f"   üîπ Silver (Trusted): {qtd_silver} registros")
    print(f"   üîπ Gold (Agg):       {qtd_gold_recuperada} registros (Soma das transa√ß√µes)")

    if qtd_bronze == qtd_silver == qtd_gold_recuperada:
        print("\n‚úÖ SUCESSO TOTAL: O Pipeline processou 100% dos dados sem perdas!")
    else:
        print("\n‚ùå ALERTA CR√çTICO: Diverg√™ncia de dados encontrada entre as camadas.")

except Exception as e:
    print(f"‚ùå Erro na auditoria: {e}. Verifique se todas as c√©lulas anteriores rodaram.")

üîç Iniciando Auditoria de Integridade...

üìä RELAT√ìRIO DE VOLUMETRIA:
   üîπ Bronze (Raw):     500 registros
   üîπ Silver (Trusted): 500 registros
   üîπ Gold (Agg):       500 registros (Soma das transa√ß√µes)

‚úÖ SUCESSO TOTAL: O Pipeline processou 100% dos dados sem perdas!


###Simula√ß√£o de consulta do GOLD em estudo

In [0]:
%sql
SELECT 
    estado,
    total_vendas,
    qtd_transacoes,
    concat('R$ ', format_number(total_vendas, 2)) as vendas_formatado
FROM v_gold_kpi
WHERE total_vendas > 5000
ORDER BY total_vendas DESC
LIMIT 27

estado,total_vendas,qtd_transacoes,vendas_formatado
GO,140020.68,59,"R$ 140,020.68"
SC,137817.61,52,"R$ 137,817.61"
RS,133707.29,50,"R$ 133,707.29"
BA,128113.43,51,"R$ 128,113.43"
MG,126498.22,50,"R$ 126,498.22"
AM,117723.33,53,"R$ 117,723.33"
SP,112558.73,44,"R$ 112,558.73"
PE,110712.25,53,"R$ 110,712.25"
DF,104137.17,45,"R$ 104,137.17"
RJ,96908.71,43,"R$ 96,908.71"
