In [45]:
# Iniciando o ambiente PySpark
import findspark
findspark.init()

In [46]:
# Criando uma instância de SparkSession para iniciar uma sessão Spark
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .master('local[*]')\
    .appName('Conversão e manipulação dos arquivos CSV para Parquet')\
    .config('spark.ui.port', '4050')\
    .getOrCreate()

In [47]:
PATH_PYSUS = r'data\001_PYSUS'
PATH_TABNET_UFMUN = r'data\002_TABNET\PA_UFMUN.csv'
PATH_TABNET_NIVCPL = r'data\002_TABNET\PA_NIVCPL.csv'
PATH_TABNET_CBOCOD = r'data\002_TABNET\PA_CBOCOD.csv'
PATH_TABNET_PROC_ID = r'data\002_TABNET\PA_PROC_ID.csv'
PATH_TABNET_G_PROC_ID = r'data\002_TABNET\PA_G_PROC_ID.csv'
PATH_TABNET_SG_PROC_ID = r'data\002_TABNET\PA_SG_PROC_ID.csv'

PATH_PYSUS_PARQUET = r'data\003_JOIN_DATASETS\PYSUS_PARQUET'
PATH_FINAL_DATASET = r'data\003_JOIN_DATASETS\FINAL_DATASET'

# Carregando DataFrame Spark

In [48]:
def read_csv(path):
    return spark.read.csv(path, sep=';', header=True, inferSchema=True)

In [49]:
%%time
df_pysus = read_csv(PATH_PYSUS)
df_pysus.printSchema()

root
 |-- PA_UFMUN: integer (nullable = true)
 |-- PA_CMP: string (nullable = true)
 |-- PA_PROC_ID: integer (nullable = true)
 |-- PA_NIVCPL: integer (nullable = true)
 |-- PA_CBOCOD: string (nullable = true)
 |-- PA_OBITO: integer (nullable = true)
 |-- PA_ENCERR: integer (nullable = true)
 |-- PA_PERMAN: integer (nullable = true)
 |-- PA_ALTA: integer (nullable = true)
 |-- PA_TRANSF: integer (nullable = true)
 |-- PA_QTDAPR: integer (nullable = true)
 |-- PA_VALAPR: double (nullable = true)
 |-- PA_UFDIF: integer (nullable = true)
 |-- PA_MNDIF: integer (nullable = true)
 |-- PA_G_PROC_ID: integer (nullable = true)
 |-- PA_SG_PROC_ID: integer (nullable = true)

CPU times: total: 0 ns
Wall time: 14.4 s


In [50]:
df_ufmun = read_csv(PATH_TABNET_UFMUN)
df_ufmun.printSchema()

root
 |-- PA_UFMUN: integer (nullable = true)
 |-- DS_PA_UFMUN: string (nullable = true)



In [51]:
df_nivcpl = read_csv(PATH_TABNET_NIVCPL)
df_nivcpl.printSchema()

root
 |-- PA_NIVCPL: integer (nullable = true)
 |-- DS_PA_NIVCPL: string (nullable = true)



In [52]:
df_cbocod = read_csv(PATH_TABNET_CBOCOD)
df_cbocod.printSchema()

root
 |-- PA_CBOCOD: string (nullable = true)
 |-- DS_PA_CBOCOD: string (nullable = true)



In [53]:
df_proc_id = read_csv(PATH_TABNET_PROC_ID)
df_proc_id.printSchema()

root
 |-- PA_PROC_ID: integer (nullable = true)
 |-- DS_PA_PROC_ID: string (nullable = true)



In [54]:
df_g_proc_id = read_csv(PATH_TABNET_G_PROC_ID)
df_g_proc_id.printSchema()

root
 |-- PA_G_PROC_ID: integer (nullable = true)
 |-- DS_PA_G_PROC_ID: string (nullable = true)



In [55]:
df_sg_proc_id = read_csv(PATH_TABNET_SG_PROC_ID)
df_sg_proc_id.printSchema()

root
 |-- PA_SG_PROC_ID: integer (nullable = true)
 |-- DS_PA_SG_PROC_ID: string (nullable = true)



# Convertendo base do PySUS de CSV para PARQUET para obter melhor desempenho de manipulação dos dados

In [56]:
%%time
# 'partitionBy' especifica a coluna pela qual os dados serão particionados. Neste caso, os dados serão particionados pela coluna de data 'PA_CMP'
df_pysus.write.parquet(
        path=PATH_PYSUS_PARQUET,
        mode='overwrite',
        partitionBy='PA_CMP' )

CPU times: total: 0 ns
Wall time: 24.2 s


In [57]:
%%time
df_pysus = spark.read.parquet(PATH_PYSUS_PARQUET)
df_pysus.printSchema()

root
 |-- PA_UFMUN: integer (nullable = true)
 |-- PA_PROC_ID: integer (nullable = true)
 |-- PA_NIVCPL: integer (nullable = true)
 |-- PA_CBOCOD: string (nullable = true)
 |-- PA_OBITO: integer (nullable = true)
 |-- PA_ENCERR: integer (nullable = true)
 |-- PA_PERMAN: integer (nullable = true)
 |-- PA_ALTA: integer (nullable = true)
 |-- PA_TRANSF: integer (nullable = true)
 |-- PA_QTDAPR: integer (nullable = true)
 |-- PA_VALAPR: double (nullable = true)
 |-- PA_UFDIF: integer (nullable = true)
 |-- PA_MNDIF: integer (nullable = true)
 |-- PA_G_PROC_ID: integer (nullable = true)
 |-- PA_SG_PROC_ID: integer (nullable = true)
 |-- PA_CMP: date (nullable = true)

CPU times: total: 0 ns
Wall time: 184 ms


In [58]:
f'O dataset possui {df_pysus.count()} linhas.'

'O dataset possui 33831473 linhas.'

# Juntando todos os DataFrames
O objetivo é adicionar as colunas de descrição obtidos no TabNet para enriquecer o dataset extraído da biblioteca PySUS

In [59]:
df = df_pysus.join(df_ufmun, 'PA_UFMUN', how='left')
df.select('PA_UFMUN', 'DS_PA_UFMUN', 'PA_QTDAPR', 'PA_VALAPR')\
  .show(3, False)

+--------+-----------+---------+---------+
|PA_UFMUN|DS_PA_UFMUN|PA_QTDAPR|PA_VALAPR|
+--------+-----------+---------+---------+
|261160  |RECIFE     |1        |1.85     |
|260680  |IGARASSU   |1        |0.0      |
|261160  |RECIFE     |2        |3.7      |
+--------+-----------+---------+---------+
only showing top 3 rows



In [60]:
df = df.join(df_nivcpl, 'PA_NIVCPL', how='left')
df.select('PA_NIVCPL', 'DS_PA_NIVCPL', 'PA_QTDAPR', 'PA_VALAPR')\
  .show(3, False)

+---------+------------------+---------+---------+
|PA_NIVCPL|DS_PA_NIVCPL      |PA_QTDAPR|PA_VALAPR|
+---------+------------------+---------+---------+
|2        |Média Complexidade|1        |1.85     |
|2        |Média Complexidade|1        |0.0      |
|2        |Média Complexidade|2        |3.7      |
+---------+------------------+---------+---------+
only showing top 3 rows



In [61]:
df = df.join(df_cbocod, 'PA_CBOCOD', how='left')
df.select('PA_CBOCOD', 'DS_PA_CBOCOD', 'PA_QTDAPR', 'PA_VALAPR')\
  .show(3, False)

+---------+-------------------------------+---------+---------+
|PA_CBOCOD|DS_PA_CBOCOD                   |PA_QTDAPR|PA_VALAPR|
+---------+-------------------------------+---------+---------+
|225250   |Medico ginecologista e obstetra|1        |1.85     |
|251510   |Psicologo clinico              |1        |0.0      |
|225250   |Medico ginecologista e obstetra|2        |3.7      |
+---------+-------------------------------+---------+---------+
only showing top 3 rows



In [62]:
df = df.join(df_proc_id, 'PA_PROC_ID', how='left')
df.select('PA_PROC_ID', 'DS_PA_PROC_ID', 'PA_QTDAPR', 'PA_VALAPR')\
  .show(3, False)

+----------+-----------------------------------------+---------+---------+
|PA_PROC_ID|DS_PA_PROC_ID                            |PA_QTDAPR|PA_VALAPR|
+----------+-----------------------------------------+---------+---------+
|202010317 |DOSAGEM DE CREATININA                    |1        |1.85     |
|301080356 |PROMOCAO DE CONTRATUALIDADE NO TERRITORIO|1        |0.0      |
|202010317 |DOSAGEM DE CREATININA                    |2        |3.7      |
+----------+-----------------------------------------+---------+---------+
only showing top 3 rows



In [63]:
df = df.join(df_g_proc_id, 'PA_G_PROC_ID', how='left')
df.select('PA_G_PROC_ID', 'DS_PA_G_PROC_ID', 'PA_QTDAPR', 'PA_VALAPR')\
  .show(3, False)

+------------+----------------------------------------+---------+---------+
|PA_G_PROC_ID|DS_PA_G_PROC_ID                         |PA_QTDAPR|PA_VALAPR|
+------------+----------------------------------------+---------+---------+
|2           |Procedimentos com finalidade diagnóstica|1        |1.85     |
|3           |Procedimentos clínicos                  |1        |0.0      |
|2           |Procedimentos com finalidade diagnóstica|2        |3.7      |
+------------+----------------------------------------+---------+---------+
only showing top 3 rows



In [64]:
df = df.join(df_sg_proc_id, 'PA_SG_PROC_ID', how='left')
df.select('PA_SG_PROC_ID', 'DS_PA_SG_PROC_ID', 'PA_QTDAPR', 'PA_VALAPR')\
  .show(3, False)

+-------------+------------------------------------------+---------+---------+
|PA_SG_PROC_ID|DS_PA_SG_PROC_ID                          |PA_QTDAPR|PA_VALAPR|
+-------------+------------------------------------------+---------+---------+
|202          |Diagnóstico em laboratório clínico        |1        |1.85     |
|301          |Consultas / Atendimentos / Acompanhamentos|1        |0.0      |
|202          |Diagnóstico em laboratório clínico        |2        |3.7      |
+-------------+------------------------------------------+---------+---------+
only showing top 3 rows



In [65]:
%%time
# Versão final do Dataset
df.write.parquet(
        path=PATH_FINAL_DATASET,
        mode='overwrite')

CPU times: total: 15.6 ms
Wall time: 20.9 s


In [66]:
spark.stop()