In [None]:
# Modified version of ID3 algorithm
# Original code on
# https://towardsdatascience.com/id3-decision-tree-classifier-from-scratch-in-python-b38ef145fd90

In [None]:
import math
import pandas as pd
import numpy as np
import time
from collections import dequea

In [None]:
""" Entropy Measures module 2
--------------------
Hs: Shannon's entropy
Hp: Non-additive superstatistical entropy
Hm: Non-additive superstatistical complementary entropy
"""
def Hs(x):
    return -x*math.log(x,2)

def Hp(x):
    return Hs(x) - 0.5*(Hs(x))**2 + 0.1666*(Hs(x))**3 - 1/24*(Hs(x))**4

def Hm(x):
    return Hs(x) + 0.5*(Hs(x))**2 + 0.1666*(Hs(x))**3 + 1/24*(Hs(x))**4

In [None]:
class Node:
    """Characterizes the nodes at different sublevels"""

    def __init__(self):
        self.value = None
        self.next = None
        self.childs = None

class DecisionTreeClassifier:
    """Decision Tree Classifier: ID3 algorithm"""

    def __init__(self, X, feature_names, labels):
        self.X = X
        self.feature_names = feature_names
        self.labels = labels
        self.labelCategories = list(set(labels))
        self.labelCategoriesCount = [list(labels).count(x) for x in self.labelCategories]
        self.node = None
        self.entropy = self._get_entropy([x for x in range(len(self.labels))])  # calculates the initial entropy

    def _get_entropy(self, x_ids):
        """Entropy module"""
        labels = [self.labels[i] for i in x_ids]
        label_count = [labels.count(x) for x in self.labelCategories]
        """-----------------------------"""
        """Implement the entropy measure"""
        # entropy = sum([ Hp(count/len(x_ids)) if count else 0 for count in label_count ])
        # Renyi entropy
        α=2.1
        entropy = math.log(sum([ (count/len(x_ids))**α if count else 0 for count in label_count ]), 2)/(1-α)
        """-----------------------------"""
        return entropy

    def _get_information_gain(self, x_ids, feature_id):
        """Information gain module"""
        info_gain = self._get_entropy(x_ids)
        x_features = [self.X[x][feature_id] for x in x_ids]
        feature_vals = list(set(x_features))
        feature_vals_count = [x_features.count(x) for x in feature_vals]
        feature_vals_id = [
            [x_ids[i]
            for i, x in enumerate(x_features)
            if x == y]
            for y in feature_vals
        ]

        info_gain = info_gain - sum([val_counts / len(x_ids) * self._get_entropy(val_ids)
                                     for val_counts, val_ids in zip(feature_vals_count, feature_vals_id)])

        return info_gain

    def _get_feature_max_information_gain(self, x_ids, feature_ids):
        """Searching for features that maximize the information gain"""
        features_entropy = [self._get_information_gain(x_ids, feature_id) for feature_id in feature_ids]
        max_id = feature_ids[features_entropy.index(max(features_entropy))]

        return self.feature_names[max_id], max_id

    def id3(self):
        """Initialize ID3 algorithm"""
        x_ids = [x for x in range(len(self.X))]
        feature_ids = [x for x in range(len(self.feature_names))]
        self.node = self._id3_recv(x_ids, feature_ids, self.node)
        print('')

    def _id3_recv(self, x_ids, feature_ids, node):
        """ID3 algorithm"""
        if not node:
            node = Node()
        labels_in_features = [self.labels[x] for x in x_ids]
        if len(set(labels_in_features)) == 1:
            node.value = self.labels[x_ids[0]]
            return node
        if len(feature_ids) == 0:
            node.value = max(set(labels_in_features), key=labels_in_features.count)
            return node
        # else...
        best_feature_name, best_feature_id = self._get_feature_max_information_gain(x_ids, feature_ids)
        node.value = best_feature_name
        node.childs = []
        feature_values = list(set([self.X[x][best_feature_id] for x in x_ids]))
        for value in feature_values:
            child = Node()
            child.value = value
            node.childs.append(child)
            child_x_ids = [x for x in x_ids if self.X[x][best_feature_id] == value]
            if not child_x_ids:
                child.next = max(set(labels_in_features), key=labels_in_features.count)
                print('')
            else:
                if feature_ids and best_feature_id in feature_ids:
                    to_remove = feature_ids.index(best_feature_id)
                    feature_ids.pop(to_remove)
                child.next = self._id3_recv(child_x_ids, feature_ids, child.next)
        return node

    def printTree(self):
        if not self.node:
            return
        nodes = deque()
        nodes.append(self.node)
        while len(nodes) > 0:
            node = nodes.popleft()
            print(node.value)
            if node.childs:
                for child in node.childs:
                    print('({})'.format(child.value))
                    nodes.append(child.next)
            elif node.next:
                print(node.next)

In [None]:
# generate syntetic data
data = {
    # attributes
    'social_network': ['FB', 'TW', 'IG', 'TK'],
    'suspicious': ['Low', 'High'],
    'viralization': ['small', 'medium', 'large'],
    # target
    'fake_news': ['Yes', 'No']
}

data_df = pd.DataFrame(columns=data.keys())
# vary the number of instances to examine the asymptotic behaviour of entropy measures
np.random.seed(42)
for i in range(1000):
    data_df.loc[i, 'social_network'] = str(np.random.choice(data['social_network'], 1)[0])
    data_df.loc[i, 'suspicious'] = str(np.random.choice(data['suspicious'], 1)[0])
    data_df.loc[i, 'viralization'] = str(np.random.choice(data['viralization'], 1)[0])
    data_df.loc[i, 'fake_news'] = str(np.random.choice(data['fake_news'], 1)[0])
data_df.head()

In [None]:
# numerical experiments
# testing 5 classifications in a row to measure the average running-time
# ----------------------------------------------------------------------/

# separate target from predictors
X = np.array(data_df.drop('fake_news', axis=1).copy())
y = np.array(data_df['fake_news'].copy())
feature_names = list(data_df.keys())[:3]

# time stamps
intervals = np.zeros(5)

# evaluations
for i in range(5):
    # run algorithm -----------------------------------/
    t = time.process_time()
    # call algorithm ID3
    tree_clf = DecisionTreeClassifier(X=X, feature_names=feature_names, labels=y)
    print("System entropy {:.4f}".format(tree_clf.entropy))
    # run algorithm
    tree_clf.id3()
    tree_clf.printTree()
    # end algorithm -----------------------------------/
    intervals[i] = time.process_time() - t
    
print(intervals)