
#OBJETIVO

O objetivo principal desse notebook é transformar os dados brutos do datasus.

_The primary objective of this notebook is to transform the DATASUS SIM raw data._


#Conectando ao ADLS Gen2

O Azure Key Vault foi utilizado como repositório para o token SAS do container ADLS datasus-data. Para acessar o container, foi necessário configurar os Databricks Secret Scopes.

_Azure Key Vault stores the SAS token for the datasus-data ADLS container. Databricks secret scopes are used to retrieve the SAS token and access the raw data._


In [0]:
# Connection settings for the ADLS Gen2 account holding the DATASUS files.
storage_account = 'coutjdatasusstorage'
container_name = 'datasus-data'

# Every ABFS setting below is scoped to the same account endpoint.
_adls_endpoint = f"{storage_account}.dfs.core.windows.net"

# Authenticate with a fixed SAS token read from the "adls_secrete" secret scope.
spark.conf.set(f"fs.azure.account.auth.type.{_adls_endpoint}", "SAS")
spark.conf.set(
    f"fs.azure.sas.token.provider.type.{_adls_endpoint}",
    "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider",
)
spark.conf.set(
    f"fs.azure.sas.fixed.token.{_adls_endpoint}",
    dbutils.secrets.get(scope="adls_secrete", key="databricks-sas"),
)

# Root of the raw layer; later cells append file names and the Utils folder.
conn_string = f"abfss://{container_name}@{_adls_endpoint}/raw"


In [0]:
#Importing libs

import pyspark.sql.functions as f
from datetime import datetime
from pyspark.sql.types import TimestampType
from dateutil.relativedelta import relativedelta
from pathlib import Path
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, TimestampType, DateType, IntegerType

In [0]:
# Read one raw DATASUS SIM parquet directory.
# The directory name comes from the "dir_name" widget; when the widget is
# left blank, 'DORJ2020.parquet' is used.
dbutils.widgets.text("dir_name", "", "Enter dir_name")
_widget_dir = dbutils.widgets.get('dir_name')
dir_name = _widget_dir if _widget_dir else 'DORJ2020.parquet'
print(dir_name)

_raw_path = f"{conn_string}/{dir_name}"
df_datasus = spark.read.parquet(_raw_path)

DOSP2015.parquet


####TIPOBITO

Tipo do óbito

_Type of death_

TIPOBITO:
  * 1: óbito fetal
  * 2: óbito não fetal

In [0]:
# TIPOBITO: code "1" is labelled FETAL; every other value (including null,
# since a null condition falls through to otherwise) becomes NAO FETAL.
_is_fetal = df_datasus.TIPOBITO == "1"
df_datasus = df_datasus.withColumn(
    'TIPOBITO',
    f.when(_is_fetal, 'FETAL').otherwise('NAO FETAL'),
)



#####DTHORAOBITO

Data e hora do óbito, obtido a partir das colunas DTOBITO e HORAOBITO

_Date and time of death, obtained from the columns DTOBITO and HORAOBITO._


In [0]:
@f.udf(TimestampType())
def transform_data_hora(data, hora):
    """Combine the DATASUS date and time strings into one timestamp.

    Args:
        data: date of death, expected as ``ddMMyyyy`` (8 chars once stripped),
            or None.
        hora: time of death, expected as ``HHMM`` (4 chars once stripped),
            or None.

    Returns:
        A ``datetime`` with date and time when both parse; a date-only
        ``datetime`` (midnight) when the time is missing or malformed;
        ``None`` when the date is missing or malformed.
    """
    # Guard nulls before calling .strip(): an AttributeError here would fail
    # the whole Spark task instead of producing a null timestamp.
    if data is None:
        return None
    data = data.strip()
    hora = hora.strip() if hora is not None else ''

    if len(data) != 8:
        return None

    if len(hora) == 4:
        try:
            return datetime.strptime(f"{data} {hora}", '%d%m%Y %H%M')
        except ValueError:
            pass  # malformed time: keep the date rather than dropping both

    try:
        return datetime.strptime(data, '%d%m%Y')
    except ValueError:
        return None


df_datasus = df_datasus.withColumn('DTHORAOBITO', transform_data_hora(df_datasus.DTOBITO, df_datasus.HORAOBITO))
# The source columns are replaced by the combined DTHORAOBITO timestamp.
df_datasus = df_datasus.drop('DTOBITO', 'HORAOBITO')

#####UFNATU e NOMEMUNNATU

SIGLA da UF e município de naturalidade do falecido.

_Abbreviation of the state (UF) and municipality of the deceased's place of birth._

In [0]:
# Municipality lookup table read from the raw Utils folder.
# The municipio code is cut to its first 6 characters, presumably to match the
# 6-digit codes used in the SIM columns (CODMUN*) — confirm against the CSV.
df_uf_municipio = spark.read.csv(f"{conn_string}/Utils/municipios.csv", header=True)
df_uf_municipio.createOrReplaceTempView('uf_codes')

_uf_codes_query = '''
    SELECT DISTINCT uf, uf_code, substring(municipio, 1, 6) AS municipio, name
    FROM uf_codes
    ORDER BY uf_code, municipio
'''
df_uf_codes = spark.sql(_uf_codes_query)

In [0]:
# Attach the UF code and municipality name of the deceased's birthplace,
# matched on CODMUNNATU; blank municipio codes in the lookup are skipped.
df_datasus.createOrReplaceTempView('susdata')
df_uf_codes.createOrReplaceTempView('uf_codes')

_naturalidade_query = """
    WITH municipio_uf AS (
        SELECT municipio, name, uf_code, uf FROM uf_codes
        WHERE trim(municipio) <> ''
    )
    SELECT susdata.*,
           municipio_uf.uf_code AS UFNATU,
           municipio_uf.name AS NOMEMUNNATU
    FROM susdata
    LEFT JOIN municipio_uf
      ON susdata.CODMUNNATU = CAST(municipio_uf.municipio AS NUMERIC)
"""
df_datasus = spark.sql(_naturalidade_query)


#####DTNASC e IDADE

Data de nascimento e idade do falecido.

_Date of birth and age of the deceased._

In [0]:
# DTNASC is a ddMMyyyy string in the raw data; parse it into a DATE.
df_datasus = df_datasus.withColumn('DTNASC', f.to_date(df_datasus.DTNASC, 'ddMMyyyy'))

# IDADE = completed years between birth and death. floor() is used instead of
# round(..., 1): the schema cell later casts IDADE to IntegerType, and rounding
# first turned e.g. 29.96 years into 30.0 -> 30, overstating the age by one
# year for deaths within ~0.05 years of the next birthday.
df_datasus = df_datasus.withColumn(
    'IDADE',
    f.floor(f.months_between(df_datasus.DTHORAOBITO, df_datasus.DTNASC) / 12),
)


#####SEXO

Sexo do falecido.

_Deceased's gender._

SEXO: 
  * 0: Ignorado
  * 1: Masculino
  * 2: Feminino

In [0]:
# SEXO: 1 -> "M", 2 -> "F", anything else (0, null, unexpected) -> "IGNORADO".
df_datasus.createOrReplaceTempView('susdata')

_sexo = f.col('SEXO')
df_datasus = df_datasus.withColumn(
    'SEXO',
    f.when(_sexo == 1, "M").when(_sexo == 2, 'F').otherwise('IGNORADO'),
)


#####RACACOR

Raça/Cor

_Race/skin color_

RACACOR:
  * 1: Branca
  * 2: Preta
  * 3: Amarela
  * 4: Parda
  * 5: Indígena
  * -: Não Declarado

In [0]:
# RACACOR: codes 1-5 map to their labels; anything else -> 'NAO DECLARADO'.
_racacor = f.col('RACACOR')
df_datasus = df_datasus.withColumn(
    'RACACOR',
    f.when(_racacor == 1, 'BRANCA')
     .when(_racacor == 2, 'PRETA')
     .when(_racacor == 3, 'AMARELA')
     .when(_racacor == 4, 'PARDA')
     .when(_racacor == 5, 'INDIGENA')
     .otherwise('NAO DECLARADO'),
)

#####ESTCIV

Estado civil.

_Marital status_.

ESTCIV:
  * 1: Solteiro
  * 2: Casado
  * 3: Viúvo
  * 4: Separado judicialmente
  * 5: União consensual (versões anteriores)
  * 9: Ignorado

In [0]:
# ESTCIV: codes 1-5 map to marital-status labels; anything else -> 'IGNORADO'.
_estciv = df_datasus.ESTCIV
df_datasus = df_datasus.withColumn(
    'ESTCIV',
    f.when(_estciv == 1, 'SOLTEIRO')
     .when(_estciv == 2, 'CASADO')
     .when(_estciv == 3, 'VIUVO')
     .when(_estciv == 4, 'SEPARADO')
     .when(_estciv == 5, 'UNIAO ESTAVEL')
     .otherwise('IGNORADO'),
)


##### ESC2010

Escolaridade.

_Educational level_.

ESC2010:
  * 0 – Sem escolaridade
  * 1 – Fundamental I (1ª a 4ª série)
  * 2 – Fundamental II (5ª a 8ª série)
  * 3 – Médio (antigo 2º grau)
  * 4 – Superior incompleto
  * 5 – Superior completo
  * 9 – Ignorado

In [0]:
# ESC2010: codes 0-5 map to education labels; anything else -> 'IGNORADO'.
_esc = df_datasus.ESC2010
df_datasus = df_datasus.withColumn(
    'ESC2010',
    f.when(_esc == 0, 'SEM ESCOLARIDADE')
     .when(_esc == 1, 'FUNDAMENTAL I')
     .when(_esc == 2, 'FUNDAMENTAL II')
     .when(_esc == 3, 'MEDIO')
     .when(_esc == 4, 'SUPERIOR INCOMPLETO')
     .when(_esc == 5, 'SUPERIOR COMPLETO')
     .otherwise('IGNORADO'),
)


#####OCUP

Ocupação

_Occupation_

In [0]:
# CBO 2002 occupation lookup: semicolon-separated, ISO-8859-1 encoded, with header.
_ocupacoes_options = {'header': True, 'encoding': 'ISO-8859-1', 'sep': ';'}
df_ocupacoes = spark.read.csv(
    f"{conn_string}/Utils/CBO2002 - Ocupacao.csv",
    **_ocupacoes_options,
)

In [0]:
# Attach the occupation title (OCUPTITULO) for each death, matched on OCUP.
df_ocupacoes.createOrReplaceTempView('ocupacoes')
df_datasus.createOrReplaceTempView('susdata')

# The match condition must be in ON. With WHERE, the LEFT JOIN has no join
# condition (a cross join) and the WHERE filter then discards every death
# whose OCUP is null or absent from the CBO table - i.e. an INNER JOIN that
# silently loses records. With ON, unmatched deaths are kept with a null title.
df_datasus = spark.sql(
    """
        SELECT susdata.*, ocupacoes.TITULO AS OCUPTITULO FROM susdata
        LEFT JOIN ocupacoes
        ON susdata.OCUP = ocupacoes.CODIGO
    """
)


#####NOMEMUNRES e UFRES

Nome do município e UF de residência.

_City name and state (UF) of residence._

In [0]:
# Attach the UF code and municipality name of residence, matched on CODMUNRES.
df_datasus.createOrReplaceTempView('susdata')
df_uf_codes.createOrReplaceTempView('uf_codes')

_residencia_query = """
    SELECT susdata.*,
           uf_codes.uf_code AS UFRES,
           uf_codes.name AS NOMEMUNRES
    FROM susdata
    LEFT JOIN uf_codes
      ON CAST(uf_codes.municipio AS NUMERIC) = susdata.CODMUNRES
"""
df_datasus = spark.sql(_residencia_query)


#####LOCOCOR

Local de ocorrência do óbito.

_Place of death occurrence._

Lococor:
  * 1: Hospital
  * 2: Outros estabelecimentos de saúde
  * 3: Domicilio
  * 4: Via Publica
  * 5: Outros
  * 6: Aldeia Indigena
  * -: Ignorado

In [0]:
# LOCOCOR: codes 1-6 map to place-of-death labels; anything else -> 'IGNORADO'.
# NOTE(review): 'ESTABELICIMENTOS' is misspelled, but it is an emitted data
# value; correcting it changes what lands in the trusted layer and the DB.
_lococor = df_datasus.LOCOCOR
df_datasus = df_datasus.withColumn(
    'LOCOCOR',
    f.when(_lococor == 1, 'HOSPITAL')
     .when(_lococor == 2, 'OUTROS ESTABELICIMENTOS DE SAUDE')
     .when(_lococor == 3, 'DOMICILIO')
     .when(_lococor == 4, 'VIA PUBLICA')
     .when(_lococor == 5, 'OUTROS')
     .when(_lococor == 6, 'ALDEIA INDIGENA')
     .otherwise('IGNORADO'),
)

#####CODMUNOCOR

Código do município de ocorrência.

_City of occurrence code._


In [0]:
# Attach the UF code and municipality name where the death occurred, matched on CODMUNOCOR.
df_datasus.createOrReplaceTempView('susdata')
df_uf_codes.createOrReplaceTempView('uf_codes')

_ocorrencia_query = """
    SELECT susdata.*,
           uf_codes.uf_code AS UFMUNOCOR,
           uf_codes.name AS NOMEMUNOCOR
    FROM susdata
    LEFT JOIN uf_codes
      ON CAST(uf_codes.municipio AS NUMERIC) = susdata.CODMUNOCOR
"""
df_datasus = spark.sql(_ocorrencia_query)


#####ASSISTMED

Indica se houve assistência médica.

_Indicates whether medical assistance was provided._

ASSISTMED:
  * 9: Ignorado
  * 1: Com assistência
  * 2: Sem assistência

In [0]:
# ASSISTMED: '1' -> 'SIM', '2' -> 'NAO', anything else -> 'IGNORADO'.
_assistmed = df_datasus.ASSISTMED
df_datasus = df_datasus.withColumn(
    'ASSISTMED',
    f.when(_assistmed == '1', 'SIM')
     .when(_assistmed == '2', 'NAO')
     .otherwise('IGNORADO'),
)


#####NECROPSIA

Indica se houve necrópsia.

_Indicates whether there was a necropsy_

NECROPSIA:
  * 9: Ignorado
  * 1: Sim
  * 2: Não

In [0]:
# NECROPSIA: 1 -> 'SIM', 2 -> 'NAO', anything else -> 'IGNORADO'.
_necropsia = df_datasus.NECROPSIA
df_datasus = df_datasus.withColumn(
    'NECROPSIA',
    f.when(_necropsia == 1, 'SIM')
     .when(_necropsia == 2, 'NAO')
     .otherwise('IGNORADO'),
)

#####CAUSABAS

Causa básica, conforme a Classificação Internacional de Doença (CID), 10a. Revisão

_Basic cause, according to the International Classification of Disease (ICD), 10a. Revision_

In [0]:
# CID-10 lookup tables (semicolon-separated, ISO-8859-1), each trimmed to its
# code column plus DESCRICAO.
_cid10_options = {'header': True, 'encoding': 'ISO-8859-1', 'sep': ';'}
_cid10_dir = f"{conn_string}/Utils"

df_cid10_sub = (
    spark.read.csv(f"{_cid10_dir}/CID-10-SUBCATEGORIAS.CSV", **_cid10_options)
    .select('SUBCAT', 'DESCRICAO')
)
df_cid10_cat = (
    spark.read.csv(f"{_cid10_dir}/CID-10-CATEGORIAS.CSV", **_cid10_options)
    .select('CAT', 'DESCRICAO')
)

In [0]:
# Attach the CID-10 subcategory description for the basic cause, matched on the full CAUSABAS code.
df_cid10_sub.createOrReplaceTempView('cid10_sub')
df_datasus.createOrReplaceTempView('susdata')

_causabas_subcat_query = """
    SELECT susdata.*, cid10_sub.DESCRICAO AS CAUSABASDESCSUBCAT
    FROM susdata
    LEFT JOIN cid10_sub ON susdata.CAUSABAS = cid10_sub.SUBCAT
"""
df_datasus = spark.sql(_causabas_subcat_query)


In [0]:
# Attach the CID-10 category description, matched on the first three
# characters of CAUSABAS.
df_cid10_cat.createOrReplaceTempView('cid10_cat')
df_datasus.createOrReplaceTempView('susdata')

_causabas_cat_query = """
    SELECT susdata.*, cid10_cat.DESCRICAO AS CAUSABASDESCCAT
    FROM susdata
    LEFT JOIN cid10_cat ON SUBSTRING(susdata.CAUSABAS, 1, 3) = cid10_cat.CAT
"""
df_datasus = spark.sql(_causabas_cat_query)



#####CIRCOBITO

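Indica a circunstância do óbito por causa externa (acidente, suicídio, homicídio ou outros).

_Specifies the circumstance of an external-cause death (accident, suicide, homicide, or other)._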

In [0]:
# CIRCOBITO: codes 1-4 map to circumstance labels; anything else -> 'IGNORADO'.
_circobito = df_datasus.CIRCOBITO
df_datasus = df_datasus.withColumn(
    'CIRCOBITO',
    f.when(_circobito == 1, 'ACIDENTE')
     .when(_circobito == 2, 'SUICIDIO')
     .when(_circobito == 3, 'HOMICIDIO')
     .when(_circobito == 4, 'OUTROS')
     .otherwise('IGNORADO'),
)


#####ACIDTRAB

Indica se foi acidente de trabalho.

_Specifies whether it was a work-related accident._

ACIDTRAB:
  * 9: Ignorado
  * 1: Sim
  * 2: Não

In [0]:
# ACIDTRAB: 1 -> 'SIM', 2 -> 'NAO', anything else -> 'IGNORADO'.
_acidtrab = df_datasus.ACIDTRAB
df_datasus = df_datasus.withColumn(
    'ACIDTRAB',
    f.when(_acidtrab == 1, 'SIM')
     .when(_acidtrab == 2, 'NAO')
     .otherwise('IGNORADO'),
)

#####TPPOS

Óbito investigado

_Whether the death was investigated_

TPPOS:
  * 1: Sim
  * 2: Não

In [0]:
# TPPOS (death investigated). The field notes for this column list the codes
# 1 (sim) / 2 (não), but 'S'/'N' values are handled here as well; accept both
# codings so neither falls through to 'IGNORADO'. Anything else -> 'IGNORADO'.
_tppos = df_datasus.TPPOS
df_datasus = df_datasus.withColumn(
    'TPPOS',
    f.when(_tppos.isin('S', '1'), 'SIM')
     .when(_tppos.isin('N', '2'), 'NAO')
     .otherwise('IGNORADO'),
)

In [0]:
# Tag each row with the UF and year encoded in the file name,
# e.g. 'DORJ2020.parquet' -> UF 'RJ', year '2020'.
uf_file, year_file = dir_name[2:4], dir_name[4:8]

df_datasus = (
    df_datasus
    .withColumn('UFARQUIVO', f.lit(uf_file))
    .withColumn('ANOARQUIVO', f.lit(year_file))
)

In [0]:
# Keep only the output columns, in their final order.
ordered_columns = [
    'UFARQUIVO', 'ANOARQUIVO', 'TIPOBITO', 'DTHORAOBITO',
    'NATURAL', 'UFNATU', 'CODMUNNATU', 'NOMEMUNNATU',
    'DTNASC', 'IDADE', 'SEXO', 'RACACOR', 'ESTCIV', 'ESC2010',
    'OCUP', 'OCUPTITULO',
    'UFRES', 'CODMUNRES', 'NOMEMUNRES',
    'LOCOCOR', 'UFMUNOCOR', 'CODMUNOCOR', 'NOMEMUNOCOR',
    'ASSISTMED', 'NECROPSIA',
    'CAUSABAS', 'CAUSABASDESCCAT', 'CAUSABASDESCSUBCAT',
    'CIRCOBITO', 'ACIDTRAB', 'TPPOS', 'CONTADOR',
]

df_datasus = df_datasus.select(*ordered_columns)


In [0]:
# Row ID: SHA-256 of every current column joined with '_'.
# NOTE(review): concat_ws skips nulls, so rows that differ only in which column
# holds a null (e.g. ('a', null, 'b') vs ('a', 'b', null)) get the same ID.
# Coalescing each column to a sentinel would prevent that, but would also
# change the IDs of rows already loaded — confirm before altering.
_id_parts = [f.col(name) for name in df_datasus.columns]
df_datasus = df_datasus.withColumn('ID', f.sha2(f.concat_ws('_', *_id_parts), 256))

In [0]:
#Set Schema
# Target column types for the trusted layer. The select at the bottom of this
# cell uses only each field's name (upper-cased to match the DataFrame's
# columns) and its dataType; the nullable flags are not applied by that select.
# NOTE(review): codmunnatu is IntegerType while codmunres/codmunocor are
# StringType — confirm the intended type for municipality codes.

new_schema = StructType([
    StructField("ufarquivo", StringType(), nullable=True),
    StructField("anoarquivo", IntegerType(), nullable=True),
    StructField("tipobito", StringType(), nullable=True),
    StructField("dthoraobito", TimestampType(), nullable=True),
    StructField("NATURAL", IntegerType(), nullable=True),
    StructField("ufnatu", StringType(), nullable=True),
    StructField("codmunnatu", IntegerType(), nullable=True),
    StructField("nomemunnatu", StringType(), nullable=True),
    StructField("dtnasc", DateType(), nullable=True),
    # IntegerType: any fractional part of IDADE is dropped by the cast.
    # NOTE(review): DecimalType is imported but unused — confirm IDADE is
    # meant to be whole years.
    StructField("idade", IntegerType(), nullable=True),
    StructField("sexo", StringType(), nullable=True),
    StructField("racacor", StringType(), nullable=True),
    StructField("estciv", StringType(), nullable=True),
    StructField("esc2010", StringType(), nullable=True),
    StructField("ocup", IntegerType(), nullable=True),
    StructField("ocuptitulo", StringType(), nullable=True),
    StructField("ufres", StringType(), nullable=True),
    StructField("codmunres", StringType(), nullable=True),
    StructField("nomemunres", StringType(), nullable=True),
    StructField("lococor", StringType(), nullable=True),
    StructField("ufmunocor", StringType(), nullable=True),
    StructField("codmunocor", StringType(), nullable=True),
    StructField("nomemunocor", StringType(), nullable=True),
    StructField("assistmed", StringType(), nullable=True),
    StructField("necropsia", StringType(), nullable=True),
    StructField("causabas", StringType(), nullable=True),
    StructField("causabasdesccat", StringType(), nullable=True),
    StructField("causabasdescsubcat", StringType(), nullable=True),
    StructField("circobito", StringType(), nullable=True),
    StructField("acidtrab", StringType(), nullable=True),
    StructField("tppos", StringType(), nullable=True),
    StructField("contador", IntegerType(), nullable=True),
    StructField("id", StringType())
])

# Cast each listed column to its target type, in schema order; any column not
# listed in new_schema is dropped by this select.
df_datasus = df_datasus.select(*[f.col(field.name.upper()).cast(field.dataType) for field in new_schema])

In [0]:
# Write the transformed data to the trusted layer, reusing the raw directory
# name; an existing output at that path is overwritten.
_account_root = f"abfss://{container_name}@{storage_account}.dfs.core.windows.net"
conn_write = "/".join([_account_root, "trusted"])
trusted_file = "/".join([conn_write, dir_name])

print(trusted_file)
df_datasus.write.mode('overwrite').parquet(trusted_file)


abfss://datasus-data@coutjdatasusstorage.dfs.core.windows.net/trusted/DOSP2015.parquet


In [0]:
# Append the transformed rows to the PostgreSQL table over JDBC.
# The password is read from the "pw_postgres" secret scope.
driver = "org.postgresql.Driver"

database_host = "datasus-db.postgres.database.azure.com"
database_port = "5432"
database_name = "postgres"
table = "datasus"
user = "coutj"
password = dbutils.secrets.get(scope="pw_postgres", key="postgres")

url = f"jdbc:postgresql://{database_host}:{database_port}/{database_name}"
properties = dict(user=user, password=password, driver=driver)

# NOTE(review): mode="append" inserts the rows again if this notebook is rerun
# for the same file; the ID column could be used to deduplicate if needed.
df_datasus.write.jdbc(url=url, table=table, mode="append", properties=properties)
