# This notebook is for experimenting with multimodal data encoders, written in PyTorch

## Embedding modules

### TCN block

In [39]:
# Create a simple TCN model to embed sequential data

import torch
import torch.nn as nn
import torch.nn.functional as F

class TCN(nn.Module):
    '''
    Sequence embedder: a TemporalConvNet followed by a linear projection.

    Args:
        input_size: number of input channels (features per time step).
        output_size: width of the vector produced for each sequence.
        num_channels: output channels of every temporal level; the last entry
            is the input width of the linear layer.
        kernel_size: kernel size forwarded to the TemporalConvNet.
        dropout: dropout probability forwarded to the TemporalConvNet.
    '''

    def __init__(self, input_size, output_size, num_channels, kernel_size, dropout):
        super().__init__()
        self.tcn = TemporalConvNet(input_size, num_channels, kernel_size, dropout=dropout)
        self.linear = nn.Linear(num_channels[-1], output_size)

    def forward(self, x):
        '''Map x of shape (N, C, L) to (N, output_size).'''
        features = self.tcn(x)  # same (N, channels, L) layout as the input
        # Only the final time step is projected; earlier steps are discarded here.
        last_step = features[:, :, -1]
        return self.linear(last_step)

class TemporalConvNet(nn.Module):
    '''
    Sequential stack of TemporalBlock levels with exponentially growing dilation.

    Level i uses dilation 2**i and padding (kernel_size - 1) * 2**i. It reads
    input_size channels when i == 0 (otherwise num_channels[i-1]) and emits
    num_channels[i] channels.
    '''
    def __init__(self, input_size, num_channels, kernel_size=2, dropout=0.2):
        super().__init__()
        # Channels entering each level: the raw input first, then the previous level's output.
        widths_in = [input_size, *num_channels[:-1]]
        blocks = []
        for level, (c_in, c_out) in enumerate(zip(widths_in, num_channels)):
            dilation = 2 ** level
            blocks.append(
                TemporalBlock(
                    c_in,
                    c_out,
                    kernel_size,
                    stride=1,
                    dilation=dilation,
                    padding=(kernel_size - 1) * dilation,
                    dropout=dropout,
                )
            )
        self.network = nn.Sequential(*blocks)

    def forward(self, x):
        '''Run x through every level in order and return the last level's output.'''
        return self.network(x)
    
class TemporalBlock(nn.Module):
    '''
    Residual block made of two conv -> chomp -> ReLU -> dropout stages.

    Both convolutions share kernel_size, stride, padding and dilation. Each
    Chomp1d is built with the same padding value handed to its convolution.
    The block input is added back to the stage output (through a 1x1
    convolution when n_inputs != n_outputs) before a final ReLU.
    '''
    def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
        super().__init__()
        conv_opts = dict(stride=stride, padding=padding, dilation=dilation)

        # Stage 1: n_inputs -> n_outputs channels.
        self.conv1 = nn.Conv1d(n_inputs, n_outputs, kernel_size, **conv_opts)
        self.chomp1 = Chomp1d(padding)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)

        # Stage 2: keeps n_outputs channels.
        self.conv2 = nn.Conv1d(n_outputs, n_outputs, kernel_size, **conv_opts)
        self.chomp2 = Chomp1d(padding)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)

        # The same module objects are also exposed as one Sequential, so they
        # appear under both `conv1`/`conv2` and `net.*` in the module tree.
        stages = (
            self.conv1, self.chomp1, self.relu1, self.dropout1,
            self.conv2, self.chomp2, self.relu2, self.dropout2,
        )
        self.net = nn.Sequential(*stages)

        # A 1x1 convolution matches channel counts for the residual sum when needed.
        if n_inputs != n_outputs:
            self.downsample = nn.Conv1d(n_inputs, n_outputs, 1)
        else:
            self.downsample = None
        self.relu = nn.ReLU()

    def forward(self, x):
        '''Return relu(net(x) + shortcut(x)).'''
        branch = self.net(x)
        shortcut = self.downsample(x) if self.downsample is not None else x
        return self.relu(branch + shortcut)

class Chomp1d(nn.Module):
    '''
    Drops the last `chomp_size` positions along the final (length) axis.

    In this file it follows a Conv1d built with padding == chomp_size, trimming
    the convolution output at its right end.

    Args:
        chomp_size: number of trailing positions to drop. 0 leaves the input
            unchanged.
    '''
    def __init__(self, chomp_size):
        super(Chomp1d, self).__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        '''Return x without its last `chomp_size` positions, as a contiguous tensor.'''
        if self.chomp_size == 0:
            # `x[:, :, :-0]` is `x[:, :, :0]`, i.e. an EMPTY tensor. Zero padding
            # (kernel_size == 1) means there is nothing to trim, so pass x through.
            return x.contiguous()
        return x[:, :, :-self.chomp_size].contiguous()

In [40]:
# Smoke-test the TCN module: only the printed output shape is checked, by eye.

x = torch.randn(10, 16, 100) # batch size 10, 16 features, 100 time steps
model = TCN(16, 10, [25]*8, 2, 0.2) # 16 input features, 10 output classes, 8 layers, 25 channels per layer, kernel size 2, dropout 0.2
print(model(x).shape) # should be (10, 10)

# Smoke-test the TemporalConvNet module on its own (no linear head).
# NOTE: this rebinds the notebook-level names `x` and `model`.

x = torch.randn(10, 16, 100) # batch size 10, 16 features, 100 time steps
model = TemporalConvNet(16, [44]*8, 2, 0.2) # 16 input features, 8 layers, 44 channels per layer, kernel size 2, dropout 0.2
print(model(x).shape) # should be (10, 44, 100)

torch.Size([10, 10])
torch.Size([10, 44, 100])


### Image Encoder with TCN

In [41]:

class ImageTCN(nn.Module):
    '''
    2D counterpart of TCN: an ImageTemporalConvNet, a linear layer, then log_softmax.

    Args:
        input_size: number of input channels.
        output_size: width of the vector produced for each input.
        num_channels: output channels of every level; the last entry is the
            input width of the linear layer.
        kernel_size: kernel size forwarded to the ImageTemporalConvNet.
        dropout: dropout probability forwarded to the ImageTemporalConvNet.
    '''

    def __init__(self, input_size, output_size, num_channels, kernel_size, dropout):
        super().__init__()
        self.tcn = ImageTemporalConvNet(input_size, num_channels, kernel_size, dropout=dropout)
        self.linear = nn.Linear(num_channels[-1], output_size)

    def forward(self, x):
        '''Map x of shape (N, C, H, W) to (N, output_size) log-probabilities.'''
        features = self.tcn(x)
        # Only the last row / last column position is projected.
        corner = features[:, :, -1, -1]
        logits = self.linear(corner)
        # NOTE: unlike TCN.forward, the result is normalized with log_softmax over dim 1.
        return F.log_softmax(logits, dim=1)
    
class ImageTemporalConvNet(nn.Module):
    '''
    Sequential stack of ImageTemporalBlock levels with exponentially growing dilation.

    Level i uses dilation 2**i and padding (kernel_size - 1) * 2**i. It reads
    input_size channels when i == 0 (otherwise num_channels[i-1]) and emits
    num_channels[i] channels.
    '''
    def __init__(self, input_size, num_channels, kernel_size=3, dropout=0.2):
        super().__init__()
        # Channels entering each level: the raw input first, then the previous level's output.
        widths_in = [input_size, *num_channels[:-1]]
        blocks = []
        for level, (c_in, c_out) in enumerate(zip(widths_in, num_channels)):
            dilation = 2 ** level
            blocks.append(
                ImageTemporalBlock(
                    c_in,
                    c_out,
                    kernel_size,
                    stride=1,
                    dilation=dilation,
                    padding=(kernel_size - 1) * dilation,
                    dropout=dropout,
                )
            )
        self.network = nn.Sequential(*blocks)

    def forward(self, x):
        '''Run x through every level in order and return the last level's output.'''
        return self.network(x)
    
class ImageTemporalBlock(nn.Module):
    '''
    Residual block made of two Conv2d -> chomp -> ReLU -> dropout stages.

    Both convolutions share kernel_size, stride, padding and dilation. Each
    ImageChomp2d is built with the same padding value handed to its convolution.
    The block input is added back to the stage output (through a 1x1
    convolution when n_inputs != n_outputs) before a final ReLU.
    '''
    def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
        super().__init__()
        conv_opts = dict(stride=stride, padding=padding, dilation=dilation)

        # Stage 1: n_inputs -> n_outputs channels.
        self.conv1 = nn.Conv2d(n_inputs, n_outputs, kernel_size, **conv_opts)
        self.chomp1 = ImageChomp2d(padding)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)

        # Stage 2: keeps n_outputs channels.
        self.conv2 = nn.Conv2d(n_outputs, n_outputs, kernel_size, **conv_opts)
        self.chomp2 = ImageChomp2d(padding)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)

        # The same module objects are also exposed as one Sequential, so they
        # appear under both `conv1`/`conv2` and `net.*` in the module tree.
        stages = (
            self.conv1, self.chomp1, self.relu1, self.dropout1,
            self.conv2, self.chomp2, self.relu2, self.dropout2,
        )
        self.net = nn.Sequential(*stages)

        # A 1x1 convolution matches channel counts for the residual sum when needed.
        if n_inputs != n_outputs:
            self.downsample = nn.Conv2d(n_inputs, n_outputs, 1)
        else:
            self.downsample = None
        self.relu = nn.ReLU()

    def forward(self, x):
        '''Return relu(net(x) + shortcut(x)).'''
        branch = self.net(x)
        shortcut = self.downsample(x) if self.downsample is not None else x
        return self.relu(branch + shortcut)
    
class ImageChomp2d(nn.Module):
    '''
    Drops the last `chomp_size` rows AND the last `chomp_size` columns of a
    4D tensor, i.e. it trims the bottom and right edges of the last two axes.

    In this file it follows a Conv2d built with padding == chomp_size.

    Args:
        chomp_size: number of trailing rows and columns to drop. 0 leaves the
            input unchanged.
    '''
    def __init__(self, chomp_size):
        super(ImageChomp2d, self).__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        '''Return x without its last `chomp_size` rows/columns, as a contiguous tensor.'''
        if self.chomp_size == 0:
            # `:-0` is `:0`, which would produce an EMPTY tensor. Zero padding
            # (kernel_size == 1) means there is nothing to trim, so pass x through.
            return x.contiguous()
        return x[:, :, :-self.chomp_size, :-self.chomp_size].contiguous()

In [42]:
# Smoke-test the ImageTCN module: only the printed output shape is checked, by eye.

x = torch.randn(10, 3, 32, 32) # batch size 10, 3 channels, 32x32 image
model = ImageTCN(3, 10, [44]*8, 3, 0.2) # 3 input channels, 10 output classes, 8 layers, 44 channels per layer, kernel size 3, dropout 0.2
print(model(x).shape) # should be (10, 10)

# Smoke-test the ImageTemporalConvNet module on its own (no linear head).
# NOTE: this rebinds the notebook-level names `x` and `model`.

x = torch.randn(10, 3, 32, 32) # batch size 10, 3 channels, 32x32 image
model = ImageTemporalConvNet(3, [44]*8, 3, 0.2) # 3 input channels, 8 layers, 44 channels per layer, kernel size 3, dropout 0.2
print(model(x).shape) # should be (10, 44, 32, 32)

torch.Size([10, 10])
torch.Size([10, 44, 32, 32])


### CNN Image Encoder

In [43]:
### Encode images with a pretrained ResNet18: the final fc layer is replaced by Identity, so the encoder returns the features that would have fed it
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models
from torchvision.models.resnet import ResNet18_Weights

class ResNet18_Encoder(nn.Module):
    '''
    torchvision ResNet18 with its final `fc` layer swapped for nn.Identity.

    The forward pass therefore returns whatever the network feeds into `fc`.
    The notebook's own check shows shape (10, 512) for a (10, 3, 224, 224) input.

    Args:
        pretrained: when true, build the network with
            ResNet18_Weights.IMAGENET1K_V1; otherwise with weights=None.
    '''
    def __init__(self, pretrained=True):
        super().__init__()
        chosen_weights = ResNet18_Weights.IMAGENET1K_V1 if pretrained else None
        backbone = models.resnet18(weights=chosen_weights)
        # Remove the classification head so the backbone outputs features.
        backbone.fc = nn.Identity()
        self.resnet18 = backbone

    def forward(self, x):
        '''Return the backbone output for a batch of images x.'''
        return self.resnet18(x)


In [32]:
# Smoke-test the ResNet18_Encoder module: only the printed output shape is checked, by eye.
# pretrained=True selects the IMAGENET1K_V1 weights (torchvision may need to download them on first use).

x = torch.randn(10, 3, 224, 224) # batch size 10, 3 channels, 224x224 image
model = ResNet18_Encoder(pretrained=True)
print(model(x).shape) # should be (10, 512)

torch.Size([10, 512])


### Combine ResNet18_Encoder and TCN to build a video encoder

In [44]:
### Video Temporal Convolutional Network (VTCN) with ResNet18 Encoder

class VTCN(nn.Module):
    '''
    Video encoder: ResNet18 features per frame, a TemporalConvNet over the
    frame axis, then a linear projection of the last time step.

    Args:
        input_size: channel count expected by the TemporalConvNet. It must equal
            the width of the encoder's per-frame feature vector (the notebook
            uses 512).
        output_size: width of the vector produced for each video.
        num_channels: output channels of every temporal level; the last entry
            is the input width of the linear layer.
        kernel_size: kernel size forwarded to the TemporalConvNet.
        dropout: dropout probability forwarded to the TemporalConvNet.
    '''
    def __init__(self, input_size, output_size, num_channels, kernel_size, dropout):
        super(VTCN, self).__init__()
        self.encoder = ResNet18_Encoder(pretrained=True)
        self.tcn = TemporalConvNet(input_size, num_channels, kernel_size, dropout=dropout)
        self.linear = nn.Linear(num_channels[-1], output_size)

    def forward(self, x):
        '''
        Encode a batch of videos.

        Args:
            x: tensor of shape (N, C, T, H, W) = batch, channels, frames,
                height, width.

        Returns:
            Tensor of shape (N, output_size).
        '''
        N, C, T, H, W = x.size()
        # Move the frame axis next to the batch axis BEFORE flattening. A plain
        # view(N*T, C, H, W) on the (N, C, T, H, W) layout only reinterprets
        # memory, so each "frame" would mix channels and time steps.
        frames = x.permute(0, 2, 1, 3, 4).reshape(N * T, C, H, W)
        feats = self.encoder(frames)        # (N*T, F): one feature vector per frame
        feats = feats.reshape(N, T, -1)     # (N, T, F): regroup frames by video
        feats = feats.permute(0, 2, 1)      # (N, F, T): channels-first for the TCN
        feats = self.tcn(feats)
        # Only the final time step is projected.
        return self.linear(feats[:, :, -1])

In [45]:
# Smoke-test the VTCN module: only the printed output shape is checked, by eye.

x = torch.randn(10, 3, 16, 224, 224) # batch size 10, 3 channels, 16 frames, 224x224 image
model = VTCN(512, 44, [25]*8, 2, 0.2) # 512 input features, 44 output features, 8 layers, 25 channels per layer, kernel size 2, dropout 0.2
print(model(x).shape) # should be (10, 44), since output_size is 44

torch.Size([10, 44])


# Test

In [46]:
# Create 3 modalities of random data (video, audio, "egg") and encode them with either TCN or VTCN.
# NOTE(review): "egg" in the names and printed labels below presumably means EEG - confirm.

# batch size 10, 3 channels, 60 frames, 224x224 image
x_video = torch.randn(10, 3, 60, 224, 224)
# batch size 10, 1 channel, 16k audio samples
x_audio = torch.randn(10, 1, 16000) 
# batch size 10, 32 channels, 240 samples (240 Hz for 1 second)
x_egg = torch.randn(10, 32, 240)

# Encoding video with VTCN
model_video = VTCN(512, 44, [25]*8, 2, 0.2)
output_video = model_video(x_video)
print("video embedding vector size", output_video.shape) # should be (10, 44)

# Encoding audio with TCN
model_audio = TCN(1, 44, [25]*8, 2, 0.2) # 1 input channel, 44 output features, 8 layers, 25 channels per layer, kernel size 2, dropout 0.2
output_audio = model_audio(x_audio)
print("audio embedding vector size", output_audio.shape) # should be (10, 44)

# Encoding the 32-channel signal with TCN
model_egg = TCN(32, 44, [25]*8, 2, 0.2)
output_egg = model_egg(x_egg)
print("egg embedding vector size", output_egg.shape) # should be (10, 44)

video embedding vector size torch.Size([10, 44])
audio embedding vector size torch.Size([10, 44])
egg embedding vector size torch.Size([10, 44])


In [49]:
# Create 3 modalities of random data (video, audio, EEG) and encode them with either ImageTCN or VTCN.
# The audio and EEG inputs here are spectrogram-shaped (4D), so they go through ImageTCN.

# batch size 10, 3 channels, 60 frames, 224x224 image
x_video = torch.randn(10, 3, 60, 224, 224)
# audio spectrogram: batch size 10, 1 channel, 500 frames, 20 frequency bins
x_audio = torch.randn(10, 1, 500, 20)
# EEG spectrogram: batch size 10, 32 channels, 500 frames, 20 frequency bins
x_egg = torch.randn(10, 32, 500, 20)

# Encoding video with VTCN
model_video = VTCN(512, 44, [25]*8, 2, 0.2)
output_video = model_video(x_video)
print("video embedding vector size", output_video.shape) # should be (10, 44)

# Encoding audio spectrogram with ImageTCN
model_audio = ImageTCN(1, 44, [25]*8, 3, 0.2) # 1 input channel, 44 output features, 8 layers, 25 channels per layer, kernel size 3, dropout 0.2
output_audio = model_audio(x_audio)
print("audio embedding vector size", output_audio.shape) # should be (10, 44)

# Encoding EEG spectrogram with ImageTCN
model_egg = ImageTCN(32, 44, [25]*8, 3, 0.2)
output_egg = model_egg(x_egg)
print("egg embedding vector size", output_egg.shape) # should be (10, 44)

video embedding vector size torch.Size([10, 44])
audio embedding vector size torch.Size([10, 44])
egg embedding vector size torch.Size([10, 44])
