In [1]:
import pandas as pd
from typing import List, Any

In [2]:
csvfile = "/Users/alifabdullah/Collaboration/Kaggle-ML-Algorithm-Musings/datasets/car_evaluation.csv"
csv_in_panda_form = pd.read_csv(csvfile)
target_column = "Decision"
feature_columns = [feature_header for feature_header in csv_in_panda_form.columns.drop(target_column)]

#print(csv_in_panda_form)
#print(feature_columns)

#for row in csv_in_panda_form.iterrows():
#  print(row[1]["Outcome"])

print(len(csv_in_panda_form[csv_in_panda_form["NumberOfDoors"] == '2']))
print((csv_in_panda_form["Decision"].unique()))
print(csv_in_panda_form.iloc[0])
  


432
['unacc' 'acc' 'vgood' 'good']
BuyingPrice        vhigh
MaintenanceCost    vhigh
NumberOfDoors          2
NumberOfPersons        2
LugBoot            small
Safety               low
Decision           unacc
Name: 0, dtype: object


In [8]:
class Branch:
  """
  This class represents a branch from a Decision tree.
  """
  
  def __init__(self):
    self.value = None
    self.next_node : 'DecisionTreeNode' = None

  def change_value(self, value):
    self.value = value

  def change_next_node(self, next_node: 'DecisionTreeNode'):
    self.next_node = next_node

class DecisionTreeNode:
  """
  This class represents the nodes that will make up my decision tree.
  There are two types of nodes. Branching nodes - whose children are either
  more branching nodes or leaf nodes - and leaf nodes - where, upon reaching these
  nodes, you end up with a decision to make about the test instance.

  Note that you can only have a list of branches or a decision, not both.
  One of them must be none at all times.
  """

  # If decision is None, we create a Branch Node
  # If decision is not None, we create a Leaf Node
  def __init__(self, decision: Any = None):
    if decision is None:
      self.list_of_branches : List[Branch] = []
      self.decision = None
    else: 
      self.list_of_branches = None
      self.decision = decision
    self.feature_name = None
  
  def add_branch(self, branch: Branch):
    assert self.decision is None, "Cannot add branches to a leaf node."
    self.list_of_branches.append(branch)
  
  def change_decision(self, decision):
    assert self.list_of_branches == [], "Cannot change the decision of a branch node."
    self.decision = decision
  
  def assign_feature_name(self, feature_name):
    # Ensure immutability for feature name once we assign it
    assert self.feature_name is None, "Cannot reassign feature name."

    self.feature_name = feature_name

In [9]:
def _decision_tree_split_decider(dataframe : pd.DataFrame, list_of_labels, target_column):
  """
  Takes in a Pandas dataframe, goes through every feature, figures out which 
  one gives the greatest error difference before and after the particular feature,
  chooses that to split the data, and repeats the process for the remaining
  features.
  """

  # Get all features beside target column
  all_features = dataframe.drop(columns=[target_column]).columns.to_list()

  # All non terminating nodes of the tree (that are at the end of value branches
  # a previously chosen feature).
  frontier = []

  # Repeat the decision tree construction process until all features have been looked at
  while len(all_features) > 0:

    biggest_error_diff = 0
    feature_with_biggest_error_diff = None

    # Compare error before any feature split with each feature split's error:
    error_before_split = _dataset_error_calculator(dataframe, list_of_labels, target_column)
    print(f"Before feature split: {error_before_split}")

    # Loop over every feature to get its error on the dataframe
    for feature in all_features:
      feature_error = _dataset_error_calculator(dataframe, list_of_labels, target_column, feature)
      print(f"{feature}: {feature_error}")

      # Find the feature split that causes the biggest error difference from before to after the split
      if error_before_split - feature_error > biggest_error_diff:
        biggest_error_diff = error_before_split - feature_error
        feature_with_biggest_error_diff = feature
    
    current_tree_node = DecisionTreeNode()
    unique_values_of_the_current_biggest_feature = dataframe[feature_with_biggest_error_diff].unique()
    print(f"Feature with biggest error diff: {feature_with_biggest_error_diff}")
    print(f"Unique Values of the feature with biggest error diff: {unique_values_of_the_current_biggest_feature}")

    # Create our tree head, and give it the feature with the biggest error difference
    # from before the split to after the split
    current_tree_node.assign_feature_name(feature_with_biggest_error_diff)

    for unique_value_of_the_current_biggest_feature in unique_values_of_the_current_biggest_feature:
      print(f"Individual unique values: {unique_value_of_the_current_biggest_feature}")
      branch_of_the_current_tree_node = Branch()
      branch_of_the_current_tree_node.change_value(unique_value_of_the_current_biggest_feature)
      current_tree_node.add_branch(branch_of_the_current_tree_node)
      
      # Get sub datasets whose values correspond to the current chosen value, and
      # the feature splitting.
    print(f"Current tree node: {current_tree_node}")
    print(f"Branches of the current tree node: {current_tree_node.list_of_branches}")
    break

def _dataset_error_calculator(dataframe, list_of_labels, target_column, feature=None):
  if feature is not None:
    unique_feature_values = dataframe[feature].unique()
    error_sum = 0
    for feature_value in unique_feature_values:
      current_df = dataframe[dataframe[feature] == feature_value]
      current_df_error, label_to_count_mapping_dictionary_current_df = _dataset_consistency_metrics(current_df, list_of_labels, target_column)
      error_sum += current_df_error
    return error_sum
  else:
    error, label_to_count_mapping_dictionary = _dataset_consistency_metrics(dataframe, list_of_labels, target_column)
    return error
    

def _dataset_consistency_metrics(dataframe, list_of_labels, target_column):
  """
  Tells you the counts of instances, in a dataset, corresponding to labels, as 
  well as the error of the dataset, where the error is calculated as the number
  of dataset instances corresponding to the label with the least instances.

  Args:
    dataframe - A Pandas DataFrame, representing our dataset
    list_of_labels - List of labels that represents all labels that could be assigned (replaced by self.list_of_labels going forward)
    target_column: The name of the column we are trying to predict (replaced by self.target_column going forward)
  
  Returns:
    error - The error, as defined above
    label_counts - A dictionary mapping between labels and their respective counts
  """
  label_count_mapping_dict = dict()
  for key in list_of_labels:
    label_count_mapping_dict[key] = 0
  
  for row in dataframe.iterrows():
    label_count_mapping_dict[row[1][target_column]] += 1 
  
  error = float("inf")
  for key in list_of_labels:
    if label_count_mapping_dict[key] < error:
      error = label_count_mapping_dict[key]
  return error, label_count_mapping_dict

print(_dataset_error_calculator(csv_in_panda_form, ['unacc','acc','vgood','good'], "Decision", None))
print(_dataset_error_calculator(csv_in_panda_form, ['unacc','acc','vgood','good'],"Decision", "NumberOfDoors"))
print(_decision_tree_split_decider(csv_in_panda_form, ['unacc','acc','vgood','good'], "Decision"))

65
61
Before feature split: 65
BuyingPrice: 62
MaintenanceCost: 49
NumberOfDoors: 61
NumberOfPersons: 63
LugBoot: 48
Safety: 30
Feature with biggest error diff: Safety
Unique Values of the feature with biggest error diff: ['low' 'med' 'high']
Individual unique values: low
Individual unique values: med
Individual unique values: high
Current tree node: <__main__.DecisionTreeNode object at 0x10cb42560>
Branches of the current tree node: [<__main__.Branch object at 0x10cad52a0>, <__main__.Branch object at 0x10cb42a40>, <__main__.Branch object at 0x10caeead0>]
None
