In [1]:
# importing useful libraries
import numpy as np
import tensorflow as tf
import random as python_random

# Seed each random number generator used by this notebook (NumPy, Python's
# built-in `random`, and TensorFlow) so that re-running it draws the same
# random numbers and the results can be reproduced.
np.random.seed(1)
python_random.seed(12)
tf.random.set_seed(123)

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Metric

from sklearn.datasets import make_multilabel_classification
from sklearn.metrics import fbeta_score
from sklearn.feature_extraction.text import TfidfTransformer

import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings('ignore')

%matplotlib inline

In [2]:
# Synthetic multi-label dataset: 60000 samples, 5000 features, 20 labels.
X, y = make_multilabel_classification(
    n_samples=60000,
    n_features=5000,
    n_classes=20,
    n_labels=4,
    length=100,
    allow_unlabeled=False,
    sparse=False,
    return_indicator='dense',
    return_distributions=False,
    random_state=1,
)
print('shape of X is {}'.format(X.shape))
print('shape of y is {}'.format(y.shape))

# TF-IDF re-weighting of the generated features.
# NOTE(review): `Xt` is not referenced by the cells visible here -- the models
# below are fit on the raw `X`. Confirm whether that is intentional.
tfidf = TfidfTransformer()
Xt = tfidf.fit_transform(X)

shape of X is (60000, 5000)
shape of y is (60000, 20)


In [3]:
# Notebook-wide defaults for the F-beta metrics defined below. They are used as
# default argument values, so they are captured when the metric function/class
# is defined: re-run those definitions after changing a value here.
beta = 2 # arbitrarily setting beta to 2. You can set it to any value you choose to
threshold = 0.2 # arbitrarily setting threshold to 0.2. You can set it to any value you choose to

def multi_label_fbeta(ytrue, ypred, beta=beta, average='samples', threshold=threshold,
                      epsilon=1e-7, sample_weight=None):
    """Compute the F-beta score of thresholded multi-label predictions.

    Args:
        ytrue: binary label indicators. The reductions below treat axis 0 as
            the sample axis and the last axis as the label axis, i.e. a
            (n_samples, n_labels) layout.
        ypred: predicted scores with the same layout as `ytrue`. Entries that
            are >= `threshold` count as predicted positives.
        beta: weight of recall relative to precision in the F-beta formula.
        average: how the scores are aggregated:
            'samples'  - one score per sample (row), then averaged;
            'macro'    - one score per label (column), then unweighted mean;
            'weighted' - one score per label, weighted by the label's support;
            'raw'      - the per-label scores themselves, not aggregated.
            Any other value falls through to the 'macro' computation.
        threshold: cut-off used to binarize `ypred`.
        epsilon: small constant added to denominators to avoid division by zero.
        sample_weight: optional per-sample weights. Only used when
            `average == 'samples'`, in which case the weighted mean of the
            per-sample scores is returned.

    Returns:
        A scalar float32 tensor, except for `average='raw'` which returns one
        score per label.
    """
    beta_squared = beta ** 2

    ytrue = tf.cast(ytrue, tf.float32)

    # Binarize the scores (1.0 where score >= threshold, else 0.0). The
    # threshold is cast explicitly so an integer threshold does not cause a
    # dtype mismatch in the comparison.
    ypred = tf.cast(
        tf.greater_equal(tf.cast(ypred, tf.float32), tf.cast(threshold, tf.float32)),
        tf.float32)

    # 'samples' scores each row (reduce over labels); every other averaging
    # mode scores each label column (reduce over samples).
    axis = -1 if average == 'samples' else 0
    tp = tf.reduce_sum(ytrue * ypred, axis=axis)        # true positives
    predicted_positive = tf.reduce_sum(ypred, axis=axis)
    actual_positive = tf.reduce_sum(ytrue, axis=axis)

    precision = tp / (predicted_positive + epsilon)
    recall = tp / (actual_positive + epsilon)

    fb = (1 + beta_squared) * precision * recall / (beta_squared * precision + recall + epsilon)

    if average == 'weighted':
        # here actual_positive is the per-label support
        return tf.reduce_sum(fb * actual_positive / tf.reduce_sum(actual_positive))

    if average == 'raw':
        return fb

    if average == 'samples' and sample_weight is not None:
        # Weighted mean of the per-sample scores. The weights are flattened and
        # broadcast to one weight per sample, and the sum is normalised by the
        # total weight (previously the unnormalised weighted sum was returned).
        weights = tf.reshape(tf.cast(sample_weight, tf.float32), [-1]) * tf.ones_like(fb)
        return tf.reduce_sum(fb * weights) / (tf.reduce_sum(weights) + epsilon)

    return tf.reduce_mean(fb)  # 'macro', or 'samples' without sample weights

In [4]:
def build_model(start=512, metrics=multi_label_fbeta, run_eagerly=False, lr=1e-3, n_outputs=20):
    """Build and compile a small fully-connected multi-label classifier.

    Args:
        start: number of units in the first hidden layer; each of the next two
            hidden layers has half the units of the one before it.
        metrics: a single metric (function or `Metric` instance) to track.
        run_eagerly: forwarded to `model.compile`.
        lr: learning rate of the Adam optimizer.
        n_outputs: number of sigmoid output units, i.e. number of labels.

    Returns:
        The compiled `Sequential` model (binary cross-entropy loss).
    """
    model = Sequential()

    # three hidden layers, each half as wide as the previous one
    units = start
    for _ in range(3):
        model.add(Dense(units, activation='relu'))
        units //= 2

    # one independent sigmoid per label
    model.add(Dense(n_outputs, activation='sigmoid'))

    # `learning_rate` is the supported argument name; the `lr` alias is
    # deprecated and is not accepted by newer Keras releases.
    opt = Adam(learning_rate=lr)

    model.compile(loss='binary_crossentropy', optimizer=opt, metrics=[metrics],
                  run_eagerly=run_eagerly)

    return model

In [5]:
# Train the default model, which tracks the stateless `multi_label_fbeta` metric.
# Shuffling is turned off; the last 20% of the rows are held out for validation.
model = build_model()
model.fit(
    X,
    y,
    batch_size=128,
    epochs=3,
    validation_split=0.2,
    shuffle=False,
);

Epoch 1/3
Epoch 2/3
Epoch 3/3


In [6]:
# Sanity check: the stateless metric should agree with scikit-learn on random
# binary labels and predictions.
random_ytrue = np.random.choice([0, 1], (20, 17))
random_ypred = np.random.choice([0, 1], (20, 17))

custom_score = multi_label_fbeta(random_ytrue, random_ypred)
# use the notebook-wide `beta` (not a hard-coded 2) so both scores stay comparable
sklearn_score = fbeta_score(random_ytrue, random_ypred, beta=beta, average='samples')

# NOTE: the printed label says "f1_score", but the value is an F-beta score.
print('f1_score of prediction using multi_label_fbeta is {}'.format(custom_score))
print('f1_score of prediction using scikit-learn fbeta is {}'.format(sklearn_score))

# fail loudly if the two implementations ever drift apart (float32 vs float64)
np.testing.assert_allclose(float(custom_score), sklearn_score, rtol=1e-5)

f1_score of prediction using multi_label_fbeta is 0.46118515729904175
f1_score of prediction using scikit-learn fbeta is 0.4611851797939628


In [7]:
n_class = 20  # default number of labels tracked by the stateful metric


class StatefullMultiLabelFBeta(Metric):
    """Stateful multi-label F-beta metric that accumulates across batches.

    Predictions are binarized with `threshold` (score >= threshold counts as
    positive). What is accumulated depends on `average`:

    * 'samples': the (optionally weighted) running sum of the per-sample
      F-beta scores and the running total weight; `result()` is their ratio.
    * anything else ('macro', 'weighted', 'raw'): per-label true positives,
      predicted positives and actual positives; `result()` computes the
      per-label F-beta from these totals and aggregates it.

    Args:
        name: metric name.
        beta: weight of recall relative to precision.
        average: 'samples', 'macro', 'weighted' or 'raw'. Any other value is
            treated like 'macro'.
        n_class: number of labels; sizes the per-label state variables, so it
            must match the label axis of the inputs for non-'samples' averages.
        threshold: cut-off used to binarize the predictions.
        epsilon: small constant added to denominators to avoid division by zero.
        **kwargs: forwarded to `Metric.__init__`.
    """

    def __init__(self, name='state_full_binary_fbeta', beta=beta, average='samples',
                 n_class=n_class, threshold=threshold, epsilon=1e-7, **kwargs):
        super(StatefullMultiLabelFBeta, self).__init__(name=name, **kwargs)

        # per-label running counts (used by every average except 'samples')
        self.tp = self.add_weight(name='tp', shape=(n_class,), initializer='zeros')
        self.actual_positives = self.add_weight(name='ap', shape=(n_class,), initializer='zeros')
        self.predicted_positives = self.add_weight(name='pp', shape=(n_class,), initializer='zeros')

        # running total weight and weighted sum of per-sample scores ('samples')
        self.n_samples = self.add_weight(name='n_samples', initializer='zeros')
        self.sum_fb = self.add_weight(name='sum_fb', initializer='zeros')

        # fixed configuration
        self.beta_squared = beta**2
        self.average = average
        self.n_class = n_class
        self.threshold = threshold
        self.epsilon = epsilon

    def update_state(self, ytrue, ypred, sample_weight=None):
        """Fold one batch into the running state.

        `sample_weight` is only used when `average == 'samples'`; it is
        ignored by the per-label averages.
        """
        ytrue = tf.cast(ytrue, tf.float32)

        # Binarize the scores with this instance's threshold (not the
        # notebook-level global of the same name).
        ypred = tf.cast(
            tf.greater_equal(tf.cast(ypred, tf.float32), tf.cast(self.threshold, tf.float32)),
            tf.float32)

        if self.average == 'samples':
            # per-sample counts: reduce over the label (last) axis
            tp = tf.reduce_sum(ytrue * ypred, axis=-1)
            predicted_positives = tf.reduce_sum(ypred, axis=-1)
            actual_positives = tf.reduce_sum(ytrue, axis=-1)

            precision = tp / (predicted_positives + self.epsilon)
            recall = tp / (actual_positives + self.epsilon)

            fb = (1 + self.beta_squared) * precision * recall / (
                self.beta_squared * precision + recall + self.epsilon)

            if sample_weight is None:
                weights = tf.ones_like(fb)  # every sample counts once
            else:
                # flatten so a (batch, 1) weight tensor cannot broadcast
                # against the (batch,) scores into a (batch, batch) matrix
                weights = tf.reshape(tf.cast(sample_weight, tf.float32), [-1]) * tf.ones_like(fb)

            self.sum_fb.assign_add(tf.reduce_sum(fb * weights))
            self.n_samples.assign_add(tf.reduce_sum(weights))

        else:
            # per-label counts: reduce over the sample axis and accumulate
            self.tp.assign_add(tf.reduce_sum(ytrue * ypred, axis=0))
            self.predicted_positives.assign_add(tf.reduce_sum(ypred, axis=0))
            self.actual_positives.assign_add(tf.reduce_sum(ytrue, axis=0))

    def result(self):
        """Return the F-beta score of everything seen since the last reset."""
        if self.average == 'samples':
            # Derived from the state variables only, so it is well defined
            # before the first update and right after a reset (epsilon keeps
            # the empty case at 0 instead of dividing by zero).
            return self.sum_fb / (self.n_samples + self.epsilon)

        precision = self.tp / (self.predicted_positives + self.epsilon)
        recall = self.tp / (self.actual_positives + self.epsilon)

        fb = (1 + self.beta_squared) * precision * recall / (
            self.beta_squared * precision + recall + self.epsilon)

        if self.average == 'weighted':
            # weight each label's score by its support
            return tf.reduce_sum(fb * self.actual_positives / tf.reduce_sum(self.actual_positives))

        if self.average == 'raw':
            return fb

        return tf.reduce_mean(fb)  # 'macro' averaging

    def reset_state(self):
        """Zero every state variable."""
        self.tp.assign(tf.zeros(self.n_class))
        self.predicted_positives.assign(tf.zeros(self.n_class))
        self.actual_positives.assign(tf.zeros(self.n_class))
        self.n_samples.assign(0.0)
        self.sum_fb.assign(0.0)

    def reset_states(self):
        """Alias of `reset_state`, kept for Keras versions that call this name."""
        self.reset_state()

In [8]:
# Train the same architecture, this time tracking the stateful F-beta metric.
stateful_multi_label_fbeta = StatefullMultiLabelFBeta()
stateful_model = build_model(metrics=stateful_multi_label_fbeta)
stateful_model.fit(
    X,
    y,
    batch_size=128,
    epochs=3,
    validation_split=0.2,
    shuffle=False,
);

Epoch 1/3
Epoch 2/3
Epoch 3/3


In [9]:
# Sanity check: after each update the stateful metric should match
# scikit-learn evaluated on all rows seen so far.
n_sample = 100
n_class = 20
# pass `beta` explicitly so the metric and the scikit-learn reference use the same value
m = StatefullMultiLabelFBeta(beta=beta, n_class=n_class)

random_ytrue = np.random.choice([0, 1], (n_sample, n_class))
random_ypred = np.random.choice([0, 1], (n_sample, n_class))

m.update_state(random_ytrue, random_ypred)
intermediate_stateful = float(m.result())
intermediate_sklearn = fbeta_score(random_ytrue, random_ypred, beta=beta, average='samples')
print('Intermediate result for stateful multi class fbeta is: {}'.format(intermediate_stateful))
print('Intermediate result for scikit-learn fbeta is: {}'.format(intermediate_sklearn))
print()
np.testing.assert_allclose(intermediate_stateful, intermediate_sklearn, rtol=1e-5)

# feed a second, smaller batch to exercise the accumulation across updates
increment_size = 20
a_true = np.random.choice([0, 1], (increment_size, n_class))
a_pred = np.random.choice([0, 1], (increment_size, n_class))

m.update_state(a_true, a_pred)
final_stateful = float(m.result())
print('Final result for stateful multi class fbeta is: {}'.format(final_stateful))

# the reference is computed on both batches stacked row-wise
arr_true = np.append(random_ytrue, a_true, axis=0)
arr_pred = np.append(random_ypred, a_pred, axis=0)

final_sklearn = fbeta_score(arr_true, arr_pred, beta=beta, average='samples')
print('Final result for scikit-learn multi class fbeta is: {}'.format(final_sklearn))
np.testing.assert_allclose(final_stateful, final_sklearn, rtol=1e-5)

Intermediate result for stateful multi class fbeta is: 0.49903061985969543
Intermediate result for scikit-learn fbeta is: 0.49903061948857347

Final result for stateful multi class fbeta is: 0.5037157535552979
Final result for scikit-learn multi class fbeta is: 0.503715777491389
