# Criação de data warehouse para dados públicos de atendimentos ambulatoriais do SUS

Esse Jupyter notebook é parte do trabalho de conclusão de curso do MBA em Inteligência Artificial e Big Data, oferecido pelo ICMC - USP, do aluno Danilo Gouvea Silva, da Turma 3.

Parte do capítulo de Avaliação Experimental da monografia, nesse primeiro notebook (`sus_dw_etl.ipynb`), está a execução do processo de ETL - "Extract, Transform and Load" - dos dados brutos de saúde provenientes dos bancos de dados públicos do DATASUS. Após o processo de ETL, o data warehouse estará organizado em 1 tabela de fatos e 12 tabelas de dimensões, todas armazendas em arquivos Apache Parquet.

No segundo notebook (`sus_dw_eda.ipynb`), todas as tabelas serão carregadas e estarão prontras para a análise exploratória através de consultas analíticas. 

Esse notebook foi criado e utilizado localmente. Para utilizá-lo, é necessário que estejam localmente instalados o Spark, o Java e o Python. Também é recomendado a criação de um ambiente virtual Python para a instalação de todos pacotes de Python necessários, que estão contidos no arquivo de requisitos `requirements.txt`, que disponibilizado junto a esse notebook.

É importante ressaltar que o objetivo desse notebook não é demonstrar, nem guiar a instalação e configuração do Spark numa máquina local.

## Criação da sessão Spark

Nessa seção, é criado uma sessão local de Spark, com apenas um <b>nó mestre</b> e sem <b>nós de trabalho</b> e <b>gerenciador de cluster</b>. As tarefas (<i>tasks</i>) serão executadas pelo <i>driver</i> localizado no nó mestre e utilizarão o máximo de núcleos lógicos de processamento disponíveis no processador local.

In [1]:
# Verifica as versões instaladas de Java, Python e Spark

!java -version
!python --version
!pyspark --version

java version "1.8.0_411"
Java(TM) SE Runtime Environment (build 1.8.0_411-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.411-b09, mixed mode)


Python 3.10.5


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/
                        
Using Scala version 2.12.18, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_381
Branch HEAD
Compiled by user heartsavior on 2024-02-15T11:24:58Z
Revision fd86f85e181fc2dc0f50a096855acf83a6cc5d9c
Url https://github.com/apache/spark
Type --help for more information.


In [2]:
# Configura corretamente as variáveis de ambiente do Spark

!pip install findspark
import findspark
findspark.init()



In [3]:
# Cria a sessão de Spark

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('sus_dw_etl') \
    .master('local[*]') \
    .getOrCreate()

spark

## Carregamento da tabela de fatos SIASUS - SERVIÇO DE INFORMAÇÕES AMBULATORIAIS DO SUS (`sia_df`)

In [4]:
# Importa as funções e tipos de dados do Spark

import pyspark.sql.functions as F
import pyspark.sql.types as T

In [20]:
# Carrega num dataframe os arquivos da tabela SIA - Atendimentos Ambulatoriais

sia_df = spark.read.option('mergeSchema', 'true').parquet('data/tb_sia_pa')

In [6]:
# Conta o número total de registros

sia_df_count = sia_df.count()

print(f'Número de registros: {sia_df_count:,}')

Número de registros: 1,742,743,969


In [7]:
# Exibe a lista de colunas do dataframe SIA

sia_df \
    .select('*')

DataFrame[PA_CODUNI: string, PA_GESTAO: string, PA_CONDIC: string, PA_UFMUN: string, PA_REGCT: string, PA_INCOUT: string, PA_INCURG: string, PA_TPUPS: string, PA_TIPPRE: string, PA_MN_IND: string, PA_CNPJCPF: string, PA_CNPJMNT: string, PA_CNPJ_CC: string, PA_MVM: string, PA_CMP: string, PA_PROC_ID: string, PA_TPFIN: string, PA_SUBFIN: string, PA_NIVCPL: string, PA_DOCORIG: string, PA_AUTORIZ: string, PA_CNSMED: string, PA_CBOCOD: string, PA_MOTSAI: string, PA_OBITO: string, PA_ENCERR: string, PA_PERMAN: string, PA_ALTA: string, PA_TRANSF: string, PA_CIDPRI: string, PA_CIDSEC: string, PA_CIDCAS: string, PA_CATEND: string, PA_IDADE: string, IDADEMIN: string, IDADEMAX: string, PA_FLIDADE: string, PA_SEXO: string, PA_RACACOR: string, PA_MUNPCN: string, PA_QTDPRO: string, PA_QTDAPR: string, PA_VALPRO: string, PA_VALAPR: string, PA_UFDIF: string, PA_MNDIF: string, PA_DIF_VAL: string, NU_VPA_TOT: string, NU_PA_TOT: string, PA_INDICA: string, PA_CODOCO: string, PA_FLQT: string, PA_FLER: strin

In [21]:
sia_df.printSchema()

root
 |-- PA_CODUNI: string (nullable = true)
 |-- PA_GESTAO: string (nullable = true)
 |-- PA_CONDIC: string (nullable = true)
 |-- PA_UFMUN: string (nullable = true)
 |-- PA_REGCT: string (nullable = true)
 |-- PA_INCOUT: string (nullable = true)
 |-- PA_INCURG: string (nullable = true)
 |-- PA_TPUPS: string (nullable = true)
 |-- PA_TIPPRE: string (nullable = true)
 |-- PA_MN_IND: string (nullable = true)
 |-- PA_CNPJCPF: string (nullable = true)
 |-- PA_CNPJMNT: string (nullable = true)
 |-- PA_CNPJ_CC: string (nullable = true)
 |-- PA_MVM: string (nullable = true)
 |-- PA_CMP: string (nullable = true)
 |-- PA_PROC_ID: string (nullable = true)
 |-- PA_TPFIN: string (nullable = true)
 |-- PA_SUBFIN: string (nullable = true)
 |-- PA_NIVCPL: string (nullable = true)
 |-- PA_DOCORIG: string (nullable = true)
 |-- PA_AUTORIZ: string (nullable = true)
 |-- PA_CNSMED: string (nullable = true)
 |-- PA_CBOCOD: string (nullable = true)
 |-- PA_MOTSAI: string (nullable = true)
 |-- PA_OBITO: 

In [8]:
len(sia_df.columns)

60

In [9]:
# Seleciona as colunas de interesse da tabela SIA_PA

sia_df = sia_df \
    .selectExpr('PA_CMP',  # Data da Realização do Procedimento / Competência (AAAAMM)
                'PA_CODUNI',  # Código do SCNES do estabelecimento de saúde
                'PA_TPUPS',  # Tipo do estabelecimento
                'PA_UFMUN',  # Município onde está localizado o estabelecimento
                'PA_PROC_ID',  # Código do procedimento ambulatorial
                'PA_DOCORIG',  # Instrumento de registro: C: BPA-C, I: BPA-I, P: APAC-P, S: APAC-S, A: RAAS-AD, B: RAAS-Psico
                'PA_CNSMED',  # Número CNS do profissional de saúde executante 
                'PA_CBOCOD',  # Código de ocupação brasileira do profissional
                'PA_MOTSAI',  # Motivo de saída ou zeros, caso não tenha
                'PA_CIDPRI',  # CID principal (APAC ou BPA-I)
                'PA_CIDSEC',  # CID secundário (APAC)
                'PA_CIDCAS',  # CID causas associadas (APAC)
                'PA_CATEND',  # Caráter de Atendimento (APAC ou BPA-I)
                'cast(PA_IDADE as int)',  # Idade do paciente em anos
                'PA_SEXO',  # Sexo do paciente
                'PA_RACACOR',  # Raça/cor do paciente
                'PA_MUNPCN',  # Município de residência do paciente ou do estabelecimento, caso não se tenha à identificação do paciente o que ocorre no (BPA)  # BPA-C
                'cast(PA_QTDAPR as int)',  # Quantidade aprovado do procedimento
                'cast(PA_VALAPR as float)')  # Valor aprovado do procedimento


In [10]:
len(sia_df.columns)

19

In [11]:
sia_df.count()

1742743969

In [9]:
# Escreve a tabela de fatos `sia_df` em arquivo parquet no diretório sus-data-warehouse

sia_df.write \
    .format('parquet') \
    .mode('overwrite') \
    .save('sus-data-warehouse/sia')

### Breve análise exploratória
Análise exploratória de algumas atributos (dimensões) da tabela de fatos que chamaram atenção do autor.

In [10]:
# Mostra o primeiro registro do data frame 'sia_df'

sia_df.show(1, vertical=True)

-RECORD 0---------------------
 PA_CMP     | 201907          
 PA_CODUNI  | 0003786         
 PA_TPUPS   | 07              
 PA_UFMUN   | 292740          
 PA_PROC_ID | 0417010060      
 PA_DOCORIG | I               
 PA_CNSMED  | 108258251530008 
 PA_CBOCOD  | 225310          
 PA_MOTSAI  | 00              
 PA_CIDPRI  | 0000            
 PA_CIDSEC  | 0000            
 PA_CIDCAS  | 0000            
 PA_CATEND  | 01              
 PA_IDADE   | 60              
 PA_SEXO    | F               
 PA_RACACOR | 99              
 PA_MUNPCN  | 292740          
 PA_QTDAPR  | 1               
 PA_VALAPR  | 15.15           
only showing top 1 row



In [11]:
sia_df \
    .groupBy('PA_MUNPCN') \
    .count() \
    .orderBy(F.desc('count')) \
    .withColumn('MUNPCN_PERCENT', F.round(F.col('count') / sia_df_count * 100, 2)) \
    .withColumn('count', F.format_number('count', 0)) \
    .show(5)

+---------+-----------+--------------+
|PA_MUNPCN|      count|MUNPCN_PERCENT|
+---------+-----------+--------------+
|   999999|460,061,443|          26.4|
|   330455|103,426,318|          5.93|
|   355030| 70,764,436|          4.06|
|   310620| 58,555,360|          3.36|
|   431490| 29,844,593|          1.71|
+---------+-----------+--------------+
only showing top 5 rows



In [12]:
# Quantidade de registros com PA_UFMUN (Município do estabelecimento) <> PA_MUNPCN (Município do paciente) e que não são BPC-Consolidado 
# (BPC-C não possui informação de paciente)

sia_df \
    .select('PA_UFMUN', 'PA_MUNPCN') \
    .where('PA_UFMUN <> PA_MUNPCN and PA_MUNPCN <> "999999"') \
    .count()

309678509

Quantidade de registros com PA_MUNPCN == '999999' é igual a quantidade de registros com PA_DOCORIG == 'BPA - Consolidado'. Após consulta analítica, é possível afirmar que todos esses registros tem PA_DOCORIG == PA_DOCORIG == 'BPA - Consolidado'

In [13]:
# Verifica a cardinalidade de cada coluna

sia_df \
    .agg({c: 'approx_count_distinct' for c in sia_df.columns}) \
    .show(vertical=True)

-RECORD 0-----------------------------------
 approx_count_distinct(PA_CODUNI)  | 88345  
 approx_count_distinct(PA_QTDAPR)  | 26993  
 approx_count_distinct(PA_DOCORIG) | 6      
 approx_count_distinct(PA_UFMUN)   | 5522   
 approx_count_distinct(PA_SEXO)    | 6      
 approx_count_distinct(PA_MUNPCN)  | 5637   
 approx_count_distinct(PA_CIDCAS)  | 3225   
 approx_count_distinct(PA_CMP)     | 67     
 approx_count_distinct(PA_CIDPRI)  | 13852  
 approx_count_distinct(PA_MOTSAI)  | 21     
 approx_count_distinct(PA_CNSMED)  | 492359 
 approx_count_distinct(PA_CIDSEC)  | 2321   
 approx_count_distinct(PA_CATEND)  | 24     
 approx_count_distinct(PA_VALAPR)  | 256163 
 approx_count_distinct(PA_TPUPS)   | 41     
 approx_count_distinct(PA_RACACOR) | 25     
 approx_count_distinct(PA_CBOCOD)  | 219    
 approx_count_distinct(PA_PROC_ID) | 2864   
 approx_count_distinct(PA_IDADE)   | 129    



In [14]:
# Conta valores faltantes em todas as colunas

sia_df  \
    .select([F.count(F.when(F.col(c).isNull() | \
                            F.isnan(c), c)
                    ).alias(c) \
             for c in sia_df.columns]) \
    .show(vertical=True)

-RECORD 0--------------
 PA_CMP     | 0        
 PA_CODUNI  | 0        
 PA_TPUPS   | 0        
 PA_UFMUN   | 0        
 PA_PROC_ID | 0        
 PA_DOCORIG | 0        
 PA_CNSMED  | 0        
 PA_CBOCOD  | 809921   
 PA_MOTSAI  | 0        
 PA_CIDPRI  | 0        
 PA_CIDSEC  | 0        
 PA_CIDCAS  | 49825202 
 PA_CATEND  | 0        
 PA_IDADE   | 0        
 PA_SEXO    | 0        
 PA_RACACOR | 0        
 PA_MUNPCN  | 0        
 PA_QTDAPR  | 0        
 PA_VALAPR  | 0        



In [15]:
# Verifica a contagem de valores na coluna PA_CATEND

sia_df \
    .groupBy('PA_CATEND') \
    .count() \
    .orderBy(F.desc('count')) \
    .withColumn('count', F.format_number('count', 0)) \
    .show()

+---------+-------------+
|PA_CATEND|        count|
+---------+-------------+
|       01|1,100,224,673|
|       99|  460,061,443|
|       02|  180,461,010|
|       06|    1,167,119|
|       03|      555,353|
|       05|      172,030|
|       04|      102,214|
|       00|          107|
|      056|            2|
|      026|            2|
|      046|            2|
|      027|            2|
|      065|            2|
|      030|            1|
|      022|            1|
|      070|            1|
|      067|            1|
|      057|            1|
|      044|            1|
|      086|            1|
+---------+-------------+
only showing top 20 rows



Coluna PA_ETNIA foi removida da tabela de fatos `sia_df` após análise dos valores presentes. Cardinalidade muito grande, valores não explicáveis e
99,9% dos registros 'NULL'. 

In [17]:
# Verifica quantidade de CIDs de acordo com números de dígitos do CID

sia_df \
    .selectExpr('len(PA_CIDPRI) as LEN') \
    .groupBy('LEN') \
    .count() \
    .show()

+---+----------+
|LEN|     count|
+---+----------+
|  3|  48835518|
|  4|1693908411|
|  2|        31|
|  1|         9|
+---+----------+



In [18]:
# Mostra CIDs com quantidade de dígitos igual a 2 ou menores

sia_df \
    .selectExpr('len(PA_CIDPRI) as LEN_CIDPRI',
               'PA_CIDPRI') \
    .where('LEN_CIDPRI <= 2') \
    .show()

+----------+---------+
|LEN_CIDPRI|PA_CIDPRI|
+----------+---------+
|         2|       N8|
|         2|       H0|
|         1|        S|
|         1|        G|
|         2|       F2|
|         2|       H3|
|         2|       R9|
|         2|       R9|
|         1|        Z|
|         1|        Z|
|         2|       H6|
|         2|       I7|
|         2|       R0|
|         1|        Z|
|         1|        Z|
|         2|       B4|
|         2|       Q2|
|         2|       H5|
|         2|       H3|
|         2|       L1|
+----------+---------+
only showing top 20 rows



In [30]:
sia_df \
    .selectExpr('left(PA_CMP, 4)') \
    .distinct() \
    .show()

+---------------+
|left(PA_CMP, 4)|
+---------------+
|           2020|
|           2019|
|           2017|
|           2018|
|           2016|
|           2015|
+---------------+



In [34]:
sia_df \
    .select('PA_CMP') \
    .filter('left(PA_CMP, 4) = "2015"') \
    .distinct() \
    .orderBy('PA_CMP') \
    .show()

+------+
|PA_CMP|
+------+
|201508|
|201509|
|201510|
|201511|
|201512|
+------+



## Carregamento das tabelas de dimensões

Nessa seção, serão carregadas as tabelas de dimensões:
- `municipios_df`: listagem do IBGE de todos os munícipios brasileiros, estados, regiões e outras informações;
- `cnes_df`: Cadastro Nacional de Estabelecimentos de Saúde;
- `sigtap_proced_df`: listagem dos Procedimentos oferecidos pelo SUS;
- `cid_df`: CID-10 Código Internacional de Doenças;
- `ano_mes_df`: Dimensão "data"no formato AAAAMM;
- `cbocod_df`: Código Brasileiro de Ocupações;
- `tpups_df`: Tipos de Estabelcimentos de Saúde;
- `catend_df`: Caráter de Atendimento;
- `docorig_df`: Tipo de Documento de Origem da produção ambulatorial;
- `sexo_df`: Sexo do paciente;
- `raca_cor_df`: Raça/Cor do paciente;
- `mosai_df`: Motivos de saída

### Dimensão MUNICIPIOS (`municipios_df`) (`sia_df['PA_UFMUN']`)

In [12]:
# Importa o módulo requests para baixar os d

import requests

In [13]:
# Baixa os dados de municípios do IBGE

municipios = requests.get('https://servicodados.ibge.gov.br/api/v1/localidades/municipios').json()

In [14]:
# Mostra o formato do primeiro município

municipios[0]

{'id': 1100015,
 'nome': "Alta Floresta D'Oeste",
 'microrregiao': {'id': 11006,
  'nome': 'Cacoal',
  'mesorregiao': {'id': 1102,
   'nome': 'Leste Rondoniense',
   'UF': {'id': 11,
    'sigla': 'RO',
    'nome': 'Rondônia',
    'regiao': {'id': 1, 'sigla': 'N', 'nome': 'Norte'}}}},
 'regiao-imediata': {'id': 110005,
  'nome': 'Cacoal',
  'regiao-intermediaria': {'id': 1102,
   'nome': 'Ji-Paraná',
   'UF': {'id': 11,
    'sigla': 'RO',
    'nome': 'Rondônia',
    'regiao': {'id': 1, 'sigla': 'N', 'nome': 'Norte'}}}}}

In [15]:
# Cria o dataframe de municípios

municipios_df = spark.createDataFrame(
    data=[(str(municipio['id'])[:6],  # 'Slicing para remover o dígito verificador, o sétimo dígito.
           municipio['nome'], 
           municipio['microrregiao']['nome'], 
           municipio['microrregiao']['mesorregiao']['nome'], 
           municipio['microrregiao']['mesorregiao']['UF']['sigla'], 
           municipio['microrregiao']['mesorregiao']['UF']['nome'],
           municipio['microrregiao']['mesorregiao']['UF']['regiao']['sigla'],
           municipio['microrregiao']['mesorregiao']['UF']['regiao']['nome'],
           municipio['regiao-imediata']['nome'],
           municipio['regiao-imediata']['regiao-intermediaria']['nome']) for municipio in municipios],
    schema=['PA_UFMUN', 'NOME', 'MICRORREGIAO', 'MESORREGIAO', 'UF', 'UF_NOME', 'REGIAO_SIGLA', 'REGIAO', 'REGIAO_IMEDIATA', 'REGIAO_INTERMEDIARIA']
)

municipios_df.show()

+--------+--------------------+-----------------+-----------------+---+--------+------------+------+---------------+--------------------+
|PA_UFMUN|                NOME|     MICRORREGIAO|      MESORREGIAO| UF| UF_NOME|REGIAO_SIGLA|REGIAO|REGIAO_IMEDIATA|REGIAO_INTERMEDIARIA|
+--------+--------------------+-----------------+-----------------+---+--------+------------+------+---------------+--------------------+
|  110001|Alta Floresta D'O...|           Cacoal|Leste Rondoniense| RO|Rondônia|           N| Norte|         Cacoal|           Ji-Paraná|
|  110002|           Ariquemes|        Ariquemes|Leste Rondoniense| RO|Rondônia|           N| Norte|      Ariquemes|         Porto Velho|
|  110003|              Cabixi|Colorado do Oeste|Leste Rondoniense| RO|Rondônia|           N| Norte|        Vilhena|           Ji-Paraná|
|  110004|              Cacoal|           Cacoal|Leste Rondoniense| RO|Rondônia|           N| Norte|         Cacoal|           Ji-Paraná|
|  110005|          Cerejeiras|Col

In [16]:
# Conta a quantidade total de registros do data frame de munícipios

municipios_df.count()

5570

In [18]:
len(municipios_df.columns)

10

In [24]:
# Salva a tabela `municipios_df` na pasta do data warehouse `sus-data-warehouse` em formato Apache Parquet

municipios_df.write \
    .format('parquet') \
    .mode('overwrite') \
    .save('sus-data-warehouse/municipios')

### Dimensão CADASTRO NACIONAL DE ESTABELECIMENTOS DE SAÚDE (`cnes_df`) (`sia_df['PA_CODUNI']`)

In [9]:
# Carrega num dataframe os arquivos da tabela CNES - Cadastro Nacional de Estabelecimentos de Saúde

cnes_df = spark.read.option('mergeSchema', 'true').parquet('data/tb_cnes_estabelecimentos')

In [10]:
# Mostra o primeiro registro do data frame CNES}

cnes_df.show(1, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 CO_UNIDADE                  | 3556205173388                 
 CO_CNES                     | 5173388                       
 NU_CNPJ_MANTENEDORA         | NULL                          
 TP_PFPJ                     | 1                             
 NIVEL_DEP                   | 1                             
 NO_RAZAO_SOCIAL             | SILVIO MAURICIO RINHEL VIRDES 
 NO_FANTASIA                 | SILVIO MAURICIO RINHEL VIRDES 
 NO_LOGRADOURO               | RUA 13 DE MAIO                
 NU_ENDERECO                 | 133                           
 NO_COMPLEMENTO              | 1 ANDAR                       
 NO_BAIRRO                   | CENTRO                        
 CO_CEP                      | 13270020                      
 CO_REGIAO_SAUDE             | NULL                          
 CO_MICRO_REGIAO             | NULL                          
 CO_DISTRITO_SANITARIO       | NULL                          
 CO_DIST

In [15]:
len(cnes_df.columns)

54

In [27]:
# Número de registros no data frame 'cnes_df'

cnes_df_count = cnes_df.count()
cnes_df_count

418045

#### Dimensão CLASSIFICAÇÃO DE ESTABELECIMENTOS (`tp_estab_df`)

In [28]:
# Cria o data frame da dimensão Classificação de Estabelecimentos (CO_TIPO_ESTABELECIMENTO)

# CO_TIPO_ESTABELECIMENTO extraído da página http://cnes2.datasus.gov.br/, aba Relatórios, opção Classif. de Estabelecimentos em 15/08/2024
tp_estab_df = spark.createDataFrame(
    data=[
        ('000', 'OUTROS'),
        ('001', 'UNIDADE BASICA DE SAUDE'),
        ('002', 'CENTRAL DE GESTAO EM SAUDE'),
        ('003', 'CENTRAL DE REGULACAO'),
        ('004', 'CENTRAL DE ABASTECIMENTO'),
        ('005', 'CENTRAL DE TRANSPLANTE'),
        ('006', 'HOSPITAL'),
        ('007', 'CENTRO DE ASSISTENCIA OBSTETRICA E NEONATAL NORMAL'),
        ('008', 'PRONTO ATENDIMENTO'),
        ('009', 'FARMACIA'),
        ('010', 'UNIDADE DE ATENCAO HEMATOLOGICA E/OU HEMOTERAPICA'),
        ('011', 'NUCLEO DE TELESSAUDE'),
        ('012', 'UNIDADE DE ATENCAO DOMICILIAR'),
        ('013', 'POLO DE PREVENCAO DE DOENCAS E AGRAVOS E PROMOCAO DA SAUDE'),
        ('014', 'CASAS DE APOIO A SAUDE'),
        ('015', 'UNIDADE DE REABILITACAO'),
        ('016', 'AMBULATORIO'),
        ('017', 'UNIDADE DE ATENCAO PSICOSSOCIAL'),
        ('018', 'UNIDADE DE APOIO DIAGNOSTICO'),
        ('019', 'UNIDADE DE TERAPIAS ESPECIAIS'),
        ('020', 'LABORATORIO DE PROTESE DENTARIA'),
        ('021', 'UNIDADE DE VIGILANCIA DE ZOONOSES'),
        ('022', 'LABORATORIO DE SAUDE PUBLICA'),
        ('023', 'CENTRO DE REFERENCIA EM SAUDE DO TRABALHADOR'),
        ('024', 'SERVICO DE VERIFICACAO DE OBITO'),
        ('025', 'CENTRO DE IMUNIZACAO')
    ],
    schema=['CO_TIPO_ESTABELECIMENTO', 'CO_TIPO_ESTABELECIMENTO_DESC']
)

tp_estab_df.show(truncate=False)

+-----------------------+----------------------------------------------------------+
|CO_TIPO_ESTABELECIMENTO|CO_TIPO_ESTABELECIMENTO_DESC                              |
+-----------------------+----------------------------------------------------------+
|000                    |OUTROS                                                    |
|001                    |UNIDADE BASICA DE SAUDE                                   |
|002                    |CENTRAL DE GESTAO EM SAUDE                                |
|003                    |CENTRAL DE REGULACAO                                      |
|004                    |CENTRAL DE ABASTECIMENTO                                  |
|005                    |CENTRAL DE TRANSPLANTE                                    |
|006                    |HOSPITAL                                                  |
|007                    |CENTRO DE ASSISTENCIA OBSTETRICA E NEONATAL NORMAL        |
|008                    |PRONTO ATENDIMENTO                      

#### Dimensão TIPO DE ESTABELECIMENTO (`tp_unidade_df`)

In [29]:
# Cria o data frame da dimensão Tipo de Estabelecimento (TP_UNIDADE)
# CNES.TP_UNIDADE == SIA.PA_TPUPS

# Tipos de Estabelecimentos (TP_UNIDADE) extraídos do arquivo de conversão para Tabwin TP_ESTAB.CNV
# baixado na página https://datasus.saude.gov.br/transferencia-de-arquivos/ em 15/08/2024
# Fonte: CNES - Cadastro Nacional de Estabelecimentos de Saúde, Modalidade: Arquivos auxiliares para tabulação, Tipo de Arquivo: Arquivo de definição do Tabwin
# /TAB_CNES/CNV/TP_ESTAB.CNV


## Importa a tabela TP_ESTAB.CNV e cria data frame
import csv

# Cria a lista para as tuplas
tp_unidade = []

# Exrai a tabela TP_ESTAB.CNV e coloca numa lista de tuplas
with open('data/TP_ESTAB.CNV', mode='r') as file:
    reader = csv.reader(file, delimiter='\t')
    # Pula a primeira linha do arquivo CNV
    next(reader)
    # Lê linha a linha (linhas com mais de um char), faz o slice e limpa os espaços do início e fim de cada string resultante
    for row in reader:
        if len(row[0]) > 1:
            tp_unidade.append((row[0][112:114].strip(), row[0][11:112].strip()))

# Cria o data frame
tp_unidade_df = spark.createDataFrame(
    data=tp_unidade,
    schema=['TP_UNIDADE', 'TP_UNIDADE_DESC']
)

tp_unidade_df.show(truncate=False)

+----------+----------------------------------------------------------+
|TP_UNIDADE|TP_UNIDADE_DESC                                           |
+----------+----------------------------------------------------------+
|01        |POSTO DE SAUDE                                            |
|02        |CENTRO DE SAUDE/UNIDADE BASICA                            |
|04        |POLICLINICA                                               |
|05        |HOSPITAL GERAL                                            |
|07        |HOSPITAL ESPECIALIZADO                                    |
|09        |PRONTO SOCORRO DE HOSPITAL GERAL (ANTIGO)                 |
|12        |PRONTO SOCORRO TRAUMATO-ORTOPEDICO (ANTIGO)               |
|15        |UNIDADE MISTA                                             |
|20        |PRONTO SOCORRO GERAL                                      |
|21        |PRONTO SOCORRO ESPECIALIZADO                              |
|22        |CONSULTORIO ISOLADO                                 

In [30]:
# Completa o data frame 'cnes_df' com as descrições dos campos TP_UNIDADE e CO_TIPO_ESTABELECIMENTO

cnes_df = cnes_df \
    .join(tp_unidade_df, on='TP_UNIDADE', how='left') \
    .join(tp_estab_df, on='CO_TIPO_ESTABELECIMENTO', how='left')

In [31]:
# Verifica se todos os registros do data frame original cnes_df continuam após as junções

cnes_df.count() == cnes_df_count

True

In [32]:
# Renomeia a coluna CO_UNIDADE para PA_CODUNI para padronizar com a tabela de fatos SIA_PA

cnes_df = cnes_df \
    .withColumnRenamed('CO_CNES', 'PA_CODUNI')

In [33]:
cnes_df.show(1, vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------------------
 CO_TIPO_ESTABELECIMENTO      | NULL                                      
 TP_UNIDADE                   | 36                                        
 CO_UNIDADE                   | 3550307917376                             
 PA_CODUNI                    | 7917376                                   
 NU_CNPJ_MANTENEDORA          | NULL                                      
 TP_PFPJ                      | 3                                         
 NIVEL_DEP                    | 1                                         
 NO_RAZAO_SOCIAL              | JOSE R VALENTE JR CLINICA ODONTOLOGICA ME 
 NO_FANTASIA                  | ESTETICA ORAL VALENTE                     
 NO_LOGRADOURO                | RUA SERRA DE BOTUCATU                     
 NU_ENDERECO                  | 878                                       
 NO_COMPLEMENTO               | SALAS 1102 1103 1104                      
 NO_BAIRRO               

In [34]:
# Salva a tabela `cnes_df` na pasta do data warehouse `sus-data-warehouse` em formato Apache Parquet

cnes_df.write \
    .format('parquet') \
    .mode('overwrite') \
    .save('sus-data-warehouse/cnes')

### Dimensão SIGTAP - PROCEDIMENTOS (`sigtap_proced_df`) (`sia_df['PA_PROC_ID']`)

In [16]:
# Carrega num dataframe os arquivos da tabela de dimensão SIGTAP - PROCEDIMENTOS

sigtap_proced_df = spark.read.option('mergeSchema', 'true').parquet('data/tb_sigtap_procedimento')

In [17]:
# Exibe o primeiro registro da tabela SIGTAP - PROCEDIMENTOS
sigtap_proced_df.show(1, vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------------------------------
 CO_PROCEDIMENTO      | 0101010010                                                    
 NO_PROCEDIMENTO      | ATIVIDADE EDUCATIVA / ORIENTAÇÃO EM GRUPO NA ATENÇÃO PRIMÁRIA 
 TP_COMPLEXIDADE      | 1                                                             
 TP_SEXO              | N                                                             
 QT_MAXIMA_EXECUCAO   | 9999                                                          
 QT_DIAS_PERMANENCIA  | 9999                                                          
 QT_PONTOS            | 0000                                                          
 VL_IDADE_MINIMA      | 9999                                                          
 VL_IDADE_MAXIMA      | 9999                                                          
 VL_SH                | 0000000000                                                    
 VL_SA                | 0000000000         

In [20]:
sigtap_proced_df.count()

275577

In [21]:
len(sigtap_proced_df.columns)

16

In [37]:
# Renomeia a coluna 'CO_PROCEDIMENTO' para 'PA_PROC_ID' para padronizar com a tabela de fatos SIA_PA

sigtap_proced_df = sigtap_proced_df \
    .withColumnRenamed('CO_PROCEDIMENTO', 'PA_PROC_ID')

In [38]:
# Salva a tabela `sigtap_proced_df` na pasta do data warehouse `sus-data-warehouse` em formato Apache Parquet

sigtap_proced_df.write \
    .format('parquet') \
    .mode('overwrite') \
    .save('sus-data-warehouse/sigtap_proced')

### Dimensão CID - CÓDIGO INTERNACIONAL DE DOENÇAS (`cid_df`) (`sia_df['PA_CIDPRI', 'PA_CIDSEC', 'PA_CIDCAS']`)

In [22]:
# Carrega num dataframe a tabela CID-10 - Código Internacional de Doenças

cid_df = spark.read.option('mergeSchema', 'true').parquet('data/tb_sigtap_cid')

In [23]:
# Conta a quantidade registros

cid_df.count()

14230

In [24]:
len(cid_df.columns)

6

In [41]:
# Conta o número de códigos CID únicos

cid_df \
    .select('CO_CID') \
    .distinct() \
    .count()

14230

In [42]:
# Mostra as colunas e tipos de dados no dataframe CID

cid_df.printSchema()

root
 |-- CO_CID: string (nullable = true)
 |-- NO_CID: string (nullable = true)
 |-- TP_AGRAVO: string (nullable = true)
 |-- TP_SEXO: string (nullable = true)
 |-- TP_ESTADIO: string (nullable = true)
 |-- VL_CAMPOS_IRRADIADOS: string (nullable = true)



In [43]:
# Mostra os primeiros 20 registros do dataframe CID

cid_df.show()

+------+--------------------+---------+-------+----------+--------------------+
|CO_CID|              NO_CID|TP_AGRAVO|TP_SEXO|TP_ESTADIO|VL_CAMPOS_IRRADIADOS|
+------+--------------------+---------+-------+----------+--------------------+
|   A00|              Cólera|        0|      I|         N|                 000|
|  A000|Cólera devida a V...|        2|      I|         N|                 000|
|  A001|Cólera devida a V...|        2|      I|         N|                 000|
|  A009|Cólera não especi...|        2|      I|         N|                 000|
|   A01|Febres tifóide e ...|        0|      I|         N|                 000|
|  A010|       Febre tifóide|        1|      I|         N|                 000|
|  A011| Febre paratifóide A|        0|      I|         N|                 000|
|  A012| Febre paratifóide B|        0|      I|         N|                 000|
|  A013| Febre paratifóide C|        0|      I|         N|                 000|
|  A014|Febre paratifóide...|        0| 

In [44]:
cid_df \
    .selectExpr('len(CO_CID) as LEN',
                'CO_CID',
                'NO_CID') \
    .groupBy('LEN') \
    .count() \
    .show()

+---+-----+
|LEN|count|
+---+-----+
|  3| 2042|
|  4|12188|
+---+-----+



In [45]:
cid_df = cid_df \
    .withColumn('CO_CATEG', F.left('CO_CID', F.lit(3)))

cid_df.show()

+------+--------------------+---------+-------+----------+--------------------+--------+
|CO_CID|              NO_CID|TP_AGRAVO|TP_SEXO|TP_ESTADIO|VL_CAMPOS_IRRADIADOS|CO_CATEG|
+------+--------------------+---------+-------+----------+--------------------+--------+
|   A00|              Cólera|        0|      I|         N|                 000|     A00|
|  A000|Cólera devida a V...|        2|      I|         N|                 000|     A00|
|  A001|Cólera devida a V...|        2|      I|         N|                 000|     A00|
|  A009|Cólera não especi...|        2|      I|         N|                 000|     A00|
|   A01|Febres tifóide e ...|        0|      I|         N|                 000|     A01|
|  A010|       Febre tifóide|        1|      I|         N|                 000|     A01|
|  A011| Febre paratifóide A|        0|      I|         N|                 000|     A01|
|  A012| Febre paratifóide B|        0|      I|         N|                 000|     A01|
|  A013| Febre parati

In [46]:
cid_df = cid_df.alias('A') \
    .join(cid_df.select('CO_CID', 'NO_CID').alias('B'), F.col('A.CO_CATEG') == F.col('B.CO_CID')) \
    .selectExpr('A.CO_CID', 
                'A.NO_CID', 
                'CO_CATEG', 
                'B.NO_CID as NO_CATEG', 
                'TP_AGRAVO', 
                'TP_SEXO', 
                'TP_ESTADIO', 
                'VL_CAMPOS_IRRADIADOS')

cid_df.show()

+------+--------------------+--------+--------------------+---------+-------+----------+--------------------+
|CO_CID|              NO_CID|CO_CATEG|            NO_CATEG|TP_AGRAVO|TP_SEXO|TP_ESTADIO|VL_CAMPOS_IRRADIADOS|
+------+--------------------+--------+--------------------+---------+-------+----------+--------------------+
|   A00|              Cólera|     A00|              Cólera|        0|      I|         N|                 000|
|  A000|Cólera devida a V...|     A00|              Cólera|        2|      I|         N|                 000|
|  A001|Cólera devida a V...|     A00|              Cólera|        2|      I|         N|                 000|
|  A009|Cólera não especi...|     A00|              Cólera|        2|      I|         N|                 000|
|   A01|Febres tifóide e ...|     A01|Febres tifóide e ...|        0|      I|         N|                 000|
|  A010|       Febre tifóide|     A01|Febres tifóide e ...|        1|      I|         N|                 000|
|  A011| F

In [47]:
# Salva a tabela `cid_df` na pasta do data warehouse `sus-data-warehouse` em formato Apache Parquet

cid_df.write \
    .format('parquet') \
    .mode('overwrite') \
    .save('sus-data-warehouse/cid')

### Dimensão ANO_MES  (`ano_mes_df`) (`sia_df['PA_CMP']`)

In [48]:
# Cria um data frame auxiliar para a (sub)dimensão MÊS

mes_df = spark.createDataFrame(
    data=[('01', 'JANEIRO', 'JAN', '1', '1'),
          ('02', 'FEVEREIRO', 'FEV', '1', '1'),
          ('03', 'MARÇO', 'MAR', '1', '1'),
          ('04', 'ABRIL', 'ABR', '2', '1'),
          ('05', 'MAIO', 'MAI', '2', '1'),
          ('06', 'JUNHO', 'JUN', '2', '1'),
          ('07', 'JULHO', 'JUL', '3', '2'),
          ('08', 'AGOSTO', 'AGO', '3', '2'),
          ('09', 'SETEMBRO', 'SET', '3', '2'),
          ('10', 'OUTUBRO', 'OUT', '4', '2'),
          ('11', 'NOVEMBRO', 'NOV', '4', '2'),
          ('12', 'DEZEMBRO', 'DEZ', '4', '2')],
    schema=['MES', 'MES_NOME', 'MES_ABREV', 'TRIMESTRE', 'SEMESTRE']
)

mes_df.show()

+---+---------+---------+---------+--------+
|MES| MES_NOME|MES_ABREV|TRIMESTRE|SEMESTRE|
+---+---------+---------+---------+--------+
| 01|  JANEIRO|      JAN|        1|       1|
| 02|FEVEREIRO|      FEV|        1|       1|
| 03|    MARÇO|      MAR|        1|       1|
| 04|    ABRIL|      ABR|        2|       1|
| 05|     MAIO|      MAI|        2|       1|
| 06|    JUNHO|      JUN|        2|       1|
| 07|    JULHO|      JUL|        3|       2|
| 08|   AGOSTO|      AGO|        3|       2|
| 09| SETEMBRO|      SET|        3|       2|
| 10|  OUTUBRO|      OUT|        4|       2|
| 11| NOVEMBRO|      NOV|        4|       2|
| 12| DEZEMBRO|      DEZ|        4|       2|
+---+---------+---------+---------+--------+



In [49]:
# Cria uma lista de string de anos e meses no formato AAAAMM de 2015 a 2020

pa_cmp = [str(ano) + (str(mes) if len(str(mes)) > 1 else '0' + str(mes)) 
          for ano in list(range(2015, 2021)) 
          for mes in list(range(1, 13))]

In [50]:
# Cria o data frame para a dimensão ANO_MES e popula com a informação dos meses

ano_mes_df = spark.createDataFrame(
    data=[(ano_mes,) for ano_mes in pa_cmp],
    schema=['PA_CMP']
)

ano_mes_df = ano_mes_df \
    .selectExpr('PA_CMP',
                'left(PA_CMP, 4) as ANO',
                'right(PA_CMP, 2) as MES') \
    .join(mes_df, 'MES') \
    .orderBy('PA_CMP') \
    .select('PA_CMP', 'ANO', 'MES', 'MES_NOME', 'MES_ABREV', 'TRIMESTRE', 'SEMESTRE')

ano_mes_df.show()

+------+----+---+---------+---------+---------+--------+
|PA_CMP| ANO|MES| MES_NOME|MES_ABREV|TRIMESTRE|SEMESTRE|
+------+----+---+---------+---------+---------+--------+
|201501|2015| 01|  JANEIRO|      JAN|        1|       1|
|201502|2015| 02|FEVEREIRO|      FEV|        1|       1|
|201503|2015| 03|    MARÇO|      MAR|        1|       1|
|201504|2015| 04|    ABRIL|      ABR|        2|       1|
|201505|2015| 05|     MAIO|      MAI|        2|       1|
|201506|2015| 06|    JUNHO|      JUN|        2|       1|
|201507|2015| 07|    JULHO|      JUL|        3|       2|
|201508|2015| 08|   AGOSTO|      AGO|        3|       2|
|201509|2015| 09| SETEMBRO|      SET|        3|       2|
|201510|2015| 10|  OUTUBRO|      OUT|        4|       2|
|201511|2015| 11| NOVEMBRO|      NOV|        4|       2|
|201512|2015| 12| DEZEMBRO|      DEZ|        4|       2|
|201601|2016| 01|  JANEIRO|      JAN|        1|       1|
|201602|2016| 02|FEVEREIRO|      FEV|        1|       1|
|201603|2016| 03|    MARÇO|    

In [51]:
# Salva a tabela `ano_mes_df` na pasta do data warehouse `sus-data-warehouse` em formato Apache Parquet

ano_mes_df.write \
    .format('parquet') \
    .mode('overwrite') \
    .save('sus-data-warehouse/ano_mes')

### Dimensão CÓDIGO BRASILEIRO DE OCUPAÇÕES (`cbocod_df`) (`sia_df['PA_CBOCOD']`)

In [25]:
# Instala e import o pacote `dbfread`, para carregar arquivos tipo .dbf
# Necessário para adquirir a lista do Código Brasileiro de Ocupações disponibilizada pelo DATASUS

!pip install dbfread
from dbfread import DBF



In [26]:
# Carrega a tabela a do Código Brasileiro de Ocupações (PA_CBOCOD) extraídos do arquivo de conversão para Tabwin CBO.DBF
# baixado na página https://datasus.saude.gov.br/transferencia-de-arquivos/ em 12/08/2024
# Fonte: SIASUS - Sistema de Informações Ambulatoriais dos SUS), Modalidade: Arquivos auxiliares para tabulação, Tipo de Arquivo: Arquivo de definição do Tabwin
# /TAB_SIA/DBF/CBO.DBF

cbo = DBF('data/CBO.dbf', load=True)

cbo.records[0]

OrderedDict([('CBO', '010105'), ('DS_CBO', 'Oficial General da Aeronautica')])

In [27]:
# Cria o Data Frame da dimensão PA_CBOCOD - Código Brasileiro de Ocupações

cbocod_df = spark.createDataFrame(
    data=[(record['CBO'], record['DS_CBO']) for record in cbo.records],
    schema=['PA_CBOCOD', 'CBOCOD_DESC'])

cbocod_df.show(truncate=False)

+---------+-----------------------------------+
|PA_CBOCOD|CBOCOD_DESC                        |
+---------+-----------------------------------+
|010105   |Oficial General da Aeronautica     |
|010110   |Oficial General do Exercito        |
|010115   |Oficial General da Marinha         |
|010205   |Oficial da Aeronautica             |
|010210   |Oficial do Exercito                |
|010215   |Oficial da Marinha                 |
|010305   |Praca da Aeronautica               |
|010310   |Praca do Exercito                  |
|010315   |Praca da Marinha                   |
|020105   |Coronel da Policia Militar         |
|020110   |Tenente-Coronel da Policia Militar |
|020115   |Major da Policia Militar           |
|020205   |Capitao da Policia Militar         |
|020305   |Primeiro Tenente de Policia Militar|
|020310   |Segundo Tenente de Policia Militar |
|021105   |Subtenente da Policia Militar      |
|021110   |Sargento da Policia Militar        |
|021205   |Cabo da Policia Militar      

In [28]:
# Quantidade de registros no data frame 'cbocod_df'

cbocod_df.count()

2812

### Dimensão TIPO DE ESTABELECIMENTO (`tpups_df`) (`sia_df['PA_TPUPS']`)

In [57]:
# Cria um Data Frame para a dimensão Tipos de Estabelecimentos (PA_TPUPS)
# SIA.PA_TPUPS == CNES.TP_UNIDADE

# Tipos de estabelecimento (PA_TPUPS) extraídos do arquivo de conversão para Tabwin TP_ESTAB.CNV
# baixado na página https://datasus.saude.gov.br/transferencia-de-arquivos/ em 12/08/2024
# Fonte: SIASUS - Sistema de Informações Ambulatoriais dos SUS), Modalidade: Arquivos auxiliares para tabulação, Tipo de Arquivo: Arquivo de definição do Tabwin
# /TAB_SIA/CNV/TP_ESTAB.CNV

tpups_df = spark.createDataFrame(
    data=[('74', 'ACADEMIA DA SAÚDE'),
          ('81', 'CENTRAL DE REGUALAÇÃO'),
          ('76', 'CENTRAL DE REGULAÇÃO MÉDICA DAS URGÊNCIAS'),
          ('71', 'CENTRO DE APOIO A SAÚDE DA FAMÍLIA-CASF'),
          ('69', 'CENTRO DE ATENÇÃOO HEMOTERÁPICA E/OU HEMATOLÓGICA'),
          ('70', 'CENTRO DE ATENÇÃO PSICOSSOCIAL-CAPS'),
          ('61', 'CENTRO DE PARTO NORMAL'),
          ('02', 'CENTRO DE SAUDE/UNIDADE BASICA DE SAUDE'),
          ('64', 'CENTRAL DE REGULACAO DE SERVICOS DE SAUDE'),
          ('36', 'CLINICA ESPECIALIZADA/AMBULATORIO ESPECIALIZADO'),
          ('22', 'CONSULTORIO'),
          ('60', 'COOPERATIVA'),
          ('43', 'FARMACIA'),
          ('07', 'HOSPITAL ESPECIALIZADO'),
          ('05', 'HOSPITAL GERAL'),
          ('62', 'HOSPITAL DIA'),
          ('67', 'LABORATORIO CENTRAL DE SAUDE PUBLICA - LACEN'),
          ('80', 'ORIO DE SAUDE PUBLICA'),
          ('04', 'POLICLINICA'),
          ('79', 'OFICINA ORTOPEDICA'),
          ('01', 'POSTO DE SAUDE'),
          ('73', 'PRONTO ANTEDIMENTO'),
          ('21', 'PRONTO SOCORRO ESPECIALIZADO'),
          ('20', 'PRONTO SOCORRO GERAL'),
          ('68', 'SECRETARIA DE SAUDE'),
          ('77', 'SERVICO DE ATENCAO DOMICILIAR ISOLADO(HOME CARE)'),
          ('63', 'UNIDADE AUTORIZADORA'),
          ('72', 'UNIDADE DE ATENÇÃO E SAÚDE INDÍGENA'),
          ('78', 'UNIDADE DE ATENCAO EM REGIME RESIDENCIAL'),
          ('39', 'UNIDADE DE SERVICO DE APOIO DE DIAGNOSE E TERAPIA'),
          ('45', 'UNIDADE DE SAUDE DA FAMILIA'),
          ('50', 'UNIDADE DE VIGILANCIA EM SAUDE'),
          ('65', 'UNIDADE DE VIGILANCIA EPIDEMIOLOGIA (ANTIGO)'),
          ('66', 'UNIDADE DE VIGILANCIA SANITARIA (ANTIGO)'),
          ('15', 'UNIDADE MISTA'),
          ('42', 'UNIDADE MOVEL DE NIVEL PRE-HOSP-URGENCIA/EMERGENCIA'),
          ('32', 'UNIDADE MOVEL FLUVIAL'),
          ('40', 'UNIDADE MOVEL TERRESTRE'),
          ('75', 'TELESAÚDE'),
          ('09', 'PRONTO SOCORRO DE HOSPITAL GERAL (ANTIGO)'),
          ('12', 'PRONTO SOCORRO TRAUMATO-ORTOPEDICO (ANTIGO)'),
          ('-99', 'TIPO ESTABELECIMENTO NÃO INFORMADO')],
    schema=['PA_TPUPS', 'TPUPS_DESC']
)

tpups_df.show(truncate=False)

+--------+-------------------------------------------------+
|PA_TPUPS|TPUPS_DESC                                       |
+--------+-------------------------------------------------+
|74      |ACADEMIA DA SAÚDE                                |
|81      |CENTRAL DE REGUALAÇÃO                            |
|76      |CENTRAL DE REGULAÇÃO MÉDICA DAS URGÊNCIAS        |
|71      |CENTRO DE APOIO A SAÚDE DA FAMÍLIA-CASF          |
|69      |CENTRO DE ATENÇÃOO HEMOTERÁPICA E/OU HEMATOLÓGICA|
|70      |CENTRO DE ATENÇÃO PSICOSSOCIAL-CAPS              |
|61      |CENTRO DE PARTO NORMAL                           |
|02      |CENTRO DE SAUDE/UNIDADE BASICA DE SAUDE          |
|64      |CENTRAL DE REGULACAO DE SERVICOS DE SAUDE        |
|36      |CLINICA ESPECIALIZADA/AMBULATORIO ESPECIALIZADO  |
|22      |CONSULTORIO                                      |
|60      |COOPERATIVA                                      |
|43      |FARMACIA                                         |
|07      |HOSPITAL ESPEC

In [58]:
# Salva a tabela `tpups_df` na pasta do data warehouse `sus-data-warehouse` em formato Apache Parquet

tpups_df.write \
    .format('parquet') \
    .mode('overwrite') \
    .save('sus-data-warehouse/tpups')

### Dimensão CARÁTER DE ATENDIMENTO (`catend_df`) (`sia_df['PA_CATEND']`)

In [59]:
# Caráteres de Atendimento (PA_CATEND) extraídos do arquivo de conversão para Tabwin CARAT_AT.CNV
# baixado na página https://datasus.saude.gov.br/transferencia-de-arquivos/ em 12/08/2024
# Fonte: SIASUS - Sistema de Informações Ambulatoriais dos SUS), Modalidade: Arquivos auxiliares para tabulação, Tipo de Arquivo: Arquivo de definição do Tabwin
# /TAB_SIA/CNV/CARAT_AT.CNV

catend_df = spark.createDataFrame(
    data=[
        ('01', 'ELETIVO'),
        ('02', 'URGÊNCIA'),
        ('03', 'ACIDENTE NO LOCAL TRABALHO OU A SERViÇO DA EMPRESA'),
        ('04', 'ACIDENTE NO TRAJETO PARA O TRABALHO '),
        ('05', 'OUTROS TIPOS DE ACIDENTE DE TRÂNSITO'),
        ('06', 'OUTROS TIPOS LESÕES/ENVENENAMENTOS(AGENT.FIS./QUIM.'),
        ('99', 'INFORMAÇÃO INEXISTENTE  (BPA-C)'),
        ('00', 'CARATER DE ATENDIMENTO NÃO INFORMADO'),
        ('07', 'CARATER DE ATENDIMENTO INVALIDO'),
        ('10', 'CARATER DE ATENDIMENTO INVALIDO'),
        ('12', 'CARATER DE ATENDIMENTO INVALIDO'),
        ('20', 'CARATER DE ATENDIMENTO INVALIDO'),
        ('53', 'CARATER DE ATENDIMENTO INVALIDO'),
        ('54', 'CARATER DE ATENDIMENTO INVALIDO'),
        ('57', 'CARATER DE ATENDIMENTO INVALIDO'),
        ('0-', 'CARATER DE ATENDIMENTO INVALIDO'),
        ('0E', 'CARATER DE ATENDIMENTO INVALIDO'),
        ('0U', 'CARATER DE ATENDIMENTO INVALIDO')
    ],
    schema=['PA_CATEND', 'CATEND_DESC']
)

catend_df.show(truncate=False)

+---------+---------------------------------------------------+
|PA_CATEND|CATEND_DESC                                        |
+---------+---------------------------------------------------+
|01       |ELETIVO                                            |
|02       |URGÊNCIA                                           |
|03       |ACIDENTE NO LOCAL TRABALHO OU A SERViÇO DA EMPRESA |
|04       |ACIDENTE NO TRAJETO PARA O TRABALHO                |
|05       |OUTROS TIPOS DE ACIDENTE DE TRÂNSITO               |
|06       |OUTROS TIPOS LESÕES/ENVENENAMENTOS(AGENT.FIS./QUIM.|
|99       |INFORMAÇÃO INEXISTENTE  (BPA-C)                    |
|00       |CARATER DE ATENDIMENTO NÃO INFORMADO               |
|07       |CARATER DE ATENDIMENTO INVALIDO                    |
|10       |CARATER DE ATENDIMENTO INVALIDO                    |
|12       |CARATER DE ATENDIMENTO INVALIDO                    |
|20       |CARATER DE ATENDIMENTO INVALIDO                    |
|53       |CARATER DE ATENDIMENTO INVALI

In [60]:
# Salva a tabela `catend_df` na pasta do data warehouse `sus-data-warehouse` em formato Apache Parquet

catend_df.write \
    .format('parquet') \
    .mode('overwrite') \
    .save('sus-data-warehouse/catend')

### Dimensão DOCUMENTO DE ORIGEM (`docorig_df`) (`sia_df['PA_DOCORIG']`)

In [61]:
# Documentos de Origem (PA_DOCORIG) extraídos do arquivo de conversão para Tabwin DOCORIG.CNV
# baixado na página https://datasus.saude.gov.br/transferencia-de-arquivos/ em 12/08/2024
# Fonte: SIASUS - Sistema de Informações Ambulatoriais dos SUS), Modalidade: Arquivos auxiliares para tabulação, Tipo de Arquivo: Arquivo de definição do Tabwin
# /TAB_SIA/CNV/DOCORIG.CNV

docorig_df = spark.createDataFrame(
    data=[
        ('C', 'BPA - Consolidado'),  # BPA-C
        ('I', 'BPA - Individualizado'),  # BPA-I
        ('P', 'APAC - Procedimento Principal'),
        ('S', 'APAC - Procedimento Secundário'),
        ('A', 'RAAS - Atenção Domiciliar'),
        ('B', 'RAAS - Psicossocial')
    ],
    schema=['PA_DOCORIG', 'DOCORIG_DESC']
)

docorig_df.show(truncate=False)

+----------+------------------------------+
|PA_DOCORIG|DOCORIG_DESC                  |
+----------+------------------------------+
|C         |BPA - Consolidado             |
|I         |BPA - Individualizado         |
|P         |APAC - Procedimento Principal |
|S         |APAC - Procedimento Secundário|
|A         |RAAS - Atenção Domiciliar     |
|B         |RAAS - Psicossocial           |
+----------+------------------------------+



In [62]:
# Quantidade total de registros do data frame 'sia_df'

sia_df_count = sia_df.count()

In [63]:
# Verifica a distribuição de registros por PA_DOCORIG

sia_df \
    .groupBy('PA_DOCORIG') \
    .count() \
    .withColumn('DOCORIG_PERCENT', F.round(F.col('count') / sia_df_count * 100, 2)) \
    .join(docorig_df, 'PA_DOCORIG') \
    .orderBy(F.desc('count')) \
    .withColumn('count', F.format_number('count', 0)) \
    .show(truncate=False)

+----------+-----------+---------------+------------------------------+
|PA_DOCORIG|count      |DOCORIG_PERCENT|DOCORIG_DESC                  |
+----------+-----------+---------------+------------------------------+
|I         |958,547,600|55.0           |BPA - Individualizado         |
|C         |460,061,443|26.4           |BPA - Consolidado             |
|P         |147,446,676|8.46           |APAC - Procedimento Principal |
|S         |119,552,212|6.86           |APAC - Procedimento Secundário|
|B         |56,971,394 |3.27           |RAAS - Psicossocial           |
|A         |164,644    |0.01           |RAAS - Atenção Domiciliar     |
+----------+-----------+---------------+------------------------------+



In [64]:
sia_df \
    .where('PA_DOCORIG = "C"') \
    .show(1, vertical=True)

-RECORD 0---------------------
 PA_CMP     | 201907          
 PA_CODUNI  | 4026896         
 PA_TPUPS   | 05              
 PA_UFMUN   | 291460          
 PA_PROC_ID | 0202010635      
 PA_DOCORIG | C               
 PA_CNSMED  | 000000000000000 
 PA_CBOCOD  | 223415          
 PA_MOTSAI  | 00              
 PA_CIDPRI  | 0000            
 PA_CIDSEC  | 0000            
 PA_CIDCAS  | 0000            
 PA_CATEND  | 99              
 PA_IDADE   | 999             
 PA_SEXO    | 0               
 PA_RACACOR | 00              
 PA_MUNPCN  | 999999          
 PA_QTDAPR  | 1               
 PA_VALAPR  | 1.85            
only showing top 1 row



In [65]:
# Salva a tabela `docorig_df` na pasta do data warehouse `sus-data-warehouse` em formato Apache Parquet

docorig_df.write \
    .format('parquet') \
    .mode('overwrite') \
    .save('sus-data-warehouse/docorig')

### Dimensão SEXO (`sexo_df`) (`sia_df['PA_SEXO']`)

In [66]:
# Verifica a contagem de valores na coluna PA_SEXO

sia_df \
    .groupBy('PA_SEXO') \
    .count() \
    .orderBy(F.desc('count')) \
    .withColumn('count', F.format_number('count', 0)) \
    .show()

+-------+-----------+
|PA_SEXO|      count|
+-------+-----------+
|      F|761,155,673|
|      M|521,526,726|
|      0|460,061,550|
|     01|          8|
|     03|          7|
|     99|          5|
+-------+-----------+



PA_SEXO "0"? Não informado? ['01', '02', '99'] devem ser erros.

In [67]:
# Sexos (PA_SEXO) extraídos do arquivo de conversão para Tabwin SEXO.CNV
# baixado na página https://datasus.saude.gov.br/transferencia-de-arquivos/ em 12/08/2024
# Fonte: SIASUS - Sistema de Informações Ambulatoriais dos SUS), Modalidade: Arquivos auxiliares para tabulação, Tipo de Arquivo: Arquivo de definição do Tabwin
# /TAB_SIA/CNV/SEXO.CNV

sexo_df = spark.createDataFrame(
    data= [('0', 'Não exigido'),
           ('M', 'Masculino'),
           ('F', 'Feminino')],
    schema=['PA_SEXO', 'SEXO_DESC']
)

sexo_df.show()

+-------+-----------+
|PA_SEXO|  SEXO_DESC|
+-------+-----------+
|      0|Não exigido|
|      M|  Masculino|
|      F|   Feminino|
+-------+-----------+



In [68]:
# Salva a tabela `sexo_df` na pasta do data warehouse `sus-data-warehouse` em formato Apache Parquet

sexo_df.write \
    .format('parquet') \
    .mode('overwrite') \
    .save('sus-data-warehouse/sexo')

### Dimensão RAÇA/COR (`raca_cor_df`) (`sia_df['PA_RACACOR']`)

In [69]:
# Raça/Cor (PA_RACACOR) extraídos do arquivo de conversão para Tabwin RACA_COR.CNV
# baixado na página https://datasus.saude.gov.br/transferencia-de-arquivos/ em 12/08/2024
# Fonte: SIASUS - Sistema de Informações Ambulatoriais dos SUS), Modalidade: Arquivos auxiliares para tabulação, Tipo de Arquivo: Arquivo de definição do Tabwin
# /TAB_SIA/CNV/RACA_COR.CNV

raca_cor_df = spark.createDataFrame(
    data=[('01', 'BRANCA'),
          ('02', 'PRETA'),
          ('03', 'PARDA'),
          ('04', 'AMARELA'),
          ('05', 'INDIGENA'),
          ('99', 'SEM INFORMAÇÃO'),
          ('06', 'RAÇA/COR=06 (INDEVIDO)'),
          ('09', 'RAÇA/COR=09 (INDEVIDO)'),
          ('1M', 'RACA/COR (OUTROS INDEVIDOS)'),
          ('1G', 'RACA/COR (OUTROS INDEVIDOS)'),
          ('1C', 'RACA/COR (OUTROS INDEVIDOS)'),
          ('DE', 'RACA/COR (OUTROS INDEVIDOS)'),
          ('D', 'RACA/COR (OUTROS INDEVIDOS)'),
          ('87', 'RACA/COR (OUTROS INDEVIDOS)')],
    schema=['PA_RACACOR', 'RACACOR_DESC']
)

raca_cor_df.show(truncate=False)

+----------+---------------------------+
|PA_RACACOR|RACACOR_DESC               |
+----------+---------------------------+
|01        |BRANCA                     |
|02        |PRETA                      |
|03        |PARDA                      |
|04        |AMARELA                    |
|05        |INDIGENA                   |
|99        |SEM INFORMAÇÃO             |
|06        |RAÇA/COR=06 (INDEVIDO)     |
|09        |RAÇA/COR=09 (INDEVIDO)     |
|1M        |RACA/COR (OUTROS INDEVIDOS)|
|1G        |RACA/COR (OUTROS INDEVIDOS)|
|1C        |RACA/COR (OUTROS INDEVIDOS)|
|DE        |RACA/COR (OUTROS INDEVIDOS)|
|D         |RACA/COR (OUTROS INDEVIDOS)|
|87        |RACA/COR (OUTROS INDEVIDOS)|
+----------+---------------------------+



In [70]:
# Salva a tabela `raca_cor_df` na pasta do data warehouse `sus-data-warehouse` em formato Apache Parquet

raca_cor_df.write \
    .format('parquet') \
    .mode('overwrite') \
    .save('sus-data-warehouse/raca_cor')

### Dimensão MOTIVO DE SAÍDA (`motsai_df`) (`sia_df['PA_MOTSAI']`)

In [71]:
# Verifica a contagem de valores na coluna PA_MOTSAI

sia_df \
    .groupBy('PA_MOTSAI') \
    .count() \
    .orderBy(F.desc('count')) \
    .withColumn('count', F.format_number('count', 0)) \
    .show()

+---------+-------------+
|PA_MOTSAI|        count|
+---------+-------------+
|       00|1,418,609,023|
|       21|  275,660,569|
|       28|   21,568,731|
|       15|    8,617,676|
|       12|    6,889,036|
|       18|    4,178,059|
|       11|    2,269,761|
|       51|    1,272,749|
|       41|    1,063,127|
|       26|    1,006,297|
|       31|      885,177|
|       25|      178,617|
|       16|      165,496|
|       43|       98,692|
|       22|       97,093|
|       24|       70,733|
|       42|       68,800|
|       14|       34,744|
|       23|        6,243|
|       27|        3,326|
+---------+-------------+
only showing top 20 rows



In [72]:
# Motivos de Saída (PA_MOTSAI) extraídos do arquivo de conversão para Tabwin MOTSAIPE.CNV
# baixado na página https://datasus.saude.gov.br/transferencia-de-arquivos/ em 12/08/2024
# Fonte: SIASUS - Sistema de Informações Ambulatoriais dos SUS), Modalidade: Arquivos auxiliares para tabulação, Tipo de Arquivo: Arquivo de definição do Tabwin
# /TAB_SIA/CNV/MOTSAIPE.CNV

motsai_df = spark.createDataFrame(
    data=[('11', 'ALTA CURADO'),
          ('12', 'ALTA MELHORADO'),
          ('13', 'ALTA DA PUÉRPERA E PERMANÊNCIA DO RECÉM NASCIDO'),
          ('14', 'ALTA A PEDIDO'),
          ('15', 'ALTA COM PREVISÃO DE RETORNO P/ ACOMPAN. DO PACIENT'),
          ('16', 'ALTA POR EVASÃO'),
          ('17', 'ALTA DA PUÉRPERA E RECÉM NASCIDO'),
          ('18', 'ALTA POR OUTROS MOTIVOS'),
          ('21', 'PERMANÊNCIA POR CARACTERÍSTICAS PRÓPRIAS DA DOENÇA'),
          ('22', 'PERMANÊNCIA POR INTERCORRÊNCIA'),
          ('23', 'PERMANÊNCIA POR IMPOSSIBILIDADE SÓCIO-FAMILIAR'),
          ('24', 'PERMAN. POR PROCESSO-DOAÇÃO DE ÓRGÃOS-DOADOR VIVO'),
          ('25', 'PERMAN. POR PROCESSO-DOAÇÃO DE ÓRGÃOS-DOADOR MORTO'),
          ('26', 'PERMANÊNCIA POR MUDANÇAA DE PROCEDIMENTO'),
          ('27', 'PERMANÊNCIA POR REOPERAÇÃO'),
          ('28', 'PERMANÊNCIA POR OUTROS MOTIVOS'),
          ('31', 'TRANSFERIDO PARA OUTRO ESTABELECIMENTO'),
          ('41', 'ÓBITO COM DECLARAÇÃO DE ÓBITO FORNEC. MÉDICO ASSIST'),
          ('42', 'ÓBITO COM DECLARAÇÃO DE ÓBITO FORNECIDA PELO I.M.L'),
          ('43', 'ÓBITO COM DECLARAÇÃO DE ÓBITO FORNECIDA PELO S.V.O'),
          ('51', 'ENCERRAMENTO ADMINSTRATIVO'),
          ('00', 'PRODUÇÃO SEM MOTIVO DE SAÍDA (BPA-C / BPA-I)')],
    schema=['PA_MOTSAI', 'MOTSAI_DESC']
)

motsai_df.show(25, truncate=False)

+---------+---------------------------------------------------+
|PA_MOTSAI|MOTSAI_DESC                                        |
+---------+---------------------------------------------------+
|11       |ALTA CURADO                                        |
|12       |ALTA MELHORADO                                     |
|13       |ALTA DA PUÉRPERA E PERMANÊNCIA DO RECÉM NASCIDO    |
|14       |ALTA A PEDIDO                                      |
|15       |ALTA COM PREVISÃO DE RETORNO P/ ACOMPAN. DO PACIENT|
|16       |ALTA POR EVASÃO                                    |
|17       |ALTA DA PUÉRPERA E RECÉM NASCIDO                   |
|18       |ALTA POR OUTROS MOTIVOS                            |
|21       |PERMANÊNCIA POR CARACTERÍSTICAS PRÓPRIAS DA DOENÇA |
|22       |PERMANÊNCIA POR INTERCORRÊNCIA                     |
|23       |PERMANÊNCIA POR IMPOSSIBILIDADE SÓCIO-FAMILIAR     |
|24       |PERMAN. POR PROCESSO-DOAÇÃO DE ÓRGÃOS-DOADOR VIVO  |
|25       |PERMAN. POR PROCESSO-DOAÇÃO D

In [73]:
# Salva a tabela `motsai_df` na pasta do data warehouse `sus-data-warehouse` em formato Apache Parquet

motsai_df.write \
    .format('parquet') \
    .mode('overwrite') \
    .save('sus-data-warehouse/motsai')

## Encerramento da sessão Spark

In [74]:
spark.stop()