In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder \
    .appName("IngestaoCNPJ") \
    .getOrCreate()

In [0]:
%run ./Intancia_Containers

In [0]:
import zipfile
import os

def unzip_file(path_origem, container_nome):
    """
    Copia o ZIP do Blob Storage para o DBFS local e extrai os CSVs.
    """

    # Nome do arquivo
    zip_name = os.path.basename(path_origem)

    # Caminho tempor√°rio no DBFS
    dbfs_zip_path = f"dbfs:/tmp/{zip_name}"
    local_zip_path = f"/dbfs/tmp/{zip_name}"

    # Copia do Blob (wasbs) para DBFS
    dbutils.fs.cp(path_origem, dbfs_zip_path, recurse=False)

    # Pasta de extra√ß√£o
    extract_path = f"/dbfs/tmp/unzipped/{container_nome}"
    os.makedirs(extract_path, exist_ok=True)

    # Extrai os arquivos
    with zipfile.ZipFile(local_zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)

        extracted_files = [
            f"/dbfs/tmp/unzipped/{container_nome}/{f}"
            for f in zip_ref.namelist()
        ]

    # Retorna caminhos compat√≠veis com Spark
    return [f.replace("/dbfs", "dbfs:") for f in extracted_files]

In [0]:
%run ./Colunas_CNPJ

In [0]:
def renomeia_colunas(df, nome_tabela):
    # Normaliza o nome (remove n√∫meros como Empresas0, Empresas1)
    nome_normalizado = ''.join(filter(str.isalpha, nome_tabela))

    colunas_corretas = colunas_map.get(nome_normalizado)

    if not colunas_corretas:
        print(f"[Aviso] Nenhum dicion√°rio encontrado para '{nome_tabela}'.")
        return df

    if len(df.columns) != len(colunas_corretas):
        print(
            f"[Aviso] '{nome_tabela}': n√∫mero de colunas diferente "
            f"(DF={len(df.columns)} | Esperado={len(colunas_corretas)})."
        )
        return df

    # Renomeia todas as colunas de uma vez (mais perform√°tico)
    return df.toDF(*colunas_corretas)

In [0]:
from pyspark.sql.functions import max as spark_max, col

# Verifica ultima atualiza√ß√£o da tabela bronze
df_last_ingestion = (
    spark.read.format("delta")
        .load(CONTROL_TABLE_PATH)
        .filter(col("originator") == "CNPJ")
        .groupBy("table_name")
        .agg(
            spark_max("last_ingestion_timestamp")
                .alias("last_ingestion_timestamp")
        )
)

# Join com tabelas da origem
df_inventario_filtrado = (
    df_inventario
        .filter(col("container_name") == "CNPJ")
        .join(
            df_last_ingestion,
            on="table_name",
            how="left"
        )
        .filter(
            col("last_ingestion_timestamp").isNull() |
            (col("modificationTime") > col("last_ingestion_timestamp"))
        )
        .drop("last_ingestion_timestamp")
)

display(df_inventario_filtrado)


In [0]:
import re

# iterando nos Arquivos
for row in df_inventario_filtrado.collect():
    path_origem = row['path']
    container = row['container_name']
    nome_arquivo = row['table_name']

    # Removendo a extens√£o para o nome da tabela Delta
    tabela_nome = nome_arquivo.split('.')[0].lower()
    print(f"‚è≥ Processando: {tabela_nome}")

    try:
        if not nome_arquivo.endswith('.zip'):
            print(f"‚ùå Arquivo {nome_arquivo} n√£o √© ZIP")
            continue

        print(f"üì¶ Descompactando: {nome_arquivo}")
        arquivos_extraidos = unzip_file(path_origem, container)

        if not arquivos_extraidos:
            print(f"‚ö†Ô∏è Nenhum CSV encontrado em {nome_arquivo}")
            continue

        # Remove sufixo num√©rico ‚Üí empresas0 ‚Üí empresas
        nome_tabela = re.sub(r"\d+$", "", tabela_nome)
    
        destino = f"{BRONZE_BASE_PATH}/{container.lower()}/{nome_tabela}"

        for arquivo in arquivos_extraidos:
            df_temp = spark.read.format("csv") \
                .option("header", "false") \
                .option("sep", ";") \
                .option("quote", '"') \
                .option("escape", '"') \
                .option("encoding", "ISO-8859-1") \
                .load(arquivo)
            
            df_temp = renomeia_colunas(df_temp, nome_tabela)

            df_temp = df_temp.select(
                [col(c).cast("string").alias(c) for c in df_temp.columns]
            )

            print(f"‚úÖ Gravando tabela bronze: {nome_tabela}")

            # Adicionando metadados para qualidade dos dados
            df = (
                df_temp
                .withColumn("_ingestion_date", current_date())
                .withColumn("_ingestion_timestamp", current_timestamp())
                .withColumn("_source_path", lit(path_origem+"/"+arquivo.split("/")[-1]))
            )
            
            # Particionando por data de ingest√£o para manter hist√≥rico
            df.write \
                .format("delta") \
                .mode("append") \
                .partitionBy("_ingestion_date") \
                .save(destino)

            # Atualizando tabela de controle
            dt = DeltaTable.forPath(spark, CONTROL_TABLE_PATH)        
            
            controle_df = spark.createDataFrame(
                [("CNPJ", destino, path_origem)],
                ["originator", "table_name", "input_file_name"]
            ).withColumn(
                "last_ingestion_timestamp", current_timestamp()
            )

            dt.alias("target").merge(
                controle_df.alias("source"),
                "target.originator = source.originator AND target.table_name = source.table_name"
            ).whenMatchedUpdateAll() \
            .whenNotMatchedInsertAll() \
            .execute()

    except Exception as e:
        print(f"‚ùå Erro ao processar {nome_arquivo}: {e}")

In [0]:
%skip
from functools import reduce

tabelas_unificadas = {}

for nome_tabela, lista_dfs in tabelas_bronze.items():
    df_final = reduce(
        lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True),
        lista_dfs
    )
    tabelas_unificadas[nome_tabela] = df_final

In [0]:
%skip
from pyspark.sql.functions import current_timestamp, current_date, lit
from delta.tables import DeltaTable

for nome_tabela, df in tabelas_unificadas.items():
    destino = f"{BRONZE_BASE_PATH}/{container.lower()}/{nome_tabela}"

    print(f"‚úÖ Gravando tabela bronze: {nome_tabela}")

    # Adicionando metadados para qualidade dos dados
    df = (
        df
        .withColumn("_ingestion_date", current_date())
        .withColumn("_ingestion_timestamp", current_timestamp())
        .withColumn("_source_path", lit(controle_ingestao.get(nome_tabela)))
    )
    
    # Particionando por data de ingest√£o para manter hist√≥rico
    df.write \
        .format("delta") \
        .mode("append") \
        .partitionBy("_ingestion_date") \
        .save(destino)

    # Atualizando tabela de controle
    dt = DeltaTable.forPath(spark, CONTROL_TABLE_PATH)        
    
    controle_df = spark.createDataFrame(
        [("BALANCE", destino, path_origem)],
        ["originator", "table_name", "input_file_name"]
    ).withColumn(
        "last_ingestion_timestamp", current_timestamp()
    )

    dt.alias("target").merge(
        controle_df.alias("source"),
        "target.originator = source.originator AND target.table_name = source.table_name"
    ).whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()