
## PARTE 0: CONFIGURAÇÃO DO AMBIENTE
 

In [1]:
# 1. Instalação de bibliotecas usando o comando mágico %pip
# Garante que a instalação ocorra no kernel correto do notebook.
%pip install -q pandas numpy pyarrow fastparquet matplotlib seaborn

# 2. Importação das bibliotecas
import pandas as pd
import numpy as np
import os
import glob
import matplotlib.pyplot as plt
import seaborn as sns

# 3. Configuração de visualização do Pandas
# Garante que todas as colunas de um DataFrame sejam exibidas
pd.set_option('display.max_columns', None)

# Configura o estilo dos gráficos para um visual mais agradável
sns.set_style("whitegrid")
plt.style.use("fivethirtyeight")

print("Ambiente configurado e bibliotecas importadas com sucesso!")
print(f"Versão do Pandas: {pd.__version__}")

Note: you may need to restart the kernel to use updated packages.
Ambiente configurado e bibliotecas importadas com sucesso!
Versão do Pandas: 2.3.3



## PARTE 1: CARGA E ESTRUTURAÇÃO DO SINASC


In [2]:
# --- 1.1 Carregando os dados do SINASC (Nascidos Vivos) ---
print("Iniciando a carga dos dados do SINASC...")

# ETAPA 1: Encontrar todos os caminhos dos arquivos .parquet
# O padrão 'sinasc_2020_2022/**/*.parquet' busca recursivamente em todas as subpastas.
parquet_path = 'sinasc_2020_2022/**/*.parquet'
list_parqutet_files = glob.glob(parquet_path, recursive=True)
print(f"Encontrados {len(list_parqutet_files)} arquivos Parquet.")
# Vamos exibir os 3 primeiros caminhos para verificar se estão corretos.
print("Exemplo de caminhos encontrados:")
print(list_parqutet_files[:3])

# ETAPA 2: Iterar, ler cada arquivo, extrair metadados e armazenar em uma lista
list_dfs = []

for file_path in list_parqutet_files:
    temp_df = pd.read_parquet(file_path)   # Lê o arquivo Parquet em um DataFrame temporário
    path_parts = file_path.split(os.sep) # Divide o caminho em partes
    dir_name = path_parts[-2]  # Nome do diretório (ano)
    year_collect = int(dir_name[-4:]) # Extrai o ano dos últimos 4 caracteres
    uf_collect = dir_name[2:4]   # Extrai a UF dos caracteres na posição 2 e 3

    temp_df['ANO_COLETA'] = year_collect  # Adiciona a coluna do ano de coleta
    temp_df['UF_COLETA'] = uf_collect    # Adiciona a coluna da UF de coleta

    list_dfs.append(temp_df)  # Adiciona o DataFrame temporário à lista

# ETAPA 3: Concatenar todos os DataFrames em um único DataFrame
if list_dfs:
    df_sinasc = pd.concat(list_dfs, ignore_index=True)
    print("\nDados do SINASC carregados e consolidados com sucesso!")
    print(f"Total de registros no DataFrame final: {len(df_sinasc)}")
    print(f"Total de colunas: {len(df_sinasc.columns)}")

    display(df_sinasc.head())
else:
    print("\nErro: Nenhum DataFrame foi carregado. Verifique os arquivos Parquet.")

Iniciando a carga dos dados do SINASC...
Encontrados 984 arquivos Parquet.
Exemplo de caminhos encontrados:
['sinasc_2020_2022/DNSP2022/part-00001-bd6c59fe-baea-40ee-a750-b5ab8e63d89a-c000.gz.parquet', 'sinasc_2020_2022/DNSP2022/part-00003-bd6c59fe-baea-40ee-a750-b5ab8e63d89a-c000.gz.parquet', 'sinasc_2020_2022/DNSP2022/part-00010-bd6c59fe-baea-40ee-a750-b5ab8e63d89a-c000.gz.parquet']

Dados do SINASC carregados e consolidados com sucesso!
Total de registros no DataFrame final: 3999785
Total de colunas: 21


Unnamed: 0,CODMUNNASC,LOCNASC,CODMUNRES,DTNASC,SEXO,RACACOR,PESO,IDADEMAE,ESTCIVMAE,ESCMAE,GRAVIDEZ,CONSULTAS,RACACORMAE,DTNASCMAE,GESTACAO,SEMAGESTAC,CONSPRENAT,PARTO,CODESTAB,ANO_COLETA,UF_COLETA
0,355240,1,355240,30032022,2,4,2600,33,1,4,1,3,4,19021989,4,36,5,2,2083981,2022,SP
1,355030,1,355030,29062022,2,4,3800,36,2,3,1,4,4,3061986,5,41,16,1,7711980,2022,SP
2,355710,1,355710,21062022,2,1,3035,36,2,4,1,4,1,22021986,5,38,10,2,2081377,2022,SP
3,355210,1,355210,29042022,2,1,3420,34,1,5,1,2,1,18041988,5,38,3,2,2079704,2022,SP
4,355370,1,355370,27062022,1,4,2880,22,1,4,1,4,4,2101999,5,38,11,2,2078295,2022,SP


## ESTRATÉGIA DE OTIMIZAÇÃO: REDUÇÃO DO USO DE MEMÓRIA

In [None]:
# Função para reduzir o uso de memória dos DataFrames.

def optimize_memory_usage(df, print_log=True):
    """
    Itera sobre todas as colunas de um DataFrame e modifica o tipo de dado
    para o formato mais eficiente em memória.

    Args:
    df (pd.DataFrame): O DataFrame a ser otimizado.
    print_log (bool): Se True, imprime o log da redução de memória.

    Returns:
    pd.DataFrame: O DataFrame otimizado.
    """
    if print_log:
        # Calcula o uso de memória inicial
        mem_usage_before = df.memory_usage(deep=True).sum() / 1024**2
        print(f"Uso de memória inicial: {mem_usage_before:.2f} MB")
    
    for col in df.columns:
        col_type = df[col].dtype

        # Verifica se a coluna é numérica (inteiro ou float)
        if pd.api.types.is_numeric_dtype(col_type) and not pd.api.types.is_datetime64_any_dtype(df[col]):
            c_min = df[col].min()
            c_max = df[col].max()
            
            # Se for do tipo inteiro, tenta diminuir para o menor inteiro possível
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)
            # Se for do tipo float, tenta diminuir para um float menor
            else:
                if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
        
        # Se a coluna for de texto (object)
        elif col_type == 'object':
            # Se a proporção de valores únicos for baixa, converte para 'category'
            # O tipo 'category' é muito mais eficiente para strings repetidas (ex: 'UF_COLETA')
            if df[col].nunique() / len(df[col]) < 0.5:
                df[col] = df[col].astype('category')
    
    if print_log:
        # Calcula o uso de memória final
        mem_usage_after = df.memory_usage(deep=True).sum() / 1024**2
        reduction = 100 * (mem_usage_before - mem_usage_after) / mem_usage_before
        print(f"Uso de memória final: {mem_usage_after:.2f} MB ({reduction:.2f}% de redução)")
        
    return df

print("Função 'optimize_memory_usage' definida com sucesso.")

Função 'optimize_memory_usage' definida com sucesso.


In [6]:
# Aplicando a otimização no df_sinasc

print("Otimizando df_sinasc...")
# Chamamos a função e sobrescrevemos a variável original com a versão otimizada
df_sinasc = optimize_memory_usage(df_sinasc)

# Faremos o mesmo para o df_ibge_cities quando o carregarmos.
# Por enquanto, garantimos que o df_sinasc está leve.

Otimizando df_sinasc...
Uso de memória inicial: 1085.42 MB
Uso de memória final: 223.78 MB (79.38% de redução)



## PARTE 1.2: CARREGANDO DADOS DO IBGE E ENRIQUECENDO O DF_SINASC

In [5]:
# --- 1.2.1 Carregando e Otimizando os dados do IBGE (.map() strategy)

print("Carregando e otimizando a base de cidades e estados do IBGE")
url_ibge_cities = 'https://raw.githubusercontent.com/leogermani/estados-e-municipios-ibge/master/municipios.csv'
df_ibge_cities = pd.read_csv(url_ibge_cities)
df_ibge_cities.rename(columns={
    'COD UF': 'COD_UF', 
    'COD': 'COD_MUN', 
    'NOME': 'NOME_MUN'
}, inplace=True)
df_ibge_cities = optimize_memory_usage(df_ibge_cities, print_log=False)
url_ibge_states = 'https://raw.githubusercontent.com/leogermani/estados-e-municipios-ibge/master/estados.csv'
df_ibge_states = pd.read_csv(url_ibge_states)
df_ibge_states.rename(columns={
    'COD': 'COD_UF', 
    'NOME': 'NOME_UF', 
    'SIGLA': 'SIGLA_UF'
}, inplace=True)

# --- 1.2.2 Preparando os "Dicionários de Tradução"

print("\nCriando dicionários de tradução para cidades e estados")
map_uf_code_to_name = pd.Series(df_ibge_states.NOME_UF.values, index=df_ibge_states.COD_UF).to_dict()
map_uf_code_to_sigla = pd.Series(df_ibge_states.SIGLA_UF.values, index=df_ibge_states.COD_UF).to_dict()
index_mun_6_digits = df_ibge_cities['COD_MUN'].astype(str).str[:6].astype(int) # capture apenas os 6 primeiros dígitos de COD_MUN
map_mun_code_to_name = pd.Series(df_ibge_cities.NOME_MUN.values, index=index_mun_6_digits).to_dict()
print("Dicionários criados.")

# --- 1.2.3 Preparando o df_sinasc e Aplicando o .map()
print("\nLimpando chaves do df_sinasc e aplicando .map() para enriquecer os dados")
codmunnasc_cleaned_str = df_sinasc['CODMUNNASC'].astype(str).str.extract('(\\d+)')[0].fillna('0') # Extrai apenas os dígitos
codmunnasc_limpo_int = pd.to_numeric( codmunnasc_cleaned_str, errors='coerce').astype('Int64') # Converte para Int64, permitindo NaN
cod_uf_nasc = pd.to_numeric(codmunnasc_cleaned_str.str[:2], errors='coerce').astype('Int64') # Extrai os 2 primeiros dígitos para COD_UF

#Aplicando o mapeamento.
df_sinasc['COD_UF_NASC'] = cod_uf_nasc
df_sinasc['NOME_UF_NASC'] = df_sinasc['COD_UF_NASC'].map(map_uf_code_to_name)
df_sinasc['SIGLA_UF_NASC'] = df_sinasc['COD_UF_NASC'].map(map_uf_code_to_sigla)
df_sinasc['NOME_MUNIC_NASC'] = codmunnasc_limpo_int.map(map_mun_code_to_name)
#Atualizando a coluna CODMUNNASC com a versão limpa
df_sinasc['CODMUNNASC'] = codmunnasc_limpo_int

df_sinasc_enriched = df_sinasc
print("Enriquecimento e limpeza da CODMUNNASC concluído.")
display(df_sinasc_enriched.sample(10))

#liberando memória
del df_ibge_cities
del df_ibge_states
import gc
gc.collect()
print("\nDataFrames auxiliares liberados da memória")

Carregando e otimizando a base de cidades e estados do IBGE

Criando dicionários de tradução para cidades e estados
Dicionários criados.

Limpando chaves do df_sinasc e aplicando .map() para enriquecer os dados
Enriquecimento e limpeza da CODMUNNASC concluído.


Unnamed: 0,CODMUNNASC,LOCNASC,CODMUNRES,DTNASC,SEXO,RACACOR,PESO,IDADEMAE,ESTCIVMAE,ESCMAE,GRAVIDEZ,CONSULTAS,RACACORMAE,DTNASCMAE,GESTACAO,SEMAGESTAC,CONSPRENAT,PARTO,CODESTAB,ANO_COLETA,UF_COLETA,COD_UF_NASC,NOME_UF_NASC,SIGLA_UF_NASC,NOME_MUNIC_NASC
451070,230440,1,230440,11092020,1,4,3640,27,1,3,1,4,4,19011993,5,38,7,1,2651351,2020,CE,23,Ceará,CE,Fortaleza
1867709,410580,1,410580,22122020,2,1,3100,22,1,4,1,4,1,6081998,5,39,13,1,2753332,2020,PR,41,Paraná,PR,Colombo
3481706,500660,1,500660,9122021,1,1,2435,25,5,4,2,4,1,15101996,5,37,9,2,2651610,2021,MS,50,Mato Grosso do Sul,MS,Ponta Porã
2614224,220770,1,220770,27122021,1,4,3860,23,5,4,1,4,4,18031998,5,41,10,1,8015899,2021,PI,22,Piauí,PI,Parnaíba
3304920,510704,1,510700,28012021,2,4,3070,24,2,5,1,4,4,22031996,5,38,9,2,3636364,2021,MT,51,Mato Grosso,MT,Primavera do Leste
2411407,355030,1,355030,13072020,1,1,2515,33,2,5,1,4,1,26051987,4,35,8,2,2058391,2020,SP,35,São Paulo,SP,São Paulo
3902532,521800,1,521800,30062022,1,4,3230,38,1,5,1,3,4,9031984,5,39,6,2,8001510,2022,GO,52,Goiás,GO,Porangatu
2009881,520870,1,520870,27022021,1,1,3480,21,2,5,1,4,1,26121999,5,37,9,1,2338564,2021,GO,52,Goiás,GO,Goiânia
1970526,520870,1,520870,4112021,2,1,2460,37,2,5,1,4,1,19061984,5,37,10,2,2518392,2021,GO,52,Goiás,GO,Goiânia
2156437,353070,1,353070,2052021,2,4,2618,26,2,4,1,4,4,16021995,5,40,12,1,2096463,2021,SP,35,São Paulo,SP,Mogi Guaçu



DataFrames auxiliares liberados da memória


## DIAGNÓSTICO: Análise das colunas DTNASC e DTNASCMAE antes da limpeza.


In [None]:
print("--- INICIANDO DIAGNÓSTICO DAS COLUNAS DE DATA ---")

df_diag = df_sinasc_enriched.copy()

# 1 - VALIDAÇÃO DO ANO DE COLETA
print("\n--- Validando o Ano de Coleta ---")
valid_years = [2020, 2021, 2022]
find_years = df_diag['ANO_COLETA'].unique()
invalid_years = [year for year in find_years if year not in valid_years]

if invalid_years:
    print(f"ALERTA: A coluna 'ANO_COLETA' contém anos inválidos: {invalid_years}")
    # Definimos nosso teto de realidade manualmente
    ceiling_year = 2022
else:
    print(f"OK: Todos os anos de coleta encontrados ({list(find_years)}) são válidos.")
    # Usamos o máximo dos dados, pois confiamos neles
    ceiling_year = df_diag['ANO_COLETA'].max()
    
print(f"==> Usando {ceiling_year} como o ano máximo de referência para a análise.")

date_cols_to_diagnose = ['DTNASC', 'DTNASCMAE']

for col in date_cols_to_diagnose:
    print(f"\n\n---Análise da coluna: {col}---")
    # 2 - Analise de tipos e nulos
    print(f"Tipo de dado original: { df_diag[col].dtype}")
    null_count = df_diag[col].isnull().sum()
    print(f"Valores nulos (originais): { null_count} ({ null_count / len(df_diag):.2%})") # Exibe a contagem e a porcentagem de valores nulos

    # 3 - Analise do formato (após converter para string)
    print("\nAnálise de formato (Tamanho esperado da string 8 'DDMMAAAA')")
    series_str = df_diag[col].astype(str)
    length_counts = series_str.str.len().value_counts().sort_index()
    print(f"Contagem de tamanhos de string: { length_counts}")
    
    # Verificando conteúdo não-numérico
    no_digit = series_str[~series_str.str.isdigit().fillna(False)] # Filtra valores que não são dígitos
    if not no_digit.empty:
        print(f"\nEncontrados {len(no_digit)} valores não numericos (estão 'sujos').")
        print("Exemplos de valores não numéricos")
        display(no_digit.value_counts().head())
    else:
        print("\n Todos os valores (não nulos) são digitos. (estão 'limpos').")

    # 4 - Tentativa de conversão e análise de erros
    series_dt = pd.to_datetime( series_str, format='%d%m%Y', errors='coerce') # Converte para datetime, valores inválidos viram NaT
    count_convert_fails = series_dt.isnull().sum() # Conta quantos valores falharam na conversão
    print(f"\n Após conversão para data (ddmmyyyy):")
    print(f"Total falhas na conversão (NaT): { count_convert_fails } ({ count_convert_fails / len(df_diag):.2%})") #

    # 5 - Análise de intervalos (datas convertidas e plausíveis)
    
    valid_dates = series_dt.dropna()
    if not valid_dates.empty:
        print(f"\n Análise de intervalos das datas válidas:")
        print(f"Data mínima: { valid_dates.min().date() }")
        print(f"Data máxima: { valid_dates.max().date() }")

        #verificando datas futuras (dentro do contexto do DataFrame 2020-2022)
        future_dates = valid_dates[valid_dates.dt.year > ceiling_year]

        if not future_dates.empty:
            print(f"\n ALERTA: Encontradas { len(future_dates) } datas posteriores ao último ano válido de coleta ({ceiling_year})!")
            print("Exemplos de datas futuras:")
            print(future_dates.value_counts().head())
        
        #verificando datas muito antigas (dentro do contexto do DataFrame 2020-2022)
        old_dates = valid_dates[valid_dates.dt.year < 1920]
        if not old_dates.empty:
            print(f"\n ALERTA: Encontradas { len(old_dates) } datas anteriores a 1920!")
            print("Exemplos de datas muito antigas:")
            print(old_dates.value_counts().head())
            
print("\n--- FIM DO DIAGNÓSTICO DAS COLUNAS DE DATA ---")

## PARTE 2: PIPELINE DE LIMPEZA E ENGENHARIA DE FEATURES

In [None]:
#Funções para etapas de limpeza e engenharia de features
def clean_to_numeric(series):
    """
    Converte uma série para numérico, extraindo apenas os dígitos.
    """
    return pd.to_numeric(series.astype(str).str.extract('(\\d+)', expand=False), errors='coerce')

def clean_to_datetime(series):
    """
    Converte uma série para datetime, extraindo o formato 'DDMMAAAA'.
    """
    return pd.to_datetime( series.astype(str).str.extract('( \\d{ 8})', expand=False), format='%d%m%Y', errors='coerce')

def clean_to_categorical_code(series):
    """
    Converte uma série para numérico, extraindo apenas o dígito único.
    """
    cleaned_series = series.astype(str).str.extract('( \\d)', expand=False)
    return pd.to_numeric(cleaned_series, errors='coerce')

def calculate_age_manual(row):
    """
    Calcula a idade da mãe no nascimento com base nas datas de nascimento.
    """
    nasc_date, mae_nasc_date = row['DTNASC'], row['DTNASCMAE']
    if pd.isna(nasc_date) or pd.isna(mae_nasc_date): return np.nan
    age = nasc_date.year - mae_nasc_date.year # Calcula a diferença de anos
    if (nasc_date.month, nasc_date.day) < (mae_nasc_date.month, mae_nasc_date.day): # Verifiqua se o aniversário já ocorreu no ano do nascimento
        age -= 1 # Subtrai 1 se o aniversário ainda não ocorreu
    return age