<a href="https://colab.research.google.com/github/daewoung/DLForMusicAndAudio_Study/blob/main/MIR_Assignment2_Music_AutoTagging.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Device used for the models and batches in this notebook. Prefer the GPU,
# but fall back to the CPU so the notebook still runs on a runtime without CUDA.
import torch
DEV = 'cuda' if torch.cuda.is_available() else 'cpu'

In [2]:
import torch
import torch.nn as nn
import torchaudio
from tqdm import tqdm
import pandas as pd
from pathlib import Path
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import IPython.display as ipd
from datetime import datetime

def save_fig_with_date(figname):
  """Save the active pyplot figure as '<figname>_<MM_DD_HH_MM_SS>.png'.

  The suffix is taken from datetime.now() at call time, so repeated calls
  with the same figname write to different files.
  """
  timestamp = datetime.now().strftime('%m_%d_%H_%M_%S')
  plt.savefig(f"{figname}_{timestamp}.png")

In [3]:
# Install gdown, download the dataset archive from Google Drive by file id,
# and unpack it quietly into the current working directory.
# NOTE(review): the archive presumably extracts to 'MTAT_SMALL/' (the path
# used by MTAT_DIR below) - confirm.
!pip install gdown
!gdown --id 15e9E3oZdudErkPKwb0rCAiZXkPxdZkV6
!unzip -q mtat_8000.zip

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Downloading...
From: https://drive.google.com/uc?id=15e9E3oZdudErkPKwb0rCAiZXkPxdZkV6
To: /content/mtat_8000.zip
100% 921M/921M [00:07<00:00, 121MB/s] 


In [18]:
# Pin torchaudio to 0.9.0; the recorded output shows pip also replaces the
# installed torch with 1.9.0.
# NOTE(review): this cell was executed out of order (In [18]). If torch was
# already imported, the runtime presumably needs a restart for the pinned
# versions to take effect - confirm, and consider moving this to the top.
!pip install torchaudio==0.9.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torchaudio==0.9.0
  Downloading torchaudio-0.9.0-cp37-cp37m-manylinux1_x86_64.whl (1.9 MB)
[K     |████████████████████████████████| 1.9 MB 12.2 MB/s 
[?25hCollecting torch==1.9.0
  Downloading torch-1.9.0-cp37-cp37m-manylinux1_x86_64.whl (831.4 MB)
[K     |████████████████████████████████| 831.4 MB 2.7 kB/s 
Installing collected packages: torch, torchaudio
  Attempting uninstall: torch
    Found existing installation: torch 1.12.0+cu113
    Uninstalling torch-1.12.0+cu113:
      Successfully uninstalled torch-1.12.0+cu113
  Attempting uninstall: torchaudio
    Found existing installation: torchaudio 0.12.0+cu113
    Uninstalling torchaudio-0.12.0+cu113:
      Successfully uninstalled torchaudio-0.12.0+cu113
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency 

In [100]:
# Show which torchaudio version the kernel has actually imported.
torchaudio.__version__

'0.9.0'

In [4]:
'''
You don't have to change this cell
'''
class MTATDataset:
  """Metadata and multi-hot labels for one split of the tagging dataset.

  Reads 'meta.csv' from `dir_path` (the first CSV column becomes the index),
  keeps the rows whose 'mp3_path' starts with one of the split's
  sub-directory characters, and truncates the result to the first
  `num_max_data` rows. No audio is read here; subclasses provide
  `__getitem__`.

  Attributes set by the constructor:
    dir: dataset root as a Path.
    sr: sample rate stored for subclasses.
    labels: filtered metadata DataFrame.
    vocab: column names between the first and the last column.
    label_tensor: LongTensor built from those columns, one row per clip.
  """

  def __init__(self, dir_path, split='train', num_max_data=4000, sr=16000):
    self.dir = Path(dir_path)
    self.sr = sr
    metadata = pd.read_csv(self.dir / "meta.csv", index_col=[0])

    # The first character of mp3_path selects the split. Any split name other
    # than 'train' or 'valid' is treated as the test split.
    if split == "train":
      prefixes = set('0123456789abc')
    elif split == 'valid':
      prefixes = {'d'}
    else:
      prefixes = {'e', 'f', 'g'}

    all_paths = metadata['mp3_path'].values.astype('str')
    keep_row = [path[0] in prefixes for path in all_paths]
    self.labels = metadata.iloc[keep_row][:num_max_data]

    tag_columns = self.labels.columns.values
    self.vocab = tag_columns[1:-1]
    self.label_tensor = self.convert_label_to_tensor()

  def convert_label_to_tensor(self):
    """Return the tag columns of self.labels as a LongTensor of 0/1 values."""
    tag_matrix = self.labels.values[:, 1:-1]
    return torch.LongTensor(tag_matrix.astype('bool'))

  def __len__(self):
    """Number of clips kept for this split."""
    return len(self.labels)
  

# Dataset root, relative to the working directory; MTATDataset reads
# 'meta.csv' from this folder.
MTAT_DIR = Path('MTAT_SMALL/')

In [80]:
'''
Check how baseline dataset looks like
'''

# Built with the constructor defaults: split='train', num_max_data=4000, sr=16000.
# NOTE(review): base_set stays a kernel-level global; later cells refer to it by name.
base_set = MTATDataset(MTAT_DIR)

'''
metadata of dataset is stored in self.labels
'''
# Label vector of the second clip: one 0/1 entry per vocabulary tag.
base_set.label_tensor[1]

tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0])

In [10]:
'''
You can use labels['mp3_path'].iloc
'''
# Positional row index of the clip to inspect.
target_idx = 0 


# mp3_path is stored relative to the dataset root, so join it with MTAT_DIR.
path_to_target_idx = base_set.labels['mp3_path'].iloc[target_idx]
path_to_target_idx = MTAT_DIR / path_to_target_idx
print(path_to_target_idx)
# torchaudio.load returns the waveform tensor and the file's sample rate.
y, sr = torchaudio.load(path_to_target_idx)
# Resampling was left disabled here; a valid call would be
# torchaudio.functional.resample(y, orig_freq=sr, new_freq=16000)


MTAT_SMALL/2/zephyrus-angelus-11-ave_maria__virgo_serena_josquin_des_prez-0-29.mp3


In [101]:

'''
label of each tensor is also stored in self.label_tensor
'''
# torch.where on the 0/1 label vector yields the positions of the active
# tags; indexing vocab with them recovers the tag names.
base_set.vocab[torch.where(base_set.label_tensor[0])]



array(['female', 'quiet', 'choir'], dtype=object)

# Problem 1. Complete Dataset Class


# 일반적인 방법

In [111]:
class OnTheFlyDataset(MTATDataset):
  """Dataset that decodes and resamples each clip every time it is accessed.

  Nothing is cached: `__getitem__` loads the mp3 listed in this instance's
  metadata, resamples it to `self.sr` and returns it with its label.
  """

  def __init__(self, dir_path, split='train', num_max_data=4000, sr=16000):
    super().__init__(dir_path, split, num_max_data, sr)

  def __getitem__(self, idx):
    """Return `(audio, label)` for row `idx` of this split.

    audio: 1-D tensor holding the first channel of the resampled waveform.
    label: row `idx` of `self.label_tensor`.
    """
    # Read the path from this instance's own metadata. Using another
    # dataset object here would pair the audio of one split with the labels
    # of another.
    audio_path = self.dir / self.labels['mp3_path'].iloc[idx]
    waveform, orig_sr = torchaudio.load(audio_path)
    waveform = torchaudio.functional.resample(waveform, orig_freq=orig_sr, new_freq=self.sr)
    label = self.label_tensor[idx]
    # Keep only the first channel so the result is one-dimensional.
    return waveform[0], label

# Smoke test on a small training subset: fetch one item, check that the
# audio is one-dimensional, play it and print the names of its active tags.
dummy_set = OnTheFlyDataset(MTAT_DIR, split='train', num_max_data=100)
audio, label = dummy_set[3]
assert audio.ndim == 1, "Number of dimensions of audio tensor has to be 1. Use audio[0] or audio.mean(dim=0) to reduce it"
ipd.display(ipd.Audio(audio, rate=dummy_set.sr))
print(dummy_set.vocab[torch.where(label)])

['drum' 'slow']


# Test

In [152]:
# Scratch check: turn the relative mp3 paths into paths that include the
# dataset folder name, then print the first entry before and after.
# NOTE(review): `a`, `data` and `i` remain kernel-level globals after this cell.
a = dummy_set.labels['mp3_path'].values
data = []
for i in range(len(a)):
  data.append('MTAT_SMALL/' + a[i])
  
print(a[0])
print(data[0])

2/zephyrus-angelus-11-ave_maria__virgo_serena_josquin_des_prez-0-29.mp3
MTAT_SMALL/2/zephyrus-angelus-11-ave_maria__virgo_serena_josquin_des_prez-0-29.mp3


In [167]:
# Scratch cell: collect every mp3 below the dataset folder and show what the
# cache file name for the first one would look like.
test_dir = Path('MTAT_SMALL/')
i = 0
print(test_dir)
file_list = []
# Recursive search for mp3 files in all sub-directories.
for i in Path(test_dir).glob('**/*.mp3'):
  file_list.append(i)

# NOTE(review): this loop only increments its own loop variable and has no
# lasting effect beyond rebinding `i`.
for i in range(len(file_list)):
  i += 1




audio_sample, r = torchaudio.load(file_list[0])
print(str(file_list[0])+'.pt')

# Disabled experiment with saving/loading a '.pt' file (`sample` is not
# defined in this cell):
#torch.save({audio_sample,sample}, str(file_list[0])+'.pt')
#a = torch.load(str(file_list[0])+'.pt')
#print(a)
#print(a)

MTAT_SMALL
MTAT_SMALL/e/seth_carlin-mozart_in_the_age_of_enlightenment-11-sonata_15_in_c_minor__andante_georg_benda-117-146.mp3.pt


In [191]:
import os

# Check that a cached '.pt' file exists next to one particular mp3.
# os.path.isfile returns a bool, so `== 1` is the same as testing it directly.
if(os.path.isfile(Path('MTAT_SMALL/0/satori-sounds_for_meditation-04-meditation_4-465-494.mp3.pt')) == 1):
  print('hi')

hi


In [189]:
# Leftover scratch cell: an empty tuple never equals 1, so the print is not
# reached.
# NOTE(review): the recorded output is a SyntaxError, presumably from an
# earlier edit of this cell - consider deleting the cell.
if(() == 1):
  print('hi')

SyntaxError: ignored

# .pt 토치파일로 저장하는 방법

In [192]:
class PreProcessDataset(MTATDataset):
  """Dataset that resamples every clip once and caches it as a '.pt' file.

  For each row of this split's metadata, the first channel of the resampled
  waveform and the clip's label are stored as a tuple in '<mp3 path>.pt'
  next to the source mp3. `__getitem__` then only reads that cached pair.

  NOTE(review): the cache file name does not encode `sr`, and cache files
  written by an older version of this class (which saved an unordered set)
  are not detected - delete stale '.pt' files before changing `sr` or after
  upgrading.
  """

  def __init__(self, dir_path, split='train', num_max_data=8000, sr=16000):
    super().__init__(dir_path, split, num_max_data, sr)
    self.pre_process_and_save_data()

  def _cache_path(self, idx):
    """Return the Path of the cache file for row `idx` of `self.labels`."""
    mp3_path = self.dir / self.labels['mp3_path'].iloc[idx]
    return Path(str(mp3_path) + '.pt')

  def pre_process_and_save_data(self):
    """Create the missing cache file for every clip of this split.

    Also sets `self.data_list` to the mp3 paths (as strings) of this split.
    """
    # Build the paths from this instance's metadata and dataset root rather
    # than from notebook globals, so every split caches its own files.
    self.data_list = [str(self.dir / rel_path) for rel_path in self.labels['mp3_path'].values]

    for idx, mp3_path in enumerate(self.data_list):
      cache_path = self._cache_path(idx)
      # Skip clips that were already processed; the check has to look at the
      # '.pt' file, not at the mp3 (which always exists).
      if cache_path.is_file():
        continue
      audio_sample, orig_sr = torchaudio.load(mp3_path)
      audio_sample = torchaudio.functional.resample(audio_sample, orig_freq=orig_sr, new_freq=self.sr)
      # Save an ordered tuple so loading can unpack (audio, label) reliably.
      # Both tensors are cloned because they are views: torch.save would
      # otherwise serialize the whole underlying storage.
      torch.save((audio_sample[0].clone(), self.label_tensor[idx].clone()), str(cache_path))

  def __getitem__(self, idx):
    """Return the cached `(audio, label)` pair for row `idx` of this split."""
    audio_sample, label = torch.load(str(self._cache_path(idx)))
    return audio_sample, label

# Smoke test on a small training subset: constructing the dataset runs the
# preprocessing step, then one cached item is fetched, checked for being
# one-dimensional, played, and its active tag names are printed.
dummy_set = PreProcessDataset(MTAT_DIR, split='train', num_max_data=100)
audio, label = dummy_set[15]
assert audio.ndim == 1, "Number of dimensions of audio tensor has to be 1. Use audio[0] or audio.mean(dim=0) to reduce it"
ipd.display(ipd.Audio(audio, rate=dummy_set.sr))
print(dummy_set.vocab[torch.where(label)])


['guitar' 'male']


# Define Dataset

In [199]:
# Dataset implementation used for all three splits below.
# NOTE(review): OnMemoryDataset is mentioned as an option but is not defined
# in the visible part of this notebook.
your_dataset_class = OnTheFlyDataset # One of OnTheFlyDataset, PreProcessDataset, or OnMemoryDataset
# your_dataset_class = OnMemoryDataset
'''
Based on your memory size or storage size, you can change the num_max_data
'''
# num_max_data caps the number of clips taken from each split.
trainset = your_dataset_class(MTAT_DIR, split='train', num_max_data=4000)
validset = your_dataset_class(MTAT_DIR, split='valid', num_max_data=1000)
testset = your_dataset_class(MTAT_DIR, split='test', num_max_data=2000)

# Data Loader

In [200]:
# Only the training loader shuffles; validation and test keep dataset order.
train_loader = DataLoader(trainset, batch_size=64, shuffle=True, num_workers=2) # num_workers=4 can be faster when more CPU cores are available
valid_loader = DataLoader(validset, batch_size=128, shuffle=False, num_workers=2)
test_loader = DataLoader(testset, batch_size=128, shuffle=False, num_workers=2)

# Pull one training batch to check that loading and collation work.
batch = next(iter(train_loader))

# Define Neural Network

In [201]:
class SpecModel(nn.Module):
  """Front end that applies MelSpectrogram followed by AmplitudeToDB."""

  def __init__(self, sr, n_fft, hop_length, n_mels):
    super().__init__()
    mel_settings = dict(sample_rate=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels)
    self.mel_converter = torchaudio.transforms.MelSpectrogram(**mel_settings)
    self.db_converter = torchaudio.transforms.AmplitudeToDB()

  def forward(self, x):
    """Return the output of db_converter applied to the mel spectrogram of `x`."""
    return self.db_converter(self.mel_converter(x))

class AudioModel(nn.Module):
  """Baseline tagger: spectrogram front end, three Conv1d stages, max over
  the last axis, one linear layer and a sigmoid.

  Attributes `spec_converter`, `conv_layer` and `final_layer` are the
  sub-modules used by `forward`; subclasses may replace `conv_layer`.
  """

  def __init__(self, sr, n_fft, hop_length, n_mels, hidden_size, num_output):
    super().__init__()
    self.sr = sr
    self.spec_converter = SpecModel(sr, n_fft, hop_length, n_mels)

    # Three identical stages (Conv1d -> MaxPool1d -> ReLU); only the first
    # one differs in its input channel count.
    stages = []
    for in_channels in (n_mels, hidden_size, hidden_size):
      stages += [
        nn.Conv1d(in_channels, out_channels=hidden_size, kernel_size=3),
        nn.MaxPool1d(3),
        nn.ReLU(),
      ]
    self.conv_layer = nn.Sequential(*stages)
    self.final_layer = nn.Linear(hidden_size, num_output)

  def get_spec(self, x):
    '''
    Run the spectrogram front end (self.spec_converter) on x.
    x (torch.Tensor): audio samples (num_batch_size X num_audio_samples)
    '''
    return self.spec_converter(x)

  def forward(self, x):
    """Return one sigmoid score per output for every item in the batch."""
    features = self.conv_layer(self.get_spec(x)) # input is num_batch X num_mel_bins X num_time_bins
    # Tensor.max along a dim returns (values, indices); keep the values only.
    pooled = features.max(dim=-1).values
    return torch.sigmoid(self.final_layer(pooled))

In [202]:
def get_tpr_fpr(pred, target, threshold=0.5):
  """Return `(true positive rate, false positive rate)` at one threshold.

  An element counts as predicted-positive when `pred > threshold`. Elements
  of `target` equal to 1 are positives, elements equal to 0 are negatives.
  Counts are taken over all elements of the tensors.
  """
  predicted_pos = pred > threshold
  actual_pos = target == 1
  actual_neg = target == 0

  num_pos = actual_pos.sum()
  num_neg = actual_neg.sum()
  true_pos = (predicted_pos & actual_pos).sum()
  false_pos = (predicted_pos & actual_neg).sum()
  return true_pos / num_pos, false_pos / num_neg

def get_roc_auc(pred, label, num_grid=500):
  """Approximate the area under the ROC curve on a grid of thresholds.

  Thresholds run from 1 down to 0 in `num_grid` evenly spaced steps. At
  every step the current true positive rate is multiplied by the increase
  in false positive rate since the previous step, and the products are
  summed.
  """
  thresholds = torch.linspace(0, 1, num_grid).flip(0)
  area = 0
  last_fpr = 0
  for cutoff in thresholds:
    tpr, fpr = get_tpr_fpr(pred, label, threshold=cutoff)
    fpr_step = fpr - last_fpr
    area = area + tpr * fpr_step
    last_fpr = fpr
  return area

def train_model(model, train_loader, valid_loader, optimizer, num_epochs, loss_func, device='cuda'):
  """Train `model` for `num_epochs` epochs and validate after every epoch.

  The training dataset's vocabulary is attached to the model as
  `model.vocab`. Each batch is moved to `device`; the label is cast to float
  before it is passed to `loss_func`.

  Returns:
    dict with "loss" (one value per training batch) and "valid_acc" (one
    value per epoch, as returned by validate_model).
  """
  history = {"loss": [], "valid_acc": []}
  model.vocab = train_loader.dataset.vocab
  model.train() # training mode; validate_model switches back to it when done
  for _ in tqdm(range(num_epochs)):
    for audio, label in train_loader:
      optimizer.zero_grad() # clear gradients left from the previous step
      pred = model(audio.to(device))
      loss = loss_func(pred, label.to(device).float())
      loss.backward() # backpropagation
      optimizer.step() # parameter update
      history["loss"].append(loss.item())
    epoch_acc = validate_model(model, valid_loader, device)
    history["valid_acc"].append(epoch_acc.item())
  return history

def validate_model(model, valid_loader, device, acc_func=get_roc_auc):
  """Return the size-weighted mean of `acc_func` over all validation batches.

  The model is moved to `device`, evaluated in eval mode without gradient
  tracking, and switched back to train mode before returning. Each batch
  score is weighted by the batch length and the sum is divided by the
  dataset length.
  """
  weighted_total = 0
  model.eval()
  model.to(device)
  with torch.no_grad():
    for audio, label in valid_loader:
      batch_score = acc_func(model(audio.to(device)), label.to(device))
      weighted_total += batch_score * len(label)
  model.train()
  return weighted_total / len(valid_loader.dataset)

In [None]:
'''
Train the default model
'''

# Baseline configuration of AudioModel with 50 outputs.
model = AudioModel(sr=16000, n_fft=1024, hop_length=512, n_mels=48, num_output=50, hidden_size=32)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
model = model.to(DEV)
# BCELoss expects probabilities; AudioModel.forward already applies a sigmoid.
loss_func = torch.nn.BCELoss()
train_record = train_model(model, train_loader, valid_loader, optimizer, num_epochs=30, loss_func=loss_func, device=DEV)

  return torch.max_pool1d(input, kernel_size, stride, padding, dilation, ceil_mode)
 90%|█████████ | 27/30 [35:39<03:50, 76.84s/it]

In [None]:
# Training loss of the default model, one point per batch; saved to a
# timestamped PNG.
plt.plot(train_record['loss'])
save_fig_with_date('default_train_loss')

In [None]:
# Validation score of the default model, one point per epoch; saved to a
# timestamped PNG.
plt.plot(train_record['valid_acc'])
save_fig_with_date('default_train_valid_acc')

# Problem 2. Practice with nn.Sequential() (5 pts)

In [None]:
class StackManualLayer(nn.Module):
  """Three Conv1d layers (16->4->4->1 channels, kernel size 2) applied one
  after another, with a shared Sigmoid after the first and second layer."""

  def __init__(self):
    super().__init__()
    # Creation order matters for reproducibility under a fixed seed.
    self.layer1 = nn.Conv1d(16, 4, kernel_size=2)
    self.activation = nn.Sigmoid()
    self.layer2 = nn.Conv1d(4, 4, kernel_size=2)
    self.layer3 = nn.Conv1d(4, 1, kernel_size=2)

  def forward(self, x):
    """Apply layer1, layer2 and layer3; no activation after the last one."""
    hidden = self.activation(self.layer1(x))
    hidden = self.activation(self.layer2(hidden))
    return self.layer3(hidden)

'''
TODO: Complete this nn.Sequential so that it computes exactly same thing with StackManualLayer
'''
class SequentialLayer(nn.Module):
  """nn.Sequential version of the stack: Conv1d 16->4, Sigmoid, Conv1d 4->4,
  Sigmoid, Conv1d 4->1, all convolutions with kernel size 2."""

  def __init__(self):
    super().__init__()
    channel_plan = [(16, 4), (4, 4), (4, 1)]
    modules = []
    for position, (in_channels, out_channels) in enumerate(channel_plan):
      # A Sigmoid sits between consecutive convolutions, none after the last.
      if position > 0:
        modules.append(nn.Sigmoid())
      modules.append(nn.Conv1d(in_channels, out_channels, kernel_size = 2))
    self.layers = nn.Sequential(*modules)

  def forward(self, x):
    """Run x through the sequential stack."""
    return self.layers(x)
  
# Do not change the code below
# Re-seeding before each construction makes both modules draw identical
# initial parameters, provided they create their layers in the same order.
torch.manual_seed(0)
manual_layer = StackManualLayer()
torch.manual_seed(0)
sequential_layer = SequentialLayer()

'''
The printed result has to be same
'''

# Dummy input of shape (1, 16, 8) fed to both modules for comparison.
test_dummy = torch.arange(128).view(1,16,8).float()
manual_out = manual_layer(test_dummy)
print(f"Output with Manual Stack Layer: {manual_out}")
sequential_out = sequential_layer(test_dummy)
print(f"Output with Sequential Layer: {sequential_out}")

# Problem 3. Make Your Own Conv Layers (15 pts)


In [None]:
class YourModel(AudioModel):
  """AudioModel with a deeper, batch-normalised convolution stack.

  Only `conv_layer` is replaced; the spectrogram front end, the max over the
  last axis, `final_layer` and the sigmoid are inherited from AudioModel.

  The stack has four Conv1d blocks (kernel size 3, padding 1, BatchNorm1d,
  ReLU). The first three are followed by MaxPool1d(3). Channel widths go
  n_mels -> hidden_size -> 2*hidden_size -> 2*hidden_size -> hidden_size.
  The last block must output `hidden_size` channels because the inherited
  `final_layer` is `nn.Linear(hidden_size, num_output)`; an empty
  `nn.Sequential()` would pass `n_mels` channels through and fail there.
  """

  def __init__(self, sr, n_fft, hop_length, n_mels, hidden_size, num_output):
    super().__init__(sr, n_fft, hop_length, n_mels, hidden_size, num_output)

    def conv_block(in_channels, out_channels, pool_size=None):
      """Conv1d -> BatchNorm1d -> ReLU, optionally followed by MaxPool1d."""
      block = [
        nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.BatchNorm1d(out_channels),
        nn.ReLU(),
      ]
      if pool_size is not None:
        block.append(nn.MaxPool1d(pool_size))
      return block

    wide = hidden_size * 2
    self.conv_layer = nn.Sequential(
      *conv_block(n_mels, hidden_size, pool_size=3),
      *conv_block(hidden_size, wide, pool_size=3),
      *conv_block(wide, wide, pool_size=3),
      # No pooling here: AudioModel.forward takes the max over the last axis.
      *conv_block(wide, hidden_size),
    )

In [None]:
# Train the custom model with the same hyperparameters as the default model.
# NOTE(review): loss_func, train_record and the data loaders come from
# earlier cells, so those cells must have been run first.
your_model = YourModel(sr=16000, n_fft=1024, hop_length=512, n_mels=48, num_output=50, hidden_size=32)
optimizer = torch.optim.Adam(your_model.parameters(), lr=1e-3)
your_model = your_model.to(DEV)
your_train_record = train_model(your_model, train_loader, valid_loader, optimizer, num_epochs=30, loss_func=loss_func, device=DEV)

## Save a figure comparing the custom model with the default setting:
## top panel is the per-batch training loss, bottom panel the per-epoch
## validation score. In each panel the default model is plotted first.
plt.figure(figsize=(8,16))
plt.subplot(2,1,1)
plt.plot(train_record['loss'])
plt.plot(your_train_record['loss'])
plt.subplot(2,1,2)
plt.plot(train_record['valid_acc'])
plt.plot(your_train_record['valid_acc'])
save_fig_with_date('your_conv_layer_comparison_with_default')