# Libs

In [None]:
import pyspark
import os
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.functions import col
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from functools import reduce

In [None]:
#Iniciando a sessão com spark
spark = (
    SparkSession.builder
    .master('local')
    .appName("Pyspark_01")
    .getOrCreate()
)

# SINASC

## Leitura dos dataframes

In [None]:
# Define o caminho do diretório onde estão os arquivos CSV
diretorio = r'C:\Users\carol\OneDrive\Estudos\MBA Data Science\TCC\SINASC\Dados_SINASC'

# Lista para armazenar os DataFrames de cada arquivo CSV
dataframes = []

# Percorre todos os arquivos no diretório
for arquivo in os.listdir(diretorio):
    if arquivo.endswith('.csv'):  # Verifica se o arquivo é um CSV
        caminho_arquivo = os.path.join(diretorio, arquivo)
        # Lê o arquivo CSV com o separador ';'
        df = spark.read.option("header", "true").csv(caminho_arquivo, sep=';')
        # Remove a coluna "CONTADOR" ou "contador" do DataFrame, se existir
        for coluna in df.columns:
            if coluna.lower() == "contador":
                df = df.drop(coluna)
        dataframes.append(df)

# Define uma função para unir dois DataFrames preservando apenas as colunas de mesmo nome
def merge_dfs(df1, df2):
    # Seleciona as colunas comuns para ambas os DataFrames
    colunas_comuns = [coluna for coluna in df2.columns if coluna in df1.columns]
    # Seleciona as colunas comuns para ambos os DataFrames
    df2 = df2.select(colunas_comuns)
    # Realiza a união dos DataFrames
    return df1.unionByName(df2)

# Aplica a função de união em todos os DataFrames
df_sinasc = reduce(merge_dfs, dataframes)

In [None]:
df_sinasc.show(5)

In [None]:
# Conta o número de linhas no DataFrame
num_linhas = df_sinasc.count()
# Obtém o número de colunas no DataFrame
num_colunas = len(df_sinasc.columns)
# Exibe a quantidade de linhas e colunas
print(f"Quantidade de linhas: {num_linhas}")
print(f"Quantidade de colunas: {num_colunas}")

In [None]:
df_sinasc.printSchema()

In [None]:
#for coluna in df_sinasc.columns:
#    print(coluna, df_sinasc.filter(df_sinasc[coluna].isNull()).count())

ORIGEM 0
CODESTAB 197029
CODMUNNASC 0
LOCNASC 0
IDADEMAE 235
ESTCIVMAE 134793
ESCMAE 134998
CODOCUPMAE 4337117
QTDFILVIVO 588584
QTDFILMORT 929109
CODMUNRES 0
GESTACAO 282763
GRAVIDEZ 23468
PARTO 13558
CONSULTAS 8937
DTNASC 0
HORANASC 21353
SEXO 0
APGAR1 344933
APGAR5 345625
RACACOR 610120
PESO 5385
IDANOMAL 188364
DTCADASTRO 1166
CODANOMAL 20350522
NUMEROLOTE 21474
VERSAOSIST 22154
DTRECEBIM 72732
DIFDATA 0
DTRECORIGA 14126025
NATURALMAE 350957
CODMUNNATU 351799
CODUFNATU 351799
ESCMAE2010 270170
SERIESCMAE 7445179
DTNASCMAE 194067
RACACORMAE 735202
QTDGESTANT 633460
QTDPARTNOR 859196
QTDPARTCES 953151
IDADEPAI 12770015
DTULTMENST 10133444
SEMAGESTAC 289627
TPMETESTIM 289613
CONSPRENAT 369974
MESPRENAT 537813
TPAPRESENT 209143
STTRABPART 327999
STCESPARTO 296096
TPNASCASSI 165082
TPFUNCRESP 774876
TPDOCRESP 140325
DTDECLARAC 314407
ESCMAEAGR1 270170
STDNEPIDEM 1595
STDNNOVA 0
CODPAISRES 2924761
TPROBSON 0
PARIDADE 0
KOTELCHUCK 0

In [None]:
#selecionando parte de colunas
colunas_selecionadas = ['DTNASC','LOCNASC','CODMUNNASC','CODESTAB','SEXO','RACACOR','PESO','CODANOMAL','IDANOMAL','GESTACAO','SEMAGESTAC','GRAVIDEZ','TPAPRESENT','STTRABPART','STCESPARTO','PARTO','TPNASCASSI','IDADEMAE','ESTCIVMAE','RACACORMAE','ESCMAE','ESCMAE2010','ESCMAEAGR1','CODOCUPMAE','PARIDADE','QTDGESTANT','QTDFILVIVO','QTDFILMORT','QTDPARTNOR','QTDPARTCES','MESPRENAT','CONSULTAS','CONSPRENAT','KOTELCHUCK','TPROBSON','IDADEPAI']
df_sinasc_fil = df_sinasc[colunas_selecionadas]

In [None]:
df_sinasc_fil.printSchema()

In [None]:
# Conta o número de linhas no DataFrame
num_linhas = df_sinasc_fil.count()
# Obtém o número de colunas no DataFrame
num_colunas = len(df_sinasc_fil.columns)
# Exibe a quantidade de linhas e colunas
print(f"Quantidade de linhas: {num_linhas}")
print(f"Quantidade de colunas: {num_colunas}")

In [None]:
df_sinasc_fil.show(5)

## Transformação de colunas

### DTNASC

In [None]:
#checando os anos disponíveis
'''valores_distintos = df_sinasc_fil.select(substring("DTNASC", -4, 4).alias("Primeiros_4_Caracteres_DT")).distinct()
valores_distintos = valores_distintos.orderBy("Primeiros_4_Caracteres_DT")
valores_distintos.show()'''

In [None]:
# Converte a coluna 'DTNASC' para string e preenche com zeros à esquerda
df_sinasc_fil = df_sinasc_fil.withColumn("DTNASC", lpad(col("DTNASC").cast("string"), 8, '0'))
df_sinasc_fil = df_sinasc_fil.withColumn("DTNASC", to_date(col("DTNASC"), "ddMMyyyy"))

#poderíamos usar o seguinte código
#df_sinasc_fil = df_sinasc_fil.withColumn("DTNASC", to_date(lpad(col("DTNASC").cast("string"), 8, '0'), "ddMMyyyy"))

In [None]:
#Cria a coluna de ano
df_sinasc_fil = df_sinasc_fil.withColumn("ano", year(df_sinasc_fil["DTNASC"]))

### CODANOMAL

In [None]:
# Substitui a letra X por nada na coluna 'CODANOMAL'
df_sinasc_fil = df_sinasc_fil\
                .withColumn("CODANOMAL", regexp_replace(col("CODANOMAL"), "X", ""))\
                # Substitui 'Q356' por 'Q359' na coluna 'CODANOMAL'
                .withColumn("CODANOMAL", when(col("CODANOMAL") == "Q356", "Q359").otherwise(col("CODANOMAL")))\
                # Conta quantas letras possui no campo CODANOMAL e substitui valores nulos por 0
                .withColumn("qt_anomal", (col("CODANOMAL").rlike("[a-zA-Z]")).cast("int"))

### CODESTAB

In [None]:
df_sinasc_fil = df_sinasc_fil.withColumn("CODESTAB", col("CODESTAB").cast("string"))

## Criação de novas colunas

### indice

In [None]:
# Adiciona uma coluna de índice usando monotonically_increasing_id()
#df_sinasc_fil = df_sinasc_fil.withColumn("indice", monotonically_increasing_id())
# Converte a coluna 'indice' para string
#df_sinasc_fil = df_sinasc_fil.withColumn("indice", df_sinasc_fil["indice"].cast("string"))
df_sinasc_fil = df_sinasc_fil.withColumn("indice", monotonically_increasing_id().cast("string"))

### ano_mes

In [None]:
df_sinasc_fil = df_sinasc_fil.withColumn("ano_mes", date_format("DTNASC", "yyyy-MM"))

### atualização colunas

In [None]:
novo_nome = {
    'DTNASC': 'dt_nasc',
    'LOCNASC': 'loc_nasc',
    'CODMUNNASC': 'cod_mun_nasc',
    'CODESTAB': 'cod_estab',
    'SEXO': 'sexo',
    'RACACOR': 'raca_cor',
    'PESO': 'peso',
    'CODANOMAL': 'cod_anomal',
    'IDANOMAL': 'id_anomal',
    'GESTACAO': 'gestacao',
    'SEMAGESTAC': 'sema_gestac',
    'GRAVIDEZ': 'gravidez',
    'TPAPRESENT': 'tpa_present',
    'STTRABPART': 'st_trab_parto',
    'STCESPARTO': 'st_ces_parto',
    'PARTO': 'parto',
    'TPNASCASSI': 'tp_nasc_assi',
    'IDADEMAE': 'idade_mae',
    'ESTCIVMAE': 'est_civ_mae',
    'RACACORMAE': 'raca_cor_mae',
    'ESCMAE': 'esc_mae',
    'ESCMAE2010': 'esc_mae_2010',
    'ESCMAEAGR1': 'esc_mae_gr1',
    'CODOCUPMAE': 'cod_ocup_mae',
    'PARIDADE': 'paridade',
    'QTDGESTANT': 'qtd_gestant',
    'QTDFILVIVO': 'qtd_fil_vivo',
    'QTDFILMORT': 'qtd_fil_mort',
    'QTDPARTNOR': 'qtd_part_nor',
    'QTDPARTCES': 'qtd_part_ces',
    'MESPRENAT': 'mes_pre_nat',
    'CONSULTAS': 'consultas',
    'CONSPRENAT': 'cons_pre_nat',
    'KOTELCHUCK': 'kotelchuck',
    'TPROBSON': 'tp_robson',
    'IDADEPAI': 'idade_pai'
}

for antigo_nome, novo_nome in novo_nome.items():
    df_sinasc_fil = df_sinasc_fil.withColumnRenamed(antigo_nome, novo_nome)

In [None]:
df_sinasc_fil.printSchema()

In [None]:
df_sinasc_fil.groupBy("ano")\
            .count().orderBy("ano")\
            .withColumnRenamed("count", "qt_linhas")\
            .show()

In [None]:
#transformando colunas float que na verdade são string
colunas_float = ['raca_cor', 'cod_anomal', 'id_anomal', 'gestacao', 'gravidez', 'tpa_present', 'st_trab_parto', 'st_ces_parto', 'parto', 'tp_nasc_assi', 'est_civ_mae', 'raca_cor_mae', 'esc_mae', 'esc_mae_2010', 'cod_ocup_mae', 'qtd_gestant', 'qtd_fil_vivo', 'qtd_fil_mort', 'qtd_part_nor', 'qtd_part_ces', 'mes_pre_nat', 'consultas', 'cons_pre_nat', 'sema_gestac']

for coluna in colunas_float:
     df_sinasc_fil = df_sinasc_fil.withColumn(coluna, regexp_replace(col(coluna), "\\.0", ""))

In [None]:
#transformando coluna que está como float mas é int
#coluna_int = ['idade_mae', 'idade_pai']
#for coluna_nome in coluna_int:
#   df_sinasc_fil = df_sinasc_fil.withColumn(coluna_nome, df_sinasc_fil[coluna_nome].fillna(0).cast("int"))

In [None]:
df_sinasc_fil.show(10)

## Gerando CSV

In [None]:
#caminho_csv = r'C:\Users\carol\OneDrive\Estudos\MBA Data Science\TCC\SINASC\sinasc_compilado.csv'
#df_sinasc_fil.write.csv(caminho_csv, header=True, mode="overwrite")

# FATO ANOMALIAS

In [None]:
#criando lista distinta de cod_anomal
df_cod_anomal_sep = df_sinasc_fil\
    .select("cod_anomal")\
    .distinct()

In [None]:
#a partir da cod_anomal distinta vamos segmentar os valores que possuem mais de um cod_anomal em uma nova coluna e depois quebrar isso para linhas
df_cod_anomal_sep = df_cod_anomal_sep\
                    .withColumn("cod_anomal_sep", split(col("cod_anomal"), r"(?<=\d)(?=[a-zA-Z])"))\
                    .select(col("cod_anomal"), explode(col("cod_anomal_sep")).alias("cod_anomal_sep"))\
                    .filter(col("cod_anomal_sep").isNotNull())

In [None]:
df_cod_anomal_sep.show(5)

In [None]:
# Conta o número de linhas no DataFrame
num_linhas = df_cod_anomal_sep.count()
# Obtém o número de colunas no DataFrame
num_colunas = len(df_cod_anomal_sep.columns)
# Exibe a quantidade de linhas e colunas
print(f"Quantidade de linhas: {num_linhas}")
print(f"Quantidade de colunas: {num_colunas}")

In [None]:
df_cod_anomal_sep.printSchema()

In [None]:
#salvando CSV da base tratada
#caminho_csv = r'C:\Users\carol\OneDrive\Estudos\MBA Data Science\TCC\SINASC\cod_anomal_sep.csv'
#df_cod_anomal_sep.write.csv(caminho_csv, header=True, mode="overwrite")