In [None]:
import logging
logger = logging.getLogger()
if not logger.handlers:
    logger.addHandler(logging.StreamHandler())

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from scipy.stats import entropy

df = pd.read_csv('data/data_ellsberg.csv')

In [None]:
ids = ['ID', 'participant_id']

dt_features = ['Openess','Conciensiousness','Extroversion','Agreability','Stability','Locus']

x_labels = []
emotions = ['Hopeful','Curiosity','Enlightenment','Thrilled','Anticipatory','Satisfied' ]
kmeans_features = []
for x in ['1A', '1B', '2A', '2B']:
    kmeans_features += [f'{emotion}{x}' for emotion in emotions]
    x_labels.append([f'{emotion[:4]}{x}' for emotion in emotions])

label = ['DAA']

df = df[ids + kmeans_features + dt_features + label]

In [None]:
import numpy as np

def show_results(labels, centeroids, plot_graph=False):
    k = len(centeroids) # number of clusters

    print('labels:', labels, '\n') 
    print('centroids:')
    for i in range(k):
        print(i,':',centeroids[i,:],'\n')

    if plot_graph:
        games_count = len(x_labels)
        labels_count = len(emotions)
        _, axs = plt.subplots(games_count, figsize=(10, 10))
        for ig in range(games_count):
            graph_labels = x_labels[ig]
            graph_centers = centeroids[:,ig*labels_count:(ig+1)*labels_count]
            for ik in range(k):
                axs[ig].plot(graph_labels, graph_centers[ik,:], label=f'Cluster #{ik}')
            axs[ig].legend(loc='upper right')

In [None]:
K = 2 # Optimal value according to 'daa_kmeans.ipynb' was K=4, but silhouette difference didn't justify working with 2 extra clusters
R = 0 # Set to 'None' to randomise KMeans search

def build_kmeans(points, k, r):
    clusterer = KMeans(n_clusters=k, n_init='auto', random_state=r)
    preds = clusterer.fit_predict(points)
    return clusterer, preds

clusterer, _ = build_kmeans(df[kmeans_features], K, R)
labels = clusterer.labels_
centeroids = clusterer.cluster_centers_

show_results(labels, centeroids, plot_graph=True)

In [None]:
df['Cluster'] = labels
df = df.drop(kmeans_features, axis=1)

In [None]:
import math

class ClusterTable:

    EPS = 1e-5

    def __init__(self, df):
        self.__df = df

        self.__df_counts = self.__build_matrix_of_counts(self.__df)
        self.__df_p = self.__build_matrix_of_p(self.__df_counts)

        self.__daa_counts = self.__df_counts.sum(axis=1)
        self.__daa_p = self.__counts_to_p(self.__daa_counts)

        self.__cluster_counts = self.__df_counts.sum(axis=0)
        self.__cluster_p = self.__counts_to_p(self.__cluster_counts)

    @staticmethod
    def __build_matrix_of_counts(df):
        return pd.crosstab(index=df['DAA'], columns=df['Cluster'])

    @staticmethod
    def __build_matrix_of_p(df_count):
        columns = range(K)
        index = [0,1] # DAA values
        odds = df_count.div(df_count.sum(axis=1).sum(axis=0), axis=0)
        return odds.reindex(index=index, columns=columns, fill_value=0)

    @staticmethod
    def __counts_to_p(occurences):
        return occurences.div(occurences.sum())

    def __str__(self):
        return f'''
Samples:               {self.len}
DAA (X) total:         {self.__daa_counts.tolist()}
DAA (X) p:             {self.__daa_p.tolist()}
DAA (X) entropy:       {self.__daa_entropy}
Cluster (Y) total:     {self.__cluster_counts.tolist()}
Cluster (Y) p:         {self.__cluster_p.tolist()}
Cluster (Y) entropy:   {self.__cluster_entropy}
Joint entropy:         {self.__joint_entropy}
Mutual info:           {self.__mutual_information}
Symmetric uncertainty: {self.symmetric_uncertainty}
KL-divergence:         {self.kl_divergence}
Total var distance:    {self.total_variation_distance}
{self.__df_counts.to_string()}
{self.__df_p.to_string()}
'''

    @property
    def len(self):
        return len(self.__df)

    @property
    def df(self):
        return self.__df

    @property
    def df_p(self):
        return self.__df_p

    @property
    def __cluster_p_for_non_daa(self):
        return self.__df_p.loc[0]

    @property
    def __cluster_p_for_daa(self):
        return self.__df_p.loc[1]

    @property
    def __daa_entropy(self):
        return entropy(self.__daa_p + ClusterTable.EPS, base=2)

    @property
    def __cluster_entropy(self):
        return entropy(self.__cluster_p + ClusterTable.EPS, base=2)

    @property
    def __joint_entropy(self):
        joint_p = np.concatenate([self.__cluster_p_for_daa, self.__cluster_p_for_non_daa])
        return entropy(joint_p + ClusterTable.EPS, base=2)

    @property
    def __mutual_information(self):
        return self.__daa_entropy + self.__cluster_entropy - self.__joint_entropy

    @property
    def symmetric_uncertainty(self):
        return (2 * self.__mutual_information) / (self.__daa_entropy + self.__cluster_entropy)

    @property
    def total_variation_distance(self):
        return 0.5 * sum(abs(self.__cluster_p_for_daa - self.__cluster_p_for_non_daa))

    @property
    def kl_divergence(self):
        pk = self.__cluster_p_for_daa + ClusterTable.EPS
        qk = self.__cluster_p_for_non_daa + ClusterTable.EPS
        pq = entropy(pk, qk=qk, base=2)
        qp = entropy(qk, qk=pk, base=2)
        return sum([pq, qp]) / 2

    @property
    def metric(self):
        return self.symmetric_uncertainty

In [None]:
class DecisionNode:
    ID = 0

    def __init__(self, df, min_gain, min_samples_leaf):
        self.__log = logging.getLogger('dt')

        self.__id = DecisionNode.ID
        DecisionNode.ID += 1
        self.__ct = ClusterTable(df)

        self.__min_samples_leaf = min_samples_leaf
        self.__min_samples_split = 3 * self.__min_samples_leaf
        self.__min_gain = min_gain

        self.__param = None
        self.__cutoff = None
        self.__gain = None
        self.__left = None
        self.__right = None

        self.__fit()
    
    @property
    def id(self):
        return self.__id

    @property
    def ct(self):
        return self.__ct

    @property
    def is_leaf(self):
        return self.__left == None and self.__right == None

    @property
    def param(self):
        return self.__param
    
    @property
    def cutoff(self):
        return self.__cutoff

    @property
    def left(self):
        return self.__left

    @property
    def right(self):
        return self.__right

    def __find_cutoff(self, param, cutoff):
        left_df = self.__ct.df[self.__ct.df[param] <= cutoff]
        right_df = self.__ct.df[self.__ct.df[param] > cutoff]
        return left_df, right_df
        
    def __find_cutoff_gain(self, current, param, cutoff):
        left_df, right_df = self.__find_cutoff(param, cutoff)

        left_len = len(left_df)
        right_len = len(right_df)

        if left_len < self.__min_samples_leaf or right_len < self.__min_samples_leaf:
            self.__log.info(f'\t\t\tNot enough samples in leaf: {left_len}, {right_len}')
            return 0

        left_ct = ClusterTable(left_df)
        self.__log.debug('\t\t\t' + '\n\t\t\t'.join(left_ct.df_p.to_string().split('\n')))

        left_metric = left_ct.metric
        self.__log.info(f'\t\t\tL: {left_metric}')

        right_ct = ClusterTable(right_df)
        self.__log.debug('\t\t\t' + '\n\t\t\t'.join(right_ct.df_p.to_string().split('\n')))

        right_metric = right_ct.metric
        self.__log.info(f'\t\t\tR: {right_metric}')

        return max(left_metric - current, right_metric - current)

    def __find_best_cutoff_index(self, current, param):
        best_cutoff = best_gain = None

        values = sorted(self.__ct.df[param].unique())
        cutoffs = [(values[i] + values[i+1]) / 2 for i in range(len(values) - 1)]
        self.__log.info(f'\t{values}')
        np.random.shuffle(cutoffs)
        for cutoff in cutoffs:
            self.__log.info(f'\t\t{cutoff}')
            gain = self.__find_cutoff_gain(current, param, cutoff)
            self.__log.info(f'\t\t\tGain: {gain}')
            if not best_gain or gain > best_gain:
                best_cutoff, best_gain = cutoff, gain

        return best_cutoff, best_gain

    def __find_best_cutoff(self, current):
        best_param = best_cutoff = best_gain = None

        params = dt_features.copy()
        np.random.shuffle(params)
        for param in params:
            self.__log.info(f'\t{param}')
            cutoff, gain = self.__find_best_cutoff_index(current, param)
            self.__log.info(f'\tBest gain for {param}: {gain}')

            if not gain:
                continue

            if not best_gain or gain > best_gain:
                best_param, best_cutoff, best_gain = param, cutoff, gain

        return best_param, best_cutoff, best_gain 

    def __fit(self):
        current = self.__ct.metric
        self.__log.warning(f'Node #{self.__id} [{current}]')

        if self.ct.len < self.__min_samples_split:
            self.__log.warning(f'\tNot splitting node, not enough samples')
            return

        self.__param, self.__cutoff, self.__gain = self.__find_best_cutoff(current)
        if not self.__gain or self.__gain < self.__min_gain:
            self.__log.warning('\tNot splitting node, gain too low')
            return

        left_df, right_df = self.__find_cutoff(self.__param, self.__cutoff)

        self.__log.warning(f'\tBest gain: {self.__gain}\n')
        self.__log.warning('\tLeft:\t' + '\n\t'.join(str(ClusterTable(left_df)).split('\n')))
        self.__log.warning('\tRight:\t' + '\n\t'.join(str(ClusterTable(right_df)).split('\n')))

        self.__left = DecisionNode(left_df,
                                   min_gain=self.__min_gain,
                                   min_samples_leaf=self.__min_samples_leaf)
        self.__right = DecisionNode(right_df,
                                    min_gain=self.__min_gain,
                                    min_samples_leaf=self.__min_samples_leaf)

class DecisionTree:
    def __init__(self, df, min_gain, min_samples_leaf):
        self.__root = DecisionNode(df,
                                   min_gain=min_gain,
                                   min_samples_leaf=min_samples_leaf)

    @property
    def root(self):
        return self.__root

logger = logging.getLogger('dt')
logger.setLevel('WARNING')

np.random.default_rng(seed=1337)
dt = DecisionTree(df, min_gain=0.01, min_samples_leaf=20)

In [None]:
class DotPrinter:

    def __init__(self, dt):
        self.__dt = dt

    def __node_children(self, node, left):
        label = 'xlabel' if left else 'label'

        operator = '<=' if left else '>'
        criteria = f'{node.param} {operator} {node.cutoff}'

        child = node.left if left else node.right
        child_label = self.__node_label(child)
        child_node = f'{child.id} [label="{child_label}",shape=box,style=filled,color=".7 .3 1."];'
        child_edge = f'{node.id} -> {child.id} [{label}="{criteria}"];'
        return '\n'.join([child_node, child_edge])

    def __node_label(self, node):
        len = node.ct.len
        su = node.ct.metric
        values = node.ct.df['DAA'].value_counts().to_list()
        return f'id = {node.id}\\n' + \
               f'samples = {len}\\n' + \
               f'metric = {su:.5f}\\n' + \
               f'values = {values}'

    def __node_content(self, node):
        if node.is_leaf:
            return ''

        return '\n'.join([
            self.__node_children(node, left=True),
            self.__node_content(node.left),
            self.__node_children(node, left=False),
            self.__node_content(node.right)
        ])

    def print(self):
        node_label = self.__node_label(self.__dt.root)
        node_content = self.__node_content(self.__dt.root)

        return '\n'.join([
            'digraph BST {',
            'node [fontname="Tahoma"]',
            f'{self.__dt.root.id} [label="{node_label}",shape=box,style=filled,color=".6 .2 1."];',
            node_content,
            '}'
        ])

#### Write report: Generate a DOT file and a PNG of the emotion decision tree

In [None]:
import subprocess

report_basename = 'ellsberg_emotion_decision_tree'

report_dot = f'{report_basename}.dot'
with open(report_dot, 'w') as fout:
    dot_output = DotPrinter(dt)
    fout.write(dot_output.print())

report_png = f'{report_basename}.png'
subprocess.check_call(['dot', report_dot, '-Tpng', f'-o{report_png}'])