In [None]:
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [None]:
import pandas as pd
import numpy as np
import itertools

In [None]:
dataset = pd.read_csv('/content/drive/MyDrive/ML_Labs/Lab_4/advertising.csv')
dataset.head()

Unnamed: 0,Daily Time Spent on Site,Age,Area Income,Daily Internet Usage,Ad Topic Line,City,Male,Country,Timestamp,Clicked on Ad
0,68.95,35,61833.9,256.09,Cloned 5thgeneration orchestration,Wrightburgh,0,Tunisia,2016-03-27 00:53:11,0
1,80.23,31,68441.85,193.77,Monitored national standardization,West Jodi,1,Nauru,2016-04-04 01:39:02,0
2,69.47,26,59785.94,236.5,Organic bottom-line service-desk,Davidton,0,San Marino,2016-03-13 20:35:42,0
3,74.15,29,54806.18,245.89,Triple-buffered reciprocal time-frame,West Terrifurt,1,Italy,2016-01-10 02:31:19,0
4,68.37,35,73889.99,225.58,Robust logistical utilization,South Manuel,0,Iceland,2016-06-03 03:36:18,0


In [None]:
def entropy(y):
  if isinstance(y, pd.Series):
    a = y.value_counts()/y.shape[0]
    entropy = np.sum(-a*np.log2(a+1e-9))
    return(entropy)
  else:
    raise('Object must be a Pandas Series.')

entropy(data.Male)

In [None]:
def variance(y):
  if(len(y) == 1):
    return 0
  else:
    return y.var()

def information_gain(y, mask, func=entropy):
  a = sum(mask)
  b = mask.shape[0] - a

  if(a == 0 or b ==0):
    gain = 0
  else:
    if y.dtypes != 'O':
      gain = variance(y) - (a/(a+b)* variance(y[mask])) - (b/(a+b)*variance(y[-mask]))
    else:
      gain = func(y)-a/(a+b)*func(y[mask])-b/(a+b)*func(y[-mask])

  return gain

In [None]:
information_gain(dataset['Clicked on Ad'], dataset['Male'] == 1)

To calculate the best split of a numeric variable, first, all possible values that the variable is taking must be obtained. Once we have the options, for each option we will calculate the Information Gain using as a filter if the value is less than that value. Obviously, the first possible data will be drop, because the split will include all values.

In case we have categorical variables, the idea is the same, only that in this case we will have to calculate the Information Gain for all possible combinations of that variable, excluding the option that includes all the options (since it would not be doing any split). This is quite computationally costly if we have a high number of categories, that decision tree algorithms usually only accept categorical variables with less than 20 categories.

So, once we have all the splits, we will stick with the split that generates the highest Information Gain.

In [None]:
def categorical_options(a):
  #Creates all possible combinations from a Pandas Series.

  a = a.unique()

  opciones = []
  for L in range(0, len(a)+1):
      for subset in itertools.combinations(a, L):
          subset = list(subset)
          opciones.append(subset)

  return opciones[1:-1]

def max_information_gain_split(x, y, func=entropy):

  split_value = []
  gain = []

  numeric_variable = True if x.dtypes != 'O' else False

  if numeric_variable:
    options = x.sort_values().unique()[1:]
  else:
    options = categorical_options(x)

  # Calculate gain for all values
  for val in options:
    mask =   x < val if numeric_variable else x.isin(val)
    val_gain = information_gain(y, mask, func)
    # Append results
    gain.append(val_gain)
    split_value.append(val)

  if len(gain) == 0:
    return(None,None,None,False)

  else:
    best_gain = max(gain)
    best_gain_index = gain.index(best_gain)
    best_split = split_value[best_gain_index]
    return(best_gain,best_split,numeric_variable, True)

age_gain, age_split, _, _ = max_information_gain_split(dataset['Age'], dataset['Clicked on Ad'],)

print(
  "The best split for Age is when the variable is less than ",
  age_split,"\nInformation Gain for that split is:", age_gain)

best split will be the one that generates the highest Information Gain. To know which one is it, we simply have to calculate the Information Gain for each of the predictor variables of the model.

In [None]:
dataset.drop('Clicked on Ad', axis= 1).apply(max_information_gain_split, y = dataset['Clicked on Ad'])

For Training

In [None]:
def get_best_split(y, dataset):
  masks = dataset.drop(y, axis= 1).apply(max_information_gain_split, y = dataset[y])
  if sum(masks.loc[3,:]) == 0:
    return(None, None, None, None)

  else:
    # Get only masks that can be splitted
    masks = masks.loc[:,masks.loc[3,:]]

    # Get the results for split with highest IG
    split_variable = masks.iloc[0].astype(np.float32).idxmax()
    #split_valid = masks[split_variable][]
    split_value = masks[split_variable][1]
    split_gain = masks[split_variable][0]
    split_numeric = masks[split_variable][2]

    return(split_variable, split_value, split_ig, split_numeric)


def make_split(variable, value, dataset, is_numeric):
  if is_numeric:
    dataset_1 = dataset[dataset[variable] < value]
    dataset_2 = dataset[(dataset[variable] < value) == False]
  else:
    dataset_1 = dataset[dataset[variable].isin(value)]
    dataset_2 = dataset[(dataset[variable].isin(value)) == False]

  return(dataset_1,dataset_2)


def make_prediction(dataset, target_factor):
  if target_factor:
    pred = dataset.value_counts().idxmax()
  else:
    pred = dataset.mean()

  return pred

In [None]:
def train_tree(dataset,y, target_factor, max_depth = None,min_samples_split = None, min_information_gain = 1e-20, counter=0, max_categories = 20):

  if counter==0:
    types = dataset.dtypes
    check_columns = types[types == "object"].index
    for column in check_columns:
      var_length = len(dataset[column].value_counts())
      if var_length > max_categories:
        raise ValueError('The variable ' + column + ' has '+ str(var_length) + ' unique values, which is more than the accepted ones: ' +  str(max_categories))

  if max_depth == None:
    depth_cond = True

  else:
    if counter < max_depth:
      depth_cond = True

    else:
      depth_cond = False

  if min_samples_split == None:
    sample_cond = True

  else:
    if dataset.shape[0] > min_samples_split:
      sample_cond = True

    else:
      sample_cond = False

  if depth_cond & sample_cond:
    var,val,gain,var_type = get_best_split(y, dataset)

    if gain is not None and gain >= min_information_gain:
      counter += 1
      left,right = make_split(var, val, dataset,var_type)

      # Instantiate sub-tree
      split_type = "<=" if var_type else "in"
      question =   "{} {}  {}".format(var,split_type,val)
      # question = "\n" + counter*" " + "|->" + var + " " + split_type + " " + str(val)
      subtree = {question: []}

      # Find answers (recursion)
      yes_answer = train_tree(left,y, target_factor, max_depth,min_samples_split,min_information_gain, counter)

      no_answer = train_tree(right,y, target_factor, max_depth,min_samples_split,min_information_gain, counter)

      if yes_answer == no_answer:
        subtree = yes_answer

      else:
        subtree[question].append(yes_answer)
        subtree[question].append(no_answer)

    else:
      pred = make_prediction(dataset[y],target_factor)
      return pred

  else:
    pred = make_prediction(dataset[y],target_factor)
    return pred

  return subtree

max_depth = 5
min_samples_split = 20
min_information_gain  = 1e-5

decision = train_tree(dataset,'Clicked on Ad',True, max_depth,min_samples_split,min_information_gain)

decision

In [None]:
def classify_data(observation, tree):
  question = list(tree.keys())[0]

  if question.split()[1] == '<=':
    if observation[question.split()[0]] <= float(question.split()[2]):
      answer = tree[question][0]
    else:
      answer = tree[question][1]
  else:
    if observation[question.split()[0]] in (question.split()[2]):
      answer = tree[question][0]
    else:
      answer = tree[question][1]

  if not isinstance(answer, dict):
    return answer
  else:
    residual_tree = answer
    return classify_data(observation, answer)

TASKS:


1.   Implement decision tree algorithm on a dataset of your choice.
2.   Instead of Entropy, use GINI INDEX and observe the performance for any difference(s).
3.   Employ SciKit Learn implementation of decision tree and plot it. Observe for any difference(s).

In [None]:
import pandas as pd
import numpy as np
import itertools
import matplotlib.pyplot as plt
from sklearn import tree
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# Load the dataset
dataset = pd.read_csv('advertising.csv')

# ---- Task 1: Implement decision tree algorithm ----

# Fix the entropy function
def entropy(y):
    if isinstance(y, pd.Series):
        a = y.value_counts()/y.shape[0]
        entropy = np.sum(-a*np.log2(a+1e-9))
        return entropy
    else:
        raise TypeError('Object must be a Pandas Series.')

# Add the gini index function
def gini_index(y):
    if isinstance(y, pd.Series):
        a = y.value_counts()/y.shape[0]
        gini = 1 - np.sum(a**2)
        return gini
    else:
        raise TypeError('Object must be a Pandas Series.')

def variance(y):
    if len(y) == 1:
        return 0
    else:
        return y.var()

def information_gain(y, mask, func=entropy):
    a = sum(mask)
    b = mask.shape[0] - a

    if a == 0 or b == 0:
        gain = 0
    else:
        if y.dtypes != 'O':
            gain = variance(y) - (a/(a+b) * variance(y[mask])) - (b/(a+b) * variance(y[~mask]))
        else:
            gain = func(y) - a/(a+b) * func(y[mask]) - b/(a+b) * func(y[~mask])

    return gain

def categorical_options(a):
    # Creates all possible combinations from a Pandas Series
    a = a.unique()

    options = []
    for L in range(0, len(a)+1):
        for subset in itertools.combinations(a, L):
            subset = list(subset)
            options.append(subset)

    return options[1:-1]

def max_information_gain_split(x, y, func=entropy):
    split_value = []
    gain = []

    numeric_variable = True if x.dtypes != 'O' else False

    if numeric_variable:
        options = x.sort_values().unique()[1:]
    else:
        options = categorical_options(x)

    # Calculate gain for all values
    for val in options:
        mask = x < val if numeric_variable else x.isin(val)
        val_gain = information_gain(y, mask, func)
        # Append results
        gain.append(val_gain)
        split_value.append(val)

    if len(gain) == 0:
        return None, None, None, False

    else:
        best_gain = max(gain)
        best_gain_index = gain.index(best_gain)
        best_split = split_value[best_gain_index]
        return best_gain, best_split, numeric_variable, True

def get_best_split(y, dataset):
    masks = dataset.drop(y, axis=1).apply(max_information_gain_split, y=dataset[y])
    if sum(masks.loc[3,:]) == 0:
        return None, None, None, None

    else:
        # Get only masks that can be splitted
        masks = masks.loc[:,masks.loc[3,:]]

        # Get the results for split with highest IG
        split_variable = masks.iloc[0].astype(np.float32).idxmax()
        split_value = masks[split_variable][1]
        split_gain = masks[split_variable][0]
        split_numeric = masks[split_variable][2]

        return split_variable, split_value, split_gain, split_numeric

def make_split(variable, value, dataset, is_numeric):
    if is_numeric:
        dataset_1 = dataset[dataset[variable] < value]
        dataset_2 = dataset[(dataset[variable] < value) == False]
    else:
        dataset_1 = dataset[dataset[variable].isin(value)]
        dataset_2 = dataset[(dataset[variable].isin(value)) == False]

    return dataset_1, dataset_2

def make_prediction(dataset, target_factor):
    if target_factor:
        pred = dataset.value_counts().idxmax()
    else:
        pred = dataset.mean()

    return pred

def train_tree(dataset, y, target_factor, max_depth=None, min_samples_split=None,
               min_information_gain=1e-20, counter=0, max_categories=20, func=entropy):

    if counter == 0:
        types = dataset.dtypes
        check_columns = types[types == "object"].index
        for column in check_columns:
            var_length = len(dataset[column].value_counts())
            if var_length > max_categories:
                raise ValueError('The variable ' + column + ' has ' + str(var_length) +
                                ' unique values, which is more than the accepted ones: ' + str(max_categories))

    if max_depth is None:
        depth_cond = True
    else:
        depth_cond = counter < max_depth

    if min_samples_split is None:
        sample_cond = True
    else:
        sample_cond = dataset.shape[0] > min_samples_split

    if depth_cond and sample_cond:
        # Override the function used to calculate information gain
        global max_information_gain_split
        original_func = max_information_gain_split.__defaults__[0]
        max_information_gain_split.__defaults__ = (func,)

        var, val, gain, var_type = get_best_split(y, dataset)

        # Restore the original function
        max_information_gain_split.__defaults__ = (original_func,)

        if gain is not None and gain >= min_information_gain:
            counter += 1
            left, right = make_split(var, val, dataset, var_type)

            # Instantiate sub-tree
            split_type = "<=" if var_type else "in"
            question = "{} {} {}".format(var, split_type, val)
            subtree = {question: []}

            # Find answers (recursion)
            yes_answer = train_tree(left, y, target_factor, max_depth, min_samples_split,
                                   min_information_gain, counter, max_categories, func)

            no_answer = train_tree(right, y, target_factor, max_depth, min_samples_split,
                                  min_information_gain, counter, max_categories, func)

            if yes_answer == no_answer:
                subtree = yes_answer
            else:
                subtree[question].append(yes_answer)
                subtree[question].append(no_answer)

        else:
            pred = make_prediction(dataset[y], target_factor)
            return pred

    else:
        pred = make_prediction(dataset[y], target_factor)
        return pred

    return subtree

def classify_data(observation, tree):
    question = list(tree.keys())[0]

    # Parse the question
    parts = question.split()
    variable = parts[0]
    operator = parts[1]

    if operator == '<=':
        value = float(parts[2])
        if observation[variable] <= value:
            answer = tree[question][0]
        else:
            answer = tree[question][1]
    else:  # operator == 'in'
        # Safe parsing of the value part which could be a list representation
        try:
            # Convert string representation of list to actual list
            value_str = question[question.find('[')+1:question.find(']')]
            value_list = [x.strip(" '\"") for x in value_str.split(',')]
            if observation[variable] in value_list:
                answer = tree[question][0]
            else:
                answer = tree[question][1]
        except:
            # Fallback for simpler cases
            if observation[variable] in eval(parts[2]):
                answer = tree[question][0]
            else:
                answer = tree[question][1]

    if not isinstance(answer, dict):
        return answer
    else:
        return classify_data(observation, answer)

# Function to evaluate the model
def evaluate_model(tree, X, y_true):
    y_pred = []
    for i, row in X.iterrows():
        pred = classify_data(row, tree)
        y_pred.append(pred)

    accuracy = accuracy_score(y_true, y_pred)
    return accuracy, y_pred

# Main execution
    # Split data
    X = dataset.drop('Clicked on Ad', axis=1)
    y = dataset['Clicked on Ad']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    train_data = X_train.copy()
    train_data['Clicked on Ad'] = y_train

    # ---- Task 1: Decision Tree with Entropy ----
    print("\n--- Task 1: Decision Tree with Entropy ---")
    max_depth = 5
    min_samples_split = 20
    min_information_gain = 1e-5

    # Train with entropy
    tree_entropy = train_tree(train_data, 'Clicked on Ad', True,
                             max_depth, min_samples_split, min_information_gain, func=entropy)

    # Evaluate entropy model
    accuracy_entropy, _ = evaluate_model(tree_entropy, X_test, y_test)
    print(f"Entropy Decision Tree Accuracy: {accuracy_entropy:.4f}")

    # ---- Task 2: Decision Tree with Gini Index ----
    print("\n--- Task 2: Decision Tree with Gini Index ---")

    # Train with gini
    tree_gini = train_tree(train_data, 'Clicked on Ad', True,
                           max_depth, min_samples_split, min_information_gain, func=gini_index)

    # Evaluate gini model
    accuracy_gini, _ = evaluate_model(tree_gini, X_test, y_test)
    print(f"Gini Index Decision Tree Accuracy: {accuracy_gini:.4f}")
    print(f"Difference in accuracy: {abs(accuracy_entropy - accuracy_gini):.4f}")

    # ---- Task 3: Scikit-learn Decision Tree ----
    print("\n--- Task 3: Scikit-learn Decision Tree ---")

    # Handle categorical variables for sklearn
    X_train_proc = pd.get_dummies(X_train.drop(['Ad Topic Line', 'City', 'Country', 'Timestamp'], axis=1))
    X_test_proc = pd.get_dummies(X_test.drop(['Ad Topic Line', 'City', 'Country', 'Timestamp'], axis=1))

    # Ensure columns match
    missing_cols = set(X_train_proc.columns) - set(X_test_proc.columns)
    for col in missing_cols:
        X_test_proc[col] = 0
    X_test_proc = X_test_proc[X_train_proc.columns]

    # Train sklearn model with entropy
    clf_entropy = tree.DecisionTreeClassifier(criterion='entropy', max_depth=max_depth,
                                             min_samples_split=min_samples_split, random_state=42)
    clf_entropy.fit(X_train_proc, y_train)

    # Predict and evaluate
    y_pred_entropy = clf_entropy.predict(X_test_proc)
    accuracy_sklearn_entropy = accuracy_score(y_test, y_pred_entropy)
    print(f"Scikit-learn Entropy Decision Tree Accuracy: {accuracy_sklearn_entropy:.4f}")

    # Train sklearn model with gini
    clf_gini = tree.DecisionTreeClassifier(criterion='gini', max_depth=max_depth,
                                          min_samples_split=min_samples_split, random_state=42)
    clf_gini.fit(X_train_proc, y_train)

    # Predict and evaluate
    y_pred_gini = clf_gini.predict(X_test_proc)
    accuracy_sklearn_gini = accuracy_score(y_test, y_pred_gini)
    print(f"Scikit-learn Gini Decision Tree Accuracy: {accuracy_sklearn_gini:.4f}")

    # Visualize the scikit-learn tree
    plt.figure(figsize=(15, 10))
    tree.plot_tree(clf_entropy, feature_names=X_train_proc.columns, class_names=['0', '1'], filled=True)
    plt.title("Decision Tree (Entropy) Visualization")
    plt.savefig("decision_tree_entropy.png")

    plt.figure(figsize=(15, 10))
    tree.plot_tree(clf_gini, feature_names=X_train_proc.columns, class_names=['0', '1'], filled=True)
    plt.title("Decision Tree (Gini) Visualization")
    plt.savefig("decision_tree_gini.png")

    print("\nTree visualizations saved as decision_tree_entropy.png and decision_tree_gini.png")

    # Compare all methods
    print("\n--- Comparison of All Methods ---")
    print(f"Custom Entropy Decision Tree Accuracy: {accuracy_entropy:.4f}")
    print(f"Custom Gini Index Decision Tree Accuracy: {accuracy_gini:.4f}")
    print(f"Scikit-learn Entropy Decision Tree Accuracy: {accuracy_sklearn_entropy:.4f}")
    print(f"Scikit-learn Gini Decision Tree Accuracy: {accuracy_sklearn_gini:.4f}")