All the libraries are imported here.Also warning bypass is done here.

In [2]:
import os
import warnings
import numpy as np
import librosa
import cv2
import tensorflow as tf
from tensorflow.keras.utils import Sequence
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Concatenate, Dropout, BatchNormalization, Conv3D, MaxPooling3D, Flatten, Activation, Conv2D, MaxPooling2D, Reshape
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
from sklearn.model_selection import train_test_split

# Suppress TensorFlow logging
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Suppress specific Python warnings
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=DeprecationWarning)

# Suppress TensorFlow retracing warnings
tf.get_logger().setLevel('ERROR')


As the label of the dataset is in file name and the format for it is given "03-02-01-01-02-01-02.wav" as this so a function to get the audio label is created here.

In [3]:

# Emotion mapping
emotion_dict = {
    '01': 'neutral',
    '02': 'calm',
    '03': 'happy',
    '04': 'sad',
    '05': 'angry',
    '06': 'fearful',
    '07': 'disgust',
    '08': 'surprised'
}

emotion_labels = list(emotion_dict.values())

def parse_label_from_filename(file_name):
    parts = file_name.split('-')
    actor_id = parts[-1].split('.')[0]  
    emotion_id = parts[2]
    return actor_id, emotion_dict[emotion_id]


#For audio:


In [4]:

# Feature extraction for audio with augmentation
def extract_audio_features(file_path, target_time_steps=44):
    try:
        y, sr = librosa.load(file_path, sr=22050)
        
        # Audio augmentations
        if np.random.rand() < 0.4:
            y = librosa.effects.pitch_shift(y=y, sr=sr, n_steps=np.random.uniform(-2, 2))  # Pitch shift
        if np.random.rand() < 0.4:
            y = librosa.effects.time_stretch(y=y, rate=np.random.uniform(0.7, 1.3))  # Time stretch
        if np.random.rand() < 0.4:
            y += 0.01 * np.random.randn(len(y))  # Add noise

        mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=40)  # Shape: (40, time_steps)

        # Adjust the time steps to the target length (44)
        if mfccs.shape[1] > target_time_steps:
            mfccs = mfccs[:, :target_time_steps]  # Truncate
        elif mfccs.shape[1] < target_time_steps:
            pad_width = target_time_steps - mfccs.shape[1]
            mfccs = np.pad(mfccs, ((0, 0), (0, pad_width)), mode='constant')  # Pad with zeros

        return mfccs
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return np.zeros((40, target_time_steps))  # Default size for consistency


# Feature extraction for video with sequence length adjustment
def extract_video_features(video_frames, sequence_length):
    frames = np.array([cv2.resize(frame, (112, 112)) for frame in video_frames])
    
    if len(frames) > sequence_length:
        frames = frames[:sequence_length]
    elif len(frames) < sequence_length:
        padding = np.zeros((sequence_length - len(frames), 112, 112, 3))
        frames = np.concatenate((frames, padding))
    
    frames = frames.reshape(-1, sequence_length, 112, 112, 3)  # Shape: (batch_size, sequence_length, height, width, channels)
    return frames


In [5]:

# Data generator
class DataGenerator(Sequence):
    def __init__(self, audio_files, video_files, batch_size=32, sequence_length=20, dim=(224,224), shuffle=True, **kwargs):
        self.audio_files = audio_files
        self.video_files = video_files
        self.batch_size = batch_size
        self.sequence_length = sequence_length
        self.dim = dim
        self.shuffle = shuffle
        self.indexes = np.arange(len(self.audio_files))
        super().__init__(**kwargs)  # Call the base class constructor with kwargs
        self.on_epoch_end()

    def __len__(self):
        return int(np.floor(len(self.audio_files) / self.batch_size))

    def __getitem__(self, index):
        # Generate a batch of audio and video files
        batch_audio_files = self.audio_files[index * self.batch_size:(index + 1) * self.batch_size]
        batch_video_files = self.__match_video_files(batch_audio_files)
        
        X_audio, y_audio = self.__data_generation_audio(batch_audio_files)
        X_video, y_video = self.__data_generation_video(batch_video_files)
        
        # Ensure the labels are consistent (e.g., for now, using y_audio)
        return (X_audio, X_video), y_audio

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.audio_files)
            np.random.shuffle(self.video_files)

    def __match_video_files(self, batch_audio_files):
        matched_video_files = []
        for audio_file in batch_audio_files:
            actor_id, emotion_label = parse_label_from_filename(os.path.basename(audio_file))
            for video_file in self.video_files:
                video_actor_id, video_emotion_label = parse_label_from_filename(os.path.basename(video_file))
                if actor_id == video_actor_id and emotion_label == video_emotion_label:
                    matched_video_files.append(video_file)
                    break
        return matched_video_files

    # Data generation for audio
    def __data_generation_audio(self, batch_audio_files):
        X_audio = np.empty((self.batch_size, 40, 44, 1))  # Adjust shape to (40, 44, 1)
        y = np.empty((self.batch_size), dtype=int)

        for i, audio_path in enumerate(batch_audio_files):
            mel_spec = extract_audio_features(audio_path)
            mel_spec = np.expand_dims(mel_spec, axis=-1)  # Add channel dimension
            X_audio[i,] = mel_spec  # Ensure MFCC shape matches (40, 44, 1)
            label = parse_label_from_filename(os.path.basename(audio_path))[1]
            y[i] = emotion_labels.index(label)

        return X_audio, y


    def __data_generation_video(self, batch_video_files):
        X_video = np.empty((self.batch_size, self.sequence_length, 112, 112, 3))
        y = np.empty((self.batch_size), dtype=int)

        for i, video_path in enumerate(batch_video_files):
            cap = cv2.VideoCapture(video_path)
            frames = []
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(frame)
            cap.release()
            
            if len(frames) == 0:
                raise ValueError(f"No frames extracted from video file {video_path}")
            
            X_video[i,] = extract_video_features(np.array(frames), self.sequence_length)
            label = parse_label_from_filename(os.path.basename(video_path))[1]
            y[i] = emotion_labels.index(label)

        return X_video, y


In [6]:

# File paths
audio_dir = 'dataset/audios'
video_dir = 'dataset/videos'
audio_files = []
video_files = []

# Populate the lists by reading from directories
for actor_dir in os.listdir(audio_dir):
    actor_path = os.path.join(audio_dir, actor_dir)
    if os.path.isdir(actor_path):
        for file in os.listdir(actor_path):
            if file.endswith('.wav'):
                audio_files.append(os.path.join(actor_path, file))

for actor_dir in os.listdir(video_dir):
    actor_path = os.path.join(video_dir, actor_dir)
    if os.path.isdir(actor_path):
        for file in os.listdir(actor_path):
            if file.endswith('.mp4'):
                video_files.append(os.path.join(actor_path, file))

# Debugging: Check the number of audio and video files
print(f"Total audio files: {len(audio_files)}")
print(f"Total video files: {len(video_files)}")


Total audio files: 264
Total video files: 528


In [7]:

# Split the data into training and validation sets
audio_train, audio_val = train_test_split(audio_files, test_size=0.2, random_state=42)
video_train, video_val = train_test_split(video_files, test_size=0.2, random_state=42)

# Create training and validation generators
SEQUENCE_LENGTH = 20  # Fixed sequence length
train_generator = DataGenerator(audio_train, video_train, batch_size=16, sequence_length=SEQUENCE_LENGTH)
val_generator = DataGenerator(audio_val, video_val, batch_size=16, sequence_length=SEQUENCE_LENGTH)


In [8]:

# Model definition

IMAGE_HEIGHT, IMAGE_WIDTH = 112, 112

# Audio input branch with Conv2D layers
input_audio = Input(shape=(40, 44, 1))  # Adjust shape for 2D convolution
x_audio = Conv2D(32, (3, 3), activation='relu')(input_audio)
x_audio = BatchNormalization()(x_audio)
x_audio = MaxPooling2D((2, 2))(x_audio)

x_audio = Conv2D(64, (3, 3), activation='relu')(x_audio)
x_audio = BatchNormalization()(x_audio)
x_audio = MaxPooling2D((2, 2))(x_audio)

x_audio = Flatten()(x_audio)
x_audio = Dense(128, activation='relu', kernel_regularizer=l2(0.001))(x_audio)
x_audio = BatchNormalization()(x_audio)
x_audio = Dropout(0.5)(x_audio)

# Video input branch using 3D Conv layers
input_video = Input(shape=(SEQUENCE_LENGTH, IMAGE_HEIGHT, IMAGE_WIDTH, 3))  # Adjusted input shape for sequence of frames
x_video = Conv3D(16, (3, 3, 3), activation='relu')(input_video)
x_video = BatchNormalization()(x_video)
x_video = MaxPooling3D((1, 2, 2))(x_video)

x_video = Conv3D(32, (3, 3, 3), activation='relu')(x_video)
x_video = BatchNormalization()(x_video)
x_video = MaxPooling3D((1, 2, 2))(x_video)

x_video = Conv3D(64, (3, 3, 3), activation='relu')(x_video)
x_video = BatchNormalization()(x_video)
x_video = MaxPooling3D((1, 2, 2))(x_video)

x_video = Flatten()(x_video)
x_video = Dense(2048, activation='relu')(x_video)
x_video = Dropout(0.5)(x_video)
x_video = Dense(1024, activation='relu')(x_video)
x_video = Dropout(0.5)(x_video)

# Combine audio and video branches
combined = Concatenate()([x_audio, x_video])
x = Dense(512, activation='relu', kernel_regularizer=l2(0.001))(combined)
x = BatchNormalization()(x)
x = Dropout(0.5)(x)
output = Dense(len(emotion_dict), activation='softmax')(x)


In [9]:

# Compile the model
optimizer = Adam(learning_rate=0.0005, clipnorm=1.0)  # Gradient clipping
model = Model(inputs=[input_audio, input_video], outputs=output)
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

model.summary()




In [10]:
# Callbacks for early stopping, model checkpointing, and learning rate reduction
checkpoint = ModelCheckpoint('best_model.keras', monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')
early_stopping = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)  # Increased patience
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, verbose=1)
# Training with validation data
history = model.fit(train_generator, validation_data=val_generator, epochs=50, callbacks=[checkpoint, early_stopping, reduce_lr])



Epoch 1/50
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4s/step - accuracy: 0.1535 - loss: 4.5062
Epoch 1: val_accuracy improved from -inf to 0.00000, saving model to best_model.keras
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m116s[0m 6s/step - accuracy: 0.1518 - loss: 4.4954 - val_accuracy: 0.0000e+00 - val_loss: 5.5336 - learning_rate: 5.0000e-04
Epoch 2/50
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4s/step - accuracy: 0.1772 - loss: 3.9874
Epoch 2: val_accuracy improved from 0.00000 to 0.20833, saving model to best_model.keras
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m99s[0m 5s/step - accuracy: 0.1779 - loss: 3.9878 - val_accuracy: 0.2083 - val_loss: 3.0360 - learning_rate: 5.0000e-04
Epoch 3/50
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4s/step - accuracy: 0.1398 - loss: 4.4014
Epoch 3: val_accuracy did not improve from 0.20833
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m93s[0m