
# Assignment 2: Music Auto-tagging Model
- In this assignment, you will train your auto-tagging model using PyTorch
- The dataset is from MagnaTagATune
  - Randomly selected 8000 mp3 files
  - 5000 files for training, 1000 for validation, 2000 for test  
- Every code cell before Problem 0 must run without modification or error
- You have to submit three files:
  - Notebook in ipynb
  - `MIR_Assignment_2_{your_student_id}` file of the completed code
    - {} is placeholder for your student id. Do not include {}
  - Model file in pt
    - `your_model_best_{your_student_id}.pt`

- Problem 1: Complete three dataset classes (16 pts)
- Problem 2: Train your own model (15 pts)
- Problem 3: Implement Convolutional Neural Network (20 pts)
- Problem 4: Complete Binary Cross Entropy function (4 pts)
- Problem 5: Complete Precision-Recall Area Under Curve function (20 pts)
- Problem 6: Find the best threshold (15 pts)
- Problem 7: Load audio and make prediction (10 pts)

## 0. Import Library

In [None]:
DEV = 'cuda' # select your device 'cpu' or 'cuda'

from datetime import datetime
from pathlib import Path
from typing import List, Tuple, Union, Callable

import torch
import torch.nn as nn
import torchaudio
from torch.utils.data import DataLoader
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt
import IPython.display as ipd

def save_fig_with_date(figname:str):
  plt.savefig(f"{figname}_{datetime.now().strftime('%m_%d_%H_%M_%S')}.png")

- Download dataset from Google Drive link and Unzip at `MTAT_SMALL/`
  - You can also download it from [OneDrive Link](https://sogang365-my.sharepoint.com/:u:/g/personal/dasaem_jeong_o365_sogang_ac_kr/EdkHWV-qvxBEi-d0Ua73VG4BEp7EZO7HMvrXsWqeJvMJzg?e=Yi4jf0)


In [None]:
!pip install --upgrade gdown
!gdown --id 15e9E3oZdudErkPKwb0rCAiZXkPxdZkV6
!unzip -q mtat_8000.zip

Downloading...
From (original): https://drive.google.com/uc?id=15e9E3oZdudErkPKwb0rCAiZXkPxdZkV6
From (redirected): https://drive.google.com/uc?id=15e9E3oZdudErkPKwb0rCAiZXkPxdZkV6&confirm=t&uuid=8b9580fa-b44c-4cb4-8c46-56c73e2cc0c4
To: /content/mtat_8000.zip
100% 921M/921M [00:13<00:00, 67.9MB/s]


## Problem 1. Complete Dataset Class (16 pts)
- In this problem, you have to implement three ways to load the data
    - 1) Load audio file and resample every time the data is called
    - 2) Save pre-processed data in .pt file and load it every time the data is called
    - 3) Load every audio file on memory before the training starts
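
The three strategies differ mainly in when the expensive decode-and-resample step runs. The following is a minimal standard-library sketch of that trade-off, not the assignment solution: `expensive_decode` is a hypothetical stand-in for "load mp3 and resample", and `pickle` files stand in for the `.pt` files.

```python
import pickle
import tempfile
from pathlib import Path

DECODE_CALLS = 0  # counts how often the "expensive" step runs


def expensive_decode(name):
    """Hypothetical stand-in for loading and resampling one audio file."""
    global DECODE_CALLS
    DECODE_CALLS += 1
    return [float(len(name))] * 4


class OnTheFly:
    def __init__(self, names):
        self.names = names

    def __getitem__(self, idx):
        return expensive_decode(self.names[idx])  # decoded on every access


class PreProcessed:
    def __init__(self, names, cache_dir):
        self.paths = []
        for name in names:
            path = Path(cache_dir) / f"{name}.pkl"
            if not path.exists():  # decode once, then reuse the cached file
                path.write_bytes(pickle.dumps(expensive_decode(name)))
            self.paths.append(path)

    def __getitem__(self, idx):
        return pickle.loads(self.paths[idx].read_bytes())  # cheap disk read


class OnMemory:
    def __init__(self, names):
        self.items = [expensive_decode(n) for n in names]  # decode all up front

    def __getitem__(self, idx):
        return self.items[idx]


def count_decodes(make_dataset, accesses=3):
    """Build a dataset, read item 0 several times, return the decode count."""
    global DECODE_CALLS
    DECODE_CALLS = 0
    dataset = make_dataset()
    for _ in range(accesses):
        dataset[0]
    return DECODE_CALLS


names = ["a", "bb"]
with tempfile.TemporaryDirectory() as tmp:
    fly = count_decodes(lambda: OnTheFly(names))
    pre = count_decodes(lambda: PreProcessed(names, tmp))
    mem = count_decodes(lambda: OnMemory(names))
print(fly, pre, mem)  # prints: 3 2 2
```

On-the-fly loading pays the decode cost on every access, while the other two pay it once per file: the pre-processed variant trades disk space for that saving, and the on-memory variant trades RAM.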

In [None]:
'''
You don't have to change this cell
'''
class MTATDataset:
  def __init__(self, dir_path:str, split:str='train', num_max_data:int=4000, sr:int=16000):
    self.dir = Path(dir_path)
    self.labels = pd.read_csv(self.dir / "meta.csv", index_col=[0])
    self.sr = sr

    if split=="train":
      sub_dir_ids = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c']
    elif split=='valid':
      sub_dir_ids = ['d']
    else: #test
      sub_dir_ids = ['e', 'f', 'g']

    is_in_set = [True if x[0] in sub_dir_ids else False for x in self.labels['mp3_path'].values.astype('str')]
    self.labels = self.labels.iloc[is_in_set]
    self.labels = self.labels[:num_max_data]
    self.vocab = self.labels.columns.values[1:-1]
    self.label_tensor = self.convert_label_to_tensor()

  def convert_label_to_tensor(self):
    return torch.LongTensor(self.labels.values[:, 1:-1].astype('bool'))

  def __len__(self):
    return len(self.labels)


MTAT_DIR = Path('MTAT_SMALL/')

In [None]:
'''
Check what the baseline dataset looks like
'''

base_set = MTATDataset(MTAT_DIR)

'''
metadata of dataset is stored in self.labels
'''
base_set.labels

Unnamed: 0,clip_id,singer,harpsichord,sitar,heavy,foreign,no piano,classical,female,jazz,...,rock,dance,cello,techno,flute,beat,soft,choir,baroque,mp3_path
20552,45147,0,0,0,0,0,0,0,1,0,...,0,0,0,0,0,0,0,1,0,2/zephyrus-angelus-11-ave_maria__virgo_serena_...
3899,8539,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,a/tilopa-pictures_of_silence-02-ni-175-204.mp3
8996,19647,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,5/arthur_yoria-of_the_lovely-04-several_mistak...
4055,8856,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,8/stargarden-music_for_modern_listening-02-per...
6361,13834,0,0,0,0,0,0,1,0,0,...,0,0,0,0,0,0,0,0,0,a/dac_crowell-the_mechanism_of_starlight-03-me...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
15397,33729,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,4/jami_sieber-second_sight-07-the_goats_earth-...
19285,42374,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,9/self_delusion-happiness_hurts_me-10-dead_sta...
4099,8934,0,0,1,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,c/jamie_janover-now_center_of_time-02-playa-20...
18897,41453,0,0,0,0,0,0,0,0,0,...,0,0,0,0,1,0,0,0,0,2/jesse_manno-sea_spirits-09-tidur-59-88.mp3


In [None]:
'''
You can use labels['mp3_path'].iloc
'''
target_idx = 0

path_to_target_idx = base_set.labels['mp3_path'].iloc[target_idx]
print(path_to_target_idx)

2/zephyrus-angelus-11-ave_maria__virgo_serena_josquin_des_prez-0-29.mp3


In [None]:
'''
label of each tensor is also stored in self.label_tensor
'''
base_set.label_tensor

tensor([[0, 0, 0,  ..., 0, 1, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        ...,
        [0, 0, 1,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 1]])

In [None]:
class OnTheFlyDataset(MTATDataset):
  def __init__(self, dir_path:str, split:str='train', num_max_data:int=4000, sr:int=16000):
    super().__init__(dir_path, split, num_max_data, sr)

  def __getitem__(self, idx):
    '''
    __getitem__ returns a corresponding idx-th data sample among the dataset.
    In music-tag dataset, it has to return (audio_sample, label) of idx-th data.

    OnTheFlyDataset loads the audio file whenever this __getitem__ function is called.
    In this function, you have to implement these things

    1) Get the file path of idx-th data sample (use self.labels['mp3_path'])
    2) Load the audio of that file path
    3) Resample the audio sample into frequency of self.sr (You can use torchaudio.functional.resample)
    4) Return resampled audio sample and the label (tag data) of the data sample

    Output
      audio_sample (torch.FloatTensor):
      label (torch.FloatTensor): A tensor with shape of 50 dimension. Each dimension has value either 0 or 1
                                 If n-th dimension's value is 1, it means n-th tag is True for this data sample

    TODO: Complete this function
    '''
    file_path = self.dir / self.labels['mp3_path'].iloc[idx]
    audio, original_sr = torchaudio.load(file_path)
    audio_sample = torchaudio.functional.resample(audio, orig_freq=original_sr, new_freq=self.sr).mean(dim=0)
    label = self.label_tensor[idx].float()

    return audio_sample, label

dummy_set = OnTheFlyDataset(MTAT_DIR, split='train', num_max_data=100)
audio, label = dummy_set[0]
assert audio.ndim == 1, "Number of dimensions of audio tensor has to be 1. Use audio[0] or audio.mean(dim=0) to reduce it"
assert len(audio) == 465984, "Audio tensor has wrong shape"
assert label.ndim == 1, "Number of dimensions of label tensor has to be 1"

print("Complete!")

ipd.display(ipd.Audio(audio, rate=dummy_set.sr))
print(dummy_set.vocab[torch.where(label)])

Complete!


['female' 'quiet' 'choir']


In [None]:
class PreProcessDataset(MTATDataset):
  def __init__(self, dir_path:str, split:str='train', num_max_data:int=4000, sr:int=16000):
    super().__init__(dir_path, split, num_max_data, sr)

    self.pre_process_and_save_data()

  def pre_process_and_save_data(self):
    '''
    self.pre_process_and_save_data loads every audio sample in the dataset, resample it, and save it into pt file.
    In this function, you have to implement these things

    1) For every data sample in the dataset, check whether pre-processed data already exists
      - You can get data sample path by self.labels['mp3_path'].values
      - path of pre-processed data can be in the same directory, but with different suffix.
      - You can make it with Path(mp3_path).with_suffix('.pt')
    2) If it doesn't exist, do the following
      a) Load audio file
      b) Resample the audio file with samplerate of self.sr
      c) Get label of this audio file
      d) Save {'audio': audio_tensor, 'label':label_tensor} with torch.save

    Output
      None

    TODO: Complete this function
    '''
    for idx, mp3_path in enumerate(self.labels['mp3_path'].values):
      pt_path = self.dir / Path(mp3_path).with_suffix('.pt')
      if pt_path.exists():
        continue  # skip files that were already pre-processed
      audio, original_sr = torchaudio.load(self.dir / mp3_path)
      audio_resampled = torchaudio.functional.resample(audio, orig_freq=original_sr, new_freq=self.sr)
      audio_resampled = audio_resampled.mean(dim=0)  # downmix to mono
      label = self.label_tensor[idx].float()  # idx follows the row order of self.labels
      torch.save({'audio': audio_resampled, 'label': label}, pt_path)



  def __getitem__(self, idx):
    '''
    __getitem__ returns a corresponding idx-th data sample among the dataset.
    In music-tag dataset, it has to return (audio_sample, label) of idx-th data.

    PreProcessDataset loads the pre-processed pt file whenever this __getitem__ function is called.
    In this function, you have to implement these things

    1) Get the pt file path of idx-th data sample (use self.labels)
    2) Load the pre-processed data of that file path (use torch.load)
    3) Return the audio sample and the label (tag data) of the data sample

    TODO: Complete this function
    '''
    pt_path = self.dir / Path(self.labels['mp3_path'].iloc[idx]).with_suffix('.pt')
    data = torch.load(pt_path)
    audio_sample = data['audio']
    label = data['label']

    return audio_sample, label


dummy_set = PreProcessDataset(MTAT_DIR, split='train', num_max_data=100)
audio, label = dummy_set[15]
assert audio.ndim == 1, "Number of dimensions of audio tensor has to be 1. Use audio[0] or audio.mean(dim=0) to reduce it"
assert len(audio) == 465984, "Audio tensor has wrong shape"
assert label.ndim == 1, "Number of dimensions of label tensor has to be 1"
assert (MTAT_DIR / '2/zephyrus-angelus-11-ave_maria__virgo_serena_josquin_des_prez-0-29.pt').exists(), "pt file is not generated"
assert torch.load(MTAT_DIR / '2/zephyrus-angelus-11-ave_maria__virgo_serena_josquin_des_prez-0-29.pt')['audio'].shape == (465984,), "Audio tensor is not saved properly"

ipd.display(ipd.Audio(audio, rate=dummy_set.sr))
print(dummy_set.vocab[torch.where(label)])

['guitar' 'male']


In [None]:
class OnMemoryDataset(MTATDataset):
  def __init__(self, dir_path:str, split:str='train', num_max_data:int=4000, sr:int=16000):
    super().__init__(dir_path, split, num_max_data, sr)

    self.loaded_audios = self.load_audio()

  def load_audio(self):
    '''
    In this function, you have to load all the audio file in the dataset, and resample them,
    and store the data on the memory as a python variable

    For each data in the dataset,
      a) Load Audio
      b) Resample it to self.sr
      c) Append it to total_audio_datas

    Output:
      total_audio_datas (list): A list of torch.FloatTensor. i-th item of the list corresponds to the audio sample of i-th data
                                Each item is an audio sample in torch.FloatTensor with sampling rate of self.sr
    '''
    total_audio_datas = []

    ### Write your code from here
    for mp3_path in self.labels['mp3_path'].values:
      audio, original_sr = torchaudio.load(self.dir / mp3_path)
      audio_resampled = torchaudio.functional.resample(audio, orig_freq=original_sr, new_freq=self.sr)
      audio_resampled = audio_resampled.mean(dim=0)  # downmix to mono
      total_audio_datas.append(audio_resampled)

    return total_audio_datas

  def __getitem__(self, idx):
    '''
    __getitem__ returns a corresponding idx-th data sample among the dataset.
    In music-tag dataset, it has to return (audio_sample, label) of idx-th data.

    OnMemoryDataset returns the pre-loaded audio data that is saved in self.loaded_audios whenever this __getitem__ function is called.
    In this function, you have to implement these things

    1) Load the pre-processed audio data from self.loaded_audios
    2) Return the audio sample and the label (tag data) of the data sample

    TODO: Complete this function
    '''
    audio_sample = self.loaded_audios[idx]
    label = self.label_tensor[idx].float()
    return audio_sample, label

dummy_set = OnMemoryDataset(MTAT_DIR, split='train', num_max_data=50)
audio, label = dummy_set[10]
assert audio.ndim == 1, "Number of dimensions of audio tensor has to be 1. Use audio[0] or audio.mean(dim=0) to reduce it"
assert label.ndim == 1, "Number of dimensions of label tensor has to be 1"
assert len(audio) == 465984, "Audio tensor has wrong shape"
assert dummy_set.loaded_audios[0].shape == (465984,), "Audio tensor is not saved properly"

ipd.display(ipd.Audio(audio, rate=dummy_set.sr))
print(dummy_set.vocab[torch.where(label)])

['classical' 'quiet' 'ambient' 'string' 'harp' 'slow']


#### Define Dataset
- You can select one of your implementations

In [None]:
your_dataset_class = OnTheFlyDataset # One of OnTheFlyDataset, PreProcessDataset, or OnMemoryDataset
# your_dataset_class = OnMemoryDataset
'''
Based on your memory size or storage size, you can change the num_max_data
'''
trainset = your_dataset_class(MTAT_DIR, split='train', num_max_data=5000)
validset = your_dataset_class(MTAT_DIR, split='valid', num_max_data=1000)
testset = your_dataset_class(MTAT_DIR, split='test', num_max_data=2000)

#### DataLoader
- Define `DataLoader` using the dataset

In [None]:
train_loader = DataLoader(trainset, batch_size=64, shuffle=True, num_workers=4) # you can speed up with num_workers=4 if you have multiple cpu core
valid_loader = DataLoader(validset, batch_size=128, shuffle=False, num_workers=4)
test_loader = DataLoader(testset, batch_size=128, shuffle=False, num_workers=4)

batch = next(iter(train_loader))



## Preparation: Define Neural Network
- Define the neural network

In [None]:
class SpecModel(nn.Module):
  def __init__(self, sr:int, n_fft:int, hop_length:int, n_mels:int):
    super().__init__()
    self.mel_converter = torchaudio.transforms.MelSpectrogram(sample_rate=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels)
    self.db_converter = torchaudio.transforms.AmplitudeToDB()

  def forward(self, x):
    mel_spec = self.mel_converter(x)
    return self.db_converter(mel_spec)

class AudioModel(nn.Module):
  def __init__(self, sr:int, n_fft:int, hop_length:int, n_mels:int, hidden_size:int, num_output:int):
    super().__init__()
    self.sr = sr
    self.spec_converter = SpecModel(sr, n_fft, hop_length, n_mels)
    self.conv_layer = nn.Sequential(
      nn.Conv1d(n_mels, out_channels=hidden_size, kernel_size=3),
      nn.MaxPool1d(3),
      nn.ReLU(),
      nn.Conv1d(hidden_size, out_channels=hidden_size, kernel_size=3),
      nn.MaxPool1d(3),
      nn.ReLU(),
      nn.Conv1d(hidden_size, out_channels=hidden_size, kernel_size=3),
      nn.MaxPool1d(3),
      nn.ReLU(),
    )
    self.final_layer = nn.Linear(hidden_size, num_output)

  def get_spec(self, x):
    '''
    Get result of self.spec_converter
    x (torch.Tensor): audio samples (num_batch_size X num_audio_samples)
    '''
    return self.spec_converter(x)

  def forward(self, x):
    spec = self.get_spec(x) # num_batch X num_mel_bins X num_time_bins
    out = self.conv_layer(spec)
    out = torch.max(out, dim=-1)[0] # select [0] because torch.max outputs tuple of (value, index)
    out = self.final_layer(out)
    out = torch.sigmoid(out)
    return out

## 3. Train the Network
- First, just run the cells below so that you can obtain the first result
- Plot the training loss and validation accuracy


In [None]:
def get_tpr_fpr(pred:torch.Tensor, target:torch.Tensor, threshold:float=0.5):
  thresh_pred = pred> threshold
  p = torch.sum(target == 1)
  tp = torch.sum((thresh_pred==1) * (target==1))
  n = torch.sum(target == 0)
  fp = torch.sum((thresh_pred==1) * (target==0))
  return tp/p, fp/n

def get_roc_auc(pred:torch.Tensor, target:torch.Tensor, num_grid=500):
  auc = 0
  prev_fpr = 0
  for thresh in reversed(torch.linspace(0,1,num_grid)):
    tpr, fpr = get_tpr_fpr(pred, target, threshold=thresh)
    auc += tpr * (fpr-prev_fpr)
    prev_fpr = fpr
  return auc

def train_model(model:nn.Module, train_loader:DataLoader, valid_loader:DataLoader, optimizer:torch.optim.Optimizer, num_epochs:int, loss_func, device='cuda'):
  loss_records =[]
  valid_acc_records = []
  model.vocab = train_loader.dataset.vocab
  model.train() # Set model to train mode
  for epoch in tqdm(range(num_epochs)):
    for batch in train_loader:
      optimizer.zero_grad() # Reset the gradients of all parameters in the optimizer (every parameter in the model)
      audio, label = batch
      audio = audio.to(device)
      label = label.to(device)
      pred = model(audio)
      loss = loss_func(pred, label.float())
      loss.backward() # Run backpropagation
      optimizer.step() # Update parameters
      loss_records.append(loss.item())
    valid_acc = validate_model(model, valid_loader, device)
    valid_acc_records.append(valid_acc.item())
  return {"loss": loss_records, "valid_acc": valid_acc_records}

def validate_model(model, valid_loader, device, acc_func=get_roc_auc):
  valid_acc = 0
  model.eval()
  model.to(device)
  with torch.no_grad():
    for batch in valid_loader:
      audio, label = batch
      pred = model(audio.to(device))
      auc = acc_func(pred, label.to(device))
      valid_acc += auc * len(label)
  model.train()
  return valid_acc / len(valid_loader.dataset)

In [None]:
'''
Train the default model
'''

model = AudioModel(sr=16000, n_fft=1024, hop_length=512, n_mels=48, num_output=50, hidden_size=32)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
model = model.to(DEV)
loss_func = torch.nn.BCELoss()
train_record = train_model(model, train_loader, valid_loader, optimizer, num_epochs=30, loss_func=loss_func, device=DEV)


RuntimeError: Found no NVIDIA driver on your system. Please check that you have an NVIDIA GPU and installed a driver from http://www.nvidia.com/Download/index.aspx

In [None]:
plt.plot(train_record['loss'])
save_fig_with_date('default_train_loss')

In [None]:
plt.plot(train_record['valid_acc'])
save_fig_with_date('default_train_valid_acc')

### Problem 2. Try Various Settings and Find Best Model (15 pts)
- You can try different `n_fft`, `n_mels`, or `hidden_size`, or different `conv_layer` in your model
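
Before changing `hop_length` or the convolution stack, it can help to check how many time bins survive to the final pooling step. The sketch below is plain-Python shape arithmetic under stated assumptions: the spectrogram uses centered framing (so the frame count is `1 + num_samples // hop_length`), each convolution has kernel size 3 without padding, and each max-pool has kernel size 3 with matching stride, as in the baseline `AudioModel`. The helper names are hypothetical.

```python
def stft_frames(num_samples, hop_length):
    # Assumption: centered (padded) STFT framing
    return 1 + num_samples // hop_length


def conv_out(length, kernel_size):
    # Stride 1, no padding
    return length - kernel_size + 1


def pool_out(length, kernel_size):
    # Stride equal to kernel size, no padding, floor rounding
    return length // kernel_size


def baseline_time_bins(num_samples, hop_length, num_blocks=3):
    """Time bins left after num_blocks of (conv k=3, max-pool k=3)."""
    length = stft_frames(num_samples, hop_length)
    for _ in range(num_blocks):
        length = pool_out(conv_out(length, 3), 3)
    return length


clip_len = 465984  # sample count asserted in the dataset cells
for hop in (512, 256):
    print(hop, stft_frames(clip_len, hop), baseline_time_bins(clip_len, hop))
# prints:
# 512 911 32
# 256 1821 66
```

Halving `hop_length` roughly doubles the number of frames and therefore the compute per clip, which is worth weighing against any gain in validation score.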

In [None]:
class YourModel(AudioModel):
  # Caution: You have to define default values for all the parameters for the automatic evaluation
  # The defaults below match the hyperparameters used to train your_model in this cell
  def __init__(self, sr=16000, n_fft=512, hop_length=256, n_mels=48, hidden_size=16, num_output=50):
    super().__init__(sr, n_fft, hop_length, n_mels, hidden_size, num_output)

    # TODO: Implement your own model
    self.conv_layer_2d = nn.Sequential(

        nn.Conv2d(in_channels=1, out_channels=hidden_size, kernel_size=(3, 3), padding=(1, 1)),
        nn.BatchNorm2d(hidden_size),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)),
        nn.Conv2d(in_channels=hidden_size, out_channels=hidden_size * 2, kernel_size=(3, 3), padding=(1, 1)),
        nn.BatchNorm2d(hidden_size * 2),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)),
        nn.Conv2d(in_channels=hidden_size * 2, out_channels=hidden_size * 4, kernel_size=(3, 3), padding=(1, 1)),
        nn.BatchNorm2d(hidden_size * 4),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)),
    )
    self.adaptive_pool = nn.AdaptiveAvgPool2d((1, 1))

    self.final_layer_custom = nn.Linear(hidden_size * 4, num_output)

  def forward(self, x):
    # TODO: Implement your own forward pass
    spec = self.get_spec(x) # Output shape: (batch, n_mels, time_bins)
    spec = spec.unsqueeze(1)
    out = self.conv_layer_2d(spec)
    out = self.adaptive_pool(out)
    out = torch.flatten(out, 1)
    out = self.final_layer_custom(out)
    out = torch.sigmoid(out)

    return out



your_model = YourModel(sr=16000, n_fft=512, hop_length=256, n_mels=48, num_output=50, hidden_size=16)
optimizer = torch.optim.Adam(your_model.parameters(), lr=1e-3)
your_model = your_model.to(DEV)
your_train_record = train_model(your_model, train_loader, valid_loader, optimizer, num_epochs=30, loss_func=loss_func, device=DEV)

## Save the figure with comparison of default setting
plt.figure(figsize=(8,16))
plt.subplot(2,1,1)
plt.plot(train_record['loss'])
plt.plot(your_train_record['loss'])
plt.subplot(2,1,2)
plt.plot(train_record['valid_acc'])
plt.plot(your_train_record['valid_acc'])
save_fig_with_date('comparison_with_default')

# Save the model
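# valid_acc is a list with one value per epoch, so use the final epoch's value in the file name
torch.save(your_model.state_dict(), f"your_model_{your_train_record['valid_acc'][-1]:.4f}.pt")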

In [None]:
# Rename the best model file to 'your_model_best.pt' and check that you can load it
# Caution: Make sure that the selected hyperparameters (n_fft, hop_length, etc.) of YourModel are set as default arguments
# so that you can load the model without specifying the hyperparameters
model = YourModel()
torch.save(model.state_dict(), 'your_model_best.pt')
model = YourModel()
model.load_state_dict(torch.load('your_model_best.pt'))

In [None]:
'''
Get the test result
'''
test_acc = validate_model(your_model, test_loader, DEV)
print(f"Calculated ROC_AUC value for Test Set is : {test_acc:.4f}")

### Problem 3: Implement Convolutional Neural Network (20 pts)

- Implement the convolutional neural network computation using `nn.Linear` and for loop
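
The idea behind this problem is that a convolution is a linear layer applied to every sliding window of the input. The sketch below shows that equivalence with plain Python lists instead of tensors, so the flattening order is visible; `conv1d_direct` and `conv1d_as_linear` are hypothetical names, and stride 1 with no padding is assumed.

```python
def conv1d_direct(x, weight, bias):
    """x: [in_ch][length], weight: [out_ch][in_ch][k], bias: [out_ch]."""
    in_ch, length, k = len(x), len(x[0]), len(weight[0][0])
    out = []
    for o in range(len(weight)):
        row = []
        for t in range(length - k + 1):
            acc = bias[o]
            for c in range(in_ch):
                for j in range(k):
                    acc += weight[o][c][j] * x[c][t + j]
            row.append(acc)
        out.append(row)
    return out


def conv1d_as_linear(x, weight, bias):
    """Same result, computed as a dot product with flattened patches."""
    in_ch, length, k = len(x), len(x[0]), len(weight[0][0])
    # Flatten each kernel channel-first, without reordering dimensions
    flat_w = [[w for chan in kernel for w in chan] for kernel in weight]
    out = [[] for _ in weight]
    for t in range(length - k + 1):
        # Flatten the window in the same channel-first order as the kernel
        patch = [x[c][t + j] for c in range(in_ch) for j in range(k)]
        for o, w_row in enumerate(flat_w):
            out[o].append(bias[o] + sum(w * p for w, p in zip(w_row, patch)))
    return out


x = [[1, 2, 3, 4], [0, 1, 0, 1]]           # 2 channels, length 4
weight = [[[1, 0], [0, 1]], [[1, 1], [1, 1]]]  # 2 output channels, kernel size 2
bias = [0, 1]
direct = conv1d_direct(x, weight, bias)
linear = conv1d_as_linear(x, weight, bias)
print(direct)  # prints: [[2, 2, 4], [5, 7, 9]]
print(linear)  # prints: [[2, 2, 4], [5, 7, 9]]
```

The two results agree only because the patch and the kernel are flattened in the same order, which is why the code cells below ask you to flatten without changing the dimension order.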

In [None]:
# Implement Conv1d with Linear

def get_conv1d_output_with_linear(atensor, conv1d_linear, kernel_size):
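  """
  Compute the output of nn.Conv1d using only a linear layer and a for loop.

  atensor (torch.Tensor): input of shape (batch_size, in_channels, sequence_length)
  conv1d_linear (nn.Linear): linear layer mapping in_channels * kernel_size to out_channels
  kernel_size (int): length of the convolution kernel

  output (torch.Tensor): tensor of shape (batch_size, out_channels, sequence_length - kernel_size + 1)
  """
  batch_size, in_channels, sequence_length = atensor.shape
  # TODO: Implement the forward pass
  # Assume stride=1 and padding=0 for simplicity
  # To match with the result of nn.Conv1d, flatten the input tensor without changing dimension order
  patches = []
  for i in range(sequence_length - kernel_size + 1):  # every valid window start position
      patch = atensor[:, :, i:i+kernel_size]  # window of shape (batch_size, in_channels, kernel_size)
      patches.append(patch)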

  patches = torch.stack(patches, dim=1)
  patches = patches.view(batch_size, sequence_length - kernel_size + 1, -1)
  linear_output = conv1d_linear(patches)


  linear_output = linear_output.permute(0, 2, 1)

  return linear_output


# Test the function with different parameters
in_channels = 10
out_channels = 2
kernel_size = 4

dummy_input = torch.randn(5, in_channels, 23)

conv1d = nn.Conv1d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=1, padding=0)
output = conv1d(dummy_input)
print(output.shape)

conv1d_linear = nn.Linear(in_channels * kernel_size, out_channels)
conv1d_linear.weight.data = conv1d.weight.data.view(out_channels, -1).clone()
conv1d_linear.bias.data = conv1d.bias.data.clone()


linear_output = get_conv1d_output_with_linear(dummy_input, conv1d_linear, kernel_size)
assert linear_output.shape == output.shape, "Output tensors have different shapes"
assert torch.allclose(output, linear_output, atol=1e-6), "Output tensors are different"

In [None]:
# Implement Conv2d with Linear

def get_conv2d_output_with_linear(atensor, conv2d_linear, kernel_size):

  batch_size, in_channels, height, width = atensor.shape
  # TODO: Implement the forward pass
  # Assume stride=1 and padding=0 for simplicity
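  # To match with the result of nn.Conv2d, flatten the input tensor without changing dimension order
  if isinstance(kernel_size, int):  # the test below passes an int, which nn.Conv2d also accepts
    kernel_size = (kernel_size, kernel_size)
  kernel_height, kernel_width = kernel_size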
  output_height = height - kernel_height + 1
  output_width = width - kernel_width + 1
  patches = []
  for i in range(output_height):
      for j in range(output_width):
          patch = atensor[:, :, i:i+kernel_height, j:j+kernel_width]
          patches.append(patch)

  patches = torch.stack(patches, dim=1)  # (batch_size, num_patches, in_channels, kernel_height, kernel_width)
  patches = patches.view(batch_size, output_height * output_width, -1)  # (batch_size, num_patches, in_channels * kernel_size)
  linear_output = conv2d_linear(patches)  # (batch_size, num_patches, out_channels)
  linear_output = linear_output.view(batch_size, output_height, output_width, -1)  # (batch_size, output_height, output_width, out_channels)
  linear_output = linear_output.permute(0, 3, 1, 2)  # (batch_size, out_channels, output_height, output_width)

  return linear_output

# Test the function with different parameters
in_channels = 10
out_channels = 2
kernel_size = 4


dummy_input = torch.randn(5, in_channels, 13, 17)

conv2d = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=1, padding=0)
output = conv2d(dummy_input)

conv2d_linear = nn.Linear(in_channels * kernel_size * kernel_size, out_channels)
conv2d_linear.weight.data = conv2d.weight.data.view(out_channels, -1).clone()
conv2d_linear.bias.data = conv2d.bias.data.clone()



linear_output = get_conv2d_output_with_linear(dummy_input, conv2d_linear, kernel_size)
assert linear_output.shape == output.shape, "Output tensors have different shapes"
assert torch.allclose(output, linear_output, atol=1e-6), "Output tensors are different"
print("Complete!")

### Problem 4: Complete Binary Cross Entropy Function (4 pts)
- Complete the function that calculates the Binary Cross Entropy for a given prediction and target label without using `torch.nn.BCELoss`
- $BCE = -\frac{1}{N} \sum_{i=1}^{N} \left[ y_i \log(\hat{y}_i) + (1-y_i) \log(1-\hat{y}_i) \right]$
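
To make the formula concrete, here is a small worked example in plain Python (no PyTorch), offered as a sketch rather than the expected solution. The function name `binary_cross_entropy` and the sample values are illustrative; the `eps` clamp is there so that a prediction of exactly 0 or 1 does not produce `log(0)`.

```python
import math


def binary_cross_entropy(preds, targets, eps=1e-8):
    """Mean BCE over paired predictions (in [0, 1]) and 0/1 targets."""
    total = 0.0
    for p, y in zip(preds, targets):
        p = min(max(p, eps), 1 - eps)  # keep log() away from 0
        total += y * math.log(p) + (1 - y) * math.log(1 - p)
    return -total / len(preds)


# Three predictions: confident and right, fairly right, undecided
loss = binary_cross_entropy([0.9, 0.2, 0.5], [1, 0, 1])
print(round(loss, 4))  # -(ln 0.9 + ln 0.8 + ln 0.5) / 3, about 0.3406

# Perfect predictions give a loss near zero instead of an error
perfect = binary_cross_entropy([1.0, 0.0], [1, 0])
print(perfect < 1e-6)  # prints: True
```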

In [None]:
def get_binary_cross_entropy(pred:torch.Tensor, target:torch.Tensor, eps=1e-8):
  '''
  pred (torch.Tensor): predicted value of a neural network model for a given input (assume that the value is output of sigmoid function)
  target (torch.Tensor): ground-truth label for a given input, given in multi-hot encoding

  output (torch.Tensor): Mean Binary Cross Entropy Loss value of every sample
  '''
  # TODO: Complete this function
  pred = torch.clamp(pred, eps, 1 - eps)
  bce_loss = - (target * torch.log(pred) + (1 - target) * torch.log(1 - pred))
  return bce_loss.mean()

test_model = AudioModel(sr=16000, n_fft=1024, hop_length=512, n_mels=48, num_output=50, hidden_size=16)
test_model = test_model.to(DEV)
test_optimizer = torch.optim.Adam(test_model.parameters(), lr=1e-3)
train_record = train_model(test_model, train_loader, valid_loader, test_optimizer, num_epochs=5, loss_func=get_binary_cross_entropy, device=DEV)
plt.subplot(2,1,1)
plt.plot(train_record['loss'])
plt.subplot(2,1,2)
plt.plot(train_record['valid_acc'])
save_fig_with_date('handmade_bce_result')

### Problem 5. Complete Precision-Recall Area Under Curve Function (20 pts)
- One frequently used metric is Precision-Recall Area Under Curve (PR-AUC)
- Precision is (Number of true positive)/(Number of total positive predictions)
- Recall is (Number of true positive)/(Number of total positive ground-truth)
- Precision and recall values depend on threshold
- PR-AUC is the area under the precision-recall curve obtained by varying the threshold
  - X-axis is recall, Y-axis is precision
  - ![Example of PR curve](https://wiki.cloudfactory.com/media/pages/docs/mp-wiki/metrics/precision-recall-curve-and-auc-pr/6a33324886-1684131968/precision-recall-score-example.webp)
- You can refer to the pre-defined `get_roc_auc` function
  - Instead of trapezoidal rule, use simple **rectangle rule** to calculate the area under curve, following `get_roc_auc` function
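
The rectangle rule described above can be sketched on a four-sample toy example in plain Python. This is an illustration under assumptions, not the reference implementation: a sample counts as positive when its score is greater than or equal to the threshold, precision and recall default to 1 when their denominator is zero, and the function names are hypothetical.

```python
def precision_recall(preds, targets, threshold):
    predicted = [p >= threshold for p in preds]
    tp = sum(1 for hit, y in zip(predicted, targets) if hit and y == 1)
    n_pred = sum(predicted)
    n_true = sum(targets)
    precision = tp / n_pred if n_pred else 1.0  # no positive prediction
    recall = tp / n_true if n_true else 1.0     # no positive ground-truth
    return precision, recall


def pr_auc_rectangle(preds, targets, num_grid=11):
    auc, prev_recall = 0.0, 0.0
    # Sweep thresholds from 1 down to 0 so that recall never decreases
    for step in range(num_grid - 1, -1, -1):
        threshold = step / (num_grid - 1)
        precision, recall = precision_recall(preds, targets, threshold)
        auc += precision * (recall - prev_recall)  # rectangle: height * width
        prev_recall = recall
    return auc


preds = [0.9, 0.8, 0.4, 0.1]
targets = [1, 0, 1, 0]
auc = pr_auc_rectangle(preds, targets)
print(round(auc, 4))  # 1.0 * 0.5 + (2/3) * 0.5 = 5/6, prints: 0.8333
```

Only two thresholds change the recall here (0.9 and 0.4), so only two rectangles contribute to the area; every other step has zero width.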


In [None]:
def get_precision_and_recall(pred:torch.Tensor, target:torch.Tensor, threshold:float):
  '''
  This function calculates precision and recall of given (prediction, target, threshold)

  pred (torch.Tensor): predicted value of a neural network model for a given input
  target (torch.Tensor): ground-truth label for a given input, given in multi-hot encoding

  output
    precision (torch.Tensor): (Number of true positive)/(Number of total positive predictions)
    recall (torch.Tensor): (Number of true positive)/(Number of total positive ground-truth)

  IMPORTANT:
    If there is no positive prediction, precision has to be 1
    If there is no positive ground-truth, recall has to be 1

  TODO: Complete this function
  '''

  # Write your code here
  pred_binary = (pred >= threshold).float()
  true_positive = (pred_binary * target).sum()
  total_positive_predictions = pred_binary.sum()
  total_positive_ground_truth = target.sum()
  if total_positive_predictions == 0:  # No positive predictions
      precision = torch.tensor(1.0)
  else:
      precision = true_positive / total_positive_predictions
  if total_positive_ground_truth == 0:  # No positive ground truths
      recall = torch.tensor(1.0)
  else:
      recall = true_positive / total_positive_ground_truth


  '''
  Be careful not to return nan because of division by zero
  '''
  assert not (torch.isnan(precision) or torch.isnan(recall))
  return precision, recall

def get_precision_recall_auc(pred:torch.Tensor, target:torch.Tensor, num_grid=500):
  '''
  This function returns PR_AUC value for a given prediction and target.
  Assume pred.shape == target.shape

  pred (torch.Tensor): predicted value of a neural network model for a given input
  target (torch.Tensor): ground-truth label for a given input, given in multi-hot encoding

  output (torch.Tensor): Area Under Curve value for Precision-Recall Curve, using rectangle method

  TODO: Complete this function using get_precision_and_recall
  '''
  pr_auc = 0
  prev_recall = 0
  # Sweep thresholds from 1 down to 0 so that recall increases, following get_roc_auc
  for threshold in reversed(torch.linspace(0, 1, num_grid)):
      precision, recall = get_precision_and_recall(pred, target, threshold)
      pr_auc += precision * (recall - prev_recall)  # rectangle rule: height * width
      prev_recall = recall

  return pr_auc


In [None]:
# download the test data

!wget https://github.com/jdasam/ant5015/raw/refs/heads/2025/assignment2_data.pt

In [None]:
'''
Test the get_precision_recall_auc
'''

# Load the pre-calculated data. Download the data from the link above
pre_calculated_data = torch.load('assignment2_data.pt')
pre_cal_test_pred = pre_calculated_data['test_pred']
pre_cal_test_label = pre_calculated_data['test_label']
correct_pr_auc = pre_calculated_data['pr_auc_value_test']
correct_pr_curve = pre_calculated_data['pr_curve']


'''
Printed result of code below has to be tensor(0.1483)
'''
pr_auc = get_precision_recall_auc(pre_cal_test_pred, pre_cal_test_label)
assert torch.allclose(pr_auc, correct_pr_auc, atol=1e-4), "Result is not correct"
print("Passed!")

In [None]:
selected_model = model
pr_auc_value_valid = validate_model(selected_model, valid_loader, DEV, acc_func=get_precision_recall_auc)
pr_auc_value_test = validate_model(selected_model, test_loader, DEV, acc_func=get_precision_recall_auc)
print(f"Calculated PR_AUC value for Validation Set is : {pr_auc_value_valid.item():.4f}")
print(f"Calculated PR_AUC value for Test Set is : {pr_auc_value_test.item():.4f}")

In [None]:
def draw_pr_auc_curve(pred:torch.Tensor, target:torch.Tensor, num_grid=500):
  '''
  This function draws PR curve for given prediction and target.
  Assume pred.shape == target.shape

  pred (torch.Tensor): predicted value of a neural network model for a given input
  target (torch.Tensor): ground-truth label for a given input, given in multi-hot encoding
  '''
  pr_curve = []
  for thresh in reversed(torch.linspace(0,1,num_grid)):
    precision, recall = get_precision_and_recall(pred, target, threshold=thresh)
    pr_curve.append((recall, precision))

  pr_curve = torch.tensor(pr_curve)
  return pr_curve

plt.figure()
pr_curve = draw_pr_auc_curve(pre_cal_test_pred, pre_cal_test_label, num_grid=100)
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.plot(pr_curve[:,0], pr_curve[:,1])

assert torch.allclose(pr_curve, correct_pr_curve, atol=1e-4), "Result is not correct"

### Problem 6: Find Best Threshold (15 pts)
- For each class, find the best threshold that maximizes the F1 score
- F1 score is defined as 2 * (precision * recall) / (precision + recall)
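
As a quick sanity check of the definition above, here is a minimal sketch in plain Python that computes F1 from raw counts. The helper name `f1_from_counts` and the example counts are hypothetical and only illustrate the arithmetic. The edge-case handling (precision is 1 when nothing is predicted positive, recall is 1 when there is no positive label) is assumed to follow the convention used in this assignment.

```python
def f1_from_counts(true_positive, predicted_positive, actual_positive):
    """Hypothetical helper: F1 score from three raw counts."""
    # Assumed convention: precision is 1 when there is no positive prediction
    precision = 1.0 if predicted_positive == 0 else true_positive / predicted_positive
    # Assumed convention: recall is 1 when there is no positive ground truth
    recall = 1.0 if actual_positive == 0 else true_positive / actual_positive
    if precision + recall == 0:
        # Avoid division by zero when both precision and recall are 0
        return 0.0
    return 2 * (precision * recall) / (precision + recall)


# 4 positive predictions, 3 of them correct, 6 positive labels in total:
# precision = 3/4 = 0.75, recall = 3/6 = 0.5, F1 = 2 * 0.375 / 1.25 = 0.6
example_f1 = f1_from_counts(true_positive=3, predicted_positive=4, actual_positive=6)
print(round(example_f1, 4))
```

Because F1 is a harmonic mean, it stays low whenever either precision or recall is low, which is why the threshold that maximizes it can differ a lot between classes.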



In [None]:

def get_f1_score(pred:torch.Tensor, target:torch.Tensor, threshold:float):
  '''
  This function calculates F1 score of given (prediction, target, threshold)

  pred (torch.Tensor): predicted value of a neural network model for a given input
  target (torch.Tensor): ground-truth label for a given input, given in multi-hot encoding

  output
    f1_score (torch.Tensor): 2 * (precision * recall) / (precision + recall)

  IMPORTANT:
    If there is no positive prediction, precision has to be 1
    If there is no positive ground-truth, recall has to be 1
  '''

  # Write your code here
  pred_binary = (pred >= threshold).float()
  true_positive = (pred_binary * target).sum()
  total_positive_predictions = pred_binary.sum()
  total_positive_ground_truth = target.sum()

  if total_positive_predictions == 0:
      precision = torch.tensor(1.0)
  else:
      precision = true_positive / total_positive_predictions

  if total_positive_ground_truth == 0:
      recall = torch.tensor(1.0)
  else:
      recall = true_positive / total_positive_ground_truth

  if precision + recall == 0:  # Avoid division by zero
      f1_score = torch.tensor(0.0)
  else:
      f1_score = 2 * (precision * recall) / (precision + recall)

  return f1_score


f1_score = get_f1_score(pre_cal_test_pred, pre_cal_test_label, threshold=0.2)

assert torch.allclose(f1_score, torch.tensor(0.4690), atol=1e-4), "Result is not correct"
print("Passed!")

In [None]:
def find_best_threshold_for_each_class(pred:torch.Tensor, target:torch.Tensor, num_grid=100):
  '''
  This function finds the best threshold for each class to maximize F1 score

  pred (torch.Tensor): predicted value of a neural network model for a given input
  target (torch.Tensor): ground-truth label for a given input, given in multi-hot encoding

  output
    best_thresholds (torch.Tensor): A tensor of best threshold for each class
  '''

  # Write your code here
  num_classes = target.shape[1]
  thresholds = torch.linspace(0, 1, num_grid)

  best_thresholds = torch.zeros(num_classes)

  for c in range(num_classes):
      best_f1 = -1
      best_threshold = 0

      for threshold in thresholds:
          f1_score = get_f1_score(pred[:, c], target[:, c], threshold)

          if f1_score > best_f1:
              best_f1 = f1_score
              best_threshold = threshold

      best_thresholds[c] = best_threshold

  return best_thresholds


best_thresholds = find_best_threshold_for_each_class(pre_cal_test_pred, pre_cal_test_label, num_grid=100)
best_thresholds

plt.figure(figsize=(15,7))
plt.bar(range(50), best_thresholds)
plt.xticks(range(50), rotation=70, labels=dummy_set.vocab.tolist())
plt.title('Best Threshold for each class')

In [None]:
def get_f1_score_for_each_class(pred, target, best_thresholds):
  '''
  This function calculates F1 score for each class

  pred (torch.Tensor): predicted value of a neural network model for a given input
  target (torch.Tensor): ground-truth label for a given input, given in multi-hot encoding
  best_thresholds (torch.Tensor): A tensor of best threshold for each class

  output
    f1_scores (torch.Tensor): A tensor of F1 score for each class
  '''

  # Write your code here
  num_classes = target.shape[1]
  f1_scores = torch.zeros(num_classes)

  # Iterate over each class
  for c in range(num_classes):
      threshold = best_thresholds[c]
      f1_scores[c] = get_f1_score(pred[:, c], target[:, c], threshold)

  return f1_scores

f1_scores = get_f1_score_for_each_class(pre_cal_test_pred, pre_cal_test_label, best_thresholds)

plt.figure(figsize=(15,7))
plt.bar(range(50), f1_scores)
plt.xticks(range(50), rotation=70, labels=dummy_set.vocab.tolist())
plt.title('F1 Score for each class')

- Now, get the best threshold for each class from the **VALIDATION** set and apply that threshold to the **TEST** set
  - To see the difference, we'll also plot the F1 score of each class with the threshold from the validation set and with the threshold from the test set.
  - Remember that choosing the threshold on the test set is a form of cheating: the resulting threshold is overfitted to the test set.
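
The point above can be illustrated with a small, self-contained sketch in plain Python. Everything here is an assumption for illustration: the synthetic scores, the helper names `f1_at`, `best_threshold` and `make_split`, and the single-class setup are not part of the assignment code. A threshold tuned on the test set can never score lower on that same test set than a threshold tuned on the validation set, which is exactly why it gives an optimistic estimate.

```python
import random


def f1_at(scores, labels, threshold):
    """F1 of one class at a given threshold (assumed edge-case conventions)."""
    pred = [s >= threshold for s in scores]
    true_positive = sum(1 for p, y in zip(pred, labels) if p and y)
    num_pred = sum(pred)
    num_true = sum(labels)
    precision = 1.0 if num_pred == 0 else true_positive / num_pred
    recall = 1.0 if num_true == 0 else true_positive / num_true
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)


def best_threshold(scores, labels, grid):
    """Grid search: return the first threshold with the highest F1."""
    return max(grid, key=lambda t: f1_at(scores, labels, t))


def make_split(rng, size):
    """Synthetic data: positives tend to get higher scores than negatives."""
    labels = [rng.random() < 0.3 for _ in range(size)]
    scores = [min(1.0, max(0.0, rng.gauss(0.6 if y else 0.4, 0.15))) for y in labels]
    return scores, labels


rng = random.Random(0)
grid = [i / 99 for i in range(100)]  # 100 evenly spaced thresholds in [0, 1]
valid_scores, valid_labels = make_split(rng, 200)
test_scores, test_labels = make_split(rng, 200)

valid_threshold = best_threshold(valid_scores, valid_labels, grid)
test_threshold = best_threshold(test_scores, test_labels, grid)

honest_f1 = f1_at(test_scores, test_labels, valid_threshold)
cheating_f1 = f1_at(test_scores, test_labels, test_threshold)
print(f"threshold from validation: {valid_threshold:.3f}, test F1: {honest_f1:.4f}")
print(f"threshold from test:       {test_threshold:.3f}, test F1: {cheating_f1:.4f}")
```

The gap between the two F1 values is a rough measure of how much the test-set threshold overfits. Only the validation-set number is a fair estimate of performance on unseen data.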

In [None]:

def collect_every_pred_and_label(model:nn.Module, data_loader:DataLoader, device='cuda'):
  '''
  This function collects every prediction and label of a given model and data_loader

  model (nn.Module): A neural network model
  data_loader (DataLoader): A DataLoader object that has test data

  output
    every_pred (torch.Tensor): A tensor of every prediction of the model, device has to be 'cpu'
    every_label (torch.Tensor): A tensor of every label of the model, device has to be 'cpu'
  '''

  # Write your code here
  model.eval()
  every_pred = []
  every_label = []

  with torch.no_grad():
      for inputs, labels in data_loader:
          inputs = inputs.to(device)
          labels = labels.to('cpu')

          preds = model(inputs).to('cpu')

          every_pred.append(preds)
          every_label.append(labels)

  # Concatenate all predictions and labels into tensors
  every_pred = torch.cat(every_pred, dim=0)
  every_label = torch.cat(every_label, dim=0)

  return every_pred, every_label

valid_pred, valid_label = collect_every_pred_and_label(selected_model, valid_loader, DEV)
assert valid_pred.device.type == 'cpu', "Prediction has to be in cpu"
assert valid_label.device.type == 'cpu', "Label has to be in cpu"
assert len(valid_pred) == len(valid_label), "Prediction and Label have to have the same length"
assert len(valid_pred) == len(validset), "Prediction has to cover every data in the validset"

test_pred, test_label = collect_every_pred_and_label(selected_model, test_loader, DEV)

best_thresholds = find_best_threshold_for_each_class(valid_pred, valid_label, num_grid=100)
best_cheating_thresholds = find_best_threshold_for_each_class(test_pred, test_label, num_grid=100) # This is for cheating

f1_scores = get_f1_score_for_each_class(test_pred, test_label, best_thresholds)
cheating_f1_scores = get_f1_score_for_each_class(test_pred, test_label, best_cheating_thresholds)


# Draw the result
plt.figure(figsize=(15,7))
plt.bar(range(50), f1_scores, width=0.4, align='center', label='F1 Score')
# add cheating f1 scores with shifted x-axis
plt.bar(range(50), cheating_f1_scores, width=0.4, alpha=0.5, align='edge', label='F1 Score with Cheating Threshold')
plt.xticks(range(50), rotation=70, labels=dummy_set.vocab.tolist())
plt.title('F1 Score for each class')
plt.legend()


### Problem 7: Load audio and make prediction (10 pts)


In [None]:

def get_audio_prediction(audio_path:str, model:nn.Module, best_thresholds:torch.Tensor, target_sr=16000):
  '''
  This function takes an audio path, model, sampling rate, and best_thresholds
  and returns the prediction of the model for the audio file.

  audio_path (str): A path of audio file
  model (nn.Module): A neural network model
  best_thresholds (torch.Tensor): A tensor of best threshold for each class
  target_sr (int): Sampling rate of audio file

  output
    audio (torch.Tensor): A tensor of audio file
    pred (list of str): A list of tags that are predicted to be True

  CAUTION: Do not use external variable to get tag names. Use model.vocab to get tag names
  '''
  audio, sr = torchaudio.load(audio_path)  # Load audio with original sampling rate

  # Resample audio to target sampling rate if necessary
  if sr != target_sr:
      resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=target_sr)
      audio = resampler(audio)

  # Ensure audio is a 1D tensor (mono)
  if audio.ndim > 1:
      audio = audio.mean(dim=0)  # Convert stereo to mono

  # Move audio data to the model's device
  device = next(model.parameters()).device
  audio = audio.to(device)

  # Pass the audio data through the model to get predictions
  model.eval()  # Set model to evaluation mode
  with torch.no_grad():
      audio = audio.unsqueeze(0)  # Add batch dimension: (samples,) -> (1, samples)
      pred = model(audio)  # Get predictions

  # Apply thresholds to binarize predictions
  # Compare on the same device as the prediction, then remove the batch dimension
  pred_binary = (pred >= best_thresholds.to(pred.device)).squeeze(0)

  # Get predicted tags using model.vocab
  predicted_tags = [model.vocab[i] for i in range(len(pred_binary)) if pred_binary[i].item() == 1]

  return audio.cpu(), predicted_tags



In [None]:
# your_audio_path = 'your_audio_file_path'
your_audio_path = MTAT_DIR / '2/zephyrus-angelus-11-ave_maria__virgo_serena_josquin_des_prez-0-29.mp3'
selected_model = model # Change it if you want to select model with different name

In [None]:
'''
Run Model
'''
selected_model.to('cpu')
selected_model.eval()
y, pred = get_audio_prediction(your_audio_path, selected_model, best_thresholds)  # best_thresholds is a required argument
assert type(pred) == list, "Prediction has to be list"
assert type(pred[0]) == str, "Each element of prediction has to be string"
assert type(y) == torch.Tensor, "Audio has to be tensor"

ipd.display(ipd.Audio(y, rate=16000))
print(f"Predicted tags are: {pred}")

In [None]:
# Download MIR_Assignment_2.py file and copy your completed code to the file
!wget https://raw.githubusercontent.com/jdasam/ant5015/refs/heads/2025/MIR_Assignment_2.py

In [None]:
# Check your code
!python MIR_Assignment_2.py