# 10. Fifth model implementation

Let's dive into the last model, which is a derived version of the model studied in the notebook 9. It uses partial orders instead of full orders in the chain. It is based on the same paper.

## 10.1. Setup

In [1]:
from skmultilearn.dataset import load_dataset
import numpy as np
from skmultilearn.problem_transform import ClassifierChain
import pygad
from typing import List
import sklearn.metrics as metrics
from typing import Any, Optional
import copy
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import math
from numpy.typing import NDArray
from typing import Dict
import pandas as pd
from typing import cast
import logging


## 10.2. Data

In [4]:
desired_datasets = ["scene", "emotions", "birds"]

datasets = {}
for dataset_name in desired_datasets:
    print(f"getting dataset `{dataset_name}`")
    
    full_dataset = load_dataset(dataset_name, "undivided")
    X, y, _, _ = full_dataset

    train_dataset = load_dataset(dataset_name, "train")
    X_train, y_train, _, _ = train_dataset

    test_dataset = load_dataset(dataset_name, "test")
    X_test, y_test, _, _ = test_dataset

    datasets[dataset_name] = {
        "X": X,
        "y": y,
        "X_train": X_train,
        "y_train": y_train,
        "X_test": X_test,
        "y_test": y_test,
        "rows": X.shape[0],
        "labels_count": y.shape[1]
    }

for name, info in datasets.items():
    print("===")
    print(f"information for dataset `{name}`")
    print(f"rows: {info['rows']}, labels: {info['labels_count']}")


getting dataset `scene`
scene:undivided - exists, not redownloading
scene:train - exists, not redownloading
scene:test - exists, not redownloading
getting dataset `emotions`
emotions:undivided - exists, not redownloading
emotions:train - exists, not redownloading
emotions:test - exists, not redownloading
getting dataset `birds`
birds:undivided - exists, not redownloading
birds:train - exists, not redownloading
birds:test - exists, not redownloading
===
information for dataset `scene`
rows: 2407, labels: 6
===
information for dataset `emotions`
rows: 593, labels: 6
===
information for dataset `birds`
rows: 645, labels: 19


## 10.3. Entropy functions

In [3]:
Probabilities = Dict[int, Dict[int, float]]

def calculate_probabilities(y: NDArray[np.int64]) -> Probabilities:
    dense_y = y.todense()

    label_count = dense_y.shape[1]
    rows_count = dense_y.shape[0]

    probs = {}

    for label in range(label_count):
        probs[label] = {}
        y_label_specific = np.asarray(dense_y[:, label]).reshape(-1)
        # convert_matrix_to_vector

        possible_values = np.unique(y_label_specific)

        for value in possible_values:
            instances_with_label = np.count_nonzero(y_label_specific == value)
            probs[label][value] = instances_with_label / rows_count
    
    return probs

Entropies = Dict[int, float]

def calculate_entropies(probabilities: Probabilities) -> Entropies:
    entropies = {}

    for label, calculated_probabilities in probabilities.items():
        results = []
        for _, prob in calculated_probabilities.items():
            summand = prob * math.log(prob, 2)
            results.append(summand)
        
        entropy = -1 * sum(results)
        entropies[label] = entropy

    return entropies

def calculate_joint_probability(probabilities: Probabilities, label_x: int, label_y: int):
    results = []
    
    for _, prob_i in probabilities[label_x].items():
        for _, prob_j in probabilities[label_y].items():
            and_prob = prob_i * prob_j

            if and_prob > 0:  # avoid taking the log of 0
                summand = and_prob * np.log2(and_prob)
                results.append(summand)
    
    joint_probability = -1 * sum(results)
    return joint_probability

def calculate_conditional_entropy(probabilities: Probabilities, entropies: Entropies, label_x: int, label_y: int):
    joint_entropy = calculate_joint_probability(probabilities, label_x, label_y)
    entropy = entropies[label_y]
    return joint_entropy - entropy

In [5]:
LOPMatrix = Dict[int, Dict[int, float]]

def build_lop_matrix(
    label_order: List[int],
    probabilities: Probabilities,
    entropies: Entropies
) -> LOPMatrix:
    matrix = {}

    for row_i in label_order:
        matrix[row_i] = {}
        for row_j in label_order:
            if row_i == row_j:
                matrix[row_i][row_j] = 0
                # this is to match the table described in the paper
                # but in reality we _have_ a >0 conditional entropy for a label with itself
                continue

            cond_entropy = calculate_conditional_entropy(probabilities, entropies, row_i, row_j)
            matrix[row_i][row_j] = cond_entropy
        
    return matrix

def calculate_lop(lop_matrix: LOPMatrix) -> float:
    matrix_size_n = len(lop_matrix)
    lop_df = pd.DataFrame(lop_matrix)

    upper_triangle_sum = 0
    for row_position in range(matrix_size_n):
        for column_position in range(matrix_size_n):
            if column_position > row_position:
                conditional_probability = lop_df.iloc[row_position, column_position]
                upper_triangle_sum += cast(float, conditional_probability)
                # the conversion to a dataframe is not necessary
                # but makes it easier to find the element we want
                # by their order in the rows or columns
                # instead of the actual column or row index
    
    return upper_triangle_sum

## 10.5. New entropy functions

In [24]:
def mutual_information(probabilities: Probabilities, entropies: Entropies, label_x: int, label_y: int):
    entropy = entropies[label_x]
    conditional_entropy = calculate_conditional_entropy(probabilities, entropies, label_x, label_y)

    # return entropy - conditional_entropy

    a = entropies[label_x]
    b = entropies[label_y]

    calculate_joint_entropy = calculate_joint_probability(probabilities, label_x, label_y)

    return a + b - calculate_joint_entropy


In [28]:
probs = calculate_probabilities(datasets["emotions"]["y"])
entropies = calculate_entropies(probs)

print("=== mutual information ===")
print(mutual_information(probs, entropies, 2, 0))

res = []
for i in range(len(probs)):
    res.append([])
    for j in range(len(probs)):
        res[i].append(mutual_information(probs, entropies, i, j))

pd.DataFrame(res)

=== mutual information ===
0.0


Unnamed: 0,0,1,2,3,4,5
0,0.0,2.220446e-16,-2.220446e-16,2.220446e-16,0.0,0.0
1,2.220446e-16,0.0,0.0,0.0,2.220446e-16,2.220446e-16
2,0.0,0.0,-4.440892e-16,-2.220446e-16,-2.220446e-16,0.0
3,2.220446e-16,0.0,-2.220446e-16,0.0,0.0,2.220446e-16
4,0.0,2.220446e-16,-2.220446e-16,0.0,0.0,0.0
5,0.0,0.0,0.0,0.0,0.0,0.0


## 10.6. Results so far

The results are weird because they are very, very small. And some of them are negative. But [this Wikipedia page](https://en.wikipedia.org/wiki/Mutual_information) tells that the mutual information **cannot** be negative.

It might be because my calculations of the entropy are wrong.

## 10.7. Checking the results by looking for other implementations

I found [this implementation](https://github.com/pafoster/pyitlib). Let's see if it works as expected.

I also found [this other implementation](https://github.com/nikdon/pyEntropy), but it seems limited; and [this other from scipy](https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.entropy.html), but it also seems limited.

In [29]:
!pip install pyitlib

Collecting pyitlib
  Downloading pyitlib-0.2.3.tar.gz (30 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting scikit-learn<=0.24,>=0.16.0 (from pyitlib)
  Downloading scikit_learn-0.24.0-cp39-cp39-win_amd64.whl (6.9 MB)
     ---------------------------------------- 6.9/6.9 MB 3.4 MB/s eta 0:00:00
Collecting future>=0.16.0 (from pyitlib)
  Downloading future-0.18.3.tar.gz (840 kB)
     ------------------------------------- 840.9/840.9 kB 26.8 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: pyitlib, future
  Building wheel for pyitlib (setup.py): started
  Building wheel for pyitlib (setup.py): finished with status 'done'
  Created wheel for pyitlib: filename=pyitlib-0.2.3-py3-none-any.whl size=29367 sha256=b37a01ce175782c490f57abcd4816ad5fa382d2a3fde2ef0ec9a5cb244871996
  Stored in directory: c:\users\edgard\appda

  You can safely remove it manually.


In [30]:
import numpy as np
from pyitlib import discrete_random_variable as drv

In [48]:
X = np.array([0,1,0,1])
drv.entropy([0,1,0,1])

array(1.)

In [81]:
y = datasets["emotions"]["y"]
dense_y = y.todense()

for label in range(dense_y.shape[1]):
    y_label_specific = np.asarray(dense_y[:, label]).reshape(-1)
    e = drv.entropy(y_label_specific.tolist())

    print(f"for label {label} the entropy is {e}")

for label 0 the entropy is 0.8709543977484913
for label 1 the entropy is 0.855358883986916
for label 2 the entropy is 0.9913156991505125
for label 3 the entropy is 0.8106092437567831
for label 4 the entropy is 0.8599154189768907
for label 5 the entropy is 0.9029822958790428


In [75]:
entropies

{0: 0.8709543977484913,
 1: 0.8553588839869162,
 2: 0.9913156991505125,
 3: 0.8106092437567831,
 4: 0.8599154189768907,
 5: 0.9029822958790428}

So far my calculation for entropy is correct.

In [84]:
for label_x in range(dense_y.shape[1]):
    for label_y in range(dense_y.shape[1]):
        y_label_specific_x = np.asarray(dense_y[:, label_x]).reshape(-1)
        y_label_specific_y = np.asarray(dense_y[:, label_y]).reshape(-1)
        
        e = drv.entropy_conditional(y_label_specific_x.tolist(), y_label_specific_y.tolist())

        print(f"for ({label_x}, {label_y}), we got {e}")

for (0, 0), we got 0.0
for (0, 1), we got 0.8681775017071309
for (0, 2), we got 0.6805721429652296
for (0, 3), we got 0.7234043892136215
for (0, 4), we got 0.7806134241301019
for (0, 5), we got 0.8110675830696776
for (1, 0), we got 0.8525819879455556
for (1, 1), we got 0.0
for (1, 2), we got 0.8433595973068642
for (1, 3), we got 0.774297908543917
for (1, 4), we got 0.7055354159044847
for (1, 5), we got 0.7619971777464194
for (2, 0), we got 0.8009334443672507
for (2, 1), we got 0.9793164124704608
for (2, 2), we got 0.0
for (2, 3), we got 0.9262497452773448
for (2, 4), we got 0.9746674787371223
for (2, 5), we got 0.7172239534657157
for (3, 0), we got 0.6630592352219131
for (3, 1), we got 0.7295482683137842
for (3, 2), we got 0.7455432898836154
for (3, 3), we got 0.0
for (3, 4), we got 0.6091876963156833
for (3, 5), we got 0.6699847819811764
for (4, 0), we got 0.7695744453585014
for (4, 1), we got 0.7100919508944596
for (4, 2), we got 0.8432671985635005
for (4, 3), we got 0.65849387153579

In [86]:
for label_x in range(dense_y.shape[1]):
    for label_y in range(dense_y.shape[1]):
        y_label_specific_x = np.asarray(dense_y[:, label_x]).reshape(-1)
        y_label_specific_y = np.asarray(dense_y[:, label_y]).reshape(-1)
        
        e = drv.information_mutual(y_label_specific_x.tolist(), y_label_specific_y.tolist())

        print(f"for ({label_x}, {label_y}), we got {e}")

for (0, 0), we got 0.8709543977484913
for (0, 1), we got 0.0027768960413603327
for (0, 2), we got 0.19038225478326165
for (0, 3), we got 0.14755000853486977
for (0, 4), we got 0.09034097361838933
for (0, 5), we got 0.05988681467881363
for (1, 0), we got 0.0027768960413603327
for (1, 1), we got 0.855358883986916
for (1, 2), we got 0.01199928668005179
for (1, 3), we got 0.08106097544299895
for (1, 4), we got 0.1498234680824313
for (1, 5), we got 0.09336170624049656
for (2, 0), we got 0.19038225478326187
for (2, 1), we got 0.01199928668005179
for (2, 2), we got 0.9913156991505125
for (2, 3), we got 0.06506595387316771
for (2, 4), we got 0.016648220413390202
for (2, 5), we got 0.2740917456847969
for (3, 0), we got 0.14755000853487
for (3, 1), we got 0.08106097544299895
for (3, 2), we got 0.06506595387316771
for (3, 3), we got 0.8106092437567831
for (3, 4), we got 0.20142154744109986
for (3, 5), we got 0.14062446177560672
for (4, 0), we got 0.09034097361838933
for (4, 1), we got 0.149823468

The calculations for these other "entropy properties" are very different to those of my own implementation. **My own implementation is clearly wrong**. I will adopt this new library.

**Something else that is nice is that this calculation is already resulting in zero when we test a label against itself**. This is good because it means that the entropy is zero when the label is fully determined by the other label. And it matches exactly what the article describes.

## 10.8 Re-implementing the fourth model

In [97]:
ConditionalEntropies = List[List[float]]

def calculate_conditional_entropies(y: Any) -> ConditionalEntropies:
    dense_y = y.todense()

    label_count = dense_y.shape[1]

    results = []
    for label_x in range(label_count):
        results.append([])
        for label_y in range(label_count):
            y_label_specific_x = np.asarray(dense_y[:, label_x]).reshape(-1)
            y_label_specific_y = np.asarray(dense_y[:, label_y]).reshape(-1)
            
            conditional_entropy = drv.entropy_conditional(y_label_specific_x.tolist(), y_label_specific_y.tolist())
            results[label_x].append(float(conditional_entropy))
    
    return results

In [99]:
conditional_entropies = calculate_conditional_entropies(datasets["emotions"]["y"])
conditional_entropies

[[0.0,
  0.8681775017071309,
  0.6805721429652296,
  0.7234043892136215,
  0.7806134241301019,
  0.8110675830696776],
 [0.8525819879455556,
  0.0,
  0.8433595973068642,
  0.774297908543917,
  0.7055354159044847,
  0.7619971777464194],
 [0.8009334443672507,
  0.9793164124704608,
  0.0,
  0.9262497452773448,
  0.9746674787371223,
  0.7172239534657157],
 [0.6630592352219131,
  0.7295482683137842,
  0.7455432898836154,
  0.0,
  0.6091876963156833,
  0.6699847819811764],
 [0.7695744453585014,
  0.7100919508944596,
  0.8432671985635005,
  0.6584938715357909,
  0.0,
  0.8010033262512426],
 [0.8430954812002294,
  0.8096205896385462,
  0.6288905501942459,
  0.762357834103436,
  0.8440702031533946,
  0.0]]

In [102]:
def build_lop_matrix(
    label_order: List[int],
    conditional_entropies: ConditionalEntropies
) -> LOPMatrix:
    matrix = {}

    for row_i in label_order:
        matrix[row_i] = {}
        for row_j in label_order:
            conditional_entropy = conditional_entropies[row_i][row_j]
            matrix[row_i][row_j] = conditional_entropy
        
    return matrix


pd.DataFrame(build_lop_matrix([2, 0, 1, 3, 5, 4], conditional_entropies))

Unnamed: 0,2,0,1,3,5,4
2,0.0,0.680572,0.84336,0.745543,0.628891,0.843267
0,0.800933,0.0,0.852582,0.663059,0.843095,0.769574
1,0.979316,0.868178,0.0,0.729548,0.809621,0.710092
3,0.92625,0.723404,0.774298,0.0,0.762358,0.658494
5,0.717224,0.811068,0.761997,0.669985,0.0,0.801003
4,0.974667,0.780613,0.705535,0.609188,0.84407,0.0


In [None]:
def build_lop_matrix(
    label_order: List[int],
    conditional_entropies: ConditionalEntropies
) -> LOPMatrix:
    matrix = {}

    for row_i in label_order:
        matrix[row_i] = {}
        for row_j in label_order:
            conditional_entropy = conditional_entropies[row_i][row_j]
            matrix[row_i][row_j] = conditional_entropy
        
    return matrix

def calculate_lop(lop_matrix: LOPMatrix) -> float:
    matrix_size_n = len(lop_matrix)
    lop_df = pd.DataFrame(lop_matrix)

    upper_triangle_sum = 0
    for row_position in range(matrix_size_n):
        for column_position in range(matrix_size_n):
            if column_position > row_position:
                conditional_probability = lop_df.iloc[row_position, column_position]
                upper_triangle_sum += cast(float, conditional_probability)
                # the conversion to a data frame is not necessary
                # but makes it easier to find the element we want
                # by their order in the rows or columns
                # instead of the actual column or row index
    
    return upper_triangle_sum

class FixedClassifierChainWithLOPAndGA():
    def __init__(
        self,
        base_classifier: Any,
        num_generations: int = 5,
        random_state: Optional[int] = None
    ) -> None:
        self.base_classifier = base_classifier
        self.num_generations = num_generations

        if random_state is None:
            self.random_state = np.random.randint(0, 1000)
        else:
            self.random_state = random_state
    
    def fit(self, X: Any, y: Any):
        self.conditional_entropies = calculate_conditional_entropies(y)
        
        label_count = y.shape[1]
        label_space = np.arange(label_count)
        # solutions_per_population = math.ceil(label_count / 2)
        solutions_per_population = 50

        ga_model = pygad.GA( #type:ignore
            gene_type=int,
            gene_space=label_space,
            random_seed=self.random_state,
            save_best_solutions=False,
            fitness_func=self.model_fitness_func,
            allow_duplicate_genes=False, # very important, otherwise we will have duplicate labels in the ordering
            num_genes=label_count,

            # set up
            num_generations=self.num_generations,
            sol_per_pop=solutions_per_population,

            # following what the article describes
            keep_elitism=5,
            parent_selection_type="rws",
            num_parents_mating=2,
            crossover_probability=0.9,
            crossover_type="two_points",
            mutation_type="swap",
            mutation_probability=0.01,
        )

        ga_model.run()

        solution, _, _ = ga_model.best_solution()

        best_classifier = FixedClassifierChain(
            base_classifier=copy.deepcopy(self.base_classifier),
            order=solution,
        )

        best_classifier.fit(X, y)

        self.best_classifier = best_classifier
    
    def model_fitness_func(self, ga_instance: Any, solution: Any, solution_idx: Any) -> float:
        return self.test_solution(solution)

    def test_solution(self, label_order: List[int]) -> float:
        if self.probabilities is None or self.entropies is None:
            raise Exception("probabilities and entropies must be calculated before testing a solution")
        
        lop_matrix = build_lop_matrix(label_order, self.probabilities, self.entropies)
        return calculate_lop(lop_matrix)
    
    def predict(self, X: Any) -> Any:
        if self.best_classifier is None:
            raise Exception("model was not trained yet")

        return self.best_classifier.predict(X)