In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Recursively walk the Kaggle input directory and print the full path of every file.
for root, _subdirs, files in os.walk('/kaggle/input'):
    for name in files:
        print(os.path.join(root, name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# Read the training labels CSV into the working frame and preview its first rows.
df = pd.read_csv(
    "/kaggle/input/audio-files/Dataset/train_labels.csv"
)
df.head()

## Feature Extraction

In [None]:
import librosa
# Extracting information from audios using librosa
# Each clip is loaded at its native sample rate (sr=None) and without down-mixing
# (mono=False); the waveform and its sample rate are stored as new columns of df.
# NOTE(review): row i is paired with "<i+1>.wav" purely by position - confirm this
# matches the file names listed in the labels table.
audio_dir = "/kaggle/input/audio-files/Dataset/train_folder/"

time_series = []
sample_rate = []
# One file per row of df (instead of a hard-coded count), so the collected lists
# always have exactly the length required by the column assignments below.
for idx in range(len(df)):
    audio_path = audio_dir + str(idx + 1) + ".wav"
    y, sr = librosa.load(audio_path, sr=None, mono=False)
    time_series.append(y)
    sample_rate.append(sr)

df["time_series"] = time_series
df["sample_rate"] = sample_rate
df.head()

In [None]:
# Extraction of MFCC features
# For every clip this stores the MFCC matrix, its first- and second-order deltas,
# and the mean and variance of each of those three arrays reduced along axis=1.
# NOTE(review): axis=1 is the frame axis only if each waveform is 1-D (mono); the
# clips are loaded with mono=False, so confirm none of them is multi-channel.
n_mfcc = 13

MFCC = []
DELTA_MFCC = []
DELTA2_MFCC = []
MFCC_MEAN = []
MFCC_VAR = []
DELTA_MFCC_MEAN = []
DELTA_MFCC_VAR = []
DELTA2_MFCC_MEAN = []
DELTA2_MFCC_VAR = []

# Iterate the two columns directly: this covers every row of df (no hard-coded
# row count) and avoids materialising a full row Series twice per clip.
for y, sr in zip(df['time_series'], df['sample_rate']):
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)
    delta_mfcc = librosa.feature.delta(mfcc)
    delta2_mfcc = librosa.feature.delta(mfcc, order=2)

    # Full per-frame arrays
    MFCC.append(mfcc)
    DELTA_MFCC.append(delta_mfcc)
    DELTA2_MFCC.append(delta2_mfcc)

    # Summary statistics of each array along axis=1
    MFCC_MEAN.append(np.mean(mfcc, axis=1))
    MFCC_VAR.append(np.var(mfcc, axis=1))
    DELTA_MFCC_MEAN.append(np.mean(delta_mfcc, axis=1))
    DELTA_MFCC_VAR.append(np.var(delta_mfcc, axis=1))
    DELTA2_MFCC_MEAN.append(np.mean(delta2_mfcc, axis=1))
    DELTA2_MFCC_VAR.append(np.var(delta2_mfcc, axis=1))

# Column order is kept as-is because later cells derive the feature list from df.columns.
df["mfcc"] = MFCC
df["delta_mfcc"] = DELTA_MFCC
df["delta2_mfcc"] = DELTA2_MFCC
df["mfcc_mean"] = MFCC_MEAN
df["mfcc_var"] = MFCC_VAR
df["delta_mfcc_mean"] = DELTA_MFCC_MEAN
df["delta_mfcc_var"] = DELTA_MFCC_VAR
df["delta2_mfcc_mean"] = DELTA2_MFCC_MEAN
df["delta2_mfcc_var"] = DELTA2_MFCC_VAR
df.head()


In [None]:
# Extraction of 2 time domain features
# RMS energy and zero-crossing rate are computed per clip with librosa's default
# framing and stored as new columns of df.

rms_energy = []
ZCR = []
# Neither call takes the sample rate, so only the waveform column is iterated;
# looping over the column (not a fixed count) keeps the lists as long as df.
for y in df['time_series']:
    rms_energy.append(librosa.feature.rms(y=y))
    ZCR.append(librosa.feature.zero_crossing_rate(y))

df["rms_energy"] = rms_energy
df["zcr"] = ZCR
df.head()

In [None]:
# Extraction of 2 spectral features
# Spectral centroid and spectral bandwidth are computed per clip and stored as
# new columns of df.

spectral_centroid = []
spectral_bandwidth = []
# Iterate the stored waveform / sample-rate columns together so every row of df
# is processed, whatever the dataset size.
for y, sr in zip(df['time_series'], df['sample_rate']):
    spectral_centroid.append(librosa.feature.spectral_centroid(y=y, sr=sr))
    spectral_bandwidth.append(librosa.feature.spectral_bandwidth(y=y, sr=sr))

df["spectral_centroid"] = spectral_centroid
df["spectral_bandwidth"] = spectral_bandwidth
df.head()

In [None]:
# Extraction of 2 rhythmic features
# "onset_rate" holds the output of librosa.onset.onset_strength and
# "pulse_clarity" holds the output of librosa.beat.plp for each clip.

onset_rate = []
pulse_clarity = []
# Iterate the stored waveform / sample-rate columns together so every row of df
# is processed, whatever the dataset size.
for y, sr in zip(df['time_series'], df['sample_rate']):
    onset_rate.append(librosa.onset.onset_strength(y=y, sr=sr))
    pulse_clarity.append(librosa.beat.plp(y=y, sr=sr))

# pulse_clarity is added before onset_rate; the order is kept because later
# cells build the feature list from df.columns.
df["pulse_clarity"] = pulse_clarity
df["onset_rate"] = onset_rate
df.head()

In [None]:
# Promote the 'filename' column to the row index, then preview the frame.
# Note: re-running this cell fails because 'filename' is no longer a column.
df = df.set_index(keys='filename')
df.head()

In [None]:
# Start from every column name, then strip the label and the raw-audio columns;
# what remains is the list of extracted feature columns.
features = df.columns.tolist()
for excluded in ('category', 'time_series', 'sample_rate'):
    features.remove(excluded)
features

In [None]:
# Normalisation of each of the features by computing mean and variance
# Each feature column is stacked into one array (rows of df along axis 0), then
# standardised element-wise with the mean and standard deviation taken over axis 0.

import numpy as np

for feat in features:
    stacked = np.stack(df[feat])
    centred = stacked - np.mean(stacked, axis=0)
    scaled = centred / np.std(stacked, axis=0)

    # Positions with zero spread divide 0 by 0 and become NaN; map those to 0.
    df[feat + "_normalised"] = np.nan_to_num(scaled, nan=0).tolist()


df.head()


## Exploratory Data Analysis

In [None]:
# Make plots for other features that have been computed (Waveform, ZCR and spectrogram)

import librosa.display
import matplotlib.pyplot as plt

# Ask for a positional row number and pull that clip's waveform, ZCR and sample rate.
# NOTE: input() blocks a non-interactive "Run All"; the value is used with df.iloc.
i= int(input("Which datapoint's plots are required? "))
y= df.iloc[i]['time_series']
zcr= df.iloc[i]['zcr']
sr= df.iloc[i]['sample_rate']

# Time axis in seconds for the raw samples: len(y) points spanning 0 .. len(y)/sr.
time = np.linspace(0, len(y) / sr, num=len(y))

# Time axis for the ZCR frames; zcr[0] is the first row of the stored ZCR array.
# NOTE(review): hop_length=512 must match the hop used when zcr was computed
# (presumably librosa's default) - confirm.
frames = range(len(zcr[0]))
time_zcr = librosa.frames_to_time(frames, sr=sr, hop_length=512)

# First figure: waveform (top panel) and zero-crossing rate (bottom panel).
plt.figure(figsize=(12, 6))

# Plot waveform
plt.subplot(2, 1, 1)
plt.plot(time, y, label="Waveform", color="blue", alpha=0.7)
plt.xlabel("Time (s)")
plt.ylabel("Amplitude")
plt.title("Waveform of the Audio Signal")
plt.legend()
plt.tight_layout()

# Plot Zero Crossing Rate
plt.subplot(2, 1, 2)
plt.plot(time_zcr, zcr[0], label="Zero Crossing Rate", color="red")
plt.xlabel("Time (s)")
plt.ylabel("ZCR")
plt.title("Zero Crossing Rate Over Time")
plt.legend()
plt.tight_layout()


# STFT magnitude converted to dB, with the maximum magnitude as the reference.
D = librosa.amplitude_to_db(abs(librosa.stft(y)), ref=np.max)

# Plot the spectrogram
# Second figure: time on x, log-scaled frequency on y, colour bar labelled in dB.
plt.figure(figsize=(10, 3))
librosa.display.specshow(D, sr=sr, x_axis='time', y_axis='log')  
plt.colorbar(format='%+2.0f dB')
plt.title('Spectrogram')
plt.tight_layout()
plt.show()


In [None]:
# Separating multidimensional features into individual columns, followed by normalisation
import numpy as np
import pandas as pd

# Normalised feature columns, grouped by how their per-row arrays are unpacked below:
# the first group is expanded as vectors, the second as matrices.
features_1d = ['mfcc_mean_normalised', 'mfcc_var_normalised',
       'delta_mfcc_mean_normalised', 'delta_mfcc_var_normalised',
       'delta2_mfcc_mean_normalised', 'delta2_mfcc_var_normalised',
        'pulse_clarity_normalised', 'onset_rate_normalised']
features_2d = ['mfcc_normalised', 'delta_mfcc_normalised',
       'delta2_mfcc_normalised', 'zcr_normalised',
        'rms_energy_normalised','spectral_centroid_normalised', 'spectral_bandwidth_normalised']

df_exp = {}

# Vector features: one output column "<feature>_<i>" per vector element.
for feat in features_1d:
    stacked = np.vstack(df[feat].values)
    df_exp.update({f'{feat}_{i}': stacked[:, i] for i in range(stacked.shape[1])})

# Matrix features: flatten each (rows, cols) matrix in row-major order and emit
# one output column "<feature>_<row>_<col>" per element.
for feat in features_2d:
    cube = np.array(df[feat].tolist())
    n_rows, n_cols = cube.shape[1], cube.shape[2]
    flat = cube.reshape(cube.shape[0], -1)

    for pos in range(n_rows * n_cols):
        row, col = divmod(pos, n_cols)
        df_exp[f'{feat}_{row}_{col}'] = flat[:, pos]


# Turn the name -> column mapping into the expanded feature table.
df_exp = pd.DataFrame(df_exp)

df_exp.head()

In [None]:
# Generating numerical equivalents for the class labels

from sklearn.preprocessing import OrdinalEncoder
encoder = OrdinalEncoder()

# The encoder is given a one-column frame (double brackets); its output
# overwrites the 'category' column in place.
encoded_category = encoder.fit_transform(df[['category']])
df['category'] = encoded_category
df.head()

In [None]:
# Calculating correlations for individual feature columns (normalised) against class labels

from tqdm import tqdm
from scipy.stats import pearsonr

label_cols = df_exp.columns
# One Pearson r per expanded feature column against the encoded label, in the
# same order as label_cols; the p-value (second element) is discarded.
correlations = [
    pearsonr(df_exp[label_col], df['category'])[0]
    for label_col in tqdm(label_cols, desc="Plotting for Label Columns")
]

In [None]:
# Plotting individual columns from the same feature on the same graph
# correlations is positionally aligned with label_cols, so the values for one
# feature are the entries whose column name begins with that feature's name.

for feat in features_1d:
    y_vals = [
        correlations[pos]
        for pos, col in enumerate(label_cols)
        if col.startswith(feat)
    ]
    x_vals = list(range(1, len(y_vals) + 1))

    plt.figure(figsize=(8, 5))
    plt.barh(x_vals, y_vals, color='skyblue')
    plt.xlabel('Correlation')
    plt.title('Correlation with ' + feat)
    plt.axvline(0, color='gray', linestyle='--')  # zero-correlation reference line
    plt.tight_layout()
    plt.show()

In [None]:
# Same bar chart as for the vector features, one figure per matrix feature:
# every expanded column whose name starts with the feature name contributes one bar.
for feat in features_2d:
    y_vals = [
        corr_value
        for col, corr_value in zip(label_cols, correlations)
        if col.startswith(feat)
    ]
    x_vals = list(range(1, len(y_vals) + 1))

    plt.figure(figsize=(8, 5))
    plt.barh(x_vals, y_vals, color='skyblue')
    plt.xlabel('Correlation')
    plt.title('Correlation with '+ feat)
    plt.axvline(0, color='gray', linestyle='--')  # zero-correlation reference line
    plt.tight_layout()
    plt.show()

In [None]:
# Append the encoded labels to the expanded feature table. They are copied as a
# plain array (by position) because df and df_exp do not share an index.
df_exp = df_exp.assign(category=np.array(df['category']))
df_exp.head()

In [None]:
# Persist the expanded feature table (without the row index) for the PCA stage.
df_exp.to_csv(
    'pre_pca_features.csv',
    index=False,
)

In [None]:
# Plotting feature distributions vs encoded class labels

# Show the available column names, then ask which one to plot against the label.
# NOTE: input() blocks a non-interactive "Run All".
print(df_exp.columns)
feature = input("What feature distribution do you want to plot?")
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_theme(style="whitegrid")

plt.figure(figsize=(10, 6))

# Scatter of the chosen feature (x) against the encoded category (y); seaborn
# returns the axes it drew on, which is then labelled directly.
ax = sns.scatterplot(data=df_exp, x=feature, y='category', color='royalblue')

ax.set_title(feature, fontsize=16, fontweight='bold')
ax.set_xlabel(feature + " values", fontsize=12)
ax.set_ylabel("Category", fontsize=12)

plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
ax.grid(True, linestyle='--', alpha=0.6)

sns.despine()

plt.tight_layout()
plt.show()

In [None]:
# Reload the expanded feature table from the CSV under the Kaggle input directory.
df_exp_n = pd.read_csv(
    "/kaggle/input/pre-pca-features/pre_pca_features.csv"
)

## Dimensionality Reduction & Train-Test-Val Split

In [None]:
# Remove the 'Unnamed: 0' column that appears when a CSV was saved with its row index.
# errors='ignore' makes this a no-op when the column is absent (for example when the
# CSV was written with index=False) and keeps the cell safe to re-run.
df_exp_n = df_exp_n.drop(columns=['Unnamed: 0'], errors='ignore')

In [None]:
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import pandas as pd

# Perform PCA, while retaining 95% of the total variance
# The label column is excluded so that only feature columns are projected.
pca = PCA(n_components=0.95)
feature_matrix = df_exp_n.drop(columns=['category'])
components = pca.fit_transform(feature_matrix)

# Name the projected columns PC1, PC2, ... in component order.
pc_names = [f'PC{rank}' for rank in range(1, components.shape[1] + 1)]
df_pca = pd.DataFrame(components, columns=pc_names)

explained_variance = pca.explained_variance_ratio_

df_pca.head()


In [None]:
# Keep only the leading principal components (at most the first 50 columns).
leading_columns = df_pca.columns[:50]
df_pca_50 = df_pca[leading_columns]
df_pca_50.head()


In [None]:
# Plot of explained variance vs the no. of components taken

import matplotlib.pyplot as plt
import numpy as np

# Cumulative share of variance explained by the first k components.
cumulative_variance = np.cumsum(pca.explained_variance_ratio_)
# Counts start at 1 so the point at x = k shows the variance retained by the
# first k components (plotting against the default 0-based positions would
# shift the curve one component to the left of what the axis label says).
component_counts = range(1, len(cumulative_variance) + 1)

plt.figure(figsize=(8,5))
plt.plot(component_counts, cumulative_variance, marker='o', linestyle='--')
plt.xlabel('Number of Principal Components')
plt.ylabel('Cumulative Explained Variance')
plt.title('Explained Variance vs. Number of Components')
plt.grid()
plt.show()


In [None]:
# Generating train, test and validation splits

from sklearn.model_selection import train_test_split

# Positional (unshuffled) split: the first 60% of rows train, and the remaining
# 40% is halved into validation and test. Features and labels are split with
# identical settings so that their rows stay aligned.
first_cut = dict(test_size=0.4, shuffle=False)
second_cut = dict(test_size=0.5, shuffle=False)

train_df, temp_df = train_test_split(df_pca_50, **first_cut)
train_label, temp_label = train_test_split(df['category'], **first_cut)

val_df, test_df = train_test_split(temp_df, **second_cut)
val_label, test_label = train_test_split(temp_label, **second_cut)
df.head()

## Self-implemented K-Means clustering

In [None]:
# Our implementation of K-Means clustering

import numpy as np
import pandas as pd

class DIY_KMeans:
    """Minimal K-Means clustering on pandas DataFrames.

    Centroids start as ``k`` distinct rows of the training data and are then
    refined by alternating nearest-centroid assignment and per-cluster
    averaging until the centroids move less than ``tol`` or ``max_iters``
    iterations have run.

    Parameters
    ----------
    k : int
        Number of clusters.
    max_iters : int, default 100
        Maximum number of assignment/update iterations.
    tol : float, default 1e-4
        Training stops once the Frobenius norm of the centroid shift between
        two iterations falls below this value.
    random_state : int or None, default None
        Seed for choosing the initial centroids. ``None`` draws from NumPy's
        global random state, which matches the behaviour without this argument.

    Attributes
    ----------
    centroids : pandas.DataFrame or None
        One row per cluster with the same columns as the training data;
        ``None`` until ``fit`` has been called.
    """

    def __init__(self, k, max_iters=100, tol=1e-4, random_state=None):
        self.k = k
        self.max_iters = max_iters
        self.tol = tol
        self.random_state = random_state
        self.centroids = None

    # Function to obtain cluster centres
    def fit(self, X: pd.DataFrame):
        """Learn ``k`` centroids from the rows of ``X``.

        A cluster that receives no points keeps its previous centroid, so an
        empty cluster no longer aborts training part-way through.

        Returns
        -------
        DIY_KMeans
            The fitted instance, to allow call chaining.

        Raises
        ------
        ValueError
            If ``k`` exceeds the number of rows in ``X``.
        """
        n_samples = X.shape[0]
        if self.k > n_samples:
            raise ValueError(
                f"k={self.k} cannot exceed the number of samples ({n_samples})"
            )

        # Use the global NumPy RNG unless a seed was requested explicitly.
        if self.random_state is None:
            rng = np.random
        else:
            rng = np.random.RandomState(self.random_state)
        random_indices = rng.choice(n_samples, self.k, replace=False)
        self.centroids = X.iloc[random_indices].copy().reset_index(drop=True)

        for _ in range(self.max_iters):
            labels = self._assign_clusters(X)

            # Mean of every non-empty cluster, indexed by cluster label.
            cluster_means = X.groupby(labels).mean()

            # Start from the previous centroids so that empty clusters stay put,
            # then overwrite the rows of the clusters that received points.
            old_values = self.centroids.to_numpy(dtype=float)
            new_values = old_values.copy()
            new_values[cluster_means.index.to_numpy()] = cluster_means.to_numpy(dtype=float)

            shift = np.linalg.norm(new_values - old_values)
            self.centroids = pd.DataFrame(new_values, columns=X.columns)

            if shift < self.tol:
                break

        return self

    # Function to assign clusters to all data points
    def _assign_clusters(self, X: pd.DataFrame):
        """Return, for each row of ``X``, the index of its nearest centroid.

        Distances are Euclidean; ties go to the lowest centroid index.
        The computation broadcasts to an (n_samples, k, n_features) array.

        Raises
        ------
        AttributeError
            If the model has not been fitted yet.
        """
        if self.centroids is None:
            raise AttributeError("DIY_KMeans is not fitted yet; call fit() first")
        distances = np.linalg.norm(
            X.values[:, np.newaxis] - self.centroids.values, axis=2
        )
        return np.argmin(distances, axis=1)

    # Function to generate predictions on the test set
    def predict(self, X: pd.DataFrame):
        """Assign each row of ``X`` to the nearest learned centroid."""
        return self._assign_clusters(X)


In [None]:
from sklearn.metrics import adjusted_rand_score

# Fit the hand-written K-Means on the training split only, then label all three
# splits with the learned centroids.
kmeans = DIY_KMeans(k=50)
kmeans.fit(train_df)

train_pred, val_pred, test_pred = (
    kmeans.predict(part) for part in (train_df, val_df, test_df)
)

# Adjusted Rand Index of the cluster assignments against the encoded labels.
ari_train_score = adjusted_rand_score(train_label, train_pred)
ari_val_score = adjusted_rand_score(val_label, val_pred)
ari_test_score = adjusted_rand_score(test_label, test_pred)

for split_name, split_score in (
    ("Train", ari_train_score),
    ("Validation", ari_val_score),
    ("Test", ari_test_score),
):
    print(f"{split_name} ARI Score: {split_score}")

## K-Means and DBSCAN using inbuilt libraries

In [None]:
# K-Means using inbuilt libraries
from sklearn.cluster import KMeans

# scikit-learn K-Means with the same number of clusters, fitted on the training
# split and then used to label every split.
kmeans2 = KMeans(n_clusters=50, random_state=42, n_init=10)
kmeans2.fit(train_df)

train_pred2, val_pred2, test_pred2 = (
    kmeans2.predict(part) for part in (train_df, val_df, test_df)
)

# Adjusted Rand Index of the cluster assignments against the encoded labels.
ari_train_score2 = adjusted_rand_score(train_label, train_pred2)
ari_val_score2 = adjusted_rand_score(val_label, val_pred2)
ari_test_score2 = adjusted_rand_score(test_label, test_pred2)

for split_name, split_score in (
    ("Train", ari_train_score2),
    ("Validation", ari_val_score2),
    ("Test", ari_test_score2),
):
    print(f"{split_name} ARI Score: {split_score}")

In [None]:
from sklearn.cluster import DBSCAN
from sklearn.metrics import adjusted_rand_score
from tqdm import tqdm

# Hyperparameter tuning for DBSCAN (choosing eps and min_samples)
# Exhaustive grid search: every (eps, min_samples) pair is clustered on the
# validation split and scored by ARI against the validation labels.

eps_values = np.arange(1, 100, 1)
min_samples_values = range(3, 250)

best_score = -1
best_params = {}

true_labels = val_label

for eps in tqdm(eps_values, desc="Tuning DBSCAN"):
    for min_samples in min_samples_values:
        labels = DBSCAN(eps=eps, min_samples=min_samples).fit_predict(val_df)

        # Skip degenerate results: a single label for everything, or as many
        # distinct labels as there are points.
        n_distinct = len(set(labels))
        if n_distinct <= 1 or n_distinct >= len(val_df):
            continue

        score = adjusted_rand_score(true_labels, labels)
        if score > best_score:
            best_score, best_params = score, {"eps": eps, "min_samples": min_samples}

print(f"Best Params: {best_params}, Best ARI Score: {best_score:.4f}")

In [None]:
# DBSCAN using inbuilt libraries
# Best Params: {'eps': 49, 'min_samples': 3}, Best ARI Score: 0.0291
from sklearn.cluster import DBSCAN
from sklearn.metrics import adjusted_rand_score

# Each split is clustered independently with fit_predict, using the parameters
# hard-coded from the tuning run recorded in the comment above.
dbscan = DBSCAN(eps=49, min_samples=3)
train_pred3, val_pred3, test_pred3 = (
    dbscan.fit_predict(part) for part in (train_df, val_df, test_df)
)

# Adjusted Rand Index of the cluster assignments against the encoded labels.
ari_train_score3 = adjusted_rand_score(train_label, train_pred3)
ari_val_score3 = adjusted_rand_score(val_label, val_pred3)
ari_test_score3 = adjusted_rand_score(test_label, test_pred3)

for split_name, split_score in (
    ("Train", ari_train_score3),
    ("Validation", ari_val_score3),
    ("Test", ari_test_score3),
):
    print(f"{split_name} ARI Score: {split_score}")