# üéØ GovBR Data Lake - Demonstra√ß√£o Completa

## Apresenta√ß√£o da Solu√ß√£o de Data Lake com Arquitetura Medallion

Este notebook demonstra a arquitetura completa do Data Lake GovBR:
- **Bronze**: Dados brutos das APIs
- **Prata**: Dados transformados e relacionados
- **Ouro**: Dados enriquecidos para an√°lise

### Tecnologias Utilizadas
- ‚úÖ Apache Spark 4.0.1
- ‚úÖ Delta Lake (via JARs)
- ‚úÖ MinIO (S3-compatible)
- ‚úÖ Python/PySpark

In [75]:
# ‚ö†Ô∏è IMPORTANTE: Execute primeiro o notebook CONFIGURAR_SPARK.ipynb
# Se ainda n√£o executou, execute o script de corre√ß√£o:
import sys

def verificar_spark_funcional():
    """Verifica se Spark est√° funcional, n√£o apenas se existe"""
    try:
        spark
    except NameError:
        return False, "Spark n√£o existe"
    
    try:
        # Tentar acessar uma propriedade simples
        _ = spark.sparkContext
        # Tentar criar um DataFrame de teste
        test_df = spark.range(1)
        test_df.collect()
        return True, "Spark funcional"
    except Exception as e:
        return False, f"Spark existe mas n√£o est√° funcional: {e}"

spark_ok, mensagem = verificar_spark_funcional()

if spark_ok:
    print(f"‚úÖ {mensagem}")
else:
    print(f"‚ö†Ô∏è  {mensagem}")
    
    # Verificar se √© erro de Connection Refused (sess√£o zombie)
    if "Connection refused" in str(mensagem) or "Errno 111" in str(mensagem):
        print("\nüîß Detectado: Sess√£o Spark 'zombie' (Connection Refused)")
        print("   Recuperando Spark Session...")
        try:
            exec(open('/home/jovyan/work/recuperar_spark.py').read())
            # Verificar novamente
            spark_ok, mensagem = verificar_spark_funcional()
            if spark_ok:
                print("‚úÖ Spark Session recuperada com sucesso!")
            else:
                print(f"‚ùå Falha na recupera√ß√£o: {mensagem}")
                print("üí° Execute CONFIGURAR_SPARK.ipynb manualmente")
        except (Exception, SystemExit, RuntimeError) as e:
            print(f"‚ùå Erro ao recuperar: {e}")
            import traceback
            traceback.print_exc()
            print("üí° Execute CONFIGURAR_SPARK.ipynb manualmente")
    else:
        print("\nüîß Tentando configurar Spark automaticamente...")
        try:
            # Tentar vers√£o V2 primeiro (mais robusta)
            try:
                exec(open('/home/jovyan/work/fix_spark_py4j_v2.py').read())
            except:
                # Fallback para vers√£o original
                exec(open('/home/jovyan/work/fix_spark_py4j.py').read())
            # Verificar novamente
            spark_ok, mensagem = verificar_spark_funcional()
            if spark_ok:
                print("‚úÖ Spark configurado com sucesso!")
            else:
                print(f"‚ùå Falha ao configurar: {mensagem}")
                print("üí° Execute CONFIGURAR_SPARK.ipynb manualmente")
        except (Exception, SystemExit, RuntimeError) as e:
            print(f"‚ùå Erro ao executar script de corre√ß√£o: {e}")
            import traceback
            traceback.print_exc()
            print("üí° Execute manualmente: CONFIGURAR_SPARK.ipynb")

‚úÖ Spark funcional


## üìä Vis√£o Geral da Arquitetura

In [76]:
print("=" * 80)
print("GOVBR DATA LAKE - ARQUITETURA MEDALLION")
print("=" * 80)

# Fun√ß√£o auxiliar para verificar Spark de forma segura
def verificar_spark_info():
    """Verifica informa√ß√µes do Spark com tratamento robusto de erros"""
    info = {}
    
    # Tentar obter vers√£o
    try:
        info['version'] = spark.version
    except Exception as e:
        try:
            # M√©todo alternativo via SparkContext
            info['version'] = spark.sparkContext.version
        except Exception as e2:
            print(f"‚ö†Ô∏è  N√£o foi poss√≠vel obter vers√£o: {e2}")
            info['version'] = "N/A (erro Py4J)"
    
    # Tentar obter app name
    try:
        info['app_name'] = spark.sparkContext.appName
    except Exception as e:
        print(f"‚ö†Ô∏è  N√£o foi poss√≠vel obter appName: {e}")
        info['app_name'] = "N/A"
    
    # Tentar obter master
    try:
        info['master'] = spark.sparkContext.master
    except Exception as e:
        print(f"‚ö†Ô∏è  N√£o foi poss√≠vel obter master: {e}")
        info['master'] = "N/A"
    
    # Teste b√°sico de funcionalidade
    try:
        test_df = spark.range(1)
        test_df.collect()
        info['funcional'] = True
    except Exception as e:
        print(f"‚ö†Ô∏è  Spark n√£o est√° totalmente funcional: {e}")
        info['funcional'] = False
    
    return info

# Verificar Spark com tratamento de erros robusto
try:
    spark_info = verificar_spark_info()
    
    print(f"\n‚úÖ Spark vers√£o: {spark_info.get('version', 'N/A')}")
    print(f"‚úÖ App: {spark_info.get('app_name', 'N/A')}")
    print(f"‚úÖ Master: {spark_info.get('master', 'N/A')}")
    
    if not spark_info.get('funcional', False):
        print("\n‚ö†Ô∏è  ATEN√á√ÉO: Spark Session existe mas pode n√£o estar totalmente funcional")
        print("üí° Se houver erros, execute CONFIGURAR_SPARK.ipynb novamente")
    
except Exception as e:
    print(f"\n‚ùå Erro ao verificar Spark Session: {e}")
    print("\nüí° SOLU√á√ïES:")
    print("1. Execute CONFIGURAR_SPARK.ipynb novamente")
    print("2. Verifique processos Java: !ps aux | grep java")
    print("3. Reinicie o container: docker restart govbr-jupyter-delta")
    print("4. Execute diagn√≥stico: exec(open('/home/jovyan/work/diagnostico_spark.py').read())")
    raise

print("\n" + "=" * 80)

GOVBR DATA LAKE - ARQUITETURA MEDALLION

‚úÖ Spark vers√£o: 3.5.0
‚úÖ App: GovBR Data Lake - JARs Manuais
‚úÖ Master: local[1]



## ü•â Camada Bronze - Dados Brutos

### ‚ö†Ô∏è Se os dados Bronze n√£o aparecerem (erro S3A)

Execute esta c√©lula para reiniciar o Spark com os JARs corretos:

In [77]:
# Reiniciar Spark com JARs do Hadoop AWS
# Execute esta c√©lula se houver erro "ClassNotFoundException: S3AFileSystem"

print("üîÑ Reiniciando Spark com JARs do Hadoop AWS...")
print("=" * 80)

try:
    exec(open('/home/jovyan/work/spark_com_jars_manual.py').read())
    print("\n‚úÖ Spark reiniciado com sucesso!")
    print("üí° Execute novamente a c√©lula acima para ver os dados Bronze")
except Exception as e:
    print(f"\n‚ùå Erro ao reiniciar Spark: {e}")
    import traceback
    traceback.print_exc()
    print("\nüí° Tente executar manualmente:")
    print("   exec(open('/home/jovyan/work/spark_com_jars_manual.py').read())")

üîÑ Reiniciando Spark com JARs do Hadoop AWS...
üöÄ INICIALIZANDO SPARK COM JARs MANUAIS

[1/4] Baixando JARs necess√°rios...
  ‚úÖ hadoop-aws-3.3.4.jar j√° existe
  ‚úÖ aws-java-sdk-bundle-1.12.262.jar j√° existe
  ‚úÖ 2 JARs prontos

[2/4] Limpando Spark existente...
  ‚úÖ Spark limpo

[3/4] Matando processos Java...

[4/4] Inicializando Spark com JARs...
  ‚ÑπÔ∏è  Criando SparkSession com JARs locais...
     JARs: /tmp/spark_jars/hadoop-aws-3.3.4.jar,/tmp/spark_jars/aws-java-sdk-bundle-1.12.262.jar
  ‚ÑπÔ∏è  Inicializando Spark (aguarde alguns segundos)...
  ‚ÑπÔ∏è  Configurando S3A...
  ‚ÑπÔ∏è  Verificando funcionalidade...
  ‚úÖ Spark inicializado com sucesso!
     Vers√£o: 3.5.0
     App: GovBR Data Lake - JARs Manuais
  ‚ÑπÔ∏è  Testando acesso S3A...
  ‚úÖ S3A configurado - tentando ler dados...
  ‚úÖ S3A funcionando! Encontrados 5,571 registros em municipios

‚úÖ SPARK INICIALIZADO COM SUCESSO!

Agora voc√™ pode ler os dados Bronze:
  df = spark.read.parquet('s3a://govbr/bron

## ‚ö†Ô∏è Executar Ingest√£o Bronze

Se a camada Bronze estiver vazia, execute a c√©lula abaixo para fazer a ingest√£o dos dados:

In [79]:
# Executar ingest√£o Bronze
# ‚ö†Ô∏è Execute esta c√©lula apenas se a camada Bronze estiver vazia

print("ü•â Executando ingest√£o Bronze...")
print("=" * 80)

try:
    # Executar script de ingest√£o
    exec(open('/home/jovyan/work/01_bronze_ingestion.py').read())
    
    print("\n‚úÖ Ingest√£o Bronze conclu√≠da!")
    print("üí° Execute novamente a c√©lula acima para ver os dados Bronze gerados")
    
except Exception as e:
    print(f"\n‚ùå Erro ao executar ingest√£o: {e}")
    import traceback
    traceback.print_exc()
    print("\nüí° Verifique:")
    print("   1. Conex√£o com a internet (APIs externas)")
    print("   2. Conex√£o com MinIO")
    print("   3. Permiss√µes de escrita no bucket")

ü•â Executando ingest√£o Bronze...
CAMADA BRONZE - INGEST√ÉO DE DADOS BRUTOS

[1/5] Coletando munic√≠pios do Brasil (IBGE)...
‚úÖ Bronze: bronze/ibge/municipios/dt=20251114/data.parquet (5571 registros, 132.35 KB)

[2/5] Coletando estados do Brasil (IBGE)...
‚úÖ Bronze: bronze/ibge/estados/dt=20251114/data.parquet (27 registros, 4.86 KB)

[3/5] Coletando √≥rg√£os SIAFI...
‚úÖ Bronze: bronze/portal_transparencia/orgaos_siafi/dt=20251114/data.parquet (6 registros, 2.10 KB)

[4/5] Coletando dados de BPC (amostra SP - primeiros 50 munic√≠pios)...
  Progresso: 3270/50 munic√≠pios...
  Progresso: 3280/50 munic√≠pios...
  Progresso: 3290/50 munic√≠pios...
  Progresso: 3300/50 munic√≠pios...
  Progresso: 3310/50 munic√≠pios...
‚úÖ Bronze: bronze/portal_transparencia/bpc_municipios/dt=20251114/data.parquet (50 registros, 10.19 KB)
  ‚úÖ 50 munic√≠pios com dados de BPC

[4b/5] Coletando dados de Bolsa Fam√≠lia (amostra SP - primeiros 50 munic√≠pios)...
  Progresso: 3270/50 munic√≠pios...
  Prog

## ‚ö†Ô∏è Gerar Dados Prata

Se a camada Prata n√£o tiver dados, execute a c√©lula abaixo para gerar:

In [80]:
# Executar transforma√ß√£o Prata
# ‚ö†Ô∏è Execute esta c√©lula apenas se a camada Prata estiver vazia

print("üîÑ Executando transforma√ß√£o Prata...")
print("=" * 80)

try:
    # Verificar se Spark est√° funcional
    test_df = spark.range(1)
    test_df.collect()
    
    # Executar script de transforma√ß√£o
    exec(open('/home/jovyan/work/02_prata_transformacao.py').read())
    
    print("\n‚úÖ Transforma√ß√£o Prata conclu√≠da!")
    print("üí° Execute novamente a c√©lula acima para ver os dados Prata gerados")
    
except Exception as e:
    print(f"\n‚ùå Erro ao executar transforma√ß√£o: {e}")
    import traceback
    traceback.print_exc()
    print("\nüí° Verifique se os dados Bronze est√£o dispon√≠veis")

üîÑ Executando transforma√ß√£o Prata...
CAMADA PRATA - TRANSFORMA√á√ÉO E RELACIONAMENTO

[1/4] Carregando dados da camada Bronze...
‚úÖ Lido Bronze: bronze/ibge/municipios/dt=20251114/data.parquet (5571 registros)
‚úÖ Lido Bronze: bronze/ibge/estados/dt=20251114/data.parquet (27 registros)
‚úÖ Lido Bronze: bronze/portal_transparencia/orgaos_siafi/dt=20251114/data.parquet (6 registros)
‚úÖ Lido Bronze: bronze/portal_transparencia/bpc_municipios/dt=20251114/data.parquet (50 registros)
‚úÖ Lido Bronze: bronze/ibge/populacao_estados/dt=20251114/data.parquet (27 registros)

üìä Status dos dados Bronze carregados:
  - Munic√≠pios: ‚úÖ (5571 registros)
  - Estados: ‚úÖ (27 registros)
  - √ìrg√£os: ‚úÖ (6 registros)
  - BPC: ‚úÖ (50 registros)
  - Bolsa Fam√≠lia: ‚ùå (0 registros)
  - Popula√ß√£o: ‚úÖ (27 registros)

[2/4] Tratando e limpando dados...

[3/4] Criando dimens√µes e relacionamentos...
  ‚úÖ Popula√ß√£o adicionada aos munic√≠pios

[Processando BPC] 50 registros encontrados
  ‚úÖ 

In [81]:
# Listar dados Bronze
# O Spark l√™ automaticamente parti√ß√µes quando apontamos para o diret√≥rio pai
bronze_datasets = [
    "s3a://govbr/bronze/ibge/municipios/",
    "s3a://govbr/bronze/ibge/estados/",
    "s3a://govbr/bronze/ibge/populacao_estados/",
    "s3a://govbr/bronze/portal_transparencia/orgaos_siafi/",
    "s3a://govbr/bronze/portal_transparencia/bpc_municipios/"
]

print("ü•â CAMADA BRONZE - Dados Brutos\n")
bronze_summary = []

for path in bronze_datasets:
    try:
        # Tentar ler - Spark deve detectar parti√ß√µes automaticamente
        df = spark.read.parquet(path)
        count = df.count()
        dataset_name = path.split('/')[-2]
        bronze_summary.append({
            'dataset': dataset_name,
            'registros': count,
            'colunas': len(df.columns)
        })
        print(f"‚úÖ {dataset_name:30s} | {count:>10,} registros | {len(df.columns):>3} colunas")
    except Exception as e:
        dataset_name = path.split('/')[-2]
        error_msg = str(e)
        # Se for erro de classe n√£o encontrada, √© problema de configura√ß√£o S3A
        if "ClassNotFoundException" in error_msg or "S3AFileSystem" in error_msg:
            print(f"‚ö†Ô∏è  {dataset_name:30s} | Erro S3A - Execute: exec(open('/home/jovyan/work/inicializar_spark.py').read())")
        elif "Path does not exist" in error_msg or "does not exist" in error_msg:
            print(f"‚ö†Ô∏è  {dataset_name:30s} | N√£o dispon√≠vel")
        else:
            print(f"‚ö†Ô∏è  {dataset_name:30s} | Erro: {error_msg[:50]}...")

print(f"\nüìä Total de datasets Bronze: {len(bronze_summary)}")

if len(bronze_summary) == 0:
    print("\nüí° DICAS:")
    print("   1. Verifique se a ingest√£o foi executada: exec(open('/home/jovyan/work/verificar_bronze.py').read())")
    print("   2. Se os dados existem mas n√£o aparecem, reinicie o Spark: exec(open('/home/jovyan/work/inicializar_spark.py').read())")
    print("   3. Execute a ingest√£o: exec(open('/home/jovyan/work/01_bronze_ingestion.py').read())")

ü•â CAMADA BRONZE - Dados Brutos

‚úÖ municipios                     |      5,571 registros |  11 colunas
‚úÖ estados                        |         27 registros |   7 colunas
‚úÖ populacao_estados              |         27 registros |   5 colunas
‚úÖ orgaos_siafi                   |          6 registros |   3 colunas
‚úÖ bpc_municipios                 |         50 registros |  13 colunas

üìä Total de datasets Bronze: 5


## üîÑ Camada Prata - Dados Transformados

## ‚ö†Ô∏è Gerar Dados Ouro

Se a camada Ouro n√£o tiver dados, execute a c√©lula abaixo para gerar:

In [82]:
# Executar enriquecimento Ouro
# ‚ö†Ô∏è Execute esta c√©lula apenas se a camada Ouro estiver vazia
# ‚ö†Ô∏è IMPORTANTE: A camada Prata deve estar gerada primeiro!

print("üèÜ Executando enriquecimento Ouro...")
print("=" * 80)

try:
    # Verificar se Spark est√° funcional
    test_df = spark.range(1)
    test_df.collect()
    
    # Verificar se Prata existe
    try:
        prata_test = spark.read.parquet("s3a://govbr/prata/dim_municipios/")
        prata_count = prata_test.count()
        print(f"‚úÖ Camada Prata dispon√≠vel ({prata_count:,} registros em dim_municipios)")
    except Exception as e:
        print("‚ùå Camada Prata n√£o est√° dispon√≠vel!")
        print("üí° Execute primeiro a transforma√ß√£o Prata (c√©lula acima)")
        raise
    
    # Executar script de enriquecimento
    exec(open('/home/jovyan/work/03_ouro_enriquecimento.py').read())
    
    print("\n‚úÖ Enriquecimento Ouro conclu√≠do!")
    print("üí° Execute novamente a c√©lula acima para ver os dados Ouro gerados")
    
except Exception as e:
    print(f"\n‚ùå Erro ao executar enriquecimento: {e}")
    import traceback
    traceback.print_exc()
    print("\nüí° Verifique se:")
    print("   1. A camada Prata est√° gerada")
    print("   2. Os dados Bronze est√£o dispon√≠veis")
    print("   3. Spark est√° funcionando corretamente")

üèÜ Executando enriquecimento Ouro...
‚úÖ Camada Prata dispon√≠vel (5,571 registros em dim_municipios)
CAMADA OURO - ENRIQUECIMENTO E DADOS FINAIS

[1/5] Carregando dados da camada Prata...
‚úÖ Lido Prata: prata/dim_municipios/dt=20251114/data.parquet (5571 registros)
‚úÖ Lido Prata: prata/dim_estados/dt=20251114/data.parquet (27 registros)
‚úÖ Lido Prata: prata/fato_bpc/dt=20251114/data.parquet (50 registros)
‚úÖ Lido Prata: prata/dim_orgaos/dt=20251114/data.parquet (6 registros)

[2/5] Enriquecendo dimens√£o de munic√≠pios...
‚úÖ Ouro: ouro/dim_municipios_enriquecida/dt=20251114/data.parquet (5571 registros, 142.47 KB)

[3/5] Enriquecendo dimens√£o de estados...
‚úÖ Ouro: ouro/dim_estados_enriquecida/dt=20251114/data.parquet (27 registros, 13.78 KB)

[4/5] Criando fato BPC enriquecido...
‚úÖ Ouro: ouro/fato_bpc_enriquecido/dt=20251114/data.parquet (50 registros, 19.49 KB)

[5/5] Criando tabelas agregadas para an√°lise...
‚úÖ Ouro: ouro/agregacao_bpc_por_regiao/dt=20251114/data.parqu

In [83]:
# Listar dados Prata
prata_datasets = [
    "s3a://govbr/prata/dim_municipios/",
    "s3a://govbr/prata/dim_estados/",
    "s3a://govbr/prata/dim_orgaos/",
    "s3a://govbr/prata/fato_bpc/",
    "s3a://govbr/prata/fato_bolsa_familia/"
]

print("üîÑ CAMADA PRATA - Dados Transformados\n")
prata_summary = []

for path in prata_datasets:
    try:
        df = spark.read.parquet(path)
        count = df.count()
        dataset_name = path.split('/')[-2]
        prata_summary.append({
            'dataset': dataset_name,
            'registros': count,
            'colunas': len(df.columns)
        })
        print(f"‚úÖ {dataset_name:30s} | {count:>10,} registros | {len(df.columns):>3} colunas")
    except Exception as e:
        print(f"‚ö†Ô∏è  {path.split('/')[-2]:30s} | N√£o dispon√≠vel")

print(f"\nüìä Total de datasets Prata: {len(prata_summary)}")

üîÑ CAMADA PRATA - Dados Transformados

‚úÖ dim_municipios                 |      5,571 registros |  16 colunas
‚úÖ dim_estados                    |         27 registros |  13 colunas
‚úÖ dim_orgaos                     |          6 registros |   3 colunas
‚ö†Ô∏è  fato_bpc                       | N√£o dispon√≠vel
‚ö†Ô∏è  fato_bolsa_familia             | N√£o dispon√≠vel

üìä Total de datasets Prata: 3


## üèÜ Camada Ouro - Dados Enriquecidos

In [84]:
# Listar dados Ouro
ouro_datasets = [
    "s3a://govbr/ouro/municipios_enriquecidos/",
    "s3a://govbr/ouro/estados_enriquecidos/",
    "s3a://govbr/ouro/bpc_analytics/",
    "s3a://govbr/ouro/rankings/",
    "s3a://govbr/ouro/agregacoes_regionais/"
]

print("üèÜ CAMADA OURO - Dados Enriquecidos\n")
ouro_summary = []

for path in ouro_datasets:
    try:
        df = spark.read.parquet(path)
        count = df.count()
        dataset_name = path.split('/')[-2]
        ouro_summary.append({
            'dataset': dataset_name,
            'registros': count,
            'colunas': len(df.columns)
        })
        print(f"‚úÖ {dataset_name:30s} | {count:>10,} registros | {len(df.columns):>3} colunas")
    except Exception as e:
        print(f"‚ö†Ô∏è  {path.split('/')[-2]:30s} | N√£o dispon√≠vel")

print(f"\nüìä Total de datasets Ouro: {len(ouro_summary)}")

üèÜ CAMADA OURO - Dados Enriquecidos

‚ö†Ô∏è  municipios_enriquecidos        | N√£o dispon√≠vel
‚ö†Ô∏è  estados_enriquecidos           | N√£o dispon√≠vel
‚ö†Ô∏è  bpc_analytics                  | N√£o dispon√≠vel
‚ö†Ô∏è  rankings                       | N√£o dispon√≠vel
‚ö†Ô∏è  agregacoes_regionais           | N√£o dispon√≠vel

üìä Total de datasets Ouro: 0


## üìà An√°lise Demonstrativa

In [85]:
# Exemplo: An√°lise cruzando as tr√™s camadas
try:
    # Ler dados de cada camada
    df_bronze_municipios = spark.read.parquet("s3a://govbr/bronze/ibge/municipios/")
    
    print("üìä Exemplo de An√°lise: Munic√≠pios por Regi√£o\n")
    
    from pyspark.sql.functions import count, col
    
    resultado = df_bronze_municipios \
        .groupBy("regiao_nome") \
        .agg(count("*").alias("total_municipios")) \
        .orderBy(col("total_municipios").desc())
    
    resultado.show(truncate=False)
    
except Exception as e:
    print(f"‚ö†Ô∏è  Erro na an√°lise: {e}")

üìä Exemplo de An√°lise: Munic√≠pios por Regi√£o

+------------+----------------+
|regiao_nome |total_municipios|
+------------+----------------+
|Nordeste    |1794            |
|Sudeste     |1668            |
|Sul         |1191            |
|Centro-Oeste|467             |
|Norte       |450             |
|NULL        |1               |
+------------+----------------+



## ‚úÖ Resumo da Arquitetura

### Fluxo de Dados:
```
APIs ‚Üí Bronze ‚Üí Prata ‚Üí Ouro
  ‚Üì       ‚Üì       ‚Üì       ‚Üì
IBGE   Parquet  Dimens√µes  M√©tricas
Portal          Fatos      Rankings
```

### Benef√≠cios:
- ‚úÖ **Bronze**: Preserva dados originais
- ‚úÖ **Prata**: Dados limpos e relacionados
- ‚úÖ **Ouro**: Pronto para an√°lise e dashboards
- ‚úÖ **Escal√°vel**: Spark processa grandes volumes
- ‚úÖ **Flex√≠vel**: MinIO armazena em formato Parquet

### Tecnologias:
- Apache Spark 4.0.1
- Delta Lake (via JARs)
- MinIO (S3-compatible)
- Python/PySpark