In [1]:
import os
import numpy as np
import pandas as pd
import torch
import torchaudio
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torch.nn.functional as F

In [2]:
# Notebook-wide state. Everything starts as None and is assigned by load_model(),
# which declares all six names as globals.
_model = None # type: VoiceImpersonator
_hparams = None  # hyper-parameter dict built in load_model()
_device = None  # torch.device chosen in load_model() ("cuda" when available, else "cpu")
_use_cuda = None  # result of torch.cuda.is_available(), set in load_model()
_train_audio_transforms = None  # waveform -> spectrogram pipeline used for data_type == 'train'
_valid_audio_transforms = None  # waveform -> spectrogram transform used for validation

In [3]:
def load_model(model_path=None, save_model_path=None, batch_size=8):
    """Build the VoiceImpersonator model and the audio transforms used by the notebook.

    Populates the module-level globals ``_model``, ``_device``, ``_hparams``,
    ``_use_cuda``, ``_train_audio_transforms`` and ``_valid_audio_transforms``.

    Args:
        model_path: Optional path to a saved ``state_dict``. When given, the
            weights are loaded non-strictly (missing / unexpected keys are
            tolerated) so checkpoints from a differently-headed model can be
            used as a warm start.
        save_model_path: Path stored in ``_hparams['save_model_path']``; the
            training loop writes checkpoints there.
        batch_size: Batch size stored in ``_hparams['batch_size']``.
    """
    global _model, _device, _hparams, _use_cuda, _train_audio_transforms, _valid_audio_transforms
    _use_cuda = torch.cuda.is_available()
    _device = torch.device("cuda" if _use_cuda else "cpu")

    _hparams = {
        "n_cnn_layers": 2,
        "n_rnn_layers": 4,
        "rnn_dim": 512,
        "stride": 2,
        "dropout": 0.1,
        "learning_rate": 5e-4,
        "batch_size": batch_size,
        "epochs": 10,
        "save_model_path": save_model_path
    }

    _model = VoiceImpersonator(
        _hparams['n_cnn_layers'], _hparams['n_rnn_layers'], _hparams['rnn_dim'],
        _hparams['stride'], _hparams['dropout']
    ).to(_device)

    if model_path is not None:
        # map_location lets a checkpoint that was saved from a GPU run be
        # restored on a CPU-only machine (and vice versa).
        state_dict = torch.load(model_path, map_location=_device)
        _model.load_state_dict(state_dict, strict=False)

    print(_model)
    print('Num Model Parameters', sum([param.nelement() for param in _model.parameters()]))

    # Training pipeline: mel spectrogram followed by frequency / time masking
    # as data augmentation.
    _train_audio_transforms = nn.Sequential(
        torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128),
        torchaudio.transforms.FrequencyMasking(freq_mask_param=30),
        torchaudio.transforms.TimeMasking(time_mask_param=100)
    )

    # Validation pipeline: same spectrogram settings, spelled out explicitly so
    # they visibly agree with the training transform; no augmentation.
    _valid_audio_transforms = torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128)

In [None]:
# Define blocks of layers and full model
class VoiceImpersonator(nn.Module):
    """Convolutional + recurrent encoder over spectrogram input.

    Pipeline: strided Conv2d -> ``n_cnn_layers`` ResidualCNN blocks ->
    Linear projection to ``rnn_dim`` -> ``n_rnn_layers`` bidirectional GRU blocks.

    Args:
        n_cnn_layers: Number of ResidualCNN blocks.
        n_rnn_layers: Number of BidirectionalGRU blocks.
        rnn_dim: Width of the linear projection and GRU hidden size.
        stride: Stride of the first convolution (applied to both the feature
            and the time axis).
        dropout: Dropout probability passed to every sub-block.
        n_feats: Size of the feature (mel) axis of the input spectrogram.
            The default of 128 with ``stride=2`` reproduces the previously
            hard-coded sizes (64 features per channel, 2048 linear inputs).
    """

    def __init__(self, n_cnn_layers, n_rnn_layers, rnn_dim, stride=2, dropout=0.1, n_feats=128):
        super().__init__()
        conv_channels = 32
        # Conv2d output length for kernel 3 / padding 1 along the feature axis.
        feats_after_cnn = (n_feats - 1) // stride + 1

        # Single convolutional layer for basic hierarchical feature extraction
        # Conv2D(in_channels, out_channels, kernel_size, stride, padding, dilation....)
        self.cnn = nn.Conv2d(1, conv_channels, 3, stride=stride, padding=1)

        # n_cnn Residual Convolutional Layers for deeper feature extraction
        self.rescnn_layers = nn.Sequential(*[
            ResidualCNN(conv_channels, conv_channels, kernel=3, stride=1,
                        dropout=dropout, n_feats=feats_after_cnn)
            for _ in range(n_cnn_layers)
        ])

        # Single fully connected layer: (features * filters) inputs, rnn_dim outputs
        self.fully_connected = nn.Linear(conv_channels * feats_after_cnn, rnn_dim)

        # forward() feeds (batch, time, feature) tensors through the whole
        # stack, so every GRU must be batch_first. Previously only the first
        # layer was, which made the later layers run their recurrence along
        # the batch axis. Parameter shapes are unaffected, so existing
        # checkpoints still load.
        self.birnn_layers = nn.Sequential(*[
            BidirectionalGRU(rnn_dim=rnn_dim if i == 0 else rnn_dim * 2,
                             hidden_size=rnn_dim, dropout=dropout, batch_first=True)
            for i in range(n_rnn_layers)
        ])

    def forward(self, x):
        """Encode a batch of spectrograms.

        Args:
            x: Tensor laid out as (batch, 1, feature, time).

        Returns:
            Tensor laid out as (batch, time, feature); the last dimension is
            ``2 * rnn_dim`` when at least one GRU block is present.
        """
        x = self.cnn(x)
        x = self.rescnn_layers(x)
        sizes = x.size()
        x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])  # (batch, feature, time)
        x = x.transpose(1, 2) # (batch, time, feature)
        x = self.fully_connected(x)
        x = self.birnn_layers(x)
        return x


class CNNLayerNorm(nn.Module):
    """LayerNorm over the feature axis of a (batch, channel, feature, time) tensor."""

    def __init__(self, n_feats):
        super().__init__()
        self.layer_norm = nn.LayerNorm(n_feats)

    def forward(self, x):
        # nn.LayerNorm normalizes the trailing dimension, so move the feature
        # axis last, normalize, then restore the original layout.
        feature_last = x.permute(0, 1, 3, 2).contiguous()  # (batch, channel, time, feature)
        normalized = self.layer_norm(feature_last)
        return normalized.permute(0, 1, 3, 2).contiguous()  # (batch, channel, feature, time)


class ResidualCNN(nn.Module):
    """Pre-activation residual block, after https://arxiv.org/pdf/1603.05027.pdf,
        using layer norm in place of batch norm.

    Two (LayerNorm -> GELU -> Dropout -> Conv2d) stages followed by a skip
    connection from the block input.
    """

    def __init__(self, in_channels, out_channels, kernel, stride, dropout, n_feats):
        super().__init__()
        same_padding = kernel // 2

        self.cnn1 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding=same_padding)
        self.cnn2 = nn.Conv2d(out_channels, out_channels, kernel, stride, padding=same_padding)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.layer_norm1 = CNNLayerNorm(n_feats)
        self.layer_norm2 = CNNLayerNorm(n_feats)

    def forward(self, x):
        shortcut = x  # (batch, channel, feature, time)
        stages = (
            (self.layer_norm1, self.dropout1, self.cnn1),
            (self.layer_norm2, self.dropout2, self.cnn2),
        )
        out = x
        for norm, drop, conv in stages:
            out = conv(drop(F.gelu(norm(out))))
        out += shortcut
        return out  # (batch, channel, feature, time)


class BidirectionalGRU(nn.Module):
    """LayerNorm -> GELU -> single-layer bidirectional GRU -> Dropout."""

    def __init__(self, rnn_dim, hidden_size, dropout, batch_first):
        super().__init__()

        gru_options = dict(
            input_size=rnn_dim,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=batch_first,
            bidirectional=True,
        )
        self.BiGRU = nn.GRU(**gru_options)
        self.layer_norm = nn.LayerNorm(rnn_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        activated = F.gelu(self.layer_norm(x))
        # The GRU also returns its final hidden state, which is not used here.
        sequence_out, _final_hidden = self.BiGRU(activated)
        return self.dropout(sequence_out)

In [None]:
# Manage data preprocessing (creating spectrograms, transforms, etc)
def data_processing(data, data_type="train"):
    """Collate a list of dataset items into padded batch tensors.

    Args:
        data: Iterable of 6-tuples whose first element is the waveform and
            whose third element is the utterance text; the remaining fields
            are ignored.
        data_type: ``'train'`` (augmenting transform), ``'valid'`` (plain
            transform) or ``'infer'`` (plain transform, no labels).

    Returns:
        ``(spectrograms, labels, input_lengths, label_lengths)`` where
        ``spectrograms`` is laid out as (batch, 1, feature, time) after
        zero-padding along time. For ``'infer'`` the other three values are
        empty lists; otherwise ``labels`` is a zero-padded tensor and the two
        length lists hold per-item values.

    Raises:
        ValueError: If ``data_type`` is not one of the supported modes.
    """
    # Resolve the transform once instead of per item; this also rejects a bad
    # data_type before any work is done.
    if data_type == 'train':
        audio_transform = _train_audio_transforms
    elif data_type in ('valid', 'infer'):
        # 'infer' previously referenced an undefined name (missing leading
        # underscore) and always failed with NameError.
        audio_transform = _valid_audio_transforms
    else:
        raise ValueError("data_type should be 'train', 'valid' or 'infer'")

    with_labels = data_type != 'infer'

    spectrograms = []
    labels = []
    input_lengths = []
    label_lengths = []

    for (waveform, _, utterance, _, _, _) in data:
        # (channel, feature, time) -> (time, feature) so pad_sequence pads time.
        spec = audio_transform(waveform).squeeze(0).transpose(0, 1)
        spectrograms.append(spec)
        if not with_labels:
            continue
        label = torch.Tensor(text_transform.text_to_int(utterance.lower()))
        labels.append(label)
        # Halved time length, as in the original code — presumably to match
        # the model's stride-2 convolution; confirm if the stride changes.
        input_lengths.append(spec.shape[0]//2)
        label_lengths.append(len(label))

    spectrograms = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True).unsqueeze(1).transpose(2, 3)

    if with_labels:
        labels = nn.utils.rnn.pad_sequence(labels, batch_first=True)

    return spectrograms, labels, input_lengths, label_lengths

In [None]:
# Define Training Behavior       
def train(criterion=None):
    """Train the globally loaded model on LibriSpeech and validate each epoch.

    Uses ``train-clean-100`` for training and ``test-clean`` for validation,
    both downloaded under ``~/dev/datasets/Libri`` if not already present.

    Args:
        criterion: Loss callable invoked as
            ``criterion(output, labels, input_lengths, label_lengths)`` where
            ``output`` is the log-softmaxed model output transposed to
            (time, batch, feature). It must return a scalar tensor. No loss
            is built in (the notebook still needs one), so it has to be
            supplied by the caller.

    Raises:
        Exception: If ``load_model()`` has not been called.
        ValueError: If no ``criterion`` is supplied.
    """
    if _model is None:
        raise Exception("Model was not loaded, call load_model() before training")
    if criterion is None:
        # Fail before the (large) dataset download instead of hitting a
        # NameError on the first backward pass.
        raise ValueError("No loss function supplied: pass criterion=... to train()")

    dataset_dir = os.path.expanduser("~/dev/datasets/Libri")
    os.makedirs(dataset_dir, exist_ok=True)

    train_url = "train-clean-100"
    test_url = "test-clean"

    train_dataset = torchaudio.datasets.LIBRISPEECH(dataset_dir, url=train_url, download=True)
    test_dataset = torchaudio.datasets.LIBRISPEECH(dataset_dir, url=test_url, download=True)

    kwargs = {'num_workers': 1, 'pin_memory': True} if _use_cuda else {}
    train_loader = data.DataLoader(dataset=train_dataset,
                                   batch_size=_hparams['batch_size'],
                                   shuffle=True,
                                   collate_fn=lambda x: data_processing(x, 'train'),
                                   **kwargs)
    test_loader = data.DataLoader(dataset=test_dataset,
                                  batch_size=_hparams['batch_size'],
                                  shuffle=False,
                                  collate_fn=lambda x: data_processing(x, 'valid'),
                                  **kwargs)

    optimizer = optim.AdamW(_model.parameters(), _hparams['learning_rate'])
    scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=_hparams['learning_rate'],
                                              steps_per_epoch=int(len(train_loader)),
                                              epochs=_hparams['epochs'],
                                              anneal_strategy='linear')

    n_batches = len(train_loader)
    data_len = len(train_loader.dataset)
    for epoch in range(1, _hparams['epochs'] + 1):
        # test() puts the model in eval mode, so training mode has to be
        # re-enabled at the start of every epoch (otherwise dropout stays off
        # from the second epoch onwards).
        _model.train()
        for batch_idx, _data in enumerate(train_loader):
            if _use_cuda:
                torch.cuda.empty_cache()
            spectrograms, labels, input_lengths, label_lengths = _data
            spectrograms, labels = spectrograms.to(_device), labels.to(_device)

            optimizer.zero_grad()

            output = _model(spectrograms)  # (batch, time, feature)
            output = F.log_softmax(output, dim=2)
            output = output.transpose(0, 1)  # (time, batch, feature)

            loss = criterion(output, labels, input_lengths, label_lengths)
            loss.backward()

            optimizer.step()
            scheduler.step()
            # Log every 100 batches and on the final batch of the epoch.
            if batch_idx % 100 == 0 or batch_idx == n_batches - 1:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(spectrograms), data_len,
                    100. * batch_idx / n_batches, loss.item()))

        # Calculate validation statistics and save after each epoch
        test(_model, _device, test_loader, criterion, epoch)
        if _hparams['save_model_path'] is not None:
            torch.save(_model.state_dict(), _hparams['save_model_path'])

In [None]:
# Define Validation Metrics and Behavior
def test(model, device, test_loader, criterion, epoch):
    print('\nevaluating...')
    model.eval()
    test_loss = 0
    test_cer, test_wer = [], []
    with torch.no_grad():
        for i, _data in enumerate(test_loader):
            spectrograms, labels, input_lengths, label_lengths = _data 
            spectrograms, labels = spectrograms.to(device), labels.to(device)

            output = model(spectrograms)  # (batch, time, n_class)
            output = F.log_softmax(output, dim=2)
            output = output.transpose(0, 1) # (time, batch, n_class)

            loss = criterion(output, labels, input_lengths, label_lengths)
            test_loss += loss.item() / len(test_loader)

    print('Test set: Average loss: {:.4f}, Average CER: {:4f} Average WER: {:.4f}\n'.format(test_loss, avg_cer, avg_wer))
               
                             
# def encode(input_file_path):
#     global _model, _device, _hparams, _use_cuda

#     if _model is None:
#         raise Exception("Model was not loaded, call load_model() and truncate_model() before encoding")

#     waveform, sample_rate = torchaudio.load(input_file_path, normalization=True)
#     input_data = [[waveform, None, None, None, None, None]]
#     input_layer = data_processing(input_data, 'infer')

#     _model.eval()
#     output=_model(input_layer[0].to(_device))
    
#     return output

In [None]:
# Restore the previous checkpoint and choose where the next one is written
load_model(
    model_path="./contentEncoder/saved_models/deepspeech5.pt",
    save_model_path="./contentEncoder/saved_models/deepspeech6.pt",
    batch_size=8,
)

# Run the training loop on the loaded model
train()