In [5]:
from scipy.io.arff import loadarff
import pandas as pd
import numpy as np
import csv
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

def read_and_split_data(path, test_share, target_name):
    #function for reading and splitting data into train and test datasets
    data = loadarff(path)
    df = pd.DataFrame(data[0])
    df[target_name] = LabelEncoder().fit_transform(df[target_name])
    X_train, X_test, y_train, y_test = train_test_split(pd.get_dummies(df.drop(columns=target_name)), df[target_name], test_size=test_share, random_state=42)
    feature_types = np.where(X_train.dtypes.values == 'float64', 'real', 'ohe')
    return X_train, X_test, y_train, y_test, feature_types


In [6]:
import numpy as np
import pandas as pd
from collections import Counter
from tqdm import tqdm
from sklearn.metrics import f1_score
from joblib import Parallel, delayed

def find_best_split(x, y, alpha, beta, f_type='real'):
    #function for finding the best split
    x = pd.Series(x)
    y = pd.Series(y)
    n = y.shape[0]
    if f_type =='real':
        x = x.sort_values() #sorting the feature vector 
        y = y[x.index] #sorting the target vector accordingly 
        thr_inds = np.where(x.values[:-1] != x.values[1:])[0] #finding indexes where the feature changes its value
        thresholds = (x.values[thr_inds]+x.values[thr_inds+1])/2 #finding thresholds
        y = pd.get_dummies(y)
        y_temp = y.cumsum()
        y_temp = y_temp.div(y_temp.sum(axis=1), axis=0) #finding probabilities for all partitions 
        
        if alpha == 1: # in this case, Shannon entropy-based information gain is used
            y_temp = y_temp.values[thr_inds,:]
            y_temp_log = np.log2(y_temp)
            shannon_vals_1 = -np.sum(np.multiply(y_temp,y_temp_log), axis =1) #calculating Shannon entropy
        else:
            y_temp = y_temp.values[thr_inds,:]**alpha 
            if beta == 1: #in this case, Renyi entropy-based information gain is used
                renyi_vals_1 = (1/(1-alpha)) * np.log2(np.sum(y_temp, axis=1)) #calculating Renyi entropy
            else: #in this case, Sharma-Mittal entropy-based information gain is used
                sharma_vals_1 = (1/(1-beta))*(np.power(np.sum(y_temp, axis=1), ((1-beta)/(1-alpha)))-1) #calculating Sharma-Mittal entropy
        #doing the same for the right subtrees
        y_temp = y[::-1].cumsum()
        y_temp = y_temp.div(y_temp.sum(axis=1), axis=0) 
        
        if alpha == 1:
            y_temp = y_temp.values[n-thr_inds-2,:]
            y_temp_log = np.log2(y_temp)
            shannon_vals_2 = -np.sum(np.multiply(y_temp,y_temp_log), axis =1) 
            info_gain = shannon_vals_1*(thr_inds+1)/n + shannon_vals_2*(n-thr_inds-1)/n #calculating Shannon entropy-based information gain
        else:
            y_temp = y_temp.values[n-thr_inds-2,:]**alpha
            if beta == 1:
                renyi_vals_2 = (1/(1-alpha)) * np.log2(np.sum(y_temp, axis=1))
                info_gain = renyi_vals_1*(thr_inds+1)/n + renyi_vals_2*(n-thr_inds-1)/n #calculating Renyi entropy-based information gain
            else:
                sharma_vals_2 = (1/(1-beta))*(np.power(np.sum(y_temp, axis=1), ((1-beta)/(1-alpha)))-1)
                info_gain = sharma_vals_1*(thr_inds+1)/n + sharma_vals_2*(n-thr_inds-1)/n #calculating Sharma-Mittal based information gain
            
        try:
            best_ind = np.argmin(info_gain) 
            best_thr = thresholds[best_ind]
        except:
            return 0, -1
    else: #doing the same but for  ONE hot encoded feature, where only one splitting is possible
        best_thr = 0.5
        best_ind=0
        y1 = y[x<0.5]
        y2 = y[x>0.5]
        n1 = y1.shape[0]
        n2 = n-n1
        
        if alpha == 1:
            y1 = y1.value_counts(normalize=True).values
            y1_log = np.log2(y1)
            y2 = y2.value_counts(normalize=True).values
            y2_log = np.log2(y2)
            info_gain = [-np.sum(np.multiply(y1,y1_log))*n1/n - np.sum(np.multiply(y2,y2_log))*n2/n]
        else:
            y1 = (y1.value_counts(normalize=True).values)**alpha
            y2 = (y2.value_counts(normalize=True).values)**alpha
            if beta == 1:
                info_gain = [(1/(1-alpha)) * np.log2(np.sum(y1))*n1/n + (1/(1-alpha)) * np.log2(np.sum(y2))*n2/n] #calculating Renyi entropy-based information gain
            else:
                info_gain = [(1/(1-beta))*(np.power(np.sum(y1), ((1-beta)/(1-alpha)))-1)*(n1/n)+ (1/(1-beta))*(np.power(np.sum(y2), ((1-beta)/(1-alpha)))-1)*(n2/n)] #calculating Sharma-Mittal entropy-based information gain 
        
    return best_thr, info_gain[best_ind] 

class DecisionTreeRenyi:
    #class describing a decision tree
    def __init__(self, feature_types, max_features, alpha, beta, max_depth=16, min_samples_split=100, min_samples_leaf=10):

        self._tree = {}
        self.feature_types = feature_types
        self.max_features = max_features
        self._max_depth = max_depth
        self._min_samples_split = min_samples_split
        self._min_samples_leaf = min_samples_leaf
        self.alpha = alpha
        self.beta = beta

    def _fit_node(self, sub_X, sub_y, node, depth=0):
        if self._max_depth is not None and depth==self._max_depth: #max depth
            node["type"] = "terminal"
            node["class"] = Counter(sub_y).most_common(1)[0][0]
            return
            
        if np.all(sub_y == sub_y[0]):
            node["type"] = "terminal"
            node["class"] = sub_y[0]
            return
        
        if self._min_samples_split is not None and (len(sub_y)<self._min_samples_split): #min_split
            node["type"] = "terminal"
            node["class"] = Counter(sub_y).most_common(1)[0][0]
            return

        feature_best, threshold_best, infogain_best, split = None, None, None, None
        
        for feature in np.random.choice(self.shape, self.max_features):
            feature_vector = sub_X[:, feature]
            threshold, infogain = find_best_split(feature_vector, sub_y, self.alpha, self.beta, f_type = self.feature_types[feature])
             
            if infogain_best is None or infogain > infogain_best:
                feature_best = feature
                infogain_best = infogain
                split = feature_vector < threshold
                threshold_best = threshold        

        if feature_best is None or infogain_best == -1 or np.all(split) or np.all(~split):
            node["type"] = "terminal"
            node["class"] = Counter(sub_y).most_common(1)[0][0]
            return
        
        if self._min_samples_leaf is not None and (np.sum(split)<self._min_samples_leaf or  np.sum(~split)<self._min_samples_leaf):
            node["type"] = "terminal"
            node["class"] = Counter(sub_y).most_common(1)[0][0]
            return
        
        node["type"] = "nonterminal"
        node["feature_split"] = feature_best
        node["threshold"] = threshold_best
        node["left_child"], node["right_child"] = {}, {}
        
    
        self._fit_node(sub_X[split], sub_y[split], node["left_child"], depth=depth+1)  
        self._fit_node(sub_X[np.logical_not(split)], sub_y[np.logical_not(split)], node["right_child"], depth=depth+1)

    def _predict_node(self, x, node):
        tree = node
        class_i = None
        while True:
            if tree['type'] == 'terminal':
                class_i = tree['class']
                break
            else:
                feature_split_i = tree['feature_split']
                real_split_i = tree['threshold']
                if x[feature_split_i] < real_split_i:
                    tree = tree['left_child']
                else:
                    tree = tree['right_child']
        return class_i

    def fit(self, X, y):
        self.shape = X.shape[1]
        self._fit_node(X, y, self._tree)

    def predict(self, X):
        predicted = []
        for x in X:
            predicted.append(self._predict_node(x, self._tree))
        return np.array(predicted)

class Renyi_SharmaMittal_RandomForest:
    #class describing a random forest
    def __init__(self, n, feature_types, max_features=4, alpha=0.99, beta=1,
                 max_depth=16, min_samples_split=20, min_samples_leaf=10):
        
        self.feature_types = feature_types
        self.max_features = max_features
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.alpha = alpha
        self.beta = beta
        self.n = n
        self.forest = []
     
    def fit(self, X_train, y_train):
        self.forest = Parallel(n_jobs=-1)(delayed(self.fit_tree)(X_train, y_train) for _ in range(self.n))
    
    def fit_tree(self, X_train, y_train):
        temp = DecisionTreeRenyi(self.feature_types, self.max_features, self.alpha, self.beta, self.max_depth, 
                                     self.min_samples_split, self.min_samples_leaf)
        temp.fit(X_train, y_train)
        return temp
            
    def predict(self, X_test):
        preds = self.forest[0].predict(X_test)
        for i in self.forest[1:]:
            preds = np.vstack([preds,i.predict(X_test)])
        preds = np.swapaxes(preds,0,1)
        preds = np.array([Counter(row).most_common(1)[0][0] for row in preds])
        return preds

In [7]:
#example for har dataset with Sharma-Mittal entropy-based information gain
X_train, X_test, y_train, y_test, feature_types = read_and_split_data('har.arff', 0.25, 'Class')
header = ['alpha','beta', 'acc', 'f1']
with open('har_results_SharmaMittal_LineByLine.csv', 'a', encoding='UTF8', newline='') as myfile:
    writer = csv.writer(myfile)
    # write the header
    writer.writerow(header)

    for alpha in tqdm(np.arange(0.01, 1.0, 0.01)):
        for beta in tqdm(np.arange(0.01, 1.0, 0.01), total=99):
            model = Renyi_SharmaMittal_RandomForest(300, feature_types, max_features = int(X_train.shape[1]/3), alpha=alpha, beta=beta)
            model.fit(X_train.values, y_train.values)
            preds = model.predict(X_test.values)
            #calculating accuracy
            acc = sum(preds == y_test.values)/len(y_test)
            #calculating f1 score
            f = f1_score(y_test.values, preds, average=None)

            datax = [alpha, beta, acc, f]
            # write the data
            writer.writerow(datax)
            myfile.flush()

  0%|                                                                                           | 0/99 [00:00<?, ?it/s]
  'precision', 'predicted', average, warn_for)

  1%|▊                                                                             | 1/99 [17:46<29:01:33, 1066.26s/it][A

KeyboardInterrupt: 

In [None]:
#example for har dataset with Renyi entropy-based information gain
X_train, X_test, y_train, y_test, feature_types = read_and_split_data('har.arff', 0.25, 'Class')
header = ['alpha','beta', 'acc', 'f1']
beta = 1
with open('har_results_Renyi_LineByLine.csv', 'a', encoding='UTF8', newline='') as myfile:
    writer = csv.writer(myfile)
    # write the header
    writer.writerow(header)

    for alpha in tqdm(np.arange(0.01, 1.0, 0.01)):
        model = Renyi_SharmaMittal_RandomForest(300, feature_types, max_features = int(X_train.shape[1]/3), alpha=alpha, beta=1)
        model.fit(X_train.values, y_train.values)
        preds = model.predict(X_test.values)
        #calculating accuracy
        acc = sum(preds == y_test.values)/len(y_test)
        #calculating f1 score
        f = f1_score(y_test.values, preds, average=None)

        datax = [alpha, beta, acc, f]
        # write the data
        writer.writerow(datax)
        myfile.flush()