# Library

In [None]:
import numpy as np
import torch
import torch.nn as nn

from utils import *
from dataset import TossingDataset
from torch.utils.data import DataLoader

# Model

In [None]:
class EmbeddedNet(nn.Module):
    """MLP that regresses four motion parameters and decodes future locations.

    The network maps a flat input of ``in_traj_num * 2`` features to four
    values. ``predict_locations`` then turns those four values into the
    predicted locations for the next ``pre_traj_num`` frames.

    Args:
        in_traj_num: Number of observed frames; the input layer expects
            ``in_traj_num * 2`` features (presumably one (x, y) pair per
            frame -- confirm against the dataset).
        pre_traj_num: Number of future frames to predict.
        fps: Frame rate used to convert frame indices into seconds.
            Defaults to 30.0, the value that was previously hard-coded.
    """

    def __init__(self, in_traj_num, pre_traj_num, fps=30.0):
        super(EmbeddedNet, self).__init__()
        # Keep the horizon settings so forward() decodes with the same values
        # the network was built for, instead of hard-coded literals.
        self.in_traj_num = in_traj_num
        self.pre_traj_num = pre_traj_num
        self.fps = fps

        self.hidden_dim = 128
        self.fc_1 = nn.Sequential(
            nn.Linear(in_traj_num * 2, self.hidden_dim),
            nn.ReLU(inplace=True),
            nn.Linear(self.hidden_dim, self.hidden_dim),
            nn.ReLU(inplace=True),
        )
        # Four outputs: two consumed for the x axis and two for the y axis
        # by predict_locations.
        self.fc_out = nn.Linear(self.hidden_dim, 4)

    def forward(self, x):
        """Return ``(y_hat, parameters)`` for a batch of inputs.

        Args:
            x: Tensor whose last dimension is ``in_traj_num * 2``.

        Returns:
            A tuple ``(y_hat, parameters)`` where ``parameters`` is the raw
            four-value output of the network and ``y_hat`` is the result of
            decoding it with ``predict_locations``.
        """
        x = self.fc_1(x)
        parameters = self.fc_out(x)
        y_hat = predict_locations(
            parameters, self.in_traj_num, self.pre_traj_num, fps=self.fps
        )

        return y_hat, parameters
    
def predict_locations(parameters, in_frames_num, pre_frames_num, fps):
    """Decode per-sample motion parameters into future x/y locations.

    Each row of ``parameters`` holds ``(x0, vx, y0, vy)``. Locations are
    evaluated at the times ``t = k / fps`` for
    ``k = in_frames_num, ..., in_frames_num + pre_frames_num - 1`` using

        x(t) = x0 + vx * t
        y(t) = y0 + vy * t - 0.5 * 9.8 * t ** 2

    All helper tensors are created on the device and with the dtype of
    ``parameters``, so the function works on CPU as well as on GPU.

    Args:
        parameters: Tensor of shape ``(batch, 4)``.
        in_frames_num: Index of the first frame to predict (the number of
            frames already observed).
        pre_frames_num: Number of frames to predict.
        fps: Frame rate used to convert frame indices into seconds.

    Returns:
        Tensor of shape ``(batch, 2 * pre_frames_num)``: the x locations of
        all predicted frames followed by their y locations.
    """
    device, dtype = parameters.device, parameters.dtype

    # Rows of t_matrix are [1, t, t^2], so a (batch, 3) coefficient matrix
    # times t_matrix evaluates a quadratic at every predicted time step.
    t_vector = torch.arange(
        in_frames_num, in_frames_num + pre_frames_num, dtype=dtype, device=device
    ) / fps
    t_matrix = torch.stack((torch.ones_like(t_vector), t_vector, t_vector ** 2))

    batch_size = parameters.shape[0]

    # Get x axis parameters: the quadratic coefficient is fixed to zero.
    x_param = torch.cat(
        (parameters[:, :2], parameters.new_zeros((batch_size, 1))), dim=1
    )
    x_locs_est = torch.mm(x_param, t_matrix)

    # Get y axis parameters: the quadratic coefficient is fixed to -0.5 * 9.8.
    y_param = torch.cat(
        (parameters[:, 2:], parameters.new_full((batch_size, 1), -0.5 * 9.8)), dim=1
    )
    y_locs_est = torch.mm(y_param, t_matrix)

    # Combine results
    return torch.cat((x_locs_est, y_locs_est), dim=1)

In [None]:
def train_model(model, train_loader, test_loader, num_epochs, optimizer, scheduler, criterion):
    """Train ``model`` and return the mean training loss of every epoch.

    After each epoch the scheduler is stepped with the epoch's mean training
    loss, the model is evaluated on both loaders with ``test_model``, and a
    summary line is printed. ``'Best'`` is printed whenever the test metric
    reaches a new minimum.

    Args:
        model: Network to train. It is called as ``model(inputs)`` and its
            first output is compared against the ground truth.
        train_loader: Iterable of batches; each batch is a mapping with the
            keys ``'current_locs_gt'`` (inputs) and ``'future_locs_gt'``
            (targets).
        test_loader: Iterable of batches with the same keys, used only for
            evaluation.
        num_epochs: Number of passes over ``train_loader``.
        optimizer: Optimizer that updates the parameters of ``model``.
        scheduler: Scheduler whose ``step`` accepts the epoch loss.
        criterion: Loss callable ``criterion(prediction, target)``.

    Returns:
        List with one mean training loss per epoch.
    """
    # Move batches to wherever the model lives instead of assuming CUDA.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    min_test_dif = float('inf')
    epoch_loss = []
    for epoch in range(num_epochs):
        batch_loss = []
        for data in train_loader:

            # get the inputs
            inputs = data['current_locs_gt'].to(device)
            locs_gt = data['future_locs_gt'].to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            # Use the model that was passed in, not a module-level global.
            outputs = model(inputs)

            loss = criterion(outputs[0], locs_gt)

            loss.backward()
            optimizer.step()

            batch_loss.append(loss.item())

        # Results every epoch
        cur_epoch_loss = sum(batch_loss) / len(batch_loss)

        # Scheduler
        scheduler.step(cur_epoch_loss)

        # Test the network
        train_dif = test_model(model, train_loader)
        test_dif = test_model(model, test_loader)

        # Print the result
        print('Epoch: %d Train Loss: %.03f Train Dif: %.03f Test Dif: %.03f'
              % (epoch, cur_epoch_loss, train_dif, test_dif))
        epoch_loss.append(cur_epoch_loss)

        if min_test_dif > test_dif:
            min_test_dif = test_dif
            print('Best')

    return epoch_loss

def test_model(model, test_loader):
    """Evaluate ``model`` on ``test_loader`` and return the mean batch metric.

    The model is switched to eval mode for the evaluation and put back into
    whichever mode it was in before the call. Gradients are disabled so no
    autograd graph is built while evaluating.

    Args:
        model: Network to evaluate. It is called as ``model(inputs)`` and its
            first output is scored against the ground truth.
        test_loader: Iterable of batches; each batch is a mapping with the
            keys ``'current_locs_gt'`` (inputs) and ``'future_locs_gt'``
            (targets).

    Returns:
        Average over batches of ``get_mean_distance(locs_gt, prediction)``.
        ``get_mean_distance`` is not defined in this file (presumably it
        comes from ``utils`` -- confirm).
    """
    # Move batches to wherever the model lives instead of assuming CUDA.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Test the Model
    was_training = model.training
    model.eval()

    batch_loss = []
    try:
        with torch.no_grad():
            for data in test_loader:

                # get the inputs
                inputs = data['current_locs_gt'].to(device)
                locs_gt = data['future_locs_gt'].to(device)

                # Use the model that was passed in, not a module-level global.
                outputs = model(inputs)
                loss = get_mean_distance(locs_gt, outputs[0])
                batch_loss.append(loss.item())
    finally:
        # Restore the caller's mode even if evaluation raised.
        model.train(was_training)

    # Results every epoch
    cur_epoch_loss = sum(batch_loss) / len(batch_loss)

    return cur_epoch_loss

In [None]:
#################### Hyperparameters ####################
num_epochs = 50000
learning_rate = 0.001
weight_decay = 0
in_frames_num = 3    # number of observed frames fed to the network
pre_frames_num = 15  # number of future frames to predict
factor = 0.95        # LR multiplier applied by ReduceLROnPlateau
patience = 40        # epochs without improvement before the LR is reduced
batch_size = 16
#################### Hyperparameters ####################

# Pick the device once; fall back to CPU when CUDA is not available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# The frame counts come from the hyperparameters above so the model and both
# datasets cannot drift apart.
net = EmbeddedNet(in_traj_num=in_frames_num, pre_traj_num=pre_frames_num).to(device)
criterion = torch.nn.MSELoss()

train_set = TossingDataset(
    './dataset/r1_k0.2/train',
    in_traj_num=in_frames_num,
    pre_traj_num=pre_frames_num,
    sample_num=32
)

test_set = TossingDataset(
    './dataset/r1_k0.2/test',
    in_traj_num=in_frames_num,
    pre_traj_num=pre_frames_num
)

print(len(train_set), len(test_set))

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
# The whole test set is evaluated as a single batch.
test_loader = DataLoader(test_set, batch_size=len(test_set), shuffle=False)

optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate, weight_decay=weight_decay)

# NOTE(review): `verbose` is reported as deprecated in recent PyTorch
# releases -- confirm against the installed version before upgrading.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=factor,
    patience=patience,
    verbose=True,
    threshold=1e-3
)

train_loss = train_model(
    net,
    train_loader,
    test_loader,
    num_epochs,
    optimizer,
    scheduler,
    criterion
)