# ETL: Silver → Gold Layer

**Objetivo:** Transformar dados do Silver em Data Warehouse (Gold) com Star Schema.

**Processo:**
1. Extrair dados de `silver.uber_silver`
2. Popular 4 dimensões consolidadas (dim_dtt, dim_cst, dim_loc, dim_rid)
3. Popular tabela fato `fat_cor` com métricas e FKs
4. Validar Data Warehouse

**Schema:** dwh.dim_dtt, dim_cst, dim_loc, dim_rid → fat_cor

**Mnemônicos:** Utiliza nomenclatura silábica (3-4 caracteres). Veja docs/Mnemonicos_DWH.md

In [33]:
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime, timedelta
import numpy as np
import hashlib
from dotenv import load_dotenv
import os

In [34]:
def get_connection():
    load_dotenv()
    
    DB_USER = os.getenv('POSTGRES_USER', 'postgres')
    DB_PASSWORD = os.getenv('POSTGRES_PASSWORD', 'postgres')
    DB_HOST = os.getenv('POSTGRES_HOST', 'localhost')
    DB_PORT = os.getenv('POSTGRES_PORT', '5432')
    DB_NAME = os.getenv('POSTGRES_DB', 'uber')  # Corrigido: POSTGRES_DB

    return psycopg2.connect(
        host=DB_HOST,
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        port=DB_PORT
    )

# Teste conexão
try:
    conn = get_connection()
    print(" Conexão estabelecida!")
    conn.close()
except Exception as e:
    print(f" Erro: {e}")

 Conexão estabelecida!


## 0. PREPARAÇÃO: Criar Schema e Tabelas do DWH

In [35]:
# Executar DDL do Gold para criar schema e tabelas
import os

ddl_path = os.path.join('..', 'Data Layer', 'gold', 'gold_ddl.sql')

print(" Lendo arquivo gold_ddl.sql...")
with open(ddl_path, 'r', encoding='utf-8') as f:
    ddl_script = f.read()

print(" Executando DDL no PostgreSQL...")
conn = get_connection()
cur = conn.cursor()

try:
    cur.execute(ddl_script)
    conn.commit()
    print(" Schema 'dwh' e todas as tabelas criadas com sucesso!")
    
    # Verificar tabelas criadas
    cur.execute("""
        SELECT table_name 
        FROM information_schema.tables 
        WHERE table_schema = 'dwh'
        ORDER BY table_name;
    """)
    tabelas = cur.fetchall()
    print(f"\n Tabelas criadas no schema 'dwh': {len(tabelas)}")
    for tabela in tabelas:
        print(f"   • {tabela[0]}")
        
except Exception as e:
    conn.rollback()
    print(f" Erro ao executar DDL: {e}")
    raise
finally:
    cur.close()
    conn.close()

 Lendo arquivo gold_ddl.sql...
 Executando DDL no PostgreSQL...
 Schema 'dwh' e todas as tabelas criadas com sucesso!

 Tabelas criadas no schema 'dwh': 5
   • dim_cst
   • dim_dtt
   • dim_loc
   • dim_rid
   • fat_cor


## 1. EXTRAÇÃO: Carregar Silver

In [36]:
query_silver = """
SELECT booking_id, customer_id, vehicle_type, pickup_location, drop_location,
       booking_value, ride_distance, payment_method, booking_status,
       reason_for_cancelling_by_customer, driver_cancellation_reason, incomplete_rides_reason,
       date, time, avg_vtat, avg_ctat, driver_ratings, customer_rating
FROM silver.uber_silver
ORDER BY date, time;
"""

conn = get_connection()
df_silver = pd.read_sql(query_silver, conn)
conn.close()

print(f" Registros carregados: {len(df_silver):,}")
df_silver.head()

  df_silver = pd.read_sql(query_silver, conn)


 Registros carregados: 148,767


Unnamed: 0,booking_id,customer_id,vehicle_type,pickup_location,drop_location,booking_value,ride_distance,payment_method,booking_status,reason_for_cancelling_by_customer,driver_cancellation_reason,incomplete_rides_reason,date,time,avg_vtat,avg_ctat,driver_ratings,customer_rating
0,CNR4352144,CID8362794,Bike,Udyog Vihar,Ambience Mall,99.0,37.98,Cash,Completed,Reason Unknown,Reason Unknown,Reason Unknown,2024-01-01,00:19:34,10.8,38.9,4.8,4.8
1,CNR9147645,CID8300238,Go Mini,Basai Dhankot,Madipur,114.0,39.29,Uber Wallet,Completed,Reason Unknown,Reason Unknown,Reason Unknown,2024-01-01,01:35:18,8.5,15.1,4.2,4.1
2,CNR1009222,CID2030746,Go Sedan,Tughlakabad,Greater Kailash,508.29023,24.640956,UPI,Cancelled by Driver,Reason Unknown,More than permitted people in there,Reason Unknown,2024-01-01,01:37:50,7.4,29.150249,4.230756,4.404301
3,CNR2740479,CID3231181,Auto,Palam Vihar,Kherki Daula Toll,508.29023,24.640956,UPI,Cancelled by Driver,Reason Unknown,Personal & Car related issues,Reason Unknown,2024-01-01,01:48:03,5.6,29.150249,4.230756,4.404301
4,CNR7650148,CID3381661,Go Sedan,Narsinghpur,Pulbangash,508.29023,24.640956,UPI,Cancelled by Driver,Reason Unknown,More than permitted people in there,Reason Unknown,2024-01-01,01:49:56,6.2,29.150249,4.230756,4.404301


## 2. TRANSFORMAÇÃO: Criar Dimensões

In [37]:
# 2.1 dim_dtt (DateTime - Data e Hora combinadas)
df_silver['date'] = pd.to_datetime(df_silver['date'])
df_silver['datetime'] = pd.to_datetime(df_silver['date'].astype(str) + ' ' + df_silver['time'].astype(str))

# Criar dtt_key único: YYYYMMDDHHMM
df_silver['dtt_key'] = df_silver['datetime'].dt.strftime('%Y%m%d%H%M').astype('Int64')

# Garantir que não há duplicatas no dtt_key (pegar apenas o primeiro de cada grupo)
dim_dtt = df_silver[['dtt_key', 'datetime']].drop_duplicates(subset=['dtt_key']).copy()
dim_dtt['dat'] = dim_dtt['datetime'].dt.date
dim_dtt['hor'] = dim_dtt['datetime'].dt.time
dim_dtt['yrr'] = dim_dtt['datetime'].dt.year
dim_dtt['qtr'] = dim_dtt['datetime'].dt.quarter  # Trimestre 1-4
dim_dtt['mth'] = dim_dtt['datetime'].dt.month
dim_dtt['day'] = dim_dtt['datetime'].dt.day
dim_dtt['dow'] = dim_dtt['datetime'].dt.dayofweek + 1  # 1=Segunda, 7=Domingo
dim_dtt['wkd'] = dim_dtt['datetime'].dt.dayofweek.isin([5, 6]).map({True: 'Yes', False: 'No'})

dim_dtt = dim_dtt.sort_values('dtt_key')
print(f" dim_dtt: {len(dim_dtt):,} registros ({dim_dtt['dat'].min()} a {dim_dtt['dat'].max()})")
print(f"   Exemplo trimestre: Q1=Jan-Mar, Q2=Abr-Jun, Q3=Jul-Set, Q4=Out-Dez")
print(f"   Verificação: {dim_dtt['dtt_key'].nunique()} chaves únicas de {len(dim_dtt)} registros")

 dim_dtt: 124,452 registros (2024-01-01 a 2024-12-30)
   Exemplo trimestre: Q1=Jan-Mar, Q2=Abr-Jun, Q3=Jul-Set, Q4=Out-Dez
   Verificação: 124452 chaves únicas de 124452 registros


In [38]:
# 2.2 dim_cst (Customer - Cliente)
dim_cst = df_silver[['customer_id']].drop_duplicates().copy()
dim_cst = dim_cst.rename(columns={'customer_id': 'cst_ide'})
dim_cst['dat_cad'] = df_silver.groupby('customer_id')['date'].min().values

print(f" dim_cst: {len(dim_cst):,} registros")

 dim_cst: 147,580 registros


In [39]:
# 2.3 dim_loc (Location - Localização role-playing)
pickup_loc = df_silver[['pickup_location']].rename(columns={'pickup_location': 'loc_nme'})
drop_loc = df_silver[['drop_location']].rename(columns={'drop_location': 'loc_nme'})
dim_loc = pd.concat([pickup_loc, drop_loc]).drop_duplicates()
dim_loc['rgn'] = None  # Região (a ser derivada via geocoding)
dim_loc['zon'] = None  # Zona (a ser derivada)

print(f" dim_loc: {len(dim_loc):,} registros")

 dim_loc: 176 registros


In [40]:
# 2.4 dim_rid (Ride - Atributos consolidados da corrida)
# Consolidar atributos únicos: vehicle_type, booking_status, payment_method, motivos, tempos
dim_rid = df_silver[[
    'vehicle_type', 'booking_status', 'payment_method',
    'reason_for_cancelling_by_customer', 'driver_cancellation_reason', 'incomplete_rides_reason',
    'avg_vtat', 'avg_ctat'
]].drop_duplicates().copy()

dim_rid = dim_rid.rename(columns={
    'vehicle_type': 'vhc_tpe',
    'booking_status': 'bkg_sts',
    'payment_method': 'pmt_mtd',
    'reason_for_cancelling_by_customer': 'rsn_cst',
    'driver_cancellation_reason': 'rsn_drv',
    'incomplete_rides_reason': 'rsn_inc',
    'avg_vtat': 'avg_vtt',
    'avg_ctat': 'avg_ctt'
})

print(f" dim_rid: {len(dim_rid):,} registros")

 dim_rid: 102,023 registros


## 3. CARGA: Inserir Dimensões no DWH

In [41]:
# 3.1 Inserir dim_dtt (DateTime)
conn = get_connection()
cur = conn.cursor()

try:
    cur.execute("TRUNCATE TABLE dwh.dim_dtt CASCADE;")
    conn.commit()
    print(" Tabela dim_dtt truncada")
except Exception as e:
    print(f" Aviso no TRUNCATE: {e}")
    conn.rollback()
    # Se TRUNCATE falhar, usar DELETE
    cur.execute("DELETE FROM dwh.dim_dtt;")
    conn.commit()
    print(" Tabela dim_dtt limpa com DELETE")

# Verificar duplicatas antes de inserir
if dim_dtt['dtt_key'].duplicated().any():
    print(f" ATENÇÃO: Encontradas {dim_dtt['dtt_key'].duplicated().sum()} duplicatas em dtt_key!")
    dim_dtt = dim_dtt.drop_duplicates(subset=['dtt_key'], keep='first')
    print(f"   Removidas duplicatas. Registros finais: {len(dim_dtt)}")

dtt_values = [
    (int(row['dtt_key']), row['dat'], row['hor'], int(row['yrr']), int(row['qtr']),
     int(row['mth']), int(row['day']), int(row['dow']), row['wkd'])
    for _, row in dim_dtt.iterrows()
]

insert_query = """
INSERT INTO dwh.dim_dtt (dtt_key, dat, hor, yrr, qtr, mth, day, dow, wkd)
VALUES %s
"""
execute_values(cur, insert_query, dtt_values, page_size=1000)
conn.commit()

cur.execute("SELECT COUNT(*) FROM dwh.dim_dtt;")
print(f" dim_dtt inserida: {cur.fetchone()[0]:,} registros")
cur.close()
conn.close()

 Tabela dim_dtt truncada
 dim_dtt inserida: 124,452 registros
 dim_dtt inserida: 124,452 registros


In [42]:
# 3.2 Inserir dim_cst (Customer)
conn = get_connection()
cur = conn.cursor()

try:
    cur.execute("TRUNCATE TABLE dwh.dim_cst CASCADE;")
    conn.commit()
except Exception as e:
    print(f" Aviso no TRUNCATE: {e}")
    conn.rollback()
    cur.execute("DELETE FROM dwh.dim_cst;")
    conn.commit()

for _, row in dim_cst.iterrows():
    cur.execute("""
    INSERT INTO dwh.dim_cst (cst_ide, dat_cad)
    VALUES (%s, %s)
    ON CONFLICT (cst_ide) DO UPDATE SET
        dat_cad = EXCLUDED.dat_cad;
    """, (row['cst_ide'], row['dat_cad'].date()))

conn.commit()
cur.execute("SELECT COUNT(*) FROM dwh.dim_cst;")
print(f" dim_cst inserida: {cur.fetchone()[0]:,} registros")
cur.close()
conn.close()

 dim_cst inserida: 147,580 registros


In [43]:
# 3.3 Inserir dim_loc (Location)
conn = get_connection()
cur = conn.cursor()

try:
    cur.execute("TRUNCATE TABLE dwh.dim_loc CASCADE;")
    conn.commit()
except Exception as e:
    print(f" Aviso no TRUNCATE: {e}")
    conn.rollback()
    cur.execute("DELETE FROM dwh.dim_loc;")
    conn.commit()

for _, row in dim_loc.iterrows():
    cur.execute("""
    INSERT INTO dwh.dim_loc (loc_nme, rgn, zon)
    VALUES (%s, %s, %s)
    ON CONFLICT (loc_nme) DO UPDATE SET
        rgn = EXCLUDED.rgn,
        zon = EXCLUDED.zon;
    """, (row['loc_nme'], row['rgn'], row['zon']))

conn.commit()
cur.execute("SELECT COUNT(*) FROM dwh.dim_loc;")
print(f" dim_loc inserida: {cur.fetchone()[0]:,} registros")
cur.close()
conn.close()

 dim_loc inserida: 176 registros


In [44]:
# 3.4 Inserir dim_rid (Ride)
conn = get_connection()
cur = conn.cursor()

try:
    cur.execute("TRUNCATE TABLE dwh.dim_rid CASCADE;")
    conn.commit()
except Exception as e:
    print(f" Aviso no TRUNCATE: {e}")
    conn.rollback()
    cur.execute("DELETE FROM dwh.dim_rid;")
    conn.commit()

# Usar execute_values para melhor performance
rid_values = [
    (row['vhc_tpe'], row['bkg_sts'], row['pmt_mtd'], row['rsn_cst'], 
     row['rsn_drv'], row['rsn_inc'], row['avg_vtt'], row['avg_ctt'])
    for _, row in dim_rid.iterrows()
]

insert_query = """
INSERT INTO dwh.dim_rid (vhc_tpe, bkg_sts, pmt_mtd, rsn_cst, rsn_drv, rsn_inc, avg_vtt, avg_ctt)
VALUES %s
"""
execute_values(cur, insert_query, rid_values, page_size=1000)
conn.commit()

cur.execute("SELECT COUNT(*) FROM dwh.dim_rid;")
print(f" dim_rid inserida: {cur.fetchone()[0]:,} registros")
cur.close()
conn.close()

 dim_rid inserida: 102,023 registros


## 4. PREPARAR FATO: Lookups e Transformações

In [45]:
# Carregar lookups das dimensões
conn = get_connection()
lookup_cst = pd.read_sql("SELECT srk_cst, cst_ide FROM dwh.dim_cst", conn)
lookup_loc = pd.read_sql("SELECT srk_loc, loc_nme FROM dwh.dim_loc", conn)
lookup_rid = pd.read_sql("""
    SELECT srk_rid, vhc_tpe, bkg_sts, pmt_mtd, rsn_cst, rsn_drv, rsn_inc, avg_vtt, avg_ctt 
    FROM dwh.dim_rid
""", conn)
conn.close()

print(f" Lookups carregados")
print(f"   Clientes: {len(lookup_cst):,}")
print(f"   Localizações: {len(lookup_loc):,}")
print(f"   Ride Attributes: {len(lookup_rid):,}")

  lookup_cst = pd.read_sql("SELECT srk_cst, cst_ide FROM dwh.dim_cst", conn)
  lookup_loc = pd.read_sql("SELECT srk_loc, loc_nme FROM dwh.dim_loc", conn)
  lookup_rid = pd.read_sql("""


 Lookups carregados
   Clientes: 147,580
   Localizações: 176
   Ride Attributes: 102,023


In [46]:
# Preparar tabela fato
df_fato = df_silver.copy()

# Criar dtt_key (já criado em dim_dtt)
df_fato['datetime'] = pd.to_datetime(df_fato['date'].astype(str) + ' ' + df_fato['time'].astype(str))
df_fato['dtt_key'] = df_fato['datetime'].dt.strftime('%Y%m%d%H%M').astype('Int64')

# Merge com dim_dtt para obter srk_dtt
conn = get_connection()
lookup_dtt = pd.read_sql("SELECT srk_dtt, dtt_key FROM dwh.dim_dtt", conn)
conn.close()
df_fato = df_fato.merge(lookup_dtt, on='dtt_key', how='left')
print(f" Lookup dim_dtt: {len(lookup_dtt):,} registros")

# Merge com dim_cst (customer)
df_fato = df_fato.merge(
    lookup_cst.rename(columns={'cst_ide': 'customer_id'}), 
    on='customer_id', how='left'
)

# Merge com dim_rid (ride attributes)
df_fato = df_fato.merge(
    lookup_rid.rename(columns={
        'vhc_tpe': 'vehicle_type',
        'bkg_sts': 'booking_status',
        'pmt_mtd': 'payment_method',
        'rsn_cst': 'reason_for_cancelling_by_customer',
        'rsn_drv': 'driver_cancellation_reason',
        'rsn_inc': 'incomplete_rides_reason',
        'avg_vtt': 'avg_vtat',
        'avg_ctt': 'avg_ctat'
    }),
    on=['vehicle_type', 'booking_status', 'payment_method', 
        'reason_for_cancelling_by_customer', 'driver_cancellation_reason', 
        'incomplete_rides_reason', 'avg_vtat', 'avg_ctat'],
    how='left'
)

# Merge com dim_loc para pickup
df_fato = df_fato.merge(
    lookup_loc.rename(columns={'srk_loc': 'srk_pck', 'loc_nme': 'pickup_location'}),
    on='pickup_location', how='left'
)

# Merge com dim_loc para drop (role-playing)
df_fato = df_fato.merge(
    lookup_loc.rename(columns={'srk_loc': 'srk_drp', 'loc_nme': 'drop_location'}),
    on='drop_location', how='left'
)

# Calcular métricas derivadas
df_fato['amt_km'] = df_fato.apply(
    lambda x: round(x['booking_value'] / x['ride_distance'], 2) 
    if pd.notna(x['ride_distance']) and x['ride_distance'] > 0 else None,
    axis=1
)

# Flags VARCHAR(3) com 'Yes'/'No'
df_fato['flg_cmp'] = df_fato['booking_status'].str.lower().str.contains('complete', na=False).map({True: 'Yes', False: 'No'})
df_fato['flg_cnc'] = df_fato['booking_status'].str.lower().str.contains('cancel', na=False).map({True: 'Yes', False: 'No'})
df_fato['flg_inc'] = df_fato['booking_status'].str.lower().str.contains('incomplete', na=False).map({True: 'Yes', False: 'No'})

print(f" Fato preparada: {len(df_fato):,} registros")
print(f"   Completas: {(df_fato['flg_cmp'] == 'Yes').sum():,}")
print(f"   Canceladas: {(df_fato['flg_cnc'] == 'Yes').sum():,}")
print(f"   Incompletas: {(df_fato['flg_inc'] == 'Yes').sum():,}")

  lookup_dtt = pd.read_sql("SELECT srk_dtt, dtt_key FROM dwh.dim_dtt", conn)


 Lookup dim_dtt: 124,452 registros
 Fato preparada: 148,767 registros
   Completas: 101,175
   Canceladas: 37,191
   Incompletas: 8,927
 Fato preparada: 148,767 registros
   Completas: 101,175
   Canceladas: 37,191
   Incompletas: 8,927


## 5. CARGA: Inserir Tabela Fato

In [47]:
# Selecionar colunas para inserção na fat_cor
fato_columns = [
    'booking_id',  # cor_key
    'srk_dtt', 'srk_cst', 'srk_rid', 'srk_pck', 'srk_drp',
    'booking_value', 'ride_distance', 'driver_ratings', 'customer_rating', 'amt_km',
    'flg_cmp', 'flg_cnc', 'flg_inc'
]

df_fato_insert = df_fato[fato_columns].where(pd.notnull(df_fato[fato_columns]), None)
fato_values = [tuple(row) for row in df_fato_insert.values]

conn = get_connection()
cur = conn.cursor()
cur.execute("TRUNCATE TABLE dwh.fat_cor;")

insert_query = """
INSERT INTO dwh.fat_cor (
    cor_key, srk_dtt, srk_cst, srk_rid, srk_pck, srk_drp,
    amt, dst, rtg_drv, rtg_cst, amt_km,
    flg_cmp, flg_cnc, flg_inc
)
VALUES %s
ON CONFLICT (cor_key) DO NOTHING;
"""

# Inserir em batches
batch_size = 1000
total_batches = (len(fato_values) + batch_size - 1) // batch_size
print(f" Inserindo {len(fato_values):,} registros em {total_batches} batches...")

for i in range(0, len(fato_values), batch_size):
    batch = fato_values[i:i+batch_size]
    execute_values(cur, insert_query, batch, page_size=batch_size)
    if (i // batch_size + 1) % 10 == 0:
        print(f"   Batch {i // batch_size + 1}/{total_batches}")

conn.commit()
cur.execute("SELECT COUNT(*) FROM dwh.fat_cor;")
print(f"\n fat_cor inserida: {cur.fetchone()[0]:,} registros")
cur.close()
conn.close()

 Inserindo 148,767 registros em 149 batches...
   Batch 10/149
   Batch 10/149
   Batch 20/149
   Batch 20/149
   Batch 30/149
   Batch 30/149
   Batch 40/149
   Batch 40/149
   Batch 50/149
   Batch 50/149
   Batch 60/149
   Batch 60/149
   Batch 70/149
   Batch 70/149
   Batch 80/149
   Batch 80/149
   Batch 90/149
   Batch 90/149
   Batch 100/149
   Batch 100/149
   Batch 110/149
   Batch 110/149
   Batch 120/149
   Batch 120/149
   Batch 130/149
   Batch 130/149
   Batch 140/149
   Batch 140/149

 fat_cor inserida: 148,767 registros

 fat_cor inserida: 148,767 registros


## 6. VALIDAÇÃO do Data Warehouse

In [48]:
# Verificar integridade do DWH
conn = get_connection()
validation_queries = {
    'Total Corridas': "SELECT COUNT(*) FROM dwh.fat_cor",
    'Corridas Completas': "SELECT COUNT(*) FROM dwh.fat_cor WHERE flg_cmp = 'Yes'",
    'Corridas Canceladas': "SELECT COUNT(*) FROM dwh.fat_cor WHERE flg_cnc = 'Yes'",
    'Corridas Incompletas': "SELECT COUNT(*) FROM dwh.fat_cor WHERE flg_inc = 'Yes'",
    'Total Clientes': "SELECT COUNT(*) FROM dwh.dim_cst",
    'Total Localizações': "SELECT COUNT(*) FROM dwh.dim_loc",
    'Total DateTime Registros': "SELECT COUNT(*) FROM dwh.dim_dtt",
    'Total Ride Attributes': "SELECT COUNT(*) FROM dwh.dim_rid",
    'Receita Total (R$)': "SELECT SUM(amt) FROM dwh.fat_cor",
    'Distância Total (km)': "SELECT SUM(dst) FROM dwh.fat_cor",
    'Média Rating Motorista': "SELECT AVG(rtg_drv) FROM dwh.fat_cor WHERE rtg_drv IS NOT NULL",
    'Média Rating Cliente': "SELECT AVG(rtg_cst) FROM dwh.fat_cor WHERE rtg_cst IS NOT NULL"
}

print("="*70)
print(" VALIDAÇÃO DO DATA WAREHOUSE")
print("="*70)
for label, query in validation_queries.items():
    result = pd.read_sql(query, conn).iloc[0, 0]
    if isinstance(result, (int, np.integer)):
        print(f"{label:.<50} {result:>15,}")
    elif isinstance(result, (float, np.floating)):
        print(f"{label:.<50} {result:>15,.2f}")
print("="*70)
conn.close()

 VALIDAÇÃO DO DATA WAREHOUSE
Total Corridas....................................         148,767
Corridas Completas................................         101,175
Corridas Canceladas...............................          37,191
Corridas Incompletas..............................           8,927
Total Clientes....................................         147,580
Total Localizações................................             176
Total DateTime Registros..........................         124,452
Total Ride Attributes.............................         102,023
Receita Total (R$)................................   75,616,801.68
Distância Total (km)..............................    3,665,761.05
Média Rating Motorista............................            4.23
Média Rating Cliente..............................            4.40


  result = pd.read_sql(query, conn).iloc[0, 0]


In [49]:
# Query analítica: Top 10 rotas por receita
conn = get_connection()
df_top_rotas = pd.read_sql("""
SELECT 
    pickup.loc_nme AS origem, 
    drop.loc_nme AS destino,
    COUNT(*) AS total_corridas,
    SUM(f.amt) AS receita_total,
    AVG(f.amt) AS ticket_medio,
    AVG(f.dst) AS distancia_media
FROM dwh.fat_cor f
JOIN dwh.dim_loc pickup ON f.srk_pck = pickup.srk_loc
JOIN dwh.dim_loc drop ON f.srk_drp = drop.srk_loc
WHERE f.flg_cmp = 'Yes'
GROUP BY pickup.loc_nme, drop.loc_nme
ORDER BY receita_total DESC
LIMIT 10;
""", conn)
conn.close()

print("\n TOP 10 ROTAS MAIS RENTÁVEIS:\n")
df_top_rotas

  df_top_rotas = pd.read_sql("""



 TOP 10 ROTAS MAIS RENTÁVEIS:



Unnamed: 0,origem,destino,total_corridas,receita_total,ticket_medio,distancia_media
0,New Delhi Railway Station,Rajouri Garden,6,9559.0,1593.166667,23.798333
1,Cyber Hub,Gurgaon Railway Station,10,9348.0,934.8,32.522
2,Nirman Vihar,Vatika Chowk,5,9284.0,1856.8,29.89
3,Ashok Vihar,Basai Dhankot,9,9280.0,1031.111111,24.782222
4,Anand Vihar ISBT,Noida Film City,7,8960.0,1280.0,24.452857
5,Mayur Vihar,Samaypur Badli,9,8588.0,954.222222,17.853333
6,Model Town,Jahangirpuri,8,8540.0,1067.5,26.11625
7,Ambience Mall,Akshardham,11,8518.0,774.363636,26.936364
8,Greater Noida,Jor Bagh,8,8252.0,1031.5,32.48875
9,Greater Noida,Rithala,7,8082.0,1154.571429,30.317143


## 7. SUMÁRIO FINAL

In [50]:
print("\n" + "="*70)
print(" " * 15 + " ETL SILVER → GOLD CONCLUÍDO! ")
print("="*70)
print("\n RESUMO DA CARGA:")
print("-"*70)

conn = get_connection()
cur = conn.cursor()
tabelas = [
    ('dwh.dim_dtt', 'Dimensão DateTime (dtt)'),
    ('dwh.dim_cst', 'Dimensão Customer (cst)'),
    ('dwh.dim_loc', 'Dimensão Location (loc)'),
    ('dwh.dim_rid', 'Dimensão Ride (rid)'),
    ('dwh.fat_cor', ' FATO CORRIDAS (fat_cor)')
]

for tabela, descricao in tabelas:
    cur.execute(f"SELECT COUNT(*) FROM {tabela};")
    count = cur.fetchone()[0]
    print(f"{descricao:.<50} {count:>15,} registros")

cur.close()
conn.close()
print("\n" + "="*70)
print(" Data Warehouse pronto para análises!")
print(" Mnemônicos documentados em: docs/Mnemonicos_DWH.md")
print("="*70)


                ETL SILVER → GOLD CONCLUÍDO! 

 RESUMO DA CARGA:
----------------------------------------------------------------------
Dimensão DateTime (dtt)...........................         124,452 registros
Dimensão Customer (cst)...........................         147,580 registros
Dimensão Location (loc)...........................             176 registros
Dimensão Ride (rid)...............................         102,023 registros
 FATO CORRIDAS (fat_cor).........................         148,767 registros

 Data Warehouse pronto para análises!
 Mnemônicos documentados em: docs/Mnemonicos_DWH.md
