In [None]:
import pandas as pd
from google.colab import drive

# Mount Google Drive into the Colab filesystem so files stored there can be
# opened by path (`drive` comes from google.colab, so this cell is Colab-only).
drive.mount('/content/drive')

# Read the survey CSV from the mounted Drive into a DataFrame.
file_path = "/content/drive/My Drive/cleaned_data_combined_modified.csv"
df = pd.read_csv(file_path)

# Last expression of the cell: show the first five rows as the cell output.
df.head()


Mounted at /content/drive


Unnamed: 0,id,"Q1: From a scale 1 to 5, how complex is it to make this food? (Where 1 is the most simple, and 5 is the most complex)",Q2: How many ingredients would you expect this food item to contain?,Q3: In what setting would you expect this food to be served? Please check all that apply,Q4: How much would you expect to pay for one serving of this food item?,Q5: What movie do you think of when thinking of this food item?,Q6: What drink would you pair with this food item?,"Q7: When you think about this food item, who does it remind you of?",Q8: How much hot sauce would you add to this food item?,Label
0,716549,3,6,"Week day lunch,At a party,Late night snack",5,Cloudy with a Chance of Meatballs,Coke,Friends,A little (mild),Pizza
1,715742,4,"bread, meet","Week day lunch,At a party,Late night snack",5$ for a large piece,All sort of american young boy movies,Coke,"Friends,Teachers,Strangers",,Pizza
2,727333,3,5,"Week day lunch,Week day dinner,Weekend lunch,W...",10dollar,action movie,cola,Friends,A moderate amount (medium),Pizza
3,606874,4,6-7,"Week day lunch,Week day dinner,Weekend lunch,W...",$3,Mamma Mia,Soda,"Siblings,Friends,Teachers",I will have some of this food item with my hot...,Pizza
4,505318,2,3 or more,"Week day lunch,Week day dinner,Weekend lunch,W...",$5,Cloudy with a chance of meatballs,Soda,"Siblings,Friends",A little (mild),Pizza


**Data Processing**

In [None]:
import pandas as pd
import numpy as np
import re
import json
from collections import Counter

# LOAD DATA
# Re-read the CSV from the mounted Drive and rebind `df`, so the preprocessing
# below always starts from the file contents. Requires Drive to be mounted at
# /content/drive.
df = pd.read_csv("/content/drive/MyDrive/cleaned_data_combined_modified.csv")

# NUMERIC CLEANING
def extract_numeric(value):
    """Reduce a free-text answer to a single number.

    Every run of digits (optionally with a decimal part) found in the text is
    parsed as a float and the values are averaged, so a range such as "6-7"
    becomes 6.5 and "$3" becomes 3.0.

    Parameters:
        value: the raw cell value (string, number, or missing).

    Returns:
        The mean of the numbers found, or None when the value is missing or
        contains no digits.
    """
    if pd.isnull(value):
        return None

    matches = re.findall(r"\d+\.?\d*", str(value))
    if not matches:
        return None

    parsed = [float(token) for token in matches]
    return np.mean(parsed)

# Columns whose free-text answers are reduced to one number per row.
numerical_columns = [
    "Q1: From a scale 1 to 5, how complex is it to make this food? (Where 1 is the most simple, and 5 is the most complex)",
    "Q2: How many ingredients would you expect this food item to contain?",
    "Q4: How much would you expect to pay for one serving of this food item?"
]

# Apply numeric extraction
# (answers with no digits, and missing answers, become missing values here)
for col in numerical_columns:
    df[col] = df[col].apply(extract_numeric)

# Fill missing with mean
# The mean is taken over the whole dataset: this runs before the train/test
# split further down, so test rows contribute to the imputed value.
for col in numerical_columns:
    mean_val = df[col].mean()
    df[col] = df[col].fillna(mean_val)

# TEXT CLEANING
# Free-text / multi-select columns that are later turned into bag-of-words features.
text_cols = [
    "Q3: In what setting would you expect this food to be served? Please check all that apply",
    "Q5: What movie do you think of when thinking of this food item?",
    "Q6: What drink would you pair with this food item?",
    "Q7: When you think about this food item, who does it remind you of?"
]

# Missing answers become the literal token "none"; everything is lowercased and
# stripped of surrounding whitespace so tokenization is case-insensitive.
for col in text_cols:
    df[col] = df[col].fillna("none").str.lower().str.strip()

# BINARY BAG-OF-WORDS
def simple_bow(df, column_name, prefix):
    """Build a binary bag-of-words encoding of one text column.

    Each cell is split into word tokens (``\\b\\w+\\b``). The vocabulary is the
    sorted set of all tokens seen in the column, and the result has one 0/1
    column per vocabulary word, named ``f"{prefix}_{word}"``.

    Parameters:
        df: DataFrame holding the text column. Every cell must already be a
            string (missing values filled by the caller).
        column_name: name of the text column to encode.
        prefix: string prepended to every generated column name.

    Returns:
        An integer DataFrame with one row per row of `df`, carrying the same
        index as `df` so that `pd.concat([df, result], axis=1)` lines rows up
        correctly even when `df` does not have a default 0..n-1 index.
    """
    tokenized = [re.findall(r'\b\w+\b', text) for text in df[column_name]]

    # Sorted vocabulary gives a deterministic column order.
    vocab = sorted({token for tokens in tokenized for token in tokens})
    vocab_index = {word: i for i, word in enumerate(vocab)}

    # Binary presence matrix: 1 if the word occurs at least once in the row.
    bow_matrix = np.zeros((len(df), len(vocab)), dtype=int)
    for row, tokens in enumerate(tokenized):
        for token in tokens:
            bow_matrix[row, vocab_index[token]] = 1

    return pd.DataFrame(
        bow_matrix,
        columns=[f"{prefix}_{word}" for word in vocab],
        index=df.index,
    )

# Apply BoW to each column
# NOTE(review): the prefix is derived from the loop position (Q3, Q4, Q5, Q6),
# while text_cols holds questions Q3, Q5, Q6 and Q7 - so the Q5/Q6/Q7 features
# are named Q4_*/Q5_*/Q6_*. Renaming would change the saved feature names, so
# confirm against the prediction script before changing it.
bow_frames = []
for i, col in enumerate(text_cols):
    bow_frames.append(simple_bow(df, col, f"Q{i+3}"))

# Append the bag-of-words columns and drop the raw text they were built from.
df = pd.concat([df] + bow_frames, axis=1)
df.drop(columns=text_cols, inplace=True)

# ONE-HOT ENCODING: Q8
# Collapse the survey options into Mild / Medium / Hot; any answer not in the
# map (including missing answers) becomes the category "None".
hot_sauce_map = {
    "A little (mild)": "Mild",
    "A moderate amount (medium)": "Medium",
    "A lot (hot)": "Hot",
    "I will have some of this food item with my hot sauce": "Medium"
}
q8_col = "Q8: How much hot sauce would you add to this food item?"
df["Q8_cleaned"] = df[q8_col].map(hot_sauce_map).fillna("None")

# Manually one-hot encode
# Columns are created in order of first appearance in the data (unique()), so
# the column order depends on row order.
for category in df["Q8_cleaned"].unique():
    df[f"Q8_cleaned_{category}"] = (df["Q8_cleaned"] == category).astype(int)
df.drop(columns=[q8_col, "Q8_cleaned"], inplace=True)

# CLEAN TYPES
# Drop any row that still has a missing value, then cast float/bool columns to
# int. astype(int) truncates toward zero, so e.g. an averaged range of 6.5 or a
# mean-imputed value loses its fractional part rather than being rounded.
df.dropna(inplace=True)
for col in df.columns:
    if df[col].dtype in ["float64", "bool"]:
        df[col] = df[col].astype(int)

# SPLIT FEATURES AND LABEL
# X keeps every column except "Label".
# NOTE(review): judging by the preview output that includes "Unnamed: 0" and
# "id", which look like a row counter and a respondent id rather than survey
# answers - confirm whether they are meant to be model inputs.
X = df.drop(columns=["Label"])
y = df["Label"]

# STRATIFIED SPLIT
def stratified_split(X, y, test_size=0.3, random_state=42):
    """Split features and labels into train/test sets, class by class.

    The rows of each label are shuffled and the first
    ``int(n_label * (1 - test_size))`` of them go to the training set, the rest
    to the test set, so every label keeps roughly the same train/test ratio.
    Rows come out grouped by label, in order of first appearance of each label.

    Side effect: seeds NumPy's *global* random generator with `random_state`.

    Parameters:
        X: feature DataFrame.
        y: label Series aligned with X.
        test_size: fraction of each class assigned to the test set.
        random_state: seed passed to np.random.seed.

    Returns:
        (X_train, X_test, y_train, y_test) as NumPy arrays; the feature arrays
        are cast to float64.
    """
    np.random.seed(random_state)
    features = X.reset_index(drop=True)
    labels = y.reset_index(drop=True)

    train_rows = []
    test_rows = []
    for label in labels.unique():
        members = labels[labels == label].index.tolist()
        np.random.shuffle(members)
        cut = int(len(members) * (1 - test_size))
        train_rows.extend(members[:cut])
        test_rows.extend(members[cut:])

    X_train = features.iloc[train_rows].values.astype(np.float64)
    X_test = features.iloc[test_rows].values.astype(np.float64)
    y_train = labels.iloc[train_rows].values
    y_test = labels.iloc[test_rows].values
    return (X_train, X_test, y_train, y_test)

# 70/30 split with the function's default test_size and seed; the results are
# NumPy arrays (features as float64), which is what the models below expect.
X_train, X_test, y_train, y_test = stratified_split(X, y)

# Convert X_test (NumPy) back to DataFrame using original column names
X_test_df = pd.DataFrame(X_test, columns=X.columns)

# Add labels back
X_test_df["Label"] = y_test

# Save to a test CSV
# (already-preprocessed features plus the label, written to Drive)
X_test_df.to_csv("/content/drive/MyDrive/test_split_only.csv", index=False)

# Sanity check: the train and test row counts should add up to the row count of X.
print("Shape:", X.shape, "| Train:", X_train.shape, "| Test:", X_test.shape)


Shape: (1644, 1367) | Train: (1149, 1367) | Test: (495, 1367)


**Naive Bayes**

In [None]:
import numpy as np
import random
import pickle

class NaiveBayesClassifier():
    """Bernoulli-style naive Bayes over the three food labels, with bagged MAP estimates.

    `fit` draws `N` random subsets of the training data (each holding `split`
    percent of the rows), computes on each subset a MAP estimate of the class
    priors (pi) and of the per-feature Bernoulli parameters (theta) under a
    Beta(a, b) prior, and averages the estimates.

    Entry k of pi and column k of theta belong to ``CLASS_LABELS[k]``.

    Note: the likelihood treats every feature as a 0/1 indicator. For a column
    with values above 1 the raw estimate can fall outside [0, 1]; the averaged
    theta is clipped into (0, 1) so the logarithms stay finite.
    """

    # Fixed class order shared by fitting, prediction and predict_proba.
    CLASS_LABELS = ("Pizza", "Sushi", "Shawarma")

    def __init__(self, *, a=2, b=2, split=90, N=100) -> None:
        """Store the hyperparameters.

        - a, b: parameters of the Beta prior placed on each theta entry
        - split: percentage (0-100) of the training rows used per resample
        - N: number of resamples whose estimates are averaged
        """
        self.a = a
        self.b = b
        self.split = split
        self.N = int(N)

    def _map_pi_theta(self, X, y):
        """Return (pi, theta) estimated from one batch.

        pi is a list of class frequencies; theta has shape (n_features, 3) and
        holds (feature_sum + a - 1) / (n_class + a + b - 2) for each class.
        """
        a = self.a
        b = self.b
        n_samples, vocab_size = X.shape[0], X.shape[1]
        theta = np.zeros([vocab_size, len(self.CLASS_LABELS)])
        pi = []

        for k, label in enumerate(self.CLASS_LABELS):
            X_k = X[y == label]
            n_k = X_k.shape[0]
            # Per-feature sum over the rows of this class.
            feature_sums = np.matmul(np.transpose(X_k), np.ones(n_k))
            theta[:, k] = (feature_sums + a - 1) / (n_k + a + b - 2)
            pi.append(n_k / n_samples)

        return pi, theta

    def _training_subset(self, X, y):
        """Return a random `split`-percent subset of (X, y), drawn without replacement.

        Uses NumPy's global random generator.
        """
        X_random = np.array(X)
        y_random = np.array(y)

        order = np.random.permutation(len(y_random))
        X_random, y_random = X_random[order], y_random[order]

        keep = int(np.floor(self.split * len(y_random) / 100))
        return X_random[:keep], y_random[:keep]

    def _log_joint(self, X, pi, theta):
        """Return the (n_samples, 3) matrix of log p(x, class).

        Log-probabilities are used throughout to avoid underflow from
        multiplying many small factors.
        """
        theta = np.asarray(theta)
        log_pi = np.log(pi)
        columns = []
        for k in range(theta.shape[1]):
            columns.append(
                np.matmul(X, np.log(theta[:, k]))
                + np.matmul(1 - X, np.log(1 - theta[:, k]))
                + log_pi[k]
            )
        return np.column_stack(columns)

    def _check_fitted(self, caller):
        """Raise ValueError if the model has no parameters yet."""
        if not hasattr(self, 'pi_map') or not hasattr(self, 'theta_map'):
            raise ValueError(f"Model must be trained before calling {caller}.")

    def _single_prediction(self, X, pi, theta):
        """Return, for each row of X, the index of the class with the highest log score."""
        return np.argmax(self._log_joint(X, pi, theta), axis=1)

    def fit(self, X, y, sample_weight=None):
        """Estimate pi and theta by averaging MAP estimates over N random subsets.

        - X: (n_samples, n_features) array
        - y: array of labels drawn from CLASS_LABELS
        - sample_weight: accepted for signature compatibility; not used

        Returns self.
        """
        pi_estimates = []
        theta_estimates = []
        for _ in range(self.N):
            X_batch, y_batch = self._training_subset(X, y)
            pi_batch, theta_batch = self._map_pi_theta(X_batch, y_batch)
            pi_estimates.append(pi_batch)
            theta_estimates.append(theta_batch)

        self.pi_map = np.mean(pi_estimates, axis=0)
        # Keep theta strictly inside (0, 1) so log(theta) and log(1 - theta) are finite.
        self.theta_map = np.clip(np.mean(theta_estimates, axis=0), 1e-9, 1 - 1e-9)
        return self

    def predict(self, X):
        """Return the predicted label (as a string) for each row of X."""
        self._check_fitted("predict")
        winners = self._single_prediction(X, self.pi_map, self.theta_map)
        return np.array([self.CLASS_LABELS[k] for k in winners])

    def get_params(self, deep=False):
        """Return the hyperparameters as a dict.

        `deep` is accepted for compatibility with the sklearn-style signature.
        The model has no nested estimators, so it returns the same dict either
        way (the previous deep=True branch tried to iterate over `self` and
        raised TypeError).
        """
        return {"a": self.a, "b": self.b, "N": self.N, "split": self.split}

    def set_params(self, **parameters):
        """Set attributes from keyword arguments and return self."""
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self

    def save(self, filename: str):
        """Pickle the fitted parameters as {"pi": ..., "theta": ...} to `filename`.

        Failures are reported with a printed message rather than raised.
        """
        try:
            with open(filename, "wb") as f:
                params = {"pi": self.pi_map, "theta": self.theta_map}
                pickle.dump(params, f)
            print(f"Success! Naive Bayes exported to {filename}.")
        except pickle.PicklingError:
            print("Error: Naive Bayes could not be pickled. Double-check the types of data")
        except Exception as e:
            print(f"Unexpected error: {e}")

    def load_pretrained(self, filename: str):
        """Load parameters written by `save` (or an equivalent {"pi", "theta"} dict).

        Values are converted to NumPy arrays, so a pickle that stores plain
        lists (e.g. after `.tolist()`) can be used for prediction as well.
        Failures are reported with a printed message rather than raised.

        Security: unpickling can execute arbitrary code - only load files you
        created yourself.
        """
        try:
            with open(filename, "rb") as f:
                params = pickle.load(f)
                self.pi_map = np.asarray(params["pi"], dtype=float)
                self.theta_map = np.asarray(params["theta"], dtype=float)
            print(f"Success! Pre-trained Naive Bayes loaded from {filename}.")
        except pickle.UnpicklingError:
            print(f"Error: either {filename} is not a valid pickle file or is corrupted.")
        except Exception as e:
            print(f"Unexpected error: {e}")

    def predict_proba(self, X):
        """Return probability estimates for each class (needed for soft voting).

        Returns a dict mapping each label in CLASS_LABELS to a vector with one
        probability per row of X; for every row the three values sum to 1.
        """
        self._check_fitted("predict_proba")

        log_probs = self._log_joint(X, self.pi_map, self.theta_map)

        # Softmax with the row maximum subtracted first for numerical stability.
        shifted = np.exp(log_probs - np.max(log_probs, axis=1, keepdims=True))
        probs = shifted / np.sum(shifted, axis=1, keepdims=True)

        return {label: probs[:, k] for k, label in enumerate(self.CLASS_LABELS)}


**Random Forest**

In [None]:
import numpy as np
import pandas as pd
import copy
from typing import Tuple, Any, Optional, Union, List
from collections import Counter
import random
import math
import pickle
import re

class DecisionTreeClassifier:
    """ My implementation of sklearn's Decision Tree Classifier.

    Note: this assumes that ALL features are discrete i.e. categorical. Every
    split is of the form "feature == value" versus "feature != value".

    SOURCES:
    - https://medium.com/@cristianleo120/master-decision-trees-and-building-them-from-scratch-in-python-af173dafb836
    - https://www.kaggle.com/code/fareselmenshawii/decision-tree-from-scratch

    """
    def __init__(self, min_samples_split: int = 10, in_forest: bool = False, random_state: Optional[int] = None):
        """ Initialize the decision tree classifier.

        - min_samples_split: the minimum number of samples required to split an internal node.
          The same threshold is also required of BOTH children of a candidate split.
          max_depth is intentionally not offered.
        - in_forest: a boolean value which is True iff this tree is part of a forest; each
          split then only considers a random subset of round(sqrt(n_features)) features
        - random_state: an integer for reproducibility, or None to draw from NumPy's
          global random generator. Any integer, including 0, counts as a seed.
        - tree: the decision tree, as a nested dictionary (None until fit is called)
        """
        self.min_samples_split = min_samples_split
        self.in_forest = in_forest
        self.tree = None
        self.random_state = random_state
        # Private generator, created on demand when random_state is set.
        self._rng = None

    def _new_rng(self) -> Optional[np.random.RandomState]:
        """ Return a fresh private generator for random_state, or None when unseeded. """
        if self.random_state is None:
            return None
        return np.random.RandomState(self.random_state)

    def _candidate_features(self, n_features: int) -> List[int]:
        """ Return the feature indices to evaluate at the current node.

        Outside a forest this is every feature. Inside a forest it is a random
        subset of round(sqrt(n_features)) features, drawn without replacement.
        A seeded tree draws from ONE generator that persists across nodes, so
        different nodes see different subsets while the whole tree stays
        reproducible, and the global NumPy state is left untouched.
        """
        features = list(range(n_features))
        if not self.in_forest:
            return features

        max_features = round(math.sqrt(n_features))
        rng = getattr(self, "_rng", None)
        if rng is None and self.random_state is not None:
            rng = self._rng = self._new_rng()
        sampler = rng if rng is not None else np.random
        return sampler.choice(features, max_features, replace=False).tolist()

    def fit(self, X: np.ndarray, y: np.ndarray):
        """ Train the model by building the tree based on X & y.

        - X: training data matrix
        - y: associated labels
        Please ensure that both X & y are numpy arrays and not pandas dataframes
        or series.
        Assumes that all features are categorical, so make sure floats are rounded
        to ints.
        """
        # Restart the private generator so repeated fits with a seed give the same tree.
        self._rng = self._new_rng()
        self.tree = self.build_tree(X, y)

    def build_tree(self, X: np.ndarray, y: np.ndarray) -> dict:
        """ Train the model and return a dictionary representing the decision tree.

        - X: training data matrix
        - y: associated labels

        The dictionary can be either a leaf or a subtree node representing a split
        e.g.,
        - leaf node {'label': 'Shawarma'}
        - subtree node: {'feature': 12, 'value': 1, 'left': <subtree for rows where
          column 12 == 1>, 'right': <subtree for all other rows>}
          where 'feature' is a column index into X.
        """
        labels, counts = np.unique(y, return_counts=True)
        # base case 1: all samples have same label
        if len(labels) <= 1:
            return {'label': labels[0]}
        majority_label = labels[np.argmax(counts)]
        # base case 2: min_samples_split reached
        if len(y) < self.min_samples_split:
            return {'label': majority_label}

        # recursive case
        best_split = self.find_best_split(X, y)
        if best_split is None:
            # no admissible split among the candidate features: fall back to majority vote
            return {'label': majority_label}

        feature, split_on, left_X, left_y, right_X, right_y = best_split
        # build subtrees recursively
        left_subtree = self.build_tree(left_X, left_y)
        right_subtree = self.build_tree(right_X, right_y)

        return {'feature': feature,
                'value': split_on,
                'left': left_subtree,
                'right': right_subtree}

    def find_best_split(self, X: np.ndarray, y: np.ndarray) -> Optional[Tuple[int, Any, np.ndarray, np.ndarray, np.ndarray, np.ndarray]]:
        """ Find the best feature and value to split on using Gini impurity.

        - X: training data matrix
        - y: associated labels

        Returns a tuple containing the best feature (column index), value, and its split:
        left_X, left_y, right_X, right_y, or None if no split is found.
        """
        best_gini = float('inf')
        best_split = None

        # if tree is part of forest, only a random subset of features is considered
        for feature in self._candidate_features(X.shape[1]):  # test each feature
            col = X[:, feature]
            for val in np.unique(col):  # test each split of this feature: one side == value, rest == not value
                mask_left = col == val
                mask_right = ~mask_left
                y_l = y[mask_left]
                y_r = y[mask_right]
                # both children must themselves hold at least min_samples_split rows
                if len(y_l) >= self.min_samples_split and len(y_r) >= self.min_samples_split:
                    # calc Gini impurity of split
                    split_impurity = self.split_impurity(y_l, y_r)
                    if split_impurity < best_gini:
                        best_gini = split_impurity
                        best_split = (feature, val, X[mask_left], y_l, X[mask_right], y_r)

        return best_split

    def gini(self, y: np.ndarray) -> float:
        """ Return the Gini impurity of a series y. Gini impurity = how often a
        random datapoint would be labelled incorrectly. Empty input gives 0.0. """
        # Gini = 1 - sum(p^2)
        if len(y) == 0:
            return 0.0
        _, class_counts = np.unique(y, return_counts=True)
        probabilities = class_counts / len(y)
        return 1 - np.sum(probabilities ** 2)

    def split_impurity(self, left_y: np.ndarray, right_y: np.ndarray) -> float:
        """ Return the Gini impurity for a split. Remember: a lower Gini score
        means the split is better.

        - left_y: labels for the left data points (== value) after the split.
        - right_y: labels for the right data points (!= value) after the split.
        """
        # gini impurity for a split = n_left / n_total * Gini_left + n_right / n_total * Gini_right
        Gini_l = self.gini(left_y)
        Gini_r = self.gini(right_y)
        n_left = len(left_y)
        n_right = len(right_y)
        n_total = n_left + n_right

        return (n_left / n_total * Gini_l) + (n_right / n_total * Gini_r)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """ Return the predicted class for each data point in X.
        Please ensure that X is a numpy array and not a pandas dataframe or series.
        Assumes that all features are categorical, so make sure floats are rounded
        to ints.

        Raises ValueError if the tree has not been built yet.
        """
        if self.tree is None:
            raise ValueError("Tree must be fitted before calling predict.")
        return np.array([self.predict_single(row, self.tree) for row in X])

    def predict_single(self, row: np.ndarray, tree: dict) -> Any:
        """ For ONE test point, go down the decision tree and return the
        predicted class.

        - row: a single test data point
        - tree: the current decision tree/subtree/leaf we are at
        """
        # base case: we reached a leaf
        if 'label' in tree:
            return tree['label']

        # recursive case: follow the branch matching this row's feature value
        if row[tree['feature']] == tree['value']:  # left subtree
            return self.predict_single(row, tree['left'])
        else:  # right subtree
            return self.predict_single(row, tree['right'])


class RandomForestClassifier():
    """ My implementation of sklearn's RFC.
    Bootstrapping and categorical features are assumed. Uses sqrt for max_features.

    SOURCES:
    - https://medium.com/@enozeren/building-a-random-forest-model-from-scratch-81583cbaa7a9
    """
    def __init__(self, n_estimators: int = 200, max_samples: Optional[float] = None, min_samples_split: int = 10, random_state: Optional[int] = None) -> None:
        """ Initialize the RFC.

        - n_estimators: the number of individual decision trees in this forest
        - min_samples_split: the minimum number of samples needed to split an internal node
        - max_samples: a float in the range (0.0, 1.0] which specifies the size of the bootstrap batch
        used to train each individual tree, as a proportion of the original dataset size.
        None means "as many rows as the original dataset".
        - random_state: for reproducibility; None draws from NumPy's global generator.
        Any integer, including 0, counts as a seed.

        Raises Exception if max_samples is given but outside (0.0, 1.0].
        """
        if max_samples is not None:
            if (max_samples <= 0.0) or (max_samples > 1.0):
                raise Exception("max_samples must be in the range (0.0, 1.0].")

        self.n_estimators = n_estimators
        self.min_samples_split = min_samples_split
        self.max_samples = max_samples
        self.n_classes = None
        self.trees = []
        self.random_state = random_state
        # Private generator, created on demand when random_state is set.
        self._rng = None

    def _new_rng(self) -> Optional[np.random.RandomState]:
        """ Return a fresh private generator for random_state, or None when unseeded. """
        if self.random_state is None:
            return None
        return np.random.RandomState(self.random_state)

    def _get_rng(self) -> Optional[np.random.RandomState]:
        """ Return the forest's persistent private generator (None when unseeded).

        The generator is created once and reused, so successive bootstrap
        samples differ from each other while the sequence as a whole is
        reproducible, and the global NumPy state is left untouched.
        """
        rng = getattr(self, "_rng", None)
        if rng is None and self.random_state is not None:
            rng = self._rng = self._new_rng()
        return rng

    def _check_fitted(self, caller: str) -> None:
        """ Raise ValueError if the forest holds no trees. """
        if not self.trees:
            raise ValueError(f"Forest must be fitted or loaded before calling {caller}.")

    def get_bootstrap_sample(self, X: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """ Get one bootstrap sample the size of self.max_samples * N (where N is the original
        number of samples; the full N when max_samples is None). Return this as a tuple
        of [X_sample, y_sample].

        - X: training data matrix
        - y: associated labels
        """
        n = X.shape[0]  # no. samples
        if self.max_samples is not None:
            bstrap_size = max(round(n * self.max_samples), 1)
        else:
            # sampling n rows with replacement does not usually yield the same dataset
            bstrap_size = n

        rng = self._get_rng()
        sampler = rng if rng is not None else np.random
        indices = sampler.choice(n, size=bstrap_size, replace=True)  # sample w replacement

        return X[indices], y[indices]

    def fit(self, X: np.ndarray, y: np.ndarray):
        """ Fit the RFC to the training data. Any previously held trees are discarded.

        - X: training data matrix
        - y: associated labels

        Please ensure that both X and y are numpy arrays and not pandas dataframes
        or series.
        Assumes that all features are categorical, so make sure floats are rounded
        to ints.
        """
        self.n_classes = len(np.unique(y))
        self.trees = []
        # Restart the private generator so repeated fits with a seed give the same forest.
        self._rng = self._new_rng()
        for _ in range(self.n_estimators):
            X_boot, y_boot = self.get_bootstrap_sample(X, y)
            # Give each tree its own seed drawn from the forest generator; handing
            # every tree the same seed would make the trees choose identical features.
            # The draw starts at 1 so the seed is never a falsy value.
            tree_seed = None if self._rng is None else int(self._rng.randint(1, 2**31 - 1))
            tree = DecisionTreeClassifier(min_samples_split=self.min_samples_split, in_forest=True, random_state=tree_seed)
            tree.fit(X_boot, y_boot)
            self.trees.append(tree)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """ Predict class labels for a set of samples by majority vote over the trees.
        Ties go to the label that sorts first.

        Please ensure that X is a numpy array and not a pandas dataframe or series.
        Assumes that all features are categorical, so make sure floats are rounded
        to ints.

        Raises ValueError if the forest holds no trees.
        """
        self._check_fitted("predict")
        # Size the vote matrix by the trees actually held: after load_pretrained
        # that number need not equal n_estimators.
        tree_predictions = np.empty((len(self.trees), X.shape[0]), dtype=object)

        # get predictions from each tree
        for i, tree in enumerate(self.trees):
            tree_predictions[i, :] = tree.predict(X)

        # take the mode (most common value) across trees for each sample
        majority_votes = []
        for i in range(X.shape[0]):
            unique_values, counts = np.unique(tree_predictions[:, i], return_counts=True)
            majority_votes.append(unique_values[np.argmax(counts)])

        return np.array(majority_votes)

    def save(self, filename: str):
        """ Save the RFC model to a pickle file with filename so it can be used
        out-of-the-box.
        filename must be a pickle (.pkl) file, e.g. "RFC_pretrained.pkl".
        Failures are reported with a printed message rather than raised.
        """
        try:
            with open(filename, "wb") as f:
                pickle.dump(self.trees, f)
            print(f"Success! RFC exported to {filename}.")
        except pickle.PicklingError:
            print("Error: RFC could not be pickled. Double-check the types of data in the tree :(")
        except Exception as e:
            print(f"Unexpected error: {e}")

    def load_pretrained(self, filename: str):
        """ Load a pretrained RFC from filename.
        filename must be a pickle (.pkl) file containing a list with one entry per
        decision tree: either a tree object (as written by `save`) or the nested
        dictionary of a tree (its `.tree` attribute), which is wrapped in a
        DecisionTreeClassifier so it can predict.
        Failures are reported with a printed message rather than raised.

        Security: unpickling can execute arbitrary code - only load files you
        created yourself.
        """
        try:
            with open(filename, "rb") as f:
                loaded = pickle.load(f)
            trees = []
            for item in loaded:
                if isinstance(item, dict):
                    tree = DecisionTreeClassifier(min_samples_split=self.min_samples_split, in_forest=True)
                    tree.tree = item
                    trees.append(tree)
                else:
                    trees.append(item)
            self.trees = trees
            print(f"Success! Pre-trained RFC loaded from {filename}.")
        except pickle.UnpicklingError:
            print(f"Error: either {filename} is not a valid pickle file or is corrupted.")
        except Exception as e:
            print(f"Unexpected error: {e}")

    def predict_proba(self, X: np.ndarray) -> dict:
        """
        Returns predicted probabilities as a dictionary: {class_label: probability_vector}
        Each vector contains, for each input row, the fraction of trees that voted
        for that class. The labels are fixed to Pizza / Sushi / Shawarma.

        Raises ValueError if the forest holds no trees.
        """
        self._check_fitted("predict_proba")
        n_samples = X.shape[0]
        class_labels = ["Pizza", "Sushi", "Shawarma"]
        class_index = {label: i for i, label in enumerate(class_labels)}
        votes = np.zeros((n_samples, len(class_labels)))

        for tree in self.trees:
            preds = tree.predict(X)
            for i, pred in enumerate(preds):
                votes[i, class_index[pred]] += 1

        probs = votes / len(self.trees)
        return {label: probs[:, idx] for idx, label in enumerate(class_labels)}


**Logistic Regression**

In [None]:
import numpy as np
import pickle

class CustomLogisticRegression:
    """Binary logistic regression trained with full-batch gradient descent.

    Features are min-max scaled with the minima/maxima seen during `fit`; the
    same scaling is re-applied at prediction time.
    """

    def __init__(self, lr=0.01, epochs=1000, reg_strength=0.0):
        """Store the hyperparameters.

        - lr: gradient-descent step size
        - epochs: number of full passes over the training data
        - reg_strength: L2 penalty coefficient applied to the weights (not the bias)
        """
        self.lr = lr
        self.epochs = epochs
        self.reg_strength = reg_strength
        self.weights = None
        self.bias = 0.0
        # (epoch, cross-entropy loss) pairs recorded during the last fit.
        self.loss_history = []

    def sigmoid(self, z):
        """Return the logistic function of z, with z clipped to [-500, 500]."""
        z = np.clip(z, -500, 500)  # prevent overflow
        return 1 / (1 + np.exp(-z))

    def _scale(self, X):
        """Min-max scale X with the statistics stored by fit.

        The small constant keeps constant columns from dividing by zero.
        """
        return (X - self.X_min) / (self.X_max - self.X_min + 1e-8)

    def fit(self, X, y):
        """Fit weights and bias on X (n_samples, n_features) and 0/1 targets y."""
        X = np.array(X, dtype=np.float64)
        y = np.array(y, dtype=np.float64)

        # Normalize X
        self.X_min = X.min(axis=0)
        self.X_max = X.max(axis=0)
        X = self._scale(X)

        m, n = X.shape
        self.weights = np.zeros(n, dtype=np.float64)
        self.bias = 0.0
        self.loss_history = []

        for epoch in range(self.epochs):
            predictions = self.sigmoid(np.dot(X, self.weights) + self.bias)
            error = predictions - y
            dw = (np.dot(X.T, error) + self.reg_strength * self.weights) / m
            db = np.sum(error) / m

            self.weights -= self.lr * dw
            self.bias -= self.lr * db

            # Record the loss every 100 epochs and on the last one. It was
            # previously computed and thrown away.
            if epoch % 100 == 0 or epoch == self.epochs - 1:
                loss = -np.mean(y * np.log(predictions + 1e-9) + (1 - y) * np.log(1 - predictions + 1e-9))
                self.loss_history.append((epoch, float(loss)))

    def predict_proba(self, X):
        """Return P(y = 1) for each row of X.

        Raises ValueError if the model has neither been fitted nor loaded.
        """
        if self.weights is None:
            raise ValueError("Model must be fitted or loaded before predicting.")
        X = np.array(X, dtype=np.float64)
        return self.sigmoid(np.dot(self._scale(X), self.weights) + self.bias)

    def predict(self, X):
        """Return 0/1 predictions using a 0.5 probability threshold."""
        return (self.predict_proba(X) >= 0.5).astype(int)

    def save(self, filename):
        """Pickle the fitted parameters to `filename`.

        The file holds a dict with the keys "weights", "bias", "X_min" and
        "X_max" (arrays stored as plain lists).

        Raises ValueError if the model has not been fitted.
        """
        if self.weights is None:
            raise ValueError("Model must be fitted before it can be saved.")
        params = {
            "weights": np.asarray(self.weights).tolist(),
            "bias": float(self.bias),
            "X_min": np.asarray(self.X_min).tolist(),
            "X_max": np.asarray(self.X_max).tolist(),
        }
        with open(filename, "wb") as f:
            pickle.dump(params, f)

    def load_pretrained(self, filename):
        """Load parameters from a pickle holding the dict written by `save`.

        Security: unpickling can execute arbitrary code - only load files you
        created yourself.
        """
        with open(filename, "rb") as f:
            params = pickle.load(f)
        self.weights = np.asarray(params["weights"], dtype=np.float64)
        self.bias = float(params["bias"])
        self.X_min = np.asarray(params["X_min"], dtype=np.float64)
        self.X_max = np.asarray(params["X_max"], dtype=np.float64)


class CustomMulticlassLogisticRegression:
    """One-vs-rest multiclass wrapper around CustomLogisticRegression.

    One binary model is trained per class ("this class" vs "everything else").
    Prediction picks the class whose binary model gives the highest
    probability; the per-class probabilities are independent sigmoids and do
    not sum to 1.
    """

    def __init__(self, lr=0.01, epochs=1000, reg_strength=0.0):
        """Store the hyperparameters that are passed to every binary model."""
        self.lr = lr
        self.epochs = epochs
        self.reg_strength = reg_strength
        self.classifiers = {}

    def fit(self, X, y):
        """Train one binary classifier per distinct label in y.

        y may be any array-like of labels. Classifiers from a previous fit are
        discarded.
        """
        y = np.asarray(y)  # a plain list would make `y == cls` a single bool
        self.classes = np.unique(y)
        self.classifiers = {}
        for cls in self.classes:
            y_binary = (y == cls).astype(int)
            model = CustomLogisticRegression(
                lr=self.lr, epochs=self.epochs, reg_strength=self.reg_strength
            )
            model.fit(X, y_binary)
            self.classifiers[cls] = model

    def predict_proba(self, X):
        """Return {class_label: vector of P(class vs rest) per row of X}."""
        return {cls: model.predict_proba(X) for cls, model in self.classifiers.items()}

    def predict(self, X):
        """Return, for each row of X, the label with the highest one-vs-rest probability."""
        probs = self.predict_proba(X)
        probs_matrix = np.column_stack([probs[cls] for cls in self.classes])
        return self.classes[np.argmax(probs_matrix, axis=1)]

    def save(self, filename):
        """Pickle the {class_label: fitted binary model} dict to `filename`."""
        with open(filename, "wb") as f:
            pickle.dump(self.classifiers, f)

    def _as_model(self, item):
        """Return a usable binary model for one loaded entry.

        A model object is returned unchanged. A plain parameter dict with the
        keys "weights", "bias", "X_min" and "X_max" is turned back into a
        CustomLogisticRegression.
        """
        if not isinstance(item, dict):
            return item
        model = CustomLogisticRegression(
            lr=self.lr, epochs=self.epochs, reg_strength=self.reg_strength
        )
        model.weights = np.asarray(item["weights"], dtype=np.float64)
        model.bias = float(item["bias"])
        model.X_min = np.asarray(item["X_min"], dtype=np.float64)
        model.X_max = np.asarray(item["X_max"], dtype=np.float64)
        return model

    def load_pretrained(self, filename):
        """Load a {class_label: model} pickle and rebuild `classes` from its keys.

        Each value may be a pickled binary model (as written by `save`) or a
        plain parameter dict (see `_as_model`).

        Security: unpickling can execute arbitrary code - only load files you
        created yourself.
        """
        with open(filename, "rb") as f:
            loaded = pickle.load(f)
        self.classifiers = {cls: self._as_model(item) for cls, item in loaded.items()}
        self.classes = np.array(list(self.classifiers.keys()))



**Training the Models**

In [123]:
import pandas as pd
import numpy as np
import re
import json
from collections import Counter
from sklearn.metrics import accuracy_score, classification_report

# Save column names used in training after preprocessing
# The list is in the column order of X, i.e. the order of the columns in
# X_train / X_test; it is written to Drive as JSON.
training_feature_names = list(X.columns)

with open("/content/drive/MyDrive/final_feature_names.json", "w") as f:
    json.dump(training_feature_names, f)

# Train and Save All 3 Models
# Grid Search Values
# a_values = [0.5, 1, 2, 3, 5]
# b_values = [0.5, 1, 2, 3, 5]
# N_values = [10, 25, 50, 75, 100, 150]
# split_values = [70, 80, 85, 90, 95]

# Train Naive Bayes with the chosen hyperparameters and score it on the held-out split.
# Note: `pickle` is not imported in this cell; it is available only because the
# model-definition cells above import it.
nb_model = NaiveBayesClassifier(a=2, b=2, N=50, split=90)
nb_model.fit(X_train, y_train)
nb_preds = nb_model.predict(X_test)
print("\nNaive Bayes Accuracy:", round(accuracy_score(y_test, nb_preds), 4))
print("Classification Report (Naive Bayes):")
print(classification_report(y_test, nb_preds))

# Export only the fitted parameters (as plain lists) rather than the model object.
nb_model_data = {
    "pi": nb_model.pi_map.tolist(),      # convert numpy array to list (optional for readability)
    "theta": nb_model.theta_map.tolist()
}

# NOTE(review): this path is relative, so the file lands in the current working
# directory, whereas the LR and RF exports below go to Drive - confirm this is intended.
with open("final_nb_model.pkl", "wb") as f:
    pickle.dump(nb_model_data, f)

# nb_model.save("/content/drive/MyDrive/final_nb_model.pkl")

# Grid Search Values
# lr_list = [0.01, 0.05, 0.1, 0.2]
# epochs_list = [1000, 2000, 3000, 4000, 5000]
# reg_strength_list = [0.01, 0.1, 0.5, 1.0]

# Train the one-vs-rest logistic regression and score it on the held-out split.
lr_model = CustomMulticlassLogisticRegression(lr=0.1, epochs=4000, reg_strength=0.1)
lr_model.fit(X_train, y_train)
lr_preds = lr_model.predict(X_test)
print("\nLogistic Regression Accuracy:", round(accuracy_score(y_test, lr_preds), 4))
print("Classification Report (Logistic Regression):")
print(classification_report(y_test, lr_preds))
# Manually extract each binary classifier's parameters
# (weights, bias and the min/max used for feature scaling, as plain lists/floats,
# keyed by class label)
lr_model_data = {}
for cls, model in lr_model.classifiers.items():
    lr_model_data[cls] = {
        "weights": model.weights.tolist(),
        "bias": model.bias,
        "X_min": model.X_min.tolist(),
        "X_max": model.X_max.tolist()
    }

# Save all classifiers into one dict
with open("/content/drive/MyDrive/final_lr_model.pkl", "wb") as f:
    pickle.dump(lr_model_data, f)

# Grid search
# n_estimators_list = [100, 200, 300, 500, 750, 1000]
# min_samples_split_list = [2, 4, 5, 8, 10, 15]
# max_samples_list = [0.4, 0.5, 0.6, 0.8, 1.0]

# best_acc = 0
# best_params = None
# best_model = None

# param_grid = list(itertools.product(n_estimators_list, min_samples_split_list, max_samples_list))
# print(f"Trying {len(param_grid)} combinations...")

# for n_estimators, min_samples_split, max_samples in param_grid:
#     print(f"\nTrying: n_estimators={n_estimators}, min_samples_split={min_samples_split}, max_samples={max_samples}")

#     model = RandomForestClassifier(
#         n_estimators=n_estimators,
#         min_samples_split=min_samples_split,
#         max_samples=max_samples
#     )
#     model.fit(X_train, y_train)
#     preds = model.predict(X_test)
#     acc = accuracy_score(y_test, preds)

#     print(f"   → Accuracy: {round(acc, 4)}")

#     if acc > best_acc:
#         best_acc = acc
#         best_params = (n_estimators, min_samples_split, max_samples)
#         best_model = model

# print("\nBest Hyperparameters:")
# print(f"n_estimators={best_params[0]}, min_samples_split={best_params[1]}, max_samples={best_params[2]}")
# print(f"Best Accuracy: {round(best_acc, 4)}")

# best_model.save("/content/drive/MyDrive/final_rf_model.pkl")

# print(classification_report(y_test, best_model.predict(X_test)))

# Train the random forest and score it on the held-out split. No random_state
# is passed, so bootstrap and feature sampling draw from NumPy's global generator.
rf_model = RandomForestClassifier(n_estimators=100, min_samples_split=2, max_samples=1.0)
rf_model.fit(X_train, y_train)
rf_preds = rf_model.predict(X_test)
print("\nRandom Forest Accuracy:", round(accuracy_score(y_test, rf_preds), 4))
print("Classification Report (Random Forest):")
print(classification_report(y_test, rf_preds))
# Extract all trees as dictionaries
# (each tree's nested-dict structure, without the DecisionTreeClassifier wrapper)
rf_model_data = [tree.tree for tree in rf_model.trees]

# Save as a plain list of dicts (safe for loading)
with open("/content/drive/MyDrive/final_rf_model.pkl", "wb") as f:
    pickle.dump(rf_model_data, f)



Naive Bayes Accuracy: 0.8747
Classification Report (Naive Bayes):
              precision    recall  f1-score   support

       Pizza       0.83      0.95      0.88       165
    Shawarma       0.87      0.90      0.88       165
       Sushi       0.95      0.78      0.86       165

    accuracy                           0.87       495
   macro avg       0.88      0.87      0.87       495
weighted avg       0.88      0.87      0.87       495


Logistic Regression Accuracy: 0.8889
Classification Report (Logistic Regression):
              precision    recall  f1-score   support

       Pizza       0.87      0.94      0.90       165
    Shawarma       0.89      0.88      0.88       165
       Sushi       0.91      0.85      0.88       165

    accuracy                           0.89       495
   macro avg       0.89      0.89      0.89       495
weighted avg       0.89      0.89      0.89       495


Random Forest Accuracy: 0.8545
Classification Report (Random Forest):
              pre

**Save Complete Pkl Files for Testing**

In [132]:
# Re-create and re-fit each model with the same hyperparameters as the training
# cell, then pickle the whole model OBJECT (not just its parameters) to Drive.
# Naive Bayes and the random forest use random sampling during fit, so these
# re-fitted objects are not guaranteed to match the models scored above.
# Unpickling these files requires the class definitions to be available.
nb_model = NaiveBayesClassifier(a=2, b=2, N=50, split=90)
nb_model.fit(X_train, y_train)

with open("/content/drive/MyDrive/final_nb_model_full.pkl", "wb") as f:
    pickle.dump(nb_model, f)

lr_model = CustomMulticlassLogisticRegression(lr=0.1, epochs=4000, reg_strength=0.1)
lr_model.fit(X_train, y_train)

with open("/content/drive/MyDrive/final_lr_model_full.pkl", "wb") as f:
    pickle.dump(lr_model, f)

rf_model = RandomForestClassifier(n_estimators=100, min_samples_split=2, max_samples=1.0)
rf_model.fit(X_train, y_train)

with open("/content/drive/MyDrive/final_rf_model_full.pkl", "wb") as f:
    pickle.dump(rf_model, f)


**Voting Classifier**

In [135]:
import numpy as np
import pickle
from sklearn.metrics import accuracy_score, classification_report

# --- Load the three fitted model objects pickled by the previous cell ---
# Only unpickle files you produced yourself: pickle.load can run arbitrary code.
with open("/content/drive/MyDrive/final_nb_model_full.pkl", "rb") as f:
    nb = pickle.load(f)
with open("/content/drive/MyDrive/final_lr_model_full.pkl", "rb") as f:
    lr = pickle.load(f)
with open("/content/drive/MyDrive/final_rf_model_full.pkl", "rb") as f:
    rf = pickle.load(f)

# --- Per-class probabilities from each model (indexed by class label) ---
nb_probs = nb.predict_proba(X_test)
lr_probs = lr.predict_proba(X_test)
rf_probs = rf.predict_proba(X_test)

# --- Weighted soft vote ---
class_labels = ["Pizza", "Sushi", "Shawarma"]
weights = {"nb": 0.1, "lr": 0.8, "rf": 0.1}

avg_probs = {}
for label in class_labels:
    avg_probs[label] = (
        weights["nb"] * nb_probs[label]
        + weights["lr"] * lr_probs[label]
        + weights["rf"] * rf_probs[label]
    )

# For every test row pick the label with the highest blended score; on a tie
# the label that comes first in class_labels wins.
final_preds = np.array([
    max(class_labels, key=lambda name: avg_probs[name][row])
    for row in range(X_test.shape[0])
])

# --- Report ---
print("\nWeighted Voting Accuracy:", round(accuracy_score(y_test, final_preds), 4))
print("Classification Report (Voting Ensemble):")
print(classification_report(y_test, final_preds))



Weighted Voting Accuracy: 0.8949
Classification Report (Voting Ensemble):
              precision    recall  f1-score   support

       Pizza       0.87      0.94      0.90       165
    Shawarma       0.90      0.89      0.89       165
       Sushi       0.93      0.85      0.89       165

    accuracy                           0.89       495
   macro avg       0.90      0.89      0.89       495
weighted avg       0.90      0.89      0.89       495



**Pred.py file**

In [126]:
import pandas as pd
import numpy as np
import pickle
import re
import json

# --- Preprocessing Parameters ---
# Survey questions whose free-text answers preprocess() reduces to one number
# per row via extract_numeric().
numerical_columns = [
    "Q1: From a scale 1 to 5, how complex is it to make this food? (Where 1 is the most simple, and 5 is the most complex)",
    "Q2: How many ingredients would you expect this food item to contain?",
    "Q4: How much would you expect to pay for one serving of this food item?"
]

# Free-text / multi-select questions that preprocess() turns into binary
# bag-of-words columns. The column prefix is derived from the POSITION in this
# list (Q3, Q4, Q5, Q6), not from the question number, so the order here is
# part of the feature naming — do not reorder.
text_cols = [
    "Q3: In what setting would you expect this food to be served? Please check all that apply",
    "Q5: What movie do you think of when thinking of this food item?",
    "Q6: What drink would you pair with this food item?",
    "Q7: When you think about this food item, who does it remind you of?"
]

# Hot-sauce question: raw answer text -> category used for one-hot encoding.
# Answers not listed here (including missing ones) become "None" in
# preprocess(). The "with my hot sauce" answer is folded into "Medium".
q8_col = "Q8: How much hot sauce would you add to this food item?"
hot_sauce_map = {
    "A little (mild)": "Mild",
    "A moderate amount (medium)": "Medium",
    "A lot (hot)": "Hot",
    "I will have some of this food item with my hot sauce": "Medium"
}

# --- Preprocessing ---
def extract_numeric(value):
    """
    Reduce a free-text survey answer to a single number.

    Every unsigned integer or decimal found in ``str(value)`` is collected and
    their arithmetic mean is returned, so a range such as "6-7" yields 6.5.

    Returns None when `value` is null or contains no digits.
    """
    if pd.isnull(value):
        return None

    found = [float(token) for token in re.findall(r"\d+\.?\d*", str(value))]
    if not found:
        return None
    return np.mean(found)

def simple_bow(df, column_name, prefix):
    """
    Build a binary bag-of-words encoding of one text column.

    Each cell is lower-cased and split into runs of word characters. The
    vocabulary is the sorted set of all tokens seen in this column of `df`.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing `column_name`.
    column_name : str
        Name of the text column to encode.
    prefix : str
        Prepended to every token to form the output column name
        ("<prefix>_<token>").

    Returns
    -------
    pandas.DataFrame
        One row per row of `df`, one int column per vocabulary word, holding 1
        when the word occurs in that row's text and 0 otherwise. The result
        carries `df`'s index so it can be concatenated column-wise with `df`
        without misalignment.
    """
    tokenized = []
    for text in df[column_name]:
        # Null cells contribute no tokens instead of raising on `.lower()`.
        text = "" if pd.isnull(text) else str(text)
        tokenized.append(re.findall(r'\b\w+\b', text.lower()))

    vocab = sorted({token for tokens in tokenized for token in tokens})
    vocab_index = {word: i for i, word in enumerate(vocab)}

    bow_matrix = np.zeros((len(df), len(vocab)), dtype=int)
    for i, tokens in enumerate(tokenized):
        for token in tokens:
            bow_matrix[i, vocab_index[token]] = 1

    # Reuse the caller's index: pd.concat(axis=1) aligns on index labels, so a
    # fresh 0..n-1 index would misalign against a filtered or sliced frame.
    return pd.DataFrame(
        bow_matrix,
        columns=[f"{prefix}_{word}" for word in vocab],
        index=df.index,
    )

def preprocess(df):
    """
    Turn a raw survey DataFrame into the numeric feature matrix the saved
    models are fed.

    Steps, in order:
      1. Drop the "Label" column if present.
      2. Numeric questions (`numerical_columns`): reduce each answer to a
         number with extract_numeric(); fill missing values with the mean of
         that column in THIS frame.
      3. Text questions (`text_cols`): fill missing with "none", lower-case and
         strip, then binary bag-of-words via simple_bow().
         Gotcha: the prefix is positional (Q3, Q4, Q5, Q6) even though the
         columns are Q3/Q5/Q6/Q7. The names have to match those stored in
         final_feature_names.json — presumably produced by the same scheme at
         training time; confirm before "fixing" the prefixes.
      4. Hot-sauce question: map through `hot_sauce_map` (unmapped or missing
         answers become "None") and one-hot encode as "Q8_cleaned_<category>".
      5. Replace remaining NaN with 0 and cast float64/bool columns to int
         (fractional numeric answers such as 6.5 are truncated to 6).
      6. Align to the column list in final_feature_names.json: columns absent
         from this frame are added as zeros, unexpected ones are dropped, and
         the order is fixed.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw survey rows with the Q1–Q8 columns. Not modified.

    Returns
    -------
    numpy.ndarray
        float64 array with one row per input row and one column per expected
        feature name.
    """
    df = df.copy()
    # The bag-of-words frames built below are positional (0..n-1) and
    # pd.concat(axis=1) aligns on index labels. Normalise the index so a
    # filtered or sliced input (e.g. df_full.iloc[test_indices]) does not
    # silently produce NaN-padded extra rows.
    df = df.reset_index(drop=True)
    if "Label" in df.columns:
        df = df.drop(columns=["Label"])

    for col in numerical_columns:
        df[col] = df[col].apply(extract_numeric)
        df[col] = df[col].fillna(df[col].mean())

    for col in text_cols:
        df[col] = df[col].fillna("none").str.lower().str.strip()

    bow_frames = []
    for i, col in enumerate(text_cols):
        # Positional prefix: Q3, Q4, Q5, Q6 (see docstring).
        bow_df = simple_bow(df, col, f"Q{i+3}")
        bow_frames.append(bow_df)

    df = pd.concat([df] + bow_frames, axis=1)
    df.drop(columns=text_cols, inplace=True)

    df["Q8_cleaned"] = df[q8_col].map(hot_sauce_map).fillna("None")
    for category in df["Q8_cleaned"].unique():
        df[f"Q8_cleaned_{category}"] = (df["Q8_cleaned"] == category).astype(int)
    df.drop(columns=[q8_col, "Q8_cleaned"], inplace=True)

    df.fillna(0, inplace=True)
    for col in df.columns:
        if df[col].dtype in ["float64", "bool"]:
            df[col] = df[col].astype(int)

    with open("/content/drive/MyDrive/final_feature_names.json", "r") as f:
        expected_columns = json.load(f)

    # Features absent from this batch (unseen words or categories) become
    # all-zero columns; selecting expected_columns drops extras and fixes order.
    missing_cols = [col for col in expected_columns if col not in df.columns]
    df_missing = pd.DataFrame(0, index=df.index, columns=missing_cols)
    df = pd.concat([df, df_missing], axis=1)
    df = df[expected_columns].copy()
    return df.astype(np.float64).values

# --- Prediction ---
def predict_all(csv_path):
    """
    Predict a food label for every row of the survey CSV at `csv_path`.

    The CSV is run through preprocess(), scored by three saved models (Naive
    Bayes, one-vs-rest logistic regression, random forest), and the per-class
    scores are blended with fixed weights (0.1 / 0.8 / 0.1).

    Returns a list with one of "Pizza", "Sushi", "Shawarma" per input row.

    Note: the model files are read with pickle.load, which can execute
    arbitrary code — only point this at files you produced yourself.
    """
    features = preprocess(pd.read_csv(csv_path))

    # --- Saved model parameters ---
    with open("/content/drive/MyDrive/final_nb_model.pkl", "rb") as nb_file:
        nb_params = pickle.load(nb_file)
    with open("/content/drive/MyDrive/final_lr_model.pkl", "rb") as lr_file:
        lr_params = pickle.load(lr_file)
    with open("/content/drive/MyDrive/final_rf_model.pkl", "rb") as rf_file:
        rf_trees = pickle.load(rf_file)

    def nb_predict_proba(X, pi, theta):
        """Bernoulli-style Naive Bayes posterior; columns 0/1/2 of `theta`
        are read as Pizza/Sushi/Shawarma."""
        log_prior = np.log(pi)
        log_on = np.log(theta)
        log_off = np.log(1 - theta)
        X_off = 1 - X
        scores = np.zeros((X.shape[0], 3))
        for k in range(3):
            scores[:, k] = X @ log_on[:, k] + X_off @ log_off[:, k] + log_prior[k]
        # Softmax over the log scores, shifted by the row max for stability.
        unnormalised = np.exp(scores - np.max(scores, axis=1, keepdims=True))
        probs = unnormalised / np.sum(unnormalised, axis=1, keepdims=True)
        return {
            name: probs[:, k]
            for k, name in enumerate(("Pizza", "Sushi", "Shawarma"))
        }

    def lr_predict_proba(X, classifiers):
        """Independent sigmoid score per class from its own min-max scaler,
        weights and bias."""
        probs = {}
        for cls, model in classifiers.items():
            lo = np.array(model["X_min"])
            hi = np.array(model["X_max"])
            X_scaled = (X - lo) / (hi - lo + 1e-8)
            z = np.clip(X_scaled @ np.array(model["weights"]) + model["bias"], -500, 500)
            probs[cls] = 1 / (1 + np.exp(-z))
        return probs

    def predict_tree(tree, x):
        """Walk one dict-encoded tree: equality with the split value goes
        left, anything else goes right, until a node carrying 'label'."""
        node = tree
        while isinstance(node, dict) and 'label' not in node:
            branch = "left" if x[node["feature"]] == node["value"] else "right"
            node = node[branch]
        return node["label"]

    def rf_predict_proba(X, trees):
        """Fraction of trees voting for each class, per row."""
        class_labels = ["Pizza", "Sushi", "Shawarma"]
        label_to_idx = {name: k for k, name in enumerate(class_labels)}
        votes = np.zeros((X.shape[0], 3))
        for tree in trees:
            for row, x in enumerate(X):
                votes[row, label_to_idx[predict_tree(tree, x)]] += 1
        probs = votes / len(trees)
        return {name: probs[:, k] for name, k in label_to_idx.items()}

    nb_probs = nb_predict_proba(
        features, np.array(nb_params["pi"]), np.array(nb_params["theta"])
    )
    lr_probs = lr_predict_proba(features, lr_params)
    rf_probs = rf_predict_proba(features, rf_trees)

    # --- Soft Voting ---
    labels = ["Pizza", "Sushi", "Shawarma"]
    weights = {"nb": 0.1, "lr": 0.8, "rf": 0.1}
    final_probs = {
        label: (
            weights["nb"] * nb_probs[label]
            + weights["lr"] * lr_probs[label]
            + weights["rf"] * rf_probs[label]
        )
        for label in labels
    }

    # Highest blended score wins; ties go to the earlier entry in `labels`.
    return [
        max(labels, key=lambda name: final_probs[name][row])
        for row in range(features.shape[0])
    ]


**Test Pred.py**

In [127]:
import pandas as pd
import numpy as np

# --- Load full original data (with raw Q1–Q8 + Label) ---
# X_full / y_full are only used to derive the test-row positions below; the
# raw rows that get saved and predicted on come from df_full itself.
df_full = pd.read_csv("/content/drive/MyDrive/cleaned_data_combined_modified.csv")
X_full = df_full.drop(columns=["Label"])
y_full = df_full["Label"]

# --- Custom stratified split to get test indices only ---
def get_test_indices(X, y, test_size=0.3, random_state=42):
    """
    Return the row positions that make up a stratified test split.

    For each distinct label (in order of first appearance in `y`) the
    positions of that label are shuffled and the last `test_size` fraction is
    kept. Positions are 0-based offsets, because `y` is re-indexed first.

    Seeds NumPy's global random generator with `random_state`, so repeated
    calls with the same arguments return the same list.
    """
    np.random.seed(random_state)
    X = X.reset_index(drop=True)
    labels = y.reset_index(drop=True)

    held_out = []
    for cls in labels.unique():
        positions = labels.index[labels == cls].tolist()
        np.random.shuffle(positions)
        cut = int(len(positions) * (1 - test_size))
        held_out.extend(positions[cut:])

    return held_out

# --- Extract raw test set from original data ---
# get_test_indices returns 0-based row positions, hence .iloc rather than .loc.
# NOTE(review): the reported accuracy is only a fair held-out score if this
# split reproduces the one used when the models were trained — confirm that
# the training cell used the same procedure and seed.
test_indices = get_test_indices(X_full, y_full)
df_test = df_full.iloc[test_indices]

# --- Save test split for evaluation and submission/testing ---
# index=False: the CSV is re-read by predict_all with a fresh 0..n-1 index.
test_csv_path = "/content/drive/MyDrive/test_split_raw.csv"
df_test.to_csv(test_csv_path, index=False)
print(f"Saved test split with {len(df_test)} rows to:", test_csv_path)

# --- Run prediction on the raw test split ---
# The saved CSV still contains "Label"; preprocess() drops it before building
# features, so the labels are not visible to the models.
true_labels = df_test["Label"].values
preds = predict_all(test_csv_path)

# --- Evaluate accuracy ---
# Element-wise comparison; relies on preds being in the same row order as
# df_test, which holds because the CSV was written from df_test.
accuracy = np.mean(np.array(preds) == true_labels)
print(f"\nAccuracy on test split: {accuracy:.4f}")

# Optional: Confusion Matrix
def print_confusion_matrix(y_true, y_pred, labels=["Pizza", "Sushi", "Shawarma"]):
    """
    Print a confusion matrix as a labelled table.

    Rows are the true labels and columns the predicted labels, both in the
    order given by `labels`. A label missing from `labels` raises KeyError.
    """
    position = {name: k for k, name in enumerate(labels)}
    counts = np.zeros((len(labels), len(labels)), dtype=int)

    for actual, predicted in zip(y_true, y_pred):
        counts[position[actual], position[predicted]] += 1

    print("\nConfusion Matrix:")
    print(pd.DataFrame(counts, index=labels, columns=labels))

# Rows of the printed table are true labels, columns are predicted labels.
print_confusion_matrix(true_labels, preds)


Saved test split with 495 rows to: /content/drive/MyDrive/test_split_raw.csv

Accuracy on test split: 0.8949

Confusion Matrix:
          Pizza  Sushi  Shawarma
Pizza       155      4         6
Sushi        13    141        11
Shawarma     11      7       147
