# ETL Silver → Gold
## Crime Data Analytics - Data Warehouse Pipeline

---

### Objetivo
Este notebook realiza a transformacao dos dados da camada **Silver** para a camada **Gold**, criando um modelo dimensional (Star Schema) otimizado para analises de Business Intelligence.

### Arquitetura Medallion
```
Raw (Bronze) → Silver → Gold
    |            |        |
    |            |        └─► Modelo Dimensional (Star Schema)
    |            └─► Dados Limpos e Padronizados
    └─► Dados Brutos
```

### Entrada e Saida
- **Entrada**: `Data Layer/silver/data_silver.csv`
- **Saida**: PostgreSQL schema `gold` + CSVs de backup

### Tabelas Geradas
| Tipo | Tabela | Descricao |
|------|--------|----------|
| Dimensao | dim_date | Calendario com hierarquia temporal |
| Dimensao | dim_time | Horas do dia com periodos |
| Dimensao | dim_area | Areas policiais do LAPD |
| Dimensao | dim_crime_type | Tipos e categorias de crimes |
| Dimensao | dim_victim | Perfil demografico das vitimas |
| Fato | fato_crimes | Eventos de crimes com FKs |
| Agregacao | agg_crimes_area_period | Metricas por area/mes/periodo |
| Agregacao | agg_crimes_type_year | Metricas por tipo/ano |

---
## 1. Configuracao Inicial

### O que esta celula faz:
1. **Importa bibliotecas**: pandas, numpy, sqlalchemy, psycopg2
2. **Define funcao `find_project_root()`**: Localiza a raiz do projeto
3. **Configura caminhos**: SILVER_PATH, GOLD_PATH, DDL_PATH
4. **Cria diretorio Gold**: Para backup em CSV

### Bibliotecas necessarias:
```bash
pip install pandas numpy sqlalchemy psycopg2-binary
```

In [None]:
# Configuracao inicial
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime, time
import os

# SQLAlchemy para conexao com banco
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

def find_project_root(start: Path) -> Path:
    for p in [start, *start.parents]:
        if (p / 'Data Layer').exists():
            return p
    return start

# Configurar caminhos
PROJECT_ROOT = find_project_root(Path.cwd())
SILVER_PATH = PROJECT_ROOT / 'Data Layer' / 'silver' / 'data_silver.csv'
GOLD_PATH = PROJECT_ROOT / 'Data Layer' / 'gold'
DDL_PATH = GOLD_PATH / 'ddl.sql'

# Criar diretorio gold se nao existir
os.makedirs(GOLD_PATH, exist_ok=True)

print(f"Projeto: {PROJECT_ROOT}")
print(f"Silver: {SILVER_PATH}")
print(f"Gold: {GOLD_PATH}")
print(f"DDL: {DDL_PATH}")

---
## 2. Configuracao do Banco de Dados

### O que esta celula faz:
1. **Define parametros de conexao**: host, porta, usuario, senha, database
2. **Cria engine SQLAlchemy**: Conexao com PostgreSQL
3. **Testa conexao**: Verifica se o banco esta acessivel
4. **Executa DDL**: Cria schema e tabelas se nao existirem

### Configuracao:
Altere as variaveis abaixo conforme seu ambiente:
```python
DB_HOST = 'localhost'
DB_PORT = '5432'
DB_NAME = 'crime_analytics'
DB_USER = 'postgres'
DB_PASS = 'sua_senha'
```

In [None]:
# Configuracao do banco de dados PostgreSQL
DB_CONFIG = {
    'host': 'localhost',
    'port': '5432',
    'database': 'crime_analytics',
    'user': 'postgres',
    'password': 'postgres'  # Altere conforme seu ambiente
}

# String de conexao
DATABASE_URL = f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"

# Criar engine SQLAlchemy
engine = None
DB_AVAILABLE = False

try:
    engine = create_engine(DATABASE_URL)
    # Testar conexao
    with engine.connect() as conn:
        result = conn.execute(text("SELECT version()"))
        version = result.fetchone()[0]
        print(f"Conectado ao PostgreSQL")
        print(f"Versao: {version[:50]}...")
        DB_AVAILABLE = True
except Exception as e:
    print(f"Aviso: Banco de dados nao disponivel - {e}")
    print("Continuando apenas com exportacao CSV...")
    DB_AVAILABLE = False

---
## 3. Execucao do DDL

### O que esta celula faz:
1. **Le arquivo DDL**: `gold/ddl.sql`
2. **Executa comandos SQL**: CREATE SCHEMA, CREATE TABLE, CREATE INDEX
3. **Cria estrutura**: Schema `gold` com todas as tabelas

### Tabelas criadas:
- dim_area, dim_crime_type, dim_date, dim_time, dim_victim
- fato_crimes
- agg_crimes_area_period, agg_crimes_type_year, agg_crime_hotspots

In [None]:
# Executar DDL para criar schema e tabelas
if DB_AVAILABLE and DDL_PATH.exists():
    print("Executando DDL...")
    
    with open(DDL_PATH, 'r', encoding='utf-8') as f:
        ddl_content = f.read()
    
    # Separar comandos (por ;)
    commands = [cmd.strip() for cmd in ddl_content.split(';') if cmd.strip() and not cmd.strip().startswith('--')]
    
    with engine.begin() as conn:
        for cmd in commands:
            if cmd and len(cmd) > 10:  # Ignorar comandos vazios
                try:
                    conn.execute(text(cmd))
                except SQLAlchemyError as e:
                    # Ignorar erros de tabela ja existe
                    if 'already exists' not in str(e):
                        print(f"   Aviso: {str(e)[:80]}")
    
    print("   Schema 'gold' criado/verificado com sucesso!")
else:
    if not DB_AVAILABLE:
        print("Banco nao disponivel - pulando DDL")
    else:
        print(f"Arquivo DDL nao encontrado: {DDL_PATH}")

---
## 4. Funcoes Auxiliares de Carga

### O que esta celula faz:
1. **`truncate_table()`**: Limpa tabela antes de inserir (TRUNCATE CASCADE)
2. **`load_to_db()`**: Carrega DataFrame no banco usando pandas `to_sql()`
3. **`save_csv_backup()`**: Salva CSV como backup

### Estrategia de carga:
- **TRUNCATE + INSERT**: Limpa e recarrega toda a tabela
- **CASCADE**: Remove dependencias de FK automaticamente
- **Backup CSV**: Sempre salva arquivo local

In [None]:
def truncate_table(table_name: str, schema: str = 'gold'):
    """Limpa tabela antes de inserir novos dados."""
    if not DB_AVAILABLE:
        return
    try:
        with engine.begin() as conn:
            conn.execute(text(f"TRUNCATE TABLE {schema}.{table_name} CASCADE"))
    except SQLAlchemyError as e:
        print(f"   Aviso truncate {table_name}: {e}")

def load_to_db(df: pd.DataFrame, table_name: str, schema: str = 'gold', if_exists: str = 'append'):
    """Carrega DataFrame no banco de dados PostgreSQL."""
    if not DB_AVAILABLE:
        print(f"   [DB OFF] {table_name}: apenas CSV")
        return False
    
    try:
        # Limpar tabela antes
        truncate_table(table_name, schema)
        
        # Inserir dados
        df.to_sql(
            name=table_name,
            con=engine,
            schema=schema,
            if_exists=if_exists,
            index=False,
            method='multi',
            chunksize=1000
        )
        print(f"   [DB OK] {schema}.{table_name}: {len(df):,} registros")
        return True
    except SQLAlchemyError as e:
        print(f"   [DB ERRO] {table_name}: {e}")
        return False

def save_csv_backup(df: pd.DataFrame, filename: str):
    """Salva DataFrame como CSV de backup."""
    filepath = GOLD_PATH / filename
    df.to_csv(filepath, index=False)
    size_kb = filepath.stat().st_size / 1024
    print(f"   [CSV] {filename}: {size_kb:.1f} KB")

print("Funcoes de carga definidas.")

---
## 5. Carregamento dos Dados Silver

### O que esta celula faz:
1. **Carrega CSV**: Le o arquivo `data_silver.csv`
2. **Converte datas**: Transforma colunas para datetime64
3. **Exibe estatisticas**: Quantidade de registros e colunas

In [None]:
# Carregar dados Silver
print("Carregando dados Silver...")
df_silver = pd.read_csv(SILVER_PATH)

# Converter colunas de data
df_silver['date_occurred'] = pd.to_datetime(df_silver['date_occurred'], errors='coerce')
df_silver['date_reported'] = pd.to_datetime(df_silver['date_reported'], errors='coerce')

print(f"Dados Silver carregados: {len(df_silver):,} registros")
print(f"Colunas: {len(df_silver.columns)}")
df_silver.head(3)

---
## 6. Validacao de Schema e Qualidade

### O que esta celula faz:
1. **Valida colunas obrigatorias**: 20+ colunas necessarias
2. **Verifica limites de nulos**: Thresholds por coluna critica
3. **Valida dominios**: hour (0-23), coordenadas LA, severity
4. **Detecta duplicatas**: crime_id unico

In [None]:
# Validacoes de schema e qualidade
print("Validando schema e qualidade...")

required_cols = [
    'crime_id', 'date_occurred', 'date_reported', 'hour',
    'area_code', 'area_name',
    'crime_code', 'crime_description', 'crime_category', 'crime_severity',
    'victim_age_group', 'victim_sex_desc', 'victim_descent_desc',
    'victim_age', 'latitude', 'longitude',
    'is_violent', 'has_weapon', 'case_closed', 'year', 'month'
]

null_thresholds = {
    'crime_id': 0.00, 'date_occurred': 0.01, 'hour': 0.01,
    'area_code': 0.01, 'crime_code': 0.01
}

def validate_silver_schema(df):
    errors, warnings = [], []
    
    if df.empty:
        errors.append("Dataset vazio.")
    
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        errors.append(f"Colunas ausentes: {missing}")
    
    for col, max_null in null_thresholds.items():
        if col in df.columns:
            pct = df[col].isna().mean()
            if pct > max_null:
                errors.append(f"{col} com {pct:.1%} nulos (limite {max_null:.1%}).")
    
    if 'hour' in df.columns:
        invalid = ~df['hour'].between(0, 23)
        if invalid.any():
            errors.append(f"hour fora de 0-23: {invalid.sum():,} registros.")
    
    if 'crime_id' in df.columns:
        dup = df['crime_id'].duplicated().sum()
        if dup > 0:
            warnings.append(f"crime_id duplicado: {dup:,}")
    
    return errors, warnings

errors, warnings = validate_silver_schema(df_silver)

if warnings:
    print("Avisos:")
    for w in warnings:
        print(f"   - {w}")

if errors:
    print("Erros:")
    for e in errors:
        print(f"   - {e}")
    raise ValueError("Falha nas validacoes. Corrija antes de continuar.")
else:
    print("Validacoes concluidas com sucesso.")

---
## 7. Criacao e Carga das Dimensoes

As dimensoes sao criadas e carregadas no banco de dados PostgreSQL.
Cada dimensao tambem e salva como CSV de backup.

---
### 7.1 Dimensao Data (dim_date)

### Atributos:
- `sk_date`: Surrogate Key (SERIAL)
- `full_date`: Data completa (DATE)
- `year`, `quarter`, `month`, `week_of_year`: Hierarquia temporal
- `day_of_month`, `day_of_week`, `day_name`, `month_name`: Detalhes
- `is_weekend`: Flag fim de semana
- `is_holiday`: Flag feriado (default FALSE)

In [None]:
# Dimensao: Data (dim_date)
print("\n" + "="*50)
print("Criando dim_date...")

dim_date = df_silver[['date_occurred']].drop_duplicates().dropna().copy()
dim_date = dim_date.sort_values('date_occurred').reset_index(drop=True)
dim_date['sk_date'] = range(1, len(dim_date) + 1)
dim_date['full_date'] = dim_date['date_occurred'].dt.date
dim_date['year'] = dim_date['date_occurred'].dt.year
dim_date['quarter'] = dim_date['date_occurred'].dt.quarter
dim_date['month'] = dim_date['date_occurred'].dt.month
dim_date['month_name'] = dim_date['date_occurred'].dt.month_name()
dim_date['week_of_year'] = dim_date['date_occurred'].dt.isocalendar().week.astype(int)
dim_date['day_of_month'] = dim_date['date_occurred'].dt.day
dim_date['day_of_week'] = dim_date['date_occurred'].dt.dayofweek
dim_date['day_name'] = dim_date['date_occurred'].dt.day_name()
dim_date['is_weekend'] = dim_date['day_of_week'].isin([5, 6])
dim_date['is_holiday'] = False

# Selecionar colunas para o banco (conforme DDL)
dim_date_db = dim_date[['sk_date', 'full_date', 'year', 'quarter', 'month', 'month_name',
                        'week_of_year', 'day_of_month', 'day_of_week', 'day_name', 
                        'is_weekend', 'is_holiday']].copy()

# Carregar no banco e salvar CSV
load_to_db(dim_date_db, 'dim_date')
save_csv_backup(dim_date_db, 'dim_date.csv')
print(f"   Total: {len(dim_date_db):,} datas unicas")

---
### 7.2 Dimensao Tempo (dim_time)

### Atributos:
- `sk_time`: Surrogate Key (SERIAL)
- `full_time`: Hora completa (TIME)
- `hour`: Hora (0-23)
- `minute`: Minuto (sempre 0 - granularidade por hora)
- `period_of_day`: Madrugada, Manha, Tarde, Noite
- `is_rush_hour`: Horario de pico (7-9h, 17-19h)

In [None]:
# Dimensao: Tempo (dim_time)
print("\n" + "="*50)
print("Criando dim_time...")

dim_time = pd.DataFrame({'hour': range(24)})
dim_time['sk_time'] = dim_time['hour'] + 1
dim_time['full_time'] = dim_time['hour'].apply(lambda h: time(h, 0, 0))
dim_time['minute'] = 0
dim_time['period_of_day'] = dim_time['hour'].apply(
    lambda h: 'Madrugada' if h < 6 else 'Manha' if h < 12 else 'Tarde' if h < 18 else 'Noite'
)
dim_time['is_rush_hour'] = dim_time['hour'].isin([7, 8, 9, 17, 18, 19])

# Selecionar colunas para o banco
dim_time_db = dim_time[['sk_time', 'full_time', 'hour', 'minute', 'period_of_day', 'is_rush_hour']].copy()

# Carregar no banco e salvar CSV
load_to_db(dim_time_db, 'dim_time')
save_csv_backup(dim_time_db, 'dim_time.csv')
print(f"   Total: {len(dim_time_db):,} horas")

---
### 7.3 Dimensao Area (dim_area)

### Atributos:
- `sk_area`: Surrogate Key (SERIAL)
- `area_code`: Codigo da area LAPD
- `area_name`: Nome da area
- `region`: North, South, Central, West, Other

In [None]:
# Dimensao: Area (dim_area)
print("\n" + "="*50)
print("Criando dim_area...")

dim_area = df_silver[['area_code', 'area_name']].drop_duplicates().copy()
dim_area = dim_area.sort_values('area_code').reset_index(drop=True)
dim_area['sk_area'] = range(1, len(dim_area) + 1)

def get_region(area_name):
    north = ['DEVONSHIRE', 'FOOTHILL', 'MISSION', 'NORTH HOLLYWOOD', 'VAN NUYS', 'WEST VALLEY']
    south = ['77TH STREET', 'HARBOR', 'SOUTHEAST', 'SOUTHWEST']
    central = ['CENTRAL', 'HOLLENBECK', 'RAMPART', 'NEWTON']
    west = ['HOLLYWOOD', 'OLYMPIC', 'PACIFIC', 'WEST LA', 'WILSHIRE', 'TOPANGA']
    
    name_upper = area_name.upper() if area_name else ''
    if name_upper in north: return 'North'
    elif name_upper in south: return 'South'
    elif name_upper in central: return 'Central'
    elif name_upper in west: return 'West'
    else: return 'Other'

dim_area['region'] = dim_area['area_name'].apply(get_region)

# Selecionar colunas para o banco
dim_area_db = dim_area[['sk_area', 'area_code', 'area_name', 'region']].copy()

# Carregar no banco e salvar CSV
load_to_db(dim_area_db, 'dim_area')
save_csv_backup(dim_area_db, 'dim_area.csv')
print(f"   Total: {len(dim_area_db):,} areas")

---
### 7.4 Dimensao Tipo de Crime (dim_crime_type)

### Atributos:
- `sk_crime_type`: Surrogate Key (SERIAL)
- `crime_code`: Codigo do crime
- `crime_description`: Descricao
- `crime_category`: Violent Crime, Property Crime, Other
- `is_violent`: Flag crime violento
- `severity_level`: 1 (Minor), 3 (Serious)

In [None]:
# Dimensao: Tipo de Crime (dim_crime_type)
print("\n" + "="*50)
print("Criando dim_crime_type...")

dim_crime_type = df_silver[['crime_code', 'crime_description', 'crime_category', 'crime_severity']].drop_duplicates().copy()
dim_crime_type = dim_crime_type.sort_values('crime_code').reset_index(drop=True)
dim_crime_type['sk_crime_type'] = range(1, len(dim_crime_type) + 1)
dim_crime_type['is_violent'] = dim_crime_type['crime_category'] == 'Violent Crime'
dim_crime_type['severity_level'] = dim_crime_type['crime_severity'].map({'Serious': 3, 'Minor': 1}).fillna(1).astype(int)

# Selecionar colunas para o banco (remover crime_severity, usar severity_level)
dim_crime_type_db = dim_crime_type[['sk_crime_type', 'crime_code', 'crime_description', 
                                     'crime_category', 'is_violent', 'severity_level']].copy()

# Carregar no banco e salvar CSV
load_to_db(dim_crime_type_db, 'dim_crime_type')
save_csv_backup(dim_crime_type_db, 'dim_crime_type.csv')
print(f"   Total: {len(dim_crime_type_db):,} tipos de crime")

---
### 7.5 Dimensao Vitima (dim_victim)

### Atributos:
- `sk_victim`: Surrogate Key (SERIAL)
- `age_group`: Faixa etaria
- `sex`: M, F, X (CHAR 1)
- `descent`: Codigo etnia (CHAR 1)
- `descent_description`: Descricao da etnia

In [None]:
# Dimensao: Vitima (dim_victim)
print("\n" + "="*50)
print("Criando dim_victim...")

dim_victim = df_silver[['victim_age_group', 'victim_sex_desc', 'victim_descent_desc']].drop_duplicates().copy()
dim_victim = dim_victim.reset_index(drop=True)
dim_victim['sk_victim'] = range(1, len(dim_victim) + 1)

# Mapear sex para codigo de 1 caractere
sex_map = {'Male': 'M', 'Female': 'F', 'Unknown': 'X', 'Other': 'X'}
dim_victim['sex'] = dim_victim['victim_sex_desc'].map(sex_map).fillna('X')

# Mapear descent para codigo de 1 caractere
descent_map = {
    'Hispanic': 'H', 'White': 'W', 'Black': 'B', 'Asian': 'A',
    'Other': 'O', 'Unknown': 'X', 'American Indian': 'I',
    'Pacific Islander': 'P', 'Filipino': 'F'
}
dim_victim['descent'] = dim_victim['victim_descent_desc'].map(descent_map).fillna('X')

# Renomear colunas
dim_victim = dim_victim.rename(columns={
    'victim_age_group': 'age_group',
    'victim_descent_desc': 'descent_description'
})

# Selecionar colunas para o banco
dim_victim_db = dim_victim[['sk_victim', 'age_group', 'sex', 'descent', 'descent_description']].copy()

# Carregar no banco e salvar CSV
load_to_db(dim_victim_db, 'dim_victim')
save_csv_backup(dim_victim_db, 'dim_victim.csv')
print(f"   Total: {len(dim_victim_db):,} perfis de vitima")

---
## 8. Criacao e Carga da Tabela Fato

### O que esta celula faz:
1. **Cria dicionarios de mapeamento** para surrogate keys
2. **Mapeia cada registro Silver** para as FKs correspondentes
3. **Adiciona metricas**: latitude, longitude, is_violent
4. **Carrega no banco** e salva CSV de backup

### Estrutura fato_crimes:
- `sk_crime` (PK): Surrogate Key
- `nk_crime_id`: Natural Key (ID original)
- `sk_area`, `sk_crime_type`, `sk_date`, `sk_time`, `sk_victim` (FKs)
- `latitude`, `longitude`: Coordenadas
- `is_violent`: Flag

In [None]:
# Tabela Fato: fato_crimes
print("\n" + "="*50)
print("Criando fato_crimes...")

# Criar mapeamentos de surrogate keys
date_map = dim_date.set_index('date_occurred')['sk_date'].to_dict()
time_map = dim_time.set_index('hour')['sk_time'].to_dict()
area_map = dim_area.set_index('area_code')['sk_area'].to_dict()
crime_type_map = dim_crime_type.set_index('crime_code')['sk_crime_type'].to_dict()

# Criar chave composta para victim
dim_victim['victim_key'] = (dim_victim['age_group'].fillna('Unknown') + '|' + 
                            dim_victim['sex'].fillna('X') + '|' + 
                            dim_victim['descent'].fillna('X'))
victim_map = dim_victim.set_index('victim_key')['sk_victim'].to_dict()

# Construir fato
fato = pd.DataFrame()
fato['sk_crime'] = range(1, len(df_silver) + 1)
fato['nk_crime_id'] = df_silver['crime_id'].values
fato['sk_date'] = df_silver['date_occurred'].map(date_map).values
fato['sk_time'] = df_silver['hour'].map(time_map).values
fato['sk_area'] = df_silver['area_code'].map(area_map).values
fato['sk_crime_type'] = df_silver['crime_code'].map(crime_type_map).values

# Mapear vitima
victim_sex_map = {'Male': 'M', 'Female': 'F', 'Unknown': 'X', 'Other': 'X'}
descent_code_map = {
    'Hispanic': 'H', 'White': 'W', 'Black': 'B', 'Asian': 'A',
    'Other': 'O', 'Unknown': 'X', 'American Indian': 'I',
    'Pacific Islander': 'P', 'Filipino': 'F'
}

df_silver['victim_key'] = (
    df_silver['victim_age_group'].fillna('Unknown') + '|' + 
    df_silver['victim_sex_desc'].map(victim_sex_map).fillna('X') + '|' + 
    df_silver['victim_descent_desc'].map(descent_code_map).fillna('X')
)
fato['sk_victim'] = df_silver['victim_key'].map(victim_map).values

# Metricas
fato['latitude'] = df_silver['latitude'].values
fato['longitude'] = df_silver['longitude'].values
fato['is_violent'] = df_silver['is_violent'].values

# Converter FKs para int (tratar NaN)
for col in ['sk_date', 'sk_time', 'sk_area', 'sk_crime_type', 'sk_victim']:
    fato[col] = fato[col].astype('Int64')  # nullable int

# Selecionar colunas para o banco (conforme DDL - sem sk_weapon e sk_premise)
fato_db = fato[['sk_crime', 'nk_crime_id', 'sk_area', 'sk_crime_type', 
                'sk_date', 'sk_time', 'sk_victim', 'latitude', 'longitude', 'is_violent']].copy()

# Carregar no banco e salvar CSV
load_to_db(fato_db, 'fato_crimes')
save_csv_backup(fato_db, 'fato_crimes.csv')
print(f"   Total: {len(fato_db):,} crimes")

---
## 9. Criacao e Carga das Agregacoes

Tabelas pre-agregadas para otimizar dashboards.

---
### 9.1 Agregacao: Crimes por Area e Periodo

### Granularidade: area x ano x mes x periodo_do_dia
### Metricas:
- `total_crimes`: COUNT
- `violent_crimes`: SUM(is_violent)
- `property_crimes`: SUM(crime_category = 'Property Crime')
- `avg_victim_age`: AVG(victim_age)

In [None]:
# Agregacao: Crimes por Area e Periodo
print("\n" + "="*50)
print("Criando agg_crimes_area_period...")

# Adicionar period_of_day ao df_silver
df_silver['period_of_day'] = df_silver['hour'].apply(
    lambda h: 'Madrugada' if h < 6 else 'Manha' if h < 12 else 'Tarde' if h < 18 else 'Noite'
)

# Flag para property crimes
df_silver['is_property'] = df_silver['crime_category'] == 'Property Crime'

# Agregar
agg_area_period = df_silver.groupby(['area_code', 'year', 'month', 'period_of_day']).agg(
    total_crimes=('crime_id', 'count'),
    violent_crimes=('is_violent', 'sum'),
    property_crimes=('is_property', 'sum'),
    avg_victim_age=('victim_age', 'mean')
).reset_index()

# Mapear area_code para sk_area
agg_area_period['sk_area'] = agg_area_period['area_code'].map(area_map)

# Selecionar colunas para o banco
agg_area_period_db = agg_area_period[['sk_area', 'year', 'month', 'period_of_day',
                                       'total_crimes', 'violent_crimes', 'property_crimes', 
                                       'avg_victim_age']].copy()
agg_area_period_db['avg_victim_age'] = agg_area_period_db['avg_victim_age'].round(2)

# Carregar no banco e salvar CSV
load_to_db(agg_area_period_db, 'agg_crimes_area_period')
save_csv_backup(agg_area_period_db, 'agg_crimes_area_period.csv')
print(f"   Total: {len(agg_area_period_db):,} registros")

---
### 9.2 Agregacao: Crimes por Tipo e Ano

### Granularidade: tipo_crime x ano
### Metricas:
- `total_crimes`: COUNT
- `weekday_crimes`: COUNT onde is_weekend = FALSE
- `weekend_crimes`: COUNT onde is_weekend = TRUE

In [None]:
# Agregacao: Crimes por Tipo e Ano
print("\n" + "="*50)
print("Criando agg_crimes_type_year...")

# Adicionar flag de fim de semana
df_silver['is_weekend'] = df_silver['date_occurred'].dt.dayofweek.isin([5, 6])
df_silver['is_weekday'] = ~df_silver['is_weekend']

# Agregar
agg_crime_year = df_silver.groupby(['crime_code', 'year']).agg(
    total_crimes=('crime_id', 'count'),
    weekday_crimes=('is_weekday', 'sum'),
    weekend_crimes=('is_weekend', 'sum')
).reset_index()

# Mapear crime_code para sk_crime_type
agg_crime_year['sk_crime_type'] = agg_crime_year['crime_code'].map(crime_type_map)

# Selecionar colunas para o banco
agg_crime_year_db = agg_crime_year[['sk_crime_type', 'year', 'total_crimes', 
                                     'weekday_crimes', 'weekend_crimes']].copy()

# Carregar no banco e salvar CSV
load_to_db(agg_crime_year_db, 'agg_crimes_type_year')
save_csv_backup(agg_crime_year_db, 'agg_crimes_type_year.csv')
print(f"   Total: {len(agg_crime_year_db):,} registros")

---
### 9.3 Agregacao: Hotspots Geograficos

### Granularidade: grid_lat x grid_lon x ano
### Metricas:
- `total_crimes`: COUNT
- `violent_crimes`: SUM(is_violent)
- `hotspot_level`: Classificacao baseada em percentis

In [None]:
# Agregacao: Hotspots Geograficos
print("\n" + "="*50)
print("Criando agg_crime_hotspots...")

# Filtrar coordenadas validas
df_geo = df_silver[(df_silver['latitude'].notna()) & 
                   (df_silver['longitude'].notna()) &
                   (df_silver['latitude'] != 0) & 
                   (df_silver['longitude'] != 0)].copy()

# Criar grid (arredondar para 2 casas decimais ~ 1km)
df_geo['grid_lat'] = df_geo['latitude'].round(2)
df_geo['grid_lon'] = df_geo['longitude'].round(2)

# Agregar
agg_hotspots = df_geo.groupby(['grid_lat', 'grid_lon', 'year']).agg(
    total_crimes=('crime_id', 'count'),
    violent_crimes=('is_violent', 'sum')
).reset_index()

# Classificar hotspot_level baseado em percentis
def classify_hotspot(total, p75, p90):
    if total >= p90:
        return 'Critical'
    elif total >= p75:
        return 'High'
    else:
        return 'Normal'

p75 = agg_hotspots['total_crimes'].quantile(0.75)
p90 = agg_hotspots['total_crimes'].quantile(0.90)
agg_hotspots['hotspot_level'] = agg_hotspots['total_crimes'].apply(lambda x: classify_hotspot(x, p75, p90))

# Selecionar colunas para o banco
agg_hotspots_db = agg_hotspots[['grid_lat', 'grid_lon', 'year', 'total_crimes', 
                                 'violent_crimes', 'hotspot_level']].copy()

# Carregar no banco e salvar CSV
load_to_db(agg_hotspots_db, 'agg_crime_hotspots')
save_csv_backup(agg_hotspots_db, 'agg_crime_hotspots.csv')
print(f"   Total: {len(agg_hotspots_db):,} grids")
print(f"   Hotspots Critical: {(agg_hotspots_db['hotspot_level'] == 'Critical').sum():,}")
print(f"   Hotspots High: {(agg_hotspots_db['hotspot_level'] == 'High').sum():,}")

---
## 10. Resumo Final

### O que esta celula faz:
1. **Lista tabelas no banco** (se disponivel)
2. **Lista arquivos CSV** gerados como backup
3. **Exibe estatisticas** de registros por tabela

In [None]:
# Resumo final
print("\n" + "="*60)
print("ETL Silver -> Gold CONCLUIDO!")
print("="*60)

# Estatisticas do banco
if DB_AVAILABLE:
    print("\n[BANCO DE DADOS PostgreSQL]")
    with engine.connect() as conn:
        result = conn.execute(text("""
            SELECT table_name, 
                   (xpath('/row/cnt/text()', xml_count))[1]::text::int as row_count
            FROM (
                SELECT table_name, 
                       query_to_xml(format('SELECT COUNT(*) as cnt FROM gold.%I', table_name), false, true, '') as xml_count
                FROM information_schema.tables 
                WHERE table_schema = 'gold'
            ) t
            ORDER BY table_name
        """))
        print(f"   {'Tabela':<30} {'Registros':>12}")
        print("   " + "-"*42)
        for row in result:
            print(f"   {row[0]:<30} {row[1]:>12,}")
else:
    print("\n[BANCO DE DADOS] Nao disponivel")

# Estatisticas dos CSVs
print("\n[ARQUIVOS CSV - Backup]")
print(f"   {'Arquivo':<35} {'Tamanho':>10}")
print("   " + "-"*45)
for f in sorted(GOLD_PATH.glob('*.csv')):
    size_kb = f.stat().st_size / 1024
    print(f"   {f.name:<35} {size_kb:>8.1f} KB")

print(f"\nDiretorio Gold: {GOLD_PATH}")
print("\nPipeline finalizado com sucesso!")

---
## 11. Verificacao de Integridade (Opcional)

Executa queries de verificacao para validar a carga.

In [None]:
# Verificacao de integridade
if DB_AVAILABLE:
    print("Verificando integridade referencial...")
    
    checks = [
        ("Fatos sem data", "SELECT COUNT(*) FROM gold.fato_crimes WHERE sk_date IS NULL"),
        ("Fatos sem area", "SELECT COUNT(*) FROM gold.fato_crimes WHERE sk_area IS NULL"),
        ("Fatos sem tipo crime", "SELECT COUNT(*) FROM gold.fato_crimes WHERE sk_crime_type IS NULL"),
        ("Datas unicas", "SELECT COUNT(DISTINCT full_date) FROM gold.dim_date"),
        ("Areas unicas", "SELECT COUNT(DISTINCT area_code) FROM gold.dim_area"),
        ("Tipos crime unicos", "SELECT COUNT(DISTINCT crime_code) FROM gold.dim_crime_type"),
    ]
    
    with engine.connect() as conn:
        for name, query in checks:
            try:
                result = conn.execute(text(query))
                value = result.fetchone()[0]
                status = "OK" if "sem" not in name.lower() or value == 0 else "ALERTA"
                print(f"   [{status}] {name}: {value:,}")
            except Exception as e:
                print(f"   [ERRO] {name}: {e}")
    
    print("\nVerificacao concluida.")
else:
    print("Banco nao disponivel - verificacao ignorada.")