In [3]:
import numpy as np
import pandas as pd
from ucimlrepo import fetch_ucirepo

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [4]:
# Algorithm 1: DTEC algorithm
# Require: dataset X; number of clusters K (optional).
# Ensure: an unsupervised evidential decision tree T.
# Initialize the root node of decision tree T using dataset X;
# while there is an unevaluated single-cluster node do
#     Evaluate all possible cutting points at the current node with the evidential silhouette metric, Eqs. (4)-(7);
#     Select the cutting point with the largest average silhouette value;
#     if the average silhouette value after splitting is larger than before then
#         Split this single-cluster node using Eqs. (8)-(10);
#         Determine the boundaries of the generated child nodes;
#         Use these boundaries to split the meta-cluster node that includes the above single cluster;
#     else
#         Go to the next node;
#     end if
# end while
# while K is available and the number of generated clusters is not equal to K do
#     if the number of generated clusters is larger than K then
#         Evaluate the quality of each single cluster with the evidential silhouette metric, Eq. (11);
#         Merge the cluster having the lowest quality with its nearest cluster;
#     else
#         Continue splitting at the leaf node that has the largest average evidential silhouette value after splitting;
#     end if
# end while

In [13]:

class DecisionNode:
    """A single node of a binary decision tree.

    A node that carries a ``class_label`` behaves as a leaf; any other node uses its
    ``decision_function`` to route a feature vector to the left or the right child.
    """

    def __init__(self, left, right, mass_functions, decision_function, instance_indices, class_label=None):
        """Store the children, the routing rule and the bookkeeping data of the node.

        Args:
            left (DecisionNode) : child visited when ``decision_function`` returns a truthy value
            right (DecisionNode) : child visited when ``decision_function`` returns a falsy value
            mass_functions : mass-function data attached to the node (stored as given)
            decision_function (function) : callable applied to a feature vector
            instance_indices : indices of the instances held by the node (stored as given)
            class_label (int) : optional label; a node with a label answers ``decide`` directly
        """
        self.left, self.right = left, right
        self.decision_function = decision_function
        self.class_label = class_label
        self.mass_functions = mass_functions
        self.instance_indices = instance_indices

    def decide(self, feature):
        """Return the label of the leaf reached by routing ``feature`` down the tree."""
        # A labelled node ends the descent.
        if self.class_label is not None:
            return self.class_label
        branch = self.left if self.decision_function(feature) else self.right
        return branch.decide(feature)
        

# Pignistic probability: BetP(A) = sum over the non-empty focal sets B of (|A ∩ B| / |B|) * m(B),
# where m(B) is the mass assigned to B; only the sets B that intersect A contribute to the sum.
# It spreads the mass of every focal set evenly over its elements, turning a mass function into a probability for A.
class DecisonTree:
    """Class to represent a decision tree model for classification."""
    
    def __init__(self, max_depth=None):
        """Load the data set and set up an empty tree.

        Args:
            max_depth (int) : maximum depth of the tree
        """
        self.max_depth = max_depth
        # Column read by the mass-function and silhouette code to locate the cutting-point instance.
        self.f = 11

        # UCI repository data set with id 109, obtained through ucimlrepo.
        dataset = fetch_ucirepo(id=109)
        self.Dataset = dataset
        self.metadata = dataset.metadata
        self.variables = dataset.variables

        # Features and targets are kept as numpy arrays.
        self.X = np.array(dataset.data.features)
        self.y = np.array(dataset.data.targets)

        self.mass_functions = {}
        # The root node starts with every row of X.
        self.root_instance_indices = np.arange(len(self.X))
        self.root = None
        
    def fit(self, X, y):
        """Build the decision tree model by fitting to the data.
        
        Args:
            X (array-like) : feature vectors
            y (array-like) : class labels
        """
        self.root = self._build_tree(X, y, depth=0)
        
    def _build_tree(self, X, y, depth):
        """Recursively build the decision tree model.
        
        Args:
            X (array-like) : feature vectors
            y (array-like) : class labels
            depth (int) : current depth of the tree
        """
        if self.max_depth is not None and depth >= self.max_depth:
            return DecisionNode(None, None, None, class_label=self._majority_class(y))
    
        
    def feature_distance(self, xi, xj):
        """Calculate the distance between two feature vectors."""
        xi = np.reshape(xi, (1, 13))
        # print(len(xi))
        xj = np.reshape(xj, (1, 13))
        return np.linalg.norm(xi - xj) ** 2
    
    def pignistic_probability_unit(self, A, B):
        """Calculate the pignistic probability of A given B.
        
        Args:
            A (array-like) : subset of B
            B (array-like) : evidence
            m (array-like) : mass function of B
        """
        if len(B) == 0:
            return 0
        return len(np.intersect1d(A, B)) / len(B)
    
    def cutting_points(self, feature):
        """Find all possible cutting points for a feature."""
        return np.unique(self.X[:, feature])
    
    def cut_feature(self, instance_indices, feature, cutting_point):
        """Split the dataset based on a feature and cutting point and return the indices of the points."""
        L = np.where(self.X[:, feature] < cutting_point)[0]
        R = np.where(self.X[:, feature] > cutting_point)[0]
        L = np.intersect1d(L, instance_indices)
        R = np.intersect1d(R, instance_indices)
        return L, R
    
    def calculate_centers(self, l, r, parent_mass):
        """Calculate the centers of the child nodes."""
        l_sum  = np.sum([parent_mass[i] for i in l])
        if l_sum == 0:
            c_l = np.zeros(self.X.shape[1], dtype=float)
        else:
            c_l = np.sum([self.X[i] * parent_mass[i] for i in l], axis=0) / l_sum
            
        r_sum = np.sum([parent_mass[i] for i in r])
        if r_sum == 0:
            c_r = np.zeros(self.X.shape[1], dtype=float)
        else:
            c_r = np.sum([self.X[i] * parent_mass[i] for i in r], axis=0) / r_sum
        return c_l, c_r
    
    def calculate_mass_functions(self, instance_indices, cutting_point, c_l, c_r, gamma):
        """Calculate the mass function for the child nodes."""
        c_m = self.X[np.where(self.X[:, self.f] == cutting_point)[0]][0]
        d_l = np.array([self.feature_distance(self.X[i], c_l) for i in instance_indices]) / self.feature_distance(c_l, c_m)
        d_r = np.array([self.feature_distance(self.X[i], c_r) for i in instance_indices]) / self.feature_distance(c_r, c_m)
        d_m = np.array([self.feature_distance(self.X[i], c_m) for i in instance_indices]) / (self.feature_distance(c_l, c_r) / gamma)
        
        inv_dl = np.array([1.0 / d if d != 0 else 0 for d in d_l])
        inv_dr = np.array([1.0 / d if d != 0 else 0 for d in d_r])
        inv_dm = np.array([1.0 / d if d != 0 else 0 for d in d_m])
        
        m_l = inv_dl / (inv_dl + inv_dr + inv_dm)
        m_r = inv_dr / (inv_dl + inv_dr + inv_dm)
        m_m = inv_dm / (inv_dl + inv_dr + inv_dm)
        
        return m_l, m_r, m_m
    
    def assign_clusters(self, instance_indices, m_l, m_r, m_m):
        """Assign instances to clusters based on the mass functions."""
        all_mass_functions = np.array([m_l, m_r, m_m])
        cluster_assignments = np.argmax(all_mass_functions, axis=0)
        # print(cluster_assignments)
        # print(len(cluster_assignments))
        clusters = [instance_indices[cluster_assignments == i] for i in range(3)]
        l = clusters[0]
        r = clusters[1]
        m = clusters[2]
        
        return l, r, m
    
    def determine_bounds(self, l, r, cutting_point, feature):
        """Determine the boundaries of the child nodes."""
        if len(l) == 0:
            l_max = 0
        else:
            l_max = max([self.X[i][feature] for i in l])
        
        if len(r) == 0:
            r_min = 0
        else:
            r_min = min([self.X[i][feature] for i in r])
        return l_max, r_min
    
    def calculate_silhouette(self, l, r, m, m_l, m_r, m_m, c_l, c_r, cutting_point):
        c_m = self.X[np.where(self.X[:, self.f] == cutting_point)[0]][0]
        pignistic_probability_l = self.pignistic_probability_unit(l, l) * m_l + self.pignistic_probability_unit(l, r) * m_r + self.pignistic_probability_unit(l, m) * m_m
        pignistic_probability_r = self.pignistic_probability_unit(r, r) * m_r + self.pignistic_probability_unit(r, l) * m_l + self.pignistic_probability_unit(r, m) * m_m
        pignistic_probability_m = self.pignistic_probability_unit(m, m) * m_m + self.pignistic_probability_unit(m, l) * m_m + self.pignistic_probability_unit(m, r) * m_r
        
        # print("Pignistic Probability L: ", pignistic_probability_l)
        # print("Pignistic Probability R: ", pignistic_probability_r)
        # print("Pignistic Probability M: ", pignistic_probability_m)
                
        a_l = np.array([np.sum([self.feature_distance(self.X[i], self.X[j]) * (pignistic_probability_l[j]) for j in l]) for i in l]) / (np.sum([pignistic_probability_l[j] for j in l]))
        a_r = np.array([np.sum([self.feature_distance(self.X[i], self.X[j]) * (pignistic_probability_r[j]) for j in r]) for i in r]) / (np.sum([pignistic_probability_r[j] for j in r]))
        a_m = np.array([np.sum([self.feature_distance(self.X[i], self.X[j]) * (pignistic_probability_m[j]) for j in m]) for i in m]) / (np.sum([pignistic_probability_m[j] for j in m]))
        
        # print("A_L: ", a_l)
        # print("A_R: ", a_r)
        # print("A_M: ", a_m)

        clusters = [l, r, m]
        pignistic_probabilities = [pignistic_probability_l, pignistic_probability_r, pignistic_probability_m]

        n_l = 0
        if self.feature_distance(c_l, c_r) > self.feature_distance(c_l, c_m):
            n_l = 2
        else:
            n_l = 1

        n_r = 0
        if self.feature_distance(c_r, c_l) > self.feature_distance(c_r, c_m):
            n_r = 2
        else:
            n_r = 0

        n_m = 0
        if self.feature_distance(c_m, c_l) > self.feature_distance(c_m, c_r):
            n_m = 1
        else:
            n_m = 0

        psum_l = np.sum([pignistic_probabilities[n_l][j] for j in clusters[n_l]])
        psum_r = np.sum([pignistic_probabilities[n_r][j] for j in clusters[n_r]])
        psum_m = np.sum([pignistic_probabilities[n_m][j] for j in clusters[n_m]])
        
        if psum_l == 0:
            b_l = np.zeros(len(l))
        else:
            b_l = np.array([np.sum([self.feature_distance(self.X[i], self.X[j]) * (pignistic_probabilities[n_l][j]) for j in clusters[n_l]]) for i in l]) / psum_l
            
        if psum_r == 0:
            b_r = np.zeros(len(r))
        else:
            b_r = np.array([np.sum([self.feature_distance(self.X[i], self.X[j]) * (pignistic_probabilities[n_r][j]) for j in clusters[n_r]]) for i in r]) / psum_r
            
        if psum_m == 0:
            b_m = np.zeros(len(m))
        else:
            b_m = np.array([np.sum([self.feature_distance(self.X[i], self.X[j]) * (pignistic_probabilities[n_m][j]) for j in clusters[n_m]]) for i in m]) / psum_m
        
        # print("B_L: ", b_l)
        # print("B_R: ", b_r)
        # print("B_M: ", b_m)
        
        es_l = np.abs(b_l - a_l) / np.maximum(a_l, b_l)
        es_r = np.abs(b_r - a_r) / np.maximum(a_r, b_r)
        es_m = np.abs(b_m - a_m) / np.maximum(a_m, b_m)
        
        # print("ES_L: ", es_l)
        # print("ES_R: ", es_r)
        # print("ES_M: ", es_m)

        max_pignistic_probability = np.maximum(pignistic_probability_l, pignistic_probability_r, pignistic_probability_m)
        # print("Max Pignistic Probability: ", max_pignistic_probability)
        
        
        num_sum = 0
        den_sum = 0
        for i in range(len(es_l)):
            num_sum += max_pignistic_probability[l[i]] * es_l[i]
            den_sum += max_pignistic_probability[l[i]]
        for i in range(len(es_r)):
            num_sum += max_pignistic_probability[r[i]] * es_r[i]
            den_sum += max_pignistic_probability[r[i]]
        for i in range(len(es_m)):
            num_sum += max_pignistic_probability[m[i]] * es_m[i]
            den_sum += max_pignistic_probability[m[i]]
            
        # print("Num Sum: ", num_sum)
        # print("Den Sum: ", den_sum)
        
        if den_sum == 0:
            average_silhouette = 0
        else:
            average_silhouette = num_sum / den_sum
        
        return average_silhouette      
    
    def max_silhouette(self, instance_indices, feature):
        """Find the cutting point with the largest average silhouette value."""
        cutting_points = self.cutting_points(feature)
        max_silhouette = -np.inf
        best_cutting_point = None
        for cutting_point in cutting_points:
            l, r = self.cut_feature(instance_indices, feature, cutting_point)
            c_l, c_r = self.calculate_centers(l, r, np.ones(len(instance_indices), dtype=float))
            # print("C_L: ", c_l[feature], " C_R: ", c_r[feature])
            m_l, m_r, m_m = self.calculate_mass_functions(instance_indices, cutting_point, c_l, c_r, gamma=1.0)
            # print("M_L: ", m_l, " M_R: ", m_r, " M_M: ", m_m)            
            l, r, m = self.assign_clusters(instance_indices, m_l, m_r, m_m)
            print("Bounds: ", self.determine_bounds(l, r, cutting_point, feature))
            silhouette = self.calculate_silhouette(l, r, m, m_l, m_r, m_m, c_l, c_r, cutting_point)
            
            print("Cutting Point: ", cutting_point, "Average Silhouette: ", silhouette)
            if silhouette > max_silhouette:
                max_silhouette = silhouette
                best_cutting_point = cutting_point
        return best_cutting_point, max_silhouette

In [14]:
# Evaluate the candidate cutting points of feature 11 at the root node; the returned
# (best cutting point, best average silhouette) pair is shown as the cell result.
tree = DecisonTree()
tree.max_silhouette(tree.root_instance_indices, 11)

Bounds:  (1.27, 2.23)
Cutting Point:  1.27 Average Silhouette:  0.8033274966917178
Bounds:  (1.29, 2.0)
Cutting Point:  1.29 Average Silhouette:  0.7919806425019604
Bounds:  (1.3, 2.0)
Cutting Point:  1.3 Average Silhouette:  0.7909719695134357
Bounds:  (1.33, 2.01)
Cutting Point:  1.33 Average Silhouette:  0.7955331149615924
Bounds:  (1.36, 2.05)
Cutting Point:  1.36 Average Silhouette:  0.7933526290838179
Bounds:  (1.42, 2.11)
Cutting Point:  1.42 Average Silhouette:  0.7946066445256845
Bounds:  (1.47, 2.12)
Cutting Point:  1.47 Average Silhouette:  0.8051149949689154
Bounds:  (1.48, 2.14)
Cutting Point:  1.48 Average Silhouette:  0.8048098077870913
Bounds:  (1.51, 2.15)
Cutting Point:  1.51 Average Silhouette:  0.8015975121952652
Bounds:  (1.55, 2.23)
Cutting Point:  1.55 Average Silhouette:  0.8040580637637388
Bounds:  (1.56, 2.23)
Cutting Point:  1.56 Average Silhouette:  0.7974053506237775
Bounds:  (1.58, 2.23)
Cutting Point:  1.58 Average Silhouette:  0.8054468475251965
Bounds: 

(2.71, 0.8776094158447616)

In [2]:
# Fetch UCI repository data set 109 and keep its feature table as a numpy array.
Dataset = fetch_ucirepo(id=109)
X = np.array(Dataset.data.features)
print(X)
print(X.shape)

[[1.423e+01 1.710e+00 2.430e+00 ... 1.040e+00 3.920e+00 1.065e+03]
 [1.320e+01 1.780e+00 2.140e+00 ... 1.050e+00 3.400e+00 1.050e+03]
 [1.316e+01 2.360e+00 2.670e+00 ... 1.030e+00 3.170e+00 1.185e+03]
 ...
 [1.327e+01 4.280e+00 2.260e+00 ... 5.900e-01 1.560e+00 8.350e+02]
 [1.317e+01 2.590e+00 2.370e+00 ... 6.000e-01 1.620e+00 8.400e+02]
 [1.413e+01 4.100e+00 2.740e+00 ... 6.100e-01 1.600e+00 5.600e+02]]
(178, 13)


In [None]:
# Distinct values of the first feature column; needs X from the data-loading cell.
print(np.unique(X[:, 0]))

In [None]:
# Row indices whose first feature is below 15.67, followed by the first feature vector.
print(np.where(X[:, 0] < 15.67)[0])
print(X[0])