In [2]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession

In [3]:
# Start (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName('SesionPractica5')
    .getOrCreate()
)

# Load the stroke dataset; the first row is the header and column types are
# inferred from the data.
df_spark = spark.read.csv(
    'healthcare-dataset-stroke-data.csv',
    header=True,
    inferSchema=True,
)

In [4]:
# Inspect the inferred schema. Note in the output that `bmi` is inferred as
# string rather than a numeric type.
df_spark.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [5]:
# Preview the first five raw rows.
df_spark.show(5)

+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|   id|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|
+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
| 9046|  Male|67.0|           0|            1|         Yes|      Private|         Urban|           228.69|36.6|formerly smoked|     1|
|51676|Female|61.0|           0|            0|         Yes|Self-employed|         Rural|           202.21| N/A|   never smoked|     1|
|31112|  Male|80.0|           0|            1|         Yes|      Private|         Rural|           105.92|32.5|   never smoked|     1|
|60182|Female|49.0|           0|            0|         Yes|      Private|         Urban|           171.23|34.4|         smokes|     1|
| 1665|Female|79.0|           1|            0|         

In [6]:
# Summary statistics for every column of the raw data.
df_spark.describe().show()

+-------+-----------------+------+------------------+------------------+-------------------+------------+---------+--------------+------------------+------------------+--------------+-------------------+
|summary|               id|gender|               age|      hypertension|      heart_disease|ever_married|work_type|Residence_type| avg_glucose_level|               bmi|smoking_status|             stroke|
+-------+-----------------+------+------------------+------------------+-------------------+------------+---------+--------------+------------------+------------------+--------------+-------------------+
|  count|             5110|  5110|              5110|              5110|               5110|        5110|     5110|          5110|              5110|              5110|          5110|               5110|
|   mean|36517.82935420744|  NULL|43.226614481409015|0.0974559686888454|0.05401174168297456|        NULL|     NULL|          NULL|106.14767710371804|28.893236911794673|          NULL| 

In [13]:
# NOTE(review): this cell ran as In [13], i.e. out of document order. `df` is
# rebound by the later `df = df_prediction_indexer.select(...)` cell and is not
# read in between, so this alias looks unused — confirm before removing.
df = df_spark

In [7]:
# Columns kept for modelling: the candidate predictors plus the `stroke` label.
prediction_columns = [
    'gender',
    'age',
    'hypertension',
    'heart_disease',
    'Residence_type',
    'avg_glucose_level',
    'bmi',
    'smoking_status',
    'stroke',
]
df_prediction = df_spark.select(*prediction_columns)

In [8]:
# Preview the reduced column set.
df_prediction.show(5)

+------+----+------------+-------------+--------------+-----------------+----+---------------+------+
|gender| age|hypertension|heart_disease|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|
+------+----+------------+-------------+--------------+-----------------+----+---------------+------+
|  Male|67.0|           0|            1|         Urban|           228.69|36.6|formerly smoked|     1|
|Female|61.0|           0|            0|         Rural|           202.21| N/A|   never smoked|     1|
|  Male|80.0|           0|            1|         Rural|           105.92|32.5|   never smoked|     1|
|Female|49.0|           0|            0|         Urban|           171.23|34.4|         smokes|     1|
|Female|79.0|           1|            0|         Rural|           174.12|  24|   never smoked|     1|
+------+----+------------+-------------+--------------+-----------------+----+---------------+------+
only showing top 5 rows



In [9]:
# Row count per smoking_status category.
df_prediction.groupBy('smoking_status').count().show()

+---------------+-----+
| smoking_status|count|
+---------------+-----+
|         smokes|  789|
|        Unknown| 1544|
|   never smoked| 1892|
|formerly smoked|  885|
+---------------+-----+



In [10]:
# Row count for every (gender, Residence_type, smoking_status) combination.
(
    df_prediction
    .groupBy('gender', 'Residence_type', 'smoking_status')
    .count()
    .show()
)

+------+--------------+---------------+-----+
|gender|Residence_type| smoking_status|count|
+------+--------------+---------------+-----+
|Female|         Urban|         smokes|  243|
|Female|         Rural|formerly smoked|  227|
|  Male|         Urban|         smokes|  183|
|Female|         Urban|   never smoked|  618|
|Female|         Rural|   never smoked|  611|
|Female|         Urban|        Unknown|  418|
|  Male|         Rural|formerly smoked|  200|
|Female|         Rural|         smokes|  209|
| Other|         Rural|formerly smoked|    1|
|  Male|         Urban|        Unknown|  364|
|  Male|         Rural|   never smoked|  350|
|Female|         Urban|formerly smoked|  250|
|  Male|         Urban|   never smoked|  313|
|  Male|         Rural|         smokes|  154|
|Female|         Rural|        Unknown|  418|
|  Male|         Rural|        Unknown|  344|
|  Male|         Urban|formerly smoked|  207|
+------+--------------+---------------+-----+



In [12]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier

# Map the three categorical string columns to numeric indices. Despite its
# name, `genderEncoder` indexes all three columns, not only `gender`.
# NOTE(review): the indices are later assembled directly as numeric features;
# for a linear model a one-hot encoding may be more appropriate — confirm.
category_indexer = StringIndexer(
    inputCols=['gender', 'Residence_type', 'smoking_status'],
    outputCols=['indexer_gender', 'indexer_Residence_type', 'indexer_smoking_status'],
)
genderEncoder = category_indexer.fit(df_prediction)

In [14]:
# Apply the fitted indexer; this appends the three `indexer_*` columns.
df_prediction_indexer = genderEncoder.transform(df_prediction)

In [15]:
# Preview the original columns alongside their numeric indices.
df_prediction_indexer.show(5)

+------+----+------------+-------------+--------------+-----------------+----+---------------+------+--------------+----------------------+----------------------+
|gender| age|hypertension|heart_disease|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|indexer_gender|indexer_Residence_type|indexer_smoking_status|
+------+----+------------+-------------+--------------+-----------------+----+---------------+------+--------------+----------------------+----------------------+
|  Male|67.0|           0|            1|         Urban|           228.69|36.6|formerly smoked|     1|           1.0|                   0.0|                   2.0|
|Female|61.0|           0|            0|         Rural|           202.21| N/A|   never smoked|     1|           0.0|                   1.0|                   0.0|
|  Male|80.0|           0|            1|         Rural|           105.92|32.5|   never smoked|     1|           1.0|                   1.0|                   0.0|
|Female|49.0|         

In [16]:
# Keep the numeric predictors, the indexed versions of the categorical
# columns, and the `stroke` label; the original string categoricals are left out.
model_columns = [
    'indexer_gender',
    'age',
    'hypertension',
    'heart_disease',
    'indexer_Residence_type',
    'avg_glucose_level',
    'bmi',
    'indexer_smoking_status',
    'stroke',
]
df = df_prediction_indexer.select(*model_columns)

In [17]:
from pyspark.sql.functions import col

# Count the rows whose `bmi` holds the literal placeholder "N/A" (the column
# was read as string). The column is referenced by its exact schema name,
# `bmi`, so the cell does not rely on case-insensitive column resolution and
# matches the spelling used by the later cast cell.
df_na = df.filter(col("bmi") == "N/A")
df_na.groupBy("bmi").count().show()

+---+-----+
|BMI|count|
+---+-----+
|N/A|  201|
+---+-----+



In [18]:
# Drop the rows whose `bmi` is the "N/A" placeholder.
# - The column is referenced by its exact schema name (`bmi`), consistent with
#   the later cast cell, instead of relying on case-insensitive resolution.
# - The comparison is made on a string view of the column. On the first run
#   `bmi` is already a string, so this is a no-op; if the cell is re-run after
#   `bmi` has been cast to double, it avoids comparing a double column against
#   the text "N/A" and simply keeps every row.
df = df.where(col("bmi").cast("string") != "N/A")
df.show(5)

+--------------+----+------------+-------------+----------------------+-----------------+----+----------------------+------+
|indexer_gender| age|hypertension|heart_disease|indexer_Residence_type|avg_glucose_level| bmi|indexer_smoking_status|stroke|
+--------------+----+------------+-------------+----------------------+-----------------+----+----------------------+------+
|           1.0|67.0|           0|            1|                   0.0|           228.69|36.6|                   2.0|     1|
|           1.0|80.0|           0|            1|                   1.0|           105.92|32.5|                   0.0|     1|
|           0.0|49.0|           0|            0|                   0.0|           171.23|34.4|                   3.0|     1|
|           0.0|79.0|           1|            0|                   1.0|           174.12|  24|                   0.0|     1|
|           1.0|81.0|           0|            0|                   0.0|           186.21|  29|                   2.0|     1|


In [19]:
from pyspark.sql.types import DoubleType
# Convert `bmi` from string to double so it can be used as a numeric feature.
bmi_as_double = df["bmi"].cast(DoubleType())
df = df.withColumn("bmi", bmi_as_double)

In [20]:
# Every column except the `stroke` label is a model input.
columns = [name for name in df.columns if name != 'stroke']

# Pack the input columns into a single vector column called `features`.
VectorAss = VectorAssembler(inputCols=columns, outputCol='features')
vec = VectorAss.transform(df)

In [21]:
# Preview the assembled `features` column next to its source columns.
vec.show(3)

+--------------+----+------------+-------------+----------------------+-----------------+----+----------------------+------+--------------------+
|indexer_gender| age|hypertension|heart_disease|indexer_Residence_type|avg_glucose_level| bmi|indexer_smoking_status|stroke|            features|
+--------------+----+------------+-------------+----------------------+-----------------+----+----------------------+------+--------------------+
|           1.0|67.0|           0|            1|                   0.0|           228.69|36.6|                   2.0|     1|[1.0,67.0,0.0,1.0...|
|           1.0|80.0|           0|            1|                   1.0|           105.92|32.5|                   0.0|     1|[1.0,80.0,0.0,1.0...|
|           0.0|49.0|           0|            0|                   0.0|           171.23|34.4|                   3.0|     1|(8,[1,5,6,7],[49....|
+--------------+----+------------+-------------+----------------------+-----------------+----+----------------------+------+

In [22]:
# Fetch the first three feature vectors; the output shows both dense and
# sparse vector representations.
vec.select('features').take(3)

[Row(features=DenseVector([1.0, 67.0, 0.0, 1.0, 0.0, 228.69, 36.6, 2.0])),
 Row(features=DenseVector([1.0, 80.0, 0.0, 1.0, 1.0, 105.92, 32.5, 0.0])),
 Row(features=SparseVector(8, {1: 49.0, 5: 171.23, 6: 34.4, 7: 3.0}))]

In [23]:
# Standardise the feature vector (centre on the mean, scale to unit standard
# deviation) and write the result to `scaler_features`.
# NOTE(review): the scaler is fitted on all rows of `vec`, before the
# train/test split performed later, so test rows contribute to the scaling
# statistics — consider fitting on the training split only.
scaler = StandardScaler(
    inputCol='features',
    outputCol='scaler_features',
    withMean=True,
    withStd=True,
).fit(vec)
df_final = scaler.transform(vec)

In [24]:
# Keep only what the model needs: the scaled feature vector and the label.
df_final = df_final.select('scaler_features', 'stroke')

In [42]:
# Preview the modelling dataset (vectors are truncated in this view).
df_final.show(5)

+--------------------+------+
|     scaler_features|stroke|
+--------------------+------+
|[1.19830604849670...|     1|
|[1.19830604849670...|     1|
|[-0.8329385620248...|     1|
|[-0.8329385620248...|     1|
|[1.19830604849670...|     1|
+--------------------+------+
only showing top 5 rows



In [45]:
# Show the scaled feature vectors in full; truncate=False disables the
# default shortening of long cell values.
df_final.select('scaler_features').show(truncate = False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|scaler_features                                                                                                                                                   |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[1.1983060484967045,1.070028960278811,-0.3180343283203816,4.381521946957882,-0.9855394689333665,2.7774154537662934,0.981244921065141,0.8490921279045552]          |
|[1.1983060484967045,1.6463948990013098,-0.3180343283203816,4.381521946957882,1.014466009774321,0.013840391682981368,0.45922236369288344,-1.0242531556385743]      |
|[-0.8329385620248136,0.271983814355351,-0.3180343283203816,-0.22818470491015116,-0.9855394689333665,1.483980387648291,0.7011352561336855,1.78576476967612]        |
|[-0.83293

In [40]:
# NOTE(review): `df_scaler` is not assigned in any cell visible here (this cell
# ran as In [40]). It looks like leftover interactive state and, unless it is
# defined elsewhere, would raise NameError on a fresh Restart & Run All —
# confirm and remove.
df_scaler

[['scaler_feature']]

In [26]:
# 70/30 train/test split. A fixed seed is passed so that the split — and
# therefore the row counts and every metric computed downstream — does not
# change from one run of the notebook to the next.
train, test = df_final.randomSplit([0.7, 0.3], seed=42)
train.count()

3422

In [27]:
# Logistic regression on the scaled features, predicting the `stroke` label.
regre = LogisticRegression(
    featuresCol='scaler_features',
    labelCol='stroke',
)

# Fit on the training split, then score the held-out test split.
regre_md = regre.fit(train)
y_pred = regre_md.transform(test)

In [28]:
# Preview the test-set predictions (raw scores, probabilities and label).
y_pred.show(5)

+--------------------+------+--------------------+--------------------+----------+
|     scaler_features|stroke|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+----------+
|[-0.8329385620248...|     0|[6.62091113461281...|[0.99866955587585...|       0.0|
|[-0.8329385620248...|     0|[7.10637366097601...|[0.99918080891620...|       0.0|
|[-0.8329385620248...|     0|[6.95069139760063...|[0.99904294424844...|       0.0|
|[-0.8329385620248...|     0|[6.91116138826122...|[0.99900439253715...|       0.0|
|[-0.8329385620248...|     0|[6.95971045246054...|[0.99905152902832...|       0.0|
+--------------------+------+--------------------+--------------------+----------+
only showing top 5 rows



In [52]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the test-set predictions against the `stroke` label.
# NOTE(review): accuracy can look high on its own if `stroke` is imbalanced —
# confirm the class balance and consider an additional metric.
mul_ev = MulticlassClassificationEvaluator(
    predictionCol='prediction',
    labelCol='stroke',
    metricName='accuracy',
)
mul_ev.evaluate(y_pred)

0.9623402824478816