In [None]:
#Installation
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
# Initialise findspark before any pyspark import in the cells below.
# NOTE(review): init() is called without arguments; presumably it locates Spark
# through the SPARK_HOME variable set in the installation cell - confirm.
import findspark
findspark.init()

In [None]:
# Create the Spark session used by every later cell (or reuse one that is
# already running under the same builder).
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName("ClassificationwithSpark")
spark = session_builder.getOrCreate()

In [None]:
#import necessary functions
from itertools import chain
from pyspark.sql.functions import count, mean, when, lit, create_map, regexp_extract

In [None]:
!wget -O "diabates.csv" "https://raw.githubusercontent.com/DeepProgram/random/main/diabates.csv"

--2023-11-15 16:28:11--  https://raw.githubusercontent.com/DeepProgram/random/main/diabates.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.108.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 62058 (61K) [text/plain]
Saving to: ‘diabates.csv’


2023-11-15 16:28:12 (2.22 MB/s) - ‘diabates.csv’ saved [62058/62058]



In [None]:
# Load the downloaded CSV: the first line is the header and column types are
# inferred from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/content/diabates.csv')
)

In [None]:
# Print each column's name, its inferred data type and whether it is nullable
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [None]:
# Peek at the first four rows as a text table
df.show(n=4)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      0|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      0|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      0|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 4 rows



In [None]:
# Bring at most 100 rows to the driver as a pandas DataFrame for rich display
df.limit(num=100).toPandas()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome
0,2,138,62,35,0,33.6,0.127,47,0
1,0,84,82,31,125,38.2,0.233,23,0
2,0,145,0,0,0,44.2,0.630,31,0
3,0,135,68,42,250,42.3,0.365,24,0
4,1,139,62,41,480,40.7,0.536,21,0
...,...,...,...,...,...,...,...,...,...
95,6,144,72,27,228,33.9,0.255,40,0
96,2,92,62,28,0,31.6,0.130,24,0
97,1,71,48,18,76,20.4,0.323,22,0
98,6,93,50,30,64,28.7,0.356,23,0


In [None]:
# Summary statistics for the eight feature columns (the Outcome label is left
# out); describe() is an alternative to summary().
df.select(
    'Pregnancies',
    'Glucose',
    'BloodPressure',
    'SkinThickness',
    'Insulin',
    'BMI',
    'DiabetesPedigreeFunction',
    'Age',
).summary().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.10324290992682|111.1805335457595| 8.149900701279762|      0.3235525586811429|11.786423106049496|
|    min| 

In [None]:
# Shape of the DataFrame: count() runs a Spark job, the column list is metadata
n_rows = df.count()
n_cols = len(df.columns)
print('Number of rows: \t', n_rows)
print('Number of columns: \t', n_cols)

Number of rows: 	 2000
Number of columns: 	 9


In [None]:
# Class balance: number of rows for each distinct value of the Outcome label
outcome_counts = df.groupBy('Outcome').count()
outcome_counts.show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [None]:
# Average Age within each Outcome class
df.groupBy('Outcome').avg('Age').show()

+-------+------------------+
|Outcome|          avg(Age)|
+-------+------------------+
|      1| 32.97514619883041|
|      0|33.150455927051674|
+-------+------------------+



In [None]:
# Print the number of null values in every column of the full DataFrame (no
# train/validation split exists at this point). All columns are aggregated in
# a single Spark job instead of running one filter+count job per column.
# `count` and `when` come from the pyspark.sql.functions import cell above.
null_counts = df.select(
    # when(...) yields 1 for a null cell and null otherwise; count() only
    # counts the non-null results, i.e. the null cells of that column.
    [count(when(df[name].isNull(), 1)).alias(name) for name in df.columns]
).first()

for name in df.columns:
    print(name.ljust(25), null_counts[name])

Pregnancies               0
Glucose                   0
BloodPressure             0
SkinThickness             0
Insulin                   0
BMI                       0
DiabetesPedigreeFunction  0
Age                       0
Outcome                   0


In [None]:
# importing required libraries
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression,\
                    RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# Converting the dataset for modelling: pack every column except the last one
# (the 'Outcome' label) into a single vector column named 'features'.
feature_cols = df.columns[:-1]
vec_asmbl = VectorAssembler(outputCol='features', inputCols=feature_cols)

# NOTE: `df` is rebound to the assembled two-column frame here, so this cell
# cannot be re-run without first re-running the cell that loads the CSV.
assembled = vec_asmbl.transform(df)
df = assembled.select('features', 'Outcome')
df.show(4, truncate=False)

+-------------------------------------------+-------+
|features                                   |Outcome|
+-------------------------------------------+-------+
|[2.0,138.0,62.0,35.0,0.0,33.6,0.127,47.0]  |0      |
|[0.0,84.0,82.0,31.0,125.0,38.2,0.233,23.0] |0      |
|(8,[1,5,6,7],[145.0,44.2,0.63,31.0])       |0      |
|[0.0,135.0,68.0,42.0,250.0,42.3,0.365,24.0]|0      |
+-------------------------------------------+-------+
only showing top 4 rows



In [None]:
# Creating train and validation set (weights 0.8 / 0.2). The explicit seed
# makes the random split repeatable, so the accuracies reported by the model
# cells below can be compared from one run to the next.
train_df, valid_df = df.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Evaluator shared by all model cells: compares the predictions with the
# 'Outcome' label column and reports accuracy.
evaluator = (
    MulticlassClassificationEvaluator()
    .setLabelCol('Outcome')
    .setMetricName('accuracy')
)

In [None]:
# Logistic regression with a pure L2 ("ridge") penalty: elasticNetParam=0
ridge = LogisticRegression(
    labelCol='Outcome',
    regParam=0.03,
    elasticNetParam=0,
    maxIter=100,
)

# Fit on the training split, predict the validation split, report accuracy
model = ridge.fit(train_df)
pred = model.transform(valid_df)
evaluator.evaluate(pred)

0.6675531914893617

In [None]:
# Logistic regression with a pure L1 ("lasso") penalty: elasticNetParam=1
lasso = LogisticRegression(
    labelCol='Outcome',
    regParam=0.0003,
    elasticNetParam=1,
    maxIter=100,
)

# Fit on the training split, predict the validation split, report accuracy
model = lasso.fit(train_df)
pred = model.transform(valid_df)
evaluator.evaluate(pred)

0.6675531914893617

In [None]:
# Inspect the first assembled row (feature vector plus label)
df.head()

Row(features=DenseVector([2.0, 138.0, 62.0, 35.0, 0.0, 33.6, 0.127, 47.0]), Outcome=0)

In [None]:
# Random Forest Classifier Model: 100 shallow trees (maxDepth=3).
# The forest is a randomized learner, so the seed is fixed to make the fitted
# model and its validation accuracy repeatable between runs.
rf = RandomForestClassifier(labelCol='Outcome',
                            numTrees=100, maxDepth=3,
                            seed=42)

# Fit on the training split, predict the validation split, report accuracy
model = rf.fit(train_df)
pred = model.transform(valid_df)
evaluator.evaluate(pred)

0.6675531914893617

In [None]:
from pyspark.sql.types import DoubleType, FloatType, IntegerType, StructField, StructType

In [None]:
# Score one hand-written record with the current `model`.
# NOTE(review): `model` is whichever classifier cell was executed last; in
# top-to-bottom order that is the random forest - confirm that is intended.

# Sample data for demonstration (replace this with your actual data)
# Alternative sample row: (3, 163, 70, 18, 105, 31.6, 0.268, 28)
data = [(2, 75, 64, 24, 55, 29.7, 0.37, 33)]

# Same column names, order and types as the training data's feature columns.
# BMI and DiabetesPedigreeFunction are doubles in the training schema, so they
# are declared as DoubleType here; FloatType would round the values to 32-bit
# precision before they reach the model.
schema = StructType([
    StructField("Pregnancies", IntegerType(), True),
    StructField("Glucose", IntegerType(), True),
    StructField("BloodPressure", IntegerType(), True),
    StructField("SkinThickness", IntegerType(), True),
    StructField("Insulin", IntegerType(), True),
    StructField("BMI", DoubleType(), True),
    StructField("DiabetesPedigreeFunction", DoubleType(), True),
    StructField("Age", IntegerType(), True)
])

single_row_df = spark.createDataFrame(data, schema)

# Assemble the feature vector in the schema's own field order, so the column
# list is written once instead of being repeated here.
vector_assembler = VectorAssembler(inputCols=schema.fieldNames(),
                                   outputCol="features")

single_row_df = vector_assembler.transform(single_row_df)

# Use the trained model to make predictions
prediction = model.transform(single_row_df)

# Display the prediction
prediction.select("features", "rawPrediction", "probability", "prediction").show()

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|[2.0,75.0,64.0,24...|[67.3098925120708...|[0.67309892512070...|       0.0|
+--------------------+--------------------+--------------------+----------+



In [None]:
# Gradient Boosting Classifier: 100 boosting iterations of depth-3 trees.
# The seed is set explicitly so that any randomized step inside the learner is
# repeatable, consistent with the other stochastic steps of this notebook.
gb = GBTClassifier(labelCol='Outcome', maxIter=100, maxDepth=3, seed=42)

# Fit on the training split, predict the validation split, report accuracy
model = gb.fit(train_df)
pred = model.transform(valid_df)
evaluator.evaluate(pred)

0.5773195876288659