In [20]:
import numpy as np
import scipy.io
from scipy import sparse
from sklearn.preprocessing import OneHotEncoder
from sklearn.decomposition import PCA
from sklearn.linear_model import Ridge
from sklearn.metrics import accuracy_score, f1_score
import matplotlib.pyplot as plt
import pandas as pd

# **Dataset Preparation**

In [21]:
import os
import pandas as pd

# Define the path to your dataset folder
dataset_path = "/content/drive/MyDrive/Research/esn/Audio dataset"

# Prepare an empty list to store metadata
metadata = []

# Walk through each subdirectory in the dataset folder
for root, dirs, files in os.walk(dataset_path):
    for file in files:
        if file.endswith(".wav"):  # Adjust the extension if necessary
            # Get the full file path
            file_path = os.path.join(root, file)

            # Extract class label (assumes subfolder name is the class label)
            class_label = os.path.basename(root)
            encoded_class_label = -99999
            if class_label == "rope":
                encoded_class_label = 0
            elif class_label == "chair":
                encoded_class_label = 1
            elif class_label == "clothes":
                encoded_class_label = 2
            elif class_label == "batteries":
                encoded_class_label = 3
            elif class_label == "paper":
                encoded_class_label = 4

            # Append metadata information
            metadata.append({
                "filename": file,
                "filepath": file_path,
                "class_label": class_label,
                "encoded_class_label": encoded_class_label
            })

# Convert metadata to a pandas DataFrame
metadata_df = pd.DataFrame(metadata)

# Save the metadata as a CSV file
metadata_csv_path = "audio_metadata_full.csv"
metadata_df.to_csv(metadata_csv_path, index=False)

print(f"Metadata CSV created at: {metadata_csv_path}")


Metadata CSV created at: audio_metadata_full.csv


In [22]:
import os
import librosa
import numpy as np
import pandas as pd
import random
import torch
from torch.utils.data import Dataset

class AudioPreprocessingPipeline(Dataset):
    def __init__(self, metadata_file, audio_dir, sample_rate=16000, fixed_length=2.0, n_mels=64, augmentation=False):
        """
        Initialize the audio preprocessing pipeline.

        Args:
            metadata_file (str): Path to the metadata CSV file.
            audio_dir (str): Path to the directory containing audio files.
            sample_rate (int): Target sample rate for audio.
            fixed_length (float): Target duration (in seconds) for each audio sample.
            n_mels (int): Number of Mel bands for the Mel spectrogram.
            augmentation (bool): Whether to apply audio augmentation.
        """
        self.metadata = pd.read_csv(metadata_file)
        #drop class_label column
        self.metadata = self.metadata.drop(columns=['class_label'])
        self.audio_dir = audio_dir
        # print("Audio directory: ", audio_dir)
        self.sample_rate = sample_rate
        self.fixed_length = fixed_length
        self.n_mels = n_mels
        self.augmentation = augmentation
        self.fixed_length_samples = int(fixed_length * sample_rate)

    def __len__(self):
        return len(self.metadata)

    def __getitem__(self, idx):
        # Get metadata for the current sample
        row = self.metadata.iloc[idx]
        filename = row['filename']
        # print("Filename: ", filename)
        class_label = row['encoded_class_label']
        # print("Class label: ", class_label)

        # Load audio file
        file_path = row['filepath']
        # print("File path: ", file_path)
        audio, sr = librosa.load(file_path, sr=self.sample_rate, mono=True)

        # Trim or pad audio to fixed length
        audio = self._resize_audio(audio)

        # Apply time-shifting augmentation (if enabled)
        if self.augmentation:
            audio = self._time_shift(audio)

        # Convert to Mel spectrogram
        mel_spectrogram = self._compute_mel_spectrogram(audio)

        # Normalize Mel spectrogram
        mel_spectrogram = self._normalize(mel_spectrogram)

        # Return the spectrogram and label
        return torch.tensor(mel_spectrogram, dtype=torch.float32), torch.tensor(class_label, dtype=torch.long)

    def _resize_audio(self, audio):
        """Trim or pad audio to the fixed length."""
        if len(audio) > self.fixed_length_samples:
            audio = audio[:self.fixed_length_samples]
        else:
            padding = self.fixed_length_samples - len(audio)
            audio = np.pad(audio, (0, padding), mode='constant')
        return audio

    def _time_shift(self, audio):
        """Apply time-shifting augmentation."""
        shift = random.randint(-int(0.1 * self.sample_rate), int(0.1 * self.sample_rate))
        audio = np.roll(audio, shift)
        return audio

    def _compute_mel_spectrogram(self, audio):
        """Convert audio to Mel spectrogram."""
        mel_spectrogram = librosa.feature.melspectrogram(
            y=audio, sr=self.sample_rate, n_mels=self.n_mels, fmax=self.sample_rate // 2
        )
        return librosa.power_to_db(mel_spectrogram, ref=np.max)

    def _normalize(self, spectrogram):
        """Normalize spectrogram to range [0, 1]."""
        return (spectrogram - spectrogram.min()) / (spectrogram.max() - spectrogram.min())



In [23]:
# Example usage
if __name__ == "__main__":
    metadata_file = "/content/audio_metadata_full.csv"
    audio_dir = dataset_path

    # Create dataset instance
    dataset = AudioPreprocessingPipeline(
        metadata_file=metadata_file,
        audio_dir=audio_dir,
        sample_rate=16000,
        fixed_length=2.0,
        n_mels=64,
        augmentation=False
    )

    # Example: Access the first sample
    mel_spectrogram, label = dataset[15]
    print("Mel Spectrogram Shape: ", mel_spectrogram.shape)
    print("Mel Spectrogram:\n", mel_spectrogram)
    print("Label:", label)
    # Visualize the Mel spectrogram
    # visualize_mel_spectrogram(mel_spectrogram)

Mel Spectrogram Shape:  torch.Size([64, 63])
Mel Spectrogram:
 tensor([[0.6677, 0.7462, 0.7762,  ..., 0.0000, 0.0000, 0.0000],
        [0.5645, 0.6428, 0.6820,  ..., 0.0000, 0.0000, 0.0000],
        [0.5661, 0.6331, 0.6327,  ..., 0.0000, 0.0000, 0.0000],
        ...,
        [0.2933, 0.3560, 0.3575,  ..., 0.0000, 0.0000, 0.0000],
        [0.3047, 0.3509, 0.3533,  ..., 0.0000, 0.0000, 0.0000],
        [0.2482, 0.3119, 0.3188,  ..., 0.0000, 0.0000, 0.0000]])
Label: tensor(1)


In [None]:
from torch.utils.data import DataLoader

# Create the dataset
metadata_file = "/content/audio_metadata_full.csv"  # Replace with your metadata file
audio_dir = dataset_path      # Replace with your audio directory
dataset = AudioPreprocessingPipeline(metadata_file, audio_dir, augmentation=True)

# Create the DataLoader
batch_size = 5000
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Example: Iterate over batches
for batch in data_loader:
    mel_spectrograms, labels = batch
    print("Batch Mel Spectrogram Shape:", mel_spectrograms.shape)
    # print("Batch Labels:", labels)
    break

# Create a train_data dictionary with mel_spectograms and labels
# train_data = {'mel_spectrogram': mel_spectrograms, 'label': labels}

In [None]:
mel_spectrograms.shape

# **ESN Implementation**

In [None]:
class Reservoir(object):
    """
    Build a reservoir and evaluate internal states

    Parameters:
        n_internal_units = processing units in the reservoir
        spectral_radius = largest eigenvalue of the reservoir matrix of connection weights
        leak = amount of leakage in the reservoir state update (optional)
        connectivity = percentage of nonzero connection weights (unused in circle reservoir)
        input_scaling = scaling of the input connection weights
        noise_level = deviation of the Gaussian noise injected in the state update
        circle = generate determinisitc reservoir with circle topology
    """

    def __init__(self, n_internal_units=100, spectral_radius=0.99, leak=None,
                 connectivity=0.3, input_scaling=0.2, noise_level=0.01, circle=False):

        # Initialize attributes
        self._n_internal_units = n_internal_units
        self._input_scaling = input_scaling
        self._noise_level = noise_level
        self._leak = leak

        # Input weights depend on input size: they are set when data is provided
        self._input_weights = None

        # Generate internal weights
        if circle:
            self._internal_weights = self._initialize_internal_weights_Circ(
                    n_internal_units,
                    spectral_radius)
        else:
            self._internal_weights = self._initialize_internal_weights(
                n_internal_units,
                connectivity,
                spectral_radius)


    def _initialize_internal_weights_Circ(self, n_internal_units, spectral_radius):

        internal_weights = np.zeros((n_internal_units, n_internal_units))
        internal_weights[0,-1] = spectral_radius
        for i in range(n_internal_units-1):
            internal_weights[i+1,i] = spectral_radius

        return internal_weights


    def _initialize_internal_weights(self, n_internal_units,
                                     connectivity, spectral_radius):

        # Generate sparse, uniformly distributed weights.
        internal_weights = sparse.rand(n_internal_units,
                                       n_internal_units,
                                       density=connectivity).todense()

        # Ensure that the nonzero values are uniformly distributed in [-0.5, 0.5]
        internal_weights[np.where(internal_weights > 0)] -= 0.5

        # Adjust the spectral radius.
        E, _ = np.linalg.eig(internal_weights)
        e_max = np.max(np.abs(E))
        internal_weights /= np.abs(e_max)/spectral_radius
        print(internal_weights.shape)
        return internal_weights


    def _compute_state_matrix(self, X, n_drop=0):
        N, T, _ = X.shape
        previous_state = np.zeros((N, self._n_internal_units), dtype=float)

        # Storage
        state_matrix = np.empty((N, T - n_drop, self._n_internal_units), dtype=float)
        for t in range(T):
            current_input = X[:, t, :]

            # Calculate state
            state_before_tanh = self._internal_weights.dot(previous_state.T) + self._input_weights.dot(current_input.T)

            # Add noise
            state_before_tanh += np.random.rand(self._n_internal_units, N)*self._noise_level

            # Apply nonlinearity and leakage (optional)
            if self._leak is None:
                previous_state = np.tanh(state_before_tanh).T
            else:
                previous_state = (1.0 - self._leak)*previous_state + np.tanh(state_before_tanh).T

            # Store everything after the dropout period
            if (t > n_drop - 1):
                state_matrix[:, t - n_drop, :] = previous_state

        return state_matrix


    def get_states(self, X, n_drop=0, bidir=True):
        N, T, V = X.shape
        if self._input_weights is None:
            self._input_weights = (2.0*np.random.binomial(1, 0.5 , [self._n_internal_units, V]) - 1.0)*self._input_scaling

        # compute sequence of reservoir states
        states = self._compute_state_matrix(X, n_drop)

        # reservoir states on time reversed input
        if bidir is True:
            X_r = X[:, ::-1, :]
            states_r = self._compute_state_matrix(X_r, n_drop)
            states = np.concatenate((states, states_r), axis=2)

        return states

    def getReservoirEmbedding(self, X,pca, ridge_embedding,  n_drop=5, bidir=True, test = False):

        res_states = self.get_states(X, n_drop=5, bidir=True)


        N_samples = res_states.shape[0]
        res_states = res_states.reshape(-1, res_states.shape[2])
        # ..transform..
        if test:
            red_states = pca.transform(res_states)
        else:
            red_states = pca.fit_transform(res_states)
        # ..and put back in tensor form
        red_states = red_states.reshape(N_samples,-1,red_states.shape[1])

        coeff_tr = []
        biases_tr = []

        for i in range(X.shape[0]):
            ridge_embedding.fit(red_states[i, 0:-1, :], red_states[i, 1:, :])
            coeff_tr.append(ridge_embedding.coef_.ravel())
            biases_tr.append(ridge_embedding.intercept_.ravel())
        print(np.array(coeff_tr).shape,np.array(biases_tr).shape)
        input_repr = np.concatenate((np.vstack(coeff_tr), np.vstack(biases_tr)), axis=1)
        return input_repr

In [None]:
from sklearn.model_selection import train_test_split

X = mel_spectrograms
y = labels

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
# data = scipy.io.loadmat('/content/JpVow.mat')
# print(data['X'].shape)
# print(data['Y'].shape)
# print(data['Xte'].shape)
# print(data['Yte'].shape)

# print(" ================================")
# print(X_train.numpy().shape)
# print(y_train.numpy().shape)
# print(X_test.numpy().shape)
# print(y_test.numpy().shape)

In [None]:
# y_train.numpy().reshape(-1,1).shape

In [None]:

# X = data['X']
# Y = data['Y']
# Xte = data['Xte']
# Yte = data['Yte']

# # One-hot encoding for labels
# onehot_encoder = OneHotEncoder(sparse_output=False, categories='auto')
# Y = onehot_encoder.fit_transform(Y)
# Yte = onehot_encoder.transform(Yte)

X = X_train.numpy()
Y = y_train.numpy().reshape(-1,1)
Xte = X_test.numpy()
Yte = y_test.numpy().reshape(-1,1)

# One-hot encoding for labels
onehot_encoder = OneHotEncoder(sparse_output=False, categories='auto')
Y = onehot_encoder.fit_transform(Y)
Yte = onehot_encoder.transform(Yte)

In [None]:
pca = PCA(n_components=30)
ridge_embedding = Ridge(alpha=6, fit_intercept=True)
readout = Ridge(alpha=2)

In [None]:
res = Reservoir(n_internal_units=30, spectral_radius=0.6, leak=0.6,
                 connectivity=0.25, input_scaling=0.1, noise_level=0.01, circle=False)

In [None]:
input_repr = res.getReservoirEmbedding(X,pca, ridge_embedding,  n_drop=5, bidir=True, test = False)
input_repr_te = res.getReservoirEmbedding(Xte,pca, ridge_embedding,  n_drop=5, bidir=True, test = True)

In [None]:
import tensorflow as tf
# from sklearn.metrics import accuracy_score, f1_score
readout.fit(input_repr, Y)
logits = readout.predict(input_repr_te)
# prediction = tf.nn.softmax(logits)
# logits = logits.reshape(-1, 1)
pred_class = np.argmax(logits, axis=1)

In [None]:
logits.shape

In [None]:
# true_class = Yte
true_class = np.argmax(Yte, axis=1)

In [None]:
true_class.shape

In [None]:
pred_class.shape

In [None]:
accuracy_score(true_class, pred_class)

In [None]:
f1_score(true_class, pred_class, average='weighted')

In [None]:
# Result Analysis Section
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay

# Calculate accuracy and F1 score
accuracy = accuracy_score(true_class, pred_class)
f1 = f1_score(true_class, pred_class, average='weighted')

print(f"Accuracy: {accuracy:.2f}")
print(f"Weighted F1 Score: {f1:.2f}")

# Generate a classification report
num_classes = len(set(true_class))  # Dynamically determine the number of classes
report = classification_report(true_class, pred_class, target_names=[f"Class {i}" for i in range(num_classes)])
print("\nClassification Report:\n")
print(report)

# Confusion Matrix
cm = confusion_matrix(true_class, pred_class, labels=range(num_classes))

# Plot confusion matrix
plt.figure(figsize=(10, 7))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=[f"Class {i}" for i in range(num_classes)], yticklabels=[f"Class {i}" for i in range(num_classes)])
plt.title("Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.show()

# Visualize Class-wise Metrics
class_wise_metrics = classification_report(true_class, pred_class, output_dict=True)
class_names = list(class_wise_metrics.keys())[:-3]
precision = [class_wise_metrics[cls]['precision'] for cls in class_names]
recall = [class_wise_metrics[cls]['recall'] for cls in class_names]
f1_scores = [class_wise_metrics[cls]['f1-score'] for cls in class_names]

plt.figure(figsize=(12, 6))
bar_width = 0.25
x = range(len(class_names))

plt.bar(x, precision, width=bar_width, label='Precision')
plt.bar([i + bar_width for i in x], recall, width=bar_width, label='Recall')
plt.bar([i + 2 * bar_width for i in x], f1_scores, width=bar_width, label='F1-Score')

plt.xticks([i + bar_width for i in x], class_names)
plt.title("Class-wise Metrics")
plt.xlabel("Classes")
plt.ylabel("Scores")
plt.legend()
plt.show()

# If training metrics are available, plot them
# For example:
# plt.figure(figsize=(10, 5))
# plt.plot(train_accuracy, label='Training Accuracy')
# plt.plot(test_accuracy, label='Testing Accuracy')
# plt.title("Training vs. Testing Accuracy")
# plt.xlabel("Epochs")
# plt.ylabel("Accuracy")
# plt.legend()
# plt.show()
