In [2]:
import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
from torch.autograd  import  Function

# Acoustic Branch

Inputs for acoustic branch will be N x 40 where N [1,33]  
Time step: (2, 10) (seconds?)  
N: relative duration after feature extraction

In [3]:
class AcousticNet(nn.Module):
    def __init__(self, num_conv_layers = 3, kernel_size = 2, conv_width = 32, num_gru_layers = 2):
        super(AcousticNet, self).__init__()
        self.num_conv_layers = num_conv_layers
        self.conv1 = nn.Conv1d(in_channels=40, out_channels=conv_width, kernel_size=kernel_size, padding = kernel_size - 1)
        self.conv2 = nn.Conv1d(in_channels=conv_width, out_channels=conv_width, kernel_size=kernel_size, padding = kernel_size - 1)
        self.conv3 = nn.Conv1d(in_channels=conv_width, out_channels=conv_width, kernel_size=kernel_size, padding = kernel_size - 1)
        self.conv4 = nn.Conv1d(in_channels=conv_width, out_channels=conv_width, kernel_size=kernel_size, padding = kernel_size - 1)
        self.convs = [self.conv1, self.conv2, self.conv3, self.conv4]
        self.max_pool = nn.MaxPool1d(kernel_size = 2)
        self.relu = nn.ReLU()
        
        self.gru = nn.GRU(input_size=conv_width,hidden_size=32,num_layers=num_gru_layers) # 19 is hardcoded
        self.mean_pool = nn.AvgPool1d(kernel_size=2)
        
    def forward(self, x):
        for i in range(self.num_conv_layers):
            x = self.relu(self.max_pool(self.convs[i](x)))
        x = torch.transpose(x, 1, 2) 
        x, _ = self.gru(x)
        x = torch.transpose(x, 1, 2)
        x = F.adaptive_avg_pool1d(x,1)[:, :, -1]
#         x = self.mean_pool(x)
        return x

In [4]:
# Test dummy input
net = AcousticNet(num_conv_layers = 3, kernel_size = 2, conv_width = 32, num_gru_layers = 2)
batch_size = 8
test_vec = torch.randn(batch_size, 40, 17) # samples x features (or channels) x N (relative duration)
output = net(test_vec)
print(f'Shape of output: {output.shape}')
# assert output.shape[-1] == 16

Shape of output: torch.Size([8, 32])


# Lexical Branch

In [5]:
# implement GRU (or transformer)
class LexicalNet(nn.Module):
    def __init__(self, num_gru_layers = 2):
        super(LexicalNet, self).__init__()
        # implement GRU (or transformer)
        self.gru = nn.GRU(input_size=300,hidden_size=32,num_layers=num_gru_layers)
        self.mean_pool = nn.AvgPool1d(kernel_size=2) 
        self.flatten = nn.Flatten()
        
    def forward(self, x):
        x, _ = self.gru(x)
#         x = self.mean_pool(x)
        x = self.flatten(x)
        print(x.shape)
        return x

In [6]:
# Test dummy input
net = LexicalNet(num_gru_layers = 2)
batch_size = 8
test_vec = torch.randn(batch_size, 1, 300)
output = net(test_vec)
# assert output.shape[-1] == 16

torch.Size([8, 32])


# Master branch

In [13]:
class GRL(Function):
    @staticmethod
    def forward(self,x):
        return x
    @staticmethod
    def backward(self,grad_output):
        grad_input = grad_output.neg()
        return grad_input

In [19]:
class MasterNet(nn.Module):
    def __init__(self, acoustic_modality = True, lexical_modality = True, visual_modality = False,
                 num_conv_layers = 3, kernel_size = 2, conv_width = 32, num_gru_layers = 2,
                 num_dense_layers = 1, dense_layer_width = 32, grl_lambda = .3):
        super(MasterNet, self).__init__()
        
        self.acoustic_modality = acoustic_modality
        self.lexical_modality = lexical_modality
        self.visual_modality = visual_modality
        
        self.acoustic_model = AcousticNet(num_conv_layers = num_conv_layers, kernel_size = kernel_size, 
                                     conv_width = conv_width, num_gru_layers = num_gru_layers)
        self.lexical_model = LexicalNet(num_gru_layers = 2)
        
        # emotion classifier
#         self.dense1_emo = nn.Linear()
#         self.dense2_emo = nn.Linear()
        
        width = 0 # width of the FC layers
        if self.acoustic_modality:
            width += 32
        if self.visual_modality:
            width += 0 # to implement
        if self.lexical_modality:
            width += 32
            
        self.fc_1 = nn.Linear(width, dense_layer_width)
        self.fc_2 = nn.Linear(dense_layer_width, 3)
        self.softmax = nn.Softmax(dim=1)

        self.relu = nn.ReLU()
#         # To implement   
#         if num_dense_layers == 2:
#             self.fc = nn.Sequential()
#             self.linear_1 = nn.Linear(width, dense_layer_width)
#         else:
#             self.fc = 
        
        # confound classifier -- to implement
        
        self.grl = GRL()
        self.dense_con = nn.Linear(width, 3)
#         self.dense2_con = None
        
        
    def forward_a(self, x_a):
        x = x_a
        x = self.acoustic_model(x)
        return x
    
    def forward_l(self, x_l):
        x = x_l
        x = self.lexical_model(x)
        return x
    
    def forward_v(self, x_v):
        x = x_v
        return x
    
    def encoder(self, x_v, x_a, x_l):
        print('x_a before encoding', x_a.shape)
        print('x_l before encoding', x_l.shape)
        if self.visual_modality:
            x_v = self.forward_v(x_v)
        if self.acoustic_modality:
            x_a = self.forward_a(x_a)
        if self.lexical_modality:
            x_l = self.forward_l(x_l)
        print('x_a after encoding', x_a.shape)
        print('x_l after encoding', x_l.shape)
        
        if self.visual_modality:
            if self.acoustic_modality:
                if self.lexical_modality:
                    x = torch.cat((x_v, x_a, x_l), 1)
                else:
                    x = torch.cat((x_v, x_a), 1)
            else:
                if self.lexical_modality:
                    x = torch.cat((x_v, x_l), 1)
                else:
                    x = x_v
        else:
            if self.acoustic_modality:
                if self.lexical_modality:
                    x = torch.cat((x_a, x_l), 1)
                else:
                    x = x_a
            else:
                x = x_l
        print('x after concat', x.shape)
        return x

    def stress_model(self, x):
        x = self.grl.apply(x)
        x = self.dense_con(x)
        x = self.softmax(x)
        return x
    
    def recognizer(self, x):
        print(x.shape)
        x = self.relu(self.fc_1(x))
        x = self.fc_2(x)
        x = self.softmax(x)
        return x
    
    def forward(self, x_v, x_a, x_l):
        x = self.encoder(x_v, x_a, x_l)
        emotion_output = self.recognizer(x)
        stress_output = self.stress_model(x)
        
        return emotion_output, stress_output

In [20]:
# Test dummy input
net = MasterNet()
batch_size = 8
acoustic_features = torch.randn(batch_size, 40, 17) # samples x features (or channels) x N (relative duration)
lexical_features = torch.randn(batch_size, 1, 300)
visual_features = None
emotion_output, stress_output = net(visual_features, acoustic_features, lexical_features)
print(f'Shape of emotion output: {emotion_output.shape}')
print(f'Shape of stress output: {stress_output.shape}')
print(emotion_output)
print(stress_output)
# assert output.shape[-1] == 16

x_a before encoding torch.Size([8, 40, 17])
x_l before encoding torch.Size([8, 1, 300])
torch.Size([8, 32])
x_a after encoding torch.Size([8, 32])
x_l after encoding torch.Size([8, 32])
x after concat torch.Size([8, 64])
torch.Size([8, 64])
Shape of emotion output: torch.Size([8, 3])
Shape of stress output: torch.Size([8, 3])
tensor([[0.3092, 0.3467, 0.3441],
        [0.3127, 0.3483, 0.3390],
        [0.3138, 0.3497, 0.3365],
        [0.3063, 0.3614, 0.3323],
        [0.3061, 0.3597, 0.3342],
        [0.3089, 0.3606, 0.3305],
        [0.3080, 0.3599, 0.3321],
        [0.3037, 0.3634, 0.3329]], grad_fn=<SoftmaxBackward>)
tensor([[0.3399, 0.3367, 0.3233],
        [0.3549, 0.3074, 0.3377],
        [0.3672, 0.3226, 0.3102],
        [0.3508, 0.3366, 0.3127],
        [0.3364, 0.3411, 0.3225],
        [0.2944, 0.3969, 0.3087],
        [0.2811, 0.4005, 0.3184],
        [0.2838, 0.3868, 0.3294]], grad_fn=<SoftmaxBackward>)


In [None]:
def train(train_loader, model, optimizer, criterion, device, opt):
    model.train()

    running_loss = 0.
    running_acc = 0.

    groundtruth = []
    prediction = []

    for i, train_data in enumerate(train_loader):
        visual_features, _, acoustic_features, _, lexical_features, _, _, a_labels, _, _ = train_data

        visual_features = visual_features.to(device)
        acoustic_features = acoustic_features.to(device)
        lexical_features = lexical_features.to(device)

        labels = a_labels.to(device)

        optimizer.zero_grad()
        predictions = model(visual_features, acoustic_features, lexical_features)

        loss = criterion(predictions, labels)

        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()

        groundtruth.append(labels.tolist())
        predictions = predictions.argmax(dim=1, keepdim=True)
        prediction.append(predictions.view_as(labels).tolist())

        if opt.verbose and i > 0 and int(len(train_loader) / 10) > 0 and i % (int(len(train_loader) / 10)) == 0:
            print('.', flush=True, end='')
            
    train_loss = running_loss / len(train_loader)

    groundtruth = list(itertools.chain.from_iterable(groundtruth))
    prediction = list(itertools.chain.from_iterable(prediction))

    train_acc = accuracy_score(prediction, groundtruth)

    return train_loss, train_acc