In [12]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import MinMaxScaler, StandardScaler
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [2]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("TrainTestWorkflow").getOrCreate()

# Load the datasets
train_data_path = "/workspaces/ml-winequality/dataset/TrainingDataset.csv"
test_data_path = "/workspaces/ml-winequality/dataset/ValidationDataset.csv"

train_df = spark.read.csv(train_data_path,header=True, 
                      inferSchema=True,
                      sep=';'
                      ,quote='"')
test_df = spark.read.csv(test_data_path,header=True, 
                      inferSchema=True,
                      sep=';'
                      ,quote='"')


In [3]:
# Used copilot how to get rid of quotes from colum header
new_column_names = [col_name.strip('"') for col_name in train_df.columns]
train_df = train_df.toDF(*new_column_names)

# Used copilot how to get rid of quotes from colum header
new_column_names = [col_name.strip('"') for col_name in test_df.columns]
test_df = test_df.toDF(*new_column_names)


In [65]:
# Imbalance data set, Resize using SMOTE
# import SMOTE
# sm = SMOTE(random_state=14)
# # X_train, Y_train = sm.fit_resample(X_train, Y_train)

## Preprocess data

In [4]:
from pyspark.ml.feature import VectorAssembler, StringIndexer

# Assemble features
featureCols = train_df.columns[:-1]  # Assuming the last column is the label
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

## Define Model

In [7]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

# Initialize the model
rf = RandomForestClassifier(featuresCol="features", labelCol="quality")

# Create a Pipeline
pipeline = Pipeline(stages=[assembler, rf])


## Predict

In [8]:
# Train the model
fitted_pipeline = pipeline.fit(train_df)



## Evaluate on Test data

In [11]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import *
# Make predictions on test data
# Make predictions
predictions = fitted_pipeline.transform(test_df)

predictions = predictions.withColumn("prediction", round(col("prediction"),0).cast("double"))
# Evaluate the best model
evaluator = MulticlassClassificationEvaluator(labelCol="quality", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)

print(f"Best Model F1 Score on Test Data: {f1_score}")
# Best Model F1 Score on Test Data: 0.4865269070010449


Best Model F1 Score on Test Data: 0.4865269070010449


############################# Part 2
Using MinMaxScalar

In [60]:
from pyspark.ml import Pipeline

# Assemble features
featureCols = train_df.columns[:-1]  # Assuming the last column is the label

# MINMAXSCALAR

# Assemble features
assembler = VectorAssembler(inputCols=featureCols, outputCol="assembledFeatures")

# Normalize features using Min-Max Scaling
scaler = MinMaxScaler(inputCol="assembledFeatures", outputCol="normalizedFeatures")

# Initialize the model
model = RandomForestRegressor(featuresCol="normalizedFeatures", labelCol='quality',
                              maxDepth=20,
                              numTrees=25,
                              seed=42,
                              )


# Create a Pipeline
pipeline2 = Pipeline(stages=[assembler, scaler, model])

# Train the model
fitted_pipeline2 = pipeline2.fit(train_df)

24/03/30 18:06:09 WARN DAGScheduler: Broadcasting large task binary with size 1133.9 KiB
24/03/30 18:06:09 WARN DAGScheduler: Broadcasting large task binary with size 1347.6 KiB
24/03/30 18:06:09 WARN DAGScheduler: Broadcasting large task binary with size 1528.8 KiB
24/03/30 18:06:10 WARN DAGScheduler: Broadcasting large task binary with size 1670.7 KiB
24/03/30 18:06:10 WARN DAGScheduler: Broadcasting large task binary with size 1777.1 KiB
24/03/30 18:06:10 WARN DAGScheduler: Broadcasting large task binary with size 1851.3 KiB
24/03/30 18:06:10 WARN DAGScheduler: Broadcasting large task binary with size 1753.6 KiB
24/03/30 18:06:10 WARN DAGScheduler: Broadcasting large task binary with size 1404.9 KiB
24/03/30 18:06:10 WARN DAGScheduler: Broadcasting large task binary with size 1194.3 KiB


In [61]:
# Evaluate using MixMax Scalar

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import *
# Make predictions on test data
# Make predictions
predictions = fitted_pipeline2.transform(test_df)

predictions = predictions.withColumn("prediction", round(col("prediction"),0).cast("double"))
# Evaluate the best model
evaluator = MulticlassClassificationEvaluator(labelCol="quality", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)

print(f"Best Model F1 Score on Test Data: {f1_score}")
# Best Model F1 Score on Test Data: 0.6034063260340633

Best Model F1 Score on Test Data: 0.6122891944715902


## Part3 Using Cross Validator + MinMax Scalar

In [40]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Define a parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(model.numTrees, [10, 20,30]) \
    .addGrid(model.maxDepth, [5, 10,15]) \
    .build()

# Configure CrossValidator
crossval = CrossValidator(estimator=pipeline2,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="quality", predictionCol="prediction", metricName="f1"),
                            # evaluator=RegressionEvaluator(labelCol="quality"),
                          numFolds=15)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train_df)


24/03/30 17:45:00 WARN DAGScheduler: Broadcasting large task binary with size 1076.3 KiB
24/03/30 17:45:00 WARN DAGScheduler: Broadcasting large task binary with size 1205.9 KiB
24/03/30 17:45:00 WARN DAGScheduler: Broadcasting large task binary with size 1302.2 KiB
24/03/30 17:45:00 WARN DAGScheduler: Broadcasting large task binary with size 1370.4 KiB
24/03/30 17:45:00 WARN DAGScheduler: Broadcasting large task binary with size 1419.2 KiB
24/03/30 17:45:02 WARN DAGScheduler: Broadcasting large task binary with size 1045.8 KiB
24/03/30 17:45:02 WARN DAGScheduler: Broadcasting large task binary with size 1294.7 KiB
24/03/30 17:45:03 WARN DAGScheduler: Broadcasting large task binary with size 1045.8 KiB
24/03/30 17:45:03 WARN DAGScheduler: Broadcasting large task binary with size 1294.7 KiB
24/03/30 17:45:03 WARN DAGScheduler: Broadcasting large task binary with size 1524.4 KiB
24/03/30 17:45:03 WARN DAGScheduler: Broadcasting large task binary with size 1724.7 KiB
24/03/30 17:45:04 WAR

In [41]:
# Make predictions on test data
predictions = cvModel.transform(test_df)
predictions = predictions.withColumn("prediction", round(col("prediction"),0).cast("double"))
# Evaluate the best model
evaluator = MulticlassClassificationEvaluator(labelCol="quality", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)

print(f"Best Model F1 Score on Test Data: {f1_score}")


Best Model F1 Score on Test Data: 0.5853148496240602


In [35]:
# Select example rows to display
predictions.select("prediction", 'quality', "normalizedFeatures").show(5)

+-----------------+-------+--------------------+
|       prediction|quality|  normalizedFeatures|
+-----------------+-------+--------------------+
|5.268244298656669|      5|[0.25454545454545...|
|4.927972027972028|      5|[0.29090909090909...|
|5.009744641323589|      5|[0.29090909090909...|
|5.195962732919254|      6|[0.6,0.1095890410...|
|5.268244298656669|      5|[0.25454545454545...|
+-----------------+-------+--------------------+
only showing top 5 rows



In [38]:
# EDA 
import seaborn as sns 
import matplotlib.pyplot as plt
sns.catplot(x="prediction",data=predictions,kind='count')

TypeError: Data source must be a DataFrame or Mapping, not <class 'pyspark.sql.dataframe.DataFrame'>.