### ML with PySpark
+ Classify and Predict

In [1]:
# Load Pckgs
from pyspark import SparkContext

In [2]:
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
sc= SparkContext(master='local[2]')


In [3]:
# Spark UI
sc

In [4]:
# Load pkgs
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.appName("MLWithSpark").getOrCreate()

### WorkFlow
+ Data Preparation
+ Feature Engineering
+ Build Model
+ Evaluate

# Task 
+ Predict if a patient is HEP or not based parameters

In [6]:
# Load Dataset
df = spark.read.csv("hcvdata.csv", header=True, inferSchema=True)

In [91]:
# Preview Dataset
df.show()

+---+-------------+---+---+----+----+----+----+----+-----+----+-----+----+----+
|_c0|     Category|Age|Sex| ALB| ALP| ALT| AST| BIL|  CHE|CHOL| CREA| GGT|PROT|
+---+-------------+---+---+----+----+----+----+----+-----+----+-----+----+----+
|  1|0=Blood Donor| 32|  m|38.5|52.5| 7.7|22.1| 7.5| 6.93|3.23|106.0|12.1|  69|
|  2|0=Blood Donor| 32|  m|38.5|70.3|  18|24.7| 3.9|11.17| 4.8| 74.0|15.6|76.5|
|  3|0=Blood Donor| 32|  m|46.9|74.7|36.2|52.6| 6.1| 8.84| 5.2| 86.0|33.2|79.3|
|  4|0=Blood Donor| 32|  m|43.2|  52|30.6|22.6|18.9| 7.33|4.74| 80.0|33.8|75.7|
|  5|0=Blood Donor| 32|  m|39.2|74.1|32.6|24.8| 9.6| 9.15|4.32| 76.0|29.9|68.7|
|  6|0=Blood Donor| 32|  m|41.6|43.3|18.5|19.7|12.3| 9.92|6.05|111.0|91.0|  74|
|  7|0=Blood Donor| 32|  m|46.3|41.3|17.5|17.8| 8.5| 7.01|4.79| 70.0|16.9|74.5|
|  8|0=Blood Donor| 32|  m|42.2|41.9|35.8|31.1|16.1| 5.82| 4.6|109.0|21.5|67.1|
|  9|0=Blood Donor| 32|  m|50.9|65.5|23.2|21.2| 6.9| 8.69| 4.1| 83.0|13.7|71.3|
| 10|0=Blood Donor| 32|  m|42.4|86.3|20.

In [10]:
# Check for columns
print(df.columns)

['_c0', 'Category', 'Age', 'Sex', 'ALB', 'ALP', 'ALT', 'AST', 'BIL', 'CHE', 'CHOL', 'CREA', 'GGT', 'PROT']


In [7]:
# Re-arrange
df = df.select('Age', 'Sex', 'ALB', 'ALP', 'ALT', 'AST', 'BIL', 'CHE', 'CHOL', 'CREA', 'GGT', 'PROT', 'Category')

In [8]:
df.head()

Row(Age=32, Sex='m', ALB='38.5', ALP='52.5', ALT='7.7', AST=22.1, BIL=7.5, CHE=6.93, CHOL='3.23', CREA=106.0, GGT=12.1, PROT='69', Category='0=Blood Donor')

In [94]:
# Check datatypes
df.dtypes

[('Age', 'int'),
 ('Sex', 'string'),
 ('ALB', 'string'),
 ('ALP', 'string'),
 ('ALT', 'string'),
 ('AST', 'double'),
 ('BIL', 'double'),
 ('CHE', 'double'),
 ('CHOL', 'string'),
 ('CREA', 'double'),
 ('GGT', 'double'),
 ('PROT', 'string'),
 ('Category', 'string')]

In [95]:
# Check for the schema
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- ALB: string (nullable = true)
 |-- ALP: string (nullable = true)
 |-- ALT: string (nullable = true)
 |-- AST: double (nullable = true)
 |-- BIL: double (nullable = true)
 |-- CHE: double (nullable = true)
 |-- CHOL: string (nullable = true)
 |-- CREA: double (nullable = true)
 |-- GGT: double (nullable = true)
 |-- PROT: string (nullable = true)
 |-- Category: string (nullable = true)



In [22]:
# Descriptive summary
print(df.describe().show())

+-------+------------------+----+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+-------------+
|summary|               Age| Sex|              ALB|               ALP|               ALT|              AST|               BIL|               CHE|              CHOL|             CREA|              GGT|             PROT|     Category|
+-------+------------------+----+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+-------------+
|  count|               615| 615|              615|               615|               615|              615|               615|               615|               615|              615|              615|              615|          615|
|   mean| 47.40813008130081|null|41.62019543973941| 68.2839195979899

In [23]:
# Value Count
df.groupBy('Category').count().show()

+--------------------+-----+
|            Category|count|
+--------------------+-----+
|       0=Blood Donor|  533|
|         3=Cirrhosis|   30|
|          2=Fibrosis|   21|
|0s=suspect Blood ...|    7|
|         1=Hepatitis|   24|
+--------------------+-----+



#### Feature Engineering
+ Numerical Values
+ Vectorization
+ Scaling

In [9]:
import pyspark.ml

In [10]:
dir(pyspark.ml)

['Estimator',
 'Model',
 'Pipeline',
 'PipelineModel',
 'PredictionModel',
 'Predictor',
 'Transformer',
 'UnaryTransformer',
 '__all__',
 '__builtins__',
 '__cached__',
 '__doc__',
 '__file__',
 '__loader__',
 '__name__',
 '__package__',
 '__path__',
 '__spec__',
 'base',
 'classification',
 'clustering',
 'common',
 'evaluation',
 'feature',
 'fpm',
 'image',
 'linalg',
 'param',
 'pipeline',
 'recommendation',
 'regression',
 'stat',
 'tree',
 'tuning',
 'util',
 'wrapper']

In [11]:
# Load ML Pkgs
from pyspark.ml.feature import VectorAssembler, StringIndexer

In [30]:
df.show()

+---+---+----+----+----+----+----+-----+----+-----+----+----+-------------+
|Age|Sex| ALB| ALP| ALT| AST| BIL|  CHE|CHOL| CREA| GGT|PROT|     Category|
+---+---+----+----+----+----+----+-----+----+-----+----+----+-------------+
| 32|  m|38.5|52.5| 7.7|22.1| 7.5| 6.93|3.23|106.0|12.1|  69|0=Blood Donor|
| 32|  m|38.5|70.3|  18|24.7| 3.9|11.17| 4.8| 74.0|15.6|76.5|0=Blood Donor|
| 32|  m|46.9|74.7|36.2|52.6| 6.1| 8.84| 5.2| 86.0|33.2|79.3|0=Blood Donor|
| 32|  m|43.2|  52|30.6|22.6|18.9| 7.33|4.74| 80.0|33.8|75.7|0=Blood Donor|
| 32|  m|39.2|74.1|32.6|24.8| 9.6| 9.15|4.32| 76.0|29.9|68.7|0=Blood Donor|
| 32|  m|41.6|43.3|18.5|19.7|12.3| 9.92|6.05|111.0|91.0|  74|0=Blood Donor|
| 32|  m|46.3|41.3|17.5|17.8| 8.5| 7.01|4.79| 70.0|16.9|74.5|0=Blood Donor|
| 32|  m|42.2|41.9|35.8|31.1|16.1| 5.82| 4.6|109.0|21.5|67.1|0=Blood Donor|
| 32|  m|50.9|65.5|23.2|21.2| 6.9| 8.69| 4.1| 83.0|13.7|71.3|0=Blood Donor|
| 32|  m|42.4|86.3|20.3|20.0|35.2| 5.46|4.45| 81.0|15.9|69.9|0=Blood Donor|
| 32|  m|44.

In [32]:
# Convert Gender/Sex column to 0(m) or 1 (f)
## Check unique values.
df.select('Sex').distinct().show()

+---+
|Sex|
+---+
|  m|
|  f|
+---+



In [12]:
# Convert the string to numerical code
# label encoding 
genderEncoder = StringIndexer(inputCol='Sex', outputCol='Gender').fit(df)

In [13]:
df = genderEncoder.transform(df)

In [99]:
df.show(5)

+---+---+----+----+----+----+----+-----+----+-----+----+----+-------------+------+
|Age|Sex| ALB| ALP| ALT| AST| BIL|  CHE|CHOL| CREA| GGT|PROT|     Category|Gender|
+---+---+----+----+----+----+----+-----+----+-----+----+----+-------------+------+
| 32|  m|38.5|52.5| 7.7|22.1| 7.5| 6.93|3.23|106.0|12.1|  69|0=Blood Donor|   0.0|
| 32|  m|38.5|70.3|  18|24.7| 3.9|11.17| 4.8| 74.0|15.6|76.5|0=Blood Donor|   0.0|
| 32|  m|46.9|74.7|36.2|52.6| 6.1| 8.84| 5.2| 86.0|33.2|79.3|0=Blood Donor|   0.0|
| 32|  m|43.2|  52|30.6|22.6|18.9| 7.33|4.74| 80.0|33.8|75.7|0=Blood Donor|   0.0|
| 32|  m|39.2|74.1|32.6|24.8| 9.6| 9.15|4.32| 76.0|29.9|68.7|0=Blood Donor|   0.0|
+---+---+----+----+----+----+----+-----+----+-----+----+----+-------------+------+
only showing top 5 rows



In [39]:
# Encode Category column
df.select('Category').distinct().show()

+--------------------+
|            Category|
+--------------------+
|       0=Blood Donor|
|         3=Cirrhosis|
|          2=Fibrosis|
|0s=suspect Blood ...|
|         1=Hepatitis|
+--------------------+



In [14]:
# Convert the string to numerical code
# label encoding 
categoryEncoder = StringIndexer(inputCol='Category', outputCol='CategoryCode').fit(df)

In [15]:
df = categoryEncoder.transform(df)

In [43]:
df.show()

+---+---+----+----+----+----+----+-----+----+-----+----+----+-------------+------+------------+
|Age|Sex| ALB| ALP| ALT| AST| BIL|  CHE|CHOL| CREA| GGT|PROT|     Category|Gender|CategoryCode|
+---+---+----+----+----+----+----+-----+----+-----+----+----+-------------+------+------------+
| 32|  m|38.5|52.5| 7.7|22.1| 7.5| 6.93|3.23|106.0|12.1|  69|0=Blood Donor|   0.0|         0.0|
| 32|  m|38.5|70.3|  18|24.7| 3.9|11.17| 4.8| 74.0|15.6|76.5|0=Blood Donor|   0.0|         0.0|
| 32|  m|46.9|74.7|36.2|52.6| 6.1| 8.84| 5.2| 86.0|33.2|79.3|0=Blood Donor|   0.0|         0.0|
| 32|  m|43.2|  52|30.6|22.6|18.9| 7.33|4.74| 80.0|33.8|75.7|0=Blood Donor|   0.0|         0.0|
| 32|  m|39.2|74.1|32.6|24.8| 9.6| 9.15|4.32| 76.0|29.9|68.7|0=Blood Donor|   0.0|         0.0|
| 32|  m|41.6|43.3|18.5|19.7|12.3| 9.92|6.05|111.0|91.0|  74|0=Blood Donor|   0.0|         0.0|
| 32|  m|46.3|41.3|17.5|17.8| 8.5| 7.01|4.79| 70.0|16.9|74.5|0=Blood Donor|   0.0|         0.0|
| 32|  m|42.2|41.9|35.8|31.1|16.1| 5.82|

In [44]:
# Get the labels
categoryEncoder.labels

['0=Blood Donor',
 '3=Cirrhosis',
 '1=Hepatitis',
 '2=Fibrosis',
 '0s=suspect Blood Donor']

In [45]:
genderEncoder.labels

['m', 'f']

In [47]:
# Convert back to original dataset
from pyspark.ml.feature import IndexToString
converter = IndexToString(inputCol='Gender', outputCol='Orig_Gender')
converted_df = converter.transform(df)
converted_df.head()

Row(Age=32, Sex='m', ALB='38.5', ALP='52.5', ALT='7.7', AST=22.1, BIL=7.5, CHE=6.93, CHOL='3.23', CREA=106.0, GGT=12.1, PROT='69', Category='0=Blood Donor', Gender=0.0, CategoryCode=0.0, Orig_Gender='m')

In [48]:
print(df.columns)

['Age', 'Sex', 'ALB', 'ALP', 'ALT', 'AST', 'BIL', 'CHE', 'CHOL', 'CREA', 'GGT', 'PROT', 'Category', 'Gender', 'CategoryCode']


In [60]:
df.show(5)

+---+---+----+----+----+----+----+-----+----+-----+----+----+-------------+------+------------+
|Age|Sex| ALB| ALP| ALT| AST| BIL|  CHE|CHOL| CREA| GGT|PROT|     Category|Gender|CategoryCode|
+---+---+----+----+----+----+----+-----+----+-----+----+----+-------------+------+------------+
| 32|  m|38.5|52.5| 7.7|22.1| 7.5| 6.93|3.23|106.0|12.1|  69|0=Blood Donor|   0.0|         0.0|
| 32|  m|38.5|70.3|  18|24.7| 3.9|11.17| 4.8| 74.0|15.6|76.5|0=Blood Donor|   0.0|         0.0|
| 32|  m|46.9|74.7|36.2|52.6| 6.1| 8.84| 5.2| 86.0|33.2|79.3|0=Blood Donor|   0.0|         0.0|
| 32|  m|43.2|  52|30.6|22.6|18.9| 7.33|4.74| 80.0|33.8|75.7|0=Blood Donor|   0.0|         0.0|
| 32|  m|39.2|74.1|32.6|24.8| 9.6| 9.15|4.32| 76.0|29.9|68.7|0=Blood Donor|   0.0|         0.0|
+---+---+----+----+----+----+----+-----+----+-----+----+----+-------------+------+------------+
only showing top 5 rows



In [16]:
### Feature Selection
required_features = ['Age', 'Gender', 'ALB', 'ALP', 'ALT', 'AST', 'BIL', 'CHE', 'CHOL', 'CREA', 'GGT', 'PROT', 'CategoryCode']

In [17]:
df2 = df.select(required_features)

In [18]:
df2.show(5)

+---+------+----+----+----+----+----+-----+----+-----+----+----+------------+
|Age|Gender| ALB| ALP| ALT| AST| BIL|  CHE|CHOL| CREA| GGT|PROT|CategoryCode|
+---+------+----+----+----+----+----+-----+----+-----+----+----+------------+
| 32|   0.0|38.5|52.5| 7.7|22.1| 7.5| 6.93|3.23|106.0|12.1|  69|         0.0|
| 32|   0.0|38.5|70.3|  18|24.7| 3.9|11.17| 4.8| 74.0|15.6|76.5|         0.0|
| 32|   0.0|46.9|74.7|36.2|52.6| 6.1| 8.84| 5.2| 86.0|33.2|79.3|         0.0|
| 32|   0.0|43.2|  52|30.6|22.6|18.9| 7.33|4.74| 80.0|33.8|75.7|         0.0|
| 32|   0.0|39.2|74.1|32.6|24.8| 9.6| 9.15|4.32| 76.0|29.9|68.7|         0.0|
+---+------+----+----+----+----+----+-----+----+-----+----+----+------------+
only showing top 5 rows



In [19]:
df2 = df2.toPandas().replace('NA',0).astype(float)

In [20]:
type(df2)

pandas.core.frame.DataFrame

In [21]:
# Convert to PySpark Dataframe
new_df =  spark.createDataFrame(df2)

  for column, series in pdf.iteritems():


In [22]:
type(new_df)

pyspark.sql.dataframe.DataFrame

In [23]:
new_df.printSchema()

root
 |-- Age: double (nullable = true)
 |-- Gender: double (nullable = true)
 |-- ALB: double (nullable = true)
 |-- ALP: double (nullable = true)
 |-- ALT: double (nullable = true)
 |-- AST: double (nullable = true)
 |-- BIL: double (nullable = true)
 |-- CHE: double (nullable = true)
 |-- CHOL: double (nullable = true)
 |-- CREA: double (nullable = true)
 |-- GGT: double (nullable = true)
 |-- PROT: double (nullable = true)
 |-- CategoryCode: double (nullable = true)



In [24]:
# Vectorizing
vec_assembler = VectorAssembler(inputCols=required_features, outputCol='features')

In [25]:
vec_df = vec_assembler.transform(new_df)

In [26]:
vec_df.show()

+----+------+----+----+----+----+----+-----+----+-----+----+----+------------+--------------------+
| Age|Gender| ALB| ALP| ALT| AST| BIL|  CHE|CHOL| CREA| GGT|PROT|CategoryCode|            features|
+----+------+----+----+----+----+----+-----+----+-----+----+----+------------+--------------------+
|32.0|   0.0|38.5|52.5| 7.7|22.1| 7.5| 6.93|3.23|106.0|12.1|69.0|         0.0|[32.0,0.0,38.5,52...|
|32.0|   0.0|38.5|70.3|18.0|24.7| 3.9|11.17| 4.8| 74.0|15.6|76.5|         0.0|[32.0,0.0,38.5,70...|
|32.0|   0.0|46.9|74.7|36.2|52.6| 6.1| 8.84| 5.2| 86.0|33.2|79.3|         0.0|[32.0,0.0,46.9,74...|
|32.0|   0.0|43.2|52.0|30.6|22.6|18.9| 7.33|4.74| 80.0|33.8|75.7|         0.0|[32.0,0.0,43.2,52...|
|32.0|   0.0|39.2|74.1|32.6|24.8| 9.6| 9.15|4.32| 76.0|29.9|68.7|         0.0|[32.0,0.0,39.2,74...|
|32.0|   0.0|41.6|43.3|18.5|19.7|12.3| 9.92|6.05|111.0|91.0|74.0|         0.0|[32.0,0.0,41.6,43...|
|32.0|   0.0|46.3|41.3|17.5|17.8| 8.5| 7.01|4.79| 70.0|16.9|74.5|         0.0|[32.0,0.0,46.3,41...|


### Train, Test, Split


In [27]:
train_df,test_df = vec_df.randomSplit([0.7, 0.3])

In [28]:
train_df.count()

441

#### Model building
+ Pyspark.ml: DataFrame
+ Pyspark.mllib: RDD / Legacy

In [30]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier

In [34]:
# Logistic Model
lr = LogisticRegression(featuresCol='features', labelCol='CategoryCode')

In [35]:
lr_mode = lr.fit(train_df)

In [40]:
y_pred = lr_mode.transform(test_df)

In [41]:
print(y_pred.columns)

['Age', 'Gender', 'ALB', 'ALP', 'ALT', 'AST', 'BIL', 'CHE', 'CHOL', 'CREA', 'GGT', 'PROT', 'CategoryCode', 'features', 'rawPrediction', 'probability', 'prediction']


In [42]:
y_pred.select('CategoryCode', 'rawPrediction', 'probability', 'prediction').show()

+------------+--------------------+--------------------+----------+
|CategoryCode|       rawPrediction|         probability|prediction|
+------------+--------------------+--------------------+----------+
|         0.0|[158.889653103593...|[1.0,4.4797412263...|       0.0|
|         0.0|[172.539655469257...|[1.0,4.5027580140...|       0.0|
|         0.0|[166.055082095802...|[1.0,6.0260936822...|       0.0|
|         0.0|[134.388038813368...|[1.0,2.5053623237...|       0.0|
|         0.0|[167.988626966462...|[1.0,8.5347800081...|       0.0|
|         0.0|[162.835602598592...|[1.0,1.2414613044...|       0.0|
|         0.0|[137.203318098966...|[1.0,5.9396995516...|       0.0|
|         0.0|[134.765573474115...|[1.0,2.3355981870...|       0.0|
|         0.0|[154.162228353261...|[1.0,5.0458726607...|       0.0|
|         0.0|[144.791973969941...|[1.0,1.0283029585...|       0.0|
|         0.0|[192.881441828642...|[1.0,1.6626368140...|       0.0|
|         0.0|[126.544185482714...|[1.0,4.618552

In [43]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#### Model Evaluation

In [44]:
# How to check for Accuracy. Evaluate model
multi_evaluator = MulticlassClassificationEvaluator(labelCol='CategoryCode', metricName='accuracy')

In [45]:
multi_evaluator.evaluate(y_pred)

0.9712643678160919

In [None]:
# Precision, F1 Score, Recall: Classification Report

In [46]:
from pyspark.mllib.evaluation import MulticlassMetrics

In [47]:
lr_metric = MulticlassMetrics(y_pred['CategoryCode', 'prediction'].rdd)



In [52]:
print(lr_metric.precision(1.0))

1.0
