In [None]:
"""

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from collections import Counter

# credits: https://github.com/AssemblyAI-Community/Machine-Learning-From-Scratch/blob/main/05%20Random%20Forests/train.py
# chatgpt, deep seek, copilot

class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None,*,value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value

    def is_leaf_node(self):
        return self.value is not None
class DecisionTree:
    def __init__(self, criterion="gini", max_depth=30, min_samples_split=10, min_samples_leaf=1, n_features=None):
        self.criterion = criterion
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.n_features = n_features
        self.root = None

    def fit(self, X, y):
        self.root = self._grow_tree(X, y)

    def _grow_tree(self, X, y, depth=0):
        n_samples, n_feats = X.shape
        n_labels = len(np.unique(y))

        # check the stopping criteria
        if (depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split):
            leaf_value = self._most_common_label(y)
            return Node(value=leaf_value)

        feat_idxs = np.random.choice(n_feats, self.n_features, replace=False) if self.n_features else np.arange(n_feats)

        # find the best split
        best_feature, best_thresh = self._best_split(X, y, feat_idxs)

        # create child nodes
        left_idxs, right_idxs = self._split(X[:, best_feature], best_thresh)
        left = self._grow_tree(X[left_idxs, :], y[left_idxs], depth + 1)
        right = self._grow_tree(X[right_idxs, :], y[right_idxs], depth + 1)
        return Node(best_feature, best_thresh, left, right)

    def _best_split(self, X, y, feat_idxs):
        best_gain = -1
        split_idx, split_threshold = None, None

        for feat_idx in feat_idxs:
            X_column = X[:, feat_idx]
            thresholds = np.unique(X_column)

            for thr in thresholds:
                # calculate the information gain
                gain = self._information_gain(y, X_column, thr)

                if gain > best_gain:
                    best_gain = gain
                    split_idx = feat_idx
                    split_threshold = thr

        return split_idx, split_threshold

    def _information_gain(self, y, X_column, threshold):
        # parent entropy
        parent_entropy = self._entropy(y)

        # create children
        left_idxs, right_idxs = self._split(X_column, threshold)

        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0

        # calculate the weighted avg. entropy of children
        n = len(y)
        n_l, n_r = len(left_idxs), len(right_idxs)
        e_l, e_r = self._entropy(y[left_idxs]), self._entropy(y[right_idxs])
        child_entropy = (n_l/n) * e_l + (n_r/n) * e_r

        # calculate the IG
        information_gain = parent_entropy - child_entropy
        return information_gain

    def _split(self, X_column, split_thresh):
        left_idxs = np.argwhere(X_column <= split_thresh).flatten()
        right_idxs = np.argwhere(X_column > split_thresh).flatten()
        return left_idxs, right_idxs

    def _entropy(self, y):
        hist = np.bincount(y)
        ps = hist / len(y)
        return -np.sum([p * np.log(p) for p in ps if p > 0])

    def _most_common_label(self, y):
        if len(y) == 0:
            raise ValueError("y is empty, cannot determine most common label!")

        counter = Counter(y)

        # Check if the counter is empty
        if not counter:
            return None  # You can return a default value if needed, but in general this should not happen.

        # Get the most common label
        most_common = counter.most_common(1)[0][0]
        return most_common

    def predict(self, X):
        return np.array([self._traverse_tree(x, self.root) for x in X])

    def _traverse_tree(self, x, node):
        if node.is_leaf_node():
            return node.value

        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)



class RandomForest:
    def __init__(self, n_trees=100, criterion="gini", max_depth=30, min_samples_split=10, min_samples_leaf=1, n_features=None):
        self.n_trees = n_trees
        self.criterion = criterion
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.n_features = n_features
        self.trees = []

    def fit(self, X, y):
        assert len(X) > 0 and len(y) > 0, "Input data is empty!"
        self.trees = []
        n_features = X.shape[1]

        # Calculate actual number of features to use
        if isinstance(self.n_features, str):
            if self.n_features == 'sqrt':
                n_feature = int(math.sqrt(n_features))
            elif self.n_features == 'log2':
                n_feature = int(math.log2(n_features))
            else:
                n_feature = None
        else:
            n_feature = self.n_features

        for _ in range(self.n_trees):
            tree = DecisionTree(
                criterion=self.criterion,
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                min_samples_leaf=self.min_samples_leaf,  # Pass min_samples_leaf
                n_features=n_feature  # Pass the computed integer value for feature selection
            )
            X_sample, y_sample = self._bootstrap_samples(X, y)

            assert len(y_sample) > 0, "y_train is empty after bootstrapping!"

            tree.fit(X_sample, y_sample)
            self.trees.append(tree)

    def _bootstrap_samples(self, X, y):

        print(f"X_train shape: {X.shape}, y_train shape: {y.shape}")

        n_samples = X.shape[0]
        if n_samples == 0:
            return X, y  # Return original if no samples
        idxs = np.random.choice(n_samples, n_samples, replace=True)
        return X[idxs], y[idxs]

    def _most_common_label(self, y):
        if len(y) == 0:
            raise ValueError("y is empty, cannot determine most common label!")

        counter = Counter(y)

        # Check if the counter is empty
        if not counter:
            return None  # You can return a default value if needed, but in general this should not happen.

        most_common = counter.most_common(1)[0][0]
        return most_common

    def predict(self, X):
        predictions = np.array([tree.predict(X) for tree in self.trees])
        tree_preds = np.swapaxes(predictions, 0, 1)
        predictions = np.array([self._most_common_label(pred) for pred in tree_preds])
        return predictions

"""


In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from collections import Counter
import math

class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None,*,value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value

    def is_leaf_node(self):
        return self.value is not None

class DecisionTree:
    def __init__(self, criterion="gini", max_depth=30, min_samples_split=2, min_samples_leaf=1, n_features=None):
        self.criterion = criterion
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.n_features = n_features
        self.root = None

    def fit(self, X, y):
        # Validate inputs
        if len(X) == 0 or len(y) == 0:
            raise ValueError("Input data is empty")
        if len(X) != len(y):
            raise ValueError("X and y must have the same length")

        self.root = self._grow_tree(X, y)

    def _grow_tree(self, X, y, depth=0):
        n_samples, n_feats = X.shape
        n_labels = len(np.unique(y))

        # Check stopping criteria
        if (depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split):
            leaf_value = self._most_common_label(y)
            return Node(value=leaf_value)

        # Feature selection
        feat_idxs = np.random.choice(n_feats, min(self.n_features, n_feats) if self.n_features else n_feats, replace=False)

        # Find best split
        best_feature, best_thresh = self._best_split(X, y, feat_idxs)

        # If no split found, return leaf node
        if best_feature is None or best_thresh is None:
            leaf_value = self._most_common_label(y)
            return Node(value=leaf_value)

        # Create child nodes
        left_idxs, right_idxs = self._split(X[:, best_feature], best_thresh)

        # Check if split results in nodes with less than min_samples_leaf
        if len(left_idxs) < self.min_samples_leaf or len(right_idxs) < self.min_samples_leaf:
            leaf_value = self._most_common_label(y)
            return Node(value=leaf_value)

        left = self._grow_tree(X[left_idxs, :], y[left_idxs], depth + 1)
        right = self._grow_tree(X[right_idxs, :], y[right_idxs], depth + 1)
        return Node(best_feature, best_thresh, left, right)

    def _best_split(self, X, y, feat_idxs):
        best_gain = -1
        split_idx, split_threshold = None, None

        for feat_idx in feat_idxs:
            X_column = X[:, feat_idx]
            thresholds = np.unique(X_column)

            for thr in thresholds:
                # Calculate information gain
                gain = self._information_gain(y, X_column, thr)

                if gain > best_gain:
                    left_idxs, right_idxs = self._split(X_column, thr)
                    # Only consider splits that result in both children having at least min_samples_leaf
                    if len(left_idxs) >= self.min_samples_leaf and len(right_idxs) >= self.min_samples_leaf:
                        best_gain = gain
                        split_idx = feat_idx
                        split_threshold = thr

        return split_idx, split_threshold

    def _information_gain(self, y, X_column, threshold):
        # Parent entropy
        parent_entropy = self._entropy(y)

        # Create children
        left_idxs, right_idxs = self._split(X_column, threshold)

        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0

        # Calculate weighted avg entropy of children
        n = len(y)
        n_l, n_r = len(left_idxs), len(right_idxs)
        e_l, e_r = self._entropy(y[left_idxs]), self._entropy(y[right_idxs])
        child_entropy = (n_l/n) * e_l + (n_r/n) * e_r

        # Calculate IG
        information_gain = parent_entropy - child_entropy
        return information_gain

    def _split(self, X_column, split_thresh):
        left_idxs = np.argwhere(X_column <= split_thresh).flatten()
        right_idxs = np.argwhere(X_column > split_thresh).flatten()
        return left_idxs, right_idxs

    def _entropy(self, y):
        hist = np.bincount(y)
        ps = hist / len(y)
        return -np.sum([p * np.log2(p) for p in ps if p > 0])

    def _most_common_label(self, y):
        if len(y) == 0:
            return 0  # Return default value instead of raising error
        counter = Counter(y)
        return counter.most_common(1)[0][0]

    def predict(self, X):
        return np.array([self._traverse_tree(x, self.root) for x in X])

    def _traverse_tree(self, x, node):
        if node.is_leaf_node():
            return node.value
        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

class RandomForest:
    def __init__(self, n_trees=100, criterion="gini", max_depth=30, min_samples_split=2, min_samples_leaf=1, n_features=None):
        self.n_trees = n_trees
        self.criterion = criterion
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.n_features = n_features
        self.trees = []

    def fit(self, X, y):
        # Validate inputs
        if len(X) == 0 or len(y) == 0:
            raise ValueError("Input data is empty")
        if len(X) != len(y):
            raise ValueError("X and y must have the same length")

        self.trees = []
        n_features = X.shape[1]

        # Calculate number of features to use
        if isinstance(self.n_features, str):
            if self.n_features == 'sqrt':
                n_feature = int(math.sqrt(n_features))
            elif self.n_features == 'log2':
                n_feature = int(math.log2(n_features))
            else:
                n_feature = None
        else:
            n_feature = self.n_features

        for _ in range(self.n_trees):
            tree = DecisionTree(
                criterion=self.criterion,
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                min_samples_leaf=self.min_samples_leaf,
                n_features=n_feature
            )
            X_sample, y_sample = self._bootstrap_samples(X, y)
            tree.fit(X_sample, y_sample)
            self.trees.append(tree)

    def _bootstrap_samples(self, X, y):
        n_samples = X.shape[0]
        idxs = np.random.choice(n_samples, n_samples, replace=True)
        return X[idxs], y[idxs]

    def _most_common_label(self, y):
        if len(y) == 0:
            return 0  # Return default value instead of raising error
        counter = Counter(y)
        return counter.most_common(1)[0][0]

    def predict(self, X):
        predictions = np.array([tree.predict(X) for tree in self.trees])
        tree_preds = np.swapaxes(predictions, 0, 1)
        return np.array([self._most_common_label(pred) for pred in tree_preds])

In [None]:
from google.colab import files
uploaded = files.upload()

Saving final_final_fr_fr.csv to final_final_fr_fr.csv


In [None]:


import math
import numpy as np
from collections import Counter


df = pd.read_csv('/content/final_final_fr_fr.csv')

# Print unique values for each column
for column in df.columns:
    print(f"Unique values in {column}: {df[column].unique()}")

# Your vectorized_one_hot function
def vectorized_one_hot(answers, options, attribute_to_index):
    num_samples = len(answers)
    num_attributes = len(options)
    one_hot_matrix = np.zeros((num_samples, num_attributes))

    for i, ans_list in enumerate(answers):
        for ans in ans_list:
            ans = ans.strip()
            if ans in attribute_to_index:
                one_hot_matrix[i, attribute_to_index[ans]] = 1
    return one_hot_matrix

# Define question columns
questions = [q1, q2, q3, q4, q5, q6, q7, q8] = [
    "Q1: From a scale 1 to 5, how complex is it to make this food? (Where 1 is the most simple, and 5 is the most complex)",
    "Q2: How many ingredients would you expect this food item to contain?",
    "Q3: In what setting would you expect this food to be served? Please check all that apply",
    "Q4: How much would you expect to pay for one serving of this food item?",
    "Q5: What movie do you think of when thinking of this food item?",
    "Q6: What drink would you pair with this food item?",
    "Q7: When you think about this food item, who does it remind you of?",
    "Q8: How much hot sauce would you add to this food item?"
]
t = 'Label'

df[q1] = pd.to_numeric(df[q1], errors='coerce')
df[q2] = pd.to_numeric(df[q2], errors='coerce')
df[q4] = pd.to_numeric(df[q4], errors='coerce')

# Create missing indicators before imputation
for col in [q1, q2, q4]:
    df[f'{col}_missing'] = df[col].isna().astype(int)

# # Fill numerical NaNs with median (now including missing indicators)
# df[q1].fillna(df[q1].median(), inplace=True)
# df[q2].fillna(df[q2].median(), inplace=True)
# df[q4].fillna(df[q4].median(), inplace=True)


# 1. Fix pandas warnings
df = df.fillna({
    q1: df[q1].median(),
    q2: df[q2].median(),
    q4: df[q4].median()
})



# Define options for categorical columns
q3_options = ['none','Week day lunch','Week day dinner','Weekend lunch',
             'Weekend dinner','At a party', 'Late night snack']
q7_options = ['Parents','Siblings','Friends', 'Teachers', 'Strangers', 'none']
q8_options = ['I will have some of this food item with my hot sauce',
             'A lot (hot)', 'A moderate amount (medium)', 'A little (mild)', 'none']

# Create attribute to index mappings
q3_attribute_to_index = {attr: idx for idx, attr in enumerate(q3_options)}
q7_attribute_to_index = {attr: idx for idx, attr in enumerate(q7_options)}
q8_attribute_to_index = {attr: idx for idx, attr in enumerate(q8_options)}

# Manual feature engineering approach
numerical_features = df[[q1, q2, q4]].values

# Convert DataFrame columns to lists before splitting
q3_answers = [ans.split(",") for ans in df[q3].astype(str).tolist()]
q7_answers = [ans.split(",") for ans in df[q7].astype(str).tolist()]
q8_answers = [ans.split(",") for ans in df[q8].astype(str).tolist()]

q3_hot = vectorized_one_hot(q3_answers, q3_options, q3_attribute_to_index)
q7_hot = vectorized_one_hot(q7_answers, q7_options, q7_attribute_to_index)
q8_hot = vectorized_one_hot(q8_answers, q8_options, q8_attribute_to_index)

# Frequency encoding
q5_encoded = df[q5].map(df[q5].value_counts(normalize=True)).values.reshape(-1, 1)
q6_encoded = df[q6].map(df[q6].value_counts(normalize=True)).values.reshape(-1, 1)

# # Combine features
# X = np.hstack([
#     numerical_features,
#     q3_hot,
#     q7_hot,
#     q8_hot,
#     q5_encoded,
#     q6_encoded
# ])

# 2. Ensure numerical features
X = np.hstack([
    numerical_features.astype(float),
    q3_hot.astype(float),
    q7_hot.astype(float),
    q8_hot.astype(float),
    q5_encoded.astype(float),
    q6_encoded.astype(float)
])



# Encode labels
le = LabelEncoder()
y = le.fit_transform(df[t])

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)



# 3. Verify before training
assert X.dtype.kind in ('f', 'i'), "X contains non-numeric values"
print(f"X shape: {X.shape}, y shape: {y.shape}")


import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split



# Before training, verify your data
print("Data verification:")
print(f"X dtype: {X.dtype}")
print(f"X sample:\n{X[:3]}")
print(f"y sample: {y[:10]}")

# Check for any remaining strings/objects
assert X.dtype.kind in ('f', 'i'), f"X contains non-numeric values: {X.dtype}"
assert y.dtype.kind in ('i'), f"y contains non-integer values: {y.dtype}"

print(f"q3_hot dtype: {q3_hot.dtype}")
print(df[[q1, q2, q4]].info())
print(f"q5_encoded contains NaN: {np.isnan(q5_encoded).any()}")


print(f"Unique values after encoding: {le.classes_}")
print(f"Encoded labels: {y[:10]}")  # Print first 10 encoded labels
print(df[t].value_counts())  # Get counts for each class in the target column
print(f"Training labels distribution: {pd.Series(y_train).value_counts()}")
print(f"Testing labels distribution: {pd.Series(y_test).value_counts()}")


# Initialize with proper parameters
rf_model = RandomForest(
    n_trees=100,
    criterion='gini',
    max_depth=30,
    min_samples_split=10,
    min_samples_leaf=1,
    n_features='sqrt'
)

# Train and predict

rf_model.fit(X_train, y_train)
predictions = rf_model.predict(X_test)

# Calculate accuracy
def accuracy(y_true, y_pred):
    return np.sum(y_true == y_pred) / len(y_true)

acc = accuracy(y_test, predictions)
print(f"Accuracy: {acc:.4f}")

Unique values in id: [716549 715742 727333 606874 505318 605771 606929 609789 644623 626792
 714863 602805 602526 497421 389328 289931 620148 387155 516975 630825
 614043 521125 627780 492744 618747 607815 601609 602657 616449 501893
 507448 606556 724326 616945 630962 715814 502799 605278 388888 519548
 405521 607410 715795 625326 627220 634964 722500 602824 618355 611097
 634328 607160 728522 718570 714598 601950 612478 501416 629710 502610
 603476 600957 713423 722431 633234 148179 715845 632153 606552 738409
 612096 743537 632896 605207 617542 719554 627427 411315 631507 627798
 712093 627488 626493 601363 712898 603398 631012 601848 612131 408630
 632091 601122 602871 520828 514199 389697 714430 612567 744468 502722
 519681 631125 606119 627252 616568 603838 719735 608062 608704 520383
 603793 603933 605519 524803 629712 627505 526131 607065 500077 633350
 601945 629878 712766 603664 840827 604897 525422 602560 738379 622023
 604770 608318 626757 745297 627599 520037 632189 627636

In [5]:
def build_all_models(n_estimators,
                     criterion,
                     max_depths,
                     min_samples_split,
                     min_samples_leaf,
                     X_train=X_train,
                     y_train=y_train,
                     X_test=X_test,
                     y_test=y_test):
    """
     min_samples_leaf

    """
    out = {}
    for n in n_estimators:
      for d in max_depths:
        for s in min_samples_split:
            for l in min_samples_leaf:
                out[(n, d, s, l)] = {}

                rf_model = RandomForest(
                              n_trees=n,
                              criterion=criterion,
                              max_depth=d,
                              min_samples_split=s,
                              min_samples_leaf=l,
                              n_features='sqrt'
                          )

                rf_model.fit(X_train, y_train)

                predictions = rf_model.predict(X_test)

                # TODO: store the validation and training scores in the `out` dictionary
                out[(n, d, s, l)]['test'] = accuracy(y_test, predictions) # TODO

                # accuracy(y_test, predictions)
                # rf_model.score(X_train, y_train) # TODO
    return out

In [7]:
# Hyperparameters values to try in our grid search
n_estimators = [50, 100]
criterions = ["entropy", "gini"]
max_depths = [1, 10, 50]
min_samples_split = [1, 2, 10]
min_samples_leaf = [1, 2, 4]


"""
rf_model = RandomForest(
    n_trees=100,
    criterion='gini',
    max_depth=30,
    min_samples_split=10,
    min_samples_leaf=1,
    n_features='sqrt'
)
"""

for criterion in criterions:
    print("\nUsing criterion {}".format(criterion))
    res = build_all_models(n_estimators, criterion, max_depths, min_samples_split, min_samples_leaf)

    best_acc = 0
    best_crit = None
    for (n, d, s, l) in res:
      if res[(n, d, s, l)]['test'] > best_acc:
        best_acc = res[(n, d, s, l)]['test']
        best_crit = (n, d, s, l)
    print("Best n_trees: ", best_crit[0], "Best depth: ", best_crit[1], ". Best samples: ", best_crit[2], "Best leaf: ", best_crit[3], ". Accuracy: ", best_acc)


Using criterion entropy
Best n_trees:  100 Best depth:  50 . Best samples:  1 Best leaf:  2 . Accuracy:  0.8693009118541033

Using criterion gini
Best n_trees:  100 Best depth:  50 . Best samples:  1 Best leaf:  1 . Accuracy:  0.8723404255319149


In [8]:
# Hyperparameters values to try in our grid search
n_estimators = [50, 100]
criterions = ["entropy", "gini"]
max_depths = [20, 30, 50]
min_samples_split = [1, 10, 15]
min_samples_leaf = [1, 2, 4]


"""
rf_model = RandomForest(
    n_trees=100,
    criterion='gini',
    max_depth=30,
    min_samples_split=10,
    min_samples_leaf=1,
    n_features='sqrt'
)
"""

for criterion in criterions:
    print("\nUsing criterion {}".format(criterion))
    res = build_all_models(n_estimators, criterion, max_depths, min_samples_split, min_samples_leaf)

    best_acc = 0
    best_crit = None
    for (n, d, s, l) in res:
      if res[(n, d, s, l)]['test'] > best_acc:
        best_acc = res[(n, d, s, l)]['test']
        best_crit = (n, d, s, l)
    print("Best n_trees: ", best_crit[0], "Best depth: ", best_crit[1], ". Best samples: ", best_crit[2], "Best leaf: ", best_crit[3], ". Accuracy: ", best_acc)


Using criterion entropy
Best n_trees:  100 Best depth:  30 . Best samples:  10 Best leaf:  2 . Accuracy:  0.878419452887538

Using criterion gini
Best n_trees:  100 Best depth:  30 . Best samples:  1 Best leaf:  2 . Accuracy:  0.8723404255319149
