### Step 1: Set the data location and type

There are two ways to access Azure Blob storage: account keys and shared access signatures (SAS).

To get started, we need to set the location and type of the file.

In [0]:
storage_account_name = "devassignment"
storage_account_access_key = "8YpOLSWDJegnOVlvZzuFpwShdoAPmZpc5Ws4PTz4w6R7sN4WCD+9JgNTs00YgQTxjNfmWVokZ5AE+ASthmNG3g=="

In [0]:
file_location = "wasbs://capstone@devassignment.blob.core.windows.net/data/processed/iris_processed.parquet"
file_type = "parquet"

In [0]:
spark.conf.set(
  "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
  storage_account_access_key)

### Step 2: Read the data

Now that we have specified our file metadata, we can create a DataFrame. Notice that we use an *option* to specify that we want to infer the schema from the file. We can also explicitly set this to a particular schema if we have one already.

First, let's create a DataFrame in Python.

In [0]:
df_tfm = spark.read.format(file_type).option("inferSchema", "true").option("header", "true").load(file_location)

In [0]:
df_tfm.show()

+---+-------------+------------+-------------+------------+-----------+--------+--------------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|Species1|            features|
+---+-------------+------------+-------------+------------+-----------+--------+--------------------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|     0.0|[1.0,5.1,3.5,1.4,...|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|     0.0|[2.0,4.9,3.0,1.4,...|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|     0.0|[3.0,4.7,3.2,1.3,...|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|     0.0|[4.0,4.6,3.1,1.5,...|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|     0.0|[5.0,5.0,3.6,1.4,...|
|  6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|     0.0|[6.0,5.4,3.9,1.7,...|
|  7|          4.6|         3.4|          1.4|         0.3|Iris-setosa|     0.0|[7

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import unix_timestamp

In [0]:
df_tfm.show()

+---+-------------+------------+-------------+------------+-----------+--------+--------------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|Species1|            features|
+---+-------------+------------+-------------+------------+-----------+--------+--------------------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|     0.0|[1.0,5.1,3.5,1.4,...|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|     0.0|[2.0,4.9,3.0,1.4,...|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|     0.0|[3.0,4.7,3.2,1.3,...|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|     0.0|[4.0,4.6,3.1,1.5,...|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|     0.0|[5.0,5.0,3.6,1.4,...|
|  6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|     0.0|[6.0,5.4,3.9,1.7,...|
|  7|          4.6|         3.4|          1.4|         0.3|Iris-setosa|     0.0|[7

### Step 3: Split the dataset into training and testing

In [0]:
train, test = df_tfm.randomSplit([0.7, 0.3])

In [0]:
#Training dataset
num_rows_train = train.count()
num_cols_train = len(train.columns)
print("Training:",num_rows_train,"x",num_cols_train)

Training: 108 x 8


In [0]:
#Testing dataset
num_rows_test = test.count()
num_cols_test = len(test.columns)
print("Testing:",num_rows_test,"x",num_cols_test)

Testing: 42 x 8


In [0]:
#Feature Scaling
df_tfm.columns
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
scaler_model = scaler.fit(df_tfm)
train=scaler_model.transform(df_tfm)
test=scaler_model.transform(test)
train.show(3,False)

+---+-------------+------------+-------------+------------+-----------+--------+-------------------------+-------------------------------------------------------------------------------------------------+
|Id |SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|Species    |Species1|features                 |scaled_features                                                                                  |
+---+-------------+------------+-------------+------------+-----------+--------+-------------------------+-------------------------------------------------------------------------------------------------+
|1  |5.1          |3.5         |1.4          |0.2         |Iris-setosa|0.0     |[1.0,5.1,3.5,1.4,0.2,0.0]|[0.02301741350593744,6.158928408838787,8.072061621390857,0.7934616853039358,0.26206798787142,0.0]|
|2  |4.9          |3.0         |1.4          |0.2         |Iris-setosa|0.0     |[2.0,4.9,3.0,1.4,0.2,0.0]|[0.04603482701187488,5.9174018045706,6.9189099611921625,0.7934616853039358

In [0]:
from pyspark.ml.classification import LogisticRegression
log=LogisticRegression(featuresCol='scaled_features',labelCol='Species1')
lrmodel=log.fit(train)
prediction=lrmodel.transform(test)
test.show(3)

+---+-------------+------------+-------------+------------+-----------+--------+--------------------+--------------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|Species1|            features|     scaled_features|
+---+-------------+------------+-------------+------------+-----------+--------+--------------------+--------------------+
|  7|          4.6|         3.4|          1.4|         0.3|Iris-setosa|     0.0|[7.0,4.6,3.4,1.4,...|[0.16112189454156...|
| 10|          4.9|         3.1|          1.5|         0.1|Iris-setosa|     0.0|[10.0,4.9,3.1,1.5...|[0.23017413505937...|
| 11|          5.4|         3.7|          1.5|         0.2|Iris-setosa|     0.0|[11.0,5.4,3.7,1.5...|[0.25319154856531...|
+---+-------------+------------+-------------+------------+-----------+--------+--------------------+--------------------+
only showing top 3 rows



In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
test.groupby("Species1").count().show()

+--------+-----+
|Species1|count|
+--------+-----+
|     0.0|   20|
|     1.0|   10|
|     2.0|   12|
+--------+-----+



In [0]:
train.groupby("Species1").count().show()

+--------+-----+
|Species1|count|
+--------+-----+
|     0.0|   50|
|     1.0|   50|
|     2.0|   50|
+--------+-----+



### Step 4: Model Evaluation

In [0]:
# Use the MulticlassClassificationEvaluator to evaluate the model's accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="Species1", predictionCol="prediction", metricName="accuracy")

In [0]:
# Logistic regression model
lr = LogisticRegression(labelCol='Species1', featuresCol='features')

In [0]:
# Fit the model to the training data
lr_model = lr.fit(train)

In [0]:
# Make predictions on the test data
predictions = lr_model.transform(test)

In [0]:
logistic_accuracy = evaluator.evaluate(predictions)
print("Accuracy:", logistic_accuracy)

Accuracy: 1.0


In [0]:
#Decision Tree model
from pyspark.ml.classification import DecisionTreeClassifier
rand=DecisionTreeClassifier(featuresCol='scaled_features',labelCol='Species1')
rmodel=rand.fit(train)
predictionrand=rmodel.transform(test)

In [0]:
rand_accuracy = evaluator.evaluate(predictionrand)
print("Accuracy:", rand_accuracy)

Accuracy: 1.0


In [0]:
# Select the "prediction" and "label" columns
predictions_df = predictionrand.select(["prediction", "Species1"])

In [0]:
# Convert the predictions and labels to Pandas dataframes for easier inspection
predictions_pd = predictions_df.toPandas()

In [0]:
# Print the first 10 predictions and their corresponding true labels
print(predictions_pd.head(10))

   prediction  Species1
0         0.0       0.0
1         0.0       0.0
2         0.0       0.0
3         0.0       0.0
4         0.0       0.0
5         0.0       0.0
6         0.0       0.0
7         0.0       0.0
8         0.0       0.0
9         0.0       0.0


### Step 5: Hyperparameter Tuning

In [0]:
# Set the hyperparameters for the random forest model
from pyspark.ml.classification import RandomForestClassifier
regrand = RandomForestClassifier(labelCol='Species1', featuresCol='features',numTrees=100,maxDepth=5)

In [0]:
# Fit the model to the training data
regmodel = regrand.fit(train)

In [0]:
# Make predictions on the test data
predictions = regmodel.transform(test)

reg_accuracy = evaluator.evaluate(predictions)
print("Accuracy:", reg_accuracy)

Accuracy: 1.0


In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [0]:
# Define the hyperparameters to tune
hyperparameters = [
    {'regParam': [0.1, 0.01, 0.001], 'elasticNetParam': [0.0, 0.5, 1.0]},
    {'regParam': [0.1, 0.01, 0.001], 'elasticNetParam': [0.0, 0.5, 1.0], 'maxIter': [10, 50, 100]}
]
param_grid = ParamGridBuilder().addGrid(log.regParam, hyperparameters[0]['regParam'])\
                               .addGrid(log.elasticNetParam, hyperparameters[0]['elasticNetParam'])\
                               .build()
cv = CrossValidator(estimator=log, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=2)
model = cv.fit(train)
model.params
model.bestModel
predictions = model.transform(test)

In [0]:
cv_accuracy = evaluator.evaluate(predictions)
print("Accuracy: ", cv_accuracy)

Accuracy:  1.0


In [0]:
accuracy = [logistic_accuracy,rand_accuracy,reg_accuracy,cv_accuracy]
models = ["logistic_reg","decision_tree","random_forest","grid_search"]

In [0]:
import pandas as pd
df = pd.DataFrame({
    'Model': models,
    'Accuracy': accuracy
})

In [0]:
score=spark.createDataFrame(df)
score.show()

+-------------+--------+
|        Model|Accuracy|
+-------------+--------+
| logistic_reg|     1.0|
|decision_tree|     1.0|
|random_forest|     1.0|
|  grid_search|     1.0|
+-------------+--------+



In [0]:
dbutils.fs.mkdirs("wasbs://capstone@devassignment.blob.core.windows.net/" + "models/")

output_folder = "wasbs://capstone@devassignment.blob.core.windows.net/" + "models/model_scores.parquet"

score.repartition(1).write.format("parquet") \
    .mode("overwrite") \
    .save(output_folder)