In [None]:
import os
import librosa
import numpy as np
import pickle
from keras.preprocessing.sequence import pad_sequences
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from keras.models import Model, load_model
from keras.layers import Input, Conv1D, MaxPooling1D, Flatten, Dense, Concatenate, Dropout, Masking, Attention, Add, Reshape

# **Feature Extraction**

In [None]:
def augment_audio(y, sr):
    """Randomly perturb an audio signal.

    Three perturbations are considered in a fixed order, each applied
    independently with probability 0.5:

    1. time stretch by a rate drawn uniformly from [0.9, 1.1),
    2. pitch shift by an integer number of semitones in [-2, 2],
    3. additive Gaussian noise with standard deviation 0.005.

    Args:
        y: Audio samples; must expose ``.shape`` (a NumPy array is assumed).
        sr: Sampling rate of ``y``, forwarded to the pitch shifter.

    Returns:
        The signal after whichever perturbations were selected.
    """
    # Time stretching (±10% speed change)
    stretch_selected = np.random.random() < 0.5
    if stretch_selected:
        y = librosa.effects.time_stretch(y, rate=np.random.uniform(0.9, 1.1))

    # Pitch shifting (±2 semitones; randint's upper bound is exclusive)
    shift_selected = np.random.random() < 0.5
    if shift_selected:
        semitones = np.random.randint(-2, 3)
        y = librosa.effects.pitch_shift(y, sr=sr, n_steps=semitones)

    # Adding noise
    noise_selected = np.random.random() < 0.5
    if noise_selected:
        y = y + np.random.normal(0, 0.005, y.shape)

    return y

def extract_features(file_path, segment_length=2, hop_length=1, sr=22050, augment=True):
    """Load an audio file and compute per-segment spectral features.

    The signal is cut into overlapping fixed-length segments and, for every
    segment, MFCC, chroma, spectral-contrast and tonnetz matrices are computed
    with librosa. Each per-segment matrix is transposed (``.T``) relative to
    what librosa returns.

    Args:
        file_path: Path of the audio file to load.
        segment_length: Segment duration in seconds.
        hop_length: Offset between consecutive segment starts, in seconds.
        sr: Target sampling rate passed to ``librosa.load``.
        augment: When true (the default, matching the previous behaviour) the
            signal is passed through ``augment_audio`` before segmentation.
            Pass ``False`` for deterministic features.

    Returns:
        A 4-tuple ``(mfccs, chroma, spectral_contrast, tonnetz)`` of NumPy
        arrays, each with one entry per segment.
    """
    y, sr = librosa.load(file_path, sr=sr)
    if augment:
        y = augment_audio(y, sr)
    segment_samples = int(segment_length * sr)
    hop_samples = int(hop_length * sr)

    # librosa.util.frame refuses inputs shorter than one frame, so clips
    # shorter than a single segment are zero-padded up to exactly one segment.
    if len(y) < segment_samples:
        y = np.pad(y, (0, segment_samples - len(y)), mode='constant')

    # Framing yields (segment_samples, n_segments); transpose to iterate segments.
    segments = librosa.util.frame(y, frame_length=segment_samples, hop_length=hop_samples)

    mfccs_list = []
    chroma_list = []
    spectral_contrast_list = []
    tonnetz_list = []

    for segment in segments.T:
        mfccs_list.append(librosa.feature.mfcc(y=segment, sr=sr, n_mfcc=13).T)
        chroma_list.append(librosa.feature.chroma_stft(y=segment, sr=sr).T)
        spectral_contrast_list.append(librosa.feature.spectral_contrast(y=segment, sr=sr).T)
        tonnetz_list.append(librosa.feature.tonnetz(y=segment, sr=sr).T)

    return (
        np.array(mfccs_list),
        np.array(chroma_list),
        np.array(spectral_contrast_list),
        np.array(tonnetz_list),
    )

def save_features(file_path, feature_save_dir, mfccs, chroma, spectral_contrast, tonnetz):
    """Pickle the four feature sets of one audio file into ``feature_save_dir``.

    Files are named ``<stem>_<feature>.pkl`` where ``<stem>`` is the audio
    file's base name without its extension and ``<feature>`` is one of
    ``mfccs``, ``chroma``, ``spectral_contrast`` or ``tonnetz``.

    Args:
        file_path: Path of the source audio file (only its base name is used).
        feature_save_dir: Output directory; created if missing.
        mfccs: MFCC features to store.
        chroma: Chroma features to store.
        spectral_contrast: Spectral-contrast features to store.
        tonnetz: Tonnetz features to store.
    """
    base_filename = os.path.splitext(os.path.basename(file_path))[0]

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(feature_save_dir, exist_ok=True)

    named_features = (
        ('mfccs', mfccs),
        ('chroma', chroma),
        ('spectral_contrast', spectral_contrast),
        ('tonnetz', tonnetz),
    )
    for feature_name, feature in named_features:
        target = os.path.join(feature_save_dir, f'{base_filename}_{feature_name}.pkl')
        with open(target, 'wb') as f:
            pickle.dump(feature, f)

def load_data(data_dir, feature_dir):
    """Extract and store features for every WAV file under ``data_dir``.

    ``data_dir`` is expected to contain one sub-directory per label. For each
    label a matching sub-directory is created under ``feature_dir`` and the
    features of every ``.wav`` file (extension matched case-insensitively) are
    written there via ``save_features``.

    Args:
        data_dir: Root directory holding one folder of audio files per label.
        feature_dir: Root directory that receives the pickled features.

    Returns:
        A list with one label per processed audio file, in processing order.
    """
    labels = []
    # Sorted traversal keeps the processing order independent of the filesystem.
    for label in sorted(os.listdir(data_dir)):
        label_dir = os.path.join(data_dir, label)
        if not os.path.isdir(label_dir):
            continue

        label_feature_dir = os.path.join(feature_dir, label)
        os.makedirs(label_feature_dir, exist_ok=True)

        for file_name in sorted(os.listdir(label_dir)):
            # Ensure only audio files are processed ('.WAV' counts too)
            if not file_name.lower().endswith('.wav'):
                continue
            file_path = os.path.join(label_dir, file_name)
            mfccs, chroma, spectral_contrast, tonnetz = extract_features(file_path)
            save_features(file_path, label_feature_dir, mfccs, chroma, spectral_contrast, tonnetz)
            labels.append(label)
    return labels

def load_features_from_files(feature_dir):
    """Load every pickled feature set found under ``feature_dir``.

    Each sub-directory of ``feature_dir`` is treated as a label. A sample is
    discovered through its ``<stem>_mfccs.pkl`` file; the companion
    ``_chroma``, ``_spectral_contrast`` and ``_tonnetz`` pickles with the same
    stem are loaded alongside it.

    Args:
        feature_dir: Root directory holding one folder of pickles per label.

    Returns:
        ``(mfccs_list, chroma_list, spectral_contrast_list, tonnetz_list,
        labels)`` — five parallel lists with one entry per sample, ordered by
        label and then by file name.

    Raises:
        FileNotFoundError: If a companion pickle of a sample is missing.
    """
    feature_names = ('mfccs', 'chroma', 'spectral_contrast', 'tonnetz')
    marker = '_mfccs.pkl'
    collected = {name: [] for name in feature_names}
    labels = []

    # Sorted traversal makes the sample order (and any seeded split that is
    # applied afterwards) reproducible across filesystems.
    for label in sorted(os.listdir(feature_dir)):
        label_dir = os.path.join(feature_dir, label)
        if not os.path.isdir(label_dir):
            continue
        for file_name in sorted(os.listdir(label_dir)):
            if not file_name.endswith(marker):
                continue
            # Strip only the trailing marker; str.replace would also remove
            # occurrences inside the stem.
            base_filename = file_name[:-len(marker)]

            # Load all four parts before appending so a missing companion file
            # cannot leave the parallel lists with different lengths.
            sample = {}
            for name in feature_names:
                # NOTE: pickle executes code on load; only point this at
                # feature directories you created yourself.
                with open(os.path.join(label_dir, f'{base_filename}_{name}.pkl'), 'rb') as f:
                    sample[name] = pickle.load(f)

            for name in feature_names:
                collected[name].append(sample[name])
            labels.append(label)

    return (
        collected['mfccs'],
        collected['chroma'],
        collected['spectral_contrast'],
        collected['tonnetz'],
        labels,
    )

def prepare_data(data_dir, feature_dir):
    """Extract features, load them back, pad, encode labels and split.

    Args:
        data_dir: Root directory with one folder of audio files per label.
        feature_dir: Root directory where pickled features are written and
            then read back from.

    Returns:
        ``(train, test, le)`` where ``train`` and ``test`` are 5-tuples
        ``(X_mfccs, X_chroma, X_spectral_contrast, X_tonnetz, y)`` and ``le``
        is the fitted ``LabelEncoder``.
    """
    # Extract and save features. The labels returned here are deliberately not
    # kept: the label list used below comes from what is actually on disk.
    load_data(data_dir, feature_dir)

    # Load features from saved files
    mfccs_list, chroma_list, spectral_contrast_list, tonnetz_list, labels = load_features_from_files(feature_dir)

    # Pad sequences to ensure uniform input length for each feature type
    pad_options = dict(padding='post', dtype='float32', value=-1)
    X_mfccs = pad_sequences(mfccs_list, **pad_options)
    X_chroma = pad_sequences(chroma_list, **pad_options)
    X_spectral_contrast = pad_sequences(spectral_contrast_list, **pad_options)
    X_tonnetz = pad_sequences(tonnetz_list, **pad_options)
    y = np.array(labels)

    # Encode labels
    le = LabelEncoder()
    y_encoded = le.fit_transform(y)

    # Split everything in ONE call so all feature arrays and the labels are
    # guaranteed to share the same train/test partition (previously this
    # relied on repeating the same random_state across four separate calls).
    (X_mfccs_train, X_mfccs_test,
     X_chroma_train, X_chroma_test,
     X_spectral_contrast_train, X_spectral_contrast_test,
     X_tonnetz_train, X_tonnetz_test,
     y_train, y_test) = train_test_split(
        X_mfccs, X_chroma, X_spectral_contrast, X_tonnetz, y_encoded,
        test_size=0.2, random_state=42)

    return (X_mfccs_train, X_chroma_train, X_spectral_contrast_train, X_tonnetz_train, y_train), \
           (X_mfccs_test, X_chroma_test, X_spectral_contrast_test, X_tonnetz_test, y_test), le

# Dataset locations intended for prepare_data(data_dir, feature_dir); nothing is executed here.
data_dir = '/content/drive/MyDrive/SMLRAAG'  # Update this path to your data directory
feature_dir = '/content/drive/MyDrive/SMLRAAG_features'  # Update this path to your feature directory



## *Model*

In [None]:
import os
import numpy as np
import pickle
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from keras.models import Model
from keras.layers import Input, Conv1D, MaxPooling1D, Flatten, Dense, Concatenate, Dropout, BatchNormalization, Reshape, LSTM, Bidirectional
from keras.regularizers import l2
from keras.callbacks import EarlyStopping, ReduceLROnPlateau, LearningRateScheduler
import tensorflow as tf

# Function to load features from files
def load_features_from_files(feature_dir):
    """Read all pickled feature sets stored under ``feature_dir``.

    Every sub-directory name is used as the label of the samples inside it.
    Samples are located by their ``<stem>_mfccs.pkl`` file and the matching
    ``<stem>_chroma.pkl``, ``<stem>_spectral_contrast.pkl`` and
    ``<stem>_tonnetz.pkl`` files are loaded with it.

    Args:
        feature_dir: Directory containing one folder of feature pickles per label.

    Returns:
        Five parallel lists ``(mfccs_list, chroma_list,
        spectral_contrast_list, tonnetz_list, labels)``, ordered by label and
        then by file name.

    Raises:
        FileNotFoundError: If one of a sample's four pickles is absent.
    """
    suffixes = ('_mfccs.pkl', '_chroma.pkl', '_spectral_contrast.pkl', '_tonnetz.pkl')
    mfccs_suffix = suffixes[0]

    mfccs_list = []
    chroma_list = []
    spectral_contrast_list = []
    tonnetz_list = []
    labels = []

    # os.listdir order is arbitrary; sorting keeps the sample order, and thus
    # the seeded train/test split built on top of it, reproducible.
    for label in sorted(os.listdir(feature_dir)):
        label_dir = os.path.join(feature_dir, label)
        if not os.path.isdir(label_dir):
            continue
        for file_name in sorted(os.listdir(label_dir)):
            if not file_name.endswith(mfccs_suffix):
                continue
            # Cut off just the trailing suffix (replace() would also strip the
            # same text if it appeared inside the stem).
            base_filename = file_name[:-len(mfccs_suffix)]

            loaded = []
            for suffix in suffixes:
                # NOTE: unpickling runs arbitrary code; use trusted files only.
                with open(os.path.join(label_dir, base_filename + suffix), 'rb') as f:
                    loaded.append(pickle.load(f))
            mfccs, chroma, spectral_contrast, tonnetz = loaded

            mfccs_list.append(mfccs)
            chroma_list.append(chroma)
            spectral_contrast_list.append(spectral_contrast)
            tonnetz_list.append(tonnetz)
            labels.append(label)

    return mfccs_list, chroma_list, spectral_contrast_list, tonnetz_list, labels

# Function to prepare data
def prepare_data(feature_dir):
    """Load saved features, encode labels, zero-pad and split into train/test.

    Side effect: the fitted ``LabelEncoder`` is pickled to
    ``label_encoder.pkl`` in the current working directory.

    Args:
        feature_dir: Directory containing one folder of feature pickles per label.

    Returns:
        ``(train, test, le)`` where ``train`` and ``test`` are 5-tuples
        ``(mfccs, chroma, spectral_contrast, tonnetz, y)`` and ``le`` is the
        fitted ``LabelEncoder``.

    Raises:
        ValueError: If no feature files are found under ``feature_dir``.
    """
    # Load features from saved files
    mfccs_list, chroma_list, spectral_contrast_list, tonnetz_list, labels = load_features_from_files(feature_dir)

    # Fail early, before anything is written, instead of crashing later on
    # max() of an empty sequence.
    if not labels:
        raise ValueError(f'No feature files found under {feature_dir!r}')

    # Encode labels
    le = LabelEncoder()
    y_encoded = le.fit_transform(labels)

    # Save LabelEncoder
    with open('label_encoder.pkl', 'wb') as f:
        pickle.dump(le, f)

    # Pad along the first axis (segments per file) so every sample has the same
    # length. The target length is taken from the MFCCs, as before.
    max_length = max(len(x) for x in mfccs_list)

    def _pad_to_max(samples):
        # Zero-pad each 3-D sample at the end of axis 0 and stack the results.
        return np.array([
            np.pad(x, ((0, max_length - len(x)), (0, 0), (0, 0)), mode='constant')
            for x in samples
        ])

    mfccs_list = _pad_to_max(mfccs_list)
    chroma_list = _pad_to_max(chroma_list)
    spectral_contrast_list = _pad_to_max(spectral_contrast_list)
    tonnetz_list = _pad_to_max(tonnetz_list)

    # A single split call keeps all feature arrays and the labels on the same
    # partition by construction rather than by a repeated random_state.
    (mfccs_train, mfccs_test,
     chroma_train, chroma_test,
     spectral_contrast_train, spectral_contrast_test,
     tonnetz_train, tonnetz_test,
     y_train, y_test) = train_test_split(
        mfccs_list, chroma_list, spectral_contrast_list, tonnetz_list, y_encoded,
        test_size=0.2, random_state=42)

    return (mfccs_train, chroma_train, spectral_contrast_train, tonnetz_train, y_train), \
           (mfccs_test, chroma_test, spectral_contrast_test, tonnetz_test, y_test), le

# Load the prepared dataset
feature_dir = '/content/drive/MyDrive/SMLRAAG_features'  # Update this path to your feature directory
train_split, test_split, le = prepare_data(feature_dir)
mfccs_train, chroma_train, spectral_contrast_train, tonnetz_train, y_train = train_split
mfccs_test, chroma_test, spectral_contrast_test, tonnetz_test, y_test = test_split

# Report the split sizes for debugging
print("Number of training samples:", len(mfccs_train))
print("Number of testing samples:", len(mfccs_test))

# Per-sample input shapes: everything after the leading sample axis
input_shape_mfccs, input_shape_chroma, input_shape_spectral_contrast, input_shape_tonnetz = (
    features.shape[1:]
    for features in (mfccs_train, chroma_train, spectral_contrast_train, tonnetz_train)
)

# One model input per feature type, created in the same order as the shapes above
input_mfccs, input_chroma, input_spectral_contrast, input_tonnetz = (
    Input(shape=shape)
    for shape in (
        input_shape_mfccs,
        input_shape_chroma,
        input_shape_spectral_contrast,
        input_shape_tonnetz,
    )
)

# Per-feature branch: Conv1D front end followed by a bidirectional LSTM
def tcn_branch(input_layer):
    """Build one feature branch on top of ``input_layer``.

    The two axes after the batch axis are merged into a single sequence axis,
    then the sequence goes through Conv1D -> BatchNorm -> MaxPooling ->
    Dropout -> Bidirectional LSTM -> BatchNorm.

    Args:
        input_layer: A Keras tensor with four axes (batch first).

    Returns:
        The output tensor of the final batch-normalisation layer.
    """
    shape = input_layer.shape
    sequence_length = shape[1] * shape[2]
    x = Reshape((sequence_length, shape[3]))(input_layer)

    # Convolutional stage
    x = Conv1D(32, kernel_size=3, activation='relu', kernel_regularizer=l2(0.001))(x)
    x = BatchNormalization()(x)
    x = MaxPooling1D(pool_size=2)(x)
    x = Dropout(0.6)(x)

    # Recurrent stage: only the final state is kept (return_sequences=False)
    recurrent = Bidirectional(LSTM(64, return_sequences=False, kernel_regularizer=l2(0.001)))
    x = recurrent(x)
    return BatchNormalization()(x)

# Build one branch per feature type
tcn_mfccs = tcn_branch(input_mfccs)
tcn_chroma = tcn_branch(input_chroma)
tcn_spectral_contrast = tcn_branch(input_spectral_contrast)
tcn_tonnetz = tcn_branch(input_tonnetz)

# Concatenate the per-branch outputs
combined = Concatenate()([tcn_mfccs, tcn_chroma, tcn_spectral_contrast, tcn_tonnetz])

# Apply Dense and Dropout layers
dense = Dense(128, activation='relu', kernel_regularizer=l2(0.001))(combined)
dense = Dropout(0.6)(dense)

# Output layer: one unit per class known to the fitted label encoder, so the
# network follows the dataset instead of assuming exactly three raags
# (Bhairav, Malkans and Yaman in the original data).
num_classes = len(le.classes_)
output = Dense(num_classes, activation='softmax')(dense)

# Build and compile the model
model = Model(inputs=[input_mfccs, input_chroma, input_spectral_contrast, input_tonnetz], outputs=output)
model.compile(loss='sparse_categorical_crossentropy', optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), metrics=['accuracy'])

# Print model summary for debugging. summary() prints the table itself and
# returns None, so wrapping it in print() only added a stray "None" line.
model.summary()

# Prepare the data for training (order must match the model's input order)
train_data = [mfccs_train, chroma_train, spectral_contrast_train, tonnetz_train]
test_data = [mfccs_test, chroma_test, spectral_contrast_test, tonnetz_test]

# Early stopping and reduce learning rate on plateau
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)

# Learning rate scheduler
def scheduler(epoch, lr):
    """Return the learning rate to use for ``epoch``.

    Epochs 0 through 10 keep ``lr`` unchanged; every later epoch returns half
    of the rate it is given.
    """
    return lr * 0.5 if epoch > 10 else lr

lr_scheduler = LearningRateScheduler(scheduler)

# Fit the network.
# NOTE(review): the held-out test split doubles as validation data here, so
# early stopping and LR reduction are tuned on it — confirm this is intended.
history = model.fit(
    train_data,
    y_train,
    validation_data=(test_data, y_test),
    batch_size=32,
    epochs=50,
    callbacks=[early_stopping, reduce_lr, lr_scheduler],
)

# Persist the trained network
model.save('raag_recognition_model.keras')

# Persist the label encoder next to it so predictions can be decoded later
with open('label_encoder.pkl', 'wb') as f:
    pickle.dump(le, f)


## **Prediction**

In [None]:
import numpy as np
import librosa
import pickle
import tensorflow as tf
from keras.models import load_model

# Load the trained model and the label encoder
# NOTE(review): the model is read from an absolute '/content/...' path while the
# encoder is read relative to the working directory; the training cell saves
# both under relative names, so this presumably assumes the working directory
# is /content — confirm.
model = load_model('/content/raag_recognition_model.keras')
# Unpickling executes code from the file; only load an encoder you saved yourself.
with open('label_encoder.pkl', 'rb') as f:
    le = pickle.load(f)

def augment_audio(y, sr):
    """Apply a random combination of audio perturbations to ``y``.

    Each of the following is applied with probability 0.5, in this order:
    a time stretch (rate uniform in [0.9, 1.1)), a pitch shift (integer
    semitones in [-2, 2]) and additive Gaussian noise (sigma 0.005).

    Args:
        y: Audio samples; must expose ``.shape`` (a NumPy array is assumed).
        sr: Sampling rate of ``y``, forwarded to the pitch shifter.

    Returns:
        The perturbed signal, or ``y`` unchanged if no step was selected.
    """
    def _heads():
        # One fair coin flip per augmentation step.
        return np.random.random() < 0.5

    if _heads():
        # Time stretching: ±10% speed change
        stretch_rate = np.random.uniform(0.9, 1.1)
        y = librosa.effects.time_stretch(y, rate=stretch_rate)

    if _heads():
        # Pitch shifting: ±2 semitones
        semitone_shift = np.random.randint(-2, 3)
        y = librosa.effects.pitch_shift(y, sr=sr, n_steps=semitone_shift)

    if _heads():
        # Adding noise
        y = y + np.random.normal(0, 0.005, y.shape)

    return y

def extract_features(file_path, segment_length=2, hop_length=1, sr=22050, augment=False):
    """Load an audio file and compute per-segment features for prediction.

    The signal is cut into segments of ``segment_length`` seconds whose starts
    are ``hop_length`` seconds apart; a segment that runs past the end of the
    signal is zero-padded to full length. At least one segment is always
    produced, even for an empty signal.

    Args:
        file_path: Path of the audio file to load.
        segment_length: Segment duration in seconds.
        hop_length: Offset between consecutive segment starts, in seconds.
        sr: Target sampling rate passed to ``librosa.load``.
        augment: Apply the random perturbations of ``augment_audio`` first.
            Defaults to ``False`` because random augmentation at inference time
            makes repeated predictions on the same file disagree.

    Returns:
        A 4-tuple ``(mfccs, chroma, spectral_contrast, tonnetz)`` of NumPy
        arrays with one entry per segment; each per-segment matrix is the
        transpose (``.T``) of what librosa returns.
    """
    y, sr = librosa.load(file_path, sr=sr)
    if augment:
        y = augment_audio(y, sr)
    segment_samples = int(segment_length * sr)
    hop_samples = int(hop_length * sr)

    mfccs_list = []
    chroma_list = []
    spectral_contrast_list = []
    tonnetz_list = []

    # max(..., 1) guarantees one (fully padded) segment for an empty signal so
    # the returned arrays always have the rank expected downstream.
    for start in range(0, max(len(y), 1), hop_samples):
        segment = y[start:start + segment_samples]

        # Pad the segment if it's shorter than the required length
        missing = segment_samples - len(segment)
        if missing > 0:
            segment = np.pad(segment, (0, missing), mode='constant')

        mfccs_list.append(librosa.feature.mfcc(y=segment, sr=sr, n_mfcc=13).T)
        chroma_list.append(librosa.feature.chroma_stft(y=segment, sr=sr).T)
        spectral_contrast_list.append(librosa.feature.spectral_contrast(y=segment, sr=sr).T)
        tonnetz_list.append(librosa.feature.tonnetz(y=segment, sr=sr).T)

    return (
        np.array(mfccs_list),
        np.array(chroma_list),
        np.array(spectral_contrast_list),
        np.array(tonnetz_list),
    )

def pad_or_truncate(features, max_length):
    """Force the first axis of ``features`` to exactly ``max_length`` entries.

    Shorter inputs are zero-padded at the end of axis 0; longer inputs are cut
    after ``max_length`` entries. All other axes are left untouched, whatever
    the rank of the input (the previous version only accepted 3-D arrays).

    Args:
        features: Array-like with at least one axis.
        max_length: Required size of axis 0.

    Returns:
        A NumPy array whose first axis has length ``max_length``.
    """
    features = np.asarray(features)
    current_length = features.shape[0]
    if current_length >= max_length:
        return features[:max_length]

    # Pad only at the end of axis 0; (0, 0) leaves each remaining axis as is.
    pad_width = [(0, max_length - current_length)] + [(0, 0)] * (features.ndim - 1)
    return np.pad(features, pad_width, mode='constant')

def pad_features(mfccs, chroma, spectral_contrast, tonnetz, max_length):
    """Bring all four feature arrays to ``max_length`` along their first axis.

    Each array is passed through ``pad_or_truncate`` with the same target.

    Returns:
        The tuple ``(mfccs, chroma, spectral_contrast, tonnetz)`` after resizing.
    """
    return tuple(
        pad_or_truncate(feature, max_length)
        for feature in (mfccs, chroma, spectral_contrast, tonnetz)
    )

def predict_raag(audio_path, max_length=None):
    """Predict the raag of one audio file with the loaded model.

    Args:
        audio_path: Path of the audio file to classify.
        max_length: Number of segments the model expects per sample. When
            omitted it is read from the first model input so it always matches
            the loaded network; 32 (the previous hard-coded value) is used only
            if the model does not fix that dimension.

    Returns:
        The decoded label of the most probable class.
    """
    # Extract features from the input audio file
    mfccs, chroma, spectral_contrast, tonnetz = extract_features(audio_path)

    # Pad or truncate sequences to the length the model was built with
    if max_length is None:
        # Axis 0 of an input is the batch; axis 1 is the segment count.
        max_length = model.inputs[0].shape[1]
        if max_length is None:
            max_length = 32
    mfccs, chroma, spectral_contrast, tonnetz = pad_features(mfccs, chroma, spectral_contrast, tonnetz, max_length)

    # Add the leading batch axis (batch of one) to every feature array
    batch = [np.expand_dims(feature, axis=0) for feature in (mfccs, chroma, spectral_contrast, tonnetz)]

    # Predict the raag
    predictions = model.predict(batch)
    predicted_label = np.argmax(predictions, axis=1)

    # Decode the label to get the raag name
    raag_name = le.inverse_transform(predicted_label)

    return raag_name[0]

# Example usage: classify a single recording and print the decoded label.
# These statements run every time this cell is executed.
audio_file_path = '/content/drive/MyDrive/SEGSMLRAGA/test data raga/Malkans 12 vocal aroh.wav'  # Update this path to your audio file
predicted_raag = predict_raag(audio_file_path)
print(f'The predicted raag is: {predicted_raag}')
