# imports

In [421]:
import os
import pandas as pd
from sklearn.impute import KNNImputer
import math
import numpy as np
from graphviz import Digraph
import random

## Project_path

In [422]:
project_path = '/content/drive/My Drive/Colab Notebooks/CarClassification'
print("Project directory:", project_path)

Project directory: /content/drive/My Drive/Colab Notebooks/CarClassification


# Load Datasets

In [423]:
dataset = pd.read_csv(f'{project_path}/datasets/original.train_ds.csv')
test = pd.read_csv(f'{project_path}/datasets/original.test_ds.csv')
# target_label = 'transmission'
# target_label = 'Manufacturer'
target_label = 'model'

# Pre Process

## Drop Nulls

In [424]:
cleaned_dataset = dataset.dropna()

## FFILL

In [None]:
cleaned_dataset = dataset.fillna(method='ffill')

## BFILL

In [None]:
cleaned_dataset = dataset.fillna(method='bfill')

## Fill Numeric Values

### Mean

In [None]:
numeric_dataset = dataset.select_dtypes(include=['number'])
numeric_dataset.fillna(numeric_dataset.mean(), inplace=True)

### Median

In [None]:
numeric_dataset = dataset.select_dtypes(include=['number'])
numeric_dataset.fillna(numeric_dataset.median(), inplace=True)

### Mode

In [None]:
numeric_dataset = dataset.select_dtypes(include=['number'])
numeric_dataset.fillna(numeric_dataset.mode()[0], inplace=True)

### Custom Value

In [None]:
numeric_dataset = dataset.select_dtypes(include=['number'])
numeric_dataset.fillna(value=1, inplace=True)

### Interpolate Linear

In [None]:
numeric_dataset = dataset.select_dtypes(include=['number'])
numeric_dataset.interpolate(method='linear', inplace=True)

## Fill Categorical

In [None]:
categorical_dataset = dataset.select_dtypes(include=['object'])
for col in categorical_dataset.columns:
    mode_value = categorical_dataset[col].mode()[0]
    categorical_dataset[col].fillna(mode_value, inplace=True)

## Concat Numeric & Categircal

In [None]:
cleaned_dataset =  pd.concat([categorical_dataset, numeric_dataset], axis=1)

# Helper functions

In [425]:
def is_discrete(labels:pd.DataFrame.columns):
  if(labels.dtype == 'int64'):
    num_unique = labels.nunique()
    if(num_unique < 20):
      return True
  return False

def float_detector(dataset):
  for col in dataset.select_dtypes(include='float'):
    if (dataset[col] % 1 == 0).all():  # Check if all values are integers
      dataset[col] = dataset[col].astype(int)
  return dataset

def random_outcome(labels):
  model_counts = labels.value_counts()
  total_count = model_counts.sum()
  probabilities = model_counts / total_count
  outcomes = list(zip(probabilities.index, probabilities))
  random_number = random.random()

  cumulative_probability = 0
  for outcome, probability in outcomes:
    cumulative_probability += probability
    if random_number < cumulative_probability:
        return outcome
        break

def probilities_outcome(labels):
  model_counts = labels.value_counts()
  total_count = model_counts.sum()
  probabilities = model_counts / total_count
  outcomes = list(zip(probabilities.index, probabilities))
  temp_dict = {}

  for outcome, probability in outcomes:
    temp_dict[outcome] = probability
  return temp_dict


def find_optimal_threshold(data, numeric_column):
  unique_values = sorted(data[numeric_column].unique())
  optimal_threshold = None
  max_info_gain = float('-inf')
  for i in range(len(unique_values) - 1):
      threshold = (unique_values[i] + unique_values[i+1]) / 2  # Calculate threshold as midpoint
      left_split = data[data[numeric_column] <= threshold]
      right_split = data[data[numeric_column] > threshold]

      info_gain = cal_information_gain(left_split[numeric_column], right_split[numeric_column])

      if info_gain > max_info_gain:
          optimal_threshold = threshold
          max_info_gain = info_gain

  if optimal_threshold is None or np.isnan(optimal_threshold):
    return np.float64(data[numeric_column].mean())
  return optimal_threshold

def cal_information_gain(left_labels, right_labels):

    parent_entropy = entropy(np.concatenate((left_labels, right_labels)))
    total_samples = len(left_labels) + len(right_labels)
    weighted_entropy = (len(left_labels) / total_samples) * entropy(left_labels) \
                        + (len(right_labels) / total_samples) * entropy(right_labels)

    information_gain = parent_entropy - weighted_entropy
    return information_gain
def entropy(labels):
    unique_labels, counts = np.unique(labels, return_counts=True)
    probabilities = counts / len(labels)
    entropy = -np.sum(probabilities * np.log2(probabilities))
    return entropy

def calculate_information_gain(feature, parent_label, child_labels):

  child_labels = [label.root.data[feature] for label in child_labels]
  parent_entropy = entropy(parent_label[feature])
  total_samples = len(parent_label[feature])
  if total_samples == 0:
      return 0  # Avoid division by zero
  weighted_entropy = 0
  for labels in child_labels:
    branch_samples = len(labels)
    weighted_entropy += (branch_samples / total_samples) * entropy(labels)
  information_gain = parent_entropy - weighted_entropy
  return information_gain

# Float to Integer Convertion

In [426]:
cleaned_dataset = float_detector(cleaned_dataset)
print(cleaned_dataset.dtypes)
cleaned_test = test

model            object
year              int64
price             int64
transmission     object
mileage           int64
fuelType         object
tax               int64
mpg             float64
engineSize      float64
Manufacturer     object
dtype: object


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dataset[col] = dataset[col].astype(int)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dataset[col] = dataset[col].astype(int)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dataset[col] = dataset[col].astype(int)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row

# Discretization

## No Discretization

In [427]:
discrete_features = {feature: False for feature in cleaned_dataset.columns}
print(discrete_features)

{'model': False, 'year': False, 'price': False, 'transmission': False, 'mileage': False, 'fuelType': False, 'tax': False, 'mpg': False, 'engineSize': False, 'Manufacturer': False}


## Yes Discretization

In [None]:
discrete_features = {feature: is_discrete(cleaned_dataset[feature]) for feature in cleaned_dataset.columns}
print(discrete_features)

{'model': False, 'year': True, 'price': False, 'transmission': False, 'mileage': False, 'fuelType': False, 'tax': True, 'mpg': False, 'engineSize': False, 'Manufacturer': False}


#Node

In [428]:
class Node:
  def __init__(self, data:pd.DataFrame, features: pd.DataFrame.columns, label, discerete_features={}, value = None) -> None:
     self.data = data
     self.features = features
     self.label = label
     self.value = value
     self.discerete_features = discerete_features


# Decision Tree

In [435]:
class Tree:
  def __init__(self, root:Node, depth=0, split_method='entropy', target_label='model') -> None:
     self.root = root
     self.depth = depth
     self.split_method = split_method
     self.target_label = target_label
     self.branches = {}
     self.features_ig = {key: 0 for key in self.root.features}

  def build(self) -> 'Tree':

    if len(self.root.features) == 1 and len(self.root.data) > 1:
      # self.root.value = probilities_outcome(self.root.data[self.target_label])
      self.root.value = random_outcome(self.root.data[self.target_label])
      self.root.label = self.root.value
    elif len(self.root.data) == 1:
      # self.root.value = probilities_outcome(self.root.data[self.target_label])
      # self.root.value = random_outcome(self.root.data[self.target_label])
      self.root.value = self.root.data[self.target_label].value_counts().idxmax()
      self.root.label = self.root.value
    elif self.root.data.empty:
      self.root.label = None
      self.root.value = None
    else:
      selected_feature = self.feature_selector(self.split_method, self.root.features)
      self.branches = self.split(self.root.data, selected_feature)
      for branch in self.branches:
        self.branches[branch].build()

  def feature_selector(self, mode, features):
    if(mode == 'entropy'):
      features_entropy = {key: 0 for key in self.root.features}
      for feature in features:
        if(feature == self.target_label):
        # if(feature == 'model'):
          feature_entropy = float('+inf')
        else:
          feature_entropy = entropy(self.root.data[feature].to_list())
        features_entropy[feature] = feature_entropy

      selected_feature = min(features_entropy, key=features_entropy.get)
    elif(mode == 'information_gain'):
      features_ig = {key: 0 for key in self.root.features}
      for feature in features:
        if(feature == self.target_label):
        # if(feature == 'model'):
          feature_ig = float('-inf')
        else:
          branches = self.split(self.root.data, feature)
          parent_branch = self.root.data

          feature_ig = calculate_information_gain(self.target_label, parent_branch, list(branches.values()))
          # feature_ig = calculate_information_gain('model', parent_branch, list(branches.values()))
        features_ig[feature] = feature_ig

      selected_feature = max(features_ig, key=features_ig.get)

    return selected_feature

  def split(self, data, feature) -> tuple:
    branches = {}
    if(data[feature].dtype == 'object'):
      branches = self.split_categorical(data, feature)
    elif(self.root.discerete_features[feature]):
      branches = self.split_categorical(data, feature)
    else:
      branches = self.split_numeric(data, feature)
    return branches


  def split_numeric(self, data, feature):
    threshold = find_optimal_threshold(self.root.data, feature)

    # threshold = data[feature].mean()
    branches = {}
    left_data = data[data[feature] <= threshold].drop(feature, axis=1)
    left_node = Node(left_data, left_data.columns, label=f'{feature}*{threshold}*left', discerete_features=self.root.discerete_features)
    left_tree = Tree(left_node, depth=self.depth+1, split_method=self.split_method, target_label = self.target_label)
    branches[f'{feature}*{threshold}*left'] = left_tree

    right_data = data[data[feature] > threshold].drop(feature, axis=1)
    right_node = Node(right_data, right_data.columns, label=f'{feature}*{threshold}*right', discerete_features=self.root.discerete_features)
    right_tree = Tree(right_node, depth=self.depth+1, split_method=self.split_method, target_label = self.target_label)
    branches[f'{feature}*{threshold}*right'] = right_tree
    return branches

  def split_categorical(self, data, feature):
    branches = {}
    for data_point in set(data[feature]):
      new_data = data[data[feature] == data_point].drop(feature, axis=1)
      new_root = Node(new_data, new_data.columns, label=f'{feature}*{data_point}', discerete_features=self.root.discerete_features)
      branches[f'{feature}*{data_point}'] = Tree(new_root, depth=self.depth+1, split_method=self.split_method, target_label = self.target_label)
    return branches




  def visualize(self, path):
    dot = Digraph()
    self._visualize_helper(dot, self)
    dot.render(path, format='png', cleanup=True)

  def _visualize_helper(self, dot, parent):

    if parent.root.label is None:
      return
    dot.node(str(id(parent)), label=self.root.label)
    if self.root.value:
      return
    for branch in self.branches:
      if self.branches[branch].root.label is None:
        return
      edge_label = '*'.join(self.branches[branch].root.label.split('*')[1:])
      self.branches[branch]._visualize_helper(dot, self.branches[branch])
      dot.edge(str(id(parent)), str(id(self.branches[branch])), label=edge_label)

  # def _visualize_helper(self, dot, parent_label):
  #   edge_label = '*'.join(parent_label.split('*')[1:])
  #   print('parent',parent_label.split('*')[0])
  #   print(self.root.label , '**',self.root.value)
  #   dot.node(str(id(parent_label)), label=str(self.root.label))
  #   if self.root.value:
  #     return
  #   for branch in self.branches:
  #     self.branches[branch]._visualize_helper(dot, branch)
  #     dot.edge(str(id(parent_label)), branch, label=edge_label)


  def predict(self, data_point: pd.DataFrame):
    if self.root.value is not None:
      return self.root.value
    else:
      for branch in self.branches:
        if branch.endswith('left'):
          branch_parts = branch.split('*')
          feature = branch_parts[0]
          threshold = float(branch_parts[1])
          if(data_point.iloc[0][feature] <= threshold):
            # print(data_point)
            # print(branch)
            new_data_point = data_point.drop(feature, axis=1)
            return self.branches[branch].predict(new_data_point)

        elif branch.endswith('right'):
          branch_parts = branch.split('*')
          feature = branch_parts[0]
          threshold = float(branch_parts[1])
          if(data_point.iloc[0][feature] > threshold):
            # print(data_point)
            # print(branch)
            new_data_point = data_point.drop(feature, axis=1)
            return self.branches[branch].predict(new_data_point)
        else:
          branch_parts = branch.split('*')
          feature = branch_parts[0]
          feature_value = branch_parts[1]
          if str(data_point.iloc[0][feature]) == str(feature_value):
            # print(data_point)
            # print(branch)
            new_data_point = data_point.drop(feature, axis=1)
            return self.branches[branch].predict(new_data_point)


# Build Tree

## Entropy

In [436]:
node = Node(cleaned_dataset ,cleaned_dataset.columns, 'root', discrete_features)
tree = Tree(node, split_method='entropy', target_label=target_label)
# tree.calculate_ig()
tree.build()

## Information Gain

In [438]:

node = Node(cleaned_dataset ,cleaned_dataset.columns, 'root', discrete_features)
tree = Tree(node, split_method='information_gain', target_label=target_label)
# tree.calculate_ig()
tree.build()

In [413]:
test_pd = cleaned_test.iloc[[1]].copy()
# test_pd = cleaned_dataset.iloc[[2]].copy()
print(test_pd)
result = tree.predict(test_pd)
print(result, test_pd.iloc[0][target_label])

       model  year  price transmission  mileage fuelType  tax   mpg  \
1   2 Series  2019  24590    Semi-Auto     3300   Diesel  145  48.7   

   engineSize Manufacturer  
1         2.0          BMW  
 1 Series  2 Series


# Accuracy

## Train Accuracy

In [296]:
count = 0
for i in range(len(cleaned_dataset)):
  test_pd = cleaned_dataset.iloc[[i]].copy()
  result = tree.predict(test_pd)
  # for outcome in result:
  #   if outcome == test_pd.iloc[0][target_label]:
  #     count+=1
  if result == test_pd.iloc[0][target_label]:
    count+=1
print(count/ len(cleaned_dataset))

0.9311064718162839


## Test Accuracy

In [414]:
count = 0
for i in range(len(cleaned_test)):
  test_pd = cleaned_test.iloc[[i]].copy()
  result = tree.predict(test_pd)
  if result is None:
    continue
  # for outcome in result:
  #   if outcome == test_pd.iloc[0][target_label]:
  #     count+=1
  if result == test_pd.iloc[0][target_label]:
    count+=1
print(count/ len(cleaned_test))

0.5433333333333333


# Visualize

In [439]:
tree.visualize(f'{project_path}/results/IG_tree')
# tree.visualize(f'{project_path}/results/Entropy_tree')

dot: graph is too large for cairo-renderer bitmaps. Scaling by 0.413521 to fit


In [None]:
import numpy as np
import pandas as pd

def entropy(labels):
    # Calculate the proportion of each class in the labels
    value_counts = labels.value_counts() / len(labels)
    # Filter out zero probabilities to avoid log(0) errors
    value_counts = value_counts[value_counts > 0]
    # Calculate entropy
    entropy = -np.sum(value_counts * np.log2(value_counts))
    return entropy

def information_gain(parent_label, splits):
    # Calculate the entropy of the parent node
    parent_entropy = entropy(parent_label)
    total_samples = len(parent_label)

    # Calculate the weighted entropy of the child nodes
    weighted_entropy = sum(len(split) / total_samples * entropy(split) for split in splits)

    # Calculate information gain
    information_gain = parent_entropy - weighted_entropy
    return information_gain

def calculate_information_gains(data, target_col):
    parent_entropy = entropy(data[target_col])
    information_gains = {}
    for feature in data.columns:
        if feature != target_col:
            splits = [data[data[feature] == value][target_col] for value in data[feature].unique()]
            information_gains[feature] = information_gain(data[target_col], splits)
    return information_gains

# Example usage:
# Assuming 'data' is your pandas DataFrame and 'target_col' is the name of your target variable column
data = cleaned_dataset
all = {}
for col in cleaned_dataset:

  target_col = col  # Replace 'target' with the name of your target variable column
  information_gains = calculate_information_gains(data, target_col)
  all[col] = information_gains
  print(f"Information Gains for {col}:", information_gains)


In [None]:
print(all)

In [None]:
average_information_gains = {}
for feature, gains in all.items():
    average_gain = sum(gains.values()) / len(gains)
    average_information_gains[feature] = average_gain

# Find the feature with the highest average information gain
highest_average_gain_feature = max(average_information_gains, key=average_information_gains.get)
highest_average_gain = average_information_gains[highest_average_gain_feature]

print("Feature with the highest average information gain:", highest_average_gain_feature)
print("Highest average information gain:", highest_average_gain)