In [48]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import librosa
import os
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")

In [49]:
from sklearn.preprocessing import LabelEncoder

In [50]:
df = {
    'file':[],
    'y':[],
    'sr':[],
    'class':[],
}
files = sorted(os.listdir('../Project 1/Speech Recordings/'))
for file in tqdm(files):
    class_name = file.split('.')[0].split('_')[0]
    # print(class_name,file)
    y, sr  = librosa.load('../Project 1/Speech Recordings/' + file)
    df['file'].append(file)
    df['y'].append(y)
    df['sr'].append(sr)
    df['class'].append(class_name)
    # break
df = pd.DataFrame(df)
df['targets'] = LabelEncoder().fit_transform(df['class'])

100%|██████████| 200/200 [00:46<00:00,  4.26it/s]


In [51]:
import librosa
import numpy as np
import matplotlib.pyplot as plt

# Load the audio file
audio_path = "../Project 1/Speech Recordings/Bikram_E_7.m4a"
y, sr = librosa.load(audio_path, sr=None)

# Define the desired chunk length
chunk_length = 512

# Calculate the number of chunks
num_chunks = len(y) // chunk_length

# Extract chunks
chunks = [y[i*chunk_length:(i+1)*chunk_length] for i in range(num_chunks)]

# If there's any remaining audio at the end, include it as a final chunk
if len(y) % chunk_length != 0:
    chunks.append(y[num_chunks*chunk_length:])

# Compute and plot spectrogram for each chunk
spectrograms = []
for i, chunk in enumerate(chunks):
    # Compute the spectrogram using the short-time Fourier transform (STFT)
    spectrogram = np.abs(librosa.stft(chunk))
    spectrograms.append(spectrogram)

In [52]:
chunks[0].shape

(512,)

In [53]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class PixelCNN(nn.Module):
    def __init__(self, input_channels=2, num_layers=12, kernel_size=7, num_filters=64 , output_channels=1):
        super(PixelCNN, self).__init__()
        self.input_channels = input_channels
        self.num_layers = num_layers
        self.kernel_size = kernel_size
        self.num_filters = num_filters
        
        # Define the initial convolutional layer
        self.initial_conv = nn.Conv1d(input_channels, num_filters, kernel_size=kernel_size, padding=kernel_size//2)
        
        # Define the residual blocks
        self.residual_blocks = nn.ModuleList([
            ResidualBlock(num_filters, kernel_size) for _ in range(num_layers)
        ])
        
        # Define the final convolutional layer for output
        self.out_conv = nn.Conv1d(num_filters, output_channels, kernel_size=1)  # Output single-channel spectrogram

    def forward(self, x):
        x = self.initial_conv(x)
        
        # Pass through residual blocks
        for block in self.residual_blocks:
            x = block(x)
        
        # Apply output convolution
        x = self.out_conv(x)
        return x

class ResidualBlock(nn.Module):
    def __init__(self, num_filters, kernel_size):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv1d(num_filters, num_filters, kernel_size=kernel_size, padding=kernel_size//2)
        self.conv2 = nn.Conv1d(num_filters, num_filters, kernel_size=kernel_size, padding=kernel_size//2)

    def forward(self, x):
        residual = x
        x = F.relu(x)
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x += residual  # Add residual connection
        return x

# # Example usage:
# model = PixelCNN(input_channels=3, num_layers=12, kernel_size=7, num_filters=64)
# print(model)


In [54]:
chunk1 = torch.tensor([chunks[0],]).float()
chunk2 = torch.tensor([chunks[1],]).float()
chunk3 = torch.tensor([chunks[2],]).float()
chunk = torch.cat([chunk1, chunk2,chunk3], dim=0).unsqueeze(0)
# model(chunk).shape

In [55]:
chunk.shape

torch.Size([1, 3, 512])

In [56]:
class VectorQuantizerEMA(nn.Module):
    def __init__(self, num_embeddings, embedding_dim, commitment_cost, decay, epsilon=1e-5):
        super(VectorQuantizerEMA, self).__init__()

        self._embedding_dim = embedding_dim
        self._num_embeddings = num_embeddings

        self._embedding = nn.Embedding(self._num_embeddings, self._embedding_dim)
        self._embedding.weight.data.normal_()
        self._commitment_cost = commitment_cost

        self.register_buffer('_ema_cluster_size', torch.zeros(num_embeddings))
        self._ema_w = nn.Parameter(torch.Tensor(num_embeddings, self._embedding_dim))
        self._ema_w.data.normal_()

        self._decay = decay
        self._epsilon = epsilon

    def forward(self, inputs):
        # convert inputs from BCHW -> BHWC
        #BCH -> BHC
        # print(inputs.shape)
        inputs = inputs.permute(0, 2 , 1).contiguous()
        input_shape = inputs.shape

        # Flatten input
        flat_input = inputs.view(-1, self._embedding_dim)

        # Calculate distances
        distances = (torch.sum(flat_input**2, dim=1, keepdim=True)
                    + torch.sum(self._embedding.weight**2, dim=1)
                    - 2 * torch.matmul(flat_input, self._embedding.weight.t()))

        # Encoding
        encoding_indices = torch.argmin(distances, dim=1).unsqueeze(1)
        encodings = torch.zeros(encoding_indices.shape[0], self._num_embeddings, device=inputs.device)
        encodings.scatter_(1, encoding_indices, 1)

        # Quantize and unflatten
        quantized = torch.matmul(encodings, self._embedding.weight).view(input_shape)

        # Use EMA to update the embedding vectors
        if self.training:
            self._ema_cluster_size = self._ema_cluster_size * self._decay + \
                                     (1 - self._decay) * torch.sum(encodings, 0)

            # Laplace smoothing of the cluster size
            n = torch.sum(self._ema_cluster_size.data)
            self._ema_cluster_size = (
                (self._ema_cluster_size + self._epsilon)
                / (n + self._num_embeddings * self._epsilon) * n)

            dw = torch.matmul(encodings.t(), flat_input)
            self._ema_w = nn.Parameter(self._ema_w * self._decay + (1 - self._decay) * dw)

            self._embedding.weight = nn.Parameter(self._ema_w / self._ema_cluster_size.unsqueeze(1))

        # Loss
        e_latent_loss = F.mse_loss(quantized.detach(), inputs)
        loss = self._commitment_cost * e_latent_loss

        # Straight Through Estimator
        quantized = inputs + (quantized - inputs).detach()
        avg_probs = torch.mean(encodings, dim=0)
        perplexity = torch.exp(-torch.sum(avg_probs * torch.log(avg_probs + 1e-10)))

        # convert quantized from BHWC -> BCHW
        #BHC -> BCH
        return loss, quantized.permute(0, 2, 1).contiguous(), perplexity, encodings

In [69]:
class VectorQuantizer(nn.Module):
    def __init__(self, num_embeddings, embedding_dim, commitment_cost):
        super(VectorQuantizer, self).__init__()

        self._embedding_dim = embedding_dim
        self._num_embeddings = num_embeddings

        self._embedding = nn.Embedding(self._num_embeddings, self._embedding_dim)
        self._embedding.weight.data.uniform_(-1/self._num_embeddings, 1/self._num_embeddings)
        self._commitment_cost = commitment_cost

    def forward(self, inputs):
        # convert inputs from BCHW -> BHWC
        inputs = inputs.permute(0, 2 , 1).contiguous()
        input_shape = inputs.shape

        # Flatten input
        flat_input = inputs.view(-1, self._embedding_dim)

        # Calculate distances
        distances = (torch.sum(flat_input**2, dim=1, keepdim=True)
                    + torch.sum(self._embedding.weight**2, dim=1)
                    - 2 * torch.matmul(flat_input, self._embedding.weight.t()))

        # Encoding
        encoding_indices = torch.argmin(distances, dim=1).unsqueeze(1)
        encodings = torch.zeros(encoding_indices.shape[0], self._num_embeddings, device=inputs.device)
        encodings.scatter_(1, encoding_indices, 1)

        # Quantize and unflatten
        quantized = torch.matmul(encodings, self._embedding.weight).view(input_shape)

        # Loss
        e_latent_loss = F.mse_loss(quantized.detach(), inputs)
        q_latent_loss = F.mse_loss(quantized, inputs.detach())
        loss = q_latent_loss + self._commitment_cost * e_latent_loss

        quantized = inputs + (quantized - inputs).detach()
        avg_probs = torch.mean(encodings, dim=0)
        perplexity = torch.exp(-torch.sum(avg_probs * torch.log(avg_probs + 1e-10)))

        # convert quantized from BHWC -> BCHW
        return loss, quantized.permute(0, 2, 1).contiguous(), perplexity, encodings

In [70]:
class Residual(nn.Module):
    def __init__(self, in_channels, num_hiddens, num_residual_hiddens):
        super(Residual, self).__init__()
        self._block = nn.Sequential(
            nn.ReLU(True),
            nn.Conv1d(in_channels=in_channels,
                      out_channels=num_residual_hiddens,
                      kernel_size=3, stride=1, padding=1, bias=False),
            nn.ReLU(True),
            nn.Conv1d(in_channels=num_residual_hiddens,
                      out_channels=num_hiddens,
                      kernel_size=1, stride=1, bias=False)
        )

    def forward(self, x):
        return x + self._block(x)


class ResidualStack(nn.Module):
    def __init__(self, in_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super(ResidualStack, self).__init__()
        self._num_residual_layers = num_residual_layers
        self._layers = nn.ModuleList([Residual(in_channels, num_hiddens, num_residual_hiddens)
                             for _ in range(self._num_residual_layers)])

    def forward(self, x):
        for i in range(self._num_residual_layers):
            x = self._layers[i](x)
        return F.relu(x)
    
class Decoder(nn.Module):
    def __init__(self, in_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super(Decoder, self).__init__()

        self._conv_1 = nn.Conv1d(in_channels=in_channels,
                                 out_channels=num_hiddens,
                                 kernel_size=3,
                                 stride=1, padding=1)

        self._residual_stack = ResidualStack(in_channels=num_hiddens,
                                             num_hiddens=num_hiddens,
                                             num_residual_layers=num_residual_layers,
                                             num_residual_hiddens=num_residual_hiddens)

        self._conv_trans_1 = nn.ConvTranspose1d(in_channels=num_hiddens,
                                                out_channels=num_hiddens//2,
                                                kernel_size=4,
                                                stride=2, padding=1)

        self._conv_trans_2 = nn.ConvTranspose1d(in_channels=num_hiddens//2,
                                                out_channels=1,
                                                kernel_size=4,
                                                stride=2, padding=1)

    def forward(self, inputs):
        x = self._conv_1(inputs)
        x = self._residual_stack(x)

        x = self._conv_trans_1(x)
        x = F.relu(x)

        return self._conv_trans_2(x)

In [71]:
class Model(nn.Module):
    def __init__(self,chunk_size, num_hiddens, num_residual_layers, num_residual_hiddens,
                 num_embeddings, embedding_dim, output_dim, commitment_cost, decay=0):
        super(Model, self).__init__()
        self.num_hiddens = num_hiddens
        self._encoder = PixelCNN(input_channels=num_hiddens+1, num_layers=12, kernel_size=7, num_filters=64,output_channels=num_hiddens)
# print(model)
        self._pre_vq_conv = nn.Conv1d(in_channels=num_hiddens,
                                      out_channels=embedding_dim,
                                      kernel_size=1,
                                      stride=1)
        if decay > 0.0:
            self._vq_vae = VectorQuantizerEMA(num_embeddings, embedding_dim,
                                              commitment_cost, decay)
        else:
            self._vq_vae = VectorQuantizer(num_embeddings, embedding_dim,
                                          commitment_cost)

        self._decoder = Decoder(embedding_dim,
                                num_hiddens,
                                num_residual_layers,
                                num_residual_hiddens)
        self.mlp = nn.Linear(4*chunk_size, output_dim)
    def forward(self, x,context=None):
        if context is None:
            context = torch.zeros(x.shape[0], self.num_hiddens, x.shape[2]).to(x)
        x = torch.cat([x,context], dim=1)
        # print(x.shape,'x')
        z1 = self._encoder(x)
        # print(z1.shape,'enc')
        z = self._pre_vq_conv(z1)
        # print(z.shape,'pre_vq_conv')
        loss, quantized, perplexity, _ = self._vq_vae(z)
        # print(quantized.shape,'quan')
        x = self._decoder(quantized)
        # print(x.shape,'dec')
        emotion = self.mlp(x.view(x.shape[0], -1))
        # return loss, quantized, perplexity
        emotion = F.softmax(emotion, dim=1)
        return emotion, z1, loss, perplexity

In [108]:
batch_size = 256
num_training_updates = 15000

num_hiddens = 64 #128
num_residual_hiddens = 32
num_residual_layers = 2

embedding_dim = 128
num_embeddings = 512

output_dim = 20 # 5 emotions

commitment_cost = 1.0#0.25

decay =  0.1

learning_rate = 1e-4

In [109]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [110]:
from torch.utils.data import  DataLoader, Dataset

In [124]:
class AudioDataset(Dataset):
    def __init__(self,df,chunk_length = 1024) -> None:
        super().__init__()
        self.data = df['y'].values
        self.targets = df['targets'].values
        self.chunk_length = chunk_length
    def __len__(self):
        return len(self.data)
    def __getitem__(self, idx):
        y = self.data[idx]
        y, _ = librosa.effects.trim(y)
        # Calculate the number of chunks
        num_chunks = len(y) // self.chunk_length
        # Extract chunks
        chunks = [y[i*self.chunk_length:(i+1)*self.chunk_length] for i in range(num_chunks)]
        # if the chunk only contains zeros, skip it
        # chunks = [chunk for chunk in chunks if np.sum(chunk) != 0]
        return chunks, self.targets[idx]

In [125]:
# split the dataframe into training and validation dataframes
df_train = df.sample(frac=0.8, random_state=0)
df_val = df.drop(df_train.index)

In [126]:
chunk_size = 1024
train_dataset = AudioDataset(df_train,chunk_size)
train_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=True)

val_dataset = AudioDataset(df_val,chunk_size)
val_dataloader = DataLoader(val_dataset, batch_size=1, shuffle=True)


In [127]:
next(iter(val_dataloader))

[[tensor([[-2.0662e-06,  1.8591e-06, -3.1475e-05,  ..., -6.3167e-03,
           -6.7745e-03, -7.3603e-03]]),
  tensor([[-0.0079, -0.0081, -0.0075,  ...,  0.0038,  0.0031,  0.0040]]),
  tensor([[0.0051, 0.0053, 0.0045,  ..., 0.0044, 0.0024, 0.0019]]),
  tensor([[ 0.0012, -0.0006, -0.0019,  ...,  0.0096,  0.0118,  0.0120]]),
  tensor([[ 0.0104,  0.0081,  0.0057,  ..., -0.0058, -0.0025,  0.0005]]),
  tensor([[ 0.0036,  0.0056,  0.0071,  ..., -0.0214, -0.0209, -0.0194]]),
  tensor([[-0.0166, -0.0150, -0.0140,  ..., -0.0198, -0.0220, -0.0228]]),
  tensor([[-0.0210, -0.0205, -0.0204,  ..., -0.0029, -0.0019, -0.0005]]),
  tensor([[-0.0004, -0.0007,  0.0010,  ...,  0.0046,  0.0047,  0.0059]]),
  tensor([[ 0.0058,  0.0060,  0.0094,  ..., -0.0079, -0.0057, -0.0014]]),
  tensor([[-0.0001, -0.0005, -0.0019,  ..., -0.0034, -0.0053, -0.0062]]),
  tensor([[-0.0049, -0.0023, -0.0014,  ..., -0.0053, -0.0111, -0.0157]]),
  tensor([[-0.0183, -0.0207, -0.0224,  ...,  0.0085,  0.0065,  0.0046]]),
  tensor(

In [128]:
model = Model(chunk_size,num_hiddens, num_residual_layers, num_residual_hiddens,
              num_embeddings, embedding_dim,output_dim,
              commitment_cost, decay)
ngpu = torch.cuda.device_count()
if ngpu > 1:
    model =  nn.DataParallel(model)
model = model.to(device)

In [129]:
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
loss_fn = nn.CrossEntropyLoss()

In [145]:
epochs = 100
model.train()
gradient_accumulation_steps = 16
for epoch in range(epochs):
    epoch_loss = 0.0
    for idx, (x, y) in tqdm(enumerate(train_dataloader),total=len(train_dataloader),dynamic_ncols=True):
            y = y.to(device).long()
            context = None
            VQ_loss = 0.0
            Loss = 0.0
            for chunk in x:
                chunk = torch.tensor(chunk).unsqueeze(0).float().to(device)
                emotion, context, vq_loss, perplexity = model(chunk,context)
                VQ_loss += vq_loss
                Loss += loss_fn(emotion, y)
            
            loss = (Loss + VQ_loss)/len(x)
            loss = loss / gradient_accumulation_steps
            loss.backward()
            epoch_loss += loss.item()
            if (idx+1) % gradient_accumulation_steps == 0:
                optimizer.step()
                optimizer.zero_grad()
            if idx % 1 == 0:
                correct = 0
                total = 0
                with torch.no_grad():
                    for x, y in val_dataloader:
                        y = y.to(device).long()
                        context = None
                        for chunk in x:
                            chunk = torch.tensor(chunk).unsqueeze(0).float().to(device)
                            emotion, context, vq_loss, perplexity = model(chunk,context)
                            Loss += loss_fn(emotion, y)
                            predicted = torch.argmax(emotion.data)
                            total += 1
                            correct += (predicted == y).sum().item()
                print(f'batch:{idx +1 }, Loss: {loss.item()} , Accuracy: {correct/total}')
    
    epoch_loss = epoch_loss / len(train_dataloader)
    print(f'Epoch : [{epoch}/{epochs}], Loss: {epoch_loss}')
    state_dict = {'model': model.state_dict(),'epoch': epoch,'epoch_loss': epoch_loss}
    torch.save(state_dict, f'./model_VQ_Emo.pth')

  1%|          | 1/160 [01:01<2:41:39, 61.00s/it]

batch:1, Loss: 0.18692758679389954 , Accuracy: 0.002187120291616039


  1%|          | 1/160 [01:11<3:08:56, 71.30s/it]


KeyboardInterrupt: 

In [131]:
for x, y in val_dataloader:
    y = y.to(device).long()
    context = None
    for chunk in x:
        chunk = torch.tensor(chunk).unsqueeze(0).float().to(device)
        emotion, context, vq_loss, perplexity = model(chunk,context)
        Loss += loss_fn(emotion, y)

TypeError: iteration over a 0-d tensor

In [144]:

torch.argmax(emotion.data).item(), y.item()


(13, 15)

15

tensor([15], device='cuda:0')