In [1]:
#|default_exp models.utils

# Model utils

In [2]:
import sys
sys.path.append('..')
from tsai.basics import *
from mocatml.data import *

In [3]:
#|export
from mocatml.models.conv_rnn import *
from fastcore.all import *
from fastai.vision.all import Learner

In [54]:
#|export
@patch
@delegates(Learner.get_preds)
def get_preds_iterative(self:Learner, dl, n_iter=1, track_losses=False, **kwargs):
    """
        Run `get_preds` `n_iter` times over a dataloader backed by a
        `DensityTupleTransform`. The first pass uses `dl` as given; every
        further pass drops the first `h` entries along the second axis of the
        data and predicts on a new dataloader built from the result.

        Returns the predictions and targets of the last pass, followed by the
        list of per-pass losses when `track_losses` is True. Extra `kwargs`
        are forwarded to `get_preds`.

        TODO: Crashes if inner=True (kwargs), so it will produce invisible
        progress bars
    """
    preds, targs = self.get_preds(dl=dl, **kwargs)
    # Shift a copy rather than `dl.ds` itself, so the original ds is untouched
    shifted_ds = copy(dl.ds)
    loss_history = []
    if track_losses:
        loss_history.append(self.loss_func(preds, targs).item())
    for _ in range(n_iter - 1):
        # Advance h steps: discard the leading h entries of axis 1
        shifted_ds.data = shifted_ds.data[:, shifted_ds.h:]
        shifted_tl = TfmdLists(range(len(shifted_ds)),
                               DensityTupleTransform(shifted_ds))
        preds, targs = self.get_preds(dl=dl.new(shifted_tl), **kwargs)
        if track_losses:
            loss_history.append(self.loss_func(preds, targs).item())
    if not track_losses:
        return preds, targs
    return preds, targs, loss_history

In [5]:
# test
# Set the default device; the model below is moved to `default_device()`
default_device(0)
lbk = 4
h = 4
n_iter = 4
# Load the example array shipped under ../example_data
data = np_load_compressed('TLE_density_10_15x15.npy.gz', 
                          path='../example_data')
ds = DensityData(data, lbk=lbk, h=h)
tl = TfmdLists(range(len(ds)), DensityTupleTransform(ds))
# shuffle=False keeps the item order fixed across the calls compared below
dls = tl.dataloaders(bs=32, shuffle=False, num_workers=0)
learn = Learner(dls=dls, 
                model=StackUnstack(SimpleModel()).to(default_device()),
                loss_func=StackLoss())
# Iterative prediction: n_iter passes, keeping the loss of each pass
p,t,losses = learn.get_preds_iterative(dl=dls[0], n_iter=n_iter, track_losses=True);
# The predictions hold h entries
test_eq(len(p), h)
# Expected shape of each entry for this example file
# (presumably items x channels x height x width -- TODO confirm)
test_eq(p[0].shape, [10, 1, 36, 99])
# Compare with the direct prediction made with a gap
ds_gap = copy(ds)
# Enlarge the gap by the (n_iter-1)*h steps the iterative run moved forward
ds_gap.gap += (n_iter-1)*h
dl_gap = dls.new(TfmdLists(range(len(ds_gap)), DensityTupleTransform(ds_gap)))
p2,t2 = learn.get_preds(dl=dl_gap)
# NOTE(review): loss2 is computed but not asserted against anything in this cell
loss2 = learn.loss_func(p2,t2).item()
# Predictions are expected to differ between both approaches, targets to match
test_ne(p, p2)
test_eq(t, t2)

In [82]:
#|export
@patch
def get_individual_losses(self:Learner, p, t):
    """
        Compute one loss value per item from the predictions `p` and targets
        `t` returned by learn.get_preds. `p` and `t` are iterated as
        collections whose entries are indexed by item along their first axis;
        the number of items is read from `p[0].shape[0]`.

        Returns a tensor with the loss of every item, in item order.
    """
    n_items = p[0].shape[0]
    # For each item, gather its slice from every entry of p and t and score it
    per_item = [
        self.loss_func(tuple(p_h[idx] for p_h in p),
                       tuple(t_h[idx] for t_h in t))
        for idx in range(n_items)
    ]
    return tensor(per_item)

In [84]:
# test
individual_losses = learn.get_individual_losses(p,t)
# There must be exactly one loss per item of the dataset
test_eq(len(individual_losses), len(ds))
# The mean of the per-item losses should be close to the loss computed on all items at once
test_close(individual_losses.mean(), learn.loss_func(p,t), eps=1e-2)

In [85]:
# Build an `Interpretation` from the learner, the dataloaders and the per-item losses
# NOTE(review): `individual_losses` already comes from a `tensor(...)` call, so the
# extra `tensor(...)` wrap here looks redundant -- confirm before removing
foo = Interpretation(learn, dls, tensor(individual_losses))

In [87]:
# Display the per-item losses computed above
individual_losses

tensor([38370.7695, 38361.1016, 37989.8320, 38242.4609, 38417.0742, 38220.0078,
        38228.2930, 38430.6680, 38634.5781, 38359.8867])