In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score


In [2]:
#reading the data
company_data = pd.read_csv('Company_Data.csv')

In [3]:
company_data.head()

Unnamed: 0,Sales,CompPrice,Income,Advertising,Population,Price,ShelveLoc,Age,Education,Urban,US
0,9.5,138,73,11,276,120,Bad,42,17,Yes,Yes
1,11.22,111,48,16,260,83,Good,65,10,Yes,Yes
2,10.06,113,35,10,269,80,Medium,59,12,Yes,Yes
3,7.4,117,100,4,466,97,Medium,55,14,Yes,Yes
4,4.15,141,64,3,340,128,Bad,38,13,Yes,No


###### Pre processing

In [4]:
#User defined fuction to check for null values
"""This defines the function check_null_values with two arguments: 
the dataset 'data' and the list of special characters to treat as null values 'null_values'."""
def check_null_values_all_cols(data, null_values):
    num_cols = len(data[0])
    # iterates over each column in the dataset using the 'range' object 'num_cols'
    for col_idx in range(num_cols):
        null_found = False
        for row in data:
            value = row[col_idx]
            if value is None or str(value).strip() in null_values:
                null_found = True
                break
        if null_found:
            print(f"Null value found in column {col_idx}!")
        else:
            print(f"No null values found in column {col_idx}.")

In [5]:
class CustomImputer():
    def __init__(self, col_idx, strategy, null_values=["", "NA", "Na", "nA", "na", "N/A", "N/a", "n/A", "n/a"]):
        self.col_idx = col_idx
        self.strategy = strategy
        self.null_values = null_values

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy()
        col = [row[self.col_idx] for row in X if row[self.col_idx] not in self.null_values]
        if self.strategy == "mean":
            val = sum(col) / len(col)
        elif self.strategy == "median":
            col.sort()
            mid = len(col) // 2
            if len(col) % 2 == 0:
                val = (col[mid-1] + col[mid]) / 2
            else:
                val = col[mid]
        elif self.strategy == "mode":
            val = max(set(col), key = col.count)
        for j in range(len(X)):
            if X[j][self.col_idx] in self.null_values:
                X[j][self.col_idx] = val
        return X


In [6]:
class OutlierTreatment():
    def __init__(self, method='iqr', multiplier=1.5):
        self.method = method
        self.multiplier = multiplier
        
    def fit(self, X, y=None):
        return self
        
    def transform(self, X):
        if self.method == 'iqr':
            Q1 = np.percentile(X, 25, axis=0)
            Q3 = np.percentile(X, 75, axis=0)
            IQR = Q3 - Q1
            lower = Q1 - self.multiplier * IQR
            upper = Q3 + self.multiplier * IQR
            return np.clip(X, lower, upper)
        
        elif self.method == 'zscore':
            Z = np.abs(stats.zscore(X))
            return X[(Z < self.multiplier).all(axis=1)]


In [7]:
class CategoricalToNumerical():
    def __init__(self):
        self.columns = None
        self.mapping = {}
        
    def fit(self, X, y=None):
        self.columns = X.columns
        for col in X.columns:
            if X[col].dtype == 'object':
                values = list(set(X[col]))
                self.mapping[col] = {val:i for i, val in enumerate(values)}
        return self
    
    def transform(self, X):
        X = X.copy()
        for col in X.columns:
            if col in self.mapping:
                X[col] = X[col].apply(lambda x: self.mapping[col].get(x, -1))
        return X


In [8]:
import random

class TrainTestSplitCV():
    def __init__(self, test_size=0.2, random_state=None):
        self.test_size = test_size
        self.random_state = random_state
        
    def fit(self, X, y=None):
        return self
        
    def transform(self, X, y=None):
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=self.test_size, random_state=self.random_state)
        return X_train, X_test, y_train, y_test

        


###### Defining the node class

In [9]:
class Node():
    def __init__(self, feature_index=None, threshold=None, left=None, right=None, info_gain=None, value=None):
        ''' constructor ''' 
        
        # for decision node
        self.feature_index = feature_index #the index of the feature that this node splits on
        self.threshold = threshold #the threshold value used to split the data at this node
        self.left = left #the left child of this node
        self.right = right #the right child of this node
        self.info_gain = info_gain #the information gain obtained by splitting the data at this node
        
        # for leaf node
        self.value = value #the predicted value of the target variable at this node

###### Defining the DecisionTreeClaasifier class

In [10]:
class DecisionTreeClassifier():
    def __init__(self, min_samples_split=2, max_depth=2):
        ''' The __init__() function is the constructor of the DecisionTreeClassifier class. It takes two parameters:

            min_samples_split: the minimum number of samples required to split a node
            max_depth: the maximum depth of the tree '''
        
        # initialize the root of the tree 
        self.root = None
        
        # stopping conditions
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        
    def build_tree(self, dataset, curr_depth=0):
        ''' recursive function to build the tree ''' 
        
        X, Y = dataset[:,:-1], dataset[:,-1]#The X and Y variables represent the features and the target variable of the dataset
        num_samples, num_features = np.shape(X)#number of samples and features in the dataset
        
        # split until stopping conditions are met
        if num_samples>=self.min_samples_split and curr_depth<=self.max_depth:
            # find the best split
            best_split = self.get_best_split(dataset, num_samples, num_features)
            # check if information gain is positive
            if best_split["info_gain"]>0:
                # recur left
                left_subtree = self.build_tree(best_split["dataset_left"], curr_depth+1)
                # recur right
                right_subtree = self.build_tree(best_split["dataset_right"], curr_depth+1)
                # return decision node
                return Node(best_split["feature_index"], best_split["threshold"], 
                            left_subtree, right_subtree, best_split["info_gain"])
        
        # compute leaf node
        leaf_value = self.calculate_leaf_value(Y)
        # return leaf node
        return Node(value=leaf_value)
    
    def get_best_split(self, dataset, num_samples, num_features):
        ''' function to find the best split '''
        
        # dictionary to store the best split
        best_split = {}
        max_info_gain = -float("inf")
        
        # loop over all the features
        for feature_index in range(num_features):
            feature_values = dataset[:, feature_index]
            possible_thresholds = np.unique(feature_values)
            # loop over all the feature values present in the data
            for threshold in possible_thresholds:
                # get current split
                dataset_left, dataset_right = self.split(dataset, feature_index, threshold)
                # check if childs are not null
                if len(dataset_left)>0 and len(dataset_right)>0:
                    y, left_y, right_y = dataset[:, -1], dataset_left[:, -1], dataset_right[:, -1]
                    # compute information gain
                    curr_info_gain = self.information_gain(y, left_y, right_y)
                    # update the best split if needed
                    if curr_info_gain>max_info_gain:
                        best_split["feature_index"] = feature_index
                        best_split["threshold"] = threshold
                        best_split["dataset_left"] = dataset_left
                        best_split["dataset_right"] = dataset_right
                        best_split["info_gain"] = curr_info_gain
                        max_info_gain = curr_info_gain
                        
        # return best split
        return best_split
    
    def split(self, dataset, feature_index, threshold):
        ''' the split method is used to split the dataset into two subsets based on a given threshold value and a feature index. It takes the following parameters:

            dataset: The dataset to split.
            feature_index: The index of the feature to split on.
            threshold: The threshold value to split on.'''
        
        dataset_left = np.array([row for row in dataset if row[feature_index]<=threshold])#contains all the rows where the feature value is less than or equal to the threshold
        dataset_right = np.array([row for row in dataset if row[feature_index]>threshold])
        return dataset_left, dataset_right
    
    def information_gain(self, parent, l_child, r_child):
        ''' function to compute information gain '''
        # calculate entropy of parent node
        parent_entropy = self.entropy(parent)
    
        # calculate entropy of left child node
        left_entropy = self.entropy(l_child)
        
        # calculate entropy of right child node
        right_entropy = self.entropy(r_child)
    
        # calculate weighted average entropy of child nodes
        child_entropy = (len(l_child)/len(parent))*left_entropy + (len(r_child)/len(parent))*right_entropy
    
        # calculate information gain
        gain = parent_entropy - child_entropy
    
        return gain

    
    def entropy(self, y):
        ''' function to compute entropy '''
        
        class_labels = np.unique(y)#gets an array of unique class labels in the set of samples.
        entropy = 0 #This initializes the entropy variable to zero.
        for cls in class_labels: #loops over each class label in the array of unique class labels.
            p_cls = len(y[y == cls]) / len(y) #calculates the probability of a sample 
            entropy += -p_cls * np.log2(p_cls) 
        return entropy
    
        
    def calculate_leaf_value(self, Y):
        ''' function to compute leaf node '''
        
        Y = list(Y)
        return max(Y, key=Y.count)#calculates the majority class label for a leaf node
    
    def print_tree(self, tree=None, indent=" "):
        ''' function to print the tree '''
    
        if not tree:
            tree = self.root
    
        if tree.value is not None:
            print(tree.value)
    
        else:
            print("X"+str(tree.feature_index), "<=", tree.threshold, "?", tree.info_gain)
        
            print(indent + "left: ", end="")
            self.print_tree(tree.left, indent + " ")
        
            print(indent + "right: ", end="")
            self.print_tree(tree.right, indent + " ")

    
    def fit(self, X, Y):
        ''' function to train the tree '''
        
        dataset = np.concatenate((X, Y), axis=1)
        self.root = self.build_tree(dataset)
    
    def predict(self, X):
        ''' function to predict the class labels '''
        # convert X to numpy array
        if isinstance(X, pd.DataFrame):
            X = X.values
        
        predictions = [self.make_prediction(x, self.root) for x in X]
        return predictions
    
    def make_prediction(self, x, tree):
        ''' function to traverse the tree and make predictions '''
        
        # leaf node
        if tree.value != None:
            return tree.value
        
        feature_val = x[tree.feature_index]
        if feature_val <= tree.threshold:
            return self.make_prediction(x, tree.left)
        else:
            return self.make_prediction(x, tree.right)
        
    

In [18]:
company_data.head()

Unnamed: 0,Sales,CompPrice,Income,Advertising,Population,Price,ShelveLoc,Age,Education,Urban,US
0,9.5,138,73,11,276,120,1,42,17,1,1
1,11.22,111,48,16,260,83,0,65,10,1,1
2,10.06,113,35,10,269,80,2,59,12,1,1
3,7.4,117,100,4,466,97,2,55,14,1,1
4,4.15,141,64,3,340,128,1,38,13,1,0


In [12]:
# Splitting data into training and testing data set
from sklearn.model_selection import train_test_split


###### Hyper-Parameter Tuning

In [104]:
class HyperparameterFinder():
    def __init__(self, max_depth_values, min_samples_split_values):
        self.max_depth_values = max_depth_values
        self.min_samples_split_values = min_samples_split_values
        self.best_hyperparameters = None

    def fit(self, X, y=None):
        # Split the data into training and testing sets
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        # Find the best hyperparameters using the find_best_hyperparameters function
        best_hyperparameters, best_accuracy = self.find_best_hyperparameters(X_train, y_train, X_test, y_test)

        # Store the best hyperparameters
        self.best_hyperparameters = best_hyperparameters

        return self

    def transform(self, X, y=None):
        # Create a new decision tree classifier with the best hyperparameters
        dt = DecisionTreeClassifier(max_depth=self.best_hyperparameters['max_depth'], min_samples_split=self.best_hyperparameters['min_samples_split'])

        # Fit the decision tree on the entire training dataset
        dt.fit(X, y)

        # Return the predictions on the entire dataset
        return dt.predict(X), self.best_hyperparameters
    
    def find_best_hyperparameters(self, X_train, y_train, X_test, y_test):
        hyperparameters = {'max_depth': None, 'min_samples_split': None}
        best_accuracy = 0.0

        for max_depth in self.max_depth_values:
            for min_samples_split in self.min_samples_split_values:
                # Create a new decision tree classifier with the current hyperparameters
                dt = DecisionTreeClassifier(max_depth=max_depth, min_samples_split=min_samples_split)

                # Fit the decision tree on the training dataset
                dt.fit(X_train, y_train)

                # Evaluate the decision tree on the testing dataset
                y_pred = dt.predict(X_test)
                accuracy = accuracy_score(y_test, y_pred)

                # Check if the current hyperparameters are better than the previous best hyperparameters
                if accuracy > best_accuracy:
                    best_accuracy = accuracy
                    hyperparameters['max_depth'] = max_depth
                    hyperparameters['min_samples_split'] = min_samples_split

                # Print the current hyperparameters and accuracy for debugging
                print(f"max_depth={max_depth}, min_samples_split={min_samples_split}, accuracy={accuracy}")

        # Print the best hyperparameters and accuracy for debugging
        print(f"best_hyperparameters={hyperparameters}, best_accuracy={best_accuracy}")

        return hyperparameters, best_accuracy


###### Pipeline

In [105]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

In [106]:
preprocessing_pipeline = Pipeline([
    ('outlier_treatment', OutlierTreatment()),
     ('encoder', CategoricalToNumerical())
])

In [107]:
preprocessing_pipeline.fit(company_data)

  return bound(*args, **kwds)


Pipeline(steps=[('outlier_treatment',
                 <__main__.OutlierTreatment object at 0x000001C35AD41A90>),
                ('encoder',
                 <__main__.CategoricalToNumerical object at 0x000001C35AD417F0>)])

In [108]:
# define categorical columns
categorical_cols = ['ShelveLoc', 'Urban','US']

# create an instance of CategoricalToNumerical transformer
cat_to_num_transformer = CategoricalToNumerical()

# fit the transformer to the training data
cat_to_num_transformer.fit(company_data[categorical_cols])

# transform the categorical columns in the training and test data
company_data[categorical_cols] = cat_to_num_transformer.transform(company_data[categorical_cols])

In [109]:
pipeline = Pipeline([   
    ('preprocessing', preprocessing_pipeline),
    ('hyper-parameter', HyperparameterFinder(max_depth_values=[2, 4, 6, 8, 10], min_samples_split_values=[2, 4, 6, 8, 10])),
    ('dt', DecisionTreeClassifier())
])

In [110]:
#Splitting data to X and y
X=company_data.drop(labels='ShelveLoc',axis=1)
y=company_data[['ShelveLoc']]

In [111]:
# Train and evaluate the model using the pipeline
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [112]:
model.fit(X_train,y_train)

Pipeline(steps=[('preprocessing',
                 Pipeline(steps=[('outlier_treatment',
                                  <__main__.OutlierTreatment object at 0x000001C35ACB99A0>),
                                 ('encoder',
                                  <__main__.CategoricalToNumerical object at 0x000001C35ACB9610>)])),
                ('dt',
                 <__main__.DecisionTreeClassifier object at 0x000001C35AD1E520>)])

In [113]:
y_pred = model.predict(X_test)

In [114]:
acc = accuracy_score(y_test, y_pred)

In [118]:
# Evaluate the predictions using accuracy_score
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy}")

Accuracy: 0.5625
