# Pipeline ETL - Projeto de integração de dados - Grupo 06

## Imports e setup

In [None]:
!pip install pandas ipython

Collecting jedi>=0.16 (from ipython)
  Downloading jedi-0.19.2-py2.py3-none-any.whl.metadata (22 kB)
Downloading jedi-0.19.2-py2.py3-none-any.whl (1.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m28.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: jedi
Successfully installed jedi-0.19.2


In [None]:
import pandas as pd
from IPython.display import HTML
from glob import glob

## Informação dos dados

De [OpenDataSUS Sinan/Dengue](https://opendatasus.saude.gov.br/dataset/arboviroses-dengue):


> O Sistema de Informação de Agravos de Notificação (Sinan) tem como objetivo coletar, transmitir e disseminar dados gerados rotineiramente pela vigilância epidemiológica das três esferas de governo, por meio de uma rede informatizada, para apoiar o processo de investigação e dar subsídios à análise das informações das doenças e dos agravos de notificação compulsória. Atualmente, o sistema possui duas versões vigentes, Sinan Online e Sinan Net.

> O Sinan Online visa à inserção e disseminação dos dados de notificação e investigação de dengue e de febre de chikungunya, enquanto que o Sinan Net é alimentado pela notificação e investigação da grande maioria dos agravos e doenças, que constam na Lista Nacional de Notificação Compulsória de Doenças, Agravos e Eventos de Saúde Pública, do Anexo 1 do Anexo V da Portaria de Consolidação nº 4, de 28 de setembro de 2017, que consolida as normas sobre os sistemas e os subsistemas do Sistema Único de Saúde, mas é facultado a estados e municípios incluir outros problemas de saúde importantes para o seu contexto local.

> Destaca-se que a dengue é doença de notificação compulsória, ou seja, todo caso suspeito e/ou confirmado deve ser obrigatoriamente notificado ao Serviço de Vigilância Epidemiológica da Secretaria Municipal de Saúde (SMS). As notificações de casos suspeitos de dengue devem ser registradas na Ficha de Notificação/Investigação da dengue e chikungunya e inseridas no Sistema de Informação de Agravos de Notificação – Sinan Online. Os óbitos suspeitos pela infecção do vírus dengue (DENV) são de notificação compulsória imediata para todas as esferas de gestão do Sistema Único de Saúde (SUS), a ser realizada em até 24 horas a partir do seu conhecimento, pelo meio de comunicação mais rápido disponível. Posteriormente, os dados devem ser inseridos no Sistema de Informação de Agravos de Notificação (Sinan Online).


Para informações específicas das colunas, consultar o [Dicionário de dados](https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SINAN/Dengue/dic_dados_dengue.pdf).


## Extração dos dados

Baixando os dados. Obtemos os dados de 2020 à 2022 pois o uso de memória é menor que o dos registros mais recentes (e mesmo assim temos por volta de 3G de RAM sendo utilizadas para carregar os dados)

In [None]:
MIN_YEAR = 20
MAX_YEAR = 22

In [None]:
for year in range(MIN_YEAR, MAX_YEAR + 1):
    url = f"https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SINAN/Dengue/csv/DENGBR{year}.csv.zip"
    !wget "$url"
    !unzip -o "DENGBR{year}.csv.zip"
!rm -f *.zip # deleting all zip files

--2025-11-28 23:35:34--  https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SINAN/Dengue/csv/DENGBR20.csv.zip
Resolving s3.sa-east-1.amazonaws.com (s3.sa-east-1.amazonaws.com)... 3.5.234.178, 3.5.233.61, 3.5.232.37, ...
Connecting to s3.sa-east-1.amazonaws.com (s3.sa-east-1.amazonaws.com)|3.5.234.178|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 51373250 (49M) [application/zip]
Saving to: ‘DENGBR20.csv.zip’


2025-11-28 23:35:38 (13.5 MB/s) - ‘DENGBR20.csv.zip’ saved [51373250/51373250]

Archive:  DENGBR20.csv.zip
  inflating: DENGBR20.csv            
--2025-11-28 23:35:43--  https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SINAN/Dengue/csv/DENGBR21.csv.zip
Resolving s3.sa-east-1.amazonaws.com (s3.sa-east-1.amazonaws.com)... 3.5.232.215, 52.95.163.54, 3.5.234.130, ...
Connecting to s3.sa-east-1.amazonaws.com (s3.sa-east-1.amazonaws.com)|3.5.232.215|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 27043845 (26M) [application/zip

Carregando os arquivos individuais e juntando os DataFrames em um só

ATENÇÃO: ALTO USO DE RAM

In [None]:
common_name = "DENGBR{}.csv"

# Start with columns from the first file
sample_path = common_name.format(MAX_YEAR)
master_cols = set(pd.read_csv(sample_path, nrows=0).columns.tolist())

column_tracker = {}

all_dataframes = []

for year in range(MIN_YEAR, MAX_YEAR + 1):
    path = common_name.format(year)

    df = pd.read_csv(path, encoding='utf-8')

    for col in df.columns:
        if col not in column_tracker:
            column_tracker[col] = []
        column_tracker[col].append(year)

    all_dataframes.append(df)

# Convert master_cols to ordered list (optional: sort)
master_cols = sorted(master_cols)

# Normalize columns across all dataframes
for i in range(len(all_dataframes)):
    df = all_dataframes[i]
    for col in master_cols:
        if col not in df.columns:
            df[col] = pd.NA
    all_dataframes[i] = df[master_cols]

# Final concatenation
df = pd.concat(all_dataframes, ignore_index=True)

  df = pd.read_csv(path, encoding='utf-8')
  df = pd.read_csv(path, encoding='utf-8')
  df = pd.read_csv(path, encoding='utf-8')
  df = pd.concat(all_dataframes, ignore_index=True)


In [None]:
del all_dataframes

## Transformação dos dados

In [None]:
df.info(verbose=True, show_counts=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3899353 entries, 0 to 3899352
Data columns (total 121 columns):
 #    Column      Non-Null Count    Dtype  
---   ------      --------------    -----  
 0    ACIDO_PEPT  3782453 non-null  float64
 1    ALRM_ABDOM  38132 non-null    float64
 2    ALRM_HEMAT  37975 non-null    float64
 3    ALRM_HEPAT  37959 non-null    float64
 4    ALRM_HIPOT  38083 non-null    float64
 5    ALRM_LETAR  38005 non-null    float64
 6    ALRM_LIQ    37971 non-null    float64
 7    ALRM_PLAQ   38167 non-null    float64
 8    ALRM_SANG   38066 non-null    float64
 9    ALRM_VOM    38043 non-null    float64
 10   ANO_NASC    2389636 non-null  float64
 11   ARTRALGIA   3782453 non-null  float64
 12   ARTRITE     3782453 non-null  float64
 13   AUTO_IMUNE  3782453 non-null  float64
 14   CEFALEIA    3782453 non-null  float64
 15   CLASSI_FIN  3895398 non-null  float64
 16   CLINC_CHIK  22748 non-null    float64
 17   COMPLICA    0 non-null        float64
 18   

In [None]:
total_samples = len(df)

### Verificando quantidade de valores únicos

In [None]:
unique_value_counts = df.nunique(dropna=False)

In [None]:
HTML(unique_value_counts.to_frame("n_unique").to_html(max_rows=None))

Unnamed: 0,n_unique
ACIDO_PEPT,3
ALRM_ABDOM,3
ALRM_HEMAT,3
ALRM_HEPAT,3
ALRM_HIPOT,3
ALRM_LETAR,3
ALRM_LIQ,3
ALRM_PLAQ,3
ALRM_SANG,3
ALRM_VOM,3


Seguindo o dicionário de dados, `ID_AGRAVO` diz respeito ao código internacional de doenças (CID-10). Na base de dados adquirida, apenas um valor possível é encontrado: A90 (Dengue). Como essa coluna é não-informativa, podemos removê-la para economizar espaço


In [None]:
df.drop("ID_AGRAVO", axis=1, inplace=True)

TODO: Outros elementos possuem apenas um valor úncio mas precisa ser investigado se podem ser removidos também

### Verificando conteúdo dos valores únicos

In [None]:
columns_of_interest = unique_value_counts[unique_value_counts <= 30].index.to_list()
for column in columns_of_interest:
    if column in df.columns:
        print("=" * 10, column, "="*10)
        print(df[column].unique())

[ 2.  1. nan]
[nan  2.  1.]
[nan  2.  1.]
[nan  2.  1.]
[nan  2.  1.]
[nan  2.  1.]
[nan  2.  1.]
[nan  2.  1.]
[nan  1.  2.]
[nan  2.  1.]
[ 2.  1. nan]
[ 2.  1. nan]
[ 2.  1. nan]
[ 2.  1. nan]
[ 5. 10.  8. 11. 12. nan]
[nan  1.  2.]
[nan]
[ 2.  1. nan]
[nan]
[ nan   1.  31.  44. 126.  68. 140.  32. 186.  21. 129.  57.  72. 156.
 111.]
[nan 12. 27. 13. 15. 35. 16. 29. 52. 53. 41. 51. 31. 23. 26. 22. 33. 21.
 17. 24. 25. 32. 50. 11. 43. 42. 14. 28.]
[ 2.  1. nan  3.]
[nan  6. 10.  4.  9.  1.  7.  3.  0.  8.  2.  5.]
[ 1.  0. nan]
[ 5.  6.  2.  1.  9.  3.  4. nan]
[ 4.  1.  9.  2.  5.  3. nan]
['F' 'M' 'I' nan]
[ 2.  1. nan]
[nan]
[ 1.  2. nan]
[ 1.  2. nan]
[nan '2020-01-02' '2020-01-16' '2020-03-04' '2020-02-21' '2020-10-20'
 '2021-02-10' '2021-04-18' '2021-06-12' '2021-06-22' '2021-08-09'
 '2022-04-29' '2022-04-20' '2022-06-14' '2022-03-30' '2022-04-11'
 '2022-05-04' '2022-06-28' '2022-04-18' '2022-06-01' '2022-04-26']
[nan]
[nan]
[ 1. nan  9.  2.  3.  4.]
[ 2.  1. nan]
[ 1.  2. nan

TODO: Cruzar informações com dicionário de dados para checar se há colunas redundantes ou possíveis simplificações

### Verificando porcentagem de valores nulos

In [None]:
null_information = df.isnull().mean()

In [None]:
df_analysis = (null_information * 100).to_frame("perc_null")

# 2. Cria a nova coluna 'years_found' mapeando o índice (nome da coluna)
#    com o dicionário column_tracker.
#    Convertemos para string (str) para que a lista apareça inteira na célula da tabela.
df_analysis["years_found"] = df_analysis.index.map(lambda col: str(column_tracker.get(col, [])))

# 3. Gera o HTML ordenado pela porcentagem de nulos
HTML(df_analysis.sort_values("perc_null").to_html(max_rows=None, float_format="%.6f"))

Unnamed: 0,perc_null,years_found
DT_NOTIFIC,0.0,"[20, 21, 22]"
TP_NOT,0.0,"[20, 21, 22]"
SG_UF_NOT,0.0,"[20, 21, 22]"
SG_UF,0.0,"[20, 21, 22]"
NU_IDADE_N,0.0,"[20, 21, 22]"
ID_MUNICIP,0.0,"[20, 21, 22]"
NU_ANO,0.0,"[20, 21, 22]"
SEM_PRI,2.6e-05,"[20, 21, 22]"
ID_PAIS,0.000308,"[20, 21, 22]"
CS_SEXO,0.000949,"[20, 21, 22]"


### Colunas removidas

|Coluna    |NULL PERCENT|NOME                               |SIGNIFICADO                                                                                        |OBS                                                                            |APAGADO|
|----------|------------|-----------------------------------|---------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------|-------|
|UF        |97.140192   |UF (Geral/Duplicado)               |Provável duplicata ou alias para SG_UF (UF de Residência). Ver campo SG_UF.                        |Ver definição de SG_UF no documento.                                           |       |
|DOENCA_TRA|100.000000  |Doença do Trabalho                 |Indica se a doença está ou não relacionada ao trabalho.                                            |Habilitado se confirmado.                                                      |       |
|MIGRADO_W |100.000000  |Migração Windows                   |Identifica se o registro é oriundo da rotina de migração da base Windows.                          |Campo interno.                                                                 |       |

In [None]:
df = df.drop(columns=["UF", "DOENCA_TRA", "MIGRADO_W"])

## Load (Carregamento) dos dados

In [None]:
def mapper(row):
  return {
      "Notificacao": {
"idade_paciente":	row.NU_IDADE_N,
"ano":	row.NU_ANO,
"tipo":	row.TP_NOT,
"semana_inicio":	row.SEM_PRI,
"data_inicio_sintomas":	row.DT_SIN_PRI,
"municipio_residencia":	row.ID_MN_RESI,
"idade_gestacional":	row.CS_GESTANT,
"Classificacao_final":	row.CLASSI_FIN,
"data_encerramentoo":	row.DT_ENCERRA,
"Sistema":	row.TP_SISTEMA,
"data_inicio_investigacao":	row.DT_INVEST,
"criterio_confirmacao":	row.CRITERIO,
"evolucao_caso":	row.EVOLUCAO,
"hospitalizacao":	row.HOSPITALIZ,
"semana_notificacao":	row.SEM_NOT,
"fluxo_retorno":	row.CS_FLXRET,
"data_coleta_igm_dengue":	row.DT_SORO,
"data_coleta_ns1":	row.DT_NS1,
"caso_autocne":	row.TPAUTOCTO,
"pais_infeccao":	row.COPAISINF,
"uf_infeccao":	row.COUFINF,
"municipio_infeccao":	row.COMUNINF,
"data_internacao":	row.DT_INTERNA,
"municipio_internacao":	row.MUNICIPIO,
"data_coleta_rt_pcr":	row.DT_PCR,
"data_obito":	row.DT_OBITO,
"duplicada":	row.NDUPLIC_N,
"receb_flux_retorno":	row.FLXRECEBI,
"data_digitacao": row.DT_DIGITA,
      },
      "Paciente": {
"UF_residencia": row.SG_UF,
"pais_residencia_exterior": row.ID_PAIS,
"sexo": row.CS_SEXO,
"Raça/Cor": row.CS_RACA,
"id_regional_saude": row.ID_RG_RESI,
"Escolaridade": row.CS_ESCOL_N,
"Ocupação": row.ID_OCUPA_N,
"sa_dor_abdominal": row.ALRM_ABDOM,
"sa_hipotensão": row.ALRM_HIPOT,
"sa_sangramento": row.ALRM_SANG,
"sa_vômitos": row.ALRM_VOM,
"sa_letargia": row.ALRM_LETAR,
"sa_hematócrito": row.ALRM_HEMAT,
"sa_líquidos": row.ALRM_LIQ,
"sa_hepatomegalia": row.ALRM_HEPAT,
"data_sa": row.DT_ALRM,
"sorotipo": row.SOROTIPO,
"tipo_chikungunya": row.CLINC_CHIK,
"data_nascimento": row.DT_NASC
      },
      "Unidade_Saude": {
"municipio":	row.ID_MUNICIP,
"UF":	row.SG_UF_NOT,
"Codigo (Nome completo + codigo)":	row.ID_UNIDADE,
"id_regional_saude":	row.ID_REGIONA,
      },
      "Exames": {
  "resultado_igm": row.RESUL_SORO,
"resultado_ns1": row.RESUL_NS1,
"resultado_rt_pcr": row.RESUL_PCR_,
"resultado_isolamento_viral": row.RESUL_VI_N,
"resultado_imunohistoquímica": row.IMUNOH_N,
"resultado_histopatologia": row.HISTOPA_N,
"resultado_IgM_Chik_S1": row.RES_CHIKS1,
"resultado_IgM_Chik_S2": row.RES_CHIKS2,
"resultado_PRNT": row.RESUL_PRNT,
"data_coleta_isolamento_viral": row.DT_VIRAL,
"data_coleta_igm_chik_S1": row.DT_CHIK_S1,
"data_coleta_prnt": row.DT_PRNT,
"data_coleta_igm_chik_S2": row.DT_CHIK_S2
      },
      "Quadro_Clinico": {
"sc_FEBRE":	row.FEBRE,
"sc_LEUCOPENIA":	row.LEUCOPENIA,
"sc_EXANTEMA":	row.EXANTEMA,
"sc_DOR_RETRO":	row.DOR_RETRO,
"dpe_hipertensao":	row.HIPERTENSA,
"sc_CONJUNTVIT":	row.CONJUNTVIT,
"sc_CEFALEIA":	row.CEFALEIA,
"sc_DOR_COSTAS":	row.DOR_COSTAS,
"dpe_diabetes":	row.DIABETES,
"dpe_auto_imunes":	row.AUTO_IMUNE,
"dpe_ácido_péptica":	row.ACIDO_PEPT,
"sc_Artralgia_intensa":	row.ARTRALGIA,
"sc_Artrite":	row.ARTRITE,
"dpe_Doenças_hematológicas":	row.HEMATOLOG,
"dpe_Hepatopatias":	row.HEPATOPAT,
"sc_prova_laço":	row.LACO,
"dpe_Doença_renal_crônica":	row.RENAL,
"sc_Vômito":	row.VOMITO,
"sc_Náusea":	row.NAUSEA,
"sc_Mialgia":	row.MIALGIA,
"sc_Petéquias":	row.PETEQUIA_N,
"sa_plaquetas":	row.ALRM_PLAQ,
"dg_hipotensão":	row.GRAV_HIPOT,
"dg_taquicardia":	row.GRAV_TAQUI,
"dg_pulso":	row.GRAV_PULSO,
"dg_extremidades":	row.GRAV_EXTRE,
"dg_insuf_Resp":	row.GRAV_INSUF,
"dg_enchimento":	row.GRAV_ENCH,
"dg_melena":	row.GRAV_MELEN,
"dg_pa":	row.GRAV_CONV,
"dg_consciência":	row.GRAV_CONSC,
"dg_hematêmese":	row.GRAV_HEMAT,
"dg_metrorragia":	row.GRAV_METRO,
"dg_órgãos":	row.GRAV_ORGAO,
"dg_sangramento":	row.GRAV_SANG,
"dg_miocardite":	row.GRAV_MIOC,
"dg_ast_alt":	row.GRAV_AST,
"data_dengue_grave":	row.DT_GRAV,
"mh_gengivorragia":	row.GENGIVO,
"mh_epistaxe":	row.EPISTAXE,
"caso_fhd_scd":	row.CON_FHD,
"complicacoes":	row.COMPLICA,
"evidencia_extravasamento":	row.EVIDENCIA,
"mh_metrorragia":	row.METRO,
"manifestacoes_hemorragicas":	row.MANI_HEMOR,
"prova_laço_especif":	row.LACO_N,
"mh_hematúria":	row.HEMATURA,
"mh_petéquias":	row.PETEQUIAS,
"extravasamento_plasmatico":	row.PLASMATICO,
"plaquetas_menor":	row.PLAQ_MENOR,
"mh_sangramento_gastrointestinal":	row.SANGRAM,
      }
  }


In [None]:
## mapped = df.apply(mapper, axis=1).tolist() Faltou RAM

In [None]:
def generate_data_on_demand(dataframe):
    for row in df.itertuples(index=False):
        yield mapper(row)

def chunks(generator, batch_size=1000):
    batch = []
    for item in generator:
        batch.append(item)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

In [None]:
my_generator = generate_data_on_demand(df)

## Fazer o insert no banco aqui dentro!!!!
for batch in chunks(my_generator, batch_size=5000):
    print("BATCH 1: ", batch[0])
    break
    # Exemplo: db.insert_many(batch)

AttributeError: 'Pandas' object has no attribute 'DT_NASC'

## Finalizando o Load

In [None]:
import pandas as pd
import sqlite3
from sqlalchemy import create_engine

db_path = 'dengue_dw.db'
engine = create_engine(f'sqlite:///{db_path}')

column_map = {
    # Chaves e Tempo
    'DT_NOTIFIC': 'data_notificacao',
    'ID_MUNICIP': 'id_municipio',
    'NU_ANO': 'ano',

    # Paciente
    'NU_IDADE_N': 'idade',
    'CS_SEXO': 'sexo',
    'CS_RACA': 'raca',
    'CS_GESTANT': 'gestante',
    'CS_ESCOL_N': 'escolaridade',
    'ID_MN_RESI': 'municipio_residencia',

    # Clinico / Sintomas
    'FEBRE': 'febre', 'MIALGIA': 'mialgia', 'CEFALEIA': 'cefaleia',
    'EXANTEMA': 'exantema', 'VOMITO': 'vomito', 'NAUSEA': 'nausea',
    'DOR_COSTAS': 'dor_costas', 'CONJUNTVIT': 'conjuntivite',
    'ARTRITE': 'artrite', 'ARTRALGIA': 'artralgia',
    'DIABETES': 'diabetes', 'HIPERTENSA': 'hipertensao',

    # Exames e Conclusão
    'CLASSI_FIN': 'classificacao_final',
    'CRITERIO': 'criterio_confirmacao',
    'EVOLUCAO': 'evolucao',
    'HOSPITALIZ': 'hospitalizacao',
    'RESUL_NS1': 'res_ns1',
    'RESUL_SORO': 'res_igm',
    'RESUL_PCR_': 'res_pcr'
}

cols_to_keep = list(column_map.keys())

df_clean = df[cols_to_keep].rename(columns=column_map).copy()

# Tratamento de Data (Essencial para Dim_Tempo)
df_clean['data_notificacao'] = pd.to_datetime(df_clean['data_notificacao'], errors='coerce')

# Criar um ID único para cada notificação (Chave Primária da Fato)
df_clean['id_notificacao'] = df_clean.index + 1

print("Dados preliminares preparados.")

In [None]:
dim_tempo = pd.DataFrame(df_clean['data_notificacao'].unique(), columns=['data'])
dim_tempo = dim_tempo.dropna()
dim_tempo['ano'] = dim_tempo['data'].dt.year
dim_tempo['mes'] = dim_tempo['data'].dt.month
dim_tempo['dia'] = dim_tempo['data'].dt.day
dim_tempo['semana_ano'] = dim_tempo['data'].dt.isocalendar().week
dim_tempo['dia_semana'] = dim_tempo['data'].dt.day_name()

# Load no Banco
dim_tempo.to_sql('dim_tempo', engine, if_exists='replace', index=False)
print("Dimensão Tempo carregada.")

# Load Dimensão Localidade
dim_localidade = df_clean[['id_municipio']].drop_duplicates().dropna()
dim_localidade.to_sql('dim_localidade', engine, if_exists='replace', index=False)
print("Dimensão Localidade carregada.")

# Transformação das Outras Dimensões (Via Chave da Notificação) ---

# Dimensão Paciente (Demografia)
cols_paciente = ['id_notificacao', 'idade', 'sexo', 'raca', 'escolaridade', 'gestante', 'municipio_residencia']
dim_paciente = df_clean[cols_paciente]
dim_paciente.to_sql('dim_paciente', engine, if_exists='replace', index=False, chunksize=10000)
print("Dimensão Paciente carregada.")

# Dimensão Clínica (Sintomas)
cols_clinica = ['id_notificacao', 'febre', 'mialgia', 'cefaleia', 'exantema', 'vomito',
                'nausea', 'dor_costas', 'conjuntivite', 'artrite', 'artralgia',
                'diabetes', 'hipertensao']
dim_clinica = df_clean[cols_clinica]
dim_clinica.to_sql('dim_clinica', engine, if_exists='replace', index=False, chunksize=10000)
print("Dimensão Clínica carregada.")

# Dimensão Exames
cols_exames = ['id_notificacao', 'res_ns1', 'res_igm', 'res_pcr']
dim_exames = df_clean[cols_exames]
dim_exames.to_sql('dim_exames', engine, if_exists='replace', index=False, chunksize=10000)
print("Dimensão Exames carregada.")

In [None]:

cols_fato = [
    'id_notificacao',      # PK
    'data_notificacao',    # FK para Dim_Tempo (Join pela data)
    'id_municipio',        # FK para Dim_Localidade
    'classificacao_final', # Métrica/Status
    'evolucao',            # Métrica/Status
    'criterio_confirmacao',# Métrica/Status
    'hospitalizacao'       # Métrica/Status
]

fato_notificacao = df_clean[cols_fato]

fato_notificacao.to_sql('fato_notificacao', engine, if_exists='replace', index=False, chunksize=10000)

print("Tabela Fato carregada com sucesso!")

In [None]:
# Teste: Contagem de casos por Data e Sexo
query = """
SELECT
    f.data_notificacao,
    p.sexo,
    COUNT(*) as total_casos
FROM fato_notificacao f
JOIN dim_paciente p ON f.id_notificacao = p.id_notificacao
GROUP BY f.data_notificacao, p.sexo
ORDER BY total_casos DESC
LIMIT 10;
"""

df_resultado = pd.read_sql(query, engine)
print(df_resultado)