For this project, we use a dataset on Hepatitis C Virus (HCV) in Egyptian patients. The dataset is available here http://archive.ics.uci.edu/ml/datasets/Hepatitis+C+Virus+%28HCV%29+for+Egyptian+patients.
Let us load some useful libraries.
The dataset is described in this paper https://www.researchgate.net/profile/Mahmoud_Nasr15/publication/323130913_A_novel_model_based_on_non_invasive_methods_for_prediction_of_liver_fibrosis/links/5e27008ca6fdcc70a13dbfcb/A-novel-model-based-on-non-invasive-methods-for-prediction-of-liver-fibrosis.pdf

In [2]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.functions import col
from pyspark.sql import functions as fn
from matplotlib import pyplot as plt
import pandas as pd
import numpy as np
from pyspark.ml.linalg import Vectors 
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import seaborn as sn
from pyspark.ml.feature import StandardScaler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


import functools
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, VectorIndexer, SQLTransformer, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression  as LR
from pyspark.ml.classification import RandomForestClassifier as RF 
from pyspark.ml.classification import NaiveBayes as NB

Let us read the dataset.

In [4]:
# Read the HCV CSV file: the first row supplies the column names and the
# column types are inferred from the data.
dataset = spark.read.csv(
    "/FileStore/tables/HCV_Egy_Data-4d319.csv",
    header=True,
    inferSchema=True,
)
display(dataset)


Age,Gender,BMI,Fever,Nausea/Vomting,Headache,Diarrhea,Fatigue & generalized bone ache,Jaundice,Epigastric pain,WBC,RBC,HGB,Plat,AST 1,ALT 1,ALT4,ALT 12,ALT 24,ALT 36,ALT 48,ALT after 24 w,RNA Base,RNA 4,RNA 12,RNA EOT,RNA EF,Baseline histological Grading,Baselinehistological staging
56,1,35,2,1,1,1,2,2,2,7425,4248807.0,14,112132.0,99,84,52.0,109,81,5,5,5,655330,634536,288194,5,5,13,2
46,1,29,1,2,2,1,2,2,1,12101,4429425.0,10,129367.0,91,123,95.0,75,113,57,123,44,40620,538635,637056,336804,31085,4,2
57,1,33,2,2,2,2,1,1,1,4178,4621191.0,12,151522.0,113,49,95.0,107,116,5,5,5,571148,661346,5,735945,558829,4,4
49,2,33,1,2,1,2,1,2,1,6490,4794631.0,10,146457.0,43,64,109.0,80,88,48,77,33,1041941,449939,585688,744463,582301,10,3
59,1,32,1,1,2,1,2,2,2,3661,4606375.0,11,187684.0,99,104,67.0,48,120,94,90,30,660410,738756,3731527,338946,242861,11,1
58,2,22,2,2,2,1,2,2,1,11785,3882456.0,15,131228.0,66,104,121.0,96,65,73,114,29,1157452,1086852,5,5,5,4,4
42,2,26,1,1,2,2,2,2,2,11620,4747333.0,12,177261.0,78,57,113.0,118,107,84,80,28,325694,1034008,275095,214566,635157,12,4
48,2,30,1,1,2,2,1,1,2,7335,4405941.0,11,216176.0,119,112,80.0,127,45,96,53,39,641129,72050,787295,370605,506296,12,3
44,1,23,1,1,2,2,2,1,2,10480,4608464.0,12,148889.0,93,83,55.0,102,97,122,39,45,591441,757361,5,371090,203042,5,2
45,1,30,2,1,2,2,1,1,2,6681,4455329.0,12,98200.0,55,68,72.0,127,81,125,43,30,1151206,230488,267320,275295,555516,4,2


We want to see the schema of the dataframe

In [6]:
# Print each column's name and the type inferred when the CSV was read.
dataset.printSchema()

How many columns do we have?

In [8]:
# Number of columns in the DataFrame.
len(dataset.columns)

Since the label column is named 'Baselinehistological staging', we rename this variable.

In [10]:
# List the DataFrame's column names.
dataset.columns

Now, we want to use some aggregation functions in order to explore the dataset. First, we transform the label 'Baselinehistological staging' into a string type.

In [12]:
def string_to_float(x):
    """Convert *x* to a Python float using the built-in ``float``."""
    converted = float(x)
    return converted
  
def condition(r):
    """Map a staging code to its stage name.

    Codes 1, 2 and 3 give "F1", "F2" and "F3"; any other value gives "F4".
    """
    for code, stage in ((1, "F1"), (2, "F2"), (3, "F3")):
        if r == code:
            return stage
    # Fallback for every value that is not 1, 2 or 3.
    return "F4"


from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType

# Spark UDF wrappers around the plain-Python helpers string_to_float and
# condition.
string_to_float_udf = udf(string_to_float, DoubleType())
quality_udf = udf(condition, StringType())

# Column expression that maps the numeric staging code to "F1".."F4".
Label = quality_udf("Baselinehistological staging")

# NOTE: the source column "Baselinehistological staging" is still present
# after this step; it has to be removed before building model features.
dataset = dataset.withColumn("Label", Label)
display(dataset)

Age,Gender,BMI,Fever,Nausea/Vomting,Headache,Diarrhea,Fatigue & generalized bone ache,Jaundice,Epigastric pain,WBC,RBC,HGB,Plat,AST 1,ALT 1,ALT4,ALT 12,ALT 24,ALT 36,ALT 48,ALT after 24 w,RNA Base,RNA 4,RNA 12,RNA EOT,RNA EF,Baseline histological Grading,Baselinehistological staging,Label
56,1,35,2,1,1,1,2,2,2,7425,4248807.0,14,112132.0,99,84,52.0,109,81,5,5,5,655330,634536,288194,5,5,13,2,F2
46,1,29,1,2,2,1,2,2,1,12101,4429425.0,10,129367.0,91,123,95.0,75,113,57,123,44,40620,538635,637056,336804,31085,4,2,F2
57,1,33,2,2,2,2,1,1,1,4178,4621191.0,12,151522.0,113,49,95.0,107,116,5,5,5,571148,661346,5,735945,558829,4,4,F4
49,2,33,1,2,1,2,1,2,1,6490,4794631.0,10,146457.0,43,64,109.0,80,88,48,77,33,1041941,449939,585688,744463,582301,10,3,F3
59,1,32,1,1,2,1,2,2,2,3661,4606375.0,11,187684.0,99,104,67.0,48,120,94,90,30,660410,738756,3731527,338946,242861,11,1,F1
58,2,22,2,2,2,1,2,2,1,11785,3882456.0,15,131228.0,66,104,121.0,96,65,73,114,29,1157452,1086852,5,5,5,4,4,F4
42,2,26,1,1,2,2,2,2,2,11620,4747333.0,12,177261.0,78,57,113.0,118,107,84,80,28,325694,1034008,275095,214566,635157,12,4,F4
48,2,30,1,1,2,2,1,1,2,7335,4405941.0,11,216176.0,119,112,80.0,127,45,96,53,39,641129,72050,787295,370605,506296,12,3,F3
44,1,23,1,1,2,2,2,1,2,10480,4608464.0,12,148889.0,93,83,55.0,102,97,122,39,45,591441,757361,5,371090,203042,5,2,F2
45,1,30,2,1,2,2,1,1,2,6681,4455329.0,12,98200.0,55,68,72.0,127,81,125,43,30,1151206,230488,267320,275295,555516,4,2,F2


In [13]:
# Print the schema again, now that the "Label" column has been added.
dataset.printSchema()

In [14]:
# Single-column view holding only the string label; show its first 5 rows.
label = dataset.select(dataset["Label"])
label.show(n=5)

In the dataset, there are qualitative variables and quantitative variables, which we separate. In the qualitative variables, 1 means presence of the symptom whereas 2 means its absence.
For the Gender variable, 1 is male and 2 is female.

In [16]:
# Names of the qualitative (coded 1/2) columns. Several of them end with a
# trailing space, so they must be spelled exactly as below.
qualitative_columns = [
    'Gender', 'Fever', 'Nausea/Vomting', 'Headache ', 'Diarrhea ',
    'Fatigue & generalized bone ache ', 'Jaundice ', 'Epigastric pain ',
]

quali_dataset = dataset.select(*qualitative_columns)

# Everything else is treated as quantitative. The string "Label" column is
# dropped as well: the previous code dropped 'label_new', a name that does
# not exist in the DataFrame, so the label silently stayed in this subset.
quanti_dataset = dataset.drop(*qualitative_columns, 'Label')

display(quali_dataset)
# Was misspelled `dispaly`, which raised NameError.
display(quanti_dataset)

Gender,Fever,Nausea/Vomting,Headache,Diarrhea,Fatigue & generalized bone ache,Jaundice,Epigastric pain
1,2,1,1,1,2,2,2
1,1,2,2,1,2,2,1
1,2,2,2,2,1,1,1
2,1,2,1,2,1,2,1
1,1,1,2,1,2,2,2
2,2,2,2,1,2,2,1
2,1,1,2,2,2,2,2
2,1,1,2,2,1,1,2
1,1,1,2,2,2,1,2
1,2,1,2,2,1,1,2


In [17]:
# One describe() summary table per column of the quantitative subset.
for column_name in quanti_dataset.columns:
    summary = quanti_dataset.describe(column_name)
    summary.show()

In [18]:
# Show the distinct values taken by each qualitative column.
for column_name in quali_dataset.columns:
    distinct_values = quali_dataset.select(column_name).distinct()
    distinct_values.show()

In [19]:
# Show the distinct values present in the label column.
label.distinct().show()

We have 4 different label values.

In [21]:
# For every qualitative column, count its entries within each Gender group.
for column_name in quali_dataset.columns:
    per_gender = quali_dataset.groupby('Gender')
    per_gender.agg(fn.count(column_name)).show()

Let's inspect the missing values (nulls) in the quantitative and qualitative variables.

In [23]:
# Count the null entries in each qualitative column.
# The loop variable is deliberately not called `col`: that name would
# overwrite the `col` function imported from pyspark.sql.functions.
for column_name in quali_dataset.columns:
  null_count = quali_dataset.filter(quali_dataset[column_name].isNull()).count()
  print(column_name, "\t", "with null values: ", null_count)

In [24]:
# Count the null entries in each column of the quantitative subset.
# The loop variable is deliberately not called `col`: that name would
# overwrite the `col` function imported from pyspark.sql.functions.
for column_name in quanti_dataset.columns:
  null_count = quanti_dataset.filter(quanti_dataset[column_name].isNull()).count()
  print(column_name, "\t", "with null values: ", null_count)

In [25]:
# Skip the first qualitative column and, for each remaining one, count the
# rows whose value equals 1.
symptom_columns = quali_dataset.columns[1:]
for symptom in symptom_columns:
  affected = quali_dataset.filter(quali_dataset[symptom] == 1).count()
  print("\t", "The number of persons which have ", symptom, ":", affected)

In [26]:
# Bar chart of the number of rows per label value.
Occurence_by_class = dataset.groupby('Label').count().collect()

# Each collected row is (label value, row count).
categories = []
counts = []
for row in Occurence_by_class:
    categories.append(row[0])
    counts.append(row[1])

ind = np.arange(len(categories))
width = 0.35

plt.bar(ind, counts, width=width, color='r')
plt.title('Label distribution')
plt.ylabel('counts')
plt.xticks(ind, categories)
display()


What is the distribution of the observations by gender?

In [28]:
# Bar chart of the number of rows per Gender value.
Occurence_by_Gender = dataset.groupby('Gender').count().collect()

# Each collected row is (gender code, row count).
categories_Gender = []
counts_Gender = []
for row in Occurence_by_Gender:
    categories_Gender.append(row[0])
    counts_Gender.append(row[1])

ind_Gender = np.arange(len(categories_Gender))
width = 0.35

plt.bar(ind_Gender, counts_Gender, width=width, color='r')
plt.title('Gender distribution')
plt.ylabel('counts')
plt.xticks(ind_Gender, categories_Gender)
display()

There are more males than females.
Let's plot some numeric variables.

In [30]:
# Numeric columns to visualise.
numeric_columns = [
    'Plat', 'AST 1', 'ALT 1', 'ALT4', 'ALT 12', 'ALT 24', 'ALT 36',
    'ALT 48', 'ALT after 24 w', 'RNA Base', 'RNA 4', 'RNA 12', 'RNA EOT',
    'RNA EF', 'Baseline histological Grading',
]
data = dataset.select(*numeric_columns)

# Plotting is done with seaborn on a pandas copy of the selected columns.
data_toPandas = data.toPandas()

# One box plot per selected column.
for column_name in data.columns:
    series = data_toPandas[column_name]
    sn.boxplot(series)
    display()

In [31]:
# One distribution plot per selected numeric column, using the pandas copy
# built for the box plots.
for column_name in data.columns:
    series = data_toPandas[column_name]
    sn.distplot(series)
    display()


According to the plots above, one can say that ALT after 24 w and RNA 12 have outliers. The distribution of the variable Plat seems centered.
Now, we want to deal with categorical features. We will use one-hot encoders to convert categorical variables.

In [33]:
def transData(data):
    """Convert *data* into a two-column DataFrame ('features', 'Label').

    For every row, all fields except the last are packed into a dense
    vector; the last field is kept unchanged as the label.
    """
    def to_pair(r):
        return [Vectors.dense(r[:-1]), r[-1]]

    return data.rdd.map(to_pair).toDF(['features', 'Label'])
# List the column names; transData treats the last one as the label.
dataset.columns

In [34]:
# "Label" is computed directly from "Baselinehistological staging", so that
# column must not be part of the feature vector: leaving it in lets the
# models read the answer from the inputs (target leakage).
transformed = transData(dataset.drop("Baselinehistological staging"))
transformed.show(5, True)

Standardization

In [36]:
# Scale every feature by its standard deviation, without centering on the
# mean (withMean=False).
scaler = StandardScaler(
    inputCol="features",
    outputCol="Features",
    withMean=False,
    withStd=True,
)
scalerModel = scaler.fit(transformed)
scaledData = scalerModel.transform(transformed)

# Keep only the scaled vector and the label for the modelling steps.
transformed = scaledData.select(["Features", "Label"])
transformed.show(5, True)

In [37]:
# Fit a string indexer that maps 'Label' to the numeric 'indexedLabel'.
label_indexer_estimator = StringIndexer(inputCol='Label', outputCol='indexedLabel')
labelIndexer = label_indexer_estimator.fit(transformed)

# Preview the indexed labels.
labelIndexer.transform(transformed).show(5, True)

In [38]:
# Fit a vector indexer on the scaled features, writing 'indexedFeatures'
# (maxCategories=4).
feature_indexer_estimator = VectorIndexer(
    inputCol="Features",
    outputCol="indexedFeatures",
    maxCategories=4,
)
featureIndexer = feature_indexer_estimator.fit(transformed)

# Preview the indexed feature vectors.
featureIndexer.transform(transformed).show(5, True)

Let's split the data into training and test data sets.

In [40]:
# 60% / 40% train/test split. The seed is passed as an int, as the
# randomSplit API documents, and fixes the split for reproducibility.
train_data, test_data = transformed.randomSplit([0.6, 0.4], seed=0)

In [41]:
# Show the first 5 training rows without truncating the cell contents.
train_data.show(5, False)

In [42]:
# Show the first 5 test rows without truncating the cell contents.
test_data.show(5, False)

1) Logistic Regression Model

In [44]:
# Logistic regression on the indexed features and indexed label.
lr = LR(
    featuresCol='indexedFeatures',
    labelCol='indexedLabel',
    maxIter=50,
    regParam=0.01,
)

Let's define pipeline architecture

In [46]:
# Converts the numeric prediction back to the original label strings.
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=labelIndexer.labels,
)

# Stage order: index label -> index features -> classify -> decode prediction.
lr_stages = [labelIndexer, featureIndexer, lr, labelConverter]
pipeline = Pipeline(stages=lr_stages)

# Train the logistic regression pipeline on the training split.
model1 = pipeline.fit(train_data)

Prediction with test_data.

In [48]:
# Apply the trained logistic regression pipeline to the test split.
predictions = model1.transform(test_data)
# The columns are named "Features" and "Label"; using the exact spelling
# avoids depending on Spark's case-insensitive column resolution.
predictions.select("Features", "Label", "predictedLabel").show(5)

In [49]:
# Accuracy of the predictions against the indexed label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print("accuracy:", accuracy)

We get a good accuracy

2) Decision Tree

In [52]:
# Decision tree classifier on the indexed features and indexed label.
dt = DecisionTreeClassifier(
    featuresCol='indexedFeatures',
    labelCol='indexedLabel',
)


In [53]:
# Converts the numeric prediction back to the original label strings.
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=labelIndexer.labels,
)

# Stage order: index label -> index features -> classify -> decode prediction.
dt_stages = [labelIndexer, featureIndexer, dt, labelConverter]
pipeline = Pipeline(stages=dt_stages)

# Train the decision tree pipeline on the training split.
model2 = pipeline.fit(train_data)

In [54]:
# Apply the trained decision tree pipeline to the test split.
predictions = model2.transform(test_data)
# The columns are named "Features" and "Label"; using the exact spelling
# avoids depending on Spark's case-insensitive column resolution.
predictions.select("Features", "Label", "predictedLabel").show(5)

In [55]:
# Accuracy of the predictions against the indexed label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy:",accuracy)