In [None]:
import numpy as np
import pandas as pd
import re
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

class DecisionTree:
    def __init__(self, criterion='gini', max_depth=None, min_sample_split=2, min_samples_leaf=1):
        self.criterion = criterion
        self.max_depth = max_depth
        self.min_sample_split = min_sample_split
        self.min_samples_leaf = min_samples_leaf
        self.tree = None

    def fit(self, X, y):
        self.tree = self._grow_tree(X, y)

    def predict(self, X):
        return np.array([self._predict(inputs) for inputs in X])

    def _gini(self, y):
        y = np.array(y, dtype=int)
        y = y - np.min(y)  # Ensure non-negative labels
        counts = np.bincount(y)
        probabilities = counts / len(y)
        return 1 - np.sum(probabilities ** 2)

    def _split(self, X, y, index, threshold):
        left_mask = X[:, index] < threshold
        right_mask = X[:, index] >= threshold
        return X[left_mask], X[right_mask], y[left_mask], y[right_mask]

    def _information_gain(self, y, y_left, y_right, criterion):
        p = len(y_left) / len(y)
        if criterion == 'gini':
            return self._gini(y) - (p * self._gini(y_left) + (1 - p) * self._gini(y_right))
        else:
            raise ValueError("Invalid criterion.")

    def _best_split(self, X, y):
        best_gain = -1
        best_index, best_threshold = None, None

        for index in range(X.shape[1]):
            thresholds = np.unique(X[:, index])
            for threshold in thresholds:
                X_left, X_right, y_left, y_right = self._split(X, y, index, threshold)
                if len(y_left) < self.min_samples_leaf or len(y_right) < self.min_samples_leaf:
                    continue
                gain = self._information_gain(y, y_left, y_right, self.criterion)
                if gain > best_gain:
                    best_gain = gain
                    best_index = index
                    best_threshold = threshold
        return best_index, best_threshold

    def _grow_tree(self, X, y, depth=0):
        num_samples, num_features = X.shape
        num_labels = len(np.unique(y))

        if (depth >= self.max_depth or num_labels == 1 or num_samples < self.min_sample_split):
            leaf_value = self._most_common_label(y)
            return {'leaf': True, 'value': leaf_value}

        index, threshold = self._best_split(X, y)
        if index is None:
            leaf_value = self._most_common_label(y)
            return {'leaf': True, 'value': leaf_value}

        X_left, X_right, y_left, y_right = self._split(X, y, index, threshold)
        left_subtree = self._grow_tree(X_left, y_left, depth + 1)
        right_subtree = self._grow_tree(X_right, y_right, depth + 1)

        return {
            'leaf': False,
            'index': index,
            'threshold': threshold,
            'left': left_subtree,
            'right': right_subtree
        }

    def _most_common_label(self, y):
        return Counter(y).most_common(1)[0][0]

    def _predict(self, inputs):
        node = self.tree
        while not node['leaf']:
            if inputs[node['index']] < node['threshold']:
                node = node['left']
            else:
                node = node['right']
        return node['value']
    
    
class RandomForest:
    def __init__(self, base_classifier, num_trees=10, min_features=2):
        self.base_classifier = base_classifier
        self.num_trees = num_trees
        self.min_features = min_features
        self.trees = []

    def fit(self, X, y):
        n_samples, n_features = X.shape
        self.trees = []

        for _ in range(self.num_trees):
            # Bagging: Sample with replacement
            indices = np.random.choice(n_samples, n_samples, replace=True)
            X_subset, y_subset = X[indices], y[indices]

            # Feature selection
            feature_indices = np.random.choice(n_features, min(self.min_features, n_features), replace=False)
            X_subset = X_subset[:, feature_indices]

            # Train a tree
            tree = self.base_classifier
            tree_params = {k: v for k, v in self.base_classifier.__dict__.items() if k != 'tree'}
            tree_instance = self.base_classifier.__class__(**tree_params)
            tree_instance.fit(X_subset, y_subset)
            self.trees.append((tree_instance, feature_indices))

    def predict(self, X):
        predictions = np.zeros((len(X), len(self.trees)))
        for i, (tree, feature_indices) in enumerate(self.trees):
            X_subset = X[:, feature_indices]
            predictions[:, i] = tree.predict(X_subset)
        return np.array([Counter(row).most_common(1)[0][0] for row in predictions])
    
    
class AdaBoost:
    def __init__(self, weak_learner, num_learners=50, learning_rate=1.0):
        self.weak_learner = weak_learner
        self.num_learners = num_learners
        self.learning_rate = learning_rate
        self.learners = []
        self.learner_weights = []

    def fit(self, X, y):
        n_samples = X.shape[0]
        weights = np.ones(n_samples) / n_samples
        y = np.where(y == 0, -1, 1)  # Convert labels to {-1, 1}

        for _ in range(self.num_learners):
            # Train a weak learner
            learner = self._create_weak_learner()
            learner.fit(X, y)
            predictions = learner.predict(X)

            # Compute error and learner weight
            error = np.sum(weights * (predictions != y))
            if error > 0.5:
                break
            learner_weight = self.learning_rate * 0.5 * np.log((1 - error) / error)

            # Update weights
            weights *= np.exp(-learner_weight * y * predictions)
            weights /= np.sum(weights)

            # Save learner and its weight
            self.learners.append(learner)
            self.learner_weights.append(learner_weight)

    def predict(self, X):
        predictions = np.zeros(X.shape[0])
        for learner, weight in zip(self.learners, self.learner_weights):
            predictions += weight * learner.predict(X)
        return np.where(predictions >= 0, 1, 0)

    def _create_weak_learner(self):
        # Create a new instance of the weak learner
        learner_params = {k: v for k, v in self.weak_learner.__dict__.items() if k != 'tree'}
        return self.weak_learner.__class__(**learner_params)
    


# Load and preprocess the Titanic dataset
train = pd.read_csv('./train.csv')
test = pd.read_csv('./test.csv')
original_train = train.copy()
original_test = test.copy()
full_data = [train, test]

train['Has_Cabin'] = train["Cabin"].apply(lambda x: 0 if type(x) == float else 1)
test['Has_Cabin'] = test["Cabin"].apply(lambda x: 0 if type(x) == float else 1)

# Create new feature FamilySize as a combination of SibSp and Parch
for dataset in full_data:
    dataset['FamilySize'] = dataset['SibSp'] + dataset['Parch'] + 1
# Create new feature IsAlone from FamilySize
for dataset in full_data:
    dataset['IsAlone'] = 0
    dataset.loc[dataset['FamilySize'] == 1, 'IsAlone'] = 1
# Remove all NULLS in the Embarked column
for dataset in full_data:
    dataset['Embarked'] = dataset['Embarked'].fillna('S')
# Remove all NULLS in the Fare column
for dataset in full_data:
    dataset['Fare'] = dataset['Fare'].fillna(train['Fare'].median())

# Remove all NULLS in the Age column
for dataset in full_data:
    age_avg = dataset['Age'].mean()
    age_std = dataset['Age'].std()
    age_null_count = dataset['Age'].isnull().sum()
    age_null_random_list = np.random.randint(age_avg - age_std, age_avg + age_std, size=age_null_count)
    # Next line has been improved to avoid warning
    dataset.loc[np.isnan(dataset['Age']), 'Age'] = age_null_random_list
    dataset['Age'] = dataset['Age'].astype(int)

# Define function to extract titles from passenger names
def get_title(name):
    title_search = re.search(' ([A-Za-z]+)', name)
    # If the title exists, extract and return it.
    if title_search:
        return title_search.group(1)
    return ""
for dataset in full_data:
    dataset['Title'] = dataset['Name'].apply(get_title)
# Group all non-common titles into one single grouping "Rare"
for dataset in full_data:
    dataset['Title'] = dataset['Title'].replace(['Lady', 'Countess','Capt', 'Col','Don', 'Dr', 'Major', 'Rev', 'Sir', 'Jonkheer', 'Dona'], 'Rare')

    dataset['Title'] = dataset['Title'].replace('Mlle', 'Miss')
    dataset['Title'] = dataset['Title'].replace('Ms', 'Miss')
    dataset['Title'] = dataset['Title'].replace('Mme', 'Mrs')

for dataset in full_data:
    # Mapping Sex
    dataset['Sex'] = dataset['Sex'].map( {'female': 0, 'male': 1} ).astype(int)
    
    # Mapping titles
    title_mapping = {"Mr": 1, "Master": 2, "Mrs": 3, "Miss": 4, "Rare": 5}
    dataset['Title'] = dataset['Title'].map(title_mapping)
    dataset['Title'] = dataset['Title'].fillna(0)

    # Mapping Embarked
    dataset['Embarked'] = dataset['Embarked'].map( {'S': 0, 'C': 1, 'Q': 2} ).astype(int)
    
    # Mapping Fare
    dataset.loc[ dataset['Fare'] <= 7.91, 'Fare'] = 0
    dataset.loc[(dataset['Fare'] > 7.91) & (dataset['Fare'] <= 14.454), 'Fare'] = 1
    dataset.loc[(dataset['Fare'] > 14.454) & (dataset['Fare'] <= 31), 'Fare']   = 2
    dataset.loc[ dataset['Fare'] > 31, 'Fare'] = 3
    dataset['Fare'] = dataset['Fare'].astype(int)
    
    # Mapping Age
    dataset.loc[ dataset['Age'] <= 16, 'Age'] = 0
    dataset.loc[(dataset['Age'] > 16) & (dataset['Age'] <= 32), 'Age'] = 1
    dataset.loc[(dataset['Age'] > 32) & (dataset['Age'] <= 48), 'Age'] = 2
    dataset.loc[(dataset['Age'] > 48) & (dataset['Age'] <= 64), 'Age'] = 3
    dataset.loc[ dataset['Age'] > 64, 'Age'] = 4


drop_elements = ['PassengerId', 'Name', 'Ticket', 'Cabin', 'SibSp']
train = train.drop(drop_elements, axis = 1)
test  = test.drop(drop_elements, axis = 1)


# Split the data
X = train.drop('Survived', axis=1)
y = train['Survived']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

X_train = X_train.to_numpy()
X_test = X_test.to_numpy()
y_train = y_train.to_numpy()
y_test = y_test.to_numpy()

# Train and evaluate Decision Tree
dt = DecisionTree(max_depth=5)
dt.fit(X_train, y_train)
y_pred_dt = dt.predict(X_test)
print(f"Decision Tree Accuracy: {accuracy_score(y_test, y_pred_dt)}")

# Train and evaluate Random Forest
rf = RandomForest(base_classifier = DecisionTree(max_depth=5), num_trees=50)
rf.fit(X_train, y_train)
y_pred_rf = rf.predict(X_test)
print(f"Random Forest Accuracy: {accuracy_score(y_test, y_pred_rf)}")

# Train and evaluate AdaBoost
ab = AdaBoost(weak_learner = DecisionTree(max_depth=1), num_learners=50)
ab.fit(X_train, y_train)
y_pred_ab = ab.predict(X_test)
print(f"AdaBoost Accuracy: {accuracy_score(y_test, y_pred_ab)}")    
    

Decision Tree Accuracy: 0.8100558659217877
Random Forest Accuracy: 0.7597765363128491
AdaBoost Accuracy: 0.7821229050279329
