In [1]:
import pandas as pd
import numpy as np
from typing import Tuple, List, Dict
import matplotlib.pyplot as plt
import math

from scipy.optimize import minimize

import sys
sys.path.insert(0, '../scripts/')
import utils as utl

In [2]:
df = pd.read_csv('../data/1.raw/customerClassification.csv')#, parse_dates=['DateTime'],index_col=['DateTime'])
df.columns
X_cols = ['Gender', 'Ever_Married', 'Age', 'Graduated', 'Profession',
       'Work_Experience', 'Spending_Score', 'Family_Size',
       'Segmentation']
X_cat_cols = ['Gender', 'Ever_Married', 'Graduated', 'Profession',
       'Spending_Score', 'Segmentation']
y_col = 'Var_1'

X = df[X_cols]
X = X.fillna(0)

# for cat in X_cat_cols:
#     X[cat] = X[cat].astype('category').cat.codes

# for c in X.columns:
#     X[c] = utl.min_max_scaling(X[c])[0]


y = df[y_col].fillna(0).astype('category').cat.codes

In [3]:
def entropy(y: pd.Series) -> float:
    """Calculates the entropy of y

    Parameters
    ----------
    y : pd.Series
        Categorical value for which the entropy will be calculated

    Returns
    -------
    float
        Entropy
    """
    entropy = 0
    ntt = len(y)
    if ntt != 0:
        for u in y.unique():
            y_c = y.loc[y==u]
            p = len(y_c)/ntt
            if p != 0:
                entropy += -p*math.log2(p)
    return entropy

In [4]:
def get_splits(x:pd.Series, y:pd.Series) -> np.array:
    """Returns the splits of the categorical data x, using the median x for each category of y

    Parameters
    ----------
    x : pd.Series
        Input data for which the splits will be calculated
    y : pd.Series
        Target categorical data 

    Returns
    -------
    np.array
        _description_
    """
    vals = []
    for u in y.unique():
        x_np = x.loc[y==u].to_numpy()
        vals.append(np.median(x_np))
    vals_sorted = np.sort(np.array(vals))
    _, idx = np.unique(vals_sorted, return_index=True)
    return vals_sorted[np.sort(idx)]

In [5]:
def iloc_ranges(y: pd.Series, x: pd.Series, splits: List, i: int) -> pd.Series:
    """Filters the data from the series y for which the continuous data x is between the values split[i-1] and split[i]

    Parameters
    ----------
    x : pd.Series
        Continuous data to use as base for the filtering
    y : pd.Series or pd.DataFrame
        Data to be filtered
    splits : List
        Values at which the values of x can be splitted
    i : int
        Index of the split to be used


    Returns
    -------
    pd.Series
        _description_
    """
    if len(x) > 0:
        if i > 0 and i < len(splits):
            Sv = y.loc[(x>splits[i-1]) & (x<=splits[i])]
        elif i == 0:
            Sv = y.loc[x<=splits[0]]
        else:
            Sv = y.loc[x>splits[-1]]
        return Sv
    return pd.Series([])

In [6]:
def info_gain_cat(x:pd.Series, y:pd.Series) -> float:
    """Calculates the info gain of using a given categorical attribute to describe categorical data

    Parameters
    ----------
    x : pd.Series
        Categorical values of the attribute
    y : pd.Series
        Categorical target

    Returns
    -------
    float
        Information gain
    """
    ES = entropy(y)
    ESv = 0
    nt = len(y)
    if nt != 0:
        for u in x.unique():
            Sv = y.loc[x==u]
            n = len(Sv)        
            ESv += - (n/nt)*entropy(Sv)
    return ES - ESv

def info_gain_con(x:pd.Series, y:pd.Series) -> float:
    """Calculates the info gain of using a given continuous attribute to describe categorical data

    Parameters
    ----------
    x : pd.Series
        Categorical values of the attribute
    y : pd.Series
        Categorical target

    Returns
    -------
    float
        Information gain
    """
    splits = get_splits(x, y)
    ES = entropy(y)
    ESv = 0
    nt = len(y)
    if nt != 0:
        for i in range(len(splits) + 1):
            Sv = iloc_ranges(y, x, splits, i)
            n = len(Sv)
            ESv += - (n/nt)*entropy(Sv)
    return ES - ESv

def get_max_ig(X: pd.DataFrame, y: pd.Series, X_cat_cols: List) -> str:
    dct = {}
    cols = list(X.columns)
    for attr in cols:
        if attr in X_cat_cols:
            dct[attr] = info_gain_cat(X[attr], y)
        else:
            dct[attr] = info_gain_con(X[attr], y)
    return max(dct, key=dct.get)    

In [7]:
class Decision_Tree:
    def __init__(self) -> None:
        pass

    def new_nodes(self, X: pd.DataFrame, y: pd.Series, attr_max_gain: str, X_cat_cols) -> Dict:
        dct_res = {}
        cols = list(X.columns)
        cols.remove(attr_max_gain)
        X_upd = X[cols]
        Xcc = X_cat_cols.copy()
        if attr_max_gain in X_cat_cols:
            Xcc.remove(attr_max_gain)
        
        dct_res['attr'] = attr_max_gain
        if attr_max_gain in X_cat_cols:
            dct_res['type'] = 'cat'
            dct_res['vals'] = X[attr_max_gain].unique()
            childs = {}
            for u in dct_res['vals']:
                Xn = X_upd.loc[X[attr_max_gain]==u]
                yn = y.loc[X[attr_max_gain]==u]
                if len(yn)>0:
                    childs[u] = self.des_tree_nodes(Xn, yn, Xcc)
            dct_res['res'] = childs
        else:
            dct_res['type'] = 'cont'
            splits = get_splits(X[attr_max_gain], y)
            dct_res['vals'] = splits
            childs = {}
            for i in range(len(splits) + 1):                
                Xn = iloc_ranges(X_upd, X[attr_max_gain], splits, i)
                yn = iloc_ranges(y, X[attr_max_gain], splits, i)
                if len(yn)>0:
                    childs[str(i)] = self.des_tree_nodes(Xn, yn, Xcc)
            dct_res['res'] = childs
        return dct_res
    
    def des_tree_nodes(self, X: pd.DataFrame, y: pd.Series, X_cat_cols):
        cols = list(X.columns)
        attr_max_gain = get_max_ig(X, y, X_cat_cols)
        cols.remove(attr_max_gain)
        if len(cols)!=0:
            out = self.new_nodes(X, y, attr_max_gain, X_cat_cols)
            return out
        y_count = {}
        for u in y.unique():
            y_count[u] = y.loc[y==u].count()
        out = max(y_count, key=y_count.get)
        return out

In [8]:
dt = Decision_Tree()
res_dict = dt.des_tree_nodes(X, y, X_cat_cols)
res_dict

{'attr': 'Gender',
 'type': 'cat',
 'vals': array(['Male', 'Female'], dtype=object),
 'res': {'Male': {'attr': 'Work_Experience',
   'type': 'cont',
   'vals': array([1.]),
   'res': {'0': {'attr': 'Ever_Married',
     'type': 'cat',
     'vals': array(['No', 'Yes', 0], dtype=object),
     'res': {'No': {'attr': 'Spending_Score',
       'type': 'cat',
       'vals': array(['Low'], dtype=object),
       'res': {'Low': {'attr': 'Segmentation',
         'type': 'cat',
         'vals': array(['D', 'C', 'B', 'A'], dtype=object),
         'res': {'D': {'attr': 'Graduated',
           'type': 'cat',
           'vals': array(['No', 'Yes', 0], dtype=object),
           'res': {'No': {'attr': 'Age',
             'type': 'cont',
             'vals': array([21. , 21.5, 22. , 23.5]),
             'res': {'0': {'attr': 'Profession',
               'type': 'cat',
               'vals': array(['Healthcare', 'Doctor', 'Marketing', 'Artist', 'Entertainment'],
                     dtype=object),
        

In [9]:
X['Ever_Married'].unique()

array(['No', 'Yes', 0], dtype=object)