In [1]:
import os
import numpy as np
import pandas as pd
import librosa
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras import layers, Model, models
import joblib

def preprocess_and_load_data(csv_path, audio_folder):
    """Build a shuffled, batched tf.data.Dataset of stereo Mel spectrograms.

    Reads the CSV at ``csv_path``, fits one LabelEncoder per label column
    ('cate', 'dist', 'dire'), writes the fitted encoders to
    'cate_encoder.pk2', 'dist_encoder.pk2' and 'dire_encoder.pk2' in the
    current working directory, and converts every audio file named in the
    'name' column (resolved relative to ``audio_folder``) into a two-channel
    Mel spectrogram.

    Clips that yield different numbers of frames are zero-padded at the end of
    the frame axis up to the longest clip, so that all spectrograms can be
    stacked into one dense array. Clips of equal length are left unchanged.

    Args:
        csv_path: Path to a CSV with the columns 'name', 'cate', 'dist', 'dire'.
        audio_folder: Directory containing the audio files listed in 'name'.

    Returns:
        A tuple ``(dataset, cate_encoder, dist_encoder, dire_encoder)``.
        ``dataset`` yields ``(spectrograms, labels)`` batches of up to 32
        examples, where ``spectrograms`` has shape (batch, 128, frames, 2) and
        ``labels`` is a dict keyed by 'cate_fc', 'dist_fc' and 'dire_fc'
        holding the integer-encoded targets.

    Raises:
        ValueError: If the CSV contains no rows.
    """
    # Load CSV file
    df = pd.read_csv(csv_path)
    if df.empty:
        raise ValueError(f"No rows found in {csv_path!r}; cannot build a dataset.")

    # Initialize LabelEncoders
    cate_encoder = LabelEncoder()
    dist_encoder = LabelEncoder()
    dire_encoder = LabelEncoder()

    # Fit each encoder and encode its whole column in one vectorised call
    # instead of calling transform() once per row inside the audio loop.
    labels = np.stack([
        cate_encoder.fit_transform(df['cate']),
        dist_encoder.fit_transform(df['dist']),
        dire_encoder.fit_transform(df['dire']),
    ], axis=1)

    # Persist the fitted encoders so predictions can be decoded later.
    joblib.dump(cate_encoder, 'cate_encoder.pk2')
    joblib.dump(dist_encoder, 'dist_encoder.pk2')
    joblib.dump(dire_encoder, 'dire_encoder.pk2')

    # Create a list to hold the per-clip spectrograms
    mel_spectrograms = []

    for file_name in df['name']:
        audio_file = os.path.join(audio_folder, file_name)
        y, sr = librosa.load(audio_file, sr=3000, mono=False)  # Resample to 3 kHz, keep channels

        # librosa returns a 1-D array for mono files and (channels, samples)
        # otherwise; duplicate a mono signal so both model channels are filled.
        channels = (y, y) if y.ndim == 1 else (y[0], y[1])

        # One Mel spectrogram per channel, stacked to shape (128, num_frames, 2)
        mel_spectrogram = np.stack([
            librosa.feature.melspectrogram(y=channel, sr=sr, n_fft=2048, hop_length=512, n_mels=128)
            for channel in channels
        ], axis=-1)
        mel_spectrograms.append(mel_spectrogram)

    # Clips of different durations produce different frame counts, which
    # cannot be stacked into a dense array. Zero-pad the frame axis to the
    # longest clip (a no-op when every clip already has the same length).
    max_frames = max(spec.shape[1] for spec in mel_spectrograms)
    mel_spectrograms = np.stack([
        np.pad(spec, ((0, 0), (0, max_frames - spec.shape[1]), (0, 0)))
        for spec in mel_spectrograms
    ])

    # Create TensorFlow dataset; the dict keys match the model's output layer names.
    dataset = tf.data.Dataset.from_tensor_slices((mel_spectrograms, {
        'cate_fc': labels[:, 0],
        'dist_fc': labels[:, 1],
        'dire_fc': labels[:, 2]
    }))

    # Shuffle over the full dataset, then batch
    dataset = dataset.shuffle(buffer_size=len(mel_spectrograms))
    dataset = dataset.batch(32)

    return dataset, cate_encoder, dist_encoder, dire_encoder

# Transformer block
def transformer_block(inputs, num_heads, key_dim, ff_dim, dropout_rate=0.1):
    """Apply one post-norm Transformer encoder block to a sequence tensor.

    The block consists of multi-head self-attention followed by a two-layer
    feed-forward network. Each sub-layer is followed by dropout, a residual
    addition and layer normalisation.

    Args:
        inputs: Keras tensor whose last axis is the feature dimension.
        num_heads: Number of attention heads.
        key_dim: Size of each attention head for query and key.
        ff_dim: Width of the hidden feed-forward layer.
        dropout_rate: Dropout rate applied after each sub-layer.

    Returns:
        A Keras tensor with the same last-axis size as ``inputs``.
    """
    # The feed-forward output must match the input width for the residual add.
    feature_dim = inputs.shape[-1]

    # Sub-layer 1: self-attention (query and value are both the input sequence)
    self_attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)
    attended = self_attention(inputs, inputs)
    attended = layers.Dropout(dropout_rate)(attended)
    attended_norm = layers.LayerNormalization(epsilon=1e-6)(attended + inputs)

    # Sub-layer 2: position-wise feed-forward network
    hidden = layers.Dense(ff_dim, activation='relu')(attended_norm)
    projected = layers.Dense(feature_dim)(hidden)
    projected = layers.Dropout(dropout_rate)(projected)
    return layers.LayerNormalization(epsilon=1e-6)(projected + attended_norm)

# CNN + Transformer model for Mel spectrogram classification
def cnn_transformer_model(input_shape=(128, None, 2), num_classes_cate=38, num_classes_dist=7, num_classes_dire=6):
    """Build the multi-output CNN + Transformer classifier.

    Three Conv2D/MaxPooling2D stages (64, 128 and 256 filters) extract
    features from the input. The result is reshaped into a sequence, passed
    through one Transformer block, averaged over the sequence axis, and fed to
    three independent softmax heads.

    Args:
        input_shape: Shape of one input example, excluding the batch axis.
        num_classes_cate: Number of units in the 'cate_fc' head.
        num_classes_dist: Number of units in the 'dist_fc' head.
        num_classes_dire: Number of units in the 'dire_fc' head.

    Returns:
        An uncompiled Keras Model whose outputs are, in order, the
        'cate_fc', 'dist_fc' and 'dire_fc' softmax layers.
    """
    inputs = layers.Input(shape=input_shape)

    # Convolutional feature extractor: each stage halves both spatial axes.
    features = inputs
    for n_filters in (64, 128, 256):
        features = layers.Conv2D(filters=n_filters, kernel_size=(3, 3), activation='relu', padding='same')(features)
        features = layers.MaxPooling2D(pool_size=(2, 2))(features)

    # Flatten the two spatial axes into one sequence axis, keeping channels as features.
    n_channels = features.shape[-1]
    sequence = layers.Reshape((-1, n_channels))(features)

    # Contextualise the sequence, then collapse it to one vector per example.
    sequence = transformer_block(sequence, num_heads=4, key_dim=64, ff_dim=128)
    pooled = layers.GlobalAveragePooling1D()(sequence)

    # One softmax head per task, all sharing the pooled representation.
    head_specs = (
        ('cate_fc', num_classes_cate),
        ('dist_fc', num_classes_dist),
        ('dire_fc', num_classes_dire),
    )
    outputs = [
        layers.Dense(n_classes, activation='softmax', name=head_name)(pooled)
        for head_name, n_classes in head_specs
    ]

    return models.Model(inputs=inputs, outputs=outputs)

# Paths
csv_path = 'E:\\SIHV2\\cleaned\\train_split.csv'
audio_folder = 'E:\\SIHV2\\processed_data'

# Load dataset and the label encoders fitted on it
dataset, cate_encoder, dist_encoder, dire_encoder = preprocess_and_load_data(csv_path, audio_folder)

# Create the CNN+Transformer model.
# Each head is sized from its fitted encoder rather than a hard-coded count,
# so the number of softmax units always matches the integer labels in `dataset`.
model = cnn_transformer_model(
    input_shape=(128, None, 2),
    num_classes_cate=len(cate_encoder.classes_),
    num_classes_dist=len(dist_encoder.classes_),
    num_classes_dire=len(dire_encoder.classes_),
)

# Compile the model: labels are integer-encoded, hence the sparse loss on every head.
model.compile(optimizer='adam',
              loss={
                  'cate_fc': 'sparse_categorical_crossentropy',
                  'dist_fc': 'sparse_categorical_crossentropy',
                  'dire_fc': 'sparse_categorical_crossentropy'
              },
              metrics={
                  'cate_fc': 'accuracy',
                  'dist_fc': 'accuracy',
                  'dire_fc': 'accuracy'
              })

# Print model summary
model.summary()

# Train the model
model.fit(dataset, epochs=50)

# The trained model and its weights are saved in the next cell.



Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 128, None,   0           []                               
                                2)]                                                               
                                                                                                  
 conv2d (Conv2D)                (None, 128, None, 6  1216        ['input_1[0][0]']                
                                4)                                                                
                                                                                                  
 max_pooling2d (MaxPooling2D)   (None, 64, None, 64  0           ['conv2d[0][0]']                 
                                )                                                             

<keras.callbacks.History at 0x2778a43ac40>

In [2]:
# Persist the trained network twice: once as a complete model file and once
# as a weights-only file.
model_path = 'cnn_transformer_modelv5.h5'
weights_path = 'cnn_transformer_weightsv5.h5'
model.save(model_path)
model.save_weights(weights_path)