In [55]:
#Install Java Development kit for Spark
!apt-get install openjdk-8-jdk

Reading package lists... Done
Building dependency tree       
Reading state information... Done
openjdk-8-jdk is already the newest version (8u372-ga~us1-0ubuntu1~20.04).
0 upgraded, 0 newly installed, 0 to remove and 15 not upgraded.


In [56]:
import os

In [57]:
#Set the JAVA_HOME env variable
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64"

In [58]:
#Current working directory
!pwd

/content


In [59]:
!echo $JAVA_HOME

/usr/lib/jvm/java-8-openjdk-amd64


In [60]:
#Install PySpark with latest version
!pip install pyspark==3.0.0



In [61]:
!pip install findspark
import findspark
findspark.init()



In [62]:
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.master("local").appName("Test Spark").config("spark.some.config.option", "some-value").getOrCreate()

In [63]:
sc = spark.sparkContext

In [64]:
spark

In [65]:
#mount your drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [66]:
#Read the csv file
df=spark.read.csv('/content/drive/MyDrive/Colab Notebooks/dataset/Dataset_CKD.csv',inferSchema=True,header=True)

In [67]:
#Check dimension's
print((df.count(),len(df.columns)))

(34991, 25)


In [68]:
#Check for the schema
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- bp: integer (nullable = true)
 |-- sg: double (nullable = true)
 |-- al: integer (nullable = true)
 |-- su: integer (nullable = true)
 |-- rbc: string (nullable = true)
 |-- pc: string (nullable = true)
 |-- pcc: string (nullable = true)
 |-- ba: string (nullable = true)
 |-- bgr: integer (nullable = true)
 |-- bu: double (nullable = true)
 |-- sc: double (nullable = true)
 |-- sod: double (nullable = true)
 |-- pot: double (nullable = true)
 |-- hemo: double (nullable = true)
 |-- pcv: integer (nullable = true)
 |-- wc: integer (nullable = true)
 |-- rc: double (nullable = true)
 |-- htn: string (nullable = true)
 |-- dm: string (nullable = true)
 |-- cad: string (nullable = true)
 |-- appet: string (nullable = true)
 |-- pe: string (nullable = true)
 |-- ane: string (nullable = true)
 |-- class: string (nullable = true)



In [69]:
#Top 5 records
df.show(5)

+---+---+-----+---+---+------+-------+--------------------+----------+---+-----+----+-----+---+----+---+----+---+---+---+---+-----+---+---+-----+
|age| bp|   sg| al| su|   rbc|     pc|                 pcc|        ba|bgr|   bu|  sc|  sod|pot|hemo|pcv|  wc| rc|htn| dm|cad|appet| pe|ane|class|
+---+---+-----+---+---+------+-------+--------------------+----------+---+-----+----+-----+---+----+---+----+---+---+---+---+-----+---+---+-----+
| 50|150| 1.02|  1|  0|normal| normal|         not present|notpresent|181| 96.0| 8.4|135.0|4.0|10.3| 44|7800|5.2| no| no| no| good| no| no|  ckd|
| 69|140| 1.02|  4|  0|normal| normal|not present         |notpresent|121| 73.0| 7.3|130.0|5.4| 9.4| 38|6000|3.9| no| no| no| good| no|yes|  ckd|
| 69|160| 1.02|  2|  3|normal| normal|         not present|notpresent|423|111.0| 7.0|129.0|4.9|10.6| 31|7500|4.6| no|yes| no| poor| no| no|  ckd|
| 71|140|1.005|  4|  0|normal|abnomal|             present|notpresent|106|119.0|11.6|131.0|5.5| 7.2| 32|6700|4.4| no| no| no

In [70]:
#Datatypes of the columns
df.dtypes

[('age', 'int'),
 ('bp', 'int'),
 ('sg', 'double'),
 ('al', 'int'),
 ('su', 'int'),
 ('rbc', 'string'),
 ('pc', 'string'),
 ('pcc', 'string'),
 ('ba', 'string'),
 ('bgr', 'int'),
 ('bu', 'double'),
 ('sc', 'double'),
 ('sod', 'double'),
 ('pot', 'double'),
 ('hemo', 'double'),
 ('pcv', 'int'),
 ('wc', 'int'),
 ('rc', 'double'),
 ('htn', 'string'),
 ('dm', 'string'),
 ('cad', 'string'),
 ('appet', 'string'),
 ('pe', 'string'),
 ('ane', 'string'),
 ('class', 'string')]

In [75]:
#Drop unwanted columns
#my_data = df.drop(*['contact', 'day', 'month','default'])
my_data = df.alias('my_data')
#my_data = df.copy()

In [76]:
# get the dimensions of the data
(my_data.count() , len(my_data.columns))

(34991, 25)

In [77]:
my_data.describe().show()

+-------+------------------+-----------------+--------------------+------------------+------------------+------+-------+-----------+----------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+-----+-----+-----+-----+-----+-----+------+
|summary|               age|               bp|                  sg|                al|                su|   rbc|     pc|        pcc|        ba|               bgr|               bu|                sc|               sod|               pot|              hemo|              pcv|               wc|                rc|  htn|   dm|  cad|appet|   pe|  ane| class|
+-------+------------------+-----------------+--------------------+------------------+------------------+------+-------+-----------+----------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+----------------

In [78]:
# import sql function pyspark
import pyspark.sql.functions as f

# null values in each column
data_agg = my_data.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in my_data.columns])
data_agg.show()

+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+---+---+---+---+---+---+-----+---+---+-----+
|age| bp| sg| al| su|rbc| pc|pcc| ba|bgr| bu| sc|sod|pot|hemo|pcv| wc| rc|htn| dm|cad|appet| pe|ane|class|
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+---+---+---+---+---+---+-----+---+---+-----+
|367|615|350|346|366|  0|477| 67| 67| 96|171|143| 68| 68| 100| 65|107|107| 27| 27|  0|   13| 13|  0|    0|
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+---+---+---+---+---+---+-----+---+---+-----+



In [79]:
# value counts of columns
my_data.groupBy('rbc').count().show()
print()
my_data.groupBy('pc').count().show()
print()
my_data.groupBy('pcc').count().show()
print()
my_data.groupBy('ba').count().show()
print()
my_data.groupBy('htn').count().show()
print()
my_data.groupBy('dm').count().show()
print()
my_data.groupBy('cad').count().show()
print()
my_data.groupBy('appet').count().show()
print()
my_data.groupBy('pe').count().show()
print()
my_data.groupBy('ane').count().show()
print()
my_data.groupBy('class').count().show()
print()

+------+-----+
|   rbc|count|
+------+-----+
|normal|34991|
+------+-----+


+--------+-----+
|      pc|count|
+--------+-----+
|    null|  477|
|  normal|22792|
|abnormal|11167|
| abnomal|  555|
+--------+-----+


+--------------------+-----+
|                 pcc|count|
+--------------------+-----+
|             present| 3873|
|          notpresent|22315|
|                null|   67|
|not present         |   28|
|         not present| 8708|
+--------------------+-----+


+----------+-----+
|        ba|count|
+----------+-----+
|   present| 3947|
|notpresent|30977|
|      null|   67|
+----------+-----+


+----+-----+
| htn|count|
+----+-----+
|null|   27|
|  no|19981|
| yes|14983|
+----+-----+


+----+-----+
|  dm|count|
+----+-----+
|null|   27|
|	yes|  130|
| 	no|  170|
|  no|21066|
| yes|13574|
| yes|   24|
+----+-----+


+---+-----+
|cad|count|
+---+-----+
| no|24779|
|yes|10212|
+---+-----+


+-----+-----+
|appet|count|
+-----+-----+
| null|   13|
|good |   79|
| poor| 8926|
| go

In [80]:
my_data.dtypes

[('age', 'int'),
 ('bp', 'int'),
 ('sg', 'double'),
 ('al', 'int'),
 ('su', 'int'),
 ('rbc', 'string'),
 ('pc', 'string'),
 ('pcc', 'string'),
 ('ba', 'string'),
 ('bgr', 'int'),
 ('bu', 'double'),
 ('sc', 'double'),
 ('sod', 'double'),
 ('pot', 'double'),
 ('hemo', 'double'),
 ('pcv', 'int'),
 ('wc', 'int'),
 ('rc', 'double'),
 ('htn', 'string'),
 ('dm', 'string'),
 ('cad', 'string'),
 ('appet', 'string'),
 ('pe', 'string'),
 ('ane', 'string'),
 ('class', 'string')]

In [81]:
my_data.columns

['age',
 'bp',
 'sg',
 'al',
 'su',
 'rbc',
 'pc',
 'pcc',
 'ba',
 'bgr',
 'bu',
 'sc',
 'sod',
 'pot',
 'hemo',
 'pcv',
 'wc',
 'rc',
 'htn',
 'dm',
 'cad',
 'appet',
 'pe',
 'ane',
 'class']

In [82]:
#Preprocessing steps
# import packages
# from pyspark.ml.feature import StringIndexer, OneHotEncoder
# # conversion
# indexer = StringIndexer(inputCol='feature1', outputCol='feature1_numeric').fit(spark_df)
# indexed_df = indexer.transform(spark_df)
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# create object of StringIndexer class and specify input and output column
SI_rbc = StringIndexer(inputCol='rbc',outputCol='rbc_Index')
SI_pc = StringIndexer(inputCol='pc',outputCol='pc_Index')
SI_pcc = StringIndexer(inputCol='pcc',outputCol='pcc_Index')
SI_ba = StringIndexer(inputCol='ba',outputCol='ba_Index')
SI_htn = StringIndexer(inputCol='htn',outputCol='htn_Index')
SI_dm = StringIndexer(inputCol='dm',outputCol='dm_Index')
SI_cad = StringIndexer(inputCol='cad',outputCol='cad_Index')
SI_appet = StringIndexer(inputCol='appet',outputCol='appet_Index')
SI_pe = StringIndexer(inputCol='pe',outputCol='pe_Index')
SI_ane = StringIndexer(inputCol='ane',outputCol='ane_Index')
SI_predict = StringIndexer(inputCol='class',outputCol='predict_Index')

# transform the data
my_data = SI_rbc.fit(my_data).transform(my_data)
my_data = SI_pc.fit(my_data).transform(my_data)
my_data = SI_pcc.fit(my_data).transform(my_data)
my_data = SI_ba.fit(my_data).transform(my_data)
my_data = SI_htn.fit(my_data).transform(my_data)
my_data = SI_dm.fit(my_data).transform(my_data)
my_data = SI_cad.fit(my_data).transform(my_data)
my_data = SI_appet.fit(my_data).transform(my_data)
my_data = SI_pe.fit(my_data).transform(my_data)
my_data = SI_ane.fit(my_data).transform(my_data)
my_data = SI_predict.fit(my_data).transform(my_data)

In [87]:
my_data.select('rbc', 'rbc_Index', 'pc', 'pc_Index', 'pcc', 'pcc_Index', 'ba', 'ba_Index', 'htn', 'htn_Index', 'dm', 'dm_Index', 'cad', 'cad_Index', 'appet', 'appet_Index', 'pe', 'pe_Index', 'ane', 'ane_Index', 'class', 'predict_Index').show(10)

+------+---------+-------+--------+--------------------+---------+----------+--------+---+---------+---+--------+---+---------+-----+-----------+---+--------+---+---------+-----+-------------+
|   rbc|rbc_Index|     pc|pc_Index|                 pcc|pcc_Index|        ba|ba_Index|htn|htn_Index| dm|dm_Index|cad|cad_Index|appet|appet_Index| pe|pe_Index|ane|ane_Index|class|predict_Index|
+------+---------+-------+--------+--------------------+---------+----------+--------+---+---------+---+--------+---+---------+-----+-----------+---+--------+---+---------+-----+-------------+
|normal|      0.0| normal|     0.0|         not present|      1.0|notpresent|     0.0| no|      0.0| no|     0.0| no|      0.0| good|        0.0| no|     0.0| no|      0.0|  ckd|          0.0|
|normal|      0.0| normal|     0.0|not present         |      3.0|notpresent|     0.0| no|      0.0| no|     0.0| no|      0.0| good|        0.0| no|     0.0|yes|      1.0|  ckd|          0.0|
|normal|      0.0| normal|     0.0|

In [89]:
from pyspark.ml.feature import VectorAssembler

# specify the input and output columns of the vector assembler
assembler = VectorAssembler(inputCols=['age',
                                       'bp',
                                       'rbc_Index',
                                       'pc_Index',
                                       'pcc_Index',
                                       'ba_Index',
                                       'bgr',
                                       'bu',
                                       'sc',
                                       'sod',
                                       'pot',
                                       'hemo',
                                       'pcv',
                                       'wc',
                                       'rc',
                                       'htn_Index',
                                       'dm_Index',
                                       'cad_Index',
                                       'appet_Index',
                                       'pe_Index',
                                       'ane_Index'
                                       ],
                           outputCol='features')

# fill the null values
my_data = my_data.fillna(0)

# transform the data
final_data = assembler.transform(my_data)

In [95]:
# view the transformed vector
final_data.select('features','predict_Index').show()

+--------------------+-------------+
|            features|predict_Index|
+--------------------+-------------+
|(21,[0,1,4,6,7,8,...|          0.0|
|[69.0,140.0,0.0,0...|          0.0|
|[69.0,160.0,0.0,0...|          0.0|
|[71.0,140.0,0.0,2...|          0.0|
|[65.0,170.0,0.0,0...|          0.0|
|[70.0,160.0,0.0,0...|          0.0|
|[35.0,170.0,0.0,0...|          0.0|
|[55.0,120.0,0.0,2...|          0.0|
|[59.0,130.0,0.0,2...|          0.0|
|[55.0,130.0,0.0,2...|          0.0|
|[43.0,120.0,0.0,2...|          0.0|
|[68.0,100.0,0.0,2...|          0.0|
|[68.0,160.0,0.0,0...|          0.0|
|[53.0,130.0,0.0,0...|          0.0|
|[50.0,150.0,0.0,2...|          0.0|
|[69.0,140.0,0.0,0...|          0.0|
|(21,[0,1,4,6,7,8,...|          0.0|
|[71.0,120.0,0.0,0...|          0.0|
|[65.0,140.0,0.0,0...|          0.0|
|[70.0,120.0,0.0,0...|          0.0|
+--------------------+-------------+
only showing top 20 rows



In [99]:
#Model_Dataframe
model_df = final_data.select(['features','predict_Index'])
model_df = model_df.withColumnRenamed("predict_Index","label")
model_df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



In [100]:

#Split into training & testing Dataframe
training_df,test_df = model_df.randomSplit([0.75,0.25])

In [101]:
#Create a logistic regression model object
from pyspark.ml.classification import LogisticRegression
log_reg=LogisticRegression().fit(training_df)

Py4JJavaError: ignored

In [102]:
lr_summary=log_reg.summary

NameError: ignored

In [None]:
#Overall accuracy of the classification model
lr_summary.accuracy

1.0

In [None]:
#Precision of both classes
print(lr_summary.precisionByLabel)

[1.0, 1.0]


In [None]:
#Get Preditions
predictions = log_reg.transform(test_df)

In [None]:

predictions.select('label','prediction').show(50)

+-----+----------+
|label|prediction|
+-----+----------+
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    0|     

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
log_reg=DecisionTreeRegressor().fit(training_df)

In [None]:
test_df.show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                              |label|
+----------------------------------------------------------------------------------------------------------------------+-----+
|(24,[0,1,2,3,4,9,10,11,12,13,14,15,16,17],[42.0,170.0,1.005,4.0,2.0,109.0,204.0,18.4,131.0,5.5,10.9,24.0,5300.0,4.6]) |1    |
|(24,[0,1,2,4,8,9,10,11,12,13,14,15,16,17],[61.0,150.0,1.025,2.0,1.0,129.0,153.0,10.5,130.0,5.6,10.7,33.0,12100.0,5.2])|1    |
|(24,[0,1,2,5,9,10,11,12,13,14,15,16,17,18],[38.0,170.0,1.025,1.0,273.0,102.0,7.1,135.0,4.0,12.9,36.0,6900.0,3.7,1.0]) |1    |
|(24,[0,1,2,7,9,10,11,12,13,14,15,16,17,19],[29.0,130.0,1.025,1.0,153.0,158.0,8.0,145.0,4.9,11.9,42.0,5300.0,5.0,1.0]) |1    |
|[18.0,140.0,1.005,4.0,0.0,1.0,0.0,1.0,0.0,106.0,119.0,11.6,131.0,5.5,7.2,32.0,6700.0,4.4,1.0,0.0,0.0,0.0,1.0,1

In [None]:
#Get Preditions
predictions = log_reg.transform(test_df)

In [None]:
predictions.show()

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(24,[0,1,2,3,4,9,...|    1|       1.0|
|(24,[0,1,2,4,8,9,...|    1|       1.0|
|(24,[0,1,2,5,9,10...|    1|       1.0|
|(24,[0,1,2,7,9,10...|    1|       1.0|
|[18.0,140.0,1.005...|    1|       1.0|
|[18.0,160.0,1.005...|    1|       1.0|
|[19.0,120.0,1.02,...|    1|       1.0|
|[20.0,70.0,1.02,0...|    0|       0.0|
|[22.0,120.0,1.02,...|    1|       1.0|
|[23.0,150.0,1.01,...|    1|       1.0|
|[23.0,170.0,1.025...|    1|       1.0|
|[24.0,160.0,1.005...|    1|       1.0|
|[25.0,120.0,1.02,...|    1|       1.0|
|[25.0,150.0,1.01,...|    1|       1.0|
|[25.0,150.0,1.02,...|    1|       1.0|
|[28.0,60.0,1.02,0...|    0|       0.0|
|[28.0,70.0,1.025,...|    0|       0.0|
|[28.0,100.0,1.015...|    1|       1.0|
|[29.0,70.0,1.02,0...|    0|       0.0|
|[29.0,150.0,1.01,...|    1|       1.0|
+--------------------+-----+----------+
only showing top 20 rows



In [None]:
df_accuracy = MulticlassClassificationEvaluator(labelCol="label",
                                                metricName="accuracy").evaluate(predictions)

In [None]:
df_accuracy

1.0

In [None]:
# Random Forest
from pyspark.ml.classification import RandomForestClassifier

In [None]:
rf_classififer = RandomForestClassifier(labelCol="label",
                                        numTrees=50).fit(training_df)

In [None]:
rf_prediction = rf_classififer.transform(test_df)

In [None]:
rf_prediction.show()

+--------------------+-----+-------------+-----------+----------+
|            features|label|rawPrediction|probability|prediction|
+--------------------+-----+-------------+-----------+----------+
|(24,[0,1,2,3,4,9,...|    1|   [0.0,50.0]|  [0.0,1.0]|       1.0|
|(24,[0,1,2,4,8,9,...|    1|   [0.0,50.0]|  [0.0,1.0]|       1.0|
|(24,[0,1,2,5,9,10...|    1|   [0.0,50.0]|  [0.0,1.0]|       1.0|
|(24,[0,1,2,7,9,10...|    1|   [0.0,50.0]|  [0.0,1.0]|       1.0|
|[18.0,140.0,1.005...|    1|   [0.0,50.0]|  [0.0,1.0]|       1.0|
|[18.0,160.0,1.005...|    1|   [0.0,50.0]|  [0.0,1.0]|       1.0|
|[19.0,120.0,1.02,...|    1|   [2.0,48.0]|[0.04,0.96]|       1.0|
|[20.0,70.0,1.02,0...|    0|   [50.0,0.0]|  [1.0,0.0]|       0.0|
|[22.0,120.0,1.02,...|    1|   [0.0,50.0]|  [0.0,1.0]|       1.0|
|[23.0,150.0,1.01,...|    1|   [0.0,50.0]|  [0.0,1.0]|       1.0|
|[23.0,170.0,1.025...|    1|   [0.0,50.0]|  [0.0,1.0]|       1.0|
|[24.0,160.0,1.005...|    1|   [0.0,50.0]|  [0.0,1.0]|       1.0|
|[25.0,120

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
rf_acu = BinaryClassificationEvaluator(labelCol="label").evaluate(rf_prediction)

In [None]:
rf_acu

1.0

In [None]:
df.columns

['age',
 'bp',
 'sg',
 'al',
 'su',
 'rbc',
 'pc',
 'pcc',
 'ba',
 'bgr',
 'bu',
 'sc',
 'sod',
 'pot',
 'hemo',
 'pcv',
 'wc',
 'rc',
 'htn',
 'dm',
 'cad',
 'appet',
 'pe',
 'ane',
 'predict']

In [None]:
rf_classififer.featureImportances

SparseVector(24, {1: 0.2369, 2: 0.0005, 3: 0.0006, 6: 0.0007, 8: 0.0003, 9: 0.0006, 10: 0.2059, 11: 0.2673, 12: 0.0092, 13: 0.0177, 14: 0.1085, 15: 0.0972, 17: 0.0303, 18: 0.018, 20: 0.0015, 21: 0.0022, 22: 0.001, 23: 0.0015})