In [0]:
#author: Andrew J. Otis

In [0]:
# Recommended Modules
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
import pandas as pd
# ------------------MACHINE LEARNING SECTION------------------
# Load the dataset
mushroom = pd.read_csv('https://archive.ics.uci.edu/ml/machine-learning-databases/mushroom/agaricus-lepiota.data')

# Define column names mapping
col_map = {"p": "mushroom_classification", "x": "cap-shape", "s": "cap-surface"}
mushroom = mushroom.rename(columns=col_map)

# Select relevant columns
mushroom = mushroom.iloc[:, :3]

# Select features and output variable
X = mushroom.drop('mushroom_classification', axis=1)
y = mushroom['mushroom_classification']

# Encode the target variable(i.e. "mushroom_classification")
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(y)

# Split the data into training and test sets 
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Pre-processing part of the pipeline
preprocessing = ColumnTransformer([
    ('onehot', OneHotEncoder(), ['cap-shape', 'cap-surface'])
], remainder='passthrough')

# Create a pipeline for a Random Forest Classifier
pipeline_rf = Pipeline([
    ('preprocessing', preprocessing),
    ('classification', RandomForestClassifier(max_depth=1, max_features=4))
])

# Fit the pipeline on the training data
pipeline_rf.fit(X_train, y_train)

# Evaluate the pipeline on the training set
print("----Random Forest Accuracy Results----")
train_accuracy = pipeline_rf.score(X_train, y_train)
print("Accuracy on training set:", train_accuracy)

# Evaluate the pipeline on the test set
test_accuracy = pipeline_rf.score(X_test, y_test)
print("Accuracy on test set:", test_accuracy)

# Create another pipeline for Decision Tree Classifier
pipeline_dt = Pipeline([
    ('preprocessing', preprocessing),
    ('classification', DecisionTreeClassifier(max_depth=1, max_features=4))
])

# Fit the pipeline on the training data
pipeline_dt.fit(X_train, y_train)

# Evaluate the pipeline on the training set
print()
print("----Decision Tree Accuracy Results----")
train_accuracy2 = pipeline_dt.score(X_train, y_train)
print("Accuracy on training set:", train_accuracy2)

# Evaluate the pipeline on the test set
test_accuracy2 = pipeline_dt.score(X_test, y_test)
print("Accuracy on test set:", test_accuracy2)




----Random Forest Accuracy Results----
Accuracy on training set: 0.619416109743229
Accuracy on test set: 0.6175625769388593

----Decision Tree Accuracy Results----
Accuracy on training set: 0.5160042208934225
Accuracy on test set: 0.5227739023389413


In [0]:
# Recommended Modules
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

# Important note: 
#    All the code assosciated with DB directory manipulation is commented out since I do not need to re-create the directories or re-copy them over. However, please reference this code if you wanted to create your own directory for the streaming dataset and another DB directory for the training and test sets.

# DB Directory Manipulation for streaming data
# Create a DB directory for the train and test sets
#dbutils.fs.mkdirs("FileStore/tables/assignment2")

# Create another DB directory for the data to be streamed
# Make a directory for relevant lab6 files
#dbutils.fs.mkdirs("FileStore/tables/assignment2/streaming")

# Refer to the file "HW2.py" for the code that saves the train and test sets whole

# Refer to the file "HW2.py" for the code that creates the batches of test data. These batch files will be in their own directory for the streaming portion of the report.

# After saving CSVs of the batch data locally with the "HW2.py" and manually move them to the main DB directory  the following code is ran to copy multiple files from one directory to another(i.e. I'm moving the relevant files for streaming into their own directory)

import os
#source_directory = "dbfs:/FileStore/tables"
#destination_directory = "dbfs:/FileStore/tables/assignment2/streaming"

# Create the destination directory if it doesn't exist
#dbutils.fs.mkdirs(destination_directory)

# List files in the source directory
#files = dbutils.fs.ls(source_directory)

# Copy files with names containing "test_part.." to the destination directory
#for file in files:
    #if "test_part" in file.name:
        #source_path = os.path.join(source_directory, file.name)
        #destination_path = os.path.join(destination_directory, file.name)
        #dbutils.fs.cp(source_path, destination_path)
        

#----------------------------STREAMING SECTION-------------------------
# Checking current files in new DataBricks directory
#dbutils.fs.ls ("dbfs:/FileStore/tables/assignment2/streaming")

# Create SparkSession
spark = SparkSession.builder.getOrCreate()

# Define the schema for training data in order to train the ML model
train_schema = StructType([
    StructField('cap-shape', StringType(), True),
    StructField('cap-surface', StringType(), True),
    StructField('mushroom_classification', DoubleType(), True)
])

# Load the training data with the specified schema
train_data = spark.read.format("csv").schema(train_schema).option("header", "true").load("dbfs:/FileStore/tables/assignment2/mushroom_training.csv")

# Define the features column
features_col = ['cap-shape', 'cap-surface']

# Remove null values from the dataset
train_data = train_data.na.drop()

# Preprocessing transformer
indexer = StringIndexer(inputCols=features_col, outputCols=['{}_indexed'.format(col) for col in features_col]).setHandleInvalid("keep")
encoder = OneHotEncoder(inputCols=['{}_indexed'.format(col) for col in features_col], outputCols=['{}_encoded'.format(col) for col in features_col])
assembler = VectorAssembler(inputCols=['{}_encoded'.format(col) for col in features_col], outputCol='features')

train_data.head(5)


Out[3]: [Row(cap-shape='f', cap-surface='y', mushroom_classification=1.0),
 Row(cap-shape='f', cap-surface='y', mushroom_classification=0.0),
 Row(cap-shape='f', cap-surface='f', mushroom_classification=0.0),
 Row(cap-shape='x', cap-surface='f', mushroom_classification=1.0),
 Row(cap-shape='f', cap-surface='f', mushroom_classification=0.0)]

In [0]:
# Define schema for incoming streaming data
streaming_schema = StructType([
    StructField('cap-shape', StringType(), True),
    StructField('cap-surface', StringType(), True),
    StructField('mushroom_classification', DoubleType(), True)
])

# Random Forest Classifier Model
rf_classifier = RandomForestClassifier(labelCol='mushroom_classification', featuresCol='features', maxDepth=1, maxBins=4)

# Create the pipeline for Random Forest Classifier
rf_pipeline = Pipeline(stages=[indexer, encoder, assembler, rf_classifier])

# Fit the pipeline on the training data
rf_pipeline_model = rf_pipeline.fit(train_data)

# Define the testing directory
testing_directory = "dbfs:/FileStore/tables/assignment2/streaming"

# Create a streaming DataFrame to read one file per trigger from the testing directory
streaming_df = (spark.readStream.format("csv")
                .option("header", "true")
                .schema(streaming_schema)
                .option("maxFilesPerTrigger", 1)
                .load(testing_directory))

# Remove null values from the streaming data
streaming_df = streaming_df.na.drop()

# Apply the Random Forest model to the streaming data
rf_predictions = rf_pipeline_model.transform(streaming_df).select("cap-shape", "cap-surface", "mushroom_classification", "prediction")

# Start the streaming query for Random Forest
rf_query = rf_predictions.writeStream.format("memory").outputMode("append").queryName("rf_stream").start()

# Display the streaming results
display(spark.table("rf_stream"))

# Wait for the streaming queries to finish
#rf_query.awaitTermination()


cap-shape,cap-surface,mushroom_classification,prediction


In [0]:
# Decision Tree Classifier
dt_classifier = DecisionTreeClassifier(labelCol='mushroom_classification', featuresCol='features', maxDepth=1, maxBins=4)

# Create the pipeline for Decision Tree Classifier
dt_pipeline = Pipeline(stages=[indexer, encoder, assembler, dt_classifier])

dt_pipeline_model = dt_pipeline.fit(train_data)

# Apply the Decision Tree model to the streaming data
dt_predictions = dt_pipeline_model.transform(streaming_df).select("cap-shape", "cap-surface", "mushroom_classification", "prediction")

# Start the streaming query for Decision Tree
dt_query = dt_predictions.writeStream.format("console").outputMode("append").queryName("dt_stream").start()

# Display the streaming predictions
display("dt_stream")

# Wait for the streaming queries to finish
#dt_query.awaitTermination()


'dt_stream'