----


## Setup


In [1]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m212.4/212.4 MB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m26.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


### Importing Required Libraries

_We recommend you import all required libraries in one place (here):_


In [2]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

from pyspark.sql import SparkSession

#import functions/Classes for sparkml

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# import functions/Classes for metrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


## Task 1 - Create a spark session


In [3]:
#Create SparkSession
#Ignore any warnings by SparkSession command

spark = SparkSession.builder.appName("Classification using SparkML").getOrCreate()

## Task 2 - Load the data in a csv file into a dataframe


Download the data file


In [4]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/drybeans.csv


--2023-10-31 21:53:07--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/drybeans.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.45.118.108
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.45.118.108|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2484759 (2.4M) [text/csv]
Saving to: ‘drybeans.csv’


2023-10-31 21:53:27 (138 KB/s) - ‘drybeans.csv’ saved [2484759/2484759]



Load the dataset into the spark dataframe


In [5]:

# Load mpg dataset
beans_data = spark.read.csv("drybeans.csv", header=True, inferSchema=True)


Print the schema of the dataset


In [6]:
beans_data.printSchema()

root
 |-- Area: integer (nullable = true)
 |-- Perimeter: double (nullable = true)
 |-- MajorAxisLength: double (nullable = true)
 |-- MinorAxisLength: double (nullable = true)
 |-- AspectRation: double (nullable = true)
 |-- Eccentricity: double (nullable = true)
 |-- ConvexArea: integer (nullable = true)
 |-- EquivDiameter: double (nullable = true)
 |-- Extent: double (nullable = true)
 |-- Solidity: double (nullable = true)
 |-- roundness: double (nullable = true)
 |-- Compactness: double (nullable = true)
 |-- ShapeFactor1: double (nullable = true)
 |-- ShapeFactor2: double (nullable = true)
 |-- ShapeFactor3: double (nullable = true)
 |-- ShapeFactor4: double (nullable = true)
 |-- Class: string (nullable = true)



Print top 5 rows of selected columns from the dataset


In [7]:
beans_data.select(["Area","Perimeter","Solidity","roundness","Compactness","Class"]).show(5)

+-----+---------+-----------+-----------+-----------+-----+
| Area|Perimeter|   Solidity|  roundness|Compactness|Class|
+-----+---------+-----------+-----------+-----------+-----+
|28395|  610.291|0.988855999|0.958027126|0.913357755|SEKER|
|28734|  638.018|0.984985603|0.887033637|0.953860842|SEKER|
|29380|   624.11|0.989558774|0.947849473|0.908774239|SEKER|
|30008|  645.884|0.976695743|0.903936374|0.928328835|SEKER|
|30140|  620.134| 0.99089325|0.984877069|0.970515523|SEKER|
+-----+---------+-----------+-----------+-----------+-----+
only showing top 5 rows



Print the value counts for the column 'Class'


In [8]:
beans_data.groupBy('Class').count().orderBy('count').show()

+--------+-----+
|   Class|count|
+--------+-----+
|  BOMBAY|  522|
|BARBUNYA| 1322|
|    CALI| 1630|
|   HOROZ| 1928|
|   SEKER| 2027|
|    SIRA| 2636|
|DERMASON| 3546|
+--------+-----+



Convert the string column "Class" into a numberic column named "Label"


In [9]:
# Convert Class column from string to numerical values
indexer = StringIndexer(inputCol="Class", outputCol="label")
beans_data = indexer.fit(beans_data).transform(beans_data)


Print the value counts for the column 'label'


In [10]:
beans_data.groupBy('label').count().orderBy('count').show()

+-----+-----+
|label|count|
+-----+-----+
|  6.0|  522|
|  5.0| 1322|
|  4.0| 1630|
|  3.0| 1928|
|  2.0| 2027|
|  1.0| 2636|
|  0.0| 3546|
+-----+-----+



## Task 3 - Identify the label column and the input columns


We ask the VectorAssembler to group a bunch of inputCols as single column named "features"


Use "Area","Perimeter","Solidity","roundness","Compactness" as input columns


In [11]:
# Prepare feature vector
assembler = VectorAssembler(inputCols=["Area","Perimeter","Solidity","roundness","Compactness"], outputCol="features")
beans_transformed_data = assembler.transform(beans_data)


Display the assembled "features" and the label column "MPG"


In [12]:
beans_transformed_data.select("features","label").show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[28395.0,610.291,...|  2.0|
|[28734.0,638.018,...|  2.0|
|[29380.0,624.11,0...|  2.0|
|[30008.0,645.884,...|  2.0|
|[30140.0,620.134,...|  2.0|
|[30279.0,634.927,...|  2.0|
|[30477.0,670.033,...|  2.0|
|[30519.0,629.727,...|  2.0|
|[30685.0,635.681,...|  2.0|
|[30834.0,631.934,...|  2.0|
|[30917.0,640.765,...|  2.0|
|[31091.0,638.558,...|  2.0|
|[31107.0,640.594,...|  2.0|
|[31158.0,642.626,...|  2.0|
|[31158.0,641.105,...|  2.0|
|[31178.0,636.888,...|  2.0|
|[31202.0,644.454,...|  2.0|
|[31203.0,639.782,...|  2.0|
|[31272.0,638.666,...|  2.0|
|[31335.0,635.011,...|  2.0|
+--------------------+-----+
only showing top 20 rows



## Task 4 - Split the data


We split the data set in the ratio of 70:30. 70% training data, 30% testing data.


In [13]:
# Split data into training and testing sets
(training_data, testing_data) = beans_transformed_data.randomSplit([0.7, 0.3], seed=42)


The random_state variable "seed" controls the shuffling applied to the data before applying the split. Pass the same integer for reproducible output across multiple function calls


## Task 5 - Build and Train a Logistic Regression Model


Create a LR model and train the model using the training data set


In [14]:
# Ignore any warnings

lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(training_data)

## Task 6 - Evaluate the model


Your model is now trained. We use the testing data to make predictions.


In [15]:
# Make predictions on testing data
predictions = model.transform(testing_data)

##### Accuracy


In [16]:
# Evaluate model performance
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy =", accuracy)


Accuracy = 0.9044505908976616


##### Precision


In [17]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator.evaluate(predictions)
print("Precision =", precision)


Precision = 0.9048375345335676


##### Recall


In [18]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
recall = evaluator.evaluate(predictions)
print("Recall =", recall)


Recall = 0.9044505908976616


##### F1 Score


In [19]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)
print("F1 score = ", f1_score)


F1 score =  0.9045296121528202


Stop Spark Session


In [20]:
spark.stop()

In this exercise you will use sparkml to classify the iris flower dataset.


### Exercise 1 - Create a spark session


Create a spark session with appname "Iris Flower Classification"


In [23]:
# Initialize SparkSession
spark = SparkSession.builder.appName("Iris Flower Classification").getOrCreate()


### Exercise 2 - Load the data in a csv file into a dataframe


Download the data set


In [24]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/iris.csv


--2023-10-31 22:25:11--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/iris.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.45.118.108
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.45.118.108|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4612 (4.5K) [text/csv]
Saving to: ‘iris.csv.1’


2023-10-31 22:25:12 (2.00 GB/s) - ‘iris.csv.1’ saved [4612/4612]



Load the dataset into a spark dataframe


In [25]:
iris_data = spark.read.csv("iris.csv", header=True, inferSchema=True)


In [26]:
iris_data.show(5)

+-------------+------------+-------------+------------+-----------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+-------------+------------+-------------+------------+-----------+
|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
+-------------+------------+-------------+------------+-----------+
only showing top 5 rows



### Exercise 3 - Identify the label column and the input columns


Transform the string column "Species" to a column of numerical values named "label"


In [27]:
indexer = StringIndexer(inputCol="Species", outputCol="label")
iris_data = indexer.fit(iris_data).transform(iris_data)


Assemble the columns columns "SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm" into a single column named "features"


In [28]:
# Prepare feature vector
assembler = VectorAssembler(inputCols=["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"], outputCol="features")
iris_transformed_data = assembler.transform(iris_data)


Print the vectorized features and label columns


In [29]:
iris_transformed_data.select("features","label").show()

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|  0.0|
|[4.9,3.0,1.4,0.2]|  0.0|
|[4.7,3.2,1.3,0.2]|  0.0|
|[4.6,3.1,1.5,0.2]|  0.0|
|[5.0,3.6,1.4,0.2]|  0.0|
|[5.4,3.9,1.7,0.4]|  0.0|
|[4.6,3.4,1.4,0.3]|  0.0|
|[5.0,3.4,1.5,0.2]|  0.0|
|[4.4,2.9,1.4,0.2]|  0.0|
|[4.9,3.1,1.5,0.1]|  0.0|
|[5.4,3.7,1.5,0.2]|  0.0|
|[4.8,3.4,1.6,0.2]|  0.0|
|[4.8,3.0,1.4,0.1]|  0.0|
|[4.3,3.0,1.1,0.1]|  0.0|
|[5.8,4.0,1.2,0.2]|  0.0|
|[5.7,4.4,1.5,0.4]|  0.0|
|[5.4,3.9,1.3,0.4]|  0.0|
|[5.1,3.5,1.4,0.3]|  0.0|
|[5.7,3.8,1.7,0.3]|  0.0|
|[5.1,3.8,1.5,0.3]|  0.0|
+-----------------+-----+
only showing top 20 rows



### Exercise 4 - Split the data


Split the dataset into training and testing sets in the ratio of 70:30. Use 42 as seed.


In [30]:

(training_data, testing_data) = iris_transformed_data.randomSplit([0.7,0.3],seed=42)

### Exercise 5 - Build and Train a Logistic Regression Model


Build a Logistic Regression model and train it


In [31]:
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(training_data)


Predict the values using the test data


### Exercise 6 - Evaluate the model


Your model is now trained. Make the model predict on testing_data


In [32]:
predictions = model.transform(testing_data)


Print the metrics :
- Accuracy
- Precision
- Recall
- F1 Score


In [33]:

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy =", accuracy)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator.evaluate(predictions)
print("Precision =", precision)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
recall = evaluator.evaluate(predictions)
print("Recall =", recall)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)
print("F1 score = ", f1_score)


Accuracy = 0.9782608695652174
Precision = 0.9804347826086957
Recall = 0.9782608695652174
F1 score =  0.978458139351377


In [34]:
spark.stop

<bound method SparkSession.stop of <pyspark.sql.session.SparkSession object at 0x7b8470922470>>