In [None]:
# Feature Engineering
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import KFold

In [None]:
# Load training data
# Reads the training CSV from a path relative to the current working directory.
Data_train = pd.read_csv('mushroom_train.csv')

In [None]:
# Load test data
# Reads the test CSV from a path relative to the current working directory.
Data_test = pd.read_csv('mushroom_test.csv')

In [None]:
# Separate training data features and labels
X = Data_train.drop(columns='class')
y = Data_train['class']

# Columns consumed by the preprocessing pipeline
numerical_features = ['cap-diameter', 'stem-height', 'stem-width']
categorical_features = ['cap-shape', 'cap-surface', 'cap-color', 'does-bruise-or-bleed',
                        'gill-attachment', 'gill-spacing', 'gill-color', 'stem-color', 'has-ring',
                        'ring-type', 'habitat', 'season']

# Feature Engineering: for every categorical feature, attach the per-category
# mean/min/max/median of each numerical feature to every row.
X_new = X.copy()

# Iterate over the selected categorical features
for feature in categorical_features:
    # Select the numerical columns BEFORE aggregating. Aggregating the whole
    # frame also asks for the mean/median of the other categorical columns,
    # which is an error on current pandas and wasted work otherwise.
    stats_df = X.groupby(feature)[numerical_features].agg(['mean', 'min', 'max', 'median'])
    # Flatten the (numerical feature, statistic) column MultiIndex into unique names.
    stats_df.columns = [f"{feature}_{num_feat}_{stat}" for num_feat, stat in stats_df.columns]
    stats_df = stats_df.reset_index()
    # Each category appears once in stats_df, so the left merge keeps one row per sample.
    X_new = X_new.merge(stats_df, on=feature, how='left')

# Define column transformer
# NOTE(review): only `numerical_features` and `categorical_features` are routed
# through the transformer; with ColumnTransformer's default remainder='drop' the
# engineered "<feature>_<num_feat>_<stat>" columns built above are discarded
# before feature selection. Confirm whether they were meant to be included.
column_transformer = ColumnTransformer(transformers=[
    ('num', StandardScaler(), numerical_features),
    # handle_unknown='ignore': a category that appears only in later data is
    # encoded as all zeros instead of raising at transform time.
    ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)
])

D = 14  # Change this value to select the top D features
feature_selector = SelectKBest(score_func=f_classif, k=D)

# Create the pipeline: preprocessing followed by univariate feature selection
transformer = Pipeline(steps=[('transformer', column_transformer),
                              ('selector', feature_selector)])

# Encode the class labels as integers
le = LabelEncoder()
y_data = le.fit_transform(y)

# Fit and transform training data
# NOTE(review): the pipeline is fitted on every training row before the split
# below, so the validation rows influence the scaler, encoder and selector.
X_data = transformer.fit_transform(X_new, y_data)

# Split data into training data and validation data
X_train, X_val, y_train, y_val = train_test_split(X_data, y_data, test_size=0.2, random_state=42)

In [None]:
# Separate test data features and labels
Xtest = Data_test.drop(columns='class')
ytest = Data_test['class']

# Feature Engineering for test data
X_test_new = Xtest.copy()

# Iterate over the selected categorical features
for feature in categorical_features:
    # The statistics come from the TRAINING features `X`, so no test-set
    # information leaks into the engineered columns. The numerical columns are
    # selected before aggregating because mean/median of the other categorical
    # columns is an error on current pandas.
    stats_df = X.groupby(feature)[numerical_features].agg(['mean', 'min', 'max', 'median'])
    # Flatten the (numerical feature, statistic) column MultiIndex into unique names.
    stats_df.columns = [f"{feature}_{num_feat}_{stat}" for num_feat, stat in stats_df.columns]
    stats_df = stats_df.reset_index()
    # Categories absent from the training data get NaN statistics from the left merge.
    X_test_new = X_test_new.merge(stats_df, on=feature, how='left')

# Transform test data using the same (already fitted) pipeline
X_test = transformer.transform(X_test_new)

# Encode the test labels with the encoder fitted on the training labels
y_test = le.transform(ytest)


In [None]:
# Trivial system: guess each test label at random, following the class
# frequencies observed in the training labels.
unique_labels, counts = np.unique(y_data, return_counts=True)

probability = counts / counts.sum()

# Seeded generator so the baseline scores are identical on every run.
rng = np.random.default_rng(42)
y_pred = rng.choice(unique_labels, size=len(y_test), p=probability)

accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred, average='binary', pos_label=1)

print("Trivial system accuracy:", accuracy)
print("Trivial system F1-score:", f1)

In [None]:
# Nearest Means Classifier
def classifier(X, y):
    """Compute one mean feature vector per class.

    Args:
        X: Feature matrix with one row per sample; rows are selected with a
            boolean mask.
        y: Label array aligned with the rows of ``X``.

    Returns:
        dict mapping every distinct label in ``y`` to the axis-0 mean of the
        rows of ``X`` carrying that label.
    """
    return {label: np.mean(X[y == label], axis=0) for label in np.unique(y)}

def predict_nearest_mean(X, means):
    """Assign every sample to the class whose mean vector is closest.

    Args:
        X: Iterable of samples (one row per sample).
        means: dict mapping class label -> class mean vector, as returned by
            ``classifier``.

    Returns:
        np.ndarray with one predicted label per sample. Distance is
        ``np.linalg.norm(x - class_mean)``; on a tie the class that comes
        first in ``means`` wins because the comparison is strict.
    """
    predictions = []
    for x in X:
        # Named `best_distance` rather than `min` so the builtin is not shadowed.
        best_distance = float('inf')
        predicted_class = None
        for label, class_mean in means.items():
            distance = np.linalg.norm(x - class_mean)
            if distance < best_distance:
                best_distance = distance
                predicted_class = label
        predictions.append(predicted_class)
    return np.array(predictions)

print("Nearest Mean Classifier")

# --- Validation: class means learned from the training split only ---
means = classifier(X_train, y_train)
y_pred_val = predict_nearest_mean(X_val, means)
accuracy_val = accuracy_score(y_true=y_val, y_pred=y_pred_val)
f1_val = f1_score(y_true=y_val, y_pred=y_pred_val, pos_label=1, average='binary')

print("Validation accuracy:", accuracy_val)
print("Validation F1-score:", f1_val)
print()

# --- Test: class means re-learned from the full training set ---
class_means_full = classifier(X_data, y_data)
y_pred_nc = predict_nearest_mean(X_test, class_means_full)
accuracy_test = accuracy_score(y_true=y_test, y_pred=y_pred_nc)
f1_test = f1_score(y_true=y_test, y_pred=y_pred_nc, pos_label=1, average='binary')

print("Test accuracy:", accuracy_test)
print("Test F1-score:", f1_test)

In [None]:
# SVM with linear kernel
print("SVM with linear kernel")
svm_classifier = SVC(kernel='linear', C=0.1)
svm_classifier.fit(X_train, y_train)

train_accuracy = svm_classifier.score(X_train, y_train)

# Predict the validation split once and derive both metrics from the same
# predictions; calling `score` here would repeat the identical prediction pass.
y_pred_val = svm_classifier.predict(X_val)
accuracy_val = accuracy_score(y_val, y_pred_val)
f1_val = f1_score(y_val, y_pred_val, average='binary', pos_label=1)

print("Train accuracy:", train_accuracy)
print("Validation accuracy:", accuracy_val)
print("Validation F1-score:", f1_val)

# Refit on the full training set (train + validation) before scoring the test set
svm_classifier.fit(X_data, y_data)

y_pred_test = svm_classifier.predict(X_test)
accuracy_test = accuracy_score(y_test, y_pred_test)
f1_test = f1_score(y_test, y_pred_test, average='binary', pos_label=1)

# Same value `score(X_test, y_test)` would return, without predicting again.
test_accuracy = accuracy_test

print()
print("Test accuracy:", accuracy_test)
print("Test F1-score:", f1_test)

In [None]:
# SVM with rbf kernel
svm_classifier = SVC(kernel='rbf', C=10, gamma=1)

# Train SVM model on the training split (the validation rows are held out)
svm_classifier.fit(X_train, y_train)

# Evaluate accuracy and F1-score on validation set.
# Both metrics come from one prediction pass; `score` would predict again.
train_accuracy = svm_classifier.score(X_train, y_train)
y_pred_val = svm_classifier.predict(X_val)
accuracy_val = accuracy_score(y_val, y_pred_val)
f1_val = f1_score(y_val, y_pred_val, average='binary', pos_label=1)

print("Train accuracy:", train_accuracy)
print("Validation accuracy:", accuracy_val)
print("Validation F1-score:", f1_val)
print()

# Evaluate accuracy and F1-score on test set
# NOTE(review): unlike the linear-kernel cell, the model is not refitted on the
# full training set before test evaluation — confirm this is intended.
y_pred_test = svm_classifier.predict(X_test)
accuracy_test = accuracy_score(y_test, y_pred_test)
f1_test = f1_score(y_test, y_pred_test, average='binary', pos_label=1)
# Same value `score(X_test, y_test)` would return, without predicting again.
test_accuracy = accuracy_test

print("Test accuracy:", accuracy_test)
print("Test F1-score:", f1_test)

In [None]:
# K Nearest Neighbors
n_neighbors = 5

knn = KNeighborsClassifier(n_neighbors=n_neighbors)

# Fit on the training split only; the validation split is used for scoring
knn.fit(X_train, y_train)

# Predict for validation data
y_pred_val = knn.predict(X_val)

accuracy_val = accuracy_score(y_val, y_pred_val)
f1_val = f1_score(y_val, y_pred_val, average='binary', pos_label=1)

# f-string so the value is printed rather than the literal "{accuracy_val}"
print(f"Validation Accuracy: {accuracy_val}")
print("Validation F1-score:", f1_val)
print()

# Predict for test data
y_test_pred = knn.predict(X_test)

# Calculate the accuracy and F1-score of the model on the test data
test_accuracy = accuracy_score(y_test, y_test_pred)
f1_test = f1_score(y_test, y_test_pred, average='binary', pos_label=1)

# f-string so the value is printed rather than the literal "{test_accuracy}"
print(f"Test Accuracy: {test_accuracy}")
print("Test F1-score:", f1_test)

In [None]:
# 2-class Perceptron
def perceptron(X, y, n_epochs, learning_rate, batch_size, l2_reg):
    """Train a linear two-class perceptron with shuffled mini-batches.

    Predictions inside the update rule take the values +1 / -1, so ``y`` is
    compared against that encoding.

    Args:
        X: 2-D array of shape (n_samples, n_features).
        y: Label array aligned with the rows of ``X``.
        n_epochs: Number of passes over the data.
        learning_rate: Step size applied to every update.
        batch_size: Number of samples per mini-batch.
        l2_reg: Coefficient of the weight-shrinking term in the weight update.

    Returns:
        Tuple ``(weights, bias)`` of the learned weight vector and bias.
    """
    n_samples, n_features = X.shape
    weights = np.zeros(n_features)
    bias = 0

    for _ in range(n_epochs):
        # Reshuffle the (already shuffled) data at the start of every epoch.
        order = np.random.permutation(n_samples)
        X, y = X[order], y[order]

        for start in range(0, n_samples, batch_size):
            stop = start + batch_size
            batch_X, batch_y = X[start:stop], y[start:stop]

            activations = np.dot(batch_X, weights) + bias
            # Non-zero only for misclassified samples (values -2 or +2).
            residuals = batch_y - np.where(activations > 0, 1, -1)

            weights += learning_rate * (np.dot(batch_X.T, residuals) - l2_reg * weights)
            bias += learning_rate * np.sum(residuals)

    return weights, bias

def predict(X, weights, bias):
    """Classify samples with a linear decision rule.

    Args:
        X: Feature array, one row per sample.
        weights: Weight vector.
        bias: Scalar offset added to every score.

    Returns:
        Array holding 1 where ``X . weights + bias`` is strictly positive
        and 0 elsewhere.
    """
    scores = np.dot(X, weights) + bias
    is_positive = scores > 0
    return np.where(is_positive, 1, 0)

# Train the perceptron
n_epochs = 100
learning_rate = 0.1
batch_size = 32
l2_reg = 0.0001

# Densify the sparse training split once and reuse it for training and scoring.
# `perceptron` shuffles copies of its inputs, so the array is left untouched.
X_train_dense = X_train.toarray()

# Labels are mapped from {0, 1} to {-1, +1}, the encoding `perceptron` trains against.
weights, bias = perceptron(X_train_dense, y_train * 2 - 1, n_epochs, learning_rate, batch_size, l2_reg)

# Make predictions on the training set
y_pred_train = predict(X_train_dense, weights, bias)

# Calculate the accuracy and F1-score for the train set
accuracy_train = accuracy_score(y_train, y_pred_train)
f1_train = f1_score(y_train, y_pred_train, average='binary', pos_label=1)

print("Train accuracy:", accuracy_train)
print("Train F1-score:", f1_train)
print()

# Make predictions on the validation set
y_pred_val = predict(X_val.toarray(), weights, bias)

# Calculate the accuracy and F1-score for validation set
accuracy_val = accuracy_score(y_val, y_pred_val)
f1_val = f1_score(y_val, y_pred_val, average='binary', pos_label=1)

print("Validation accuracy:", accuracy_val)
print("Validation F1-score:", f1_val)
print()

# Make predictions on test set
y_pred_test = predict(X_test.toarray(), weights, bias)

# Calculate the accuracy and F1-score test set
accuracy_test = accuracy_score(y_test, y_pred_test)
f1_test = f1_score(y_test, y_pred_test, average='binary', pos_label=1)

print("Test accuracy:", accuracy_test)
print("Test F1-score:", f1_test)

In [None]:
# MLP
# The code is sampled from the EE559 Github page "https://github.com/keithchugg/ee559_spring2023/blob/main/lecture/fmnist_mlp_torch.ipynb"
# Run on the GPU when PyTorch reports CUDA as available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Define MLP model
class MLP(nn.Module):
    """Fully connected network with a single ReLU hidden layer.

    Args:
        n_features: Size of each input vector.
        n_classes: Number of output scores produced per sample.
        n_hidden: Width of the hidden layer.
    """

    def __init__(self, n_features, n_classes, n_hidden):
        super().__init__()
        # input -> hidden
        self.fc1 = nn.Linear(in_features=n_features, out_features=n_hidden)
        self.relu = nn.ReLU()
        # hidden -> one raw score per class (no softmax applied here)
        self.fc2 = nn.Linear(in_features=n_hidden, out_features=n_classes)

    def forward(self, x):
        """Return the raw class scores for the batch ``x``."""
        hidden = self.relu(self.fc1(x))
        return self.fc2(hidden)
    
# Parameters: optimisation settings first, then problem dimensions
n_hidden, n_epochs, batch_size = 128, 40, 64
learning_rate, l2_reg = 0.01, 0.001
n_features, n_classes = X_data.shape[1], 2
n_splits = 5

# Training data as PyTorch tensors (dense float32 features, int64 labels)
X_tensor = torch.tensor(X_data.toarray(), dtype=torch.float32)
y_tensor = torch.tensor(y_data, dtype=torch.long)

# Test data as PyTorch tensors, same dtypes as the training tensors
X_test_tensor = torch.tensor(X_test.toarray(), dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# Test batches are served in their original order (no shuffling)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

# Shuffled K-fold splitter with a fixed seed so the folds are repeatable
kfold = KFold(n_splits=n_splits, random_state=42, shuffle=True)


# Training with cross-validation

# Final-epoch validation metrics, one entry per fold. These lists must live
# outside the fold loop: resetting them per fold would make the "average"
# printed at the end equal to the last fold's score.
val_accuracy_list_fold = []
f1_list_fold = []

for fold, (train_indices, val_indices) in enumerate(kfold.split(X_tensor, y_tensor)):
    print(f"Fold {fold + 1}")

    # Fold-specific names so the notebook-level X_train / X_val / y_train / y_val
    # (the sparse split used by the earlier cells) are not overwritten with tensors.
    X_fold_train, y_fold_train = X_tensor[train_indices], y_tensor[train_indices]
    X_fold_val, y_fold_val = X_tensor[val_indices], y_tensor[val_indices]

    train_dataset = TensorDataset(X_fold_train, y_fold_train)
    val_dataset = TensorDataset(X_fold_val, y_fold_val)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # A fresh model and optimizer per fold so no weights carry over between folds.
    model = MLP(n_features, n_classes, n_hidden).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=learning_rate, weight_decay=l2_reg)

    # Per-epoch curves for the current fold only; after the loop they hold the
    # last fold's curves, which is what the plotting code below draws.
    train_loss_list = []
    val_accuracy_list = []
    f1_list = []

    # Training loop
    for epoch in range(n_epochs):
        model.train()
        running_loss = 0.0
        for batch_X, batch_y in train_loader:
            batch_X = batch_X.to(device)
            batch_y = batch_y.to(device)

            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        # Mean mini-batch loss for this epoch
        train_loss_list.append(running_loss / len(train_loader))

        # Score the fold's validation split after every epoch
        model.eval()
        all_predictions = []
        all_labels = []
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                batch_X = batch_X.to(device)
                batch_y = batch_y.to(device)

                outputs = model(batch_X)
                # Predicted class = index of the largest output score
                _, predicted = torch.max(outputs, 1)
                all_predictions.extend(predicted.cpu().numpy())
                all_labels.extend(batch_y.cpu().numpy())

        accuracy = accuracy_score(all_labels, all_predictions)
        f1 = f1_score(all_labels, all_predictions, average='binary', pos_label=1)
        val_accuracy_list.append(accuracy)
        f1_list.append(f1)

    # `accuracy` / `f1` now hold the last epoch's validation metrics for this fold.
    val_accuracy_list_fold.append(accuracy)
    f1_list_fold.append(f1)

    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1-score: {f1:.4f}")

print(f"Average Validation Accuracy: {np.mean(val_accuracy_list_fold):.4f}")
print(f"Average Validation F1-score: {np.mean(f1_list_fold):.4f}")

# Learning curves: the three per-epoch series drawn on one shared axis.
epoch_axis = range(1, n_epochs + 1)
curves = (
    (train_loss_list, "Train Loss"),
    (val_accuracy_list, "Validation Accuracy"),
    (f1_list, "Validation F1 Score"),
)

plt.figure()
for curve_values, curve_label in curves:
    plt.plot(epoch_axis, curve_values, label=curve_label)
plt.xlabel("Epochs")
plt.ylabel("Loss | Accuracy | F1 Score")
plt.title("Loss | Accuracy | F1 Score vs Epoch")
plt.legend()
plt.show()

# Evaluate the model on test data
# `model` is whatever network the preceding training code left behind.
model.eval()
test_predictions, test_labels = [], []
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)

        outputs = model(batch_X)
        # Predicted class = index of the largest output score
        predicted = torch.max(outputs, 1)[1]
        test_predictions.extend(predicted.cpu().numpy())
        test_labels.extend(batch_y.cpu().numpy())

# Calculate accuracy and F1 score on test data
test_accuracy = accuracy_score(y_true=test_labels, y_pred=test_predictions)
test_f1 = f1_score(y_true=test_labels, y_pred=test_predictions, pos_label=1, average='binary')

print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test F1-score: {test_f1:.4f}")
