In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, IntegerType, StringType, StructField, StructType
from pyspark.sql.functions import * 
import pyspark.sql.functions as F
from datetime import datetime
from pyspark.sql.functions import datediff

In [0]:
spark = SparkSession.builder.appName('basecp').getOrCreate()

In [0]:
base = sqlContext.sql("SELECT * FROM default.casos_obitos_doencas_preexistentes_v1_2_csv")

In [0]:
base.dtypes

Out[8]: [('nome_munic', 'string'),
 ('codigo_ibge', 'string'),
 ('idade', 'string'),
 ('cs_sexo', 'string'),
 ('diagnostico_covid19', 'string'),
 ('data_inicio_sintomas', 'string'),
 ('obito', 'string'),
 ('asma', 'string'),
 ('cardiopatia', 'string'),
 ('diabetes', 'string'),
 ('doenca_hematologica', 'string'),
 ('doenca_hepatica', 'string'),
 ('doenca_neurologica', 'string'),
 ('doenca_renal', 'string'),
 ('imunodepressao', 'string'),
 ('obesidade', 'string'),
 ('outros_fatores_de_risco', 'string'),
 ('pneumopatia', 'string'),
 ('puerpera', 'string'),
 ('sindrome_de_down', 'string')]

In [0]:
base.show(truncate = False)

+----------+-----------+-----+---------+-------------------+--------------------+-----+----+-----------+--------+-------------------+---------------+------------------+------------+--------------+---------+-----------------------+-----------+--------+----------------+
|nome_munic|codigo_ibge|idade|cs_sexo  |diagnostico_covid19|data_inicio_sintomas|obito|asma|cardiopatia|diabetes|doenca_hematologica|doenca_hepatica|doenca_neurologica|doenca_renal|imunodepressao|obesidade|outros_fatores_de_risco|pneumopatia|puerpera|sindrome_de_down|
+----------+-----------+-----+---------+-------------------+--------------------+-----+----+-----------+--------+-------------------+---------------+------------------+------------+--------------+---------+-----------------------+-----------+--------+----------------+
|Taubaté   |3554102    |54   |MASCULINO|CONFIRMADO         |2021-01-14T00:00:00Z|0    |N   |N          |N       |N                  |N              |N                 |N           |N           

In [0]:
base.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in base.columns]).show()


+----------+-----------+-----+-------+-------------------+--------------------+-----+----+-----------+--------+-------------------+---------------+------------------+------------+--------------+---------+-----------------------+-----------+--------+----------------+
|nome_munic|codigo_ibge|idade|cs_sexo|diagnostico_covid19|data_inicio_sintomas|obito|asma|cardiopatia|diabetes|doenca_hematologica|doenca_hepatica|doenca_neurologica|doenca_renal|imunodepressao|obesidade|outros_fatores_de_risco|pneumopatia|puerpera|sindrome_de_down|
+----------+-----------+-----+-------+-------------------+--------------------+-----+----+-----------+--------+-------------------+---------------+------------------+------------+--------------+---------+-----------------------+-----------+--------+----------------+
|         0|          0|    0|      0|                  0|                   0|    0|   0|          0|       0|                  0|              0|                 0|           0|             0|     

In [0]:
# Caso haja a existencia de nulos na análise realizada acima rodar o código para excluir missings
base = base.na.fill(0)

# lembrando que pode ser escolhido o retorno dado a cada missing de cada coluna se agregar a indicação de subset na sintaxe
# base = base.ma.fill('melhor_retorno', subset = ['nome_da_coluna'])

In [0]:
base.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in base.columns]).show()

+----------+-----------+-----+-------+-------------------+--------------------+-----+----+-----------+--------+-------------------+---------------+------------------+------------+--------------+---------+-----------------------+-----------+--------+----------------+
|nome_munic|codigo_ibge|idade|cs_sexo|diagnostico_covid19|data_inicio_sintomas|obito|asma|cardiopatia|diabetes|doenca_hematologica|doenca_hepatica|doenca_neurologica|doenca_renal|imunodepressao|obesidade|outros_fatores_de_risco|pneumopatia|puerpera|sindrome_de_down|
+----------+-----------+-----+-------+-------------------+--------------------+-----+----+-----------+--------+-------------------+---------------+------------------+------------+--------------+---------+-----------------------+-----------+--------+----------------+
|         0|          0|    0|      0|                  0|                   0|    0|   0|          0|       0|                  0|              0|                 0|           0|             0|     

In [0]:
base.dtypes

Out[13]: [('nome_munic', 'string'),
 ('codigo_ibge', 'string'),
 ('idade', 'string'),
 ('cs_sexo', 'string'),
 ('diagnostico_covid19', 'string'),
 ('data_inicio_sintomas', 'string'),
 ('obito', 'string'),
 ('asma', 'string'),
 ('cardiopatia', 'string'),
 ('diabetes', 'string'),
 ('doenca_hematologica', 'string'),
 ('doenca_hepatica', 'string'),
 ('doenca_neurologica', 'string'),
 ('doenca_renal', 'string'),
 ('imunodepressao', 'string'),
 ('obesidade', 'string'),
 ('outros_fatores_de_risco', 'string'),
 ('pneumopatia', 'string'),
 ('puerpera', 'string'),
 ('sindrome_de_down', 'string')]

In [0]:
base = base.drop('nome_munic', 'codigo_ibge', 'data_inicio_sintomas')

In [0]:
tipos = base.dtypes

In [0]:
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, StandardScaler

In [0]:
base.columns[1]

Out[17]: 'cs_sexo'

In [0]:
# este comando permite indexar todas as colunas que são strings para possibilitar a vetorização (neste passo é importante ter certeza que estão presentes na base APENAS AS VARIAVEIS QUE SERÃO ESTUDADAS pois a existencia de variaveis inuteis para o porcesso resultarão no gasto computacional desnecessário)
list_exc = []
for i in range(len(tipos)):
  if tipos[i][1] == 'string':
    list_exc.append(base.columns[i])
    base = StringIndexer(inputCol = base.columns[i], outputCol = base.columns[i]+'_indexada').fit(base).transform(base)
    #    base = base.drop(tipos[i][0]) # já exclui a coluna original que está sendo indexada
    print(base.columns[i])

idade
cs_sexo
diagnostico_covid19
obito
asma
cardiopatia
diabetes
doenca_hematologica
doenca_hepatica
doenca_neurologica
doenca_renal
imunodepressao
obesidade
outros_fatores_de_risco
pneumopatia
puerpera
sindrome_de_down


In [0]:
list_exc

Out[19]: ['idade',
 'cs_sexo',
 'diagnostico_covid19',
 'obito',
 'asma',
 'cardiopatia',
 'diabetes',
 'doenca_hematologica',
 'doenca_hepatica',
 'doenca_neurologica',
 'doenca_renal',
 'imunodepressao',
 'obesidade',
 'outros_fatores_de_risco',
 'pneumopatia',
 'puerpera',
 'sindrome_de_down']

In [0]:
#seleciona todas as colunas como váriel de estudo 
lista_vars = base.columns 
lista_vars

Out[20]: ['idade',
 'cs_sexo',
 'diagnostico_covid19',
 'obito',
 'asma',
 'cardiopatia',
 'diabetes',
 'doenca_hematologica',
 'doenca_hepatica',
 'doenca_neurologica',
 'doenca_renal',
 'imunodepressao',
 'obesidade',
 'outros_fatores_de_risco',
 'pneumopatia',
 'puerpera',
 'sindrome_de_down',
 'idade_indexada',
 'cs_sexo_indexada',
 'diagnostico_covid19_indexada',
 'obito_indexada',
 'asma_indexada',
 'cardiopatia_indexada',
 'diabetes_indexada',
 'doenca_hematologica_indexada',
 'doenca_hepatica_indexada',
 'doenca_neurologica_indexada',
 'doenca_renal_indexada',
 'imunodepressao_indexada',
 'obesidade_indexada',
 'outros_fatores_de_risco_indexada',
 'pneumopatia_indexada',
 'puerpera_indexada',
 'sindrome_de_down_indexada']

In [0]:
len(list_exc)

Out[21]: 17

In [0]:
for i in range(len(list_exc)):
  lista_vars.remove(list_exc[i])

In [0]:
lista_vars

Out[23]: ['idade_indexada',
 'cs_sexo_indexada',
 'diagnostico_covid19_indexada',
 'obito_indexada',
 'asma_indexada',
 'cardiopatia_indexada',
 'diabetes_indexada',
 'doenca_hematologica_indexada',
 'doenca_hepatica_indexada',
 'doenca_neurologica_indexada',
 'doenca_renal_indexada',
 'imunodepressao_indexada',
 'obesidade_indexada',
 'outros_fatores_de_risco_indexada',
 'pneumopatia_indexada',
 'puerpera_indexada',
 'sindrome_de_down_indexada']

In [0]:
target = 'obito_indexada'
lista_vars.remove(target)

In [0]:
assembler = VectorAssembler(inputCols = lista_vars, outputCol = 'vetor_analise')
base_vetor = assembler.transform(base)

In [0]:
base_vetor.show(truncate = False)

+-----+---------+-------------------+-----+----+-----------+--------+-------------------+---------------+------------------+------------+--------------+---------+-----------------------+-----------+--------+----------------+--------------+----------------+----------------------------+--------------+-------------+--------------------+-----------------+----------------------------+------------------------+---------------------------+---------------------+-----------------------+------------------+--------------------------------+--------------------+-----------------+-------------------------+----------------------------------------+
|idade|cs_sexo  |diagnostico_covid19|obito|asma|cardiopatia|diabetes|doenca_hematologica|doenca_hepatica|doenca_neurologica|doenca_renal|imunodepressao|obesidade|outros_fatores_de_risco|pneumopatia|puerpera|sindrome_de_down|idade_indexada|cs_sexo_indexada|diagnostico_covid19_indexada|obito_indexada|asma_indexada|cardiopatia_indexada|diabetes_indexada|doenca_

In [0]:
from pyspark.ml.linalg import DenseVector, SparseVector, Vectors, VectorUDT
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [0]:
toDense = lambda v: Vectors.dense(v.toArray())
toDenseUdf = F.udf(toDense, VectorUDT())

In [0]:
base_vetor_1 = base_vetor.withColumn('vetor_analise', toDenseUdf('vetor_analise'))

In [0]:
base_vetor_1.show(truncate = False)

+-----+---------+-------------------+-----+----+-----------+--------+-------------------+---------------+------------------+------------+--------------+---------+-----------------------+-----------+--------+----------------+--------------+----------------+----------------------------+--------------+-------------+--------------------+-----------------+----------------------------+------------------------+---------------------------+---------------------+-----------------------+------------------+--------------------------------+--------------------+-----------------+-------------------------+------------------------------------------------------------------+
|idade|cs_sexo  |diagnostico_covid19|obito|asma|cardiopatia|diabetes|doenca_hematologica|doenca_hepatica|doenca_neurologica|doenca_renal|imunodepressao|obesidade|outros_fatores_de_risco|pneumopatia|puerpera|sindrome_de_down|idade_indexada|cs_sexo_indexada|diagnostico_covid19_indexada|obito_indexada|asma_indexada|cardiopatia_indexada

In [0]:
from pyspark.ml.stat import ChiSquareTest

In [0]:
# realiza a analise de pvalor _ VEJAM QUE NO CASO DE TENTAR COLOCAR UMA VARIAVEL STRING DARA ERRO NA LEITURA DOS DADOS
dt_sensibilidade = ChiSquareTest.test(base_vetor_1, 'vetor_analise', 'obito')

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
[0;32m<command-2025060721775177>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;31m# realiza a analise de pvalor _ VEJAM QUE NO CASO DE TENTAR COLOCAR UMA VARIAVEL STRING DARA ERRO NA LEITURA DOS DADOS[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mdt_sensibilidade[0m [0;34m=[0m [0mChiSquareTest[0m[0;34m.[0m[0mtest[0m[0;34m([0m[0mbase_vetor_1[0m[0;34m,[0m [0;34m'vetor_analise'[0m[0;34m,[0m [0;34m'obito'[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/ml/stat.py[0m in [0;36mtest[0;34m(dataset, featuresCol, labelCol, flatten)[0m
[1;32m     97[0m         [0mjavaTestObj[0m [0;34m=[0m [0m_jvm[0m[0;34m([0m[0;34m)[0m[0;34m.[0m[0morg[0m[0;34m.[0m[0mapache[0m[0;34m.[0m[0mspark[0m[0;34m.[0m[0mml[0m[0;34m

In [0]:
# realiza a analise de pvalor _ VEJAM QUE NO CASO DE TENTAR COLOCAR UMA VARIAVEL STRING DARA ERRO NA LEITURA DOS DADOS
dt_sensibilidade = ChiSquareTest.test(base_vetor_1, 'vetor_analise', 'obito_indexada')

In [0]:
dt_sensibilidade.show(truncate = False)

+-----------------------------------------------------------------------------------------------------+--------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|pValues                                                                                              |degreesOfFreedom                                  |statistics                                                                                                                                                                                                                                                                              |
+-----------------------------------------------------------------------------------------------------+-------------------------

In [0]:
# Cria as partes de teste e treino  para a arvore
(training, testing) = base_vetor_1.randomSplit([0.7,0.3]) 
# aqui se define a proporção de dados a serem utilizados para a estrutura de treino e de teste  geralmente são utilizados  a proporção de 30% para 70%

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
from pyspark.ml import Pipeline, PipelineModel

In [0]:
arvore_analise = DecisionTreeClassifier(labelCol = 'obito_indexada', featuresCol = 'vetor_analise')


In [0]:
pipeline = Pipeline(stages = [arvore_analise])

In [0]:
model = pipeline.fit(training)

In [0]:
predictions = model.transform(testing)

In [0]:
treeModel= model.stages[0]

In [0]:
print(treeModel)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5a72517d884a, depth=5, numNodes=49, numClasses=2, numFeatures=16


In [0]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

In [0]:
teste = treeModel.toDebugString

In [0]:
print(teste)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5a72517d884a, depth=5, numNodes=49, numClasses=2, numFeatures=16
  If (feature 4 <= 0.5)
   If (feature 8 <= 0.5)
    If (feature 5 <= 0.5)
     If (feature 13 <= 0.5)
      Predict: 0.0
     Else (feature 13 > 0.5)
      If (feature 0 <= 19.5)
       Predict: 0.0
      Else (feature 0 > 19.5)
       Predict: 1.0
    Else (feature 5 > 0.5)
     If (feature 9 <= 0.5)
      Predict: 0.0
     Else (feature 9 > 0.5)
      If (feature 9 <= 1.5)
       Predict: 1.0
      Else (feature 9 > 1.5)
       Predict: 0.0
   Else (feature 8 > 0.5)
    If (feature 7 <= 0.5)
     If (feature 0 <= 21.5)
      If (feature 0 <= 1.5)
       Predict: 1.0
      Else (feature 0 > 1.5)
       Predict: 0.0
     Else (feature 0 > 21.5)
      Predict: 1.0
    Else (feature 7 > 0.5)
     If (feature 8 <= 1.5)
      If (feature 11 <= 1.5)
       Predict: 0.0
      Else (feature 11 > 1.5)
       Predict: 1.0
     Else (feature 8 > 1.5)
      If (feature 7 <

In [0]:
teste_1 = teste

In [0]:
lista_vars[0]

Out[47]: 'idade_indexada'

In [0]:
a = 0
for i, feat in enumerate(lista_vars):
  exec('teste_1 = teste_1.replace("feature {}", lista_vars[a])'.format(i))
  a = a + 1

In [0]:
lista_vars

Out[49]: ['idade_indexada',
 'cs_sexo_indexada',
 'diagnostico_covid19_indexada',
 'asma_indexada',
 'cardiopatia_indexada',
 'diabetes_indexada',
 'doenca_hematologica_indexada',
 'doenca_hepatica_indexada',
 'doenca_neurologica_indexada',
 'doenca_renal_indexada',
 'imunodepressao_indexada',
 'obesidade_indexada',
 'outros_fatores_de_risco_indexada',
 'pneumopatia_indexada',
 'puerpera_indexada',
 'sindrome_de_down_indexada']

In [0]:
print(teste_1)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5a72517d884a, depth=5, numNodes=49, numClasses=2, numFeatures=16
  If (cardiopatia_indexada <= 0.5)
   If (doenca_neurologica_indexada <= 0.5)
    If (diabetes_indexada <= 0.5)
     If (cs_sexo_indexada3 <= 0.5)
      Predict: 0.0
     Else (cs_sexo_indexada3 > 0.5)
      If (idade_indexada <= 19.5)
       Predict: 0.0
      Else (idade_indexada > 19.5)
       Predict: 1.0
    Else (diabetes_indexada > 0.5)
     If (doenca_renal_indexada <= 0.5)
      Predict: 0.0
     Else (doenca_renal_indexada > 0.5)
      If (doenca_renal_indexada <= 1.5)
       Predict: 1.0
      Else (doenca_renal_indexada > 1.5)
       Predict: 0.0
   Else (doenca_neurologica_indexada > 0.5)
    If (doenca_hepatica_indexada <= 0.5)
     If (idade_indexada <= 21.5)
      If (idade_indexada <= 1.5)
       Predict: 1.0
      Else (idade_indexada > 1.5)
       Predict: 0.0
     Else (idade_indexada > 21.5)
      Predict: 1.0
    Else (doenca_hepatica_indexa