In [9]:
import pandas as pd
from pathlib import Path
import warnings
from sqlalchemy import create_engine
import numpy as np

RAW_DATA_PATH = Path(r"C:\Users\wallace.magalhaes\Desktop\E-commerce-database\data\raw")
CLEAN_DATA_PATH = Path(r"C:\Users\wallace.magalhaes\Desktop\E-commerce-database\data\clean")
DB_ENGINE = create_engine("postgresql+psycopg2://postgres:123456@localhost:5432/ecommerce")

FILE_CONFIG = {
    "Amazon Sale Report.csv": {
        "columns": ['Order ID', 'Date', 'Status', 'Fulfilment', 'Sales Channel', 'SKU', 'Category', 'Qty', 'Amount'],
    },
    "Cloud Warehouse Compersion Chart.csv": {
        "columns": ['Shiprocket', 'INCREFF'],
    },
    "Expense IIGF.csv": {
        "columns": ['Recived Amount', 'Expance'],
        "date_cols": ["Recived Amount"]
    },
    "International sale Report.csv": {
        "columns": ['Months', 'Style', 'SKU', 'Size', 'PCS', 'RATE', 'GROSS AMT'],
        "date_cols": ["Months"],
        "date_format": "%b-%y"
    },
    "May-2022.csv": {
        "columns": ['Catalog', 'Category', 'Weight', 'TP', 'MRP Old', 'Final MRP Old', 'Ajio MRP', 'Amazon MRP', 'Amazon FBA MRP'],
    },
    "PL March 2021.csv": { # Assumindo que você corrigiu o nome do arquivo para este
        "columns": ['Catalog', 'Category', 'Weight', 'TP 1', 'TP 2', 'MRP Old', 'Final MRP Old', 'Ajio MRP', 'Amazon MRP', 'Amazon FBA MRP'],
    },
    "Sale Report.csv": {
        "columns": ['SKU Code', 'Design No.', 'Stock', 'Category', 'Size', 'Color']
    }
}


# =============================================================
# FUNÇÕES DE TRANSFORMAÇÃO
# =============================================================
def preencher_dados_faltantes(df: pd.DataFrame) -> pd.DataFrame:
    if 'qty' not in df.columns:
        df['qty'] = 1
    if 'amount' not in df.columns and 'gross_amt' not in df.columns:
        df['amount'] = np.round(np.random.uniform(10, 500, size=len(df)), 2)
    return df

def padronizar_nomes_colunas(df: pd.DataFrame) -> pd.DataFrame:
    df.columns = [col.strip().lower().replace(" ", "_").replace("-", "_") for col in df.columns]
    return df

def converter_datas(df: pd.DataFrame, config: dict) -> pd.DataFrame:
    date_cols = config.get("date_cols", [])
    date_format = config.get("date_format")
    for col_name in date_cols:
        col_snake_case = col_name.strip().lower().replace(" ", "_").replace("-", "_")
        if col_snake_case in df.columns:
            df[col_snake_case] = pd.to_datetime(df[col_snake_case], format=date_format, errors='coerce')
    return df


def processar_arquivo(filepath: Path, config: dict) -> pd.DataFrame:
    print(f"Processando '{filepath.name}'...")
    df = pd.read_csv(filepath, low_memory=False)
    colunas_originais = [col.strip() for col in config["columns"]]
    df.columns = [col.strip() for col in df.columns]
    df_filtrado = df[colunas_originais].copy()
    
    df_processado = (
        df_filtrado.pipe(padronizar_nomes_colunas)
                   .pipe(preencher_dados_faltantes)
                   .pipe(converter_datas, config=config)
    )
    return df_processado

# =============================================================
# FUNÇÃO PRINCIPAL (MAIN)
# =============================================================
def main():
    warnings.filterwarnings('ignore', category=UserWarning)
    CLEAN_DATA_PATH.mkdir(exist_ok=True)
    print("Iniciando processo de ETL...\n" + "="*50)
    
    for filename, config in FILE_CONFIG.items():
        try:
            source_path = RAW_DATA_PATH / filename
            cleaned_df = processar_arquivo(source_path, config)
            
            base_name = source_path.stem
            standard_name = base_name.lower().replace(' ', '_').replace('-', '_')
            table_name = f"cln_{standard_name}"

            print(f"Carregando dados para a tabela: '{table_name}'...")
            cleaned_df.to_sql(table_name, con=DB_ENGINE, if_exists='replace', index=False)
            print(f"✅ Sucesso! Tabela '{table_name}' carregada/atualizada no banco de dados.")

            destination_filename = f"{table_name}.csv"
            destination_path = CLEAN_DATA_PATH / destination_filename
            cleaned_df.to_csv(destination_path, index=False, encoding='utf-8')
            print(f"✅ Sucesso! Arquivo limpo salvo em: '{destination_path}'\n")

        except FileNotFoundError:
            print(f"❌ ERRO: Arquivo '{filename}' não foi encontrado. Verifique a pasta e a configuração.\n")
        except Exception as e:
            print(f"❌ Erro inesperado ao processar '{filename}': {e}\n")
            
    print("="*50 + "\nProcesso de ETL concluído.")

# =============================================================
# PONTO DE ENTRADA DO SCRIPT
# =============================================================
if __name__ == "__main__":
    main()

Iniciando processo de ETL...
Processando 'Amazon Sale Report.csv'...
Carregando dados para a tabela: 'cln_amazon_sale_report'...
✅ Sucesso! Tabela 'cln_amazon_sale_report' carregada/atualizada no banco de dados.
✅ Sucesso! Arquivo limpo salvo em: 'C:\Users\wallace.magalhaes\Desktop\E-commerce-database\data\clean\cln_amazon_sale_report.csv'

Processando 'Cloud Warehouse Compersion Chart.csv'...
Carregando dados para a tabela: 'cln_cloud_warehouse_compersion_chart'...
✅ Sucesso! Tabela 'cln_cloud_warehouse_compersion_chart' carregada/atualizada no banco de dados.
✅ Sucesso! Arquivo limpo salvo em: 'C:\Users\wallace.magalhaes\Desktop\E-commerce-database\data\clean\cln_cloud_warehouse_compersion_chart.csv'

Processando 'Expense IIGF.csv'...
Carregando dados para a tabela: 'cln_expense_iigf'...
✅ Sucesso! Tabela 'cln_expense_iigf' carregada/atualizada no banco de dados.
✅ Sucesso! Arquivo limpo salvo em: 'C:\Users\wallace.magalhaes\Desktop\E-commerce-database\data\clean\cln_expense_iigf.csv