# Explorando o pipeline de ELT

In [1]:
# Import das bibliotecas
import os
from pathlib import Path
import json

from pysus.online_data.SIH import download
from pyspark.sql import SparkSession, functions

# Etapa de extração

In [2]:
ROOT_PATH = os.path.abspath('../..')

estado = 'mg'
ano = '2024'
mes = '1'
save_path = Path(f"{ROOT_PATH}/data/estado={estado}/ano={ano}/mes={mes}")

download(states=estado, years=ano, months=mes, groups='RD', data_dir=save_path)

100%|██████████| 9.52M/9.52M [00:00<00:00, 7.60GB/s]


/home/gustavo-cunha/Documentos/GitHub/preditor-sus-alto-custo/data/estado=mg/ano=2024/mes=1/RDMG2401.parquet

# Etapa de transformação

In [3]:
spark = SparkSession\
    .builder\
    .appName("datasus_transformation")\
    .getOrCreate()

24/07/25 21:43:29 WARN Utils: Your hostname, gustavo-cunha-IdeaPad-1-15IAU7 resolves to a loopback address: 127.0.1.1; using 192.168.0.95 instead (on interface wlp0s20f3)
24/07/25 21:43:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/25 21:43:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
df_spark = spark.read.parquet(f"../../data/estado={estado}/ano={ano}/mes={mes}/RDMG2401.parquet")

                                                                                

In [5]:
df_spark.show()

24/07/25 21:43:34 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+------+--------+--------+-----+--------------+-------------+-----+--------+---------+--------+----+----------+----------+----------+----------+---------+----------+----------+----------+----------+---------+----------+----------+----------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+-----------+-----------+--------------+--------+----------+--------+--------+----------+----------+--------+--------+-------+------+-------+--------+---------+---------+-----+---------+-----+--------+--------+-------+---------+-----------+--------+----------+------+---------+----------+----------+---------+------------+--------+------+-----+--------+----------+---------+---------------+---------+-------+--------------+--------+--------+---------+-------+------+-------+-----+--------+-----+---------+--------------------+--------------------+--------------------+----------+----------+----------+----------+----------+---------+---

In [6]:
df_spark.printSchema()

root
 |-- UF_ZI: string (nullable = true)
 |-- ANO_CMPT: string (nullable = true)
 |-- MES_CMPT: string (nullable = true)
 |-- ESPEC: string (nullable = true)
 |-- CGC_HOSP: string (nullable = true)
 |-- N_AIH: string (nullable = true)
 |-- IDENT: string (nullable = true)
 |-- CEP: string (nullable = true)
 |-- MUNIC_RES: string (nullable = true)
 |-- NASC: string (nullable = true)
 |-- SEXO: string (nullable = true)
 |-- UTI_MES_IN: string (nullable = true)
 |-- UTI_MES_AN: string (nullable = true)
 |-- UTI_MES_AL: string (nullable = true)
 |-- UTI_MES_TO: string (nullable = true)
 |-- MARCA_UTI: string (nullable = true)
 |-- UTI_INT_IN: string (nullable = true)
 |-- UTI_INT_AN: string (nullable = true)
 |-- UTI_INT_AL: string (nullable = true)
 |-- UTI_INT_TO: string (nullable = true)
 |-- DIAR_ACOM: string (nullable = true)
 |-- QT_DIARIAS: string (nullable = true)
 |-- PROC_SOLIC: string (nullable = true)
 |-- PROC_REA: string (nullable = true)
 |-- VAL_SH: string (nullable = true)

In [7]:
# Filtrando pessoas que não vieram a falecer durante a internação

df_spark.select("MORTE").where(" MORTE == 0").show()

+-----+
|MORTE|
+-----+
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
+-----+
only showing top 20 rows



In [8]:
#Filtrando colunas de interes para o modelo de predicao de alto custo

filter_columns = [
    'ANO_CMPT',
    'MES_CMPT',
    'CEP',
    'SEXO',
    'RACA_COR',
    'INSTRU',
    'IDADE',
    'DIAG_PRINC',
    'DIAS_PERM',
    'US_TOT'
]

df_spark\
    .where("MORTE == 0")\
    .select(filter_columns).show()

+--------+--------+--------+----+--------+------+-----+----------+---------+----------+
|ANO_CMPT|MES_CMPT|     CEP|SEXO|RACA_COR|INSTRU|IDADE|DIAG_PRINC|DIAS_PERM|    US_TOT|
+--------+--------+--------+----+--------+------+-----+----------+---------+----------+
|    2024|      01|36895000|   3|      01|     0|   50|      I200|        4|    228.33|
|    2024|      01|36555000|   1|      02|     0|   59|      I219|        3|    297.00|
|    2024|      01|36880002|   3|      02|     0|   55|      I509|        5|    526.19|
|    2024|      01|36185000|   3|      01|     0|   63|      A488|        2|     68.86|
|    2024|      01|36180000|   3|      03|     0|   79|      J110|        3|    122.50|
|    2024|      01|36180000|   1|      01|     0|   77|      E46 |        3|     95.10|
|    2024|      01|36180000|   1|      03|     0|   66|      J159|        2|    120.89|
|    2024|      01|36160000|   3|      01|     0|   60|      E136|        3|     77.73|
|    2024|      01|36180000|   1

In [9]:
# substituindo os códigos por valores semanticos

dicionario_sexo   = {'1':'Masculino', '3':'Feminino'}
dicionario_raca   = {'99':'ND', '01':'Branca', '02':'Negra', '03':'Parda', '04':'Amarela', '05':'Indigena'}
dicionario_instru = {'0':'ND', '1':'Analfabeto', '2':'1_Grau', '3':'2_Grau', '4':'Superior'}

In [10]:
df_spark\
    .select("SEXO", "RACA_COR", "INSTRU")\
    .replace(dicionario_sexo)\
    .replace(dicionario_raca)\
    .replace(dicionario_instru)\
    .distinct().show()

+---------+--------+---------+
|     SEXO|RACA_COR|   INSTRU|
+---------+--------+---------+
| Feminino|  Branca|   1_Grau|
| Feminino|   Parda|       ND|
| Feminino| Amarela|   1_Grau|
|Masculino|  Branca|   1_Grau|
| Feminino|  Branca| Superior|
|Masculino|   Negra|       ND|
|Masculino|  Branca|       ND|
|Masculino|   Parda|       ND|
| Feminino|Indigena|       ND|
| Feminino|   Negra|   1_Grau|
| Feminino|   Negra| Superior|
|Masculino|   Parda| Superior|
| Feminino| Amarela| Feminino|
| Feminino| Amarela| Superior|
| Feminino|   Parda|Masculino|
|Masculino|   Parda| Feminino|
|Masculino| Amarela|       ND|
| Feminino| Amarela|       ND|
| Feminino|  Branca|Masculino|
| Feminino|   Negra|Masculino|
+---------+--------+---------+
only showing top 20 rows



In [11]:
# Criando a coluna de tratamento cronico
# primeiro definir os CIDs para doenças cronicas não transmissíveis

def lista_cids(min: int, max: int, prefix_cod: str):
    lista = []
    for i in range(min, max+1):
        if i < 10:
            lista.append(f"{prefix_cod}0{i}")
        else:
            lista.append(f"{prefix_cod}{i}")
    return lista

cid_cardiovasculares = lista_cids(0, 99, 'I')
cid_respiratorio = lista_cids(30, 98, 'J')
cid_diabetes = lista_cids(10, 14, 'E')
cid_obseidade = lista_cids(65, 68, 'E')
cid_neoplasia = lista_cids(0, 97, 'C')
cid_figado = lista_cids(70, 74, 'K')
cid_renal = lista_cids(3, 5, 'N') + lista_cids(13, 16, 'N') + ['N11', 'N18']

In [12]:
# Retirando o ultimo digito do cid para filtrar
df_cid_3 = df_spark.withColumn(
    "DIAG_PRINC",
    functions.expr("substring(DIAG_PRINC, 1, length(DIAG_PRINC) - 1)")
).select("DIAG_PRINC")

df_cid_3.withColumn("CLASS_DIAG",
    functions\
        .when(df_cid_3["DIAG_PRINC"].isin(cid_cardiovasculares), "CARDIOVASCULAR")
    ).select("DIAG_PRINC", "CLASS_DIAG").show()

+----------+--------------+
|DIAG_PRINC|    CLASS_DIAG|
+----------+--------------+
|       I20|CARDIOVASCULAR|
|       I21|CARDIOVASCULAR|
|       I50|CARDIOVASCULAR|
|       A48|          NULL|
|       J11|          NULL|
|       E46|          NULL|
|       J15|          NULL|
|       R57|          NULL|
|       E13|          NULL|
|       A48|          NULL|
|       K81|          NULL|
|       O80|          NULL|
|       O82|          NULL|
|       O82|          NULL|
|       O82|          NULL|
|       O80|          NULL|
|       O80|          NULL|
|       I50|CARDIOVASCULAR|
|       I48|CARDIOVASCULAR|
|       N39|          NULL|
+----------+--------------+
only showing top 20 rows



### Unificando as transformacoes

In [13]:
dict_dcnt = {
    "CARDIOVASCULAR" : cid_cardiovasculares,
    "RESPIRATORIO" : cid_respiratorio,
    "DIABTES" : cid_diabetes,
    "OBESIDADE" : cid_obseidade,
    "NEPLASIA" : cid_neoplasia,
    "FIGADO" : cid_figado,
    "RENAL" : cid_renal
}

with open("cid_doenca_cronica_nt.json", "w") as config:
    json.dump(dict_dcnt, config)

In [14]:
def transform_dataframe_alto_custo(df):
    df = df.where("MORTE == 0")
    df = df.withColumn("DIAG_PRINC",functions.expr("substring(DIAG_PRINC, 1, length(DIAG_PRINC) - 1)"))

    df = df.select(filter_columns)\
            .replace(dicionario_sexo, subset=["SEXO"])\
            .replace(dicionario_raca, subset=["RACA_COR"])\
            .replace(dicionario_instru, subset=["INSTRU"])
            
    df = df.withColumn("DIAG_CLASS",
                        functions.when(df["DIAG_PRINC"].isin(cid_cardiovasculares), "CARDIOVASCULAR")\
                        .when(df["DIAG_PRINC"].isin(cid_respiratorio), "RESPIRATORIO")\
                        .when(df["DIAG_PRINC"].isin(cid_diabetes), "DIABTES")\
                        .when(df["DIAG_PRINC"].isin(cid_obseidade), "OBESIDADE")\
                        .when(df["DIAG_PRINC"].isin(cid_neoplasia), "NEPLASIA")\
                        .when(df["DIAG_PRINC"].isin(cid_figado), "FIGADO")\
                        .when(df["DIAG_PRINC"].isin(cid_renal), "RENAL")
                        )
    
    df = df.na.drop(subset=["DIAG_CLASS"])
    
    return df

In [15]:
df_final = transform_dataframe_alto_custo(df_spark)

In [16]:
df_final.show()

+--------+--------+--------+---------+--------+------+-----+----------+---------+----------+--------------+
|ANO_CMPT|MES_CMPT|     CEP|     SEXO|RACA_COR|INSTRU|IDADE|DIAG_PRINC|DIAS_PERM|    US_TOT|    DIAG_CLASS|
+--------+--------+--------+---------+--------+------+-----+----------+---------+----------+--------------+
|    2024|      01|36895000| Feminino|  Branca|    ND|   50|       I20|        4|    228.33|CARDIOVASCULAR|
|    2024|      01|36555000|Masculino|   Negra|    ND|   59|       I21|        3|    297.00|CARDIOVASCULAR|
|    2024|      01|36880002| Feminino|   Negra|    ND|   55|       I50|        5|    526.19|CARDIOVASCULAR|
|    2024|      01|36160000| Feminino|  Branca|    ND|   60|       E13|        3|     77.73|       DIABTES|
|    2024|      01|36530000| Feminino|  Branca|    ND|   79|       I50|        5|    149.38|CARDIOVASCULAR|
|    2024|      01|36520000|Masculino|  Branca|    ND|   59|       I48|        5|    665.52|CARDIOVASCULAR|
|    2024|      01|36555000|

In [17]:
df_final.printSchema()

root
 |-- ANO_CMPT: string (nullable = true)
 |-- MES_CMPT: string (nullable = true)
 |-- CEP: string (nullable = true)
 |-- SEXO: string (nullable = true)
 |-- RACA_COR: string (nullable = true)
 |-- INSTRU: string (nullable = true)
 |-- IDADE: string (nullable = true)
 |-- DIAG_PRINC: string (nullable = true)
 |-- DIAS_PERM: string (nullable = true)
 |-- US_TOT: string (nullable = true)
 |-- DIAG_CLASS: string (nullable = true)

