In [1]:
# default_exp model

# Model

> Model architecture

In [9]:
#hide
%reload_ext autoreload
%autoreload 2
from nbdev.showdoc import *
import warnings
warnings.filterwarnings("ignore")

In [10]:
# export
import torch
import torch.nn as nn
import numpy as np

In [11]:
# export
class Difference(nn.Module):
    '''Difference Layer'''
    def __init__(self, mode = 'simple'):
        super(Difference, self).__init__()
        self.mode = mode
      
    def forward(self, x1, x2):
        '''Difference Layer: supports 3 types (simple, abs, square)'''
        if self.mode == 'simple':
            return x1 - x2
        elif self.mode == 'abs':
            return torch.abs(x1 - x2)
        elif self.mode == 'square':
            return torch.square(x1 - x2)
        else:
            raise NotImplementedError

In [12]:
# export
class TrajectoryDN(nn.Module):
    '''Trajectory Identification using Difference Net'''
    def __init__(self, diff_net, n_features):
        super(TrajectoryDN, self).__init__()
        self.diff_net = diff_net
        self.n_features = n_features
        self.n_hidden_rnn = 64 # number of hidden states for RNN
        self.n_layers_rnn = 2 # number of RNN layers (stacked)
        self.lstm = torch.nn.LSTM(
            input_size = self.n_features, 
            hidden_size = self.n_hidden_rnn,
            num_layers = self.n_layers_rnn, 
            batch_first = True
        )
        self.fcn = nn.Sequential(
            nn.Linear(self.n_hidden_rnn, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )
    
    def forward(self, x1, x2, x_seq_lens): 
        # DifferenceNet
        x_diff_org = self.diff_net(x1, x2[0]) # (B X T X F)
        
        x_diff_dest = self.diff_net(x1, x2[1]) # (B X T X F)
        x = torch.cat((x_diff_org, x_diff_dest), dim=2) # (B X T X F*2)
        # Pack padded batch of sequences for RNN module
        packed = nn.utils.rnn.pack_padded_sequence(x, x_seq_lens.cpu(), batch_first=True, enforce_sorted=False)
        # Forward pass through LSTM
        outputs, (hidden, cell) = self.lstm(packed)
        # FCN
        x = hidden[-1] # Use the hidden state from the last LSTM in a stacked LSTM
        x = self.fcn(x)
        return x

In [13]:
show_doc(TrajectoryDN)

<h2 id="TrajectoryDN" class="doc_header"><code>class</code> <code>TrajectoryDN</code><a href="" class="source_link" style="float:right">[source]</a></h2>

> <code>TrajectoryDN</code>(**`diff_net`**, **`n_features`**) :: `Module`

Trajectory Identification using Difference Net

<!-- ![TDN](../images/TrajectoryDN.jpeg) -->

In [14]:
# slow
# usage: TrajectoryDN
from torch.utils.data import DataLoader

from ti.dataloader import DatasetTraj, zero_padding, getSTW, splitData
from ti.prep import Transformer

# data generator
params = {
    'batch_size': 16,
    'shuffle': True,
    'collate_fn': zero_padding
}

stw = getSTW()
tr_range, val_range, ts_range = splitData(len(stw))

train_g = DataLoader(DatasetTraj(tr_range, stw, mode='sim'), **params)
transformer = Transformer()

diff_net = Difference(mode='simple')
net = TrajectoryDN(diff_net, n_features=len(transformer.features_traj)*2) # 2x for org and dest 

In [15]:
# slow
# usage: TrajectoryDN
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
count = 0
net.to(device)
for x1, x2, y, x_seq_lens, max_seq_len in train_g:
    x1, y, x_seq_lens = torch.Tensor(x1).to(device), torch.Tensor(y).to(device), torch.Tensor(x_seq_lens).to(device)
    org = x2[0]
    dst = x2[1]
    org = torch.Tensor(org).to(device)
    dst = torch.Tensor(dst).to(device)
    x2 = [org, dst]
    print('Batch')
    print(x1.shape)
    print(x2[0].shape, x2[0].shape)
    print(y)
    print(x_seq_lens)
    print(max_seq_len)
    print(net(x1, x2, x_seq_lens))
    if count >= 0:
        break
    count+=1

Batch
torch.Size([9, 15, 4])
torch.Size([9, 15, 4]) torch.Size([9, 15, 4])
tensor([0., 0., 1., 0., 1., 0., 1., 0., 0.])
tensor([ 4.,  3.,  7., 15.,  7.,  9.,  6.,  7.,  3.])
15
tensor([[0.5010],
        [0.5012],
        [0.5007],
        [0.5007],
        [0.5007],
        [0.5006],
        [0.5008],
        [0.5007],
        [0.5012]], grad_fn=<SigmoidBackward>)


In [16]:
# export
class TrajectorySN(nn.Module):
    '''Trajectory Identification using Siamese Net'''
    def __init__(self, n_features, embed_size=16):
        super(TrajectorySN, self).__init__()
        self.n_features = n_features
        self.embed_size = embed_size
        self.n_hidden_rnn = 64 # number of hidden states for RNN
        self.n_layers_rnn = 2 # number of RNN layers (stacked)
        self.lstm = torch.nn.LSTM(
            input_size = self.n_features, 
            hidden_size = self.n_hidden_rnn,
            num_layers = self.n_layers_rnn, 
            batch_first = True
        )
        self.traj_embed = nn.Sequential(
            nn.Linear(self.n_hidden_rnn, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, self.embed_size),
            nn.ReLU()
        )
        self.stop_embed = nn.Sequential(
            nn.Linear(self.n_features*2, 32), # 2x for org and dest 
            nn.ReLU(),
            nn.Linear(32, self.embed_size),
            nn.ReLU()
        )
    
    def forward(self, x1, x2, x_seq_lens): 
        # Siam 1: x1
        # Pack padded batch of sequences for RNN module
        packed = nn.utils.rnn.pack_padded_sequence(
            x1, 
            x_seq_lens, 
            batch_first=True, 
            enforce_sorted=False
        )
        # Forward pass through LSTM
        outputs, (hidden, cell) = self.lstm(packed)
        # FCN
        x1 = hidden[-1] # Use the hidden state from the last LSTM in a stacked LSTM
        x1 = self.traj_embed(x1)
        # Siamese Prep: x2
        x20 = x2[0][:,-1,:]
        x21 = x2[1][:,-1,:]
        x2 = torch.cat((x20, x21), dim=1)
        # Siam 2: x2
        x2 = self.stop_embed(x2)
        return x1, x2

In [17]:
show_doc(TrajectorySN)

<h2 id="TrajectorySN" class="doc_header"><code>class</code> <code>TrajectorySN</code><a href="" class="source_link" style="float:right">[source]</a></h2>

> <code>TrajectorySN</code>(**`n_features`**, **`embed_size`**=*`16`*) :: `Module`

Trajectory Identification using Siamese Net

In [18]:
# export
class ContrastiveLoss(nn.Module):
    '''Contrastive Loss'''
    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        # Find the pairwise distance or eucledian distance of two output feature vectors
        euclidean_distance = torch.pairwise_distance(output1, output2)
        # perform contrastive loss calculation with the distance
        pos_ = (1-label) * torch.pow(euclidean_distance, 2)
        neg_ = label * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2)
        loss_contrastive = torch.mean(pos_ + neg_)
        return loss_contrastive

<!-- ![TSN](../images/TrajectorySN.jpeg) -->

In [19]:
# export
def contastive_loss(x1, x2, y, margin=1.0):
    distance = torch.pairwise_distance(x1, x2)
    pos_part = (1.0 - y) * torch.pow(distance, 2)
    neg_part = y * torch.relu(torch.pow(margin - distance, 2))
    loss = 0.5 * (pos_part + neg_part)
    return loss.mean()

In [20]:
# slow
# usage: TrajectorySN
net = TrajectorySN(n_features=len(transformer.features_traj)) 

In [21]:
# slow
# usage: TrajectorySN
count = 0
for x1, x2, y, x_seq_lens, max_seq_len in train_g:
    print('Batch')
    print(x1.shape)
    print(x2[0].shape, x2[0].shape)
    y = torch.Tensor(y)
    print(y)
    print(x_seq_lens)
    print(max_seq_len)
    x1, x2 = net(x1, x2, x_seq_lens)
    print(x1.shape, x2.shape)
    print(contastive_loss(x1, x2, y).item())
    if count >= 0:
        break
    count+=1

Batch
torch.Size([9, 23, 4])
torch.Size([9, 23, 4]) torch.Size([9, 23, 4])
tensor([1., 1., 1., 0., 0., 1., 1., 1., 1.])
[8, 3, 3, 5, 21, 23, 4, 4, 7]
23
torch.Size([9, 16]) torch.Size([9, 16])
0.1713111698627472


# Export -

In [22]:
#hide
from nbdev.export import notebook2script
notebook2script()

Converted 00_prep.ipynb.
Converted 01_dataloader.ipynb.
Converted 02_model.ipynb.
Converted 03_train.ipynb.
Converted index.ipynb.
