# üì¶ Pipeline Bronze - Ingest√£o e Governan√ßa de Dados de Estabelecimentos (CSV ‚ûù Delta Lake)

üë®‚Äçüíª **Autor:** Lucas Sousa Santos Oliveira  
üéØ **Especialista em Finan√ßas em transi√ß√£o para Engenharia de Dados** | P√≥s-gradua√ß√£o em Big Data e Cloud Computing

---

## üéØ Objetivo do Projeto

Este notebook implementa um **pipeline de ingest√£o robusto, escal√°vel e idempotente** para carregar dados brutos de **estabelecimentos** (CSV) na **Camada Bronze** de um Data Lakehouse utilizando **Delta Lake** no Databricks.

O foco √© garantir:

- ‚úÖ **Qualidade na origem** com schema enforcement e tratamento de dados inv√°lidos
- üîÅ **Upsert eficiente** com `MERGE INTO` para cargas incrementais
- üß† **Rastreabilidade total** com colunas de metadados
- ‚öôÔ∏è **Otimiza√ß√£o de performance** com particionamento, `OPTIMIZE ZORDER` e `VACUUM`
- üõ° **Governan√ßa e confiabilidade** com registro no metastore

---

## üß± Arquitetura do Pipeline

```mermaid
flowchart TD
    A[üìÑ CSV - Estabelecimentos] --> B[üì• Leitura com Schema Enforcement]
    B --> C[üßπ Limpeza e Valida√ß√£o Inicial]
    C --> D[üß† Enriquecimento com data_ingestao]
    D --> E[üß≠ Reparticionamento Estrat√©gico]
    E --> F{Tabela Delta existe?}
    F -- Sim --> G[üîÅ MERGE INTO para UPSERT]
    F -- N√£o --> H[üÜï Cria√ß√£o Inicial da Tabela Delta]
    G & H --> I[üõ° Registro no Metastore]
    I --> J[‚öôÔ∏è OPTIMIZE ZORDER BY (EstabelecimentoID)]
    J --> K[üßº VACUUM 168 HOURS]
```

---

## üìä T√©cnicas e Boas Pr√°ticas Aplicadas

| T√©cnica / Pr√°tica                       | Objetivo / Benef√≠cio |
|-----------------------------------------|----------------------|
| **Schema Enforcement (StructType)**    | Garante tipagem correta e evita infer√™ncia inconsistente |
| **Tratamento de registros inv√°lidos**   | Remove dados malformados prevenindo falhas no MERGE |
| **Deduplica√ß√£o e filtro de nulos**      | Assegura integridade da chave prim√°ria |
| **Coluna `data_ingestao`**              | Rastreabilidade e suporte a Time Travel |
| **Reparticionamento por chave**         | Melhora performance em escrita e consultas futuras |
| **`CREATE TABLE IF NOT EXISTS`**        | Garante visibilidade no cat√°logo e previne erros |
| **`MERGE INTO` (Upsert)**               | Atualiza e insere registros de forma incremental e ACID |
| **`OPTIMIZE ZORDER`**                   | Melhora filtragem e leitura |
| **`VACUUM`**                            | Reduz custo de armazenamento removendo arquivos obsoletos |

---

## üõ† Fluxo Detalhado

### 1Ô∏è‚É£ Leitura do CSV com Schema Enforcement

```python
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("Local", StringType(), True),
    StructField("Email", StringType(), True),
    StructField("EstabelecimentoID", StringType(), True),
    StructField("Telefone", StringType(), True)
])

df_raw = (
    spark.read.format("csv")
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .schema(schema)
    .load("dbfs:/FileStore/Ampev/estabelecimentos.csv")
)
```

üìå *Por que?* ‚Äî Evita infer√™ncia autom√°tica e garante consist√™ncia entre execu√ß√µes.

---

### 2Ô∏è‚É£ Limpeza e Valida√ß√£o

```python
df_clean = (
    df_raw.dropDuplicates()
          .na.drop()
          .filter("EstabelecimentoID IS NOT NULL AND TRIM(EstabelecimentoID) != ''")
)
```

üìå *Por que?* ‚Äî Remove duplicatas, nulos e registros sem chave para evitar falhas no upsert.

---

### 3Ô∏è‚É£ Enriquecimento e Reparticionamento

```python
from pyspark.sql.functions import current_timestamp

df_partitioned = (
    df_clean.withColumn("data_ingestao", current_timestamp())
            .repartition("EstabelecimentoID")
)
```

üìå *Por que?* ‚Äî Adiciona rastreabilidade e melhora performance de escrita.

---

### 4Ô∏è‚É£ Escrita com MERGE INTO

```python
from delta.tables import DeltaTable

if DeltaTable.isDeltaTable(spark, "dbfs:/FileStore/Ampev/tables/bronze/estabelecimentos"):
    delta_table = DeltaTable.forPath(spark, "dbfs:/FileStore/Ampev/tables/bronze/estabelecimentos")
    (
        delta_table.alias("tgt")
        .merge(df_partitioned.alias("src"), "tgt.EstabelecimentoID = src.EstabelecimentoID")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    df_partitioned.write.format("delta")         .partitionBy("data_ingestao")         .option("mergeSchema", "true")         .mode("overwrite")         .save("dbfs:/FileStore/Ampev/tables/bronze/estabelecimentos")
```

üìå *Por que?* ‚Äî Garante ingest√£o incremental sem sobrescrever dados √≠ntegros.

---

### 5Ô∏è‚É£ Otimiza√ß√£o e Manuten√ß√£o

```sql
OPTIMIZE bronze.estabelecimentos ZORDER BY (EstabelecimentoID);
VACUUM bronze.estabelecimentos RETAIN 168 HOURS;
```

üìå *Por que?* ‚Äî Compacta arquivos, melhora filtragem e libera espa√ßo.

---

## ‚úÖ Resultado Esperado

| M√©trica                     | Valor                          |
|-----------------------------|--------------------------------|
| üìå Tabela Delta Criada       | `bronze.estabelecimentos`      |
| üîë Chave de Neg√≥cio         | `EstabelecimentoID`            |
| üßæ Particionamento F√≠sico   | `data_ingestao`                 |
| ‚öôÔ∏è Otimiza√ß√µes Aplicadas     | `OPTIMIZE ZORDER + VACUUM`     |
| üîÅ Tipo de Ingest√£o         | `Batch` + `MERGE INTO`         |

---

## üß† Conclus√£o

Este pipeline √© **produ√ß√£o-ready** e incorpora princ√≠pios modernos de engenharia de dados:

- üìä Qualidade desde a origem
- üîÅ Idempot√™ncia e resili√™ncia
- ‚ö° Performance otimizada para custo e velocidade
- üõ° Governan√ßa e rastreabilidade completas
- üöÄ Pronto para evolu√ß√£o futura com streaming, camada Silver e Gold