In [1]:
#Instalando fuções de tratamento de dados
from pyspark.sql import functions as F
from pyspark.sql.window import Window


StatementMeta(, f003dfb1-93ed-469b-a40d-10a8c99a6a1d, 3, Finished, Available, Finished)

In [3]:
#Lendo as tabelas que estão no Lake_SUS
amb = spark.read.table("Ambulatorial")
interna = spark.read.table("Internacoes")
mort = spark.read.table("Mortalidade")

StatementMeta(, f003dfb1-93ed-469b-a40d-10a8c99a6a1d, 5, Finished, Available, Finished)

In [None]:
#Dimensão Tempo
#Junta todas as datas relevantes das 3 bases em uma unica coluna, garante que todas vão estar como date e separa por ano, mes, trimestre e decada. 
df_tempo = (
    amb.select(F.col("DATA_CMP").alias("data_ref"))
    .unionByName(interna.select(F.col("DT_INTER").alias("data_ref")))
    .unionByName(mort.select(F.col("DATOBITO").alias("data_ref")))
    .filter(F.col("data_ref").isNotNull())
    .withColumn("data_ref", F.to_date("data_ref"))
    .withColumn("ano", F.year("data_ref"))
    .withColumn("mes", F.month("data_ref"))
    .withColumn("dia", F.dayofmonth("data_ref"))
    .withColumn("trimestre", F.quarter("data_ref"))
    .withColumn("decada", (F.year("data_ref")/10).cast("int")*10)
    .withColumn("date_key", F.date_format("data_ref","yyyyMMdd").cast("int"))
    .select("date_key", "data_ref", "ano", "mes", "dia","trimestre", "decada")
    .distinct()
)

# Cria surrogate key idD_TEMPO
w = Window.orderBy("date_key")
df_tempo = df_tempo.withColumn("idD_TEMPO", F.row_number().over(w))

# Salva tabela no Lakehouse
df_tempo.write.mode("overwrite").saveAsTable("D_TEMPO")


StatementMeta(, cdc36745-8b06-42d1-88a5-217540466322, 9, Finished, Available, Finished)

In [None]:
# Ler tabela auxiliar de municípios
munic = spark.read.table("aux_municipios_mg")

# Garantir nomes das colunas padronizados
munic = munic.select(
    F.col("COD_MUNIC").cast("bigint"),
    F.col("DESC_MUNIC").alias("nome_municipio"),
)

StatementMeta(, dff48d7b-4a37-45ba-bd52-bf6ae35a239d, 14, Finished, Available, Finished)

In [None]:
#Dimensão Região
# Juntando os codigos de municipio das 3 bases, retirando nulos e padronizando como bigint
df_regiao_base = (
    amb.select(F.col("PA_UFMUN").alias("codigo_municipio"))
    .unionByName(interna.select(F.col("MUNIC_MOV").alias("codigo_municipio")))
    .unionByName(mort.select(F.col("CODMUNOCOR").alias("codigo_municipio")))
    .filter(F.col("codigo_municipio").isNotNull())
    .withColumn("codigo_municipio", F.col("codigo_municipio").cast("bigint"))
    .distinct()
)

# Junção com tabela auxiliar para trazer a descrição dos municipios
df_regiao = (
    df_regiao_base.join(
        munic,
        df_regiao_base.codigo_municipio == munic.COD_MUNIC,
        how="left"
    )
    .select(df_regiao_base.codigo_municipio, munic.nome_municipio)
)

#Tratando os municipios que estão fora de MG
df_regiao = (
    df_regiao
    .withColumn(
        "nome_municipio",
        F.when(F.col("nome_municipio").isNull(), F.lit("MUNICÍPIO FORA DE MG"))
         .otherwise(F.col("nome_municipio"))
    )
)

# Cria surrogate key idD_REGIAO
w = Window.orderBy("codigo_municipio")
df_regiao = df_regiao.withColumn("idD_REGIAO", F.row_number().over(w))

#Grava no lakehouse
df_regiao.write.mode("overwrite").saveAsTable("D_REGIAO")

StatementMeta(, dff48d7b-4a37-45ba-bd52-bf6ae35a239d, 30, Finished, Available, Finished)

In [None]:
# Ler tabela auxiliar de cids
cid10 = spark.read.table("aux_desc_cid10")

# Garantir nomes das colunas padronizados
cid10 = cid10.select(
    F.col("CD_CID").cast("string"),
    F.col("DS_CID").alias("desc_cid"),
)

StatementMeta(, dff48d7b-4a37-45ba-bd52-bf6ae35a239d, 32, Finished, Available, Finished)

In [None]:
#Dimensão CID
# Juntando todos os codigos CID10 das 3 bases
df_cid_base = (
    amb.select(F.col("PA_CIDPRI").alias("cid10"))
    .union(interna.select(F.col("CID10").alias("cid10")))
    .union(mort.select(F.col("CAUSABAS").alias("cid10")))
    .filter(F.col("cid10").isNotNull())
    .distinct()
)

# Junção com tabela auxiliar para trazer a descrição do CID10
df_cid = (
    df_cid_base.join(
        cid10,
        df_cid_base.cid10 == cid10.CD_CID,
        how="left"
    )
    .select(df_cid_base.cid10, cid10.desc_cid)
)

# Cria surrogate key idD_CID
w = Window.orderBy("cid10")
df_cid = df_cid.withColumn("idD_CID", F.row_number().over(w))

#Gravando na lakehouse
df_cid.write.mode("overwrite").saveAsTable("D_CID")

StatementMeta(, dff48d7b-4a37-45ba-bd52-bf6ae35a239d, 36, Finished, Available, Finished)

In [None]:
# Ler tabela auxiliar de procedimentos
proced = spark.read.table("aux_procedimentos")

# Garantir nomes das colunas padronizados
proced = proced.select(
    F.col("CD_PROCEDIMENTO").cast("bigint"),
    F.col("DS_PROCEDIMENTO").alias("Nome_Procedimento"),
)

StatementMeta(, f1d139dd-0814-4e6b-a889-dcd96d2b063f, 12, Finished, Available, Finished)

In [None]:
#Dimensão Procedimento Ambulatorial
# Lendo a base ambulatorial para trazer o codigo do procedimento
df_proc_base = (
    amb.select(F.col("PA_PROC_ID").alias("cod_Procedimento"))
    .filter(F.col("cod_Procedimento").isNotNull())
    .distinct()
)

# Junção com tabela auxiliar para trazer a descrição dos procedimentos
df_proc = (
    df_proc_base.join(
        proced,
        df_proc_base.cod_Procedimento == proced.CD_PROCEDIMENTO,
        how="left"
    )
    .select(df_proc_base.cod_Procedimento, proced.Nome_Procedimento)
)

# Cria surrogate key idD_Procedimento
w = Window.orderBy("cod_Procedimento")
df_proc = df_proc.withColumn("idD_Procedimento", F.row_number().over(w))

#Grava no lakehouse
df_proc.write.mode("overwrite").saveAsTable("D_PROCEDIMENTO_AMB")

StatementMeta(, f1d139dd-0814-4e6b-a889-dcd96d2b063f, 18, Finished, Available, Finished)

In [None]:
# Dimensão Paciente
# Essa dimensão foi construída a partir das bases ambulatorial, internação e mortalidade.
# Como não existe uma chave natural de paciente, usamos atributos demográficos (sexo, idade, raça/cor) para formar o perfil.

#Junção das bases
df_paciente = (
    amb.select(
        F.col("PA_SEXO").alias("sexo"),
        F.col("PA_IDADE").alias("idade"),
        F.col("PA_RACACOR").alias("raca_cor")
    )
    .unionByName(
        interna.select(
            F.col("SEXO").alias("sexo"),
            (F.floor(F.datediff(F.current_date(), F.col("NASC")) / 365.25)).alias("idade"),
            F.col("RACA_COR").alias("raca_cor")
        )
    )
    .unionByName(
        mort.select(
            F.col("SEXO").alias("sexo"),
            (F.floor(F.datediff(F.col("DATOBITO"), F.col("DATNASC")) / 365.25)).alias("idade"),
            F.col("RACACOR").alias("raca_cor")
        )
    )
    .filter(F.col("sexo").isNotNull())
    .withColumn("idade", F.when(F.col("idade") < 0, None).otherwise(F.col("idade")))  # corrige idades inválidas
)

# Padronização dos campos que passaram pelo gen2 e ainda sim ficaram fora do padrão

# Normaliza e padroniza os valores de sexo
df_paciente = df_paciente.withColumn("sexo", F.upper(F.trim(F.col("sexo"))))
df_paciente = df_paciente.withColumn(
    "sexo",
    F.when(F.col("sexo").isin("1", "M", "MASCULINO"), "M")
     .when(F.col("sexo").isin("2", "F", "FEMININO"), "F")
     .otherwise("SEM INFO")
)

# Normaliza e padroniza os valores de raça/cor
df_paciente = df_paciente.withColumn("raca_cor", F.upper(F.trim(F.col("raca_cor"))))
df_paciente = df_paciente.withColumn(
    "raca_cor",
    F.when(F.col("raca_cor").isin("1", "BRANCA", "BRANCO"), "BRANCA")
     .when(F.col("raca_cor").isin("2", "PRETA", "NEGRA"), "PRETA")
     .when(F.col("raca_cor").isin("3", "AMARELA"), "AMARELA")
     .when(F.col("raca_cor").isin("4", "INDIGENA"), "INDIGENA")
     .when(F.col("raca_cor").isin("5", "PARDA", "MORENA"), "PARDA")
     .when(F.col("raca_cor").isin("SEM INFO", "NAO INFORMADO", "NAO INFO", "SEM INFORMACAO", "SEM DADOS", "NAO CONSTA"), "SEM INFO")
     .otherwise("SEM INFO")
)

# Cria faixa etaria para analise no ML

df_paciente = df_paciente.withColumn(
    "faixa_etaria",
    F.when(F.col("idade") < 1, "Menor de 1 ano")
     .when(F.col("idade") < 10, "01–09")
     .when(F.col("idade") < 20, "10–19")
     .when(F.col("idade") < 30, "20–29")
     .when(F.col("idade") < 40, "30–39")
     .when(F.col("idade") < 50, "40–49")
     .when(F.col("idade") < 60, "50–59")
     .when(F.col("idade") < 70, "60–69")
     .when(F.col("idade") < 80, "70–79")
     .otherwise("80+")
)


# Em vez de apenas .distinct(), agrupa por atributos demográficos para eliminar repetições
# Por exemplo “Mulheres da raça/cor Parda com idade entre 30 e 39 anos"
# Cada linha será um tipo de paciente e não um paciente individual
df_paciente = (
    df_paciente
    .groupBy("sexo", "raca_cor", "faixa_etaria")
    .agg(F.round(F.avg("idade"), 0).alias("idade_media"))

)

# Criação da chave surrogate

df_paciente = df_paciente.withColumn("idD_PACIENTE", F.monotonically_increasing_id())

# Reordena as colunas
df_paciente = df_paciente.select("idD_PACIENTE", "sexo", "faixa_etaria", "raca_cor", "idade_media")

# Grava no lakehouse

df_paciente.write.mode("overwrite").saveAsTable("D_PACIENTE")

StatementMeta(, f6bea7c7-8e1f-4f39-a63c-6c076dbaaf3f, 8, Finished, Available, Finished)

In [None]:
# Fato Mortalidade
# Junta Mortalidade com dimensões D_CID, D_TEMPO, D_REGIAO e D_PACIENTE

#Ler tabelas dimensão
df_cid = spark.read.table("d_cid")
df_regiao = spark.read.table("d_regiao")
df_tempo = spark.read.table("d_tempo")
df_paciente = spark.read.table("d_paciente")

#Junção da fato com as dimensões
df_f_mort = (
    mort.alias("m")
    .join(df_cid.alias("cid"), F.col("m.CAUSABAS") == F.col("cid.cid10"), "left")
    .join(df_regiao.alias("reg"), F.col("m.CODMUNOCOR").cast("bigint") == F.col("reg.codigo_municipio"), "left")
    .join(df_tempo.alias("tmp"), F.to_date("m.DATOBITO") == F.col("tmp.data_ref"), "left")
    .join(
        df_paciente.alias("pac"),
        (F.upper(F.col("m.SEXO")) == F.col("pac.sexo")) &
        (F.col("m.RACACOR") == F.col("pac.raca_cor")),
        "left"
    )
    .select(
        F.col("cid.idD_CID").alias("D_CID_idD_CID"),
        F.col("reg.idD_REGIAO").alias("D_REGIAO_idD_REGIAO"),
        F.col("tmp.idD_TEMPO").alias("D_TEMPO_idD_TEMPO"),
        F.col("pac.idD_PACIENTE").alias("D_PACIENTE_idD_PACIENTE"),
        F.lit(1).alias("qtde_Obitos")
    )
)

# Cria surrogate key
w = Window.orderBy("D_TEMPO_idD_TEMPO", "D_REGIAO_idD_REGIAO")
df_f_mort = df_f_mort.withColumn("idF_MORTALIDADE", F.row_number().over(w))

# Grava no lakehouse
df_f_mort.write.mode("overwrite").saveAsTable("F_MORTALIDADE")


StatementMeta(, 3f14f2de-c07c-4d33-bf88-183949d402ab, 7, Finished, Available, Finished)

In [6]:
# Fato Internações
# Junta Internações com dimensões D_CID, D_TEMPO, D_REGIAO e D_PACIENTE

df_cid = spark.read.table("d_cid")
df_regiao = spark.read.table("d_regiao")
df_tempo = spark.read.table("d_tempo")
df_paciente = spark.read.table("d_paciente")

df_f_int = (
    interna.alias("i")
    .join(df_cid.alias("cid"), F.col("i.CID10") == F.col("cid.cid10"), "left")
    .join(df_regiao.alias("reg"), F.col("i.MUNIC_MOV").cast("bigint") == F.col("reg.codigo_municipio"), "left")
    .join(df_tempo.alias("tmp"), F.to_date("i.DT_INTER") == F.col("tmp.data_ref"), "left")
    .join(
        df_paciente.alias("pac"),
        (F.upper(F.col("i.SEXO")) == F.col("pac.sexo")) &
        (F.col("i.RACA_COR") == F.col("pac.raca_cor")),
        "left"
    )
    .select(
        F.col("cid.idD_CID").alias("D_CID_idD_CID"),
        F.col("reg.idD_REGIAO").alias("D_REGIAO_idD_REGIAO"),
        F.col("tmp.idD_TEMPO").alias("D_TEMPO_idD_TEMPO"),
        F.col("pac.idD_PACIENTE").alias("D_PACIENTE_idD_PACIENTE"),
        F.lit(1).alias("qtde_Internacoes"),
        F.col("i.VAL_TOT").alias("valor_Internacoes")
    )
)

# Cria surrogate key
w = Window.orderBy("D_TEMPO_idD_TEMPO", "D_REGIAO_idD_REGIAO")
df_f_int = df_f_int.withColumn("idF_INTERNACOES", F.row_number().over(w))

# Grava no lakehouse
df_f_int.write.mode("overwrite").saveAsTable("F_INTERNACOES")


StatementMeta(, f003dfb1-93ed-469b-a40d-10a8c99a6a1d, 8, Finished, Available, Finished)

In [11]:
# Fato Ambulatorial
# Junta com dimensões D_CID, D_TEMPO, D_REGIAO, D_PACIENTES E D_PROCEDIMENTOS

df_cid = spark.read.table("d_cid")
df_regiao = spark.read.table("d_regiao")
df_tempo = spark.read.table("d_tempo")
df_paciente = spark.read.table("d_paciente")
df_proc = spark.read.table("d_procedimento_amb")

df_f_amb = (
    amb.alias("a")
    .join(df_proc.alias("proc"), F.col("a.PA_PROC_ID").cast("bigint") == F.col("proc.cod_Procedimento"), "left")
    .join(df_cid.alias("cid"), F.col("a.PA_CIDPRI") == F.col("cid.cid10"), "left")
    .join(df_regiao.alias("reg"), F.col("a.PA_UFMUN").cast("bigint") == F.col("reg.codigo_municipio"), "left")
    .join(df_tempo.alias("tmp"), F.col("a.DATA_CMP") == F.col("tmp.data_ref"), "left")
    .join(
        df_paciente.alias("pac"),
        (F.upper(F.col("a.PA_SEXO")) == F.col("pac.sexo")) &
        (F.col("a.PA_RACACOR") == F.col("pac.raca_cor")),
        "left"
    )
    .select(
        F.col("tmp.idD_TEMPO").alias("D_TEMPO_idD_TEMPO"),
        F.col("pac.idD_PACIENTE").alias("D_PACIENTE_idD_PACIENTE"),
        F.col("cid.idD_CID").alias("D_CID_idD_CID"),
        F.col("proc.idD_Procedimento").alias("D_PROCEDIMENTO_AMB_idD_PROCEDIMENTO_AMB"),
        F.col("reg.idD_REGIAO").alias("D_REGIAO_idD_REGIAO"),
        F.lit(1).alias("qted_Atendimentos"),
        F.col("a.PA_QTDAPR").alias("qted_Producao"),
        F.col("a.PA_VALAPR").alias("valor_Procedimento")
    )
)

# Cria surrogate key
w = Window.orderBy("D_TEMPO_idD_TEMPO", "D_REGIAO_idD_REGIAO")
df_f_amb = df_f_amb.withColumn("idF_AMBULATORIAL", F.row_number().over(w))

# Grava no lakehouse
df_f_amb.write.mode("overwrite").saveAsTable("F_AMBULATORIAL")


StatementMeta(, f003dfb1-93ed-469b-a40d-10a8c99a6a1d, 13, Finished, Available, Finished)