## Classification using SparkML

In [1]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

In [19]:
import findspark
findspark.init()
from pyspark.sql import  SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# import functions/Classes for metrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [9]:
# Create the Spark session used by every later cell, or reuse one that is
# already running.
spark = (
    SparkSession.builder
    .appName("Classification using SparkML")
    .getOrCreate()
)

#### Load the data from a CSV file into a DataFrame

In [6]:
  !curl -o drybeans.csv https://raw.githubusercontent.com/AdelOuledSaid/Machine-learning-with-Apache-Spark-/main/Prediction%20Model%20using%20Linear%20Regression/drybeans.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
 59 2426k   59 1440k    0     0  1145k      0  0:00:02  0:00:01  0:00:01 1146k
100 2426k  100 2426k    0     0  1319k      0  0:00:01  0:00:01 --:--:-- 1320k


In [10]:
# Load the downloaded CSV into a Spark DataFrame. The first row is used as the
# column names and the column types are inferred from the data.
beans_data = spark.read.csv(
    path="drybeans.csv",
    header=True,
    inferSchema=True,
)

In [11]:
# Preview the first five rows to sanity-check the load.
beans_data.show(n=5)

+-----+---------+---------------+---------------+------------+------------+----------+-------------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+-----+
| Area|Perimeter|MajorAxisLength|MinorAxisLength|AspectRation|Eccentricity|ConvexArea|EquivDiameter|     Extent|   Solidity|  roundness|Compactness|ShapeFactor1|ShapeFactor2|ShapeFactor3|ShapeFactor4|Class|
+-----+---------+---------------+---------------+------------+------------+----------+-------------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+-----+
|28395|  610.291|    208.1781167|     173.888747| 1.197191424| 0.549812187|     28715|  190.1410973|0.763922518|0.988855999|0.958027126|0.913357755| 0.007331506| 0.003147289| 0.834222388| 0.998723889|SEKER|
|28734|  638.018|    200.5247957|    182.7344194| 1.097356461| 0.411785251|     29172|  191.2727505|0.783968133|0.984985603|0.887033637|0.953860842| 0.006978659| 0.00356362

In [12]:
# Print each column's name, inferred type and nullability.
beans_data.printSchema()

root
 |-- Area: integer (nullable = true)
 |-- Perimeter: double (nullable = true)
 |-- MajorAxisLength: double (nullable = true)
 |-- MinorAxisLength: double (nullable = true)
 |-- AspectRation: double (nullable = true)
 |-- Eccentricity: double (nullable = true)
 |-- ConvexArea: integer (nullable = true)
 |-- EquivDiameter: double (nullable = true)
 |-- Extent: double (nullable = true)
 |-- Solidity: double (nullable = true)
 |-- roundness: double (nullable = true)
 |-- Compactness: double (nullable = true)
 |-- ShapeFactor1: double (nullable = true)
 |-- ShapeFactor2: double (nullable = true)
 |-- ShapeFactor3: double (nullable = true)
 |-- ShapeFactor4: double (nullable = true)
 |-- Class: string (nullable = true)



In [13]:
# Count the rows per bean variety and list them from rarest to most common.
# The column is written as 'class' although the schema spells it 'Class';
# this relies on Spark resolving column names case-insensitively by default.
(
    beans_data
    .groupBy('class')
    .count()
    .orderBy('count')
    .show()
)

+--------+-----+
|   class|count|
+--------+-----+
|  BOMBAY|  522|
|BARBUNYA| 1322|
|    CALI| 1630|
|   HOROZ| 1928|
|   SEKER| 2027|
|    SIRA| 2636|
|DERMASON| 3546|
+--------+-----+



In [21]:
# Encode the string target column 'Class' as a numeric 'label' column.
indexer = StringIndexer(inputCol='Class', outputCol='label')

# Make the cell safe to re-run: `beans_data` is overwritten below, so on a
# second execution it already carries a 'label' column and the indexer would
# fail because its output column exists. DataFrame.drop is a no-op when the
# column is absent, so the first run behaves exactly as before.
beans_unlabeled = beans_data.drop('label')
beans_data = indexer.fit(beans_unlabeled).transform(beans_unlabeled)

In [23]:
# Count the rows per numeric label, rarest label first, to confirm the
# encoding lines up with the per-class counts.
label_counts = beans_data.groupBy('label').count()
label_counts.orderBy('count').show()

+-----+-----+
|label|count|
+-----+-----+
|  6.0|  522|
|  5.0| 1322|
|  4.0| 1630|
|  3.0| 1928|
|  2.0| 2027|
|  1.0| 2636|
|  0.0| 3546|
+-----+-----+



#####  Identify the label column and the input columns

In [24]:
# Pack the five chosen input columns into a single vector column named
# 'features'.
assembler = VectorAssembler(
    inputCols=[
        "Area",
        "Perimeter",
        "Solidity",
        "roundness",
        "Compactness",
    ],
    outputCol='features',
)

# The result is stored under a new name, so `beans_data` is left untouched.
beans_transformed_data = assembler.transform(beans_data)

In [28]:
# Show the assembled feature vector next to its label for the first five rows.
features_and_label = beans_transformed_data.select("features", "label")
features_and_label.show(n=5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[28395.0,610.291,...|  2.0|
|[28734.0,638.018,...|  2.0|
|[29380.0,624.11,0...|  2.0|
|[30008.0,645.884,...|  2.0|
|[30140.0,620.134,...|  2.0|
+--------------------+-----+
only showing top 5 rows



####  Split the data

In [29]:
# Randomly split the rows with weights 0.7 (training) and 0.3 (testing); the
# seed is fixed at 42 for the split.
training_data, testing_data = beans_transformed_data.randomSplit(
    weights=[0.7, 0.3],
    seed=42,
)


In [30]:
# Configure a logistic regression classifier that reads the assembled
# 'features' vector and predicts the indexed 'label' column.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
)

# Fit the classifier on the training split only.
model = lr.fit(training_data)

In [31]:
# Apply the fitted model to the held-out testing split. The evaluators in the
# following cells read the 'prediction' and 'label' columns of this result.
predictions = model.transform(testing_data) ## Make predictions on testing data

In [32]:
# Model performance on the testing split: accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy =", accuracy)


Accuracy = 0.9140055318078953


In [33]:
# Model performance on the testing split: the 'weightedPrecision' metric.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedPrecision",
)
precision = evaluator.evaluate(predictions)
print("Precision =", precision)


Precision = 0.9145127427478639


In [34]:
# Model performance on the testing split: the 'weightedRecall' metric.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedRecall",
)
recall = evaluator.evaluate(predictions)
print("Recall =", recall)

Recall = 0.9140055318078953


In [35]:
# Model performance on the testing split: the 'f1' metric.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
f1_score = evaluator.evaluate(predictions)
print("F1 score = ", f1_score)

F1 score =  0.9141162063478364


In [36]:
# Shut down the Spark session now that the analysis is complete. Cells that use
# `spark` or the DataFrames above cannot be re-run until the session is
# created again.
spark.stop()