Midi file player
https://midiplayer.ehubsoft.net/

# Setup

In [1]:
# !pip install librosa | tail -n 1  # I don't want a super long output
# !pip install miditoolkit | tail -n 1  # I don't want a super long output
# !pip install xgboost | tail -n 1  # I don't want a super long output
# !pip install lightgbm | tail -n 1  # I don't want a super long output

In [1]:
# Probably more imports than are really necessary...
import os
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader, random_split
import torch.nn as nn
import torch.nn.functional as F
from torchaudio.transforms import MelSpectrogram, AmplitudeToDB
from tqdm import tqdm
import librosa
import numpy as np
import miditoolkit
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score, average_precision_score, accuracy_score
import random

# from mido import MidiFile
# from sklearn.model_selection import train_test_split
# from music21 import converter, chord, stream
# from sklearn.preprocessing import LabelEncoder
# from xgboost import XGBClassifier
# from sklearn.ensemble import GradientBoostingClassifier
# from lightgbm import LGBMClassifier

# How average precision score works

Suppose prediction = [1 1 0 1], target = [1 0 0 0]  
Precision at rank 1 = 1 / 1 = 1.0  
Precision at rank 2 = 1 /2 = 0.5  
Precision at rank 3 = 2 / 3 = 0.67  
Precision at rank 4 = 2 / 4 = 0.5  
So the Average Precision (AP) of my prediction and target is (1.0 + 0.5 + 0.67 + 0.5) / 4

For 2D prediction and target arrays like in accuracy3, the average precision for the entire set is
the average of the average precision values of each instance (or pair of prediction and ground truth).

Suppose  
predictions = [  
    [1, 1],  # Instance 1 predictions  
    [0, 1]   # Instance 2 predictions  
]  
groundtruth = [  
    [1, 0],  # Instance 1 ground truth  
    [1, 1]   # Instance 2 ground truth  
]   
AP for instance 1 = (1.0 + 0.5) / 2 = 0.75  
AP for instance 2 = (0.0 + 0.5) / 2 = 0.25  
So the mean Average Precision (mAP) is (0.75 + 0.25) / 2 = 0.5

# Model: CNN

### Tags and Eval Function

In [6]:
TAGS = ['rock', 'oldies', 'jazz', 'pop', 'dance',  'blues',  'punk', 'chill', 'electronic', 'country']

In [7]:
def accuracy3(groundtruth, predictions):
    preds, targets = [], []
    for k in groundtruth:
        if not (k in predictions):
            print("Missing " + str(k) + " from predictions")
            return 0
        prediction = [1 if tag in predictions[k] else 0 for tag in TAGS]
        target = [1 if tag in groundtruth[k] else 0 for tag in TAGS]
        preds.append(prediction)
        targets.append(target)

    mAP = average_precision_score(targets, preds, average='macro')
    return mAP

### Getting Waveform Data for CNN

In [49]:
dataroot3 = "student_files/task3_audio_classification/"

In [50]:
def extract_waveform(path, pitch_shift_steps=0):
    waveform, sr = librosa.load(dataroot3 + path, sr=SAMPLE_RATE)

    # Apply pitch shift for data augmentation
    if pitch_shift_steps != 0:
        waveform = librosa.effects.pitch_shift(waveform, sr=sr, n_steps=pitch_shift_steps)

    waveform = np.array([waveform])
    if sr != SAMPLE_RATE:
        resample = torchaudio.transforms.Resample(orig_freq=sr, new_freq=SAMPLE_RATE)
        waveform = resample(waveform)
    # Pad so that everything is the right length
    target_len = SAMPLE_RATE * AUDIO_DURATION
    # waveform = torch.from_numpy(waveform)  # Adding this because I was getting TypeError: pad(): argument 'input' (position 1) must be Tensor, not numpy.ndarray
    if waveform.shape[1] < target_len:
        print("padding")
        pad_len = target_len - waveform.shape[1]
        waveform = F.pad(waveform, (0, pad_len))
    else:
        waveform = waveform[:, :target_len]
    waveform = torch.FloatTensor(waveform)
    # print(waveform.shape[1])
    if waveform.shape[1] != target_len:  # 160000
        print("error")
    return waveform

### Constants

In [57]:
# Some constants (you can change any of these if useful)
SAMPLE_RATE = 16000
N_MELS = 64
N_CLASSES = 10
AUDIO_DURATION = 10 # seconds
BATCH_SIZE = 32

### Dataset

In [58]:
class AudioDataset(Dataset):
    def __init__(self, meta):
        self.meta = meta  # e.g. 'train/0.wav': ['rock']
        ks = list(meta.keys())
        self.idToPath = dict(zip(range(len(ks)), ks))  # e.g. 0: 'train/0.wav'
        self.pathToFeat = {}

        self.mel = MelSpectrogram(sample_rate=SAMPLE_RATE, n_mels=N_MELS)
        self.db = AmplitudeToDB()

        for path in ks:
            waveforms = extract_waveform(path)
            mel_spec = self.db(self.mel(waveform)).squeeze(0)
            self.pathToFeat[path] = mel_spec  # e.g. 'train/0.wav': <Tensor shape=[N_MELS=64, T0]>

    def __len__(self):
        return len(self.meta)

    def __getitem__(self, idx):
        path = self.idToPath[idx]
        tags = self.meta[path]
        bin_label = torch.tensor([1 if tag in tags else 0 for tag in TAGS], dtype=torch.float32)
    
        if self.preload:
            mel_spec = self.pathToFeat[path]
            # No pitch shift for preloaded
            return mel_spec.unsqueeze(0), bin_label, path
        else:
            # Original waveform
            waveform_orig = extract_waveform(path)
            mel_spec_orig = self.db(self.mel(waveform_orig)).squeeze(0)
    
            # Pitch shifted version (within an octave up or down)
            pitch_shift_steps = random.choice([i for i in range(-11, 12) if i != 0])  ## exclude 0 (no pitch change) 
            waveform_aug = extract_waveform(path, pitch_shift_steps=pitch_shift_steps)
            mel_spec_aug = self.db(self.mel(waveform_aug)).squeeze(0)
    
            # Return both (could also randomize whether to return original or augmented only)
            return (mel_spec_orig.unsqueeze(0), mel_spec_aug.unsqueeze(0)), bin_label, path

    # def __getitem__(self, idx):
    #     # Faster version, preloads the features
    #     path = self.idToPath[idx]
    #     tags = self.meta[path]
    #     bin_label = torch.tensor([1 if tag in tags else 0 for tag in TAGS], dtype=torch.float32)

    #     if self.preload:
    #         mel_spec = self.pathToFeat[path]
    #     else:
    #         waveform = extract_waveform(path)
    #         mel_spec = self.db(self.mel(waveform)).squeeze(0)

    #     return mel_spec.unsqueeze(0), bin_label, path

In [59]:
class Loaders():
    def __init__(self, train_path, test_path, split_ratio=0.9, seed = 0):
        torch.manual_seed(seed)
        random.seed(seed)

        meta_train = eval(open(train_path, 'r').read())
        l_test = eval(open(test_path, 'r').read())
        meta_test = dict([(x,[]) for x in l_test]) # Need a dictionary for the above class

        all_train = AudioDataset(meta_train)
        test_set = AudioDataset(meta_test)

        # Split all_train into train + valid
        total_len = len(all_train)
        train_len = int(total_len * split_ratio)
        valid_len = total_len - train_len
        train_set, valid_set = random_split(all_train, [train_len, valid_len])

        self.loaderTrain = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
        self.loaderValid = DataLoader(valid_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
        self.loaderTest = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

In [60]:
class CNNClassifier(nn.Module):
    def __init__(self, n_classes=N_CLASSES):
        super(CNNClassifier, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, 3, padding=1)
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(0.3)
        self.fc1 = nn.Linear(32 * (N_MELS // 4) * (801 // 4), 256)
        self.fc2 = nn.Linear(256, n_classes)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))  # (B, 16, mel/2, time/2)
        x = self.pool(F.relu(self.conv2(x)))  # (B, 32, mel/4, time/4)
        x = x.view(x.size(0), -1)
        x = self.dropout(F.relu(self.fc1(x)))
        return torch.sigmoid(self.fc2(x))  # multilabel → sigmoid

In [61]:
class Pipeline():
    def __init__(self, model, learning_rate, seed = 0):
        # These two lines will (mostly) make things deterministic.
        # You're welcome to modify them to try to get a better solution.
        torch.manual_seed(seed)
        random.seed(seed)

        # self.device = torch.device("cpu") # Can change this if you have a GPU, but the autograder will use CPU
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = model.to(self.device) #model.cuda() # Also uncomment these lines for GPU
        self.optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
        self.criterion = nn.BCELoss()

    def evaluate(self, loader, threshold=0.5, outpath=None):
        self.model.eval()
        preds, targets, paths = [], [], []
        with torch.no_grad():
            for x, y, ps in loader:
                x = x.to(self.device) #x.cuda()
                y = y.to(self.device) #y.cuda()
                outputs = self.model(x)
                preds.append(outputs.cpu())
                targets.append(y.cpu())
                paths += list(ps)

        preds = torch.cat(preds)
        targets = torch.cat(targets)
        preds_bin = (preds > threshold).float()

        predictions = {}
        for i in range(preds_bin.shape[0]):
            predictions[paths[i]] = [TAGS[j] for j in range(len(preds_bin[i])) if preds_bin[i][j]]

        mAP = None
        if outpath: # Save predictions
            with open(outpath, "w") as z:
                z.write(str(predictions) + '\n')
        else: # Only compute accuracy if we're *not* saving predictions, since we can't compute test accuracy
            mAP = average_precision_score(targets, preds, average='macro')
        return predictions, mAP

    def train(self, train_loader, val_loader, num_epochs):
        for epoch in range(num_epochs):
            self.model.train()
            running_loss = 0.0
            for (x1, x2), y, path in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
                x1 = x1.to(self.device)
                x2 = x2.to(self.device)
                y = y.to(self.device)
            
                self.optimizer.zero_grad()
            
                # Forward both versions and average the results
                outputs1 = self.model(x1)
                outputs2 = self.model(x2)
                outputs = (outputs1 + outputs2) / 2
            
                loss = self.criterion(outputs, y)
                loss.backward()
                self.optimizer.step()
            # for x, y, path in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
            #     x = x.to(self.device) #x.cuda()
            #     y = y.to(self.device) #y.cuda()
            #     self.optimizer.zero_grad()
            #     outputs = self.model(x)
            #     loss = self.criterion(outputs, y)
            #     loss.backward()
            #     self.optimizer.step()
            #     running_loss += loss.item()
            val_predictions, mAP = self.evaluate(val_loader)
            print(f"[Epoch {epoch+1}] Loss: {running_loss/len(train_loader):.4f} | Val mAP: {mAP:.4f}")

In [62]:
loaders = Loaders(dataroot3 + "/train.json", dataroot3 + "/test.json")

In [63]:
model = CNNClassifier()

In [64]:
pipeline = Pipeline(model, 1e-4)
pipeline.train(loaders.loaderTrain, loaders.loaderValid, 15)

Epoch 1:   0%|          | 0/113 [00:00<?, ?it/s]


ValueError: too many values to unpack (expected 2)

In [None]:
!rm predictions3.json

train_preds, train_mAP = pipeline.evaluate(loaders.loaderTrain, 0.5)
valid_preds, valid_mAP = pipeline.evaluate(loaders.loaderValid, 0.5)
test_preds, _ = pipeline.evaluate(loaders.loaderTest, 0.5, "predictions3.json")

all_train = eval(open(dataroot3 + "/train.json").read())
for k in valid_preds:
    # We split our training set into train+valid
    # so need to remove validation instances from the training set for evaluation
    all_train.pop(k)
acc3 = accuracy3(all_train, train_preds)
print("Task 3 training mAP = " + str(acc3))

In [None]:
# !rm predictions3.json
# run3()

# Model: Broken AST

https://huggingface.co/MIT/ast-finetuned-audioset-10-10-0.4593

In [4]:
dataroot3 = "student_files/task3_audio_classification/"

In [5]:
def extract_waveform(path):
    waveform, sr = librosa.load(dataroot3 + path, sr=SAMPLE_RATE)
    waveform = np.array(waveform)
    if sr != SAMPLE_RATE:
        resample = torchaudio.transforms.Resample(orig_freq=sr, new_freq=SAMPLE_RATE)
        print("resampling")
        waveform = resample(waveform)
    # Pad so that everything is the right length
    # target_len = SAMPLE_RATE * AUDIO_DURATION
    # # waveform = torch.from_numpy(waveform)  # Adding this because I was getting TypeError: pad(): argument 'input' (position 1) must be Tensor, not numpy.ndarray
    # if waveform.shape[0] < target_len:
    #     print("padding")
    #     pad_len = target_len - waveform.shape[0]
    #     waveform = F.pad(waveform, (0, pad_len))
    # else:
    #     waveform = waveform[:target_len]
    waveform = torch.FloatTensor(waveform)
    
    # print(len(waveform))  # should be 160000
    if (len(waveform)) != 160000:
        print("skip this!")
    
    return waveform

In [45]:
SAMPLE_RATE = 16000
N_MELS = 64
N_CLASSES = 10
AUDIO_DURATION = 10 # seconds
BATCH_SIZE = 4  # From this piazza post: https://piazza.com/class/m8rskujtdvsgy/post/340
EPOCHS = 10  # From Discord
LEARNING_RATE = 1e-5  # From Discord

In [46]:
from transformers import ASTConfig, AutoFeatureExtractor, ASTForAudioClassification
from datasets import load_dataset
import torch

In [47]:
config = ASTConfig(num_labels=10)

In [48]:
class AudioDataset(Dataset):
    def __init__(self, meta):
        self.meta = meta  # e.g. 'train/0.wav': ['rock']
        ks = list(meta.keys())
        self.idToPath = dict(zip(range(len(ks)), ks))  # e.g. 0: 'train/0.wav'
        self.pathToFeat = {}

        for path in ks:
            waveform = extract_waveform(path).squeeze(0).numpy()  # Convert to NumPy array
            self.pathToFeat[path] = waveform  # e.g. 'train/0.wav': waveform

    def __len__(self):
        return len(self.meta)

    def __getitem__(self, idx):
        path = self.idToPath[idx]
        tags = self.meta[path]
        bin_label = torch.tensor([1 if tag in tags else 0 for tag in TAGS], dtype=torch.float32)
        waveform = self.pathToFeat[path]
    
        return waveform, bin_label, path

In [49]:
class Loaders():
    def __init__(self, train_path, test_path, split_ratio=0.9, seed = 0):
        torch.manual_seed(seed)
        random.seed(seed)

        meta_train = eval(open(train_path, 'r').read())
        l_test = eval(open(test_path, 'r').read())
        meta_test = dict([(x,[]) for x in l_test]) # Need a dictionary for the above class

        all_train = AudioDataset(meta_train)
        test_set = AudioDataset(meta_test)

        # Split all_train into train + valid
        total_len = len(all_train)
        train_len = int(total_len * split_ratio)
        valid_len = total_len - train_len
        train_set, valid_set = random_split(all_train, [train_len, valid_len])
        
        self.loaderTrain = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
        self.loaderValid = DataLoader(valid_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
        self.loaderTest = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

In [37]:
class Pipeline():
    def __init__(self, learning_rate, seed = 0):
        torch.manual_seed(seed)
        random.seed(seed)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.feature_extractor = AutoFeatureExtractor.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")  # , config=config
        # self.model = ASTForAudioClassification.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593").to(self.device)
        self.model = ASTForAudioClassification(config=config).to(self.device)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=LEARNING_RATE)
        self.criterion = nn.BCELoss()

    def evaluate(self, loader, threshold=0.5, outpath=None):
        self.model.eval()
        preds, targets, paths = [], [], []
        with torch.no_grad():
            for x, y, path in loader:
                x = x.to(self.device) #x.cuda()
                y = y.to(self.device) #y.cuda()
                x = x[0]
                y = y[0]
                path = path[0]
                
                # x is a list of raw waveforms
                inputs = self.feature_extractor(x, sampling_rate=SAMPLE_RATE, return_tensors="pt")
                # inputs = {k: v.to(self.device) for k, v in inputs.items()}

                with torch.no_grad():
                    logits = self.model(**inputs).logits
                preds.append(torch.sigmoid(logits).cpu())
                
                y = y.to(self.device)
                targets.append(y.cpu())
                paths += list(ps)

        preds = torch.cat(preds)
        targets = torch.cat(targets)
        preds_bin = (preds > threshold).float()

        predictions = {}
        for i in range(preds_bin.shape[0]):
            predictions[paths[i]] = [TAGS[j] for j in range(len(preds_bin[i])) if preds_bin[i][j]]

        mAP = None
        if outpath: # Save predictions
            with open(outpath, "w") as z:
                z.write(str(predictions) + '\n')
        else: # Only compute accuracy if we're *not* saving predictions, since we can't compute test accuracy
            mAP = average_precision_score(targets, preds, average='macro')
        return predictions, mAP

    def train(self, train_loader, val_loader, num_epochs):
        for epoch in range(num_epochs):
            self.model.train()
            running_loss = 0.0
            for x, y, path in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
                x = x.to(self.device) #x.cuda()
                y = y.to(self.device) #y.cuda()
                x = x[0]
                y = y[0]
                path = path[0]
                print(x, y, path)
                self.optimizer.zero_grad()
                inputs = self.feature_extractor(x, sampling_rate=SAMPLE_RATE, return_tensors="pt")
                outputs = self.model(**inputs).logits
                loss = self.criterion(outputs, y)
                loss.backward()
                self.optimizer.step()
                running_loss += loss.item()
            val_predictions, mAP = self.evaluate(val_loader)
            print(f"[Epoch {epoch+1}] Loss: {running_loss/len(train_loader):.4f} | Val mAP: {mAP:.4f}")

In [None]:
loaders = Loaders(dataroot3 + "/train.json", dataroot3 + "/test.json")

In [None]:
pipeline = Pipeline()
pipeline.train(loaders.loaderTrain, loaders.loaderValid, EPOCHS)

In [38]:
train_preds, train_mAP = pipeline.evaluate(loaders.loaderTrain, 0.5)
valid_preds, valid_mAP = pipeline.evaluate(loaders.loaderValid, 0.5)
test_preds, _ = pipeline.evaluate(loaders.loaderTest, 0.5, "predictions3.json")

all_train = eval(open(dataroot3 + "/train.json").read())
for k in valid_preds:
    # We split our training set into train+valid
    # so need to remove validation instances from the training set for evaluation
    all_train.pop(k)
acc3 = accuracy3(all_train, train_preds)
print("Task 3 training mAP = " + str(acc3))

In [39]:
!rm predictions3.json
run3()

rm: cannot remove 'predictions3.json': No such file or directory


NameError: name 'LEARNING_RATE' is not defined

# Model: AST Attempt 2

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
import librosa
import numpy as np
from torch.utils.data import Dataset, DataLoader, random_split
from torchaudio.transforms import MelSpectrogram, AmplitudeToDB
from tqdm import tqdm
import random
from sklearn.metrics import average_precision_score
from transformers import ASTConfig, AutoFeatureExtractor, ASTForAudioClassification

2025-05-20 20:29:43.902424: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-05-20 20:29:43.902491: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-05-20 20:29:43.903774: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-05-20 20:29:43.910390: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

cuda


In [3]:
# Some constants (you can change any of these if useful)
SAMPLE_RATE = 16000
N_MELS = 64
N_CLASSES = 10  # This should be inferred from your TAGS list or dataset
AUDIO_DURATION = 10 # seconds
BATCH_SIZE = 4  # From this piazza post: https://piazza.com/class/m8rskujtdvsgy/post/340
EPOCHS = 8  # 10 From Discord
LEARNING_RATE = 1e-5  # From Discord

In [4]:
dataroot3 = "student_files/task3_audio_classification/"

In [5]:
TAGS = ['rock', 'oldies', 'jazz', 'pop', 'dance',  'blues',  'punk', 'chill', 'electronic', 'country']

In [6]:
def accuracy3(groundtruth, predictions):
    preds, targets = [], []
    for k in groundtruth:
        if not (k in predictions):
            print("Missing " + str(k) + " from predictions")
            return 0
        prediction = [1 if tag in predictions[k] else 0 for tag in TAGS]
        target = [1 if tag in groundtruth[k] else 0 for tag in TAGS]
        preds.append(prediction)
        targets.append(target)

    mAP = average_precision_score(targets, preds, average='macro')
    return mAP

In [7]:
def extract_waveform(path, augment=False):
    w, sr = librosa.load(dataroot3 + '/' + path, sr=SAMPLE_RATE)

    waveform = np.array([w])
    
    if sr != SAMPLE_RATE:
        resample = torchaudio.transforms.Resample(orig_freq=sr, new_freq=SAMPLE_RATE)
        waveform = resample(waveform)
    # Pad so that everything is the right length
    target_len = SAMPLE_RATE * AUDIO_DURATION
    waveform = torch.from_numpy(waveform)
    if waveform.shape[1] < target_len:
        pad_len = target_len - waveform.shape[1]
        waveform = F.pad(waveform, (0, pad_len))
    else:
        waveform = waveform[:, :target_len]
        
    waveform = torch.FloatTensor(waveform)

    if not augment:
        return waveform

    # Data augmentation
    rng = random.randint(0, 2)
    if rng == 0:
        pitch_shift_steps = random.choice([i for i in range(-11, 12) if i != 0])  ## exclude 0 (no pitch change) 
        augmented_waveform = librosa.effects.pitch_shift(w, sr=sr, n_steps=pitch_shift_steps)
    elif rng == 1:
        speed_factor = random.uniform(0.5, 1.5)
        augmented_waveform = librosa.effects.time_stretch(y=w, rate=speed_factor)
    elif rng == 2:
        volume_factor = random.uniform(0.5, 2.0)
        augmented_waveform = w * volume_factor
        # Clip values to prevent distortion/clipping if volume becomes too high
        # Audio typically ranges from -1.0 to 1.0 (float32)
        augmented_waveform = np.clip(augmented_waveform, -1.0, 1.0)

    # Pad so that everything is the right length
    augmented_waveform = np.array([w])
    augmented_waveform = torch.from_numpy(augmented_waveform)
    if augmented_waveform.shape[1] < target_len:
        pad_len = target_len - augmented_waveform.shape[1]
        augmented_waveform = F.pad(augmented_waveform, (0, pad_len))
    else:
        augmented_waveform = augmented_waveform[:, :target_len]
        
    augmented_waveform = torch.FloatTensor(augmented_waveform)

    return (waveform, augmented_waveform)

In [8]:
# def extract_waveform_pitch_shift(path):
#     waveform, sr = librosa.load(dataroot3 + '/' + path, sr=SAMPLE_RATE)

#     pitch_shift_steps = random.choice([i for i in range(-11, 12) if i != 0])  ## exclude 0 (no pitch change) 
#     waveform = librosa.effects.pitch_shift(waveform, sr=sr, n_steps=pitch_shift_steps)
    
#     waveform = np.array([waveform])
    
#     waveform = torch.FloatTensor(waveform)
    
#     return waveform

In [9]:
# def extract_waveform_speed_change(path):
#     waveform, sr = librosa.load(dataroot3 + '/' + path, sr=SAMPLE_RATE)
    
#     speed_factor = random.uniform(0.5, 1.5)
#     waveform = librosa.effects.time_stretch(y=waveform, rate=speed_factor)

#     waveform = np.array([waveform])
    
#     waveform = torch.FloatTensor(waveform)
    
#     return waveform

In [10]:
# def extract_waveform_volume_change(path):
#     waveform, sr = librosa.load(dataroot3 + '/' + path, sr=SAMPLE_RATE)

#     volume_factor = random.uniform(0.5, 2.0)
#     waveform = waveform * volume_factor
#     # Clip values to prevent distortion/clipping if volume becomes too high
#     # Audio typically ranges from -1.0 to 1.0 (float32)
#     augmented_waveform = np.clip(waveform, -1.0, 1.0)

#     waveform = np.array([waveform])
    
#     waveform = torch.FloatTensor(waveform)
    
#     return waveform

In [11]:
# config = ASTConfig(num_labels=N_CLASSES)

In [12]:
feature_extractor = AutoFeatureExtractor.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")
pretrained_model = ASTForAudioClassification.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")

In [13]:
# note: the code doesn't work if preload is false

class AudioDataset(Dataset):
    def __init__(self, meta, feature_extractor, augment, preload=True):
        self.meta = meta
        print(len(meta))
        ks = list(meta.keys())
        self.idToPath = dict(zip(range(len(ks)), ks))
        self.pathToFeat = {}
        self.feature_extractor = feature_extractor
        self.preload = preload

        if self.preload:
            idx_augment = len(ks)
            for path in ks:
                # waveform = extract_waveform(path)
                # features = self.feature_extractor(waveform.squeeze().numpy(), sampling_rate=SAMPLE_RATE, return_tensors="pt")
                # self.pathToFeat[path] = features['input_values'].squeeze(0)

                rng = random.randint(0, 3)
                if augment and rng == 0:
                    print(idx_augment)
                    # A quarter of a chance of data being augmented
                    waveforms = extract_waveform(path, augment=True)
                    
                    waveform = waveforms[0]
                    features = self.feature_extractor(waveform.squeeze().numpy(), sampling_rate=SAMPLE_RATE, return_tensors="pt")
                    self.pathToFeat[path] = features['input_values'].squeeze(0)
                    
                    augmented_waveform = waveforms[1]
                    features = self.feature_extractor(augmented_waveform.squeeze().numpy(), sampling_rate=SAMPLE_RATE, return_tensors="pt")
                    self.pathToFeat[path + "_augment"] = features['input_values'].squeeze(0)
                    self.idToPath[idx_augment] = path + "_augment"
                    self.meta[path + "_augment"] = self.meta[path]
                    idx_augment += 1
                else:
                    waveform = extract_waveform(path)
                    features = self.feature_extractor(waveform.squeeze().numpy(), sampling_rate=SAMPLE_RATE, return_tensors="pt")
                    self.pathToFeat[path] = features['input_values'].squeeze(0)

            if augment:
                idx_augment = len(ks)
                for path in ks:
                    
                    idx_augment += 1

    def __len__(self):
        return len(self.meta)

    def __getitem__(self, idx):
        path = self.idToPath[idx]
        tags = self.meta[path]
        bin_label = torch.tensor([1 if tag in tags else 0 for tag in TAGS], dtype=torch.float32)

        if self.preload:
            features = self.pathToFeat[path]
        else:
            waveform = extract_waveform(path)
            features = self.feature_extractor(waveform.squeeze().numpy(), sampling_rate=SAMPLE_RATE, return_tensors="pt")
            features = features['input_values'].squeeze(0)

        return features, bin_label, path

In [14]:
class Loaders():
    def __init__(self, train_path, test_path, feature_extractor, split_ratio=0.9, seed=0):
        torch.manual_seed(seed)
        random.seed(seed)

        meta_train = eval(open(train_path, 'r').read())
        l_test = eval(open(test_path, 'r').read())
        meta_test = dict([(x, []) for x in l_test])

        all_train = AudioDataset(meta_train, feature_extractor, augment=True)
        test_set = AudioDataset(meta_test, feature_extractor, augment=False)

        total_len = len(all_train)
        train_len = int(total_len * split_ratio)
        valid_len = total_len - train_len
        train_set, valid_set = random_split(all_train, [train_len, valid_len])

        self.loaderTrain = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
        self.loaderValid = DataLoader(valid_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
        self.loaderTest = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

In [15]:
class ASTForMultiLabel(nn.Module):
    def __init__(self, base_model, num_labels):
        super().__init__()
        self.base_model = base_model  # e.g. ASTForAudioClassification.from_pretrained(...)
        # self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(527, num_labels)
        nn.init.xavier_uniform_(self.classifier.weight)

    def forward(self, input_values):
        outputs = self.base_model(input_values).logits  # shape: [B, 527]
        # outputs = self.dropout(outputs)
        outputs = self.classifier(outputs)              # shape: [B, num_labels]
        return torch.sigmoid(outputs)                   # For multi-label classification

In [16]:
class Pipeline():
    def __init__(self, model, learning_rate, seed=0):
        torch.manual_seed(seed)
        random.seed(seed)

        self.device = device
        self.model = model.to(self.device)
        self.optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
        self.criterion = nn.BCELoss()

    def evaluate(self, loader, threshold=0.5, outpath=None):
        print("evaluating")
        self.model.eval()
        preds, targets, paths = [], [], []
        with torch.no_grad():
            for x, y, ps in loader:
                x = x.to(self.device)
                y = y.to(self.device)
                outputs = self.model(x)
                preds.append(outputs.cpu())
                targets.append(y.cpu())
                paths += list(ps)

        preds = torch.cat(preds)
        targets = torch.cat(targets)
        preds_bin = (preds > threshold).float()

        predictions = {}
        for i in range(preds_bin.shape[0]):
            predictions[paths[i]] = [TAGS[j] for j in range(len(preds_bin[i])) if preds_bin[i][j]]

        mAP = None
        if outpath:
            with open(outpath, "w") as z:
                z.write(str(predictions) + '\n')
        else:
            mAP = average_precision_score(targets, preds, average='macro')
        return predictions, mAP

    def train(self, train_loader, val_loader, num_epochs):
        print("training")
        for epoch in range(num_epochs):
            self.model.train()
            running_loss = 0.0
            for x, y, path in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
                x = x.to(self.device)
                y = y.to(self.device)
                self.optimizer.zero_grad()
                outputs = self.model(x)
                loss = self.criterion(outputs, y)
                loss.backward()
                self.optimizer.step()
                running_loss += loss.item()
            val_predictions, mAP = self.evaluate(val_loader)
            print(f"[Epoch {epoch+1}] Loss: {running_loss/len(train_loader):.4f} | Val mAP: {mAP:.4f}")

In [17]:
def accuracy3(all_train_meta, train_preds):
    correct_predictions = 0
    total_predictions = 0
    for audio_file, true_tags in all_train_meta.items():
        if audio_file in train_preds:
            predicted_tags = set(train_preds[audio_file])
            true_tags_set = set(true_tags)
            if predicted_tags == true_tags_set:
                correct_predictions += 1
            total_predictions += 1
    if total_predictions > 0:
        return correct_predictions / total_predictions
    else:
        return 0.0

In [18]:
# def run3():
#     print("here 1")
#     loaders = Loaders(dataroot3 + "/train.json", dataroot3 + "/test.json", feature_extractor)
#     print("here 2")
#     model = ASTForMultiLabel(pretrained_model, N_CLASSES)
#     print("here 3")
#     pipeline = Pipeline(model, LEARNING_RATE)
#     print("here 4")

#     pipeline.train(loaders.loaderTrain, loaders.loaderValid, EPOCHS)
#     train_preds, train_mAP = pipeline.evaluate(loaders.loaderTrain, 0.5)
#     valid_preds, valid_mAP = pipeline.evaluate(loaders.loaderValid, 0.5)
#     test_preds, _ = pipeline.evaluate(loaders.loaderTest, 0.5, "predictions3.json")

#     all_train = eval(open(dataroot3 + "/train.json").read())
#     for k in valid_preds:
#         if k in all_train: # Ensure the key exists before trying to pop
#             all_train.pop(k)
#     acc3 = accuracy3(all_train, train_preds)
#     print("Task 3 training accuracy (exact match) = " + str(acc3))
#     print("Task 3 training mAP = " + str(train_mAP))
#     print("Task 3 validation mAP = " + str(valid_mAP))

In [19]:
# !rm predictions3.json
# run3()

In [20]:
print("here 1")
loaders = Loaders(dataroot3 + "/train.json", dataroot3 + "/test.json", feature_extractor)
# print("here 2")
# model = ASTForMultiLabel(pretrained_model, N_CLASSES)
# print("here 3")
# pipeline = Pipeline(model, LEARNING_RATE)
# print("here 4")
# pipeline.train(loaders.loaderTrain, loaders.loaderValid, EPOCHS)

here 1
4000
4000
4001
4002
4003
4004
4005
4006
4007
4008
4009
4010
4011
4012
4013
4014
4015
4016
4017
4018
4019
4020
4021
4022
4023
4024
4025
4026
4027
4028
4029
4030
4031
4032
4033
4034
4035
4036
4037
4038
4039
4040
4041
4042
4043
4044
4045
4046
4047
4048
4049
4050
4051
4052
4053
4054
4055
4056
4057
4058
4059
4060
4061
4062
4063
4064
4065
4066
4067
4068
4069
4070
4071
4072
4073
4074
4075
4076
4077
4078
4079
4080
4081
4082
4083
4084
4085
4086
4087
4088
4089
4090
4091
4092
4093
4094
4095
4096
4097
4098
4099
4100
4101
4102
4103
4104
4105
4106
4107
4108
4109
4110
4111
4112
4113
4114
4115
4116
4117
4118
4119
4120
4121
4122
4123
4124
4125
4126
4127
4128
4129
4130
4131
4132
4133
4134
4135
4136
4137
4138
4139
4140
4141
4142
4143
4144
4145
4146
4147
4148
4149
4150
4151
4152
4153
4154
4155
4156
4157
4158
4159
4160
4161
4162
4163
4164
4165
4166
4167
4168
4169
4170
4171
4172
4173
4174
4175
4176
4177
4178
4179
4180
4181
4182
4183
4184
4185
4186
4187
4188
4189
4190
4191
4192
4193
4194
4195
4196
419

Epoch 1: 100%|██████████| 1126/1126 [07:28<00:00,  2.51it/s]


evaluating
[Epoch 1] Loss: 0.1933 | Val mAP: 0.6880


Epoch 2: 100%|██████████| 1126/1126 [07:30<00:00,  2.50it/s]


evaluating
[Epoch 2] Loss: 0.0938 | Val mAP: 0.7755


Epoch 3: 100%|██████████| 1126/1126 [07:30<00:00,  2.50it/s]


evaluating
[Epoch 3] Loss: 0.0384 | Val mAP: 0.8222


Epoch 4: 100%|██████████| 1126/1126 [07:30<00:00,  2.50it/s]


evaluating
[Epoch 4] Loss: 0.0198 | Val mAP: 0.8423


Epoch 5: 100%|██████████| 1126/1126 [07:30<00:00,  2.50it/s]


evaluating
[Epoch 5] Loss: 0.0112 | Val mAP: 0.8441


Epoch 6: 100%|██████████| 1126/1126 [07:30<00:00,  2.50it/s]


evaluating
[Epoch 6] Loss: 0.0121 | Val mAP: 0.8491


Epoch 7: 100%|██████████| 1126/1126 [07:28<00:00,  2.51it/s]


evaluating
[Epoch 7] Loss: 0.0095 | Val mAP: 0.8506


Epoch 8: 100%|██████████| 1126/1126 [07:30<00:00,  2.50it/s]


evaluating
[Epoch 8] Loss: 0.0052 | Val mAP: 0.8539


In [21]:
torch.save(model, "trained_ast_model.pth")
print(f"Trained model saved to: trained_ast_model.pth")

Trained model saved to: trained_ast_model.pth


In [None]:
# model = torch.load("ast_model_full.pth")

In [22]:
test_preds, _ = pipeline.evaluate(loaders.loaderTest, 0.5, "predictions3.json")

evaluating


In [23]:
train_preds, train_mAP = pipeline.evaluate(loaders.loaderTrain, 0.5)
valid_preds, valid_mAP = pipeline.evaluate(loaders.loaderValid, 0.5)

all_train = eval(open(dataroot3 + "/train.json").read())
for k in valid_preds:
    if k in all_train: # Ensure the key exists before trying to pop
        all_train.pop(k)
acc3 = accuracy3(all_train, train_preds)
print("Task 3 training accuracy (exact match) = " + str(acc3))
print("Task 3 training mAP = " + str(train_mAP))
print("Task 3 validation mAP = " + str(valid_mAP))

evaluating
evaluating
Task 3 training accuracy (exact match) = 0.9744869661674986
Task 3 training mAP = 0.9996161692415464
Task 3 validation mAP = 0.8538579134996767
