In [412]:
import os
import torch
import torchaudio
import pandas as pd
import numpy as np
import sklearn
from sklearn.preprocessing import OneHotEncoder


import matplotlib.pyplot as plt
from IPython.display import Audio, display

In [413]:
def print_stats(waveform, sample_rate=None, src=None):
  if src:
    print("-" * 10)
    print("Source:", src)
    print("-" * 10)
  if sample_rate:
    print("Sample Rate:", sample_rate)
  print("Shape:", tuple(waveform.shape))
  print("Dtype:", waveform.dtype)
  print(f" - Max:     {waveform.max().item():6.3f}")
  print(f" - Min:     {waveform.min().item():6.3f}")
  print(f" - Mean:    {waveform.mean().item():6.3f}")
  print(f" - Std Dev: {waveform.std().item():6.3f}")
  print()
  print(waveform)
  print()

def plot_waveform(waveform, sample_rate, title="Waveform", xlim=None, ylim=None):
  waveform = waveform.numpy()

  num_channels, num_frames = waveform.shape
  time_axis = torch.arange(0, num_frames) / sample_rate

  figure, axes = plt.subplots(num_channels, 1)
  if num_channels == 1:
    axes = [axes]
  for c in range(num_channels):
    axes[c].plot(time_axis, waveform[c], linewidth=1)
    axes[c].grid(True)
    if num_channels > 1:
      axes[c].set_ylabel(f'Channel {c+1}')
    if xlim:
      axes[c].set_xlim(xlim)
    if ylim:
      axes[c].set_ylim(ylim)
  figure.suptitle(title)
  plt.show(block=False)

def plot_specgram(waveform, sample_rate, title="Spectrogram", xlim=None):
  waveform = waveform.numpy()

  num_channels, num_frames = waveform.shape
  time_axis = torch.arange(0, num_frames) / sample_rate

  figure, axes = plt.subplots(num_channels, 1)
  if num_channels == 1:
    axes = [axes]
  for c in range(num_channels):
    axes[c].specgram(waveform[c], Fs=sample_rate)
    if num_channels > 1:
      axes[c].set_ylabel(f'Channel {c+1}')
    if xlim:
      axes[c].set_xlim(xlim)
  figure.suptitle(title)
  plt.show(block=False)

def play_audio(waveform, sample_rate):
  waveform = waveform.numpy()

  num_channels, num_frames = waveform.shape
  if num_channels == 1:
    display(Audio(waveform[0], rate=sample_rate))
  elif num_channels == 2:
    display(Audio((waveform[0], waveform[1]), rate=sample_rate))
  else:
    raise ValueError("Waveform with more than 2 channels are not supported.")

def _get_sample(path, resample=None):
  effects = [
    ["remix", "1"]
  ]
  if resample:
    effects.extend([
      ["lowpass", f"{resample // 2}"],
      ["rate", f'{resample}'],
    ])
  return torchaudio.sox_effects.apply_effects_file(path, effects=effects)

def inspect_file(path):
  print("-" * 10)
  print("Source:", path)
  print("-" * 10)
  print(f" - File size: {os.path.getsize(path)} bytes")
  print(f" - {torchaudio.info(path)}")

In [414]:
import glob
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import time
import random
import copy
from torch.utils.data import Dataset, DataLoader

In [415]:
device = torch.device('cuda')

SEED = 1234
random.seed(SEED)
torch.manual_seed(SEED)

<torch._C.Generator at 0x7f66f53686b0>

In [416]:
AUDIO_PATH = "./dcase_synth/audio/"
META_PATH = "./dcase_synth/metadata/"
TRAIN_PATH = "train/synthetic21_train/"
VALID_PATH = "validation/synthetic21_validation/"
BATCH_SIZE = 16
SEQ_LEN = 100
INPUT_DIM = 4410
HIDDEN_DIM = 128
OUTPUT_DIM = 10
N_LAYERS = 2
BIDIRECTIONAL = True
DROPOUT = 0.25
NUM_EPOCHS = 3
LEARNING_RATE = 1e-3

class AudioDataset(Dataset):
    def __init__(self, train = True, transform = None):
        if train :
            self.audio_path = AUDIO_PATH + TRAIN_PATH
            self.meta_path = META_PATH + TRAIN_PATH 
        else:
            self.audio_path = AUDIO_PATH + VALID_PATH
            self.meta_path = META_PATH + VALID_PATH
            
        audio_list = []
        fileLen = len(glob.glob(self.audio_path + "soundscapes/*.wav"))

        for fileNum in range(fileLen):
            audio_list.append(self.audio_path + f"soundscapes/{fileNum}.wav")
        self.audio_list = audio_list
        self.transform = transform

        vanilabels = []
        for i in range(SEQ_LEN):
            label = [0 for j in range(10)]
            vanilabels.append(label)

        meta_df = pd.read_csv(self.meta_path + "soundscapes.tsv", sep="\t")
        labelList = []
        ohe = OneHotEncoder()
        ohe.fit(meta_df[["event_label"]])

        for fileNum in range(len(self.audio_list)):
            metaData = meta_df[meta_df["filename"] == f"{fileNum}.wav"][["onset", "offset", "event_label"]].round(1).to_numpy()
            
            labels = copy.deepcopy(vanilabels)

            for datum in metaData:
                event_label = ohe.transform([[datum[2]]]).toarray()[0]

                for k in range(int(datum[1]-datum[0])*10):
                    labels[int(datum[0]*10)+k] = [x+y for x,y in zip(labels[int(datum[0]*10)+k], event_label)]
                    
            labelList.append(labels)

        self.label_list = torch.tensor(labelList)

    def __len__(self):
        return len(self.audio_list)

    def __getitem__(self, idx):
        audio_path = self.audio_list[idx]

        audio, sample_rate = torchaudio.load(audio_path)
        label = self.label_list[idx]
        
        if self.transform is not None:
            audio = self.transform(audio)

        return audio.reshape(SEQ_LEN, -1), label


In [418]:
trainDataset = AudioDataset(train= True)

trainDataLoader = DataLoader(dataset= trainDataset, batch_size= BATCH_SIZE, shuffle= True)



In [419]:
class AudioLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout): 
        super().__init__()
        self.rnn = nn.LSTM(input_dim, hidden_dim, num_layers = n_layers, batch_first= True, bidirectional = bidirectional)
        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
        self.sig = nn.Sigmoid()
        self.dropout = nn.Dropout(dropout)

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.num_layers = n_layers
        self.bidirectional = bidirectional

    def forward(self, source):
        source = self.dropout(source)
        # source = [BATCH_SIZE, seq = 100(0.1s), input_size = 441000/100 = 4410]
        
        outputs, _ = self.rnn(source)
        # output = [BATCH_SIZE, seq, hidden_dim * n directions]
        # hidden/cell = [m layers * n directions, BATCH_SIZE, hidden_dim]

        outputs = self.dropout(outputs)

        predictions = self.fc(outputs)
        predictions = self.sig(predictions)
        # predictions = [batch size, seq, output dim]

        return predictions

    def predict(self, source):
        outputs, (hidden, cell) = self.rnn(source)

        predictions = self.fc(outputs)
        predictions = self.sig(predictions)

        predictions = predictions.round()
         
        return predictions

In [428]:
model = AudioLSTM(INPUT_DIM, HIDDEN_DIM, OUTPUT_DIM, N_LAYERS, BIDIRECTIONAL, DROPOUT)

criterion = nn.NLLLoss()
optimizer = torch.optim.Adam(model.parameters(), lr= LEARNING_RATE)

total_step = len(trainDataLoader)
for epoch in range(NUM_EPOCHS):
    for i, (audio, label) in enumerate(trainDataLoader):

        outputs = model(audio)
        # outputs = torch.flatten(outputs, start_dim=1)
        # labels = torch.flatten(label, start_dim=1)
        loss = 0 
        print(outputs[j].shape)

        for j in range(BATCH_SIZE):
            loss += criterion(outputs[j], label[j])

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i+1) % 100 == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
             epoch+1, NUM_EPOCHS, i+1, total_step, loss.item()))

torch.Size([100, 10])


RuntimeError: 0D or 1D target tensor expected, multi-target not supported