#### Import modules

In [1]:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# import functions/Classes for metrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#### Create Spark Session

In [2]:
spark = SparkSession.builder.appName("Classification using SparkML").getOrCreate()

#### Load the data

In [4]:
bean_data = spark.read.csv('drybeans.csv', header=True, inferSchema=True)

In [5]:
bean_data.show(5)

+-----+---------+---------------+---------------+------------+------------+----------+-------------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+-----+
| Area|Perimeter|MajorAxisLength|MinorAxisLength|AspectRation|Eccentricity|ConvexArea|EquivDiameter|     Extent|   Solidity|  roundness|Compactness|ShapeFactor1|ShapeFactor2|ShapeFactor3|ShapeFactor4|Class|
+-----+---------+---------------+---------------+------------+------------+----------+-------------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+-----+
|28395|  610.291|    208.1781167|     173.888747| 1.197191424| 0.549812187|     28715|  190.1410973|0.763922518|0.988855999|0.958027126|0.913357755| 0.007331506| 0.003147289| 0.834222388| 0.998723889|SEKER|
|28734|  638.018|    200.5247957|    182.7344194| 1.097356461| 0.411785251|     29172|  191.2727505|0.783968133|0.984985603|0.887033637|0.953860842| 0.006978659| 0.00356362

In [6]:
bean_data.printSchema()

root
 |-- Area: integer (nullable = true)
 |-- Perimeter: double (nullable = true)
 |-- MajorAxisLength: double (nullable = true)
 |-- MinorAxisLength: double (nullable = true)
 |-- AspectRation: double (nullable = true)
 |-- Eccentricity: double (nullable = true)
 |-- ConvexArea: integer (nullable = true)
 |-- EquivDiameter: double (nullable = true)
 |-- Extent: double (nullable = true)
 |-- Solidity: double (nullable = true)
 |-- roundness: double (nullable = true)
 |-- Compactness: double (nullable = true)
 |-- ShapeFactor1: double (nullable = true)
 |-- ShapeFactor2: double (nullable = true)
 |-- ShapeFactor3: double (nullable = true)
 |-- ShapeFactor4: double (nullable = true)
 |-- Class: string (nullable = true)



##### Print top 5 rows of selected columns from the dataset

In [7]:
bean_data.select(["Area","Perimeter","Solidity","roundness","Compactness","Class"]).show(5)

+-----+---------+-----------+-----------+-----------+-----+
| Area|Perimeter|   Solidity|  roundness|Compactness|Class|
+-----+---------+-----------+-----------+-----------+-----+
|28395|  610.291|0.988855999|0.958027126|0.913357755|SEKER|
|28734|  638.018|0.984985603|0.887033637|0.953860842|SEKER|
|29380|   624.11|0.989558774|0.947849473|0.908774239|SEKER|
|30008|  645.884|0.976695743|0.903936374|0.928328835|SEKER|
|30140|  620.134| 0.99089325|0.984877069|0.970515523|SEKER|
+-----+---------+-----------+-----------+-----------+-----+
only showing top 5 rows



##### Print the value counts for the column 'Class'

In [8]:
bean_data.groupBy('Class').count().orderBy('count').show()

+--------+-----+
|   Class|count|
+--------+-----+
|  BOMBAY|  522|
|BARBUNYA| 1322|
|    CALI| 1630|
|   HOROZ| 1928|
|   SEKER| 2027|
|    SIRA| 2636|
|DERMASON| 3546|
+--------+-----+



##### Convert the string column "Class" into a numberic column named "Label"

In [9]:
indexer = StringIndexer(inputCol='Class', outputCol='Label')

bean_data = indexer.fit(bean_data).transform(bean_data)

##### Print the value counts for the column 'label'

In [10]:
bean_data.groupBy('Label').count().orderBy('count').show()

+-----+-----+
|Label|count|
+-----+-----+
|  6.0|  522|
|  5.0| 1322|
|  4.0| 1630|
|  3.0| 1928|
|  2.0| 2027|
|  1.0| 2636|
|  0.0| 3546|
+-----+-----+



#### Extract target and feature columns

In [11]:
assembler = VectorAssembler(inputCols=["Area","Perimeter","Solidity","roundness","Compactness"], outputCol="features")

beans_transformed_data = assembler.transform(bean_data)

In [15]:
beans_transformed_data.select("features","Label").show()

+--------------------+-----+
|            features|Label|
+--------------------+-----+
|[28395.0,610.291,...|  2.0|
|[28734.0,638.018,...|  2.0|
|[29380.0,624.11,0...|  2.0|
|[30008.0,645.884,...|  2.0|
|[30140.0,620.134,...|  2.0|
|[30279.0,634.927,...|  2.0|
|[30477.0,670.033,...|  2.0|
|[30519.0,629.727,...|  2.0|
|[30685.0,635.681,...|  2.0|
|[30834.0,631.934,...|  2.0|
|[30917.0,640.765,...|  2.0|
|[31091.0,638.558,...|  2.0|
|[31107.0,640.594,...|  2.0|
|[31158.0,642.626,...|  2.0|
|[31158.0,641.105,...|  2.0|
|[31178.0,636.888,...|  2.0|
|[31202.0,644.454,...|  2.0|
|[31203.0,639.782,...|  2.0|
|[31272.0,638.666,...|  2.0|
|[31335.0,635.011,...|  2.0|
+--------------------+-----+
only showing top 20 rows



#### Split the data

In [14]:
(training_data, testing_data) = beans_transformed_data.randomSplit([0.7,0.3], seed=42)

#### Build and Train a Logistic Regression Model

In [16]:
lr = LogisticRegression(featuresCol="features", labelCol="Label")

In [17]:
model = lr.fit(training_data)

#### Evaluate the model

In [18]:
predictions = model.transform(testing_data)

##### Accuracy

In [22]:
evaluator = MulticlassClassificationEvaluator(labelCol="Label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy =", accuracy)

Accuracy = 0.9142569776213226


##### Precision

In [23]:
evaluator = MulticlassClassificationEvaluator(labelCol="Label", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator.evaluate(predictions)
print("Precision =", precision)

Precision = 0.9147333452508383


##### Recall

In [24]:
evaluator = MulticlassClassificationEvaluator(labelCol="Label", predictionCol="prediction", metricName="weightedRecall")
recall = evaluator.evaluate(predictions)
print("Recall =", recall)

Recall = 0.9142569776213225


##### F1 Score

In [25]:
evaluator = MulticlassClassificationEvaluator(labelCol="Label", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)
print("F1 score = ", f1_score)

F1 score =  0.9143589035174375


In [26]:
spark.stop()