In [27]:
import numpy as np
import pandas as pd
import os 
import matplotlib.pyplot as plt
import sklearn
import itertools
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import f1_score, accuracy_score

In [10]:
# adjust your path
DATA_SET_PATH = "C:/Users/simon/Downloads/MLPC2025_classification"

metadata = pd.read_csv(os.path.join(DATA_SET_PATH, 'metadata.csv'))
files = metadata["filename"]

features_dir = 'audio_features'
labels_dir = 'labels'
categories = ['Airplane', 'Alarm', 'Beep/Bleep', 'Bell', 'Bicycle', 'Bird Chirp', 'Bus', 'Car', 'Cat Meow', 'Chainsaw', 'Clapping', 'Cough', 'Cow Moo', 'Cowbell', 'Crying', 'Dog Bark', 'Doorbell', 'Drip', 'Drums', 'Fire', 'Footsteps', 'Guitar', 'Hammer', 'Helicopter', 'Hiccup', 'Horn Honk', 'Horse Neigh', 'Insect Buzz', 'Jackhammer', 'Laughter', 'Lawn Mower', 'Motorcycle', 'Piano', 'Pig Oink', 'Power Drill', 'Power Saw', 'Rain', 'Rooster Crow', 'Saxophone', 'Sewing Machine', 'Sheep/Goat Bleat', 'Ship/Boat', 'Shout', 'Singing', 'Siren', 'Sneeze', 'Snoring', 'Speech', 'Stream/River', 'Thunder', 'Train', 'Truck', 'Trumpet', 'Vacuum Cleaner', 'Violin', 'Washing Machine', 'Waves', 'Wind']
print(f"Our dataset has {len(files)} files")
print(f"We have {len(categories)} labels")

Our dataset has 8230 files
We have 58 labels


In [3]:
# functions similar to the tutorial
# might adjust when annotators do not agree but there is not really a good way how to do this
def aggregate_labels(file_labels):
    __y = []
    for frame_labels in file_labels:
        if(sum(frame_labels) == 0):
            __y.append([0])
        elif(np.count_nonzero(frame_labels) == len(frame_labels)):
             __y.append([1])
        else: #The annotators don't agree on the label
            __y.append([np.random.choice(frame_labels)])
    return __y


# this uses the whole embeddings (768 features) as X; might need to adjust this
def read_files(file_names, num_to_read=1000):
    X_train = []
    Y_train = {}
    for c in categories:
        Y_train[c] = []
    for f in file_names[:num_to_read]: #we are not loading the entire dataset due to processing time
        if not os.path.exists(os.path.join(DATA_SET_PATH, features_dir , f.split('.')[0] + '.npz')):
            continue
        features = np.load(os.path.join(DATA_SET_PATH, features_dir , f.split('.')[0] + '.npz'))["embeddings"]
        X_train.append(features)
        y = np.load(os.path.join(DATA_SET_PATH, labels_dir , f.split('.')[0] + '_labels.npz'))
        for c in categories:
            _y = aggregate_labels(y[c])
            Y_train[c].extend(list(itertools.chain.from_iterable(_y)))
    X_train = np.concatenate(X_train)
    
    return X_train, convert_y_dict_to_array(Y_train, categories)

# Convert dictionary to numpy array (for all splits)
def convert_y_dict_to_array(y_dict, categories):
    y_array = np.zeros((len(y_dict[categories[0]]), len(categories)), dtype=int)
    for i, c in enumerate(categories):
        y_array[:, i] = y_dict[c]
    return y_array

In [4]:
# if we perform splits on the frame level consecutive (=highly correlated) frames would be distributed accross all splits
# Therefore,
# we perform splits on the file level to avoid data leakage

nf = len(files)
# I tried some random states and their class distributions; 0 produces very similar distributions for all splits
sampled_files = files.sample(nf, random_state=0)

# train set 70%, val set 20%, test set 10% --> could also val, test = 15 %
train_files = sampled_files[:int(nf*0.7)]
val_files = sampled_files[int(nf*0.7):int(nf*0.9)]
test_files = sampled_files[int(nf*0.9):]

In [5]:
# num_to_read is here the whole dataset, for testing something you can adjust
# this takes a couple of minutes
X_train, y_train = read_files(train_files, num_to_read=len(train_files))
X_val, y_val = read_files(val_files, num_to_read=len(val_files))
X_test, y_test = read_files(test_files, num_to_read=len(test_files))

In [6]:
# Verify shapes
print("X_train:", X_train.shape)  # (n_frames, 768) 768 features, currently the embeddings 
print("y_train:", y_train.shape)  # (n_frames, 58) 58 classes

X_train: (1076443, 768)
y_train: (1076443, 58)


In [7]:
# perform preprocessing on X
# we only fit on train data and then transform all splits accodringly
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)

In [33]:
# baseline from tutorial
class Baseline_classifier():

    def __init__(self):
        self.majority_class = None

    def fit(self, x_train, y_train):
        '''x_train is a numpy array of features with shape NxD, where N is the number of datapoints and D the feature dimension
        y_train is a list of binary labels in the shape Nx1
        '''
        self.majority_class =  1 if sum(y_train) > len(y_train) / 2 else 0

    def predict(self, x):
        '''x is a numpy array of features with shape NxD, where N is the number of datapoints and D the feature dimension
        The function should return the predicted binary labels as a numpy array of shape Nx1
        '''
        predictions = np.zeros(x.shape[0]) + self.majority_class
        return predictions

# can predict all labels at once
class MultiLabelBaseline():
    def __init__(self):
        self.majority_classes = None 
    
    def fit(self, X_train, y_train):
        '''y_train shape: (n_samples, 58)'''
        self.majority_classes = (np.mean(y_train, axis=0) > 0.5).astype(int)
    
    def predict(self, X):
        '''Returns predictions of shape (n_samples, 58)'''
        return np.tile(self.majority_classes, (X.shape[0], 1))

In [12]:
# I propose using F1-score since we have unbalanced class distributions
# Assumes y_true and y_pred are binary arrays of shape (n_samples, n_classes); should be the case
def get_f1_score(y_true, y_pred, weighted=False):
    if weighted:
        f1 = f1_score(y_true, y_pred, average='weighted') # Weighted F1: weights each class by its frequency
    else:
        f1= f1_score(y_true, y_pred, average='macro') # counts each class equally important
    return f1

In [30]:
# example of eval for a single class
wind_x_train, wind_y_train = X_train_scaled[:100000], y_train[:100000, categories.index("Wind")] # use only 100000 frames
wind_x_val, wind_y_val = X_val_scaled[:10000], y_val[:10000, categories.index("Wind")] # use only 10000 frames

baseline = Baseline_classifier()
baseline.fit(wind_x_train, wind_y_train)

y_train_pred_wind = baseline.predict(wind_x_train)
y_val_pred_wind = baseline.predict(wind_y_val)

f1_train = get_f1_score(wind_y_train, y_train_pred_wind)
f1_val = get_f1_score(wind_y_val, y_val_pred_wind)

acc_train = accuracy_score(wind_y_train, y_train_pred_wind)# accuracy here just for 
acc_val = accuracy_score(wind_y_val, y_val_pred_wind)

print(f"Train f1-score {f1_train}")
print(f"Val f1-score {f1_test}")
print(f"Accuracy train {acc_train}")
print(f"Accuracy val {acc_val}")

Train f1-score 0.483447061072054
Val f1-score 0.48218724109362054
Accuracy train 0.93591
Accuracy val 0.9577


In [25]:
print(np.unique(y_train_pred_wind)) # predicts only 0
print(np.unique(y_test_pred_wind)) # as a matter of fact the baseline will always predict 0 since there is no class that occurres in >50% of frames, so one could also set the baseline to just 0

[0.]
[0.]


In [37]:
# example of eval for multiple classes
x_train, y_train = X_train_scaled[:100000], y_train[:100000]
x_val, y_val = X_val_scaled[:10000], y_val[:10000]

multi_baseline = MultiLabelBaseline()
multi_baseline.fit(x_train, y_train)

y_val_pred = multi_baseline.predict(y_val)

f1_scores = []
for label in range(len(categories)):
    f1_scores.append(get_f1_score(y_val[:, label], y_val_pred[:, label]))

f1_score_mean = np.mean(f1_scores)

print(f"Mean f1-score: {f1_score_mean}")
print(np.unique(y_val_pred)) # all zeros

Mean f1-score: 0.6921392642364108
[0]
