# Pipeline de ETL: Camada Bronze para Prata

## 1. Objetivo

Este notebook é o motor central do pipeline de ETL. Ele é responsável por:
1.  **Extrair (Extract)**: Ler todos os dados brutos da Camada Bronze (`/DataLayer/raw/data.zip`).
2.  **Transformar (Transform)**: Aplicar todas as regras de negócio descobertas na análise, incluindo:
    * Unificação (JOINs) de todas as tabelas.
    * **Tratamento de Outliers** (via método IQR), conforme feedback.
    * Limpeza de tipos de dados (ex: `try_cast` em `review_score`).
    * Imputação de nulos (ex: `coalesce` em `categoria_produto`).
    * Engenharia de Features (ex: `tempo_entrega_dias`).
3.  **Carregar (Load)**: Salvar o resultado limpo.

In [None]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
import zipfile 
import os
from dotenv import load_dotenv
import psycopg2
import shutil
import glob

spark = SparkSession.builder \
    .appName("ETLBronzeParaPrataOlist") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.5.0") \
    .getOrCreate()

print("SparkSession iniciada com o driver PostgreSQL.")

## 2. Extração (Extract)

Carregamos todos os datasets da camada Bronze (`/DataLayer/raw/data.zip`) para DataFrames Spark.

In [2]:
zip_path = '../../DataLayer/raw/data.zip' 

extract_path = './data/' 

os.makedirs(extract_path, exist_ok=True)

print(f"Extraindo '{zip_path}' para '{extract_path}'...")
try:
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    print("Arquivos extraídos com sucesso.")
except FileNotFoundError:
    print(f"!!! ERRO: Arquivo '{zip_path}' não encontrado.")
    spark.stop()
except Exception as e:
    print(f"!!! ERRO ao extrair o arquivo zip: {e} !!!")
    spark.stop()

caminho_base_dados = extract_path 
dfs = {}

arquivos = [
    "olist_customers_dataset.csv", "olist_geolocation_dataset.csv", 
    "olist_order_items_dataset.csv", "olist_order_payments_dataset.csv",
    "olist_order_reviews_dataset.csv", "olist_orders_dataset.csv",
    "olist_products_dataset.csv", "olist_sellers_dataset.csv",
    "product_category_name_translation.csv"
]

print("\nCarregando arquivos CSV para DataFrames PySpark...")
for arquivo in arquivos:
    nome_chave = arquivo.replace("_dataset.csv", "").replace(".csv", "")
    caminho_completo = os.path.join(caminho_base_dados, arquivo)
    
    if os.path.exists(caminho_completo):
        try:
            # Não precisamos mais do .cache() pois os arquivos não serão apagados
            dfs[nome_chave] = spark.read.csv(caminho_completo, header=True, inferSchema=True)
            print(f"- DataFrame '{nome_chave}' carregado.")
        except Exception as e:
            print(f"!! ERRO ao carregar '{arquivo}': {e} !!")
    else:
        print(f"!! AVISO: Arquivo '{arquivo}' não encontrado em '{extract_path}'. Pulando...")

print("\nDataFrames disponíveis para análise:")
for nome in dfs.keys():
    print(f"- {nome}")

Extraindo '../../DataLayer/raw/data.zip' para './data/'...
Arquivos extraídos com sucesso.

Carregando arquivos CSV para DataFrames PySpark...
- DataFrame 'olist_customers' carregado.


                                                                                

- DataFrame 'olist_geolocation' carregado.
- DataFrame 'olist_order_items' carregado.
- DataFrame 'olist_order_payments' carregado.
- DataFrame 'olist_order_reviews' carregado.
- DataFrame 'olist_orders' carregado.
- DataFrame 'olist_products' carregado.
- DataFrame 'olist_sellers' carregado.
- DataFrame 'product_category_name_translation' carregado.

DataFrames disponíveis para análise:
- olist_customers
- olist_geolocation
- olist_order_items
- olist_order_payments
- olist_order_reviews
- olist_orders
- olist_products
- olist_sellers
- product_category_name_translation


## 3. Transformação (Transform)

Aplicamos a unificação dos dados, seguida das regras de negócio de limpeza e enriquecimento.

In [3]:
print("Iniciando unificação dos DataFrames (JOINs)...")

df_prata = dfs["olist_order_items"].join(
    dfs["olist_orders"], "order_id", "left"
).join(
    dfs["olist_products"], "product_id", "left"
).join(
    dfs["olist_sellers"], "seller_id", "left"
).join(
    dfs["olist_customers"], "customer_id", "left"
).join(
    dfs["olist_order_payments"].groupBy("order_id")
        .agg(
            F.first("payment_type").alias("payment_type"),
            F.first("payment_installments").alias("payment_installments"),
            F.sum("payment_value").alias("payment_value")
        ), "order_id", "left"
).join(
    dfs["olist_order_reviews"].groupBy("order_id")
        .agg(
            F.first("review_score").alias("review_score")
        ), "order_id", "left"
).join(
    dfs["product_category_name_translation"], "product_category_name", "left"
)

df_prata.cache()
print(f"JOINs concluídos. Total de linhas pós-join: {df_prata.count()}")

Iniciando unificação dos DataFrames (JOINs)...


25/11/12 23:09:48 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

JOINs concluídos. Total de linhas pós-join: 112650


                                                                                

In [4]:
print("Iniciando tratamento de outliers (Método IQR)...")

colunas_outliers = ['price', 'freight_value', 'payment_value']
df_prata_sem_outliers = df_prata

for coluna in colunas_outliers:
    if coluna in df_prata.columns:
        print(f"Tratando outliers da coluna: '{coluna}'")
        
        quantiles = df_prata_sem_outliers.approxQuantile(coluna, [0.25, 0.75], 0.01)
        q1 = quantiles[0]
        q3 = quantiles[1]
        iqr = q3 - q1
        
        limite_inferior = q1 - (1.5 * iqr)
        limite_superior = q3 + (1.5 * iqr)
        
        df_prata_sem_outliers = df_prata_sem_outliers.filter(
            (F.col(coluna) >= limite_inferior) & (F.col(coluna) <= limite_superior)
        )
    else:
        print(f"AVISO: Coluna '{coluna}' não encontrada para tratamento de outlier.")

df_prata_sem_outliers.cache()

print(f"\nLinhas antes do tratamento de outliers: {df_prata.count():,}")
print(f"Linhas após o tratamento de outliers: {df_prata_sem_outliers.count():,}")

Iniciando tratamento de outliers (Método IQR)...
Tratando outliers da coluna: 'price'


                                                                                

Tratando outliers da coluna: 'freight_value'
Tratando outliers da coluna: 'payment_value'


                                                                                


Linhas antes do tratamento de outliers: 112,650




Linhas após o tratamento de outliers: 88,981


                                                                                

In [5]:
print("Iniciando limpeza final, seleção e engenharia de features...")

df_prata_transformado = df_prata_sem_outliers.select(
    # --- CHAVES ---
    F.concat(F.col("order_id"), F.lit("_"), F.col("order_item_id")).alias("order_item_id"), 
    F.col("order_id"),
    F.col("customer_unique_id"),
    F.col("product_id"),
    F.col("seller_id"),
    
    # --- DADOS DO PEDIDO ---
    F.col("order_status"),
    F.col("order_purchase_timestamp"),
    F.col("order_approved_at"),
    F.col("order_delivered_carrier_date"),
    F.col("order_delivered_customer_date"),
    F.col("order_estimated_delivery_date"),

    # --- DADOS DO ITEM ---
    F.col("price").cast("decimal(10,2)"),
    F.col("freight_value").cast("decimal(10,2)"),
    
    # --- DADOS DO PRODUTO ---
    F.coalesce(F.col("product_category_name_english"), F.lit("unknown")).alias("product_category_name"),

    # --- DADOS DE LOCALIZAÇÃO ---
    F.col("customer_city"),
    F.col("customer_state"),
    F.col("seller_city"),
    F.col("seller_state"),

    # --- DADOS DE PAGAMENTO ---
    F.col("payment_type"),
    F.col("payment_installments"),
    F.col("payment_value").cast("decimal(10,2)"),

    # --- DADOS DE AVALIAÇÃO ---
    F.expr("try_cast(review_score as int)").alias("review_score"),
    
    # --- ENGENHARIA DE FEATURES ---
    F.datediff(F.col("order_delivered_customer_date"), F.col("order_purchase_timestamp")).alias("delivery_days"),
    (F.col("order_delivered_customer_date") > F.col("order_estimated_delivery_date")).alias("is_delivery_late")
)

df_prata_final = df_prata_transformado.filter(
    (F.col("review_score").isNull()) | (F.col("review_score").between(1, 5))
)

df_prata_final.cache()

print("\nTransformações concluídas. Schema final da camada Prata:")
df_prata_final.printSchema()

print("\nAmostra dos dados transformados:")
df_prata_final.show(5, truncate=False)

Iniciando limpeza final, seleção e engenharia de features...

Transformações concluídas. Schema final da camada Prata:
root
 |-- order_item_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- price: decimal(10,2) (nullable = true)
 |-- freight_value: decimal(10,2) (nullable = true)
 |-- product_category_name: string (nullable = false)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (n



+----------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+------+-------------+---------------------+---------------------+--------------+---------------+------------+------------+--------------------+-------------+------------+-------------+----------------+
|order_item_id                     |order_id                        |customer_unique_id              |product_id                      |seller_id                       |order_status|order_purchase_timestamp|order_approved_at  |order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|price |freight_value|product_category_name|customer_city        |customer_state|seller_city    |seller_state|payment_type|payment_installments|payment_value|review_scor

                                                                                

## 4. Carga (Load)

Com o DataFrame final transformado e limpo, carregamos os dados nos destinos da Camada Prata:
1.  **Data Lake (Parquet):** Formato colunar otimizado para futuras análises em Spark/BI.
2.  **Data Lake (CSV):** Conforme solicitado, um arquivo CSV único para visualização simples.
3.  **Data Warehouse (PostgreSQL):** Carrega os dados no banco para consumo pelo dashboard.

In [6]:
def carregar_variaveis_ambiente():
    env_path = '../../.env'
    if os.path.exists(env_path):
        load_dotenv(dotenv_path=env_path)
        print("Variáveis de ambiente carregadas.")
        return {
            "db_user": os.getenv("DB_USER"), "db_password": os.getenv("DB_PASSWORD"),
            "db_host": os.getenv("DB_HOST"), "db_port": os.getenv("DB_PORT"),
            "db_name": os.getenv("DB_NAME")
        }
    else:
        print(f"ERRO: Arquivo .env não encontrado em '{env_path}'")
        return None

def gerar_e_salvar_ddl(df, table_name, schema_name, ddl_path):
    print(f"Gerando DDL para a tabela '{schema_name}.{table_name}'...")
    
    dtype_mapping = {
        'string': 'VARCHAR(255)', 'bigint': 'BIGINT',
        'int': 'INTEGER', 'integer': 'INTEGER',
        'double': 'DOUBLE PRECISION', 'float': 'FLOAT',
        'decimal(10,2)': 'NUMERIC(10, 2)',
        'timestamp': 'TIMESTAMP', 'date': 'DATE',
        'boolean': 'BOOLEAN'
    }
    
    schema = df.dtypes
    
    ddl = f"CREATE SCHEMA IF NOT EXISTS {schema_name};\n\n"
    ddl += f"DROP TABLE IF EXISTS {schema_name}.{table_name};\n\n"
    ddl += f"CREATE TABLE {schema_name}.{table_name} (\n"
    
    colunas_ddl = []
    for col_name, col_type in schema:
        sql_type = dtype_mapping.get(col_type, 'TEXT') 
        
        not_null_cols = ['order_item_id', 'order_id', 'customer_unique_id', 'product_id', 'seller_id']
        not_null = " NOT NULL" if col_name in not_null_cols else ""
        
        pk = " PRIMARY KEY" if col_name == 'order_item_id' else ""
        
        colunas_ddl.append(f"    {col_name} {sql_type}{not_null}{pk}")
    
    ddl += ",\n".join(colunas_ddl)
    ddl += "\n);"
    
    try:
        os.makedirs(os.path.dirname(ddl_path), exist_ok=True)
        with open(ddl_path, 'w', encoding='utf-8') as f:
            f.write(ddl)
        print(f"Script DDL salvo com sucesso em: {ddl_path}")
    except Exception as e:
        print(f"ERRO ao salvar script DDL: {e}")
        
    return ddl

def executar_ddl_no_banco(ddl_script, env_vars):
    print("Executando DDL no banco de dados...")
    try:
        conn = None
        cur = None
        conn = psycopg2.connect(
            user=env_vars["db_user"], password=env_vars["db_password"],
            host=env_vars["db_host"], port=env_vars["db_port"], dbname=env_vars["db_name"]
        )
        conn.autocommit = True
        cur = conn.cursor()
        cur.execute(ddl_script)
        print(f"Tabela criada com sucesso no PostgreSQL.")
        
    except Exception as e:
        print(f"ERRO ao executar o script DDL: {e}")
        raise
    finally:
        if cur: cur.close()
        if conn: conn.close()

In [7]:
print("\n--- INICIANDO ETAPA DE CARGA (LOAD) ---")

schema_name = "public"
table_name = "orders"
ddl_output_path = f"../../DataLayer/silver/ddl.sql"
csv_final_path = f"../../DataLayer/silver/{table_name}.csv"
csv_temp_path = "../../DataLayer/silver/temp_csv_output"

ddl_script = gerar_e_salvar_ddl(df_prata_final, table_name, schema_name, ddl_output_path)

print("\nIniciando carga para o PostgreSQL...")
env_vars = carregar_variaveis_ambiente()

if env_vars:
    try:
        executar_ddl_no_banco(ddl_script, env_vars)
        
        jdbc_url = f"jdbc:postgresql://{env_vars['db_host']}:{env_vars['db_port']}/{env_vars['db_name']}"
        jdbc_properties = {
            "user": env_vars["db_user"], "password": env_vars["db_password"],
            "driver": "org.postgresql.Driver"
        }
        
        print(f"Inserindo dados na tabela '{schema_name}.{table_name}'...")
        df_prata_final.write.jdbc(
            url=jdbc_url,
            table=f"{schema_name}.{table_name}",
            mode="overwrite",
            properties=jdbc_properties
        )
        print("Carga de dados no PostgreSQL concluída com sucesso.")
        
    except Exception as e:
        print(f"!!! ERRO GERAL NA CARGA PARA O BANCO: {e} !!!")
else:
    print("ERRO: Variáveis de ambiente não carregadas. Carga no PostgreSQL abortada.")

print(f"\nIniciando carga para CSV em: {csv_final_path}")

try:
    df_prata_final.coalesce(1).write.mode("overwrite").option("header", "true").csv(csv_temp_path)
    
    arquivo_part = glob.glob(os.path.join(csv_temp_path, "part-*.csv"))
    
    if arquivo_part:
        shutil.move(arquivo_part[0], csv_final_path)
        print(f"Arquivo CSV movido e renomeado para: {csv_final_path}")
        
        shutil.rmtree(csv_temp_path)
        print(f"Pasta temporária '{csv_temp_path}' removida.")
    else:
        print("ERRO: Nenhum arquivo CSV 'part-' foi encontrado na pasta temporária.")

except Exception as e:
    print(f"Erro ao salvar e limpar o CSV: {e}")

caminho_parquet = "../../DataLayer/silver/data_parquet"
if os.path.exists(caminho_parquet):
    shutil.rmtree(caminho_parquet)
    print(f"Pasta Parquet antiga '{caminho_parquet}' removida.")

print("\n--- Pipeline ETL concluído ---")


--- INICIANDO ETAPA DE CARGA (LOAD) ---
Gerando DDL para a tabela 'public.orders'...
Script DDL salvo com sucesso em: ../../DataLayer/silver/ddl.sql

Iniciando carga para o PostgreSQL...
Variáveis de ambiente carregadas.
Executando DDL no banco de dados...
Tabela criada com sucesso no PostgreSQL.
Inserindo dados na tabela 'public.orders'...


                                                                                

Carga de dados no PostgreSQL concluída com sucesso.

Iniciando carga para CSV em: ../../DataLayer/silver/orders.csv


[Stage 80:>                                                         (0 + 1) / 1]

Arquivo CSV movido e renomeado para: ../../DataLayer/silver/orders.csv
Pasta temporária '../../DataLayer/silver/temp_csv_output' removida.

--- Pipeline ETL concluído ---


                                                                                

In [8]:
spark.stop()
print("Sessão Spark finalizada.")

pasta_temporaria = './data/'

if os.path.exists(pasta_temporaria):
    try:
        shutil.rmtree(pasta_temporaria)
        print(f"Pasta temporária '{pasta_temporaria}' removida com sucesso.")
    except Exception as e:
        print(f"AVISO: Não foi possível remover '{pasta_temporaria}': {e}")
else:
    print(f"Pasta '{pasta_temporaria}' já não existe.")

Sessão Spark finalizada.
Pasta temporária './data/' removida com sucesso.
