In [2]:
sc.install_pypi_package("numpy==1.21.6")
sc.install_pypi_package("pandas==1.3.3")
sc.install_pypi_package("quinn")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting numpy==1.21.6
  Downloading numpy-1.21.6-cp39-cp39-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (15.7 MB)
Installing collected packages: numpy
Successfully installed numpy-1.21.6

Collecting pandas==1.3.3
  Downloading pandas-1.3.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (11.5 MB)
Installing collected packages: pandas
Successfully installed pandas-1.3.3

Collecting quinn
  Downloading quinn-0.10.3-py3-none-any.whl (23 kB)
Installing collected packages: quinn
Successfully installed quinn-0.10.3




In [3]:
import pandas as pd
import numpy as np
import quinn


print("Pandas version:", pd.__version__)
print("Numpy version:", np.__version__)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Pandas version: 1.3.3
Numpy version: 1.21.6

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, Normalizer, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline

spark = SparkSession \
    .builder \
    .appName("CS643_Project") \
    .getOrCreate()

## Load Training Dataset
train_df = spark.read.format('csv').options(header='true', inferSchema='true', sep=';').load('s3://cs643rudolphpaulin/TrainingDataset.csv')
validation_df = spark.read.format('csv').options(header='true', inferSchema='true', sep=';').load('s3://cs643rudolphpaulin/ValidationDataset.csv')

print("Data loaded from S3 bucket.")
print(train_df.toPandas().head())

def remove_quotations(s):
    return s.replace('"', '')

train_df = quinn.with_columns_renamed(remove_quotations)(train_df)
train_df = train_df.withColumnRenamed('quality', 'label')

validation_df = quinn.with_columns_renamed(remove_quotations)(validation_df)
validation_df = validation_df.withColumnRenamed('quality', 'label')

print("Data has been formatted.")
print(train_df.toPandas().head())

assembler = VectorAssembler(
    inputCols=["fixed acidity",
               "volatile acidity",
               "citric acid",
               "residual sugar",
               "chlorides",
               "free sulfur dioxide",
               "total sulfur dioxide",
               "density",
               "pH",
               "sulphates",
               "alcohol"],
                outputCol="inputFeatures")

scaler = Normalizer(inputCol="inputFeatures", outputCol="features")

lr = LogisticRegression()
rf = RandomForestClassifier()

pipeline1 = Pipeline(stages=[assembler, scaler, lr])
pipeline2 = Pipeline(stages=[assembler, scaler, rf])

paramgrid = ParamGridBuilder().build()

evaluator = MulticlassClassificationEvaluator(metricName="f1")

crossval = CrossValidator(estimator=pipeline1,  
                         estimatorParamMaps=paramgrid,
                         evaluator=evaluator, 
                         numFolds=3
                        )

cvModel1 = crossval.fit(train_df) 
print("F1 Score for LogisticRegression Model: ", evaluator.evaluate(cvModel1.transform(validation_df)))


crossval = CrossValidator(estimator=pipeline2,  
                         estimatorParamMaps=paramgrid,
                         evaluator=evaluator, 
                         numFolds=3
                        )

cvModel2 = crossval.fit(train_df) 
print("F1 Score for RandomForestClassifier Model: ", evaluator.evaluate(cvModel2.transform(validation_df)))

print("Since the Logistic Regression model has the superior F1 score, it will be selected for the prediction application.")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-4:
Traceback (most recent call last):
  File "/mnt/notebook-env/lib/python3.9/threading.py", line 980, in _bootstrap_inner
    self.run()
  File "/mnt/notebook-env/lib/python3.9/threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "/mnt/notebook-env/lib/python3.9/site-packages/awseditorssparkmonitoringwidget/cellmonitor.py", line 154, in cell_monitor
    job_group_filtered_jobs = [job for job in jobs_data if job['jobGroup'] == str(statement_id)]
  File "/mnt/notebook-env/lib/python3.9/site-packages/awseditorssparkmonitoringwidget/cellmonitor.py", line 154, in <listcomp>
    job_group_filtered_jobs = [job for job in jobs_data if job['jobGroup'] == str(statement_id)]
KeyError: 'jobGroup'


Data loaded from S3 bucket.
   """""fixed acidity""""  ...  """"quality"""""
0                     8.9  ...                 6
1                     7.6  ...                 5
2                     7.9  ...                 5
3                     8.5  ...                 5
4                     6.9  ...                 6

[5 rows x 12 columns]
Data has been formatted.
   fixed acidity  volatile acidity  citric acid  ...  sulphates  alcohol  label
0            8.9              0.22         0.48  ...       0.53      9.4      6
1            7.6              0.39         0.31  ...       0.65      9.7      5
2            7.9              0.43         0.21  ...       0.91      9.5      5
3            8.5              0.49         0.11  ...       0.53      9.4      5
4            6.9              0.40         0.14  ...       0.63      9.7      6

[5 rows x 12 columns]
F1 Score for LogisticRegression Model:  0.5729445029855991
F1 Score for RandomForestClassifier Model:  0.5149515912576688
Since