<a href="https://colab.research.google.com/github/arkaprabha10/Samsung-PRISM/blob/main/Pytorch_VAE_Conv.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import torch
from torch.autograd import Variable
from torch.nn import Sequential, Conv2d, Linear, BatchNorm2d, Dropout, Softmax, Tanh, BCELoss, ReLU, Module, ConvTranspose2d, Flatten, Sigmoid
from torch.optim import Adam
import numpy as np
import matplotlib.pyplot as plt
import librosa.display
import IPython.display as ipd
from sklearn.model_selection import train_test_split

In [None]:
input_length = 22050*5  # 5 seconds of audio at the 22,050 Hz load rate


def load_audio_file(file_path, input_length=input_length):
    """Load an audio file, force it to a fixed length and return its mel spectrogram.

    The file is read at 22,050 Hz. A clip longer than ``input_length`` samples
    is cropped to a randomly positioned window; a shorter clip is zero-padded
    with the padding split randomly between the start and the end; a clip of
    exactly ``input_length`` samples is passed through unchanged.

    Parameters
    ----------
    file_path : path accepted by ``librosa.load``
        Audio file to read.
    input_length : int
        Number of samples the clip is cropped or padded to.

    Returns
    -------
    Whatever ``preprocess_audio_mel_T`` returns for the fixed-length signal.
    """
    data = librosa.load(file_path, sr=22050)[0]

    # Samples to drop (clip too long) or to add (clip too short).
    slack = abs(len(data) - input_length)
    # randint's upper bound is exclusive: the +1 keeps the last valid offset
    # reachable and makes the equal-length case (slack == 0) a plain no-op.
    offset = np.random.randint(slack + 1)

    if len(data) > input_length:
        data = data[offset:offset + input_length]
    else:
        data = np.pad(data, (offset, slack - offset), "constant")

    return preprocess_audio_mel_T(data)

In [None]:
n_mels = 80  # default number of mel bands per spectrogram


def preprocess_audio_mel_T(signal, sample_rate=22050, window_size=20, #log_specgram
                 step_size=10, eps=1e-10, n_mels=n_mels):
    """Compute the mel spectrogram of a 1-D audio signal.

    Parameters
    ----------
    signal : np.ndarray
        Audio samples.
    sample_rate : int
        Sampling rate of ``signal`` in Hz.
    window_size, step_size, eps :
        Unused; kept only so existing calls keep working.
    n_mels : int
        Number of mel bands; defaults to the module-level ``n_mels``.

    Returns
    -------
    The result of ``librosa.feature.melspectrogram`` computed with librosa's
    default FFT and hop sizes.
    """
    # The audio must be passed as ``y=``: recent librosa releases make every
    # argument of melspectrogram keyword-only, and the keyword form also works
    # on older releases.
    return librosa.feature.melspectrogram(y=signal, sr=sample_rate, n_mels=n_mels)

In [None]:
x_in = []
MAX_FILES = 30  # upper bound on how many clips are loaded
Data_Path = '/content/drive/MyDrive/speech_data'
with os.scandir(Data_Path) as entries:
  # Keep regular files only (a sub-directory would make the audio loader fail)
  # and sort the paths: os.scandir yields entries in arbitrary order, which
  # would make both the selected clips and the later positional train/test
  # split differ from run to run.
  audio_paths = sorted(entry.path for entry in entries if entry.is_file())
for file_path in audio_paths[:MAX_FILES]:
  data = load_audio_file(file_path)
  x_in.append(data.tolist())
count_file = len(x_in)  # number of clips actually loaded
print('Data loaded successfully!')

Data loaded successfully!


In [None]:
# Stack the loaded spectrograms into one float32 array with a trailing
# singleton axis appended.
x_in = np.expand_dims(np.asarray(x_in), axis=-1).astype(np.float32)
# Global statistics, both taken before rescaling. NOTE: `max` shadows the
# builtin of the same name; the name is kept because the next cell reads it.
max = x_in.max()
mean = x_in.mean()
# Rescale so that the largest value becomes 1.
x_in = x_in / max

In [None]:
# Subtract the rescaled global mean, so that overall x_in = (raw - mean) / max.
# Not idempotent: running this cell a second time shifts the data again.
x_in = np.subtract(x_in, mean / max)

In [None]:
# Positional hold-out split: the first 12 clips train the model, the rest
# are used for validation.
x_train, x_test = x_in[:12], x_in[12:]
print(x_train.shape)
print(x_test.shape)

In [None]:
# Wrap both NumPy splits as torch tensors.
x_train, x_test = (torch.from_numpy(split) for split in (x_train, x_test))
print(x_train.shape)

torch.Size([12, 80, 216, 1])


In [None]:
# Aliases under which the training function looks up the two splits.
train_x, test_x = x_train, x_test

print(train_x.shape)

torch.Size([12, 80, 216, 1])


In [None]:
class VAE(Module):
  """Convolutional variational autoencoder for mel spectrograms.

  Input and reconstruction both have shape ``(batch, n_mels, n_frames, 1)``:
  the mel bands are treated as channels and every convolution has a
  ``(3, 1)`` kernel, i.e. it slides along the time axis only.

  The encoder maps the input to the mean and log-variance of a diagonal
  Gaussian over a ``latent_dim``-dimensional code. The decoder mirrors the
  encoder with transposed convolutions and ends in a sigmoid, so the
  reconstruction lies in [0, 1] as ``BCELoss`` requires.

  Parameters
  ----------
  n_mels : int
      Number of mel bands (input channels).
  n_frames : int
      Number of time frames per spectrogram; must be at least 9 so that the
      three un-padded convolutions leave at least one time step.
  latent_dim : int
      Size of the latent code.

  Raises
  ------
  ValueError
      If ``n_frames`` is too small for the encoder.
  """

  def __init__(self, n_mels=80, n_frames=216, latent_dim=128):
    super(VAE, self).__init__()

    # Time-axis length after each encoder convolution (kernel 3, no padding):
    # one stride-1 layer followed by two stride-2 layers.
    len1 = n_frames - 2
    len2 = (len1 - 3) // 2 + 1
    len3 = (len2 - 3) // 2 + 1
    if len3 < 1:
      raise ValueError(
          "n_frames=%d is too short for the encoder; need at least 9" % n_frames)

    self.latent_dim = latent_dim
    # Shape of the encoder output before flattening, without the batch axis.
    self._bottleneck_shape = (512, len3, 1)
    flat_features = 512 * len3

    #encoder
    self.enc1 = Sequential(
        Conv2d(n_mels, 128, kernel_size=(3,1), stride = (1,1)),
        ReLU(inplace=True),
        BatchNorm2d(128),
        Conv2d(128, 256, kernel_size=(3,1), stride = (2,1)),
        ReLU(inplace=True),
        BatchNorm2d(256),
        Conv2d(256, 512, kernel_size=(3,1), stride = (2,1)),
        ReLU(inplace=True),
        BatchNorm2d(512),
        Flatten()
    )
    # One linear head producing mean and log-variance side by side. It has no
    # ReLU because both quantities must be free to take negative values.
    self.enc2 = Sequential(
        Linear(in_features=flat_features, out_features=2*latent_dim),
    )

    #decoder
    self.dec1 = Sequential(
        Linear(in_features=latent_dim, out_features=flat_features),
        ReLU(inplace=True),
    )
    # A stride-2 convolution floors its output length, so one sample may be
    # lost on the way down; output_padding restores it on the way up so the
    # reconstruction has exactly n_frames time steps.
    self.dec2 = Sequential(
        BatchNorm2d(512),
        ConvTranspose2d(512, 256, kernel_size=(3,1), stride = (2,1),
                        output_padding=((len2 - 3) % 2, 0)),
        ReLU(inplace=True),
        BatchNorm2d(256),
        ConvTranspose2d(256, 128, kernel_size=(3,1), stride = (2,1),
                        output_padding=((len1 - 3) % 2, 0)),
        ReLU(inplace=True),
        BatchNorm2d(128),
        ConvTranspose2d(128, n_mels, kernel_size=(3,1), stride = (1,1)),
        Sigmoid(),
    )

  def Sampling(self, mu, log_var):
    """Draw ``z ~ N(mu, exp(log_var))`` with the reparameterisation trick."""
    std = torch.exp(0.5*log_var) #std_deviation
    eps = torch.randn_like(std)
    return mu + (eps*std)

  def forward(self, x):
    """Encode ``x``, sample a latent code and decode it.

    Returns
    -------
    tuple
        ``(reconstruction, mu, log_var)`` where ``reconstruction`` has the
        shape of ``x`` and ``mu`` / ``log_var`` have shape
        ``(batch, latent_dim)``.
    """
    hidden = self.enc1(x)
    stats = self.enc2(hidden).view(-1, 2, self.latent_dim)

    mu = stats[:, 0, :]
    log_var = stats[:, 1, :]

    z = self.Sampling(mu, log_var)

    # Restore the (channels, time, 1) layout while keeping the batch axis.
    hidden = self.dec1(z).view(-1, *self._bottleneck_shape)
    reconstruction = self.dec2(hidden)

    return reconstruction, mu, log_var

In [None]:
# Instantiate the network together with its optimiser and loss.
model = VAE()
optimizer = Adam(model.parameters(), lr=1e-4)
# NOTE(review): BCELoss expects predictions and targets in [0, 1] - confirm
# that the normalised spectrograms satisfy this.
criterion = BCELoss()

# Move the model and the loss to the GPU when one is available.
if torch.cuda.is_available():
    model, criterion = model.cuda(), criterion.cuda()

print(model)

VAE(
  (enc1): Sequential(
    (0): Conv2d(80, 128, kernel_size=(3, 1), stride=(1, 1))
    (1): ReLU(inplace=True)
    (2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): Conv2d(128, 256, kernel_size=(3, 1), stride=(2, 1))
    (4): ReLU(inplace=True)
    (5): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): Conv2d(256, 512, kernel_size=(3, 1), stride=(2, 1))
    (7): ReLU(inplace=True)
    (8): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (9): Flatten(start_dim=1, end_dim=-1)
  )
  (enc2): Sequential(
    (0): Linear(in_features=512, out_features=128, bias=True)
    (1): ReLU(inplace=True)
    (2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (dec1): Sequential(
    (0): Linear(in_features=128, out_features=276480, bias=True)
    (1): BatchNorm2d(276480, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
 

In [None]:
def _vae_loss(outputs, target):
    """Return reconstruction loss plus KL divergence for one batch.

    ``outputs`` is the ``(reconstruction, mu, log_var)`` tuple returned by the
    model. The KL term is summed over the latent dimensions, averaged over the
    batch and divided by the number of elements per sample, so that it is on
    the same per-element scale as a mean-reduced reconstruction criterion.
    """
    reconstruction, mu, log_var = outputs
    recon_loss = criterion(reconstruction, target)
    kl_per_sample = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp(), dim=1)
    return recon_loss + kl_per_sample.mean() / target[0].numel()


def train(epoch):
    """Run one full-batch training step followed by a validation pass.

    Reads the module-level ``model``, ``optimizer``, ``criterion``,
    ``train_x`` and ``test_x``. Appends the training and validation losses
    (as Python floats) to ``train_losses`` and ``val_losses``, and prints the
    validation loss on every even ``epoch``. The model is left in eval mode
    on return; the next call switches it back to train mode.

    Parameters
    ----------
    epoch : int
        Zero-based epoch index, used only for the progress print.
    """
    model.train()

    # The autoencoder reconstructs its own input, so inputs double as targets.
    x_train, x_val = train_x, test_x
    # converting the data into GPU format
    if torch.cuda.is_available():
        x_train = x_train.cuda()
        x_val = x_val.cuda()

    # clearing the Gradients of the model parameters
    optimizer.zero_grad()
    loss_train = _vae_loss(model(x_train), x_train)

    # computing the updated weights of all the model parameters
    loss_train.backward()
    optimizer.step()

    # Validate in eval mode and without autograd, so the held-out data neither
    # updates the BatchNorm running statistics nor builds a graph.
    model.eval()
    with torch.no_grad():
        loss_val = _vae_loss(model(x_val), x_val)

    # Store plain floats: appending the tensors would keep every epoch's
    # autograd graph alive.
    train_losses.append(loss_train.item())
    val_losses.append(loss_val.item())

    if epoch%2 == 0:
        # printing the validation loss
        print('Epoch : ',epoch+1, '\t', 'loss :', loss_val.item())

In [None]:
n_epochs = 25
# Per-epoch loss histories, filled in by train().
train_losses, val_losses = [], []
# Run the optimisation for the configured number of epochs.
for epoch in range(n_epochs):
    train(epoch)

RuntimeError: ignored