# ETL Silver to Raw

Nesta etapa, importamos as bibliotecas necessárias e definimos as credenciais de conexão com o banco de dados PostgreSQL (diabetes_health).

In [1]:
import pandas as pd
import psycopg2
from psycopg2.extras import execute_batch
import warnings
warnings.filterwarnings('ignore')

DB_CONFIG = {
    'host': 'localhost',
    'port': 5432,
    'database': 'diabetes_health',
    'user': 'postgres',
    'password': 'postgres'
}

# Conexão e Limpeza do Ambiente (Gold)

Antes de iniciar a carga, conectamos ao banco e executamos um TRUNCATE em todas as tabelas do schema dw. Isso garante que não haverá duplicidade de dados e que as chaves substitutas (SRKs) serão reiniciadas corretamente.

In [2]:
conn = psycopg2.connect(**DB_CONFIG)
cur = conn.cursor()

print("1. Limpando tabelas Gold (Truncate) para evitar duplicidade...")
tables = [
    'dw.fat_saude_pessoa', 
    'dw.dim_demografia', 
    'dw.dim_estilo_vida', 
    'dw.dim_acesso_medico', 
    'dw.dim_historico_clinico'
]
for t in tables:
    cur.execute(f"TRUNCATE TABLE {t} CASCADE;")
conn.commit()

1. Limpando tabelas Gold (Truncate) para evitar duplicidade...


# Leitura da Camada Silver

Carregamos os dados tratados da tabela silver.diabetes_indicators para um DataFrame do Pandas. Estes dados já passaram pelo tratamento de outliers na etapa anterior.

In [3]:
# 3. LEITURA DA SILVER
print("2. Lendo dados da Silver...")
df = pd.read_sql_query("SELECT * FROM silver.diabetes_indicators", conn)

2. Lendo dados da Silver...


# Carga da Dimensão Demografia (dmg)

Selecionamos as colunas demográficas distintas, geramos a chave substituta (dmg_srk) e inserimos na tabela dw.dim_demografia.

Mapeamento: sex_desc → dmg_sex, age_group → dmg_ida, etc.

In [4]:
# ---------------------------------------------------------
# 4. DIMENSÃO DEMOGRAFIA (Mnemônico: dmg)
# ---------------------------------------------------------
print("3. Processando Demografia (dmg)...")

cols_dmg_origem = ['sex_desc', 'age_group', 'education_level', 'income_level_raw']
df_dmg = df[cols_dmg_origem].drop_duplicates().reset_index(drop=True)
df_dmg['dmg_srk'] = df_dmg.index + 1 # Gerar Surrogate Key

dados_dmg = df_dmg[['dmg_srk', 'sex_desc', 'age_group', 'education_level', 'income_level_raw']].values.tolist()
execute_batch(cur, """
    INSERT INTO dw.dim_demografia (dmg_srk, dmg_sex, dmg_ida, dmg_esc, dmg_ren) 
    VALUES (%s, %s, %s, %s, %s)
""", dados_dmg)

3. Processando Demografia (dmg)...


# Carga da Dimensão Estilo de Vida (est)

Processamos as informações sobre hábitos do paciente (fumo, alimentação, atividades físicas), geramos a est_srk e carregamos na tabela dw.dim_estilo_vida.

In [5]:
# ---------------------------------------------------------
# 5. DIMENSÃO ESTILO DE VIDA (Mnemônico: est)
# ---------------------------------------------------------
print("4. Processando Estilo de Vida (est)...")

cols_est_origem = ['smoker', 'eats_fruits', 'eats_veggies', 'physical_activity', 'heavy_alcohol']
df_est = df[cols_est_origem].drop_duplicates().reset_index(drop=True)
df_est['est_srk'] = df_est.index + 1

dados_est = df_est[['est_srk'] + cols_est_origem].values.tolist()
execute_batch(cur, """
    INSERT INTO dw.dim_estilo_vida (est_srk, est_fum, est_fru, est_veg, est_fis, est_alc) 
    VALUES (%s, %s, %s, %s, %s, %s)
""", dados_est)

4. Processando Estilo de Vida (est)...


# Carga da Dimensão Acesso Médico (acs)

Tratamos os dados referentes ao plano de saúde e acesso a checkups, criando a chave acs_srk e inserindo em dw.dim_acesso_medico.

In [6]:
# ---------------------------------------------------------
# 6. DIMENSÃO ACESSO MÉDICO (Mnemônico: acs)
# ---------------------------------------------------------
print("5. Processando Acesso Médico (acs)...")

cols_acs_origem = ['has_healthcare', 'cant_afford_doctor', 'cholesterol_check']
df_acs = df[cols_acs_origem].drop_duplicates().reset_index(drop=True)
df_acs['acs_srk'] = df_acs.index + 1

dados_acs = df_acs[['acs_srk'] + cols_acs_origem].values.tolist()
execute_batch(cur, """
    INSERT INTO dw.dim_acesso_medico (acs_srk, acs_pla, acs_cus, acs_col) 
    VALUES (%s, %s, %s, %s)
""", dados_acs)

5. Processando Acesso Médico (acs)...


# Carga da Dimensão Histórico Clínico (cli)

Consolidamos o histórico de doenças (pressão alta, colesterol, AVC, etc.), geramos a cli_srk e salvamos em dw.dim_historico_clinico. Ao final, fazemos o COMMIT das dimensões.

In [7]:
# ---------------------------------------------------------
# 7. DIMENSÃO HISTÓRICO CLÍNICO (Mnemônico: cli)
# ---------------------------------------------------------
print("6. Processando Histórico Clínico (cli)...")

cols_cli_origem = ['high_bp', 'high_chol', 'stroke', 'heart_disease_attack', 'diff_walking']
df_cli = df[cols_cli_origem].drop_duplicates().reset_index(drop=True)
df_cli['cli_srk'] = df_cli.index + 1

dados_cli = df_cli[['cli_srk'] + cols_cli_origem].values.tolist()
execute_batch(cur, """
    INSERT INTO dw.dim_historico_clinico (cli_srk, cli_pre, cli_col, cli_avc, cli_cor, cli_and) 
    VALUES (%s, %s, %s, %s, %s, %s)
""", dados_cli)

conn.commit()

6. Processando Histórico Clínico (cli)...


# Construção e Carga da Tabela Fato (fat)

Realizamos o cruzamento (merge) do DataFrame principal com os DataFrames das dimensões para recuperar as chaves substitutas (SRKs). Em seguida, selecionamos as métricas e carregamos tudo na tabela final dw.fat_saude_pessoa.

In [8]:
# ---------------------------------------------------------
# 8. TABELA FATO (Mnemônico: fat)
# ---------------------------------------------------------
print("7. Montando Fato (fat)...")

# Join para pegar os IDs (SRKs)
df_fat = df.merge(df_dmg, on=cols_dmg_origem) \
           .merge(df_est, on=cols_est_origem) \
           .merge(df_acs, on=cols_acs_origem) \
           .merge(df_cli, on=cols_cli_origem)

cols_fato_final = [
    'dmg_srk', 'est_srk', 'acs_srk', 'cli_srk', # Chaves
    'diabetes_status', 
    'bmi', 
    'general_health', 
    'mental_health_days', 
    'physical_health_days', 
    'risk_factors_count'
]

dados_fato = df_fat[cols_fato_final].values.tolist()

print(f"   -> Inserindo {len(dados_fato)} registros na Fato...")
execute_batch(cur, """
    INSERT INTO dw.fat_saude_pessoa (
        dmg_srk, est_srk, acs_srk, cli_srk, 
        fat_dia, fat_imc, fat_sau, fat_men, fat_fis, fat_ris
    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", dados_fato, page_size=2000)

conn.commit()
cur.close()
conn.close()

print("="*50)
print("ETL Silver -> Gold Concluído (Com Mnemônicos).")
print("="*50)

7. Montando Fato (fat)...
   -> Inserindo 224143 registros na Fato...
ETL Silver -> Gold Concluído (Com Mnemônicos).
