# PySpark Demonstration

## Data Processing Using PySpark

In [1]:
from pyspark.sql import SparkSession
Spark = SparkSession.builder.appName("Demonstration").getOrCreate()

In [2]:
Spark

In [3]:
df = Spark.read.csv("Dataset/stroke.csv",header=True,inferSchema=True)
print("Total Records: {}\nTotal Columns: {}".format(df.count(), len(df.columns)))

Total Records: 5110
Total Columns: 12


In [4]:
df.show(5)

+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|   id|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|
+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
| 9046|  Male|67.0|           0|            1|         Yes|      Private|         Urban|           228.69|36.6|formerly smoked|     1|
|51676|Female|61.0|           0|            0|         Yes|Self-employed|         Rural|           202.21| N/A|   never smoked|     1|
|31112|  Male|80.0|           0|            1|         Yes|      Private|         Rural|           105.92|32.5|   never smoked|     1|
|60182|Female|49.0|           0|            0|         Yes|      Private|         Urban|           171.23|34.4|         smokes|     1|
| 1665|Female|79.0|           1|            0|         

First 5 records within the dataset

In [5]:
from pyspark.sql.functions import col,isnan, when, count

df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+
| id|gender|age|hypertension|heart_disease|ever_married|work_type|Residence_type|avg_glucose_level|bmi|smoking_status|stroke|
+---+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+
|  0|     0|  0|           0|            0|           0|        0|             0|                0|  0|             0|     0|
+---+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+



We can see that there are no null/none values present within the dataset. However, while inspecting the data we observed that the column "BMI" contains the value "N/A" that refers to null/none values. We will deal with this and replace it with a null character so that we can fill these values with the mean of the BMI.

In [6]:
value = str(round(df.agg({'bmi':'mean'}).collect()[0][0],2))
df = df.na.replace('N/A', value,'bmi')

from pyspark.sql.types import IntegerType
df = df.withColumn("bmi", df["bmi"].cast(IntegerType()))

df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: integer (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [7]:
columns = [x[0] for x in df.dtypes if (x[1] == 'string')]

print("**********\n")

for x in columns:
    print(x)
    print([i[x] for i in df.select(x).distinct().collect()])
    print("\n**********\n")

**********

gender
['Female', 'Other', 'Male']

**********

ever_married
['No', 'Yes']

**********

work_type
['Never_worked', 'Self-employed', 'Private', 'children', 'Govt_job']

**********

Residence_type
['Urban', 'Rural']

**********

smoking_status
['smokes', 'Unknown', 'never smoked', 'formerly smoked']

**********



The columns "Gender and Smoking Status" have two values that are "Other and Unknown". We can replace these values with the most frequent value of the column.

In [8]:
value = [df.groupby("gender").count().orderBy("count", ascending=False).first()[0]][0]
df = df.na.replace('Other', value,'gender')

value = [df.groupby("smoking_status").count().orderBy("count", ascending=False).first()[0]][0]
df = df.na.replace('Unknown', value,'smoking_status')

In [9]:
columns = ['summary'] + [x[0] for x in df.dtypes if (x[1] != 'string') and (len(df.select(x[0]).distinct().collect()) != 2)]
columns.remove('id')

data_summary = df.describe().select(columns)

data_summary.show()

+-------+------------------+------------------+-----------------+
|summary|               age| avg_glucose_level|              bmi|
+-------+------------------+------------------+-----------------+
|  count|              5110|              5110|             5110|
|   mean|43.226614481409015|106.14767710371804|28.43091976516634|
| stddev| 22.61264672311348| 45.28356015058193|7.688482598387539|
|    min|              0.08|             55.12|               10|
|    max|              82.0|            271.74|               97|
+-------+------------------+------------------+-----------------+



A statistical summary of the dataset

In [10]:
columns = [x[0] for x in df.dtypes if (x[1] == 'string')]

for i in columns:
    
    print(i)
    df.groupBy(i).count().orderBy('count').show()
    print()

gender
+------+-----+
|gender|count|
+------+-----+
|  Male| 2115|
|Female| 2995|
+------+-----+


ever_married
+------------+-----+
|ever_married|count|
+------------+-----+
|          No| 1757|
|         Yes| 3353|
+------------+-----+


work_type
+-------------+-----+
|    work_type|count|
+-------------+-----+
| Never_worked|   22|
|     Govt_job|  657|
|     children|  687|
|Self-employed|  819|
|      Private| 2925|
+-------------+-----+


Residence_type
+--------------+-----+
|Residence_type|count|
+--------------+-----+
|         Rural| 2514|
|         Urban| 2596|
+--------------+-----+


smoking_status
+---------------+-----+
| smoking_status|count|
+---------------+-----+
|         smokes|  789|
|formerly smoked|  885|
|   never smoked| 3436|
+---------------+-----+




We notice that in the current dataset, a majority of records are from female patients. It also consists of many records from patients that are married as well as patients who have never smoked. On close observation at the working type of the patients, many work in private organizations or are self-employed. There is no difference in the type of residence.

In [11]:
columns = [x[0] for x in df.dtypes if (x[1] == 'string')]

for i in columns:
    print("Patient Stroke vs {} Analysis".format(i.title().replace("_"," ")))
    df.groupBy(i).pivot("stroke").count().show()

Patient Stroke vs Gender Analysis
+------+----+---+
|gender|   0|  1|
+------+----+---+
|Female|2854|141|
|  Male|2007|108|
+------+----+---+

Patient Stroke vs Ever Married Analysis
+------------+----+---+
|ever_married|   0|  1|
+------------+----+---+
|          No|1728| 29|
|         Yes|3133|220|
+------------+----+---+

Patient Stroke vs Work Type Analysis
+-------------+----+----+
|    work_type|   0|   1|
+-------------+----+----+
| Never_worked|  22|null|
|Self-employed| 754|  65|
|      Private|2776| 149|
|     children| 685|   2|
|     Govt_job| 624|  33|
+-------------+----+----+

Patient Stroke vs Residence Type Analysis
+--------------+----+---+
|Residence_type|   0|  1|
+--------------+----+---+
|         Urban|2461|135|
|         Rural|2400|114|
+--------------+----+---+

Patient Stroke vs Smoking Status Analysis
+---------------+----+---+
| smoking_status|   0|  1|
+---------------+----+---+
|         smokes| 747| 42|
|   never smoked|3299|137|
|formerly smoked| 815| 7

Here we have identified all the absolute values for patients suffering and not suffering from stroke for various factors. We will now analyze the data in depth.

In [12]:
columns = [x[0] for x in df.dtypes if (x[1] == 'string')]

for i in columns:
    print("Patient Stroke vs {} Detailed Analysis".format(i.title().replace("_"," ")))
    Data = df.groupBy(i).pivot("stroke").count()
    Data = Data.withColumn("stroke_precentage",(Data["1"]/(Data["0"]+Data["1"]))*100)
    Data = Data.withColumn("stroke_precentage", Data["stroke_precentage"].cast(IntegerType()))
    Data.show()

Patient Stroke vs Gender Detailed Analysis
+------+----+---+-----------------+
|gender|   0|  1|stroke_precentage|
+------+----+---+-----------------+
|Female|2854|141|                4|
|  Male|2007|108|                5|
+------+----+---+-----------------+

Patient Stroke vs Ever Married Detailed Analysis
+------------+----+---+-----------------+
|ever_married|   0|  1|stroke_precentage|
+------------+----+---+-----------------+
|          No|1728| 29|                1|
|         Yes|3133|220|                6|
+------------+----+---+-----------------+

Patient Stroke vs Work Type Detailed Analysis
+-------------+----+----+-----------------+
|    work_type|   0|   1|stroke_precentage|
+-------------+----+----+-----------------+
| Never_worked|  22|null|             null|
|Self-employed| 754|  65|                7|
|      Private|2776| 149|                5|
|     children| 685|   2|                0|
|     Govt_job| 624|  33|                5|
+-------------+----+----+---------------

Married men are more likely to suffer a stroke. It is observed that self-employed patients are seven times more likely to suffer a stroke, whereas patients with a private or government job are five times likely to suffer a stroke. We notice that patients in the Urban area suffer a stroke a little more than those from the rural areas. Lastly, the patients who have formerly been smoking have much higher chances of having a stroke than those who have never smoked.  

## Data Modeling Using PySpark

In [13]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=["age","Experience"],outputCol="Independent Features")

In [14]:
from pyspark.ml.feature import StringIndexer
columns = [x[0] for x in df.dtypes if (x[1] == 'string')]
columnsrename = ["{}_OHE".format(i) for i in columns]

indexer=StringIndexer(inputCols=columns,outputCols=columnsrename)
Data=indexer.fit(df).transform(df)
Data.select(columnsrename).show(5)

+----------+----------------+-------------+------------------+------------------+
|gender_OHE|ever_married_OHE|work_type_OHE|Residence_type_OHE|smoking_status_OHE|
+----------+----------------+-------------+------------------+------------------+
|       1.0|             0.0|          0.0|               0.0|               1.0|
|       0.0|             0.0|          1.0|               1.0|               0.0|
|       1.0|             0.0|          0.0|               1.0|               0.0|
|       0.0|             0.0|          0.0|               0.0|               2.0|
|       0.0|             0.0|          1.0|               1.0|               0.0|
+----------+----------------+-------------+------------------+------------------+
only showing top 5 rows



In [15]:
from pyspark.ml.feature import VectorAssembler

columns = [x[0] for x in df.dtypes if (x[1] != 'string')]
columns.remove('id')
columns.remove('stroke')

columns = columns + columnsrename

featureassembler = VectorAssembler(inputCols=columns,outputCol="Independent Attributes")

OutputData = featureassembler.transform(Data)

OutputData.select(["Independent Attributes","stroke"]).show(5)

+----------------------+------+
|Independent Attributes|stroke|
+----------------------+------+
|  [67.0,0.0,1.0,228...|     1|
|  (10,[0,3,4,7,8],[...|     1|
|  [80.0,0.0,1.0,105...|     1|
|  (10,[0,3,4,9],[49...|     1|
|  [79.0,1.0,0.0,174...|     1|
+----------------------+------+
only showing top 5 rows



In [16]:
TrainingData = OutputData.select(["stroke","Independent Attributes"])

from pyspark.ml.regression import LinearRegression

training_data, testing_data = TrainingData.randomSplit([0.75,0.25])

In [17]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

model = RandomForestClassifier(numTrees=3, maxDepth=2, featuresCol='Independent Attributes', labelCol="stroke", seed=42,leafCol="leafId")
model = model.fit(training_data)

prediction = model.transform(testing_data)

prediction.show(5)

+------+----------------------+--------------------+--------------------+----------+-------------+
|stroke|Independent Attributes|       rawPrediction|         probability|prediction|       leafId|
+------+----------------------+--------------------+--------------------+----------+-------------+
|     0|  (10,[0,1,3,4],[36...|[2.83906906318637...|[0.94635635439545...|       0.0|[0.0,0.0,0.0]|
|     0|  (10,[0,1,3,4],[40...|[2.83906906318637...|[0.94635635439545...|       0.0|[0.0,0.0,0.0]|
|     0|  (10,[0,1,3,4],[51...|[2.83906906318637...|[0.94635635439545...|       0.0|[0.0,0.0,0.0]|
|     0|  (10,[0,1,3,4],[55...|[2.83906906318637...|[0.94635635439545...|       0.0|[0.0,0.0,0.0]|
|     0|  (10,[0,1,3,4],[57...|[2.83906906318637...|[0.94635635439545...|       0.0|[0.0,0.0,0.0]|
+------+----------------------+--------------------+--------------------+----------+-------------+
only showing top 5 rows



In [18]:
modeleval = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction",metricName = "accuracy")
modelaccuracy = modeleval.evaluate(prediction)

In [19]:
print("Accuracy: {}".format(modelaccuracy))
print("Test Error: {}".format(1.0-modelaccuracy))

Accuracy: 0.9607993850883936
Test Error: 0.039200614911606424
