In [None]:
import pandas as pd
import psycopg2
from psycopg2 import extras
import os
from datetime import datetime

# --- 1. CONFIGURAÇÕES ---
# ----------------------------------------------------------------------------------
# ATENÇÃO: SUBSTITUA ESTAS CREDENCIAIS PELAS DO SEU BANCO DE DADOS PostgreSQL LOCAL
DB_CONFIG = {
    'host': 'localhost',                 # Para PostgreSQL local
    'port': '5432',                      # Porta padrão para PostgreSQL
    'user': 'postgres',                  # Usuário padrão do PostgreSQL
    'password': 'senha123', # <-- MUITO IMPORTANTE: SUBSTITUA PELA SUA SENHA REAL DO POSTGRES
    'dbname': 'brazilian_ecommerce'      # Nome do banco de dados que será criado/usado
}

# Caminho base para os arquivos CSV no seu computador local
# Ex: 'C:\\Users\\SeuUsuario\\Documentos\\DadosECommerce'
# Certifique-se de que este caminho está correto no seu sistema
CSV_BASE_PATH = 'D:\\Users\\LRL\\Desktop\\PBI Projects\\Brazilian e-commerce\\archive' # <-- AJUSTE ESTE CAMINHO

CSV_FILES = {
    'product_category_name_translation': 'product_category_name_translation.csv',
    'products': 'olist_products_dataset.csv',
    'orders': 'olist_orders_dataset.csv',
    'order_reviews': 'olist_order_reviews_dataset.csv',
    'order_payments': 'olist_order_payments_dataset.csv',
    'sellers': 'olist_sellers_dataset.csv',
    'order_items': 'olist_order_items_dataset.csv',
    'geolocation': 'olist_geolocation_dataset.csv',
    'customers': 'olist_customers_dataset.csv'
}

# --- 2. SCHEMA SQL ---
# ----------------------------------------------------------------------------------
# Este SQL é usado para criar as tabelas no PostgreSQL.
# Inclui todas as correções de tipo de dados e definições de FK/PK.
# A tabela 'zip_code_master' e sua população estão incluídas aqui.
SCHEMA_SQL = """
-- Criação da tabela product_category_name_translation
CREATE TABLE public.product_category_name_translation (
    product_category_name TEXT PRIMARY KEY,
    product_category_name_english TEXT
);

-- Criação da tabela products
CREATE TABLE public.products (
    product_id TEXT PRIMARY KEY,
    product_category_name TEXT,
    product_name_lenght BIGINT,
    product_description_lenght BIGINT,
    product_photos_qty BIGINT,
    product_length_cm BIGINT,
    product_height_cm BIGINT,
    product_width_cm BIGINT,
    product_weight_g BIGINT, -- Corrigido: product_weight_g estava faltando aqui
    CONSTRAINT fk_product_category_name
        FOREIGN KEY (product_category_name)
        REFERENCES public.product_category_name_translation (product_category_name)
);

-- Criação da tabela orders
CREATE TABLE public.orders (
    order_id TEXT PRIMARY KEY,
    customer_id TEXT NOT NULL,
    order_status TEXT,
    order_purchase_timestamp TIMESTAMP WITH TIME ZONE,
    order_approved_at TIMESTAMP WITH TIME ZONE,
    order_delivered_carrier_date TIMESTAMP WITH TIME ZONE,
    order_delivered_customer_date TIMESTAMP WITH TIME ZONE,
    order_estimated_delivery_date TIMESTAMP WITH TIME ZONE
);

-- Criação da tabela order_reviews
CREATE TABLE public.order_reviews (
    review_id TEXT PRIMARY KEY,
    order_id TEXT NOT NULL,
    review_score BIGINT,
    review_comment_title TEXT,
    review_comment_message TEXT,
    review_creation_date TIMESTAMP WITH TIME ZONE,
    review_answer_timestamp TIMESTAMP WITH TIME ZONE,
    CONSTRAINT fk_order_id
        FOREIGN KEY (order_id)
        REFERENCES public.orders (order_id)
);

-- Criação da tabela order_payments
CREATE TABLE public.order_payments (
    order_id TEXT NOT NULL,
    payment_sequential BIGINT NOT NULL,
    payment_type TEXT,
    payment_installments BIGINT,
    payment_value DOUBLE PRECISION, -- CORRIGIDO: de BIGINT para DOUBLE PRECISION
    PRIMARY KEY (order_id, payment_sequential),
    CONSTRAINT fk_order_id
        FOREIGN KEY (order_id)
        REFERENCES public.orders (order_id)
);

-- Criação da tabela sellers
CREATE TABLE public.sellers (
    seller_id TEXT PRIMARY KEY,
    seller_zip_code_prefix TEXT,
    seller_city TEXT,
    seller_state TEXT
);

-- Criação da tabela order_items
CREATE TABLE public.order_items (
    order_id TEXT NOT NULL,
    order_item_id TEXT NOT NULL,
    product_id TEXT NOT NULL,
    seller_id TEXT NOT NULL,
    shipping_limit_date TIMESTAMP WITH TIME ZONE,
    price DOUBLE PRECISION,
    freight_value DOUBLE PRECISION,
    PRIMARY KEY (order_id, order_item_id),
    CONSTRAINT fk_order_id
        FOREIGN KEY (order_id)
        REFERENCES public.orders (order_id),
    CONSTRAINT fk_product_id
        FOREIGN KEY (product_id)
        REFERENCES public.products (product_id),
    CONSTRAINT fk_seller_id
        FOREIGN KEY (seller_id)
        REFERENCES public.sellers (seller_id)
);

-- Criação da tabela geolocation
CREATE TABLE public.geolocation (
    geolocation_zip_code_prefix TEXT NOT NULL,
    geolocation_lat DOUBLE PRECISION,
    geolocation_lng DOUBLE PRECISION,
    geolocation_city TEXT,
    geolocation_state TEXT,
    -- A PK composta garante unicidade, mas o zip_code_prefix sozinho não é único aqui.
    -- A unicidade para FKs será tratada pela tabela zip_code_master.
    PRIMARY KEY (geolocation_zip_code_prefix, geolocation_lat, geolocation_lng)
);

-- Criação da nova tabela intermediária 'zip_code_master'
-- Esta tabela terá um registro único para cada CEP e suas coordenadas representativas.
CREATE TABLE public.zip_code_master (
    zip_code_prefix TEXT PRIMARY KEY,
    latitude DOUBLE PRECISION,
    longitude DOUBLE PRECISION,
    city TEXT,
    state TEXT
);

-- Criação da tabela customers
CREATE TABLE public.customers (
    customer_id TEXT PRIMARY KEY,
    customer_unique_id TEXT,
    customer_zip_code_prefix TEXT,
    customer_city TEXT,
    customer_state TEXT,
    CONSTRAINT fk_customer_zip_code_prefix
        FOREIGN KEY (customer_zip_code_prefix)
        REFERENCES public.zip_code_master (zip_code_prefix) -- Referencia a nova tabela zip_code_master
);

-- Adição de Chaves Estrangeiras pendentes em orders
ALTER TABLE public.orders
ADD CONSTRAINT fk_customer_id
    FOREIGN KEY (customer_id)
    REFERENCES public.customers (customer_id);

-- Adição de Chaves Estrangeiras pendentes em sellers (referenciando zip_code_master)
ALTER TABLE public.sellers
ADD CONSTRAINT fk_seller_zip_code_prefix
    FOREIGN KEY (seller_zip_code_prefix)
    REFERENCES public.zip_code_master (zip_code_prefix);
"""

# SQL para popular a tabela zip_code_master (executado após carregar 'geolocation')
POPULATE_ZIP_CODE_MASTER_SQL = """
INSERT INTO public.zip_code_master (zip_code_prefix, latitude, longitude, city, state)
SELECT DISTINCT ON (geolocation_zip_code_prefix)
       geolocation_zip_code_prefix,
       geolocation_lat,
       geolocation_lng,
       geolocation_city,
       geolocation_state
FROM public.geolocation
ORDER BY geolocation_zip_code_prefix, geolocation_lat, geolocation_lng;
"""

# --- 3. FUNÇÕES DE ETL ---
# ----------------------------------------------------------------------------------

def connect_db(dbname='postgres'): # Conecta ao banco de dados padrão 'postgres' para criar o DB principal
    """Conecta ao banco de dados PostgreSQL."""
    try:
        conn = psycopg2.connect(
            host=DB_CONFIG['host'],
            port=DB_CONFIG['port'],
            user=DB_CONFIG['user'],
            password=DB_CONFIG['password'],
            dbname=dbname
        )
        conn.autocommit = True # Para comandos como CREATE DATABASE
        print(f"Conectado ao banco de dados '{dbname}' com sucesso.")
        return conn
    except Exception as e:
        print(f"Erro ao conectar ao banco de dados '{dbname}': {e}")
        return None

def create_main_database_if_not_exists():
    """Cria o banco de dados principal (brazilian_ecommerce) se ele não existir."""
    conn = None
    try:
        # Tenta conectar ao banco de dados principal para verificar se existe
        conn = connect_db(dbname=DB_CONFIG['dbname'])
        if conn:
            print(f"Banco de dados '{DB_CONFIG['dbname']}' já existe. Prosseguindo.")
            conn.close()
            return
        
        # Se a conexão direta falhou (DB não existe ou erro de conexão), tenta criar
        # Conecta ao banco de dados padrão 'postgres' para criar um novo DB
        conn_postgres = connect_db(dbname='postgres')
        if conn_postgres:
            cur = conn_postgres.cursor()
            print(f"Criando banco de dados '{DB_CONFIG['dbname']}'...")
            cur.execute(f"CREATE DATABASE {DB_CONFIG['dbname']} WITH OWNER = {DB_CONFIG['user']} ENCODING = 'UTF8' LC_COLLATE = 'C' LC_CTYPE = 'C' TEMPLATE = template0 CONNECTION LIMIT = -1;")
            print(f"Banco de dados '{DB_CONFIG['dbname']}' criado com sucesso.")
            cur.close()
            conn_postgres.close()
        else:
            print("Não foi possível conectar ao banco de dados 'postgres' para criar o DB principal. Verifique as credenciais e o status do servidor.")
            raise Exception("Falha na conexão inicial ao PostgreSQL.")
    except Exception as e:
        print(f"Erro ao criar/verificar banco de dados principal: {e}")
        raise # Re-lança o erro para parar o processo ETL

def setup_tables(conn):
    """Executa o script SQL para dropar e criar as tabelas."""
    try:
        cur = conn.cursor()
        print("Executando script SQL para dropar e criar tabelas...")
        cur.execute(SCHEMA_SQL)
        conn.commit()
        print("Tabelas criadas/redefinidas com sucesso.")
    except Exception as e:
        print(f"Erro ao configurar tabelas: {e}")
        conn.rollback() # Reverte em caso de erro
        raise # Re-lança o erro para parar o processo ETL

def load_data_to_db(df, table_name, conn):
    """Carrega um DataFrame para uma tabela PostgreSQL."""
    try:
        # Convert NaT (Not a Time) to None for datetime columns
        for col in df.select_dtypes(include=['datetime64[ns, UTC]', 'datetime64[ns]']).columns:
            df[col] = df[col].replace({pd.NaT: None})

        # Substitui NaN (Not a Number) por None para compatibilidade com NULL do PostgreSQL
        df = df.where(pd.notnull(df), None)

        tuples = [tuple(x) for x in df.to_numpy()]
        cols = ','.join([f'"{c}"' for c in list(df.columns)]) # Adiciona aspas para nomes de colunas
        
        query = f"INSERT INTO public.{table_name}({cols}) VALUES %s;"
        cur = conn.cursor()
        extras.execute_values(cur, query, tuples, page_size=10000)
        conn.commit()
        print(f"Dados carregados para '{table_name}' com sucesso. ({len(df)} linhas)")
    except Exception as e:
        print(f"Erro ao carregar dados para '{table_name}': {e}")
        conn.rollback()
        raise # Re-lança o erro para parar o processo ETL

def get_existing_categories(conn):
    """Obtém as categorias existentes na tabela product_category_name_translation."""
    try:
        cur = conn.cursor()
        cur.execute("SELECT product_category_name FROM public.product_category_name_translation;")
        existing_categories = {row[0] for row in cur.fetchall()}
        cur.close()
        return existing_categories
    except Exception as e:
        print(f"Erro ao obter categorias existentes: {e}")
        return set()

def insert_missing_categories(conn, missing_categories_df):
    """Insere categorias ausentes na tabela product_category_name_translation."""
    if missing_categories_df.empty:
        return
    try:
        tuples = [tuple(x) for x in missing_categories_df.to_numpy()]
        cols = ','.join([f'"{c}"' for c in list(missing_categories_df.columns)])
        query = f"INSERT INTO public.product_category_name_translation({cols}) VALUES %s ON CONFLICT (product_category_name) DO NOTHING;"
        cur = conn.cursor()
        extras.execute_values(cur, query, tuples, page_size=1000)
        conn.commit()
        print(f"Inseridas {len(missing_categories_df)} categorias ausentes em 'product_category_name_translation'.")
    except Exception as e:
        print(f"Erro ao inserir categorias ausentes: {e}")
        conn.rollback()
        raise # Re-lança o erro

def get_existing_zip_codes(conn):
    """Obtém os CEPs existentes na tabela zip_code_master."""
    try:
        cur = conn.cursor()
        cur.execute("SELECT zip_code_prefix FROM public.zip_code_master;")
        existing_zips = {row[0] for row in cur.fetchall()}
        cur.close()
        return existing_zips
    except Exception as e:
        print(f"Erro ao obter CEPs existentes da zip_code_master: {e}")
        return set()

def get_existing_customer_ids(conn):
    """Obtém os customer_ids existentes na tabela customers."""
    try:
        cur = conn.cursor()
        cur.execute("SELECT customer_id FROM public.customers;")
        existing_customer_ids = {row[0] for row in cur.fetchall()}
        cur.close()
        return existing_customer_ids
    except Exception as e:
        print(f"Erro ao obter customer_ids existentes: {e}")
        return set()

def get_existing_product_ids(conn):
    """Obtém os product_ids existentes na tabela products."""
    try:
        cur = conn.cursor()
        cur.execute("SELECT product_id FROM public.products;")
        existing_product_ids = {row[0] for row in cur.fetchall()}
        cur.close()
        return existing_product_ids
    except Exception as e:
        print(f"Erro ao obter product_ids existentes: {e}")
        return set()

def get_existing_seller_ids(conn):
    """Obtém os seller_ids existentes na tabela sellers."""
    try:
        cur = conn.cursor()
        cur.execute("SELECT seller_id FROM public.sellers;")
        existing_seller_ids = {row[0] for row in cur.fetchall()}
        cur.close()
        return existing_seller_ids
    except Exception as e:
        print(f"Erro ao obter seller_ids existentes: {e}")
        return set()

def get_existing_order_ids(conn):
    """Obtém os order_ids existentes na tabela orders."""
    try:
        cur = conn.cursor()
        cur.execute("SELECT order_id FROM public.orders;")
        existing_order_ids = {row[0] for row in cur.fetchall()}
        cur.close()
        return existing_order_ids
    except Exception as e:
        print(f"Erro ao obter order_ids existentes: {e}")
        return set()

# --- 4. PROCESSO PRINCIPAL DE ETL ---
# ----------------------------------------------------------------------------------

def run_etl():
    """Orquestra o processo ETL completo."""
    print("Iniciando processo ETL...")

    # 1. Criar banco de dados principal (se necessário)
    try:
        create_main_database_if_not_exists()
    except Exception as e:
        print(f"Falha na etapa de criação do banco de dados: {e}")
        print("Encerrando ETL.")
        return

    # 2. Conectar ao banco de dados principal para operações de schema e dados
    conn = connect_db(dbname=DB_CONFIG['dbname'])
    if not conn:
        print("Não foi possível conectar ao banco de dados principal. Encerrando ETL.")
        return

    try:
        # 3. Configurar tabelas (dropar e criar)
        setup_tables(conn)

        # Dicionário para armazenar DataFrames limpos (não estritamente necessário para este fluxo, mas útil para depuração)
        cleaned_dfs = {}

        # --- Carregamento e Limpeza de Dados ---
        print("\n--- Carregando e limpando dados dos CSVs ---")

        # 1. product_category_name_translation
        print(f"Processando {CSV_FILES['product_category_name_translation']}...")
        df_cat_trans = pd.read_csv(os.path.join(CSV_BASE_PATH, CSV_FILES['product_category_name_translation']), encoding='utf-8')
        df_cat_trans.columns = df_cat_trans.columns.str.strip() # Remover espaços em branco nos nomes das colunas
        df_cat_trans['product_category_name'] = df_cat_trans['product_category_name'].astype(str).str.strip()
        df_cat_trans['product_category_name_english'] = df_cat_trans['product_category_name_english'].astype(str).str.strip()
        df_cat_trans.drop_duplicates(subset=['product_category_name'], inplace=True) # Garantir PK única
        load_data_to_db(df_cat_trans, 'product_category_name_translation', conn)
        cleaned_dfs['product_category_name_translation'] = df_cat_trans # Armazena para referência futura

        # 2. geolocation
        print(f"Processando {CSV_FILES['geolocation']}...")
        df_geo = pd.read_csv(os.path.join(CSV_BASE_PATH, CSV_FILES['geolocation']), encoding='utf-8')
        df_geo.columns = df_geo.columns.str.strip()
        # Correção de tipo de dados para lat/lng (garantir que são numéricos)
        df_geo['geolocation_lat'] = pd.to_numeric(df_geo['geolocation_lat'], errors='coerce')
        df_geo['geolocation_lng'] = pd.to_numeric(df_geo['geolocation_lng'], errors='coerce')
        df_geo.dropna(subset=['geolocation_lat', 'geolocation_lng'], inplace=True) # Remover linhas com lat/lng nulos após coerção
        # Remover duplicatas da PK composta (zip, lat, lng)
        df_geo.drop_duplicates(subset=['geolocation_zip_code_prefix', 'geolocation_lat', 'geolocation_lng'], inplace=True)
        load_data_to_db(df_geo, 'geolocation', conn)
        cleaned_dfs['geolocation'] = df_geo

        # 3. Popular zip_code_master a partir de geolocation (executado via SQL)
        print("Populando 'zip_code_master' a partir de 'geolocation'...")
        cur = conn.cursor()
        cur.execute(POPULATE_ZIP_CODE_MASTER_SQL)
        conn.commit()
        print("Tabela 'zip_code_master' populada com sucesso.")

        # 4. customers (depende de zip_code_master) - MOVIDO PARA CIMA
        print(f"Processando {CSV_FILES['customers']}...")
        df_customers = pd.read_csv(os.path.join(CSV_BASE_PATH, CSV_FILES['customers']), encoding='utf-8')
        df_customers.columns = df_customers.columns.str.strip()
        df_customers.drop_duplicates(subset=['customer_id'], inplace=True) # Garantir PK única

        # Filtrar clientes com CEPs não existentes em zip_code_master
        existing_zips = get_existing_zip_codes(conn)
        initial_customer_count = len(df_customers)
        df_customers = df_customers[df_customers['customer_zip_code_prefix'].astype(str).isin(existing_zips)] # Cast to str for comparison
        if len(df_customers) < initial_customer_count:
            print(f"Removidas {initial_customer_count - len(df_customers)} linhas de 'customers' devido a CEPs não encontrados em 'zip_code_master'.")
        load_data_to_db(df_customers, 'customers', conn)
        cleaned_dfs['customers'] = df_customers

        # 5. sellers (depende de zip_code_master) - MOVIDO PARA CIMA
        print(f"Processando {CSV_FILES['sellers']}...")
        df_sellers = pd.read_csv(os.path.join(CSV_BASE_PATH, CSV_FILES['sellers']), encoding='utf-8')
        df_sellers.columns = df_sellers.columns.str.strip()
        df_sellers.drop_duplicates(subset=['seller_id'], inplace=True) # Garantir PK única

        # Filtrar vendedores com CEPs não existentes em zip_code_master
        initial_seller_count = len(df_sellers)
        df_sellers = df_sellers[df_sellers['seller_zip_code_prefix'].astype(str).isin(existing_zips)] # Cast to str for comparison
        if len(df_sellers) < initial_seller_count:
            print(f"Removidas {initial_seller_count - len(df_sellers)} linhas de 'sellers' devido a CEPs não encontrados em 'zip_code_master'.")
        load_data_to_db(df_sellers, 'sellers', conn)
        cleaned_dfs['sellers'] = df_sellers

        # 6. products (depende de product_category_name_translation) - ORDEM MANTIDA
        print(f"Processando {CSV_FILES['products']}...")
        df_products = pd.read_csv(os.path.join(CSV_BASE_PATH, CSV_FILES['products']), encoding='utf-8')
        df_products.columns = df_products.columns.str.strip()
        df_products['product_category_name'] = df_products['product_category_name'].astype(str).str.strip() # Remove espaços em branco

        # CORREÇÃO: Converter colunas BIGINT para numérico e tratar NaNs
        bigint_cols_products = [
            'product_name_lenght', 'product_description_lenght', 'product_photos_qty',
            'product_weight_g', 'product_length_cm', 'product_height_cm', 'product_width_cm'
        ]
        for col in bigint_cols_products:
            df_products[col] = pd.to_numeric(df_products[col], errors='coerce').astype('Int64') # Use 'Int64' para suportar NaNs
        df_products.dropna(subset=bigint_cols_products, inplace=True) # Remover linhas onde a conversão falhou

        df_products.drop_duplicates(subset=['product_id'], inplace=True) # Garantir PK única

        # Lidar com categorias de produtos ausentes em product_category_name_translation
        existing_categories = get_existing_categories(conn)
        product_categories = set(df_products['product_category_name'].dropna().unique())
        missing_categories = product_categories - existing_categories

        if missing_categories:
            print(f"Identificadas {len(missing_categories)} categorias de produtos ausentes. Inserindo...")
            # Cria um DataFrame para as categorias ausentes (sem tradução, usa o próprio nome)
            missing_cat_df = pd.DataFrame({
                'product_category_name': list(missing_categories),
                'product_category_name_english': list(missing_categories) # Usa o próprio nome como tradução padrão
            })
            insert_missing_categories(conn, missing_cat_df)
        else:
            print("Nenhuma categoria de produto ausente identificada.")

        load_data_to_db(df_products, 'products', conn)
        cleaned_dfs['products'] = df_products

        # 7. orders (depende de customers) - AGORA VEM DEPOIS DE CUSTOMERS
        print(f"Processando {CSV_FILES['orders']}...")
        df_orders = pd.read_csv(os.path.join(CSV_BASE_PATH, CSV_FILES['orders']), encoding='utf-8')
        df_orders.columns = df_orders.columns.str.strip()
        # Converter colunas de data/hora
        datetime_cols_orders = ['order_purchase_timestamp', 'order_approved_at', 'order_delivered_carrier_date', 'order_delivered_customer_date', 'order_estimated_delivery_date']
        for col in datetime_cols_orders:
            df_orders[col] = pd.to_datetime(df_orders[col], errors='coerce', utc=True) # errors='coerce' para NaN em erros
        df_orders.drop_duplicates(subset=['order_id'], inplace=True) # Garantir PK única

        # NOVO: Filtrar orders com customer_id não existentes em customers
        existing_customer_ids = get_existing_customer_ids(conn) # Obtém os IDs de clientes que foram carregados
        initial_order_count = len(df_orders)
        df_orders = df_orders[df_orders['customer_id'].astype(str).isin(existing_customer_ids)]
        if len(df_orders) < initial_order_count:
            print(f"Removidas {initial_order_count - len(df_orders)} linhas de 'orders' devido a customer_ids não encontrados em 'customers'.")

        load_data_to_db(df_orders, 'orders', conn)
        cleaned_dfs['orders'] = df_orders

        # 8. order_reviews (depende de orders)
        print(f"Processando {CSV_FILES['order_reviews']}...")
        df_reviews = pd.read_csv(os.path.join(CSV_BASE_PATH, CSV_FILES['order_reviews']), encoding='utf-8')
        df_reviews.columns = df_reviews.columns.str.strip()
        df_reviews.drop_duplicates(subset=['review_id'], inplace=True) # Remover duplicatas da PK
        # Converter colunas de data/hora
        datetime_cols_reviews = ['review_creation_date', 'review_answer_timestamp']
        for col in datetime_cols_reviews:
            df_reviews[col] = pd.to_datetime(df_reviews[col], errors='coerce', utc=True)
        
        # NOVO: Filtrar order_reviews com order_id não existentes em orders
        existing_order_ids = get_existing_order_ids(conn) # Obtém os IDs de pedidos que foram carregados
        initial_review_count = len(df_reviews)
        df_reviews = df_reviews[df_reviews['order_id'].astype(str).isin(existing_order_ids)]
        if len(df_reviews) < initial_review_count:
            print(f"Removidas {initial_review_count - len(df_reviews)} linhas de 'order_reviews' devido a order_ids não encontrados em 'orders'.")

        load_data_to_db(df_reviews, 'order_reviews', conn)
        cleaned_dfs['order_reviews'] = df_reviews

        # 9. order_payments (depende de orders)
        print(f"Processando {CSV_FILES['order_payments']}...")
        df_payments = pd.read_csv(os.path.join(CSV_BASE_PATH, CSV_FILES['order_payments']), encoding='utf-8')
        df_payments.columns = df_payments.columns.str.strip()
        # Correção de tipo de dados para payment_value
        df_payments['payment_value'] = pd.to_numeric(df_payments['payment_value'], errors='coerce')
        df_payments.dropna(subset=['payment_value'], inplace=True) # Remover linhas com payment_value nulo após coerção
        # PK composta (order_id, payment_sequential)
        df_payments.drop_duplicates(subset=['order_id', 'payment_sequential'], inplace=True)

        # NOVO: Filtrar order_payments com order_id não existentes em orders
        # existing_order_ids já foi obtido para order_reviews
        initial_payment_count = len(df_payments)
        df_payments = df_payments[df_payments['order_id'].astype(str).isin(existing_order_ids)]
        if len(df_payments) < initial_payment_count:
            print(f"Removidas {initial_payment_count - len(df_payments)} linhas de 'order_payments' devido a order_ids não encontrados em 'orders'.")

        load_data_to_db(df_payments, 'order_payments', conn)
        cleaned_dfs['order_payments'] = df_payments

        # 10. order_items (depende de orders, products, sellers)
        print(f"Processando {CSV_FILES['order_items']}...")
        df_items = pd.read_csv(os.path.join(CSV_BASE_PATH, CSV_FILES['order_items']), encoding='utf-8')
        df_items.columns = df_items.columns.str.strip()
        # Correção de tipo de dados para price e freight_value
        df_items['price'] = pd.to_numeric(df_items['price'], errors='coerce')
        df_items['freight_value'] = pd.to_numeric(df_items['freight_value'], errors='coerce')
        df_items.dropna(subset=['price', 'freight_value'], inplace=True) # Remover linhas com valores nulos após coerção
        # Converter coluna de data/hora
        df_items['shipping_limit_date'] = pd.to_datetime(df_items['shipping_limit_date'], errors='coerce', utc=True)
        # PK composta (order_id, order_item_id)
        df_items.drop_duplicates(subset=['order_id', 'order_item_id'], inplace=True)

        # NOVO: Filtrar order_items com FKs não existentes
        # existing_order_ids já foi obtido
        existing_product_ids = get_existing_product_ids(conn) # Obtém os IDs de produtos que foram carregados
        existing_seller_ids = get_existing_seller_ids(conn)   # Obtém os IDs de vendedores que foram carregados

        initial_item_count = len(df_items)
        df_items = df_items[df_items['order_id'].astype(str).isin(existing_order_ids)]
        df_items = df_items[df_items['product_id'].astype(str).isin(existing_product_ids)]
        df_items = df_items[df_items['seller_id'].astype(str).isin(existing_seller_ids)]

        if len(df_items) < initial_item_count:
            print(f"Removidas {initial_item_count - len(df_items)} linhas de 'order_items' devido a FKs não encontradas.")

        load_data_to_db(df_items, 'order_items', conn)
        cleaned_dfs['order_items'] = df_items


        print("\n--- Verificando Chaves Estrangeiras ---")
        print("Verificações de chave estrangeira foram incorporadas na lógica de carregamento e no schema SQL.")

    except Exception as e:
        print(f"\nERRO FATAL NO ETL: {e}")
        print("Processo ETL encerrado devido a erro.")
    finally:
        if conn:
            conn.close()
            print("Conexão com o banco de dados fechada.")
    print("\nProcesso ETL concluído.")

if __name__ == "__main__":
    run_etl()


Iniciando processo ETL...
Conectado ao banco de dados 'brazilian_ecommerce' com sucesso.
Banco de dados 'brazilian_ecommerce' já existe. Prosseguindo.
Conectado ao banco de dados 'brazilian_ecommerce' com sucesso.
Executando script SQL para dropar e criar tabelas...
Tabelas criadas/redefinidas com sucesso.

--- Carregando e limpando dados dos CSVs ---
Processando product_category_name_translation.csv...
Dados carregados para 'product_category_name_translation' com sucesso. (72 linhas)
Processando olist_geolocation_dataset.csv...
Dados carregados para 'geolocation' com sucesso. (720154 linhas)
Populando 'zip_code_master' a partir de 'geolocation'...
Tabela 'zip_code_master' populada com sucesso.
Processando olist_customers_dataset.csv...
Removidas 278 linhas de 'customers' devido a CEPs não encontrados em 'zip_code_master'.
Dados carregados para 'customers' com sucesso. (99163 linhas)
Processando olist_sellers_dataset.csv...
Removidas 7 linhas de 'sellers' devido a CEPs não encontrados 