In [22]:
#|default_exp losses

#|export
import torch
import numpy as np
import pandas as pd
from tsai.basics import *

In [23]:
# TODO WeightedLoss should be a class different from Loss, as one should implemente the weights logic an the other one the loss logic. Then the weighted losses would inherit from both.

# Custom losses

In [24]:
#|export
class Loss(nn.Module):
    def __init__(self, ranges, weights, solact_levels):
        super().__init__()
        self.register_buffer('ranges', torch.Tensor(ranges))
        self.register_buffer('weights', torch.Tensor(weights))
        self.solact_levels = solact_levels

    def weighted_loss_tensor(self, target):        
        batch, variables, horizon = target.shape  # Example shape (32, 4, 6)
        variable, range, interval = self.ranges.shape  # Example shape (4, 4, 2)

        target_shaped = torch.reshape(target, (batch, variables, 1, horizon))  # Example shape (32, 4, 6) -> (32, 4, 1, 6)
        ranges_shaped = torch.reshape(self.ranges, (variable, range, 1, interval))  # Example shape (4, 4, 2) -> (4, 4, 1, 2)

        weights_tensor = ((ranges_shaped[..., 0] <= target_shaped) & (target_shaped <= ranges_shaped[..., 1])).float()
        
        return torch.einsum('r,bvrh->bvh', self.weights, weights_tensor)
    
    def loss_measure(self, y_pred, y_true):
        return NotImplementedError
    
    def forward(self, y_pred, y_true):
        error = self.loss_measure(y_pred, y_true)
        weights = self.weighted_loss_tensor(y_true)
        loss = (error * weights).mean()
        
        return loss

In [25]:
# Test
device = 'cpu'
ranges = np.array([[[0, 1], [1, 2], [2, 3], [3, 4]],
                   [[0, 1], [1, 2], [2, 3], [3, 4]],
                   [[0, 1], [1, 2], [2, 3], [3, 4]],
                   [[0, 1], [1, 2], [2, 3], [3, 4]]])

weights = np.array([1, 2, 3, 4])

target = torch.tensor([[[0.5, 1.5, 2.5, 3.5, 4.5, 5.5],
                        [0.5, 1.5, 2.5, 3.5, 4.5, 5.5],
                        [0.5, 1.5, 2.5, 3.5, 4.5, 5.5],
                        [0.5, 1.5, 2.5, 3.5, 4.5, 5.5]]], device=device, dtype=torch.float32)

input = target + 1

expected_weights = torch.tensor([[[1, 2, 3, 4, 0, 0],
                                 [1, 2, 3, 4, 0, 0],
                                 [1, 2, 3, 4, 0, 0],
                                 [1, 2, 3, 4, 0, 0]]], device=device, dtype=torch.float32)

solact_levels = ['low', 'moderate', 'elevated', 'high']




def test_LossWeightsTensor():
    loss = Loss(ranges, weights, solact_levels).to(device)
    result = loss.weighted_loss_tensor(target)

    assert torch.equal(result, expected_weights), f"Expected {expected_weights}, but got {result}"
    print(f"Loss Tensor test passed!")

In [26]:
#|export

class wMSELoss(Loss):
    def __init__(self, ranges, weights, solact_levels):
        super().__init__(ranges, weights, solact_levels)

    
    def loss_measure(self, y_pred, y_true):
        return (y_true-y_pred)**2

In [27]:
#|export

class wMAELoss(Loss):
    def __init__(self, ranges, weights, solact_levels):
        super().__init__(ranges, weights, solact_levels)

    
    def loss_measure(self, y_pred, y_true):
        return torch.abs(y_true-y_pred)

In [28]:
#|export

class wMSLELoss(Loss):
    def __init__(self, ranges, weights, solact_levels):
        super().__init__(ranges, weights, solact_levels)
    
    def loss_measure(self, y_pred, y_true):
        return (torch.log1p(y_true) - torch.log1p(y_pred)) ** 2

In [29]:
#|export

class wHubberLoss(Loss):
    def __init__(self, ranges, weights, solact_levels, delta=2.0):
        super().__init__(ranges, weights, solact_levels)
        self.delta = delta
    
    def loss_measure(self, y_pred, y_true):
        error = y_true - y_pred
        
        is_small_error = error < self.delta
        small_error_loss = (0.5 * (error ** 2))
        large_error_loss = (self.delta * (torch.abs(error) - 0.5 * self.delta))

        return torch.where(is_small_error, small_error_loss, large_error_loss)

In [30]:
#|export

class ClassificationLoss(Loss):
    def __init__(self, ranges, solact_levels, loss):
        n_variables = ranges.shape[1]
        weights = np.arange(n_variables)

        super().__init__(ranges, weights, solact_levels)

        self.loss_measure = loss.loss_measure

    def forward(self, y_pred, y_true):
        error = self.loss_measure(self, y_pred, y_true)
        weights = 1 + torch.abs(self.weighted_loss_tensor(y_true) - self.weighted_loss_tensor(y_pred))
        loss = (error * weights).mean()
        
        return loss

In [31]:
#| export

class TrendedLoss(nn.Module):
    def __init__(self, loss):
        super().__init__()
        self.loss_measure = loss.loss_measure

    @staticmethod
    def _slope(y):
        x = np.arange(len(y))
        slope, _ = np.polyfit(x, y, deg=1)
        return slope

    @staticmethod
    def _calculate_trends(tensor):
        np_tensor = tensor.cpu().detach().numpy()
        trends = np.apply_along_axis(TrendedLoss._slope, 2, np_tensor)
        return torch.Tensor(trends)

    def forward(self, y_pred, y_true):
        batch, variables, _ = y_pred.shape

        input_trend = TrendedLoss._calculate_trends(y_pred)
        target_trend = TrendedLoss._calculate_trends(y_true)
        
        trend_diff = torch.abs(torch.Tensor(input_trend) - torch.Tensor(target_trend)).to(device)

        pct_var = ((y_pred-y_true)**2).to(device)
        out = (pct_var * trend_diff.reshape(batch,variables,1)).to(device)
        loss = out.mean()

        return loss.cpu()

In [32]:
# Test

def check_loss_function(loss_class, expected_value, loss_func=None):
    if loss_func is None:
        loss = loss_class(ranges, weights, solact_levels).to(device)
    else:
        loss = loss_class(ranges, solact_levels, loss_func).to(device)
    result = loss(input, target)

    assert torch.isclose(result, expected_value), f"Expected {expected_value}, but got {result}"
    print(f"{type(loss).__name__} test passed!")

def test_wMSELoss():
    expected_mse_loss = torch.mean(expected_weights * (target - input) ** 2)
    check_loss_function(wMSELoss, expected_mse_loss)

def test_wMAELoss():
    expected_mae_loss = torch.mean(expected_weights * torch.abs(target - input))
    check_loss_function(wMAELoss, expected_mae_loss)

def test_wMSLELoss():
    expected_msle_loss = torch.mean(expected_weights * ((torch.log1p(target) - torch.log1p(input)) ** 2))
    check_loss_function(wMSLELoss, expected_msle_loss)

def test_wHuberLoss():
    delta = 1
    expected_hubber_loss = torch.mean(expected_weights * 
                                   torch.where(torch.abs(input - target) < delta, 
                                                0.5 * (input - target) ** 2,
                                                delta * (torch.abs(input - target) - 0.5 * delta)
                                                )
                                  )
    check_loss_function(wHubberLoss, expected_hubber_loss)

def test_ClassificationLoss():
    expected_classification_loss = torch.mean((input - target) ** 2) + 1 
    check_loss_function(ClassificationLoss, expected_classification_loss, wMSELoss)

In [33]:
#|export

class LossMetrics:
    def __init__(self, loss_func:Loss):
        self.loss_func = loss_func

    def loss_call(self, input, target, weight_idx):
        loss_copy = deepcopy(self.loss_func)

        for idx in range(len(loss_copy.weights)):
            if idx != weight_idx:
                loss_copy.weights[idx] = 0
        
        return loss_copy(input, target)

    def loss_low(self, input, target):
        return self.loss_call(input, target, 0)

    def loss_moderate(self, input, target):
        return self.loss_call(input, target, 1)

    def loss_elevated(self, input, target):
        return self.loss_call(input, target, 2)

    def loss_high(self, input, target):
        return self.loss_call(input, target, 3)

    def metrics(self):
        return [self.loss_low, self.loss_moderate, self.loss_elevated, self.loss_high]

In [34]:
# Test
def test_LossMetrics():
    loss = wMAELoss(ranges, weights, solact_levels).to(device)
    metrics = LossMetrics(loss).metrics()

    loss_value = loss(input, target)
    metrics_values = [metric(input, target) for metric in metrics]

    assert torch.isclose(loss_value, sum(metrics_values)), f"Expected {loss_value}, but got {sum(metrics_values)} ({metrics_values})"
    print("LossMetrics test passed!")

In [35]:
#| test
test_LossWeightsTensor()
test_wMSELoss()
test_wMAELoss()
test_wMSLELoss()
test_wHuberLoss()
test_LossMetrics()
test_ClassificationLoss()


Loss Tensor test passed!
wMSELoss test passed!
wMAELoss test passed!
wMSLELoss test passed!
wHubberLoss test passed!
LossMetrics test passed!
ClassificationLoss test passed!


In [36]:
loss = TrendedLoss(wMSELoss)
loss(input, target)

tensor(0.)

In [37]:
#|eval: false
#|hide
from nbdev import *
nbdev_export()