# Big Data e Processamento Distribuído
## Projeto de Disciplina

Alunos:
- André N. Darcie
- Cristiane Lemos

## Atividade
**Valor**: 50 pontos    
**Entrega**:  16/01 às 23:59     
**Formato**: Jupyter Notebook     
Individual ou em duplas.     
**Objetivo**: realizar um ciclo de ciência de dados completo no Spark.      
 
Nesse  projeto,  vocês  deverão  realizar  um  ciclo  completo  de  ciência  de  dados  utilizando  o 
PySpark.  Isso  significa  que  vocês  deverão  explorar  e  preparar  dados,  treinar  um  modelo  de 
aprendizado de máquina e fazer análise dos resultados obtidos.  
Para escolher um dataset, vocês poderão visitar o Kaggle e selecionar algum que estejam com 
vontade de explorar. Vocês deverão optar por datasets apropriados para as seguintes tarefas: 
• Classificação 
• Agrupamento 
• Recomendação 

Todo  o  projeto  deve  ser  construído  em  um  único  Notebook.  Nele  deverão  conter,  além  do 
código,  análises,  explicações  e  motivações  para  a  escolha  do  dataset  e  do  algoritmo  de 
aprendizado de máquina. IMPORTANTE: Todos os imports utilizados deverão ser colocados no 
início do Notebook.  
 
### Parte I: Exploração de Dados.

Vocês  deverão  utilizar  as  funcionalidades  de  RDD  e/ou  Dataframes  para  analisar  e  limpar  os 
dados. Como essa tarefa é dependente de cada conjunto de dados, não há um modelo rígido a 
seguir. Porém, vocês deverão realizar no mínimo 2 análises (estatísticas, análise com gráficos, 
etc.) e 3 transformações (filtragem, remoção de características, remoção/troca de valores nulos, 
normalizações, etc). As transformações devem ser pautadas no que for descoberto ao analisar 
os dados. Por exemplo: normalização dos valores por discrepância de magnitude entre 
características. 
 
### Parte II: Criação de um Modelo e Análise de Resultados. 

Nessa etapa, vocês deverão rodar um algoritmo da biblioteca MLlib do Spark para aprender um 
modelo  de  aprendizado  de  máquina  com  os  dados  que  vocês  acabaram  de  organizar.  Vocês 
deverão motivar a escolha do algoritmo, que deve ser um dos disponíveis dentro da MLlib do 
Spark. Além disso, vocês deverão dividir os dados utilizando alguma metodologia de validação 
(cross-validation,  60-40,  80-20,  etc),  e  validar  a  performance  do  seu  modelo,  analisando  os 
resultados. 

## Configuração do ambiente Google Colab

Esse notebook é baseado no notebook [como instalar pyspark no google colab](https://colab.research.google.com/github/carlosfab/sigmoidal_ai/blob/master/Big_Data_Como_instalar_o_PySpark_no_Google_Colab.ipynb).       
O ambiente de execução é configurado para executar no Google Colab.

In [5]:
# instalar as dependências
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:9 http://security.ubuntu.com/ubuntu bionic-security InRelease
Hit:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:13 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
Hit:14 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:15 http://ppa.launchpad.net/graphic

In [7]:
# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# tornar o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

import pyspark

sc = pyspark.SparkContext(appName='Projeto_BDPD')

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Dataset escolhido: Healthcare Dataset Stroke Data

In [8]:
!git clone https://github.com/andredarcie/my-data-science-notebooks.git

Cloning into 'my-data-science-notebooks'...
remote: Enumerating objects: 1047, done.[K
remote: Counting objects: 100% (635/635), done.[K
remote: Compressing objects: 100% (424/424), done.[K
remote: Total 1047 (delta 360), reused 474 (delta 200), pack-reused 412[K
Receiving objects: 100% (1047/1047), 34.22 MiB | 23.29 MiB/s, done.
Resolving deltas: 100% (567/567), done.


In [36]:
df = spark.read.csv('/content/my-data-science-notebooks/big-data-processamento-distribuido/datasets/healthcare-dataset-stroke-data.csv', inferSchema=True, header=True, nullValue='NA')

In [10]:
df.show(10)

+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|   id|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|
+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
| 9046|  Male|67.0|           0|            1|         Yes|      Private|         Urban|           228.69|36.6|formerly smoked|     1|
|51676|Female|61.0|           0|            0|         Yes|Self-employed|         Rural|           202.21| N/A|   never smoked|     1|
|31112|  Male|80.0|           0|            1|         Yes|      Private|         Rural|           105.92|32.5|   never smoked|     1|
|60182|Female|49.0|           0|            0|         Yes|      Private|         Urban|           171.23|34.4|         smokes|     1|
| 1665|Female|79.0|           1|            0|         

In [11]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [12]:
df.dtypes

[('id', 'int'),
 ('gender', 'string'),
 ('age', 'double'),
 ('hypertension', 'int'),
 ('heart_disease', 'int'),
 ('ever_married', 'string'),
 ('work_type', 'string'),
 ('Residence_type', 'string'),
 ('avg_glucose_level', 'double'),
 ('bmi', 'string'),
 ('smoking_status', 'string'),
 ('stroke', 'int')]

In [13]:
df.groupBy('stroke').count().show()

+------+-----+
|stroke|count|
+------+-----+
|     1|  249|
|     0| 4861|
+------+-----+



In [14]:
df.createOrReplaceTempView('table')

In [15]:
spark.sql("SELECT work_type, count(work_type) as work_type_count FROM table WHERE stroke == 1 GROUP BY work_type ORDER BY work_type_count DESC").show()

+-------------+---------------+
|    work_type|work_type_count|
+-------------+---------------+
|      Private|            149|
|Self-employed|             65|
|     Govt_job|             33|
|     children|              2|
+-------------+---------------+



In [16]:
spark.sql("SELECT gender, count(gender) as count_gender, count(gender)*100/sum(count(gender)) over() as percent FROM table GROUP BY gender").show()

+------+------------+--------------------+
|gender|count_gender|             percent|
+------+------------+--------------------+
|Female|        2994|  58.590998043052835|
| Other|           1|0.019569471624266144|
|  Male|        2115|    41.3894324853229|
+------+------------+--------------------+



In [17]:
spark.sql("SELECT gender, count(gender), (COUNT(gender) * 100.0) /(SELECT count(gender) FROM table WHERE gender == 'Male') as percentage FROM table WHERE stroke = '1' and gender = 'Male' GROUP BY gender").show()

+------+-------------+----------------+
|gender|count(gender)|      percentage|
+------+-------------+----------------+
|  Male|          108|5.10638297872340|
+------+-------------+----------------+



In [18]:
spark.sql("SELECT gender, count(gender), (COUNT(gender) * 100.0) /(SELECT count(gender) FROM table WHERE gender == 'Female') as percentage FROM table WHERE stroke = '1' and gender = 'Female' GROUP BY gender").show()

+------+-------------+----------------+
|gender|count(gender)|      percentage|
+------+-------------+----------------+
|Female|          141|4.70941883767535|
+------+-------------+----------------+



In [19]:
spark.sql("SELECT age, count(age) as age_count FROM table WHERE stroke == 1 GROUP BY age ORDER BY age_count DESC").show()

+----+---------+
| age|age_count|
+----+---------+
|78.0|       21|
|79.0|       17|
|80.0|       17|
|81.0|       14|
|57.0|       11|
|76.0|       10|
|74.0|        9|
|63.0|        9|
|68.0|        9|
|82.0|        9|
|59.0|        8|
|77.0|        8|
|58.0|        7|
|71.0|        7|
|75.0|        6|
|70.0|        6|
|69.0|        6|
|54.0|        6|
|61.0|        6|
|72.0|        6|
+----+---------+
only showing top 20 rows



In [20]:
idade = 60
resultado = df.filter((df['stroke'] == 1) & (df['age'] > idade)).count()
print(f'Quantidade de pessoas com AVC com a idade acima de {idade}: {resultado}')

Quantidade de pessoas com AVC com a idade acima de 60: 177


## Limpando os dados

In [21]:
# fill in missing values
df = df.na.fill('No Info', subset=['smoking_status'])

# fill in miss values with mean
from pyspark.sql.functions import mean
mean = df.select(mean(df['bmi'])).collect()
mean_bmi = mean[0][0]
df = df.na.fill(mean_bmi,['bmi'])

In [22]:
from pyspark.ml.feature import (VectorAssembler,OneHotEncoder,StringIndexer)

In [23]:
df.dtypes

[('id', 'int'),
 ('gender', 'string'),
 ('age', 'double'),
 ('hypertension', 'int'),
 ('heart_disease', 'int'),
 ('ever_married', 'string'),
 ('work_type', 'string'),
 ('Residence_type', 'string'),
 ('avg_glucose_level', 'double'),
 ('bmi', 'string'),
 ('smoking_status', 'string'),
 ('stroke', 'int')]

In [37]:
gender_indexer = StringIndexer(inputCol='gender', outputCol='genderIndex').setHandleInvalid("keep")
gender_encoder = OneHotEncoder(inputCol='genderIndex', outputCol='genderVec')

ever_married_indexer = StringIndexer(inputCol='ever_married', outputCol='ever_marriedIndex').setHandleInvalid("keep")
ever_married_encoder = OneHotEncoder(inputCol='ever_marriedIndex', outputCol='ever_marriedVec')

work_type_indexer = StringIndexer(inputCol='work_type', outputCol='work_typeIndex').setHandleInvalid("keep")
work_type_encoder = OneHotEncoder(inputCol='work_typeIndex', outputCol='work_typeVec')

Residence_type_indexer = StringIndexer(inputCol='Residence_type', outputCol='Residence_typeIndex').setHandleInvalid("keep")
Residence_type_encoder = OneHotEncoder(inputCol='Residence_typeIndex', outputCol='Residence_typeVec')

bmi_indexer = StringIndexer(inputCol='bmi', outputCol='bmiIndex').setHandleInvalid("keep")
bmi_encoder = OneHotEncoder(inputCol='bmiIndex', outputCol='bmiVec')

smoking_status_indexer = StringIndexer(inputCol='smoking_status', outputCol='smoking_statusIndex').setHandleInvalid("keep")
smoking_status_encoder = OneHotEncoder(inputCol='smoking_statusIndex', outputCol='smoking_statusVec')

In [38]:
assembler = VectorAssembler(inputCols=['genderVec',
 'age',
 'hypertension',
 'heart_disease',
 'ever_marriedVec',
 'work_typeVec',
 'Residence_typeVec',
 'avg_glucose_level',
 'bmiVec',
 'smoking_statusVec'],outputCol='features')

In [61]:
from pyspark.ml import Pipeline

def executa(algoritmo, nome_do_algoritmo):
  pipeline = Pipeline(stages=[gender_indexer, ever_married_indexer, work_type_indexer, Residence_type_indexer,
                            smoking_status_indexer, bmi_indexer, gender_encoder, ever_married_encoder, work_type_encoder,
                            Residence_type_encoder, smoking_status_encoder, bmi_encoder, assembler, algoritmo])
  
  train_data, test_data = df.randomSplit([0.7,0.3])
  model = pipeline.fit(train_data)
  dtc_predictions = model.transform(test_data)

  evaluator = MulticlassClassificationEvaluator(
    labelCol="stroke", predictionCol="prediction", metricName="accuracy")
  accuracy = evaluator.evaluate(dtc_predictions)
  print(nome_do_algoritmo)
  print('Accuracy of: {0:2.2f}%'.format(accuracy*100))

In [68]:
from pyspark.ml.classification import DecisionTreeClassifier
dtc = DecisionTreeClassifier(labelCol='stroke', featuresCol='features')

from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(labelCol='stroke', featuresCol='features')

from pyspark.ml.classification import GBTClassifier
gbtc = GBTClassifier(labelCol='stroke', featuresCol='features')

from pyspark.ml.classification import MultilayerPerceptronClassifier
mpc = MultilayerPerceptronClassifier(labelCol='stroke', featuresCol='features')

from pyspark.ml.classification import LinearSVC
lsvc = LinearSVC(labelCol='stroke', featuresCol='features')

from pyspark.ml.classification import OneVsRest
ovr = OneVsRest(labelCol='stroke', featuresCol='features')

from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(labelCol='stroke', featuresCol='features')

#from pyspark.ml.classification import FMClassifier
#fmc = FMClassifier(labelCol='stroke', featuresCol='features')

executa(dtc, "Decision tree classifier")
print("--" * 20)
executa(rfc, "Random forest classifier")
print("--" * 20)
executa(gbtc, "Gradient-boosted tree classifier")
print("--" * 20)
#executa(mpc, "MultilayerPerceptronClassifier")
#print("--" * 20)
executa(lsvc, "Linear Support Vector Machine")
print("--" * 20)
#executa(ovr, "One-vs-Rest classifier (a.k.a. One-vs-All)")
#print("--" * 20)
executa(nb, "Naive Bayes")
print("--" * 20)
#executa(fmc, "Factorization machines classifier")
#print("--" * 20)

Decision tree classifier
Accuracy of: 95.91%
----------------------------------------
Random forest classifier
Accuracy of: 95.18%
----------------------------------------
Gradient-boosted tree classifier
Accuracy of: 95.05%
----------------------------------------
Linear Support Vector Machine
Accuracy of: 94.74%
----------------------------------------
Naive Bayes
Accuracy of: 79.34%
----------------------------------------
