<a href="https://colab.research.google.com/github/billipoul/AC2/blob/main/ac2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# AC2 Big Data
## *Integrantes*
### Deevis Billi
### Vitor Silva Bueno 190925
## *Dataset*
### DATASUS de 2008 a 2023, com dados diversos sobre pacientes e seus tratamentos/diagnósticos

### Importações

In [None]:
import zipfile
from pyspark.sql import SparkSession
from pyspark.ml.stat import Correlation
from pyspark.mllib.stat import Statistics
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

### Inicialização de spark e leitura dos dados

In [None]:
# Create the notebook's Spark session, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("AC2 Big Data")
    .getOrCreate()
)

In [None]:
# Load every parquet file matched by the glob into a single DataFrame.
df = spark.read.parquet('/content/SIH/*')

# count() is a Spark action, so this line actually reads the data.
total_linhas = df.count()
print(f'Numero de linhas: {total_linhas}')

Numero de linhas: 2396850


In [None]:
# Mount Google Drive inside the Colab VM at /content/drive.
# NOTE(review): the data above is read from /content/SIH, and no visible cell reads from
# the mount point - confirm whether this cell is still needed, and if it is, move it
# (and its import) above the data-loading cell so "Run All" works on a fresh runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Exploração dos dados importados, validando o schema criado e os tipos presentes

In [None]:
# describe() returns a new summary DataFrame; as the last expression of the cell only its
# repr (column names, all typed as string) is displayed - no statistic values are shown.
# NOTE(review): append .show() if the actual summary values are wanted.
df.describe()

DataFrame[summary: string, UF_ZI: string, ANO_CMPT: string, MES_CMPT: string, ESPEC: string, CGC_HOSP: string, N_AIH: string, IDENT: string, CEP: string, MUNIC_RES: string, NASC: string, SEXO: string, UTI_MES_IN: string, UTI_MES_AN: string, UTI_MES_AL: string, UTI_MES_TO: string, MARCA_UTI: string, UTI_INT_IN: string, UTI_INT_AN: string, UTI_INT_AL: string, UTI_INT_TO: string, DIAR_ACOM: string, QT_DIARIAS: string, PROC_SOLIC: string, PROC_REA: string, VAL_SH: string, VAL_SP: string, VAL_SADT: string, VAL_RN: string, VAL_ACOMP: string, VAL_ORTP: string, VAL_SANGUE: string, VAL_SADTSR: string, VAL_TRANSP: string, VAL_OBSANG: string, VAL_PED1AC: string, VAL_TOT: string, VAL_UTI: string, US_TOT: string, DT_INTER: string, DT_SAIDA: string, DIAG_PRINC: string, DIAG_SECUN: string, COBRANCA: string, NATUREZA: string, NAT_JUR: string, GESTAO: string, RUBRICA: string, IND_VDRL: string, MUNIC_MOV: string, COD_IDADE: string, IDADE: string, DIAS_PERM: string, MORTE: string, NACIONAL: string, NUM_P

In [None]:
# Print each column's name, data type and nullability for the DataFrame read from parquet.
df.printSchema()

root
 |-- UF_ZI: string (nullable = true)
 |-- ANO_CMPT: string (nullable = true)
 |-- MES_CMPT: string (nullable = true)
 |-- ESPEC: string (nullable = true)
 |-- CGC_HOSP: string (nullable = true)
 |-- N_AIH: string (nullable = true)
 |-- IDENT: string (nullable = true)
 |-- CEP: string (nullable = true)
 |-- MUNIC_RES: string (nullable = true)
 |-- NASC: string (nullable = true)
 |-- SEXO: string (nullable = true)
 |-- UTI_MES_IN: string (nullable = true)
 |-- UTI_MES_AN: string (nullable = true)
 |-- UTI_MES_AL: string (nullable = true)
 |-- UTI_MES_TO: decimal(3,0) (nullable = true)
 |-- MARCA_UTI: string (nullable = true)
 |-- UTI_INT_IN: string (nullable = true)
 |-- UTI_INT_AN: string (nullable = true)
 |-- UTI_INT_AL: string (nullable = true)
 |-- UTI_INT_TO: decimal(3,0) (nullable = true)
 |-- DIAR_ACOM: decimal(3,0) (nullable = true)
 |-- QT_DIARIAS: decimal(3,0) (nullable = true)
 |-- PROC_SOLIC: string (nullable = true)
 |-- PROC_REA: string (nullable = true)
 |-- VAL_SH: 

### Utilizando SQL, validar o preenchimento das colunas

In [None]:
# Register the DataFrame as the temp view "datasus" so it can be queried with SQL
# (later cells query this view too).
df.createOrReplaceTempView("datasus")

# COUNT(col) ignores NULLs, so each value is the number of filled rows in that column.
# All columns are aggregated in ONE query instead of launching one Spark job (and printing
# one table) per column. Backticks keep unusual column names valid SQL identifiers.
contagens_sql = ", ".join(f"COUNT(`{column}`) AS `{column}`" for column in df.columns)

# vertical=True prints one "column | count" line per column, which stays readable
# even with a very wide schema.
spark.sql(f"SELECT {contagens_sql} FROM datasus").show(vertical=True, truncate=False)

+------------+
|count(UF_ZI)|
+------------+
|     2396850|
+------------+

+---------------+
|count(ANO_CMPT)|
+---------------+
|        2396850|
+---------------+

+---------------+
|count(MES_CMPT)|
+---------------+
|        2396850|
+---------------+

+------------+
|count(ESPEC)|
+------------+
|     2396850|
+------------+

+---------------+
|count(CGC_HOSP)|
+---------------+
|        1611641|
+---------------+

+------------+
|count(N_AIH)|
+------------+
|     2396850|
+------------+

+------------+
|count(IDENT)|
+------------+
|     2396850|
+------------+

+----------+
|count(CEP)|
+----------+
|   2396850|
+----------+

+----------------+
|count(MUNIC_RES)|
+----------------+
|         2396850|
+----------------+

+-----------+
|count(NASC)|
+-----------+
|    2396850|
+-----------+

+-----------+
|count(SEXO)|
+-----------+
|    2396850|
+-----------+

+-----------------+
|count(UTI_MES_IN)|
+-----------------+
|                0|
+-----------------+

+-----------------

### Validar o número de categorias de diagnóstico presentes

In [None]:
# Number of distinct primary-diagnosis codes, queried from the "datasus" temp view.
consulta_diagnosticos = "SELECT count(DISTINCT DIAG_PRINC) FROM datasus"
spark.sql(consulta_diagnosticos).show()

+--------------------------+
|count(DISTINCT DIAG_PRINC)|
+--------------------------+
|                      8476|
+--------------------------+



### Visualizar preenchimento manualmente para encontrar colunas inválidas para previsão

In [None]:
# Display five rows with every column for a manual look at how the fields are filled.
consulta_amostra = "SELECT * FROM datasus LIMIT 5"
amostra = spark.sql(consulta_amostra)
amostra.show()

+------+--------+--------+-----+--------------+-------------+-----+--------+---------+--------+----+----------+----------+----------+----------+---------+----------+----------+----------+----------+---------+----------+----------+----------+------+------+--------+------+---------+--------+----------+----------+----------+----------+----------+-------+-------+------+--------+--------+----------+----------+--------+--------+-------+------+-------+--------+---------+---------+-----+---------+-----+--------+--------+-------+---------+-------+--------+----------+------+---------+----------+----------+---------+-------+--------+----+-----+--------+----------+---------+---------------+---------+-------+---------+--------+--------+---------+-------+------+-------+-----+--------+-----+---------+--------------------+--------+--------+----------+----------+----------+----------+-------+---------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+---

### Converter a coluna de diagnóstico em um índice numérico

In [None]:
# Encode the string column DIAG_PRINC as the numeric column DIAG_PRINC_INDEX.
# The indexer is fitted on the full DataFrame, i.e. before the train/test split.
indexador_diagnostico = StringIndexer(
    inputCol="DIAG_PRINC",
    outputCol="DIAG_PRINC_INDEX",
)
indexer_step = indexador_diagnostico.fit(df)

# Append the index column to the original data and display the first rows.
df_index_step = indexer_step.transform(df)
df_index_step.show()

+------+--------+--------+-----+--------------+-------------+-----+--------+---------+--------+----+----------+----------+----------+----------+---------+----------+----------+----------+----------+---------+----------+----------+----------+--------+-------+--------+------+---------+--------+----------+----------+----------+----------+----------+--------+--------+-------+--------+--------+----------+----------+--------+--------+-------+------+-------+--------+---------+---------+-----+---------+-----+--------+--------+-------+---------+-------+--------+----------+------+---------+----------+----------+---------+-------+--------+----+-----+--------+----------+---------+---------------+---------+-------+--------------+--------+--------+---------+-------+------+-------+-----+--------+-----+---------+--------------------+--------+--------+----------+----------+----------+----------+-------+---------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-

### Remoção de colunas inválidas
- Por serem muito mal preenchidas (colunas com apenas dezenas de milhares de entradas em um dataset de ~2,4 milhões de linhas)
- Por serem dados irrelevantes (cnpj)
- Por não influenciarem em dados futuros (remessa)
- Por terem sido convertidos em campos diferentes

In [None]:
# Columns removed before modelling, grouped by column family. Every name from the original
# one-line list is kept exactly once (the original listed 'DIAGSEC1' twice).
# The misspelled variable name is preserved so any other cell that references it still works.
collumns_to_drop = [
    # Already encoded as DIAG_PRINC_INDEX by the StringIndexer step.
    'DIAG_PRINC',
    # UTI_* detail columns (UTI_MES_IN showed 0 non-null values in the fill check).
    'UTI_MES_IN', 'UTI_MES_AN', 'UTI_MES_AL',
    'UTI_INT_IN', 'UTI_INT_AN', 'UTI_INT_AL',
    # VAL_* columns.
    'VAL_SADT', 'VAL_RN', 'VAL_ACOMP', 'VAL_ORTP', 'VAL_SANGUE',
    'VAL_SADTSR', 'VAL_TRANSP', 'VAL_OBSANG', 'VAL_PED1AC',
    # Secondary-diagnosis columns DIAGSEC1..DIAGSEC9 and TPDISEC1..TPDISEC9.
    *[f'DIAGSEC{i}' for i in range(1, 10)],
    *[f'TPDISEC{i}' for i in range(1, 10)],
    # Identifier / manager columns.
    'CNPJ_MANT', 'CPF_AUT', 'GESTOR_DT', 'GESTOR_COD', 'GESTOR_TP', 'GESTOR_CPF',
    # Remaining columns from the original list.
    'RUBRICA', 'NUM_PROC', 'TOT_PT_SP', 'INFEHOSP', 'IND_VDRL', 'HOMONIMO', 'INSTRU',
    'CID_NOTIF', 'CONTRACEP1', 'CONTRACEP2', 'INSC_PN', 'SEQ_AIH5', 'CBOR', 'CNAER',
    'VINCPREV', 'CID_ASSO', 'CID_MORTE', 'FAEC_TP', 'ETNIA',
    'REMESSA',
]

# drop() takes the names as separate positional arguments, hence the unpacking.
dropped_df = df_index_step.drop(*collumns_to_drop)

### Visualização dos dados limpos

In [None]:
# Register the cleaned DataFrame as the temp view "datasus_dropped".
dropped_df.createOrReplaceTempView("datasus_dropped")

# COUNT(col) ignores NULLs, so each value is the number of filled rows in that column.
# All remaining columns are checked in ONE query instead of one Spark job (and one printed
# table) per column. Backticks keep unusual column names valid SQL identifiers.
contagens_limpas_sql = ", ".join(
    f"COUNT(`{column}`) AS `{column}`" for column in dropped_df.columns
)

# vertical=True prints one "column | count" line per column.
spark.sql(f"SELECT {contagens_limpas_sql} FROM datasus_dropped").show(vertical=True, truncate=False)

+------------+
|count(UF_ZI)|
+------------+
|     2396850|
+------------+

+---------------+
|count(ANO_CMPT)|
+---------------+
|        2396850|
+---------------+

+---------------+
|count(MES_CMPT)|
+---------------+
|        2396850|
+---------------+

+------------+
|count(ESPEC)|
+------------+
|     2396850|
+------------+

+---------------+
|count(CGC_HOSP)|
+---------------+
|        1611641|
+---------------+

+------------+
|count(N_AIH)|
+------------+
|     2396850|
+------------+

+------------+
|count(IDENT)|
+------------+
|     2396850|
+------------+

+----------+
|count(CEP)|
+----------+
|   2396850|
+----------+

+----------------+
|count(MUNIC_RES)|
+----------------+
|         2396850|
+----------------+

+-----------+
|count(NASC)|
+-----------+
|    2396850|
+-----------+

+-----------+
|count(SEXO)|
+-----------+
|    2396850|
+-----------+

+-----------------+
|count(UTI_MES_TO)|
+-----------------+
|          2396850|
+-----------------+

+----------------+

 Indexar a coluna de target 'MORTE' como 'label'

In [None]:
# Estimator that encodes the target column 'MORTE' into the numeric 'label' column.
# It is not fitted here; it is used as the first stage of both pipelines.
indexador_label = StringIndexer(
    inputCol="MORTE",
    outputCol="label",
)


 Selecionar colunas numéricas válidas (excluindo a própria label)

In [None]:
# Spark SQL type classes accepted as model features.
tipos_numericos = ['IntegerType', 'DoubleType', 'LongType', 'DecimalType']

# Keep the columns whose data type is one of the classes above, leaving out the target
# column 'MORTE'. The type is matched on its class name through `tipos_numericos`
# (previously defined but unused) instead of on 3-letter prefixes of str(dataType),
# which depended on how the type's repr happens to be formatted.
colunas_numericas = [
    campo.name
    for campo in dropped_df.schema.fields
    if type(campo.dataType).__name__ in tipos_numericos and campo.name != 'MORTE'
]


Vetorização

In [None]:
# Combine all selected numeric columns into the single vector column "features".
vetorizador = VectorAssembler(
    inputCols=colunas_numericas,
    outputCol="features",
)


Modelos

In [None]:
# Logistic regression with every hyper-parameter left at the library default.
lr = LogisticRegression()

# Shallow decision tree.
# NOTE(review): maxBins=10000 is presumably sized to exceed the 8476 distinct DIAG_PRINC
# values reported earlier, since the indexed diagnosis column is among the features - confirm.
dt = DecisionTreeClassifier(
    maxDepth=5,
    minInstancesPerNode=5,
    maxBins=10000,
)

 Pipelines

In [None]:
# Both pipelines share the same preprocessing (label indexing, then feature assembly)
# and differ only in the final classifier stage.
etapas_preprocessamento = [indexador_label, vetorizador]

pipeline_lr = Pipeline(stages=etapas_preprocessamento + [lr])
pipeline_dt = Pipeline(stages=etapas_preprocessamento + [dt])

 Separar treino e teste

In [None]:
# Random 70/30 split; the fixed seed keeps the split the same between runs.
particoes = dropped_df.randomSplit([0.7, 0.3], seed=42)
train, test = particoes

Treinar os modelos

In [None]:
# Fit each pipeline on the training split (logistic regression first, then decision tree).
modelo_lr, modelo_dt = [
    pipeline.fit(train) for pipeline in (pipeline_lr, pipeline_dt)
]

Previsões

In [None]:
# Apply each fitted pipeline to the held-out test split.
pred_lr, pred_dt = [
    modelo.transform(test) for modelo in (modelo_lr, modelo_dt)
]

Teste

In [None]:
# One evaluator per metric, both built with the default label and prediction column names.
avaliador_f1 = MulticlassClassificationEvaluator(metricName="f1")
avaliador_acc = MulticlassClassificationEvaluator(metricName="accuracy")

# Print F1 and accuracy for each model, in the same order as before.
# NOTE(review): if 'MORTE' is heavily imbalanced, accuracy alone says little - confirm the
# class balance before reading these numbers as model quality.
for nome_modelo, previsoes in (("Logistic Regression", pred_lr), ("Decision Tree", pred_dt)):
    for nome_metrica, avaliador in (("F1 Score", avaliador_f1), ("Acurácia", avaliador_acc)):
        print(f"{nome_modelo} - {nome_metrica}:", avaliador.evaluate(previsoes))



Logistic Regression - F1 Score: 0.9482538969882692
Logistic Regression - Acurácia: 0.9597610271600538
Decision Tree - F1 Score: 0.9474133959554792
Decision Tree - Acurácia: 0.9574044275676218
