## Loss functions
This notebook collects the custom loss functions I need while building models for this research. If there get to be too many, I'll split them into separate notebooks.

In [None]:
import sys
import numpy as np
import tensorflow as tf
from tensorflow.keras.losses import CategoricalCrossentropy

In [None]:
def refresh(obj):
    """
    Clear the state of a loss function / metric, if it holds onto any.

    Both spellings of the reset hook are tried, in order: ``reset_state``
    and ``reset_states``. This is a best-effort helper: an object that lacks
    a hook, or whose hook raises an ordinary exception, is silently skipped.

    Parameters
    ----------
    obj : object
        Any object; typically a loss or metric instance.

    Returns
    -------
    None
        ``'reset state'`` is printed once for every hook that ran successfully.
    """
    for hook_name in ("reset_state", "reset_states"):
        try:
            getattr(obj, hook_name)()
        except Exception:
            # Deliberately best-effort, but only for ordinary errors:
            # KeyboardInterrupt / SystemExit must still propagate.
            continue
        print('reset state')

### CategoricalCrossEntropy for each point in a series

In [None]:
def catcrossentropy_per_pt(y_true, y_pred):
    """
    A metric for a series of datapoints, each of which needs classification.

    Parameters
    ----------
    y_true: tensorflow tensor of shape (batch size, series length, num_categories)
        The true values to compare with. For each datapoint in the series,
        the category information should be one-hot encoded
    y_pred: tensorflow tensor of shape (batch size, series length, num_categories)
        The predicted values. For each datapoint in the series,
        the category information should be expressed in probabilities (fractions of 1)

    Returns
    -------
    loss: tensorflow tensor of shape (1,)
        The categorical cross entropy evaluated separately at every position
        of the series, summed over positions and divided by the series length

    """

    loss_fn = CategoricalCrossentropy()

    losses = tf.zeros(shape=(1,))
    # Keep the series length as a tensor (like the other per-point losses in
    # this notebook) instead of forcing it through int(), which only works
    # when the shape is a concrete eager value.
    n_points = tf.shape(y_true)[1]

    for i in range(n_points):  # loop over every datapoint in the series
        losses += loss_fn(y_true[:, i, :], y_pred[:, i, :])

    # normalize by number of datapoints
    losses /= tf.cast(n_points, losses.dtype)

    return losses

### Factory function to handle masked sequential data (OUT OF DATE)

In [None]:
def gen_loss_per_pt(loss_fn=None, mask_layer=None):
    """
    A factory for series' of datapoints that returns a loss function
    which takes into account a mask of the inputs

    Parameters
    ----------
    loss_fn : loss-type class object, default None
        loss function to use per each data point. ``None`` means a fresh
        ``CategoricalCrossentropy()`` is created for this call (rather than
        one instance being built at definition time and shared by everyone).
    mask_layer : layer.Masking object
        masking layer used to throw out padding datapoints

    Returns
    -------
    loss_per_pt : function
        The generated loss function taking into account the values of loss_fn and mask_layer.
        Its ``name`` attribute is ``loss_fn.name + "_per_pt"``.
    """
    if loss_fn is None:
        loss_fn = CategoricalCrossentropy()

    def loss_per_pt(y_true, y_pred):
        """
        A metric for a series of datapoints, each of which needs its own separate evaluation.

        Parameters
        ----------
        y_true: tensorflow tensor of shape (batch size, series length, num_categories)
            The true values to compare with. For each datapoint in the series,
            the category information should be one-hot encoded
            SHOULD BE MASKED AS PER MASK_LAYER'S EXPECTATIONS
        y_pred: tensorflow tensor of shape (batch size, series length, num_categories)
            The predicted values. For each datapoint in the series,
            the category information should be expressed in probabilities (fractions of 1)

        Returns
        -------
        loss: tensorflow tensor of shape (1,)
            The loss evaluated at every position of the series, summed over
            positions and divided by the series length

        """
        losses = tf.zeros(shape=(1,))
        n_points = tf.shape(y_true)[1]
        if mask_layer is not None:
            mask = mask_layer.compute_mask(y_true)
            for i in range(n_points):  # loop over every datapoint in the series
                y_t = tf.boolean_mask(y_true[:, i, :], mask[:, i])
                y_p = tf.boolean_mask(y_pred[:, i, :], mask[:, i])
                refresh(loss_fn)
                # reduce_sum collapses a per-sample loss vector to one number
                # and leaves an already-scalar loss untouched. (The previous
                # `if not loss.shape: sum(loss)` check never summed a vector,
                # because a TensorShape of known rank is truthy.)
                losses += tf.reduce_sum(loss_fn(y_t, y_p))
        else:
            for i in range(n_points):  # loop over every datapoint in the series
                refresh(loss_fn)
                losses += tf.reduce_sum(loss_fn(y_true[:, i, :], y_pred[:, i, :]))
        # normalize by number of datapoints
        losses /= tf.cast(n_points, losses.dtype)

        return losses
    loss_per_pt.name = loss_fn.name + "_per_pt"

    return loss_per_pt

### Loss per point class

In [None]:
class LossPerPt:
    """
    A class for losses for series' of datapoints
    which take into account a mask of the inputs and possible weights

    Parameters
    ----------
    loss_fn : loss-type class object, default None
        loss function to use per each data point. ``None`` means a fresh
        ``CategoricalCrossentropy()`` is created for this instance.
    mask_layer : layer.Masking object
        masking layer used to throw out padding datapoints
    class_weights : dict, default None
        dictionary specifying weights for the different categories. Keys should be the one-hot index of the
        category (e.g. 0, 1, 2, ...), as an integer. Values should be floats.
        When given, a category that is missing from the dictionary gets weight 0.

    """

    def __init__(self, loss_fn=None, mask_layer=None, class_weights=None):
        """ Initialize the class. NOTE tensors >2 dim do NOT support class_weights called from model.fit
        So we initialize them here specifically."""
        if loss_fn is None:
            # Built per instance, not once at definition time, so instances
            # never share (and never reset) each other's loss object.
            loss_fn = CategoricalCrossentropy()
        self.loss_fn = loss_fn
        self.mask_layer = mask_layer
        self.class_weights = class_weights
        self.__name__ = loss_fn.name + "_per_pt"

    def __call__(self, y_true, y_pred, sample_weights=None):
        """
        A metric for a series of datapoints, each of which needs its own separate evaluation.

        Parameters
        ----------
        y_true: tensorflow tensor of shape (batch size, series length, num_categories)
            The true values to compare with. For each datapoint in the series,
            the category information should be one-hot encoded
            SHOULD BE MASKED AS PER MASK_LAYER'S EXPECTATIONS
        y_pred: tensorflow tensor of shape (batch size, series length, num_categories)
            The predicted values. For each datapoint in the series,
            the category information should be expressed in probabilities (fractions of 1)
        sample_weights: the samplewise weighting desired. Scalar or (batch_size,...)

        Returns
        -------
        loss: tensorflow tensor of shape (1,)
            The weighted loss evaluated at every position of the series,
            summed over positions and divided by the series length

        """
        losses = tf.zeros(shape=(1,))
        n_points = tf.shape(y_true)[1]
        full_weights = self._full_sample_weights(y_true, sample_weights)

        mask = None
        if self.mask_layer is not None:
            mask = self.mask_layer.compute_mask(y_true)

        for i in range(n_points):  # loop over every datapoint in the series
            y_t = y_true[:, i, :]
            y_p = y_pred[:, i, :]
            wts = full_weights[:, i]
            if mask is not None:
                # drop the padded samples at this position from labels,
                # predictions and weights alike
                keep = mask[:, i]
                y_t = tf.boolean_mask(y_t, keep)
                y_p = tf.boolean_mask(y_p, keep)
                wts = tf.boolean_mask(wts, keep)
            refresh(self.loss_fn)
            loss = self.loss_fn(y_t, y_p, sample_weight=wts)
            # reduce_sum collapses a per-sample loss vector to one number and
            # leaves an already-scalar loss untouched
            losses += tf.reduce_sum(loss)

        # normalize by number of datapoints
        losses /= tf.cast(n_points, losses.dtype)

        return losses

    def _full_sample_weights(self, y_true, sample_weights):
        """
        Build one weight per datapoint from the sample weights and class weights.

        Works on NumPy arrays, so ``y_true`` must be convertible with
        ``np.asarray`` (i.e. hold concrete values).

        Parameters
        ----------
        y_true : array-like of shape (..., num_categories)
            One-hot encoded true values.
        sample_weights : None, scalar or array-like
            ``None`` leaves every weight at 1. Otherwise the values are matched
            against the leading dimensions of ``y_true`` (e.g. shape
            (batch size,) or (batch size, series length)) and repeated along
            the remaining ones.

        Returns
        -------
        full_weights : numpy array of shape ``y_true.shape[:-1]``
            Sample weight times class weight for every datapoint. Floating
            point: the dtype of ``y_true`` if that is a float type, float32
            otherwise.
        """
        y_true = np.asarray(y_true)
        # Integer / boolean one-hot labels must not dictate the weight dtype,
        # otherwise the in-place float multiplications below fail.
        if np.issubdtype(y_true.dtype, np.floating):
            dtype = y_true.dtype
        else:
            dtype = np.float32

        full_weights = np.ones(y_true.shape[:-1], dtype=dtype)  # default even weights

        if sample_weights is not None:  # use sample weights
            weights = np.asarray(sample_weights, dtype=dtype)
            # pad with trailing length-1 axes so the weights broadcast along
            # the dimensions they do not cover (the one-hot axis is already gone)
            missing_dims = full_weights.ndim - weights.ndim
            weights = weights.reshape(weights.shape + (1,) * missing_dims)
            # in-place, so weights that cannot fit the target shape raise
            full_weights *= weights

        if self.class_weights is not None:  # use class weights
            class_weights_nd = np.zeros_like(full_weights)
            for cls_num, weight in self.class_weights.items():
                class_weights_nd += weight * y_true[..., cls_num]
            full_weights *= class_weights_nd

        return full_weights
