# Extract Pipelines

In [None]:
import duckdb
from deltalake import write_deltalake
import logging
import os

# Logging setup: INFO level, each line prefixed with timestamp and severity.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

def extract_func_sqlite(db_path: str, sqlite_table: str, bronze_delta_path: str, name_table: str, mode: str = "overwrite"):
    """Copy one SQLite table into a Delta table in the bronze layer.

    DuckDB attaches the SQLite file and reads the whole table into a PyArrow
    Table. ``write_deltalake`` then writes it to ``<bronze_delta_path>/<name_table>``.

    Args:
        db_path: path to the SQLite database file.
        sqlite_table: table to read from the attached SQLite database.
        bronze_delta_path: root directory of the bronze layer.
        name_table: Delta table directory created under ``bronze_delta_path``.
        mode: write mode forwarded to ``write_deltalake`` (default ``"overwrite"``).

    Errors are logged together with their traceback and are not re-raised, so a
    batch of extractions keeps going when one source fails.
    """
    conn = duckdb.connect()

    try:
        logger.info(f"Iniciando extração da tabela SQLite - {sqlite_table}")
        # Double any single quote so a path like "joão's data/olist.sqlite" does not break the SQL literal.
        escaped_db_path = db_path.replace("'", "''")
        conn.execute(f"ATTACH '{escaped_db_path}' AS sqlite_db;")  # attach the SQLite file through DuckDB
        dataframe = conn.sql(f"SELECT * FROM sqlite_db.{sqlite_table}").arrow()  # materialize as a PyArrow Table
        table_path = os.path.join(bronze_delta_path, name_table)
        os.makedirs(table_path, exist_ok=True)  # make sure the target directory exists before writing
        write_deltalake(table_path, dataframe, mode=mode)
        logger.info(f"\033[32m[OK]\033[0m Extração da tabela {sqlite_table} concluída e salva em {table_path}.")

    except Exception as e:
        # exc_info=True keeps the traceback; the message alone hides where the failure happened.
        logger.error(f"\033[031m[ERROR]\033[0m Erro ao processar {sqlite_table}: {str(e)}", exc_info=True)

    finally:
        conn.close()

def extract_func_csv(source_path_csv: str, bronze_delta_path: str, name_table: str, mode: str = "overwrite"):
    """Load a CSV file into a Delta table in the bronze layer.

    DuckDB's ``read_csv_auto`` infers the schema and the result is materialized
    as a PyArrow Table. ``write_deltalake`` then writes it to
    ``<bronze_delta_path>/<name_table>``.

    Args:
        source_path_csv: path to the CSV file to read.
        bronze_delta_path: root directory of the bronze layer.
        name_table: Delta table directory created under ``bronze_delta_path``.
        mode: write mode forwarded to ``write_deltalake`` (default ``"overwrite"``).

    Errors are logged together with their traceback and are not re-raised, so a
    batch of extractions keeps going when one file fails.
    """
    conn = duckdb.connect()

    try:
        logger.info(f"Iniciando extração do arquivo {source_path_csv}")
        # Double any single quote so the path cannot terminate the SQL string literal early.
        escaped_csv_path = source_path_csv.replace("'", "''")
        dataframe = conn.sql(f"SELECT * FROM read_csv_auto('{escaped_csv_path}')").arrow()
        table_path = os.path.join(bronze_delta_path, name_table)
        os.makedirs(table_path, exist_ok=True)  # make sure the target directory exists before writing
        write_deltalake(table_path, dataframe, mode=mode)
        logger.info(f"\033[32m[OK]\033[0m Processo de extração do .CSV {source_path_csv} foi concluído.")

    except Exception as e:
        # exc_info=True keeps the traceback; the message alone hides where the failure happened.
        logger.error(f"\033[031m[ERROR]\033[0m Erro ao processar {source_path_csv}: {str(e)}", exc_info=True)

    finally:
        conn.close()

if __name__ == "__main__":
    BRONZE_DELTA_PATH = "../delta_lake/bronze"
    SQLITE_DB_PATH = "../data/olist.sqlite"

    # CSV sources: (source file, bronze table name), extracted in this order.
    csv_sources = [
        ("../data/olist_customers_dataset.csv", "customers_bronze"),
        ("../data/olist_geolocation_dataset.csv", "geolocation_bronze"),
        ("../data/olist_order_items_dataset.csv", "order_items_bronze"),
        ("../data/olist_order_payments_dataset.csv", "payments_bronze"),
        ("../data/olist_order_reviews_dataset.csv", "reviews_bronze"),
        ("../data/olist_orders_dataset.csv", "orders_bronze"),
        ("../data/olist_products_dataset.csv", "products_bronze"),
        ("../data/olist_sellers_dataset.csv", "sellers_bronze"),
        ("../data/product_category_name_translation.csv", "product_category_name_translation_bronze"),
    ]
    for csv_file, bronze_table in csv_sources:
        extract_func_csv(source_path_csv=csv_file, name_table=bronze_table, bronze_delta_path=BRONZE_DELTA_PATH)

    # SQLite sources: (table inside olist.sqlite, bronze table name).
    sqlite_sources = [
        ("leads_qualified", "leads_qualified_bronze"),
        ("leads_closed", "leads_closed_bronze"),
    ]
    for source_table, bronze_table in sqlite_sources:
        extract_func_sqlite(db_path=SQLITE_DB_PATH, sqlite_table=source_table, name_table=bronze_table, bronze_delta_path=BRONZE_DELTA_PATH)


2025-03-29 20:52:38,406 - INFO - Iniciando extração do arquivo ../data/olist_customers_dataset.csv
2025-03-29 20:52:38,572 - INFO - [32m[OK][0m Processo de extração do ../data/olist_customers_dataset.csv foi concluído.
2025-03-29 20:52:38,585 - INFO - Iniciando extração do arquivo ../data/olist_geolocation_dataset.csv
2025-03-29 20:52:38,788 - INFO - [32m[OK][0m Processo de extração do ../data/olist_geolocation_dataset.csv foi concluído.
2025-03-29 20:52:38,811 - INFO - Iniciando extração do arquivo ../data/olist_order_items_dataset.csv
2025-03-29 20:52:39,018 - INFO - [32m[OK][0m Processo de extração do ../data/olist_order_items_dataset.csv foi concluído.
2025-03-29 20:52:39,032 - INFO - Iniciando extração do arquivo ../data/olist_order_payments_dataset.csv
2025-03-29 20:52:39,137 - INFO - [32m[OK][0m Processo de extração do ../data/olist_order_payments_dataset.csv foi concluído.
2025-03-29 20:52:39,148 - INFO - Iniciando extração do arquivo ../data/olist_order_reviews_dataset

# Transform Pipelines

In [2]:
import pandas as pd
import duckdb

def pandas_sql(query: str) -> pd.DataFrame:
    """Run a SQL query on a fresh in-memory DuckDB connection and return the result as a DataFrame.

    Args:
        query: SQL text executed by DuckDB (e.g. ``SELECT ... FROM delta_scan('...')``).

    Returns:
        The query result materialized with ``fetchdf()``.

    Raises:
        Whatever DuckDB raises for an invalid query; the connection is still closed.
    """
    # The context manager closes the connection even when the query raises,
    # which the previous explicit close() after fetchdf() did not.
    with duckdb.connect() as conn:
        return conn.sql(query).fetchdf()

# Preview the first rows of the bronze leads_closed Delta table.
leads_closed_preview_sql = "SELECT * FROM delta_scan('../delta_lake/bronze/leads_closed_bronze') LIMIT 5"
pandas_sql(leads_closed_preview_sql)

Unnamed: 0,mql_id,seller_id,sdr_id,sr_id,won_date,business_segment,lead_type,lead_behaviour_profile,has_company,has_gtin,average_stock,business_type,declared_product_catalog_size,declared_monthly_revenue
0,5420aad7fec3549a85876ba1c529bd84,2c43fb513632d29b3b58df74816f1b06,a8387c01a09e99ce014107505b92388c,4ef15afb4b2723d8f3d81e51ec7afefe,2018-02-26 19:58:54,pet,online_medium,cat,,,,reseller,,0.0
1,a555fb36b9368110ede0f043dfc3b9a0,bbb7d7893a450660432ea6652310ebb7,09285259593c61296eef10c734121d5b,d3d1e91a157ea7f90548eef82f1955e3,2018-05-08 20:17:59,car_accessories,industry,eagle,,,,reseller,,0.0
2,327174d3648a2d047e8940d7d15204ca,612170e34b97004b3ba37eae81836b4c,b90f87164b5f8c2cfa5c8572834dbe3f,6565aa9ce3178a5caf6171827af3a9ba,2018-06-05 17:27:23,home_appliances,online_big,cat,,,,reseller,,0.0
3,f5fee8f7da74f4887f5bcae2bafb6dd6,21e1781e36faf92725dde4730a88ca0f,56bf83c4bb35763a51c2baab501b4c67,d3d1e91a157ea7f90548eef82f1955e3,2018-01-17 13:51:03,food_drink,online_small,,,,,reseller,,0.0
4,ffe640179b554e295c167a2f6be528e0,ed8cb7b190ceb6067227478e48cf8dde,4b339f9567d060bcea4f5136b9f5949e,d3d1e91a157ea7f90548eef82f1955e3,2018-07-03 20:17:45,home_appliances,industry,wolf,,,,manufacturer,,0.0


In [2]:
# Inspect column names and inferred types of the bronze leads_closed table.
leads_closed_describe_sql = "DESCRIBE SELECT * FROM delta_scan('../delta_lake/bronze/leads_closed_bronze')"
pandas_sql(leads_closed_describe_sql)
# pandas_sql("SELECT COUNT(*) FROM delta_scan('../delta_lake/bronze/leads_closed_bronze') WHERE declared_monthly_revenue >= 0.0 LIMIT 5")

Unnamed: 0,column_name,column_type,null,key,default,extra
0,mql_id,VARCHAR,YES,,,
1,seller_id,VARCHAR,YES,,,
2,sdr_id,VARCHAR,YES,,,
3,sr_id,VARCHAR,YES,,,
4,won_date,VARCHAR,YES,,,
5,business_segment,VARCHAR,YES,,,
6,lead_type,VARCHAR,YES,,,
7,lead_behaviour_profile,VARCHAR,YES,,,
8,has_company,BIGINT,YES,,,
9,has_gtin,BIGINT,YES,,,


In [3]:
import os
import logging
import duckdb
from deltalake import write_deltalake

# Logging setup (repeated so this cell also works on a fresh kernel;
# basicConfig is a no-op if the root logger is already configured).
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

def transform_pipeline_sql(query: str, table_name: str, mode: str = "overwrite", silver_path: str = "../delta_lake/silver/"):
    """Run a DuckDB SQL query and save its result as a Delta table in the Silver layer.

    Args:
        query: SQL executed with DuckDB (typically reading bronze tables via ``delta_scan``).
        table_name: Delta table directory created under ``silver_path``.
        mode: write mode forwarded to ``write_deltalake`` (default ``"overwrite"``).
        silver_path: root directory of the Silver layer (default ``"../delta_lake/silver/"``).

    Failures are logged together with their traceback and are not re-raised, so
    one failing table does not stop the remaining transformations.
    """
    silver_path_delta = os.path.join(silver_path, table_name)
    os.makedirs(silver_path, exist_ok=True)  # create the Silver root if it does not exist yet

    try:
        # The context manager closes the connection on success and on error. The old
        # `finally: if conn:` raised UnboundLocalError when duckdb.connect() itself failed.
        with duckdb.connect() as conn:
            logger.info(f"Executando transformação na tabela '{table_name}'")
            df_transformed = conn.sql(query).arrow()
            write_deltalake(silver_path_delta, df_transformed, mode=mode)  # persist result as a Delta table
            logger.info(f"\033[32m[OK]\033[0m Tabela '{table_name}' processada com sucesso.")

    except Exception as e:
        logger.error(f"\033[31m[ERROR]\033[0m Erro inesperado ao processar '{table_name}': {str(e)}", exc_info=True)

## SQL transform pipelines

In [None]:
# customers table
transform_pipeline_sql(
    query=f"""
    SELECT
        customer_id,
        customer_unique_id,
        LOWER(TRIM(customer_zip_code_prefix)) AS customer_cep,
        LOWER(TRIM(customer_city)) AS customer_city,
        UPPER(TRIM(customer_state)) AS customer_state,
    FROM delta_scan('../delta_lake/bronze/customers_bronze')
    """,
    table_name="customers_silver"
)

In [None]:
# geolocation table
transform_pipeline_sql(
    query=f"""
    SELECT
        geolocation_zip_code_prefix AS geolocation_cep,
        COALESCE(geolocation_lat, NULL) AS geolocation_lat,  -- Mantém NULL para evitar coordenadas erradas
        COALESCE(geolocation_lng, NULL) AS geolocation_lng,
        LOWER(TRIM(COALESCE(geolocation_city, 'N/A'))) AS geolocation_city,
        UPPER(TRIM(COALESCE(geolocation_state, 'N/A'))) AS geolocation_state
    FROM delta_scan('../delta_lake/bronze/geolocation_bronze');
    """, 
    table_name="geolocation_silver"
)

In [None]:
# order_items table
transform_pipeline_sql(
    query=f"""
    SELECT 
        order_id,
        order_item_id,
        product_id,
        seller_id,
        CAST(shipping_limit_date AS TIMESTAMP) AS shipping_limit_date,
        CAST(price AS DOUBLE) AS price,
        CAST(freight_value AS DOUBLE) AS freight_value
    FROM delta_scan('../delta_lake/bronze/order_items_bronze')
    """,
    table_name="order_items_silver"
)

In [None]:
# orders table - transformar e retornar delta table com apenas os PEDIDOS ENTREGUES
transform_pipeline_sql(
    query=f"""
    SELECT
        CAST(order_id AS VARCHAR) AS order_id,
        CAST(customer_id AS VARCHAR) AS customer_id,
        CAST(order_status AS VARCHAR) AS order_status,
        CAST(order_purchase_timestamp AS TIMESTAMP) AS order_purchase_timestamp,
        CAST(order_approved_at AS TIMESTAMP) AS order_approved_at,
        CAST(order_delivered_carrier_date AS TIMESTAMP) AS order_delivered_carrier_date,
        CAST(order_delivered_customer_date AS TIMESTAMP) AS order_delivered_customer_date,
        CAST(order_estimated_delivery_date AS TIMESTAMP) AS order_estimated_delivery_date
    FROM delta_scan('../delta_lake/bronze/orders_bronze')
    WHERE order_status = 'delivered'
    """,
    table_name="orders_only_delivered_silver"
)

In [None]:
# orders table - transformar e retornar delta table com TODOS OS DADOS
transform_pipeline_sql(
    query=f"""
    SELECT
        CAST(order_id AS VARCHAR) AS order_id,
        CAST(customer_id AS VARCHAR) AS customer_id,
        CAST(order_status AS VARCHAR) AS order_status,
        CAST(order_purchase_timestamp AS TIMESTAMP) AS order_purchase_timestamp,
        CAST(order_approved_at AS TIMESTAMP) AS order_approved_at,
        CAST(order_delivered_carrier_date AS TIMESTAMP) AS order_delivered_carrier_date,
        CAST(order_delivered_customer_date AS TIMESTAMP) AS order_delivered_customer_date,
        CAST(order_estimated_delivery_date AS TIMESTAMP) AS order_estimated_delivery_date
    FROM delta_scan('../delta_lake/bronze/orders_bronze')
    """,
    table_name="orders_full_data_silver"
)

In [None]:
# products table
transform_pipeline_sql(
    query=f"""
    SELECT
        CAST(product_id AS VARCHAR) AS product_id,
        COALESCE(LOWER(TRIM(product_category_name)), 'unknown') AS product_category,
        COALESCE(CAST(product_name_lenght AS INT), 0) AS product_name_length,
        COALESCE(CAST(product_description_lenght AS INT), 0) AS product_description_length,
        COALESCE(CAST(product_photos_qty AS INT), 0) AS product_photos_qty,
        COALESCE(CAST(product_weight_g AS INT), 0) AS product_weight_g,
        COALESCE(CAST(product_length_cm AS INT), 0) AS product_length_cm,
        COALESCE(CAST(product_height_cm AS INT), 0) AS product_height_cm,
        COALESCE(CAST(product_width_cm AS INT), 0) AS product_width_cm
    FROM delta_scan('../delta_lake/bronze/products_bronze');
    """,
    table_name="products_silver"
)

In [None]:
# sellers table
transform_pipeline_sql(
    query=f"""
    SELECT
        seller_id,
        seller_zip_code_prefix AS seller_cep,
        LOWER(TRIM(seller_city)) AS seller_city,
        UPPER(TRIM(seller_state)) AS seller_state
    FROM delta_scan('../delta_lake/bronze/sellers_bronze')
    """,
    table_name="sellers_silver"
)

In [None]:
# leads_closed table
transform_pipeline_sql(
    query=f"""
    SELECT
        mql_id
        seller_id
        sdr_id
        sr_id
        won_date
        business_segment
        lead_type
        lead_behaviour_profile
        has_company
        has_gtin
        average_stock
        business_type
        declared_product_catalog_size
        declared_monthly_revenue
    FROM delta_scan('../delta_lake/bronze/leads_closed_bronze')
    """,
    table_name="leads_closed_silver"
)

In [None]:
# TODO: has_company needs cleaning.
# TODO: sdr_id should be renamed to pre_seller_id.
# Trial run of the intended column types on a few rows of bronze leads_closed.
# NOTE(review): business_type is not selected here — confirm whether that is intended.
leads_closed_cast_preview_sql = """
    SELECT
        CAST(mql_id AS VARCHAR) AS mql_id,
        CAST(seller_id AS VARCHAR) AS seller_id,
        CAST(sdr_id AS VARCHAR) AS sdr_id,
        CAST(sr_id AS VARCHAR) AS sr_id,
        CAST(won_date AS DATE) AS won_date,
        CAST(business_segment AS VARCHAR) AS business_segment,
        CAST(lead_type AS VARCHAR) AS lead_type,
        CAST(lead_behaviour_profile AS VARCHAR) AS lead_behaviour_profile,
        CAST(has_company AS BOOLEAN) AS has_company,
        CAST(has_gtin AS BOOLEAN) AS has_gtin,
        CAST(average_stock AS FLOAT) AS average_stock,
        CAST(declared_product_catalog_size AS FLOAT) AS declared_product_catalog_size,
        CAST(declared_monthly_revenue AS FLOAT) AS declared_monthly_revenue
    FROM delta_scan('../delta_lake/bronze/leads_closed_bronze')
    LIMIT 3
    """
pandas_sql(leads_closed_cast_preview_sql)

Unnamed: 0,mql_id,seller_id,sdr_id,sr_id,won_date,business_segment,lead_type,lead_behaviour_profile,has_company,has_gtin,average_stock,declared_product_catalog_size,declared_monthly_revenue
0,5420aad7fec3549a85876ba1c529bd84,2c43fb513632d29b3b58df74816f1b06,a8387c01a09e99ce014107505b92388c,4ef15afb4b2723d8f3d81e51ec7afefe,2018-02-26,pet,online_medium,cat,,,,,0.0
1,a555fb36b9368110ede0f043dfc3b9a0,bbb7d7893a450660432ea6652310ebb7,09285259593c61296eef10c734121d5b,d3d1e91a157ea7f90548eef82f1955e3,2018-05-08,car_accessories,industry,eagle,,,,,0.0
2,327174d3648a2d047e8940d7d15204ca,612170e34b97004b3ba37eae81836b4c,b90f87164b5f8c2cfa5c8572834dbe3f,6565aa9ce3178a5caf6171827af3a9ba,2018-06-05,home_appliances,online_big,cat,,,,,0.0


In [14]:
# visualizando as categorias de business_segment
# pandas_sql("SELECT DISTINCT business_segment FROM delta_scan('../delta_lake/bronze/leads_closed_bronze')")

# visualizando as categorias de lead_behaviour_profile
# pandas_sql("SELECT DISTINCT lead_behaviour_profile FROM delta_scan('../delta_lake/bronze/leads_closed_bronze')")

# visualizando as categorias de has_company
pandas_sql("SELECT DISTINCT has_company FROM delta_scan('../delta_lake/bronze/leads_closed_bronze')")

Unnamed: 0,has_company
0,0.0
1,
2,1.0


In [15]:
# Check the stored type of has_company in bronze.
has_company_type_sql = "DESCRIBE SELECT has_company FROM delta_scan('../delta_lake/bronze/leads_closed_bronze')"
pandas_sql(has_company_type_sql)

Unnamed: 0,column_name,column_type,null,key,default,extra
0,has_company,BIGINT,YES,,,


## Silver Layer Transformations for Special Tables

Neste caso será necessário criar novas tabelas dentro da camada Silver:

1. Tabela customers com geolocation - customers_geolocation_silver
2. 