<a href="https://colab.research.google.com/github/chyokomizo/DataScienceProjects/blob/main/An%C3%A1lise_explorat%C3%B3ria_do_dataset_'healthcare_stroke_data'_utilizando_Apache_Spark_e_Databricks.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#**Análise exploratória de Big Data utilizando Apache Spark e Databricks**
O seguinte programa utiliza um dataset (healthcare_stroke_data.csv) de pacientes hipertensos e não hipertensos que foram acometidos por infartos. O Databricks (https://databricks.com/) foi utilizado para criação do cluster de processamento do Big Data, juntamente com o framework Apache Spark. 

###**Iniciar a seção SPARK**

In [None]:
from pyspark.sql import SparkSession #importa a biblioteca que cria a seção do spark
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
#inicia a seção para a utilização do spark
spark = SparkSession.builder.appName("desafio_IGTI").getOrCreate() #cria a seção caso não exista ou obtém a já criada

In [None]:
%fs ls /FileStore/tables   

path,name,size
dbfs:/FileStore/tables/77_cancer_proteomes_CPTAC_itraq.csv,77_cancer_proteomes_CPTAC_itraq.csv,12415390
dbfs:/FileStore/tables/Mall_Customers-1.csv,Mall_Customers-1.csv,4286
dbfs:/FileStore/tables/Mall_Customers-2.csv,Mall_Customers-2.csv,4286
dbfs:/FileStore/tables/Mall_Customers-3.csv,Mall_Customers-3.csv,4286
dbfs:/FileStore/tables/Mall_Customers-4.csv,Mall_Customers-4.csv,4286
dbfs:/FileStore/tables/Mall_Customers.csv,Mall_Customers.csv,4286
dbfs:/FileStore/tables/healthcare_dataset_stroke_data-1.csv,healthcare_dataset_stroke_data-1.csv,316971
dbfs:/FileStore/tables/healthcare_dataset_stroke_data-2.csv,healthcare_dataset_stroke_data-2.csv,316971
dbfs:/FileStore/tables/healthcare_dataset_stroke_data-3.csv,healthcare_dataset_stroke_data-3.csv,316971
dbfs:/FileStore/tables/healthcare_dataset_stroke_data-4.csv,healthcare_dataset_stroke_data-4.csv,316971


In [None]:
diretorio_dataset="/FileStore/tables/healthcare_dataset_stroke_data.csv"  #diretório que contém o arquivo a ser utilizado

In [None]:
dataset_desafio = spark.read.format("csv").options(header="true", inferschema="true").load(diretorio_dataset)  #realiza a leitura do dataset

In [None]:
dataset_desafio.printSchema() #mostra o esquema inferido pelas variáveis

###**Número de instâncias (linhas) do dataset**

In [None]:
#número de instancias no dataset
dataset_desafio.count()

###**Instâncias e atributos (features) do dataset**

In [None]:
dataset_desafio.show() #mostra as linhas iniciais do dataset

###**Conhecendo o dataset**

In [None]:
#seleção de colunas
dataset_desafio.select('age','hypertension').show(5)

In [None]:
#agrupando os dados
display(dataset_desafio.groupby('gender','stroke').count().sort("count",ascending=True))

gender,stroke,count
Other,0,1
Male,1,108
Female,1,141
Male,0,2007
Female,0,2853


###**Agrupando atributos**

In [None]:
#agrupando os dados
display(dataset_desafio.groupby('smoking_status','stroke').count().sort("count",ascending=True))

smoking_status,stroke,count
smokes,1,42
Unknown,1,47
formerly smoked,1,70
never smoked,1,90
smokes,0,747
formerly smoked,0,815
Unknown,0,1497
never smoked,0,1802


In [None]:
#agrupando os dados
display(dataset_desafio.groupby('hypertension','stroke').count().sort("count",ascending=True))

hypertension,stroke,count
1,1,66
0,1,183
1,0,432
0,0,4429


In [None]:
display(dataset_desafio.groupby('stroke').count())

stroke,count
1,249
0,4861


In [None]:
#utilizando crosstab para contar a quantidade de indivíduos com que ganho mais de 50K pela idade
dataset_desafio.filter(dataset_desafio.gender== 'Female').count()

In [None]:
#contando as classes
dataset_desafio.groupBy("work_type").agg({'work_type': 'count'}).sort("count(work_type)").show()

In [None]:
entradas_numericas  = ['age',"avg_glucose_level", "bmi"]
dataset_desafio.describe(entradas_numericas).show(truncate=False)

###**Selecionando apenas a coluna de nivel de glicose no sangue**

In [None]:
#boxplot
display(dataset_desafio.select('avg_glucose_level'))

avg_glucose_level
228.69
202.21
105.92
171.23
174.12
186.21
70.09
94.39
76.15
58.57


###**Verificando a existência de outliers no dataset através de box-plot**

In [None]:
#boxplot
display(dataset_desafio.select('age'))

age
67.0
61.0
80.0
49.0
79.0
81.0
74.0
69.0
59.0
78.0


###**Pré-processamento dos dados**

In [None]:
#contando a quantidade de valores desconhecidos
from pyspark.sql.functions import *
dataset_desafio.groupby('Residence_type').agg({'Residence_type': 'count'}).sort(asc("count(Residence_type)")).show()

In [None]:
from pyspark.sql.functions import mean
mean = dataset_desafio.select(mean(dataset_desafio['bmi'])).collect()
mean_bmi = mean[0][0]
dataset_desafio = dataset_desafio.na.fill(mean_bmi,['bmi'])

In [None]:
#aplicando o filtro para as colunas que possuem valores não conhecidos
dataset_filtrado=dataset_desafio.filter((dataset_desafio['bmi'] != 'N/A') & (dataset_desafio['smoking_status'] > 'Unknown'))

In [None]:
# aplicando a transformação dos dados categóricos
from pyspark.ml.feature import VectorAssembler,OneHotEncoder, StringIndexer

In [None]:
#define a transformação para a variável "gender"
stringIndexer_gender=StringIndexer(inputCol="gender", outputCol="gender_encoded")  #label encoding
encoder_gender = OneHotEncoder(dropLast=False, inputCol="gender_encoded", outputCol="genderVec") #one-hot encoding

In [None]:
#define a transformação para a variável "ever_married"
stringIndexer_married=StringIndexer(inputCol="ever_married", outputCol="ever_married_encoded") #label encoding
encoder_married = OneHotEncoder(dropLast=False, inputCol="ever_married_encoded", outputCol="marriedVec") #one-hot encoding

In [None]:
#define a transformação para a variável "work_type"
stringIndexer_work=StringIndexer(inputCol="work_type", outputCol="work_type_encoded")  #label encoding
encoder_work = OneHotEncoder(dropLast=False, inputCol="work_type_encoded", outputCol="workVec") #one-hot encoding

In [None]:
#define a transformação para a variável "Residence_type"
stringIndexer_residence=StringIndexer(inputCol="Residence_type", outputCol="Residence_type_encoded")  #label encoding
encoder_residence = OneHotEncoder(dropLast=False, inputCol="Residence_type_encoded", outputCol="residenceVec") #one-hot encoding

In [None]:
#define a transformação para a variável "smoking_status"
stringIndexer_smoking=StringIndexer(inputCol="smoking_status", outputCol="smoking_status_encoded")  #define o objeto
encoder_smoking = OneHotEncoder(dropLast=False, inputCol="smoking_status_encoded", outputCol="smokingVec")#one-hot encoding

In [None]:
#define a construção do vetor de entrada
colunas_entrada=['age','hypertension', 'heart_disease','avg_glucose_level','genderVec','marriedVec','workVec','residenceVec','smokingVec']
vetor_entrada = VectorAssembler(inputCols=colunas_entrada,outputCol='features')

In [None]:
#define a sequencia de transformações para o pipeline
sequencia_transformacoes=[stringIndexer_gender,stringIndexer_married,stringIndexer_work,stringIndexer_residence,stringIndexer_smoking,encoder_gender,encoder_married,encoder_work,encoder_residence,encoder_smoking,vetor_entrada]

In [None]:
from pyspark.ml import Pipeline
# Aplicando o pipeline
pipeline = Pipeline(stages=sequencia_transformacoes)
pipelineModel = pipeline.fit(dataset_filtrado)
model = pipelineModel.transform(dataset_filtrado)

In [None]:
#mostrando parte dos dados para entrada
model.select('age','gender','genderVec','ever_married','marriedVec','features').show()

In [None]:
#dividindo o dataset entre teste e treinamento
train_data, test_data = model.randomSplit([.8,.2],seed=1)

In [None]:
#mostrando os dados de treinamento
train_data.columns

###**Previsão através de Regressão Logística**

In [None]:
#define o modelo de regrssão logística
from pyspark.ml.classification import LogisticRegression

#instancia o objeto para a regressão logística
lr = LogisticRegression(labelCol="stroke",featuresCol="features", maxIter=100, regParam=0.3, )

# treina o modelo
linearModel = lr.fit(train_data)

In [None]:
#realiza a previsão utilizando o modelo de regressão logística
previsao_regressao = linearModel.transform(test_data)

In [None]:
#avaliando a classificação realizada pela regressão logística
acc_evaluator = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction", metricName="accuracy")
acuracia_regressao = acc_evaluator.evaluate(previsao_regressao)
print('Regressão Logística: {0:2.2f}%'.format(acuracia_regressao*100))