## gold.rst_comercio_exterior

### 1. Objetivo
Processar e integrar dados de **exportações e importações por NCM e UF**, enriquecendo com classificações ISIC, para gerar a **tabela GOLD `gold.rst_comercio_exterior`**.  

A tabela resultante fornece informações sobre:
- Valores FOB (exportado/importado)
- Peso líquido em kg das operações
- Distribuição por UF, NCM e ISIC
- Tipo de operação: Exportação (EXP) ou Importação (IMP)
- Atualização incremental via Delta Merge

---

### 2. Fontes de Dados

#### SILVER
| DataFrame | Fonte | Descrição |
|-----------|-------|-----------|
| `df_exportacoes` | `tb_exportacoes` | Dados de exportações: ano, mês, UF, NCM, valor FOB, peso líquido |
| `df_importacoes` | `tb_importacoes` | Dados de importações: ano, mês, UF, NCM, valor FOB, peso líquido |
| `df_nome_mercosul` | `tb_nomenclatura_mercosul` | Mapeamento NCM → ISIC Classe |
| `df_ref_isic` | `tb_referencia_ncm_isic` | Mapeamento ISIC Classe → ISIC Divisão |

#### GOLD (Dimensões)
| DataFrame | Fonte | Descrição |
|-----------|-------|-----------|
| `df_dim_ncm_isic` | `gold.dim_ncm_isic` | Mapeamento NCM → ISIC Classe |

---

### 3. Fluxo do Notebook

1. **Configuração de Spark**
   - Inicialização do `SparkSession` e contexto Spark
   - Definição de paths para SILVER e GOLD

2. **Leitura de Dados**
   - Tabelas Delta das exportações, importações e dimensões auxiliares
   - Leitura da dimensão `dim_ncm_isic` e broadcast para otimização de joins

3. **Transformações Exportações**
   - Criação das colunas padronizadas:
     | Coluna | Descrição |
     |--------|-----------|
     | `an_operacao` | Ano da operação |
     | `me_operacao` | Mês da operação |
     | `sg_unidade_federativa` | Sigla da UF |
     | `cd_nomenclatura_mercosul` | Código NCM |
     | `vl_free_on_board` | Valor FOB |
     | `qt_peso_liquido_kg` | Peso líquido (kg) |
     | `tp_operacao` | Tipo da operação (`EXP`) |
   - Filtragem de registros com ano ou mês nulos

4. **Transformações Importações**
   - Mesmas colunas que exportações, com `tp_operacao` = `IMP`
   - Criação da coluna `ts_atualizacao` com timestamp atual
   - Filtragem de registros inválidos

5. **Transformações Nomenclatura Mercosul**
   - Criação de colunas `cd_nomenclatura_mercosul` e `cd_classificacao_internacional`
   - Broadcast para otimização do join

6. **União Exportações + Importações**
   - `unionByName` com `allowMissingColumns=True`
   - Permite ter um único DataFrame `df_unified` com todas as operações

7. **Join com NCM → ISIC**
   - Mapeamento das operações com a dimensão `dim_ncm_isic` via broadcast
   - Criação da coluna `cod_isic`

8. **Agregação Final**
   - Agrupamento por:
     - `ano_operacao`, `mes_operacao`, `sg_unidade_federativa`, `cod_ncm`, `cod_isic`, `tipo_operacao`
   - Métricas:
     | Coluna | Descrição |
     |--------|-----------|
     | `vl_fob` | Soma do valor FOB da operação |
     | `qtd_peso_liquido_kg` | Soma do peso líquido em kg |
   
9. **Carga GOLD (Delta Merge)**
   - Atualização incremental usando `DeltaTable.merge`
   - Quando `Matched`: atualiza `vl_fob` e `qtd_peso_liquido_kg`
   - Quando `NotMatched`: insere todos os registros novos

---

### 4. Estrutura da Tabela GOLD

| Coluna | Tipo | Descrição |
|--------|------|-----------|
| `ano_operacao` | int | Ano da operação |
| `mes_operacao` | int | Mês da operação |
| `sg_unidade_federativa` | string | Sigla da UF |
| `cod_ncm` | string | Código NCM (Nomenclatura Mercosul) |
| `cod_isic` | string | Código ISIC Classe |
| `tipo_operacao` | string | Tipo da operação (`EXP` ou `IMP`) |
| `vl_fob` | float | Valor FOB agregado |
| `qtd_peso_liquido_kg` | decimal(20,3) | Peso líquido agregado (kg) |

---

### 5. Resultados Esperados

- Tabela **consolidada e enriquecida** de comércio exterior por NCM e UF
- Permite análises como:
  - Comparação exportações vs importações por UF e setor
  - Evolução mensal de valor FOB e peso líquido
  - Distribuição setorial via ISIC
  - Atualização incremental eficiente via Delta Merge

---

### 6. Observações

- Uso de broadcast join para otimizar a junção com dimensão pequena
- Valores nulos tratados como zero durante agregações
- Separação de exportações e importações com coluna `tipo_operacao`
- Merge Delta garante que a tabela GOLD permaneça incremental e consistente


In [0]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
from delta.tables import DeltaTable

In [0]:
# Paths
path_exportacoes = "/mnt/silver/landingbeca2026jan/comercio_ext_estatisticas/tb_exportacoes/"
path_importacoes = "/mnt/silver/landingbeca2026jan/comercio_ext_estatisticas/tb_importacoes/"
path_nome_mercosul = "/mnt/silver/landingbeca2026jan/comercio_ext_indices/tb_nomenclatura_mercosul/"
path_ref_isic = "/mnt/silver/landingbeca2026jan/comercio_ext_indices/tb_referencia_ncm_isic/"
target = "/mnt/gold/rst_comercio_exterior"
dim_isic_table = "gold.dim_ncm_isic"

In [0]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext


# Read Delta Tables
df_exportacoes = spark.read.format("delta").load(path_exportacoes)
df_importacoes = spark.read.format("delta").load(path_importacoes)
df_nome_mercosul = spark.read.format("delta").load(path_nome_mercosul)
df_ref_isic = spark.read.format("delta").load(path_ref_isic)

df_dim_ncm_isic = (
    spark.table(dim_isic_table)
    .select(
        F.col("cod_ncm"),
        F.col("cod_isic")
    )
)

df_dim_ncm_isic = F.broadcast(df_dim_ncm_isic)

In [0]:
# Transformações Exportações
df_select_exportacoes = (
    df_exportacoes
    .withColumn("an_operacao", F.col("CO_ANO").cast("int"))
    .withColumn("me_operacao", F.col("CO_MES").cast("int"))
    .withColumn("sg_unidade_federativa", F.col("SG_UF_NCM").cast("string"))
    .withColumn("cd_nomenclatura_mercosul", F.col("CO_NCM").cast("string"))
    .withColumn("vl_free_on_board", F.col("VL_FOB").cast("float"))
    .withColumn("qt_peso_liquido_kg", F.col("KG_LIQUIDO").cast(DecimalType(20,3)))
    .withColumn("tp_operacao", F.lit("EXP"))
    .select(
        "an_operacao",
        "me_operacao",
        "sg_unidade_federativa",
        "cd_nomenclatura_mercosul",
        "tp_operacao",
        "vl_free_on_board",
        "qt_peso_liquido_kg"
    )
    .filter(
        (F.col("an_operacao").isNotNull()) & (F.col("me_operacao").isNotNull())
    )
)

# Transformações Importações
df_select_importacoes = (
    df_importacoes
    .withColumn("an_operacao", F.col("CO_ANO").cast("int"))
    .withColumn("me_operacao", F.col("CO_MES").cast("int"))
    .withColumn("sg_unidade_federativa", F.col("SG_UF_NCM").cast("string"))
    .withColumn("cd_nomenclatura_mercosul", F.col("CO_NCM").cast("string"))
    .withColumn("vl_free_on_board", F.col("VL_FOB").cast("float"))
    .withColumn("qt_peso_liquido_kg", F.col("KG_LIQUIDO").cast(DecimalType(20,3)))
    .withColumn("tp_operacao", F.lit("IMP"))
    .withColumn("ts_atualizacao", F.current_timestamp())
    .select(
        "an_operacao",
        "me_operacao",
        "sg_unidade_federativa",
        "cd_nomenclatura_mercosul",
        "tp_operacao",
        "vl_free_on_board",
        "qt_peso_liquido_kg"
    )
    .filter(
        (F.col("an_operacao").isNotNull()) & (F.col("me_operacao").isNotNull())
    )
)

# Transformações Nomenclatura Mercosul
df_select_nome_mercosul = (
    df_nome_mercosul
    .withColumn("cd_nomenclatura_mercosul", F.col("CO_NCM").cast("string"))
    .withColumn("cd_classificacao_internacional", F.col("CO_ISIC_CLASSE").cast("string"))
    .select("cd_nomenclatura_mercosul", "cd_classificacao_internacional")
    .filter(F.col("cd_classificacao_internacional").isNotNull())
)

In [0]:
# Broadcast 
df_select_nome_mercosul = F.broadcast(df_select_nome_mercosul)


# Counts (Opcional)
print(f"Exportacoes count: {df_select_exportacoes.count()}")
print(f"Importacoes count: {df_select_importacoes.count()}")


# Union Exportações + Importações
df_unified = df_select_exportacoes.unionByName(df_select_importacoes, allowMissingColumns=True)

# Join com NCM x ISIC
df_join = (
    df_unified
    .withColumnRenamed("cd_nomenclatura_mercosul", "cod_ncm")
    .join(
        df_dim_ncm_isic,
        on="cod_ncm",
        how="left"
    )
)


# Agregação Final (Incluindo cod_ncm)
df_agg = (
    df_join
    .withColumnRenamed("an_operacao", "ano_operacao")
    .withColumnRenamed("me_operacao", "mes_operacao")
    .withColumnRenamed("tp_operacao", "tipo_operacao")
    .groupBy(
        "ano_operacao",
        "mes_operacao",
        "sg_unidade_federativa",
        "cod_ncm",      
        "cod_isic",
        "tipo_operacao"
    )
    .agg(
        F.sum(F.coalesce(F.col("vl_free_on_board"), F.lit(0)))
            .cast("float")
            .alias("vl_fob"),
        F.sum(F.coalesce(F.col("qt_peso_liquido_kg"), F.lit(0)))
            .cast(DecimalType(20,3))
            .alias("qtd_peso_liquido_kg")
    )
)

In [0]:
delta_target = DeltaTable.forName(spark, "gold.rst_comercio_exterior")

delta_target.alias("t").merge(
    df_agg.alias("s"),
    """
    t.ano_operacao = s.ano_operacao AND
    t.mes_operacao = s.mes_operacao AND
    t.sg_unidade_federativa = s.sg_unidade_federativa AND
    t.cod_ncm = s.cod_ncm AND
    (t.cod_isic <=> s.cod_isic) AND
    t.tipo_operacao = s.tipo_operacao
    """
).whenMatchedUpdate(
    set={
        "vl_fob": F.col("s.vl_fob"),
        "qtd_peso_liquido_kg": F.col("s.qtd_peso_liquido_kg")
    }
).whenNotMatchedInsertAll().execute()