<a href="https://colab.research.google.com/github/harshading/Hindi-Facial-Emotion-Synthesis/blob/master/ASR_MODEL_INFERENCE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Reference article on ASR data collection: https://towardsdatascience.com/automatic-speech-recognition-data-collection-with-youtube-v3-api-mask-rcnn-and-google-vision-api-2370d6776109

In [None]:
# Colab-only: mount Google Drive at /content/drive so files stored under
# /content/drive/MyDrive are readable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Imports

In [None]:
%%capture

# GPU:
# !pip install torch==1.7.0+cu101 torchvision==0.8.1+cu101 torchaudio==0.7.0 -f https://download.pytorch.org/whl/torch_stable.html

# For interactive demo at the end:
!pip install pydub
!pip install torchaudio
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio
import math
from torch.autograd import Variable

import matplotlib.pyplot as plt
import IPython.display as ipd
from tqdm.notebook import tqdm

In [None]:
# Pick CUDA when a GPU is visible, otherwise fall back to the CPU.
# NOTE(review): in the cells shown, the checkpoint is loaded with an explicit
# CPU map_location and the model is never moved to `device`, so this value is
# only printed.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cpu


# Load Model

In [None]:
class PositionalEncoder(nn.Module):
    """Add fixed sinusoidal positional encodings to a batch-first sequence.

    The encodings are precomputed once for ``max_seq_len`` positions and stored
    in the non-trainable buffer ``pe`` of shape ``(1, max_seq_len, d_model)``.
    Even feature columns hold sines and odd feature columns hold cosines.

    Args:
        d_model: Size of the feature (last) dimension of the input. May be odd.
        max_seq_len: Number of positions for which encodings are precomputed.
        dropout: Dropout probability applied after the encodings are added.
    """

    def __init__(self, d_model, max_seq_len, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_seq_len, d_model)
        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_seq_len, ceil(d_model / 2))

        pe[:, 0::2] = torch.sin(angles)
        # An odd d_model has one fewer odd column than even columns, so the
        # cosine half must drop the last frequency; for even d_model the slice
        # is a no-op and the buffer is unchanged.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + pe[:, :seq_len])`` for ``x`` of shape (batch, seq_len, d_model).

        Raises:
            RuntimeError: If ``seq_len`` exceeds the number of precomputed positions.
        """
        seq_len = x.size(1)
        max_seq_len = self.pe.size(1)
        if seq_len > max_seq_len:
            # Fail with an explicit message instead of an opaque broadcasting
            # error (or a silent broadcast when max_seq_len == 1).
            raise RuntimeError(
                f"Sequence length {seq_len} exceeds max_seq_len {max_seq_len} "
                "of the positional encoding table."
            )

        x = x + self.pe[:, :seq_len, :]
        return self.dropout(x)

class Encoder(nn.Module):
    """Positional encoding, a stack of transformer encoder layers, and a final layer norm.

    Args:
        d_model: Feature size of the input and output.
        N_encoder_layers: Number of stacked ``nn.TransformerEncoderLayer`` modules.
        heads: Number of attention heads per layer.
        d_ff: Hidden size of each layer's feed-forward network.
        max_seq_len: Number of positions the positional encoder precomputes.
    """

    def __init__(self, d_model, N_encoder_layers, heads, d_ff, max_seq_len):
        super().__init__()
        self.N = N_encoder_layers
        self.pe = PositionalEncoder(d_model, max_seq_len)

        # Dropout inside the transformer layers is fixed at 0.3.
        encoder_layer = nn.TransformerEncoderLayer(d_model, heads, d_ff, dropout=0.3)
        self.encoder = nn.TransformerEncoder(encoder_layer, self.N)

        self.norm = nn.LayerNorm(d_model)

    def forward(self, src):
        """Encode ``src`` of shape (batch_size, seq_len, d_model); the output has the same shape."""
        # The transformer stack is built without batch_first, so it consumes
        # (seq_len, batch_size, d_model); swap the first two axes on the way in
        # and again on the way out.
        seq_first = self.pe(src).transpose(0, 1)
        batch_first = self.encoder(seq_first).transpose(0, 1)
        return self.norm(batch_first)


class CNNLayerNorm(nn.Module):
    """Layer normalization over the feature axis of a CNN activation map.

    Args:
        n_feats: Size of the feature axis (dimension 2 of the input).
    """

    def __init__(self, n_feats):
        super().__init__()
        self.layer_norm = nn.LayerNorm(n_feats)

    def forward(self, x):
        """Normalize ``x`` of shape (batch, channel, feature, time) and return the same layout."""
        # nn.LayerNorm normalizes the last axis, so move `feature` there first.
        time_major = x.transpose(2, 3).contiguous()  # (batch, channel, time, feature)
        normalized = self.layer_norm(time_major)
        # Restore the original axis order.
        return normalized.transpose(2, 3).contiguous()  # (batch, channel, feature, time)


class ResidualCNN(nn.Module):
    """Pre-activation residual block built from two convolutions.

    Inspired by https://arxiv.org/pdf/1603.05027.pdf, except with layer norm
    instead of batch norm. Each half of the block runs
    layer norm -> ReLU -> dropout -> conv, and the block input is added back
    to the result.

    Args:
        in_channels: Channels of the block input.
        out_channels: Channels produced by both convolutions.
        kernel: Square kernel size; padding is ``kernel // 2``.
        stride: Stride used by both convolutions.
        dropout: Dropout probability used before each convolution.
        n_feats: Size of the feature axis normalized by the layer norms.
    """

    def __init__(self, in_channels, out_channels, kernel, stride, dropout, n_feats):
        super().__init__()

        half_kernel = kernel // 2
        self.cnn1 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding=half_kernel)
        self.cnn2 = nn.Conv2d(out_channels, out_channels, kernel, stride, padding=half_kernel)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.layer_norm1 = CNNLayerNorm(n_feats)
        self.layer_norm2 = CNNLayerNorm(n_feats)

    def forward(self, x):
        """Apply the block to ``x`` of shape (batch, channel, feature, time)."""
        skip = x

        out = self.cnn1(self.dropout1(F.relu(self.layer_norm1(x))))
        out = self.cnn2(self.dropout2(F.relu(self.layer_norm2(out))))

        # The skip connection needs the convolution output to keep the input's
        # shape; the only instantiation in this file uses equal channel counts,
        # kernel 3 and stride 1.
        out += skip
        return out  # (batch, channel, feature, time)

class SpeechModel(nn.Module):
    """CNN front-end plus transformer encoder that regresses per-clip AU values.

    Pipeline: 2-D conv -> residual CNN stack -> linear projection to
    ``d_model`` -> (optional) transformer encoder -> mean over time -> linear
    head -> ``5 * sigmoid``, so every output lies in the open interval (0, 5).

    Args:
        max_seq_len: Number of positions the encoder's positional table covers.
        n_feats: Size of the feature axis of the input (dimension 2).
        N_cnn_layers: Number of ``ResidualCNN`` blocks.
        n_channels: Channels produced by the first convolution and kept by the
            residual blocks.
        N_encoder_layers: Number of transformer encoder layers.
        d_model: Feature size fed to the encoder and the output head.
        d_ff: Feed-forward hidden size inside each encoder layer.
        heads: Number of attention heads.
        N_AUs: Number of output values per clip.
        stride: Accepted for backward compatibility; not used by the model
            (all convolutions are built with stride 1).
        dropout: Dropout probability inside the residual CNN blocks.
        pool_encoder_output: When ``False`` (default) the time-average is taken
            over the features *before* the transformer encoder, which is what
            the original forward pass computed (it ran the encoder and then
            discarded its result). When ``True`` the average is taken over the
            encoder output instead.
            NOTE(review): checkpoints trained with the original forward pass
            presumably have an untrained encoder, so keep the default for them
            and enable this only when training a new model.
    """

    def __init__(self, max_seq_len, n_feats, N_cnn_layers, n_channels,
                 N_encoder_layers, d_model, d_ff, heads, N_AUs,
                 stride=2, dropout=0.1, pool_encoder_output=False):
        super().__init__()
        self.pool_encoder_output = pool_encoder_output

        self.cnn = nn.Conv2d(1, n_channels, 3, stride=1, padding=3//2)

        # Residual CNN stack for extracting hierarchical features.
        self.rescnn_layers = nn.Sequential(*[
            ResidualCNN(n_channels, n_channels, kernel=3, stride=1, dropout=dropout, n_feats=n_feats)
            for _ in range(N_cnn_layers)
        ])
        self.fc = nn.Linear(n_channels*n_feats, d_model)

        # The encoder is always constructed so existing state dicts keep
        # loading with every key matched.
        self.encoder = Encoder(d_model, N_encoder_layers, heads, d_ff, max_seq_len)
        self.linear = nn.Linear(d_model, N_AUs)

    def forward(self, src):
        """Map ``src`` of shape (batch, 1, feature, time) to (batch, N_AUs)."""
        x = self.cnn(src)          # (batch, channel, feature, time)
        x = self.rescnn_layers(x)  # (batch, channel, feature, time)

        batch, channels, feats, frames = x.size()
        x = x.reshape(batch, channels * feats, frames)  # (batch, channel*feature, time)
        x = x.transpose(1, 2)                           # (batch, time, channel*feature)
        x = self.fc(x)                                  # (batch, time, d_model)

        if self.pool_encoder_output:
            x = self.encoder(x)                         # (batch, time, d_model)
        # With the flag off the encoder is skipped entirely: the original code
        # ran it but never used its output, so the result is unchanged.
        # (In training mode this also skips the encoder's dropout draws.)

        avg_enc = torch.mean(x, -2)     # (batch, d_model)
        output = self.linear(avg_enc)   # (batch, N_AUs)

        return 5 * torch.sigmoid(output)

In [None]:
# Transformer encoder hyperparameters. These (and the CNN settings below) must
# match the values the checkpoint was trained with, otherwise load_state_dict
# will report shape mismatches.
heads = 8
N_encoder_layers = 4
d_model = 512 #512
d_ff = 2048 #2048
N_AUs = 17  # TO BE EDIT  (number of values the model outputs per clip)

# Positional-encoding table length and CNN front-end settings.
max_seq_len = 441 #max_seq_length # TO BE EDIT
n_channels = 16
N_cnn_layers = 1
n_feats = 128 # TO BE EDIT  (size of the feature axis of the model input)
n_mels = 128  # not referenced by any of the cells shown here

model = SpeechModel(max_seq_len, n_feats, N_cnn_layers, n_channels, N_encoder_layers,\
                    d_model, d_ff, heads, N_AUs)

# Checkpoint location on the mounted Drive.
data_dir = '/content/drive/MyDrive/ASR_Project/'
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a
# trusted source. Weights are mapped to the CPU regardless of `device`.
model.load_state_dict(torch.load(data_dir+'SunilModel', map_location=torch.device('cpu')))

<All keys matched successfully>

# Inference

In [None]:
import pandas as pd
# Resampler configured for a 44.1 kHz source and a 16 kHz target.
resample_transform = torchaudio.transforms.Resample(44100, 16000)
# Mel spectrogram at 16 kHz with torchaudio's defaults for every other setting.
# NOTE(review): the captured output of this cell shows torchaudio's
# "At least one mel filterbank has all zero values" warning, so some mel bins
# are empty with these defaults; changing them would alter the model's input
# features, so confirm against the training setup first.
melspec_transform = torchaudio.transforms.MelSpectrogram(sample_rate=16000)


def get_audio(file_path, target_sample_rate=16000):
    """Load an audio file and resample it to ``target_sample_rate``.

    The source rate is read from the file itself rather than assumed to be
    44.1 kHz, so files recorded at other rates are resampled correctly.

    Args:
        file_path: Path of the audio file, passed to ``torchaudio.load``.
        target_sample_rate: Desired sample rate in Hz. The default of 16000
            matches the rate the mel-spectrogram transform in this notebook is
            configured with.

    Returns:
        The waveform tensor returned by ``torchaudio.load``, resampled when the
        file's rate differs from ``target_sample_rate``.
    """
    waveform, source_sample_rate = torchaudio.load(file_path)
    if source_sample_rate == target_sample_rate:
        # Already at the requested rate; nothing to do.
        return waveform

    resampler = torchaudio.transforms.Resample(source_sample_rate, target_sample_rate)
    return resampler(waveform)

def get_au(file_path):
    """Read an AU CSV file and return the per-column mean over all rows.

    The first column is dropped by position (presumably a frame index; verify
    against the CSV producer). Means are computed over the remaining numeric
    columns only, and the returned header is taken from the same result, so
    header and values always line up even if the file contains a non-numeric
    column.

    Args:
        file_path: Path of a comma-separated file with a header row.

    Returns:
        A tuple ``(au_data, columns)`` where ``au_data`` is a 1-D float64
        tensor of column means and ``columns`` is the ``pandas.Index`` of the
        corresponding column names.
    """
    df = pd.read_csv(file_path, delimiter=',')
    au_means = df.iloc[:, 1:].mean(numeric_only=True)
    columns = au_means.index
    au_data = torch.tensor(au_means.to_numpy(dtype=float))
    return au_data, columns


def get_output(model, wave):
    """Run ``model`` on a waveform and return its output for that single clip.

    The waveform is converted to a mel spectrogram, averaged over its first
    axis (the channel axis for a ``torchaudio.load`` waveform) and given a
    leading batch axis before being passed to the model.

    Side effect: the model is switched to evaluation mode and left there.

    Args:
        model: Module called with the 4-D spectrogram batch.
        wave: Waveform tensor accepted by the module-level ``melspec_transform``.

    Returns:
        The first (and only) row of the model output. It is computed under
        ``torch.no_grad()``, so it carries no autograd graph.
    """
    model.eval()
    # Inference only: skip autograd bookkeeping to save memory and time.
    with torch.no_grad():
        wave_melspec = melspec_transform(wave)
        # Collapse the first axis to one, then add the batch axis.
        wave_data = wave_melspec.mean(dim=0, keepdim=True).unsqueeze(0)
        model_output = model(wave_data)[0]
    return model_output


def inference(model, audio_file_path, au_file_path):
    """Predict AU values for one audio clip and load its reference values.

    Args:
        model: Model passed through to ``get_output``.
        audio_file_path: Path of the audio clip.
        au_file_path: Path of the CSV holding the clip's reference AU values.

    Returns:
        A tuple ``(generated_aus, target_aus, au_header)``: the model output,
        the reference means from the CSV, and the CSV column names.
    """
    clip_waveform = get_audio(audio_file_path)
    reference_aus, header = get_au(au_file_path)
    predicted_aus = get_output(model, clip_waveform)
    return predicted_aus, reference_aus, header

  "At least one mel filterbank has all zero values. "


In [None]:
# Dataset folders on the mounted Drive; not referenced again in the cells
# shown here.
au_dir = data_dir+'au_dir/'
audio_dir = data_dir+'split_audios_dir/'


# One example clip and its AU CSV, hard-coded to files under /content.
# NOTE(review): these must already exist in the runtime before this cell runs.
my_audio_file_path = '/content/7qgWBZu9VOc-008.mp3'
my_au_file_path = '/content/7qgWBZu9VOc-008.mp4.csv'

# Model prediction, reference AU means and column names for the example clip.
generated_au, target_au,au_header = inference(model, my_audio_file_path, my_au_file_path)

In [None]:
# Show the AU column names, then predicted and reference values in the same order.
print(f"{au_header.values}")
print(f"Generated\n{generated_au.detach().numpy()}")
print(f"Target\n{target_au.detach().numpy()}")

['AU01_r' 'AU02_r' 'AU04_r' 'AU05_r' 'AU06_r' 'AU07_r' 'AU09_r' 'AU10_r'
 'AU12_r' 'AU14_r' 'AU15_r' 'AU17_r' 'AU20_r' 'AU23_r' 'AU25_r' 'AU26_r'
 'AU45_r']
Generated
[0.7896003  0.6537746  1.1176057  0.64682376 0.9194159  1.3153814
 0.66935724 1.1891489  0.919821   1.0739055  0.73496497 0.98586625
 0.6794081  0.6868584  1.1004884  1.0184083  0.78645414]
Target
[0.5356     0.11653333 2.35333333 0.03853333 1.07193333 2.18373333
 0.02673333 0.82606667 0.48886667 1.89146667 0.10206667 0.21033333
 0.06093333 0.04686667 0.39793333 0.449      0.35826667]
