importing SparkContext

In [1]:
from pyspark import SparkContext

In [2]:
spark = SparkContext(master='local[2]')

In [3]:
spark

importing sparkSesssion to perform operations

In [4]:
from pyspark.sql import SparkSession

In [5]:
sc = SparkSession.builder.appName("Hepatitis prediction").getOrCreate()

we'll read it as a dataframe not an rdd since the dataset is not that huge

In [6]:
df = sc.read.csv("HepatitisCdata.csv", inferSchema=True, header=True)

Data Profiling

In [7]:
df.show(5)

+---+-------------+---+---+----+----+----+----+----+-----+----+-----+----+----+
|_c0|     Category|Age|Sex| ALB| ALP| ALT| AST| BIL|  CHE|CHOL| CREA| GGT|PROT|
+---+-------------+---+---+----+----+----+----+----+-----+----+-----+----+----+
|  1|0=Blood Donor| 32|  m|38.5|52.5| 7.7|22.1| 7.5| 6.93|3.23|106.0|12.1|  69|
|  2|0=Blood Donor| 32|  m|38.5|70.3|  18|24.7| 3.9|11.17| 4.8| 74.0|15.6|76.5|
|  3|0=Blood Donor| 32|  m|46.9|74.7|36.2|52.6| 6.1| 8.84| 5.2| 86.0|33.2|79.3|
|  4|0=Blood Donor| 32|  m|43.2|  52|30.6|22.6|18.9| 7.33|4.74| 80.0|33.8|75.7|
|  5|0=Blood Donor| 32|  m|39.2|74.1|32.6|24.8| 9.6| 9.15|4.32| 76.0|29.9|68.7|
+---+-------------+---+---+----+----+----+----+----+-----+----+-----+----+----+
only showing top 5 rows



In [8]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Category: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- ALB: string (nullable = true)
 |-- ALP: string (nullable = true)
 |-- ALT: string (nullable = true)
 |-- AST: double (nullable = true)
 |-- BIL: double (nullable = true)
 |-- CHE: double (nullable = true)
 |-- CHOL: string (nullable = true)
 |-- CREA: double (nullable = true)
 |-- GGT: double (nullable = true)
 |-- PROT: string (nullable = true)



ALB: Albumin - It is a protein synthesized by the liver. Abnormal levels of albumin can indicate liver dysfunction.

ALP: Alkaline Phosphatase - It is an enzyme found in various tissues, including the liver and bones. Elevated levels of ALP can be indicative of liver damage or other conditions.

ALT: Alanine Aminotransferase - It is an enzyme primarily found in the liver. Increased levels of ALT in the bloodstream can be a sign of liver injury or disease.

AST: Aspartate Aminotransferase - It is an enzyme present in various tissues, including the liver, heart, and muscles. Elevated levels of AST can indicate liver damage or other health conditions.

BIL: Bilirubin - It is a yellow pigment produced during the breakdown of red blood cells. Elevated levels of bilirubin can indicate liver dysfunction or other conditions.

CHE: Cholinesterase - It is an enzyme produced mainly by the liver. Abnormal levels of cholinesterase can be a sign of liver disease.

CHOL: Cholesterol - It is a fatty substance found in the blood. Abnormal cholesterol levels can contribute to liver disease and other health issues.

CREA: Creatinine - It is a waste product produced by muscles and eliminated by the kidneys. Elevated creatinine levels may indicate impaired kidney function.

GGT: Gamma-Glutamyl Transferase - It is an enzyme found in various tissues, including the liver. Increased levels of GGT can indicate liver damage or disease.

PROT: Protein - It refers to the total protein levels in the blood. Abnormal protein levels can be indicative of liver disease or other health conditions.

These laboratory markers are commonly measured to assess liver function, diagnose liver diseases, and monitor the progression of hepatitis.

In [9]:
df.count()

615

In [10]:
df.groupBy("Category").count().show()

+--------------------+-----+
|            Category|count|
+--------------------+-----+
|       0=Blood Donor|  533|
|         3=Cirrhosis|   30|
|          2=Fibrosis|   21|
|0s=suspect Blood ...|    7|
|         1=Hepatitis|   24|
+--------------------+-----+



In [11]:
df.dtypes

[('_c0', 'int'),
 ('Category', 'string'),
 ('Age', 'int'),
 ('Sex', 'string'),
 ('ALB', 'string'),
 ('ALP', 'string'),
 ('ALT', 'string'),
 ('AST', 'double'),
 ('BIL', 'double'),
 ('CHE', 'double'),
 ('CHOL', 'string'),
 ('CREA', 'double'),
 ('GGT', 'double'),
 ('PROT', 'string')]

Converting strings to numbers

In [12]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

In [13]:
category = StringIndexer(inputCol='Category', outputCol='dis').fit(df)
df = category.transform(df)

In [14]:
category.labels

['0=Blood Donor',
 '3=Cirrhosis',
 '1=Hepatitis',
 '2=Fibrosis',
 '0s=suspect Blood Donor']

In [15]:
df.select(['dis']).distinct().show()

+---+
|dis|
+---+
|0.0|
|1.0|
|4.0|
|3.0|
|2.0|
+---+



In [16]:
df.dtypes

[('_c0', 'int'),
 ('Category', 'string'),
 ('Age', 'int'),
 ('Sex', 'string'),
 ('ALB', 'string'),
 ('ALP', 'string'),
 ('ALT', 'string'),
 ('AST', 'double'),
 ('BIL', 'double'),
 ('CHE', 'double'),
 ('CHOL', 'string'),
 ('CREA', 'double'),
 ('GGT', 'double'),
 ('PROT', 'string'),
 ('dis', 'double')]

Converting sex to double

In [17]:
gender = StringIndexer(inputCol='Sex', outputCol="gender").fit(df)
df= gender.transform(df)

In [18]:
df.show()

+---+-------------+---+---+----+----+----+----+----+-----+----+-----+----+----+---+------+
|_c0|     Category|Age|Sex| ALB| ALP| ALT| AST| BIL|  CHE|CHOL| CREA| GGT|PROT|dis|gender|
+---+-------------+---+---+----+----+----+----+----+-----+----+-----+----+----+---+------+
|  1|0=Blood Donor| 32|  m|38.5|52.5| 7.7|22.1| 7.5| 6.93|3.23|106.0|12.1|  69|0.0|   0.0|
|  2|0=Blood Donor| 32|  m|38.5|70.3|  18|24.7| 3.9|11.17| 4.8| 74.0|15.6|76.5|0.0|   0.0|
|  3|0=Blood Donor| 32|  m|46.9|74.7|36.2|52.6| 6.1| 8.84| 5.2| 86.0|33.2|79.3|0.0|   0.0|
|  4|0=Blood Donor| 32|  m|43.2|  52|30.6|22.6|18.9| 7.33|4.74| 80.0|33.8|75.7|0.0|   0.0|
|  5|0=Blood Donor| 32|  m|39.2|74.1|32.6|24.8| 9.6| 9.15|4.32| 76.0|29.9|68.7|0.0|   0.0|
|  6|0=Blood Donor| 32|  m|41.6|43.3|18.5|19.7|12.3| 9.92|6.05|111.0|91.0|  74|0.0|   0.0|
|  7|0=Blood Donor| 32|  m|46.3|41.3|17.5|17.8| 8.5| 7.01|4.79| 70.0|16.9|74.5|0.0|   0.0|
|  8|0=Blood Donor| 32|  m|42.2|41.9|35.8|31.1|16.1| 5.82| 4.6|109.0|21.5|67.1|0.0|   0.0|

In [19]:
from pyspark.ml.feature import IndexToString

In [20]:
its=IndexToString(inputCol='dis', outputCol='tar')
itsdf=its.transform(df)

In [21]:
itsdf.select('tar').distinct().show()

+--------------------+
|                 tar|
+--------------------+
|       0=Blood Donor|
|         3=Cirrhosis|
|          2=Fibrosis|
|0s=suspect Blood ...|
|         1=Hepatitis|
+--------------------+



In [22]:
itsdf.dtypes

[('_c0', 'int'),
 ('Category', 'string'),
 ('Age', 'int'),
 ('Sex', 'string'),
 ('ALB', 'string'),
 ('ALP', 'string'),
 ('ALT', 'string'),
 ('AST', 'double'),
 ('BIL', 'double'),
 ('CHE', 'double'),
 ('CHOL', 'string'),
 ('CREA', 'double'),
 ('GGT', 'double'),
 ('PROT', 'string'),
 ('dis', 'double'),
 ('gender', 'double'),
 ('tar', 'string')]

In [23]:
incol=['ALB','ALP','ALT','CHOL','PROT']

we need to convert all the string datatype columns to integer and we'll use a pipeline to do it all at once

In [24]:
from pyspark.ml import Pipeline

In [25]:
indexers = []

for column in incol:
    indexer = StringIndexer(inputCol=column, outputCol=f"{column}_new").fit(df)
    indexers.append(indexer)

pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)

In [26]:
df.show(5)

+---+-------------+---+---+----+----+----+----+----+-----+----+-----+----+----+---+------+-------+-------+-------+--------+--------+
|_c0|     Category|Age|Sex| ALB| ALP| ALT| AST| BIL|  CHE|CHOL| CREA| GGT|PROT|dis|gender|ALB_new|ALP_new|ALT_new|CHOL_new|PROT_new|
+---+-------------+---+---+----+----+----+----+----+-----+----+-----+----+----+---+------+-------+-------+-------+--------+--------+
|  1|0=Blood Donor| 32|  m|38.5|52.5| 7.7|22.1| 7.5| 6.93|3.23|106.0|12.1|  69|0.0|   0.0|   51.0|    1.0|  320.0|   169.0|    30.0|
|  2|0=Blood Donor| 32|  m|38.5|70.3|  18|24.7| 3.9|11.17| 4.8| 74.0|15.6|76.5|0.0|   0.0|   51.0|   26.0|   91.0|   217.0|    65.0|
|  3|0=Blood Donor| 32|  m|46.9|74.7|36.2|52.6| 6.1| 8.84| 5.2| 86.0|33.2|79.3|0.0|   0.0|   27.0|  116.0|  131.0|    52.0|    70.0|
|  4|0=Blood Donor| 32|  m|43.2|  52|30.6|22.6|18.9| 7.33|4.74| 80.0|33.8|75.7|0.0|   0.0|   87.0|    9.0|  122.0|    19.0|    85.0|
|  5|0=Blood Donor| 32|  m|39.2|74.1|32.6|24.8| 9.6| 9.15|4.32| 76.0|

In [27]:
df.dtypes

[('_c0', 'int'),
 ('Category', 'string'),
 ('Age', 'int'),
 ('Sex', 'string'),
 ('ALB', 'string'),
 ('ALP', 'string'),
 ('ALT', 'string'),
 ('AST', 'double'),
 ('BIL', 'double'),
 ('CHE', 'double'),
 ('CHOL', 'string'),
 ('CREA', 'double'),
 ('GGT', 'double'),
 ('PROT', 'string'),
 ('dis', 'double'),
 ('gender', 'double'),
 ('ALB_new', 'double'),
 ('ALP_new', 'double'),
 ('ALT_new', 'double'),
 ('CHOL_new', 'double'),
 ('PROT_new', 'double')]

In [28]:
df1 = df.select(['Age', 'AST', 'BIL', 'CHE', 'CREA', 'GGT', 'gender', 'ALB_new', 'ALP_new', 'ALT_new', 'CHOL_new', 'PROT_new' , 'dis'])

In [29]:
df1.show(5)

+---+----+----+-----+-----+----+------+-------+-------+-------+--------+--------+---+
|Age| AST| BIL|  CHE| CREA| GGT|gender|ALB_new|ALP_new|ALT_new|CHOL_new|PROT_new|dis|
+---+----+----+-----+-----+----+------+-------+-------+-------+--------+--------+---+
| 32|22.1| 7.5| 6.93|106.0|12.1|   0.0|   51.0|    1.0|  320.0|   169.0|    30.0|0.0|
| 32|24.7| 3.9|11.17| 74.0|15.6|   0.0|   51.0|   26.0|   91.0|   217.0|    65.0|0.0|
| 32|52.6| 6.1| 8.84| 86.0|33.2|   0.0|   27.0|  116.0|  131.0|    52.0|    70.0|0.0|
| 32|22.6|18.9| 7.33| 80.0|33.8|   0.0|   87.0|    9.0|  122.0|    19.0|    85.0|0.0|
| 32|24.8| 9.6| 9.15| 76.0|29.9|   0.0|   80.0|  331.0|   31.0|   205.0|    53.0|0.0|
+---+----+----+-----+-----+----+------+-------+-------+-------+--------+--------+---+
only showing top 5 rows



In [30]:
df1=df1.toPandas().replace('NA',0).astype(float)

In [31]:
df1.dtypes

Age         float64
AST         float64
BIL         float64
CHE         float64
CREA        float64
GGT         float64
gender      float64
ALB_new     float64
ALP_new     float64
ALT_new     float64
CHOL_new    float64
PROT_new    float64
dis         float64
dtype: object

Converting features into feature vectors

In [32]:
from pyspark.ml.feature import VectorAssembler

In [33]:
print(df1.columns)

Index(['Age', 'AST', 'BIL', 'CHE', 'CREA', 'GGT', 'gender', 'ALB_new',
       'ALP_new', 'ALT_new', 'CHOL_new', 'PROT_new', 'dis'],
      dtype='object')


Converting pandas Dataframe back to pyspark dataframe

In [34]:
df1= sc.createDataFrame(df1)

In [35]:
cols=['Age', 'AST', 'BIL', 'CHE', 'CREA', 'GGT', 'gender', 'ALB_new', 'ALP_new', 'ALT_new', 'CHOL_new', 'PROT_new', 'dis']

In [36]:
fea= VectorAssembler(inputCols=cols, outputCol='features')

In [37]:
df2=fea.transform(df1)

In [38]:
df2.show()

+----+----+----+-----+-----+----+------+-------+-------+-------+--------+--------+---+--------------------+
| Age| AST| BIL|  CHE| CREA| GGT|gender|ALB_new|ALP_new|ALT_new|CHOL_new|PROT_new|dis|            features|
+----+----+----+-----+-----+----+------+-------+-------+-------+--------+--------+---+--------------------+
|32.0|22.1| 7.5| 6.93|106.0|12.1|   0.0|   51.0|    1.0|  320.0|   169.0|    30.0|0.0|[32.0,22.1,7.5,6....|
|32.0|24.7| 3.9|11.17| 74.0|15.6|   0.0|   51.0|   26.0|   91.0|   217.0|    65.0|0.0|[32.0,24.7,3.9,11...|
|32.0|52.6| 6.1| 8.84| 86.0|33.2|   0.0|   27.0|  116.0|  131.0|    52.0|    70.0|0.0|[32.0,52.6,6.1,8....|
|32.0|22.6|18.9| 7.33| 80.0|33.8|   0.0|   87.0|    9.0|  122.0|    19.0|    85.0|0.0|[32.0,22.6,18.9,7...|
|32.0|24.8| 9.6| 9.15| 76.0|29.9|   0.0|   80.0|  331.0|   31.0|   205.0|    53.0|0.0|[32.0,24.8,9.6,9....|
|32.0|19.7|12.3| 9.92|111.0|91.0|   0.0|   33.0|  217.0|   50.0|    65.0|    39.0|0.0|[32.0,19.7,12.3,9...|
|32.0|17.8| 8.5| 7.01| 70.0|

In [39]:
traind,testd= df2.randomSplit([0.69,0.31])

Using Logistics Regression

In [40]:
from pyspark.ml.classification import LogisticRegression

In [41]:
lr= LogisticRegression(featuresCol='features', labelCol='dis')

In [42]:
lrm=lr.fit(traind)

In [43]:
y=lrm.transform(testd)

In [44]:
print(y.columns)

['Age', 'AST', 'BIL', 'CHE', 'CREA', 'GGT', 'gender', 'ALB_new', 'ALP_new', 'ALT_new', 'CHOL_new', 'PROT_new', 'dis', 'features', 'rawPrediction', 'probability', 'prediction']


In [45]:
y.select(['dis', 'rawPrediction', 'probability', 'prediction']).show()

+---+--------------------+--------------------+----------+
|dis|       rawPrediction|         probability|prediction|
+---+--------------------+--------------------+----------+
|0.0|[106.042115101311...|[0.99999999999999...|       0.0|
|0.0|[129.309596668961...|[1.0,4.0295589392...|       0.0|
|0.0|[106.396825184891...|[1.0,2.9066656679...|       0.0|
|0.0|[87.1281072106328...|[0.99999999999998...|       0.0|
|0.0|[106.302912339158...|[0.99999999999999...|       0.0|
|0.0|[87.9997635429729...|[0.97217391102438...|       0.0|
|0.0|[101.334132258812...|[1.0,8.1168005868...|       0.0|
|0.0|[89.8236588602807...|[1.0,1.2277951721...|       0.0|
|0.0|[100.525785949037...|[0.99999999999999...|       0.0|
|0.0|[93.262505284484,...|[0.99999748883694...|       0.0|
|0.0|[117.877138273464...|[1.0,1.8922452351...|       0.0|
|0.0|[96.3817711696228...|[1.0,1.4678409720...|       0.0|
|0.0|[104.269492354541...|[1.0,2.1469794485...|       0.0|
|0.0|[105.391233937039...|[1.0,1.3208529932...|       0.

Model Evaluation

In [46]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [47]:
eval= MulticlassClassificationEvaluator(labelCol='dis', metricName="accuracy")

In [48]:
eval.evaluate(y)

0.9646464646464646

In [49]:
from pyspark.mllib.evaluation import MulticlassMetrics

Creating a RDD consisting of dis(disease) and prediction

In [50]:
lrmetrics =MulticlassMetrics(y['dis', 'prediction'].rdd)



In [51]:
print(f"the accuracy of this model is {round(lrmetrics.accuracy*100,2)}% ")

the accuracy of this model is 96.46% 
