In [None]:
##import Libraries
import pickle
import librosa
import os
import numpy as np
import pickle
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, BatchNormalization, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import accuracy_score

###Feature extraction for entire folder

In [None]:
def generate_audio_features(audio_filename, output_dir):
    # Load audio file
    y, sr = librosa.load(audio_filename, sr=None)
    duration = librosa.get_duration(y=y, sr=sr)
    segment_length = 30  # Segment length in seconds

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Dictionary to store results
    audio_features = {}

    # Divide audio into 30-second segments
    num_segments = int(np.ceil(duration / segment_length))

    for i in range(num_segments):
        start_sample = int(i * segment_length * sr)
        end_sample = int(min((i + 1) * segment_length * sr, len(y)))
        segment = y[start_sample:end_sample]

        # Generate spectrogram
        S = librosa.stft(segment, hop_length = sr*0.01)
        S_db = librosa.amplitude_to_db(np.abs(S), ref=np.max)

        # Extract MFCCs
        mfccs = librosa.feature.mfcc(y=segment, sr=sr, n_mfcc=20, hop_length = sr*0.01)

        # F_0 estimation
        time, frequency, confidence, activation = estimate_pitch(segment, sr, voicing_threshold=0.3, use_viterbi=True)

        # Split the file name and extension
        file_name, file_extension = os.path.splitext(audio_filename)

        # Create the segment name
        segment_name = f"{file_name}_segment_{i+1}{file_extension}"

        # Store the features in the dictionary
        audio_features[segment_name] = {
            "spectrogram": S_db,
            "mfcc": mfccs[1:], # remove first row of mfcc
            "f_0 estimation": frequency
        }

    output_file = os.path.join(output_dir, f"{file_name}_features.pkl")
    with open(output_file, 'wb') as f:
        pickle.dump(audio_features, f)


    return audio_features

In [None]:
def process_audio_folder(folder_path, output_dir):
    """
    Processes all .wav files in the given folder and generates audio features.

    Parameters:
    -----------
    folder_path : str
        Path to the folder containing audio files.
    output_dir : str
        Directory where processed features will be saved.
    """
    # List all .wav files in the folder
    audio_files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith('.wav')]

    # Ensure output directory exists
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Process each file
    for audio_file in audio_files:
        print(f"Processing: {audio_file}")
        generate_audio_features(audio_file, output_dir)
        print(f"Finished processing: {audio_file}")



In [None]:
def create_combined_feature_input(spectrogram, mfcc, f0):
    # Add channel dimension and stack features

    spectrogram = spectrogram[np.newaxis, :, :]  # Shape: (1, frequency_bins, time_frames)
    mfcc = mfcc[np.newaxis, :, :]  # Shape: (1, n_mfcc, time_frames)
    f0 = f0[np.newaxis, np.newaxis, :]  # Shape: (1, 1, time_frames)

    # Combine spectrogram, MFCC, and f0 into a single input
    combined_feature = np.concatenate([spectrogram, mfcc, f0], axis=1)
    return combined_feature

In [None]:
with open('/content/drive/MyDrive/Songsay_output_features/1047_v360P_V1_source_3_features.pkl', 'rb') as f:
    data = pickle.load(f)

test_item = data['1047_v360P_V1_source_3_segment_1.wav']
test_spec = test_item['spectrogram']
test_mfcc = test_item['mfcc']
test_f0 = test_item['f_0 estimation']
combined_input = create_combined_feature_input(test_spec, test_mfcc, test_f0)

In [None]:
test_spec.shape #(1025, 3001)
test_mfcc.shape #(19, 3001)

(19, 3001)

In [None]:
len(list(test_f0))

3001

In [None]:
combined_input.shape

(1, 1045, 3001)

In [None]:
def generate_combined_features_from_pkl(pkl_file_path, output_dir):
    # Load the .pkl file containing audio features
    with open(pkl_file_path, 'rb') as f:
        audio_features = pickle.load(f)

    # Create output directory if it doesn't exist
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Dictionary to store combined features
    combined_features = {}

    # Iterate through each segment in the pkl file and create combined feature input
    for segment_name, features in audio_features.items():
        spectrogram = features['spectrogram']
        mfcc = features['mfcc']
        f0 = features['f_0 estimation']

        # Create combined feature
        combined_feature = create_combined_feature_input(spectrogram, mfcc, f0)

        # Store the combined feature in the dictionary
        combined_features[segment_name] = combined_feature

    # Save the combined features to a new .pkl file
    output_file = os.path.join(output_dir, os.path.basename(pkl_file_path).replace('.pkl', '_combined.pkl'))
    with open(output_file, 'wb') as f:
        pickle.dump(combined_features, f)

    return combined_features

def process_all_pkl_files(input_folder, output_folder):
    # Iterate through all .pkl files in the input folder
    for file_name in os.listdir(input_folder):
        if file_name.endswith('.pkl'):
            pkl_file_path = os.path.join(input_folder, file_name)
            # Generate combined features for each .pkl file
            generate_combined_features_from_pkl(pkl_file_path, output_folder)

process_all_pkl_files("/content/drive/MyDrive/Songsay_output_features", "/content/drive/MyDrive/concat_features_nobeats" )

In [None]:
##concatenate features along time axis
def concatenate_segments_for_audio(input_folder, output_folder):
    # Iterate through all _combined.pkl files in the input folder
    for file_name in os.listdir(input_folder):
        if file_name.endswith('_combined.pkl'):
            pkl_file_path = os.path.join(input_folder, file_name)

            # Load combined features from pkl file
            with open(pkl_file_path, 'rb') as f:
                combined_features = pickle.load(f)

            # Concatenate all segments along the time axis
            concatenated_feature = np.concatenate([segment for segment in combined_features.values()], axis=2)  # Concatenate along the time axis

            # Save the concatenated feature to a new .pkl file
            output_file = os.path.join(output_folder, file_name.replace('_combined.pkl', '_concatenated.pkl'))
            with open(output_file, 'wb') as f:
                pickle.dump(concatenated_feature, f)

concatenate_segments_for_audio("/content/drive/MyDrive/concat_features_nobeats", "/content/drive/MyDrive/concat_features_nobeats_unsegmented")

In [None]:
with open('/content/drive/MyDrive/concat_features_nobeats_unsegmented/10932_v360P_V1_source_3_features_concatenated.pkl', 'rb') as f:
    data = pickle.load(f)

In [None]:
data

array([[[-8.00000000e+01, -8.00000000e+01, -8.00000000e+01, ...,
         -7.63258743e+01, -7.12796326e+01, -6.90777817e+01],
        [-7.92051697e+01, -8.00000000e+01, -8.00000000e+01, ...,
         -6.91974716e+01, -7.26264038e+01, -7.13789749e+01],
        [-7.70871811e+01, -7.70083771e+01, -8.00000000e+01, ...,
         -8.00000000e+01, -7.90651627e+01, -7.42696228e+01],
        ...,
        [-2.78383160e+00, -2.98514605e+00, -5.51406801e-01, ...,
         -1.24940510e+01, -1.12014341e+01, -7.21944618e+00],
        [-2.61022711e+00, -2.84083462e+00, -5.35616040e-01, ...,
          1.83486927e+00,  4.38358974e+00,  4.94402170e+00],
        [ 0.00000000e+00,  0.00000000e+00,  0.00000000e+00, ...,
          7.51149134e+02,  7.52148783e+02,  7.52513604e+02]]])

### Data Loader

In [9]:
pkl_folder = "/content/drive/MyDrive/concat_features_nobeats_unsegmented"
ground_truth_folder = "/content/drive/MyDrive/ground_truth/"

In [10]:
import tensorflow as tf
import numpy as np

class PKLDataLoader(tf.keras.utils.Sequence):
    def __init__(self, pkl_folder, ground_truth_folder, batch_size, max_feature_length, num_classes=8):
        self.pkl_folder = pkl_folder
        self.ground_truth_folder = ground_truth_folder
        self.batch_size = batch_size
        self.pkl_files = [f for f in os.listdir(pkl_folder) if f.endswith(".pkl")]
        self.max_feature_length = max_feature_length
        self.num_classes = num_classes  # Fixed number of output classes

    def __len__(self):
        # Number of batches per epoch
        return int(np.ceil(len(self.pkl_files) / self.batch_size))

    def __getitem__(self, idx):
        # Get batch file names
        batch_files = self.pkl_files[idx * self.batch_size:(idx + 1) * self.batch_size]
        X_batch = []
        Y_batch = []

        for pkl_file in batch_files:
            audio_id = os.path.splitext(pkl_file)[0]

            # Load feature file
            pkl_path = os.path.join(self.pkl_folder, pkl_file)
            with open(pkl_path, "rb") as f:
                features = pickle.load(f)
                features = np.array(features)
                # Pad features to global max length
                padding_length = self.max_feature_length - features.shape[2]
                if padding_length > 0:
                    features = np.pad(features, ((0, 0), (0, 0), (0, padding_length)), mode="constant")
                X_batch.append(np.squeeze(features))  # Remove unnecessary dimensions

            # Load and adjust target file
            txt_path = os.path.join(self.ground_truth_folder, f"{audio_id}.txt")
            data = pd.read_csv(
                txt_path,
                header=None,
                delimiter=",",
                usecols=[0],
                on_bad_lines="skip",
                engine="python"
            )
            targets = data.iloc[:, 0].values

            # Truncate or pad to num_classes
            targets = targets[:self.num_classes]  # Truncate
            if len(targets) < self.num_classes:
                targets = np.pad(targets, (0, self.num_classes - len(targets)), mode="constant")

            Y_batch.append(targets)

        # Stack and return the batch
        X_batch = np.stack(X_batch, axis=0)
        Y_batch = np.stack(Y_batch, axis=0)
        return X_batch, Y_batch


In [11]:
max_feature_length = 44107
max_target_length = 8


# Get the list of all .pkl files
pkl_files = [f for f in os.listdir(pkl_folder) if f.endswith(".pkl")]

# Get the list of all .txt files
txt_files = [f for f in os.listdir(ground_truth_folder) if f.endswith(".txt")]

# Extract audio IDs (filenames without extensions)
pkl_ids = set(os.path.splitext(f)[0] for f in pkl_files)
txt_ids = set(os.path.splitext(f)[0] for f in txt_files)

# Find the joint set of IDs
joint_ids = pkl_ids.intersection(txt_ids)

# Filter the .pkl files to retain only those with a matching .txt file
filtered_pkl_files = [f"{audio_id}.pkl" for audio_id in joint_ids]

# Filter the .txt files to retain only those with a matching .pkl file
filtered_txt_files = [f"{audio_id}.txt" for audio_id in joint_ids]

print(f"Number of matching .pkl and .txt files: {len(joint_ids)}")


from sklearn.model_selection import train_test_split

# # Split the filtered .pkl files into train and test sets
train_files, test_files = train_test_split(filtered_pkl_files, test_size=0.2, random_state=42)

# Create DataLoaders for training and testing
train_loader = PKLDataLoader(
    pkl_folder=pkl_folder,
    ground_truth_folder=ground_truth_folder,
    batch_size=2,
    max_feature_length=max_feature_length,
    num_classes = max_target_length,

)
train_loader.pkl_files = train_files  # Assign train files to the loader

test_loader = PKLDataLoader(
    pkl_folder=pkl_folder,
    ground_truth_folder=ground_truth_folder,
    batch_size=2,
    max_feature_length=max_feature_length,
    num_classes = max_target_length,

)
test_loader.pkl_files = test_files  # Assign test files to the loader



Number of matching .pkl and .txt files: 88


###Model Training

With num_classes = 162 classes, 5 epochs

In [None]:
# Get input shape dynamically from DataLoader
first_batch_X, _ = train_loader[0]  # Fetch the first batch
input_shape = (first_batch_X.shape[1], first_batch_X.shape[2])  # (sequence_length, features_per_time_step)

# Define the number of classes for classification
num_classes = 162

# Build the classification CNN model
model = Sequential([
    Conv1D(64, kernel_size=3, activation='relu', input_shape=input_shape),
    MaxPooling1D(pool_size=2),
    BatchNormalization(),

    Conv1D(128, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=2),
    BatchNormalization(),

    Conv1D(256, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=2),
    BatchNormalization(),

    Conv1D(512, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=2),
    BatchNormalization(),

    Flatten(),
    Dense(256, activation='relu'),
    Dropout(0.5),  # Dropout to reduce overfitting
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(num_classes, activation='softmax')  # Classification output
])

# Compile the model
model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])

# Train the model using the DataLoader
model.fit(train_loader, epochs=5, validation_data=test_loader)

# Evaluate the model
test_loss, test_accuracy = model.evaluate(test_loader)
print(f"Test Loss: {test_loss}, Test Accuracy: {test_accuracy}")

# Make predictions on the test set
Y_true = []
Y_pred = []

for X_batch, Y_batch in test_loader:
    # print("!!!", X_batch)
    # print("!!!!!", Y_batch)
    Y_hat = model.predict(X_batch)
    Y_true.extend(np.argmax(Y_batch, axis=1))  # Convert one-hot to class indices
    Y_pred.extend(np.argmax(Y_hat, axis=1))

# Calculate classification accuracy
accuracy = accuracy_score(Y_true, Y_pred)
print(f"Classification Accuracy: {accuracy:.4f}")

# Save predictions and model
np.savetxt("/content/drive/MyDrive/predictions_Y_hat.txt", Y_pred, fmt="%d", header="Predicted Classes")
model.save("/content/drive/MyDrive/cnn_classification_model.h5")

print("Model and predictions saved successfully.")


!!! (2, 1045, 44107)
!!!!! (2, 162)


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
Epoch 1/5


  self._warn_if_super_not_called()


!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
[1m 2/35[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m11s[0m 341ms/step - accuracy: 0.0000e+00 - loss: 43808.7969   !!! (2, 1045, 44107)
!!!!! (2, 162)
[1m 3/35[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m4:56[0m 9s/step - accuracy: 0.0000e+00 - loss: 57768.3906  !!! (2, 1045, 44107)
!!!!! (2, 162)
[1m 4/35[0m [32m━━[0m[37m━━━━━━━━━━━━━━━━━━[0m [1m3:46[0m 7s/step - accuracy: 0.0000e+00 - loss: 63440.9922!!! (2, 1045, 44107)
!!!!! (2, 162)
[1m 5/35[0m [32m━━[0m[37m━━━━━━━━━━━━━━━━━━[0m [1m5:19[0m 11s/step - accuracy: 0.0000e+00 - loss: 68061.1719!!! (2, 1045, 44107)
!!!!! (2, 162)
[1m 6/35[0m [32m━━━[0m[37m━━━━━━━━━━━━━━━━━[0m [1m6:09[0

  self._warn_if_super_not_called()


!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
[1m35/35[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m905s[0m 22s/step - accuracy: 0.0034 - loss: 1237972.8750 - val_accuracy: 0.0556 - val_loss: 3972120.5000
Epoch 2/5
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
!!! (2, 1045, 44107)
!!!!! (2, 162)
[1m 1/35[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m32:52[0m 58s/step - accuracy: 0.0000e+00 - loss: 22358892.0000!!! (2, 1045, 44107)
!!!!! (2, 162)
[1m 2/35[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m2:13[0m 4s

ValueError: need at least one array to stack

With num_classes = 8 classes, 20 epochs

In [None]:
# Get input shape dynamically from DataLoader
first_batch_X, _ = train_loader[0]  # Fetch the first batch
input_shape = (first_batch_X.shape[1], first_batch_X.shape[2])  # (sequence_length, features_per_time_step)

# Define the number of classes for classification
num_classes = 8

# Build the classification CNN model
model = Sequential([
    Conv1D(32, kernel_size=3, activation='relu', input_shape=input_shape),
    MaxPooling1D(pool_size=2),
    BatchNormalization(),

    Conv1D(64, kernel_size=3, activation='relu', input_shape=input_shape),
    MaxPooling1D(pool_size=2),
    BatchNormalization(),

    Conv1D(128, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=2),
    BatchNormalization(),

    Flatten(),
    Dense(64, activation='relu'),
    Dropout(0.5),  # Dropout to reduce overfitting
    Dense(32, activation='relu'),
    Dropout(0.5),
    Dense(num_classes, activation='softmax')  # Classification output
])

# Compile the model
model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])

# Train the model using the DataLoader
model.fit(train_loader, epochs=20, validation_data=test_loader)

# Evaluate the model
test_loss, test_accuracy = model.evaluate(test_loader)
print(f"Test Loss: {test_loss}, Test Accuracy: {test_accuracy}")


model.save("/content/drive/MyDrive/cnn_classification_model_20ep.h5")

print("Model and predictions saved successfully.")

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/20


  self._warn_if_super_not_called()


[1m35/35[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17s/step - accuracy: 0.1510 - loss: 11458.3662 

  self._warn_if_super_not_called()


[1m35/35[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1053s[0m 25s/step - accuracy: 0.1540 - loss: 11956.4570 - val_accuracy: 0.8889 - val_loss: 36377.4180
Epoch 2/20
[1m35/35[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m235s[0m 4s/step - accuracy: 0.3287 - loss: 145335.0625 - val_accuracy: 0.2222 - val_loss: 62955.8828
Epoch 3/20
[1m35/35[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m183s[0m 4s/step - accuracy: 0.1945 - loss: 622866.8125 - val_accuracy: 0.0000e+00 - val_loss: 270353.4375
Epoch 4/20
[1m35/35[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m193s[0m 4s/step - accuracy: 0.1973 - loss: 1968755.2500 - val_accuracy: 0.1111 - val_loss: 678731.9375
Epoch 5/20
[1m35/35[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m171s[0m 4s/step - accuracy: 0.3320 - loss: 4437678.0000 - val_accuracy: 0.0556 - val_loss: 2555081.2500
Epoch 6/20
[1m35/35[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m203s



Test Loss: 485483232.0, Test Accuracy: 1.0
Model and predictions saved successfully.


In [None]:
import keras
keras.saving.save_model(model, '/content/drive/MyDrive/cnn_classification_model_20ep.keras')