In [None]:
import cv2
import os
import moviepy.editor as mp

# Directory containing the videos
video_directory = "F:/thesis/Data/F6/F71"
output_directory = "F:/thesis/Data/F6/F71_1FPS"
  # Directory to save processed videos
fps = 1  # Desired frame rate (frames per second)

# Create the output directory if it doesn't exist
os.makedirs(output_directory, exist_ok=True)

# Iterate through each video file
for video_file in os.listdir(video_directory):
    video_path = os.path.join(video_directory, video_file)
    output_path = os.path.join(output_directory, f"{video_file}")  # Output video path

    # Open the video file
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error opening video file: {video_file}")
        continue

    # Get video properties
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps_original = cap.get(cv2.CAP_PROP_FPS)

    # Create video writer object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for output video
    out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

    # Read and save every nth frame
    frame_number = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Save frame every nth frame
        if frame_number % int(fps_original / fps) == 0:
            out.write(frame)

        frame_number += 1

    # Release video capture and writer objects
    cap.release()
    out.release()

    # Combine video with original audio
    audio_clip = mp.AudioFileClip(video_path).subclip().set_duration(frame_number / fps_original)
    video_clip = mp.VideoFileClip(output_path)
    video_clip = video_clip.set_audio(audio_clip)
    video_clip.write_videofile(output_path, codec='libx264', fps=fps, verbose=False)


# Features from videos

In [None]:
# import cv2
# import numpy as np
# import pandas as pd
# import os
# from mtcnn import MTCNN
# import torch
# from facenet_pytorch import InceptionResnetV1

# # Load the pre-trained FaceNet model
# facenet_model = InceptionResnetV1(pretrained='vggface2').eval().cuda()

# # Define a function to preprocess and extract features from a video frame
# def extract_features(frame):
#     # Create an MTCNN detector
#     detector = MTCNN()

#     # Detect faces in the frame
#     results = detector.detect_faces(frame)

#     features = []

#     for result in results:
#         x, y, w, h = result['box']

#         # Extract the face region
#         face_region = frame[y:y+h, x:x+w]

#         # Check if the face region is not empty
#         if face_region.shape[0] > 0 and face_region.shape[1] > 0:
#             # Define the desired size for FaceNet input (160x160)
#             desired_size = (160, 160)

#             # Resize the face region using OpenCV
#             face_region = cv2.resize(face_region, dsize=desired_size, interpolation=cv2.INTER_AREA)

#             # Convert the resized face region to a NumPy array
#             face_region = np.array(face_region)

#             # Normalize pixel values to the range [0, 1] (optional)
#             face_region = face_region / 255.0  # You can skip this step if your data is already in the correct range

#             # Convert to PyTorch tensor and move to GPU
#             face_region = torch.tensor(face_region).permute(2, 0, 1).float().cuda()

#             # Expand dimensions to make it compatible with the FaceNet model
#             face_region = face_region.unsqueeze(0)

#             # Extract features using FaceNet
#             with torch.no_grad():
#                 face_embedding = facenet_model(face_region).cpu()

#             # Append the feature vector to the list
#             features.append(face_embedding[0])
#         else:
#             # Handle the case where the face region is empty or invalid
#             print("Empty or invalid face region detected")

#     return features


# # Process each video and extract features (one frame per second)
# video_directory = "F:/thesis/temp/F0/F64"
# output_file = "F:/thesis/temp/F0/videos_features_F64.csv"

# video_files = [file for file in os.listdir(video_directory) if file.endswith(".mp4")]

# all_features = []
# count = 0
# for video_file in video_files:
#     count = count + 1
#     print(count)
#     cap = cv2.VideoCapture(os.path.join(video_directory, video_file))
#     feature_count = 0  # Initialize feature count for each video
#     frame_number = 0  # Initialize frame number

#     while True:
#         ret, frame = cap.read()
#         if not ret:
#             break

#         frame_number += 1

#         # Process one frame per second
#         if frame_number % cap.get(cv2.CAP_PROP_FPS) == 0:
#             # Check if the frame is not empty
#             if frame is not None:
#                 # Extract features from the frame
#                 frame_features = extract_features(frame)
#                 v_file = os.path.join(video_directory, video_file)
#                 # Append the video file link to each feature row with feature number
#                 video_label = f"{video_file[0]}"
#                 feature_count += 1  # Increment feature count

#                 for frame_feature in frame_features:
#                     feature_row = [v_file, feature_count, video_label] + frame_feature.tolist()
#                     # Add file path and file name
#                     all_features.append(feature_row)
#             else:
#                 # Handle the case where the frame is empty or invalid
#                 print("Empty or invalid frame detected")

#     cap.release()

# # Convert the features to a DataFrame and save to a CSV file
# feature_columns = ["Link", "Frame", "Label"] + [f"Feature_{i}" for i in range(len(all_features[0]) - 3)]
# features_df = pd.DataFrame(all_features, columns=feature_columns)
# features_df.to_csv(output_file, index=False)


### Updated

In [None]:
import cv2
import os
import pandas as pd
import torch
from facenet_pytorch import InceptionResnetV1
import numpy as np
from mtcnn import MTCNN

# Load the pre-trained FaceNet model
facenet_model = InceptionResnetV1(pretrained='vggface2').eval().cuda()

# Function to extract face embeddings from a frame
def extract_face_embeddings(frame, device):
    try:
        # Create an MTCNN detector
        detector = MTCNN()

        # Detect faces in the frame
        results = detector.detect_faces(frame)

        embeddings = []

        for result in results:
            x, y, w, h = result['box']

            # Extract the face region
            face_region = frame[y:y+h, x:x+w]

            # Check if the face region is not empty
            if face_region.shape[0] > 0 and face_region.shape[1] > 0:
                # Define the desired size for FaceNet input (160x160)
                desired_size = (160, 160)

                # Resize the face region using OpenCV
                face_region = cv2.resize(face_region, dsize=desired_size, interpolation=cv2.INTER_AREA)

                # Convert the resized face region to a NumPy array
                face_region = np.array(face_region)

                # Normalize pixel values to the range [0, 1]
                face_region = face_region / 255.0

                # Convert to PyTorch tensor and move to GPU
                face_region = torch.tensor(face_region).permute(2, 0, 1).float().to(device)

                # Expand dimensions to make it compatible with the FaceNet model
                face_region = face_region.unsqueeze(0)

                # Extract face embeddings using FaceNet
                with torch.no_grad():
                    face_embedding = facenet_model(face_region).cpu()

                # Append the face embeddings to the list
                embeddings.append(face_embedding[0].numpy())
            else:
                # Handle the case where the face region is empty or invalid
                print("Empty or invalid face region detected")

        return embeddings
    except Exception as e:
        print(f"Error extracting face embeddings: {str(e)}")
        return None


# Process each video and extract face embeddings
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Process each video and extract face embeddings
video_directory = "F:/thesis/Data/F6/F71_1FPS"
output_file = "F:/thesis/Data/F6/videos_face_embeddings_71fps.csv"

video_files = [file for file in os.listdir(video_directory) if file.endswith(".mp4")]

all_features = []
count = 0
for video_file in video_files:
    frame_number = 0
    cap = cv2.VideoCapture(os.path.join(video_directory, video_file))
    count = count + 1
    print(count)
    frame_rate = cap.get(cv2.CAP_PROP_FPS)
    
    while True:
        
        ret, frame = cap.read()
        if not ret:
            break
        # Check if the frame is not empty and it's time to process a frame
        if frame is not None and frame_number % frame_rate == 0:
            # Extract face embeddings from the frame
            face_embeddings = extract_face_embeddings(frame, device)
            v_file = os.path.join(video_directory, video_file)
            video_label = f"{video_file[0]}"
            frame_number += 1
            if face_embeddings is not None:
                # Append the video file link and face embeddings
                for face_embedding in face_embeddings:
                    feature_row = [v_file, frame_number, video_label] + face_embedding.tolist()
                    all_features.append(feature_row)
        
        else:
            # Handle the case where the frame is empty or invalid
            print("Empty or invalid frame detected")

    cap.release()

# Convert the features to a DataFrame and save to a CSV file
embedding_columns = ["Link", "Frame", "Label"] + [f"Embedding_{i}" for i in range(len(all_features[0]) - 3)]
embedding_df = pd.DataFrame(all_features, columns=embedding_columns)
embedding_df.to_csv(output_file, index=False)

In [None]:
import cv2
import os

# Specify the path to the folder containing your video files
video_folder = "F:/thesis/temp/F5/v"


# Iterate over all video files in the folder
for video_file in os.listdir(video_folder):
    if video_file.endswith(".mp4"):  # Assuming your videos have the .mp4 extension
        video_path = os.path.join(video_folder, video_file)

        # Open the video file
        cap = cv2.VideoCapture(video_path)

        # Get the frames per second (fps) of the video
        fps = cap.get(cv2.CAP_PROP_FPS)

        # Print or store the fps for each video
        #print(f"Video: {video_file}, Frames per Second (fps): {fps}")
        print(f"Frames per Second (fps): {fps}")

        # Release the video capture object
        cap.release()



# Testing Videos Features

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

# Define the NeuralNet class
class NeuralNet(nn.Module):
    def __init__(self, input_size, num_classes):
        super(NeuralNet, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, num_classes)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        # x = self.dropout(x)
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Load the features CSV file with the provided header
# header = ["Link", "Label"] + [f"Feature_{i}" for i in range(512)]
features_df = pd.read_csv("F:/thesis/Features/Final/Updated/Video_features_final.csv")
audio_features = pd.read_csv("F:/thesis/Features/Final/Updated/Audio_Features_final_wave2vec.csv")

Y_VF = audio_features["Label"].values  # Emotion label

# Group by 'Counter' and calculate mean
X_VF = features_df.drop(columns=["Link", "Label"])
X_VF = X_VF.groupby('Counter').mean().reset_index()

# Convert DataFrame to numpy array
X_VF_np = X_VF.iloc[:, 1:].values


encoder = LabelEncoder()

# Fit and transform the string labels to integer labels
Y_VF = encoder.fit_transform(audio_features["Label"].values)

# Convert numpy array to PyTorch tensor
X_VF = torch.tensor(X_VF_np, dtype=torch.float32)
Y_VF = torch.tensor(Y_VF, dtype=torch.long)

# No need for train_test_split, split directly with PyTorch
test_size = 0.2
num_samples = len(X_VF)
num_test_samples = int(test_size * num_samples)
num_train_samples = num_samples - num_test_samples

# Split the data
X_train, X_test = X_VF[:num_train_samples], X_VF[num_train_samples:]
y_train, y_test = Y_VF[:num_train_samples], Y_VF[num_train_samples:]

# Move data and model to CUDA if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
X_train, X_test = X_train.to(device), X_test.to(device)
y_train, y_test = y_train.to(device), y_test.to(device)

# Build a simple neural network model using PyTorch
input_size = X_train.shape[1]
num_classes = len(np.unique(Y_VF))
model = NeuralNet(input_size, num_classes)
model.to(device)

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training the model
num_epochs = 200
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()
    outputs = model(X_train)
    loss = criterion(outputs, y_train)
    loss.backward()
    optimizer.step()

# Evaluate the model
model.eval()
with torch.no_grad():
    outputs = model(X_test)
    _, predicted = torch.max(outputs, 1)
    correct = (predicted == y_test).sum().item()
    total = y_test.size(0)
    accuracy = correct / total * 100
    print(f"Test accuracy: {accuracy:.2f}%")

# Use the same encoder instance for inverse transformation
predicted_labels = encoder.inverse_transform(predicted.cpu().numpy())

In [None]:
X_VF.shape

#### MFCC

In [None]:
import librosa
import numpy as np
import pandas as pd
import csv
import os
import matplotlib.pyplot as plt

# Function to create MFCCs from audio
def create_mfcc(audio_file):
    # Load the audio file
    x, sr = librosa.load(audio_file, sr=None)

    # Compute MFCC features
    mfcc = librosa.feature.mfcc(y=x, sr=sr)

    return mfcc

# Function to process all audio files in a folder and its subfolders
def process_audio_folder(folder_path):
    all_features = []

    for root, _, files in os.walk(folder_path):
        for file in files:
            if file.endswith(".mp3"):  # You can adjust the file extension as needed
                audio_file = os.path.join(root, file)
                mfcc = create_mfcc(audio_file)
                # Flatten the MFCC matrix
                mfcc_flat = mfcc.flatten()
                # Storing link, label, and flattened MFCCs in a list
                all_features.append([audio_file, file[0]] + mfcc_flat.tolist())

    return all_features

# Function to save features to a CSV file
def save_to_csv(data, output_file, column_names):
    with open(output_file, 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        # Writing header to the CSV file
        csv_writer.writerow(column_names)
        # Writing data to the CSV file
        csv_writer.writerows(data)

# Example usage:
audio_folder = "F:/thesis/temp/F0/F"
output_file = "F:/thesis/temp/F5/MFCC/features_5.csv"
all_features = process_audio_folder(audio_folder)
# Generate column names including MFCC indices
column_names = ["Link", "Label"] + [f"MFCC_{i}" for i in range(1, len(all_features[0]) - 1)]
save_to_csv(all_features, output_file, column_names)


# Speech Features

In [None]:
import librosa
import numpy as np
import pandas as pd
import csv
import os

# Define a function to extract audio features from an audio file
def extract_audio_features(audio_file):
    # Load the audio file
    y, sr = librosa.load(audio_file)

    # Extract audio features
    features = []

    # Tempo and beat features
    tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
    features.append(tempo)

    # Spectral features
    spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=y, sr=sr))
    spectral_bandwidth = np.mean(librosa.feature.spectral_bandwidth(y=y, sr=sr))
    spectral_contrast = np.mean(librosa.feature.spectral_contrast(y=y, sr=sr))
    spectral_rolloff = np.mean(librosa.feature.spectral_rolloff(y=y, sr=sr))
    features.extend([spectral_centroid, spectral_bandwidth, spectral_contrast, spectral_rolloff])

    # MFCC (Mel-frequency cepstral coefficients) features
    mfccs = np.mean(librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13), axis=1)
    features.extend(mfccs)

    # Zero-crossing rate
    zero_crossing_rate = np.mean(librosa.feature.zero_crossing_rate(y))
    features.append(zero_crossing_rate)

    # Chroma feature
    chroma = np.mean(librosa.feature.chroma_stft(y=y, sr=sr))
    features.append(chroma)

    return features

# Function to process all audio files in a folder and its subfolders
def process_audio_folder(folder_path):
    all_features = []

    for root, _, files in os.walk(folder_path):
        for file in files:
            if file.endswith(".mp3"):  # You can adjust the file extension as needed
                audio_file = os.path.join(root, file)
                features = extract_audio_features(audio_file)
                # Include the file path and name in the CSV
                all_features.append([audio_file, file[0]] + features)

    return all_features

# Function to save features to a CSV file
def save_to_csv(data, output_file):
    with open(output_file, 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow(["Link", "Label", "Tempo", "SpectralCentroid", "SpectralBandwidth", "SpectralContrast", "SpectralRolloff", "MFCC1", "MFCC2", "MFCC3", "MFCC4", "MFCC5", "MFCC6", "MFCC7", "MFCC8", "MFCC9", "MFCC10", "MFCC11", "MFCC12", "MFCC13", "ZeroCrossingRate", "Chroma"])
        csv_writer.writerows(data)

audio_directory = "F:/thesis/temp/F0/F64/Audio"
output_file = "F:/thesis/temp/F0/audio_features_F64.csv"
all_features = process_audio_folder(audio_directory)

# Save the audio features to a CSV file
save_to_csv(all_features, output_file)

### Speech Features from Wav2vec

In [1]:
import os
import csv
import numpy as np
import pandas as pd
import torch
from transformers import Wav2Vec2Processor, Wav2Vec2Model
import librosa

# Load the Wav2Vec2 processor and model
model_name = 'kingabzpro/wav2vec2-large-xls-r-300m-Urdu'

 
processor = Wav2Vec2Processor.from_pretrained(model_name)
model = Wav2Vec2Model.from_pretrained(model_name)


# Define a function to extract audio features from an audio file
def extract_audio_features(audio_file):
    # Load the audio file with librosa
    y, sr = librosa.load(audio_file,sr=16000)

    # Process the audio file with Wav2Vec2 processor
    inputs = processor(y, return_tensors="pt", padding="longest", sampling_rate=sr)

    # Get the model embeddings
    with torch.no_grad():
        model_output = model(**inputs)

    # Use the output embeddings as features
    features = model_output.last_hidden_state.mean(dim=1).squeeze().numpy()

    return features.tolist()

# Function to process all audio files in a folder and its subfolders
def process_audio_folder(folder_path):
    all_features = []
    count=0

    for root, _, files in os.walk(folder_path):
        for file in files:
            if file.endswith(".mp3"):
                audio_file = os.path.join(root, file)
                features = extract_audio_features(audio_file)
                # Include the file path and name in the CSV
                count = count + 1
               # print(count)
                all_features.append([audio_file, file[0]] + features)

    return all_features

# Function to save features to a CSV file
def save_to_csv(data, output_file):
    with open(output_file, 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow(["Link", "Label"] + [f"Feature_{i}" for i in range(len(data[0]) - 2)])
        csv_writer.writerows(data)

# Specify the directory containing audio files and the output CSV file
audio_directory = "F:/thesis/Data/F6/F71/Audio"
output_file = "F:/thesis/Data/F6/Audio Features/audio_features_F71.csv"

# Process audio files and save features to CSV
all_features = process_audio_folder(audio_directory)
save_to_csv(all_features, output_file)



OSError: Can't load tokenizer for 'facebook/wav2vec2-xls-r-2b'. If you were trying to load it from 'https://huggingface.co/models', make sure you don't have a local directory with the same name. Otherwise, make sure 'facebook/wav2vec2-xls-r-2b' is the correct path to a directory containing all relevant files for a Wav2Vec2CTCTokenizer tokenizer.

# Testing of Speech Features

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import pandas as pd
from sklearn.preprocessing import LabelEncoder

# Check if CUDA (GPU) is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the data from your CSV file
data = pd.read_csv("F:/thesis/Features/Final/Updated/Audio_Features_final_wave2vec.csv")
Y_AF = data["Label"]
# Split data into features (X) and labels (y)
X_AF = data.drop(columns=["Link", "Label"])


# Use LabelEncoder to convert string labels to numerical labels
label_encoder = LabelEncoder()
Y_AF = label_encoder.fit_transform(Y_AF)

# Specify the sequence length (adjust as needed)
sequence_length = 20

# Create sequences of the desired length
sequences = []
labels = []
for i in range(len(X_AF) - sequence_length + 1):
    sequence = X_AF.iloc[i:i+sequence_length].values  # Extract a sequence of features
    label = Y_AF[i + sequence_length - 1]  # Use the label of the last item in the sequence
    sequences.append(sequence)
    labels.append(label)

# Convert data to PyTorch tensors and move them to the GPU
sequences = torch.Tensor(sequences).to(device)
labels = torch.LongTensor(labels).to(device)

# Split the data into training and testing sets
X_train_A, X_test_A, y_train_A, y_test_A = train_test_split(sequences, labels, test_size=0.2, random_state=42)

# Create a DataLoader for the training set (optional but useful for mini-batch training)
batch_size = 32  # Adjust as needed
train_dataset = TensorDataset(X_train_A, y_train_A)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Define an LSTM-based model
class EmotionLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(EmotionLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.fc(out[:, -1, :])  # Take the output from the last time step
        return out

# Define the LSTM model hyperparameters
input_size = X_train_A.shape[2]  # Input size based on the number of features in each time step
hidden_size = 64
num_layers = 2  # You can adjust this as needed
num_classes = len(label_encoder.classes_)

# Initialize the model and move it to the GPU
model = EmotionLSTM(input_size, hidden_size, num_layers, num_classes).to(device)

# Define a loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

# Training loop
num_epochs = 50
for epoch in range(num_epochs):
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

# Set the model to evaluation mode
model.eval()

# Make predictions on the test set
with torch.no_grad():
    outputs = model(X_test_A)
    _, predicted = torch.max(outputs, 1)

# Move the predictions to the CPU and convert them to a NumPy array
predicted = predicted.cpu().numpy()

# Calculate accuracy
accuracy = accuracy_score(y_test_A.cpu().numpy(), predicted)
print("Accuracy:", accuracy)


  sequences = torch.Tensor(sequences).to(device)


Accuracy: 0.5502421307506054


# Features From Text

In [None]:
import torch
from transformers import BertTokenizer, BertModel
import pandas as pd
import numpy as np

# Set the device to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the pre-trained BERT model and tokenizer for Urdu
model_name = "bert-base-uncased"
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertModel.from_pretrained(model_name)
model.to(device)

# Define a function to extract features from Urdu text
def extract_features(text):
    if pd.notna(text):  # Check if the text is not missing or NaN
        # Tokenize the text and add special tokens [CLS] and [SEP]
        input_ids = tokenizer.encode(text, add_special_tokens=True, max_length=512, truncation=True)

        # Convert the input to a PyTorch tensor and move to GPU
        input_ids = torch.tensor(input_ids).unsqueeze(0).to(device)  # Batch size of 1

        # Pass the input through the BERT model
        with torch.no_grad():
            outputs = model(input_ids)

        # Extract the features (output embeddings of the [CLS] token)
        features = outputs.last_hidden_state[:, 0, :].cpu().numpy()

        return features
    else:
        return None

# Load your Excel file with text and labels
input_excel_file = 'F:/thesis/Data/F6/transcriptions.xlsx'
df = pd.read_excel(input_excel_file)  # Use pd.read_excel() to read from an Excel file

# Extract features from the Urdu texts and preserve labels
all_features = []

for index, row in df.iterrows():
    text = row["Transcription"]  # Assuming the column name for text is "Transcription"
    label = row["Label"]  # Assuming the column name for labels is "Label"
    
    print(f"Processing text: {text}")
    
    text_features = extract_features(text)
    
    if text_features is not None:
        # Append both the text features and the label
        all_features.append((text_features, label))
    else:
        print(f"Text excluded: {text}")

# Flatten the nested arrays
all_features = [(features.flatten(), label) for features, label in all_features]

# Convert the features to a DataFrame
feature_columns = [f"Feature_{i}" for i in range(all_features[0][0].shape[0])]
features_df = pd.DataFrame([features for features, label in all_features], columns=feature_columns)

# Add a column for the labels
features_df["Label"] = [label for _, label in all_features]

# Save the features to a CSV file (or you can save it to an Excel file using to_excel)
output_file = "F:/thesis/Data/F6/Text/urdu_text_features.csv"
features_df.to_csv(output_file, index=False)

print(f"Features saved to {output_file}")


### Fine Tune XLMR

In [None]:
import pandas as pd
import datasets
from datasets import Dataset
from sklearn.model_selection import train_test_split
from transformers import XLMRobertaTokenizer, AutoModelForSequenceClassification, XLMRobertaForSequenceClassification, Trainer, TrainingArguments
import evaluate

# Load your CSV file
input_excel_file = "F:/thesis/Features/Final/Updated/transcriptions_final_label.xlsx"
df = pd.read_excel(input_excel_file)
df = df.drop(columns=["Link"])

# Rename your columns if necessary (assuming 'transcription' and 'label' are the headers)
df = df[['Transcription', 'Label']]

# Convert labels to integers if they are categorical
#df['Label'] = pd.Categorical(df['Label']).codes
labels = list(set(df['Label']))
label_encoding = {}
for i in range(len(labels)):
  label_encoding[labels[i]] = i
label_encoding


In [None]:
rev_label_encoding = {v: k for k, v in label_encoding.items()}
rev_label_encoding

In [None]:
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)

In [None]:
data_train = train_df.replace({'Label': label_encoding})
data_test = val_df.replace({'Label': label_encoding})

In [None]:
data_test

In [None]:
# Convert the Pandas dataframes into Hugging Face Datasets
train_dataset = datasets.Dataset.from_pandas(data_train)
val_dataset = datasets.Dataset.from_pandas(data_test)

In [None]:
train_dataset[0]

In [None]:
data = datasets.DatasetDict()
data['train']= train_dataset
data['test']= val_dataset

In [None]:
from transformers import AutoTokenizer

tokaneizer_name = "xlm-roberta-large"
model_name = "xlm-roberta-large"
tokenizer = AutoTokenizer.from_pretrained(tokaneizer_name)

# Tokenizer function
def tokenize_function(examples):
    return tokenizer(examples["Transcription"], padding="max_length", truncation=True)
tokenized_datasets = data.map(tokenize_function, batched=True)

In [None]:
# # Tokenize the datasets
# train_dataset = train_da.map(tokenize_function, batched=True)
# val_dataset = val_dataset.map(tokenize_function, batched=True)

# # Set the format for PyTorch tensors
# train_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "Label"])
# val_dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "Label"])

In [None]:
# # Load the tokenizer
# tokenizer = XLMRobertaTokenizer.from_pretrained('xlm-roberta-base')

# # Load the model (adjust num_labels to match your task)
# model = XLMRobertaForSequenceClassification.from_pretrained('xlm-roberta-base', num_labels=df['Label'].nunique())

In [None]:
tokenized_datasets

In [None]:
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=len(rev_label_encoding), id2label=rev_label_encoding, label2id=label_encoding, ignore_mismatched_sizes=True)

In [None]:
metric1 = evaluate.load("f1")
metric2 = evaluate.load("accuracy")

In [None]:
def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return {'f1': metric1.compute(predictions=predictions, references=labels, average='macro')['f1'], 'accuracy': metric2.compute(predictions=predictions, references=labels)['accuracy']}

In [None]:
# Set up training arguments
training_args = TrainingArguments(output_dir="./results",
                                  evaluation_strategy='epoch',
                                  save_strategy='epoch',
                                  num_train_epochs=5,
                                  per_device_train_batch_size=2,
                                  per_gpu_train_batch_size=2,
                                  per_device_eval_batch_size=2,
                                  per_gpu_eval_batch_size=2,
                                  save_total_limit = 5,
                                  metric_for_best_model = 'f1',
                                  load_best_model_at_end=True,
                                 learning_rate = 5e-5
                                 )

In [None]:
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets['train'],
    eval_dataset=tokenized_datasets['test'],
    compute_metrics=compute_metrics,
)

In [None]:
# Train the model
trainer.train()

In [None]:
trainer.evaluate()

In [None]:
# Save the fine-tuned model and tokenizer
model.save_pretrained('./fine_tuned_xlm_roberta')
tokenizer.save_pretrained('./fine_tuned_xlm_roberta')


### XLMR Embeddings

In [3]:
import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModel

# Load the pre-trained XLM-RoBERTa model and tokenizer
tokenizer = AutoTokenizer.from_pretrained("xlm-roberta-large")
model = AutoModel.from_pretrained("xlm-roberta-large")



In [None]:
# Load the Excel file
file_path = 'F:/thesis/Features/Final/Updated/transcriptions_final_label.xlsx'  # Replace with your file path
df = pd.read_excel(file_path)

# Assuming the columns are named 'Link', 'Label', and 'Text'
links = df['Link'].tolist()
labels = df['Label'].tolist()
texts = df['Transcription'].tolist()

# Tokenize sentences and extract embeddings
def get_embeddings(sentence):
    inputs = tokenizer(sentence, return_tensors="pt", padding=True, truncation=True)
    with torch.no_grad():
        outputs = model(**inputs)
    sentence_embedding = outputs.last_hidden_state.mean(dim=1).squeeze(0)
    return sentence_embedding

# Process each text to get embeddings
embeddings = []
for text in texts:
    embedding = get_embeddings(text)
    embeddings.append(embedding.cpu().numpy())  # Convert to numpy array

# Create a DataFrame to store embeddings along with links and labels
embeddings_df = pd.DataFrame(embeddings)
embeddings_df['Link'] = links
embeddings_df['Label'] = labels

# Save the DataFrame to a CSV file
output_file = 'F:/thesis/Data/F6/Text/urdu_embeddings.csv'  # Specify your output file name
embeddings_df.to_csv(output_file, index=False)

print(f"Embeddings saved to {output_file}")


# Testing of Text Features

In [3]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score


# Load the features from the Excel file
input_file = ("F:/thesis/Features/Final/Updated/Urdu_embeddings_xlmr_large.csv")
df = pd.read_csv(input_file)
df = df.drop(columns=["Link"])

In [4]:
from sklearn.preprocessing import LabelEncoder
# Split the data into features (X) and labels (y)
#df = df.drop(['Link'],axis = 1)
data = df.iloc[::]  # Features
Y = df['Label']      # Target variable
X = data.drop(['Label'],axis = 1)

label_encoder = LabelEncoder()
Y = label_encoder.fit_transform(Y)

In [5]:
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)

In [6]:
import xgboost as xgb
from sklearn.metrics import accuracy_score

# Define and train the XGBoost classifier
xgb_model = xgb.XGBClassifier(
    n_estimators=100,      # Number of boosting rounds (trees)
    tree_method="hist",
    learning_rate=0.5,     # Learning rate for boosting
    max_depth=6,           # Maximum depth of trees
    eval_metric='mlogloss', # Log loss for multi-class classification
    use_label_encoder=False # Disable automatic label encoding
)

# Train the XGBoost classifier on the extracted features and corresponding labels
xgb_model.fit(X_train, Y_train)

y_pred = xgb_model.predict(X_test)

# Calculate test accuracy
accuracy = accuracy_score(Y_test, y_pred)
print(f"XGBoost Test Accuracy: {accuracy * 100:.2f}%")

Parameters: { "use_label_encoder" } are not used.



XGBoost Test Accuracy: 44.81%


In [None]:
# Initialize and train an SVM classifier
classifier = SVC(kernel='linear', C=1.0, probability=True)
classifier.fit(X_train, Y_train)

# Make predictions on the test set
Y_pred = classifier.predict(X_test)

# Calculate accuracy
accuracy = accuracy_score(Y_test, Y_pred)
print(f"Accuracy: {accuracy}")

# If you want probability estimates, you can use predict_proba
# probabilities = classifier.predict_proba(X_test)

# Late Fusion with LSTM

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import numpy as np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Define LSTM models for each modality (Video, Audio, Text)

class VideoLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super(VideoLSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        batch_size = x.size(0)  # Get the batch size from input 'x'
        # Initialize hidden and cell states
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out


class AudioLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super(AudioLSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        batch_size = x.size(0)  # Get the batch size from input 'x'
        # Initialize hidden and cell states
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out


class TextLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super(TextLSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        batch_size = x.size(0)  # Get the batch size from input 'x'
        # Initialize hidden and cell states
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out

# Define Late Fusion model with LSTM
class LateFusionLSTM(nn.Module):
    def __init__(self, video_model, audio_model, text_model, output_size):
        super(LateFusionLSTM, self).__init__()
        self.video_model = video_model
        self.audio_model = audio_model
        self.text_model = text_model
        self.fc = nn.Linear(3 * output_size, output_size)

    def forward(self, video_input, audio_input, text_input):
        video_output = self.video_model(video_input)
        audio_output = self.audio_model(audio_input)
        text_output = self.text_model(text_input)

        combined = torch.cat((video_output[:, -1, :], audio_output[:, -1, :], text_output[:, -1, :]), dim=1)
        out = self.fc(combined)
        return out

# Initialize LSTM models for each modality
video_input_size = 100  # Modify this according to your feature size
audio_input_size = 20   # Modify this according to your feature size
text_input_size = 768   # Modify this according to your feature size
hidden_size = 64        # Modify this according to your choice
output_size = 5         # Modify this according to the number of classes
num_layers = 2          # Modify this according to your choice

video_lstm_model = VideoLSTMModel(video_input_size, hidden_size, output_size, num_layers).to(device)
audio_lstm_model = AudioLSTMModel(audio_input_size, hidden_size, output_size, num_layers).to(device)
text_lstm_model = TextLSTMModel(text_input_size, hidden_size, output_size, num_layers).to(device)

late_fusion_lstm_model = LateFusionLSTM(video_lstm_model, audio_lstm_model, text_lstm_model, output_size).to(device)

In [None]:
# Load video, audio, and text features separately
video_features = pd.read_csv("F:/thesis/Data/Final/Video_Features_final.csv")
audio_features = pd.read_csv("F:/thesis/Data/Final/audio_features_final.csv")
text_features = pd.read_excel("F:/thesis/Data/Final/urdu_text_features.xlsx")

# Extract labels for each modality
video_labels = video_features["Label"]
audio_labels = audio_features["Label"]
text_labels = text_features["Label"]

# Drop unnecessary columns
video_features = video_features.drop(columns=["Link", "Label"])
audio_features = audio_features.drop(columns=["Link", "Label"])
text_features = text_features.drop(columns=["Label"])

# Convert text labels to numerical form using LabelEncoder
label_encoder = LabelEncoder()
text_labels_numeric = label_encoder.fit_transform(text_labels)

# Convert Pandas DataFrames to PyTorch tensors for each modality
video_tensor = torch.tensor(video_features.values, dtype=torch.float32)
audio_tensor = torch.tensor(audio_features.values, dtype=torch.float32)
text_tensor = torch.tensor(text_features.values, dtype=torch.float32)
text_labels_tensor = torch.tensor(text_labels_numeric, dtype=torch.long)


#video_labels_numeric = label_encoder.fit_transform(video_labels)
#audio_labels_numeric = label_encoder.fit_transform(audio_labels)

#video_labels_tensor = torch.tensor(video_labels_numeric, dtype=torch.long)
#audio_labels_tensor = torch.tensor(audio_labels_numeric, dtype=torch.long)

# Combine features and labels into a single dataset
#video_dataset = TensorDataset(video_tensor, text_labels_tensor)
#audio_dataset = TensorDataset(audio_tensor, text_labels_tensor)
#text_dataset = TensorDataset(text_tensor, text_labels_tensor)

# Create data loaders for each modality
#batch_size = 64
#video_loader = DataLoader(video_dataset, batch_size=batch_size, shuffle=True)
#audio_loader = DataLoader(audio_dataset, batch_size=batch_size, shuffle=True)
#text_loader = DataLoader(text_dataset, batch_size=batch_size, shuffle=True)

In [None]:
def create_data_loader(features, labels, batch_size, shuffle=True):
    dataset = TensorDataset(features, labels)
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)


late_fusion_optimizer = optim.Adam(late_fusion_lstm_model.parameters(), lr=0.001)
criterion_combined = nn.CrossEntropyLoss()

num_epochs_late_fusion = 10  # Set the number of epochs as needed

# Splitting data into train and validation sets
video_train, video_val, audio_train, audio_val, text_train, text_val, label_train, label_val = train_test_split(
    video_tensor, audio_tensor, text_tensor, text_labels_tensor, test_size=0.2, random_state=42)

# Creating separate datasets and data loaders for train and validation
train_dataset = TensorDataset(video_train, audio_train, text_train, label_train)
val_dataset = TensorDataset(video_val, audio_val, text_val, label_val)

batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Training loop
for epoch in range(num_epochs_late_fusion):
    late_fusion_lstm_model.train()
    
    for batch_idx, (video_batch, audio_batch, text_batch, label_batch) in enumerate(train_loader):
        # Move each batch to device
        video_inputs = video_batch.to(device)
        audio_inputs = audio_batch.to(device)
        text_inputs = text_batch.to(device)
        labels = label_batch.to(device).view(-1)  # Assuming labels are already in the correct shape
        
        late_fusion_optimizer.zero_grad()
        
        video_outputs = video_lstm_model(video_inputs)
        audio_outputs = audio_lstm_model(audio_inputs)
        text_outputs = text_lstm_model(text_inputs)
        
        # Combine outputs of individual models
        combined_outputs = late_fusion_lstm_model(video_outputs, audio_outputs, text_outputs)
        
        # Calculate loss and optimize
        loss_late_fusion = criterion_combined(combined_outputs, labels)
        loss_late_fusion.backward()
        late_fusion_optimizer.step()

    # Validation loop
    late_fusion_lstm_model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    
    for batch_idx, (video_batch, audio_batch, text_batch, label_batch) in enumerate(val_loader):
        with torch.no_grad():
            # Move each batch to device
            video_inputs = video_batch.to(device)
            audio_inputs = audio_batch.to(device)
            text_inputs = text_batch.to(device)
            labels = label_batch.to(device).view(-1)  # Assuming labels are already in the correct shape
            
            video_outputs = video_lstm_model(video_inputs)
            audio_outputs = audio_lstm_model(audio_inputs)
            text_outputs = text_lstm_model(text_inputs)
            
            # Combine outputs of individual models
            combined_outputs = late_fusion_lstm_model(video_outputs, audio_outputs, text_outputs)
            
            # Calculate loss
            val_loss += criterion_combined(combined_outputs, labels).item()
            
            # Calculate accuracy
            _, predicted = combined_outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    val_loss /= len(val_loader)
    accuracy = 100 * correct / total

    print(f"Epoch [{epoch+1}/{num_epochs_late_fusion}] | Validation Loss: {val_loss:.4f} | Accuracy: {accuracy:.2f}%")



# Testing Late Fusion

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import numpy as np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Define LSTM models for each modality (Video, Audio, Text)
class VideoLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super(VideoLSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        batch_size = x.size(0)  # Get the batch size from input 'x'
        # Initialize hidden and cell states
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out

    def forward(self, x):
        batch_size = x.size(0)  # Get the batch size from input 'x'
        # Initialize hidden and cell states
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)

        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out


class AudioLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super(AudioLSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        batch_size = x.size(0)  # Get the batch size from input 'x'
        # Initialize hidden and cell states
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out


class TextLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super(TextLSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        batch_size = x.size(0)  # Get the batch size from input 'x'
        # Initialize hidden and cell states
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out

# Define Late Fusion model with LSTM
class LateFusionLSTM(nn.Module):
    def __init__(self, video_model, audio_model, text_model, output_size):
        super(LateFusionLSTM, self).__init__()
        self.video_model = video_model
        self.audio_model = audio_model
        self.text_model = text_model
        self.fc = nn.Linear(3 * output_size, output_size)

    def forward(self, video_input, audio_input, text_input):
        video_output = self.video_model(video_input)
        audio_output = self.audio_model(audio_input)
        text_output = self.text_model(text_input)

        combined = torch.cat((video_output[:, -1, :], audio_output[:, -1, :], text_output[:, -1, :]), dim=1)
        out = self.fc(combined)
        return out

# Initialize LSTM models for each modality
video_input_size = 100  # Modify this according to your feature size
audio_input_size = 20   # Modify this according to your feature size
text_input_size = 768   # Modify this according to your feature size
hidden_size = 64        # Modify this according to your choice
output_size = 5         # Modify this according to the number of classes
num_layers = 2          # Modify this according to your choice

video_lstm_model = VideoLSTMModel(video_input_size, hidden_size, output_size, num_layers).to(device)
audio_lstm_model = AudioLSTMModel(audio_input_size, hidden_size, output_size, num_layers).to(device)
text_lstm_model = TextLSTMModel(text_input_size, hidden_size, output_size, num_layers).to(device)

late_fusion_lstm_model = LateFusionLSTM(video_lstm_model, audio_lstm_model, text_lstm_model, output_size).to(device)

# Load video, audio, and text features separately
video_features = pd.read_csv("F:/thesis/Data/Final/Video_Features_final.csv")
audio_features = pd.read_csv("F:/thesis/Data/Final/audio_features_final.csv")
text_features = pd.read_excel("F:/thesis/Data/Final/urdu_text_features.xlsx")

# Extract labels for each modality
video_labels = video_features["Label"]
audio_labels = audio_features["Label"]
text_labels = text_features["Label"]

# Drop unnecessary columns
video_features = video_features.drop(columns=["Link", "Label"])
audio_features = audio_features.drop(columns=["Link", "Label"])
text_features = text_features.drop(columns=["Label"])

# Convert text labels to numerical form using LabelEncoder
label_encoder = LabelEncoder()
text_labels_numeric = label_encoder.fit_transform(text_labels)

# Convert Pandas DataFrames to PyTorch tensors for each modality
video_tensor = torch.tensor(video_features.values, dtype=torch.float32)
audio_tensor = torch.tensor(audio_features.values, dtype=torch.float32)
text_tensor = torch.tensor(text_features.values, dtype=torch.float32)
text_labels_tensor = torch.tensor(text_labels_numeric, dtype=torch.long)


def create_data_loader(features, labels, batch_size, shuffle=True):
    dataset = TensorDataset(features, labels)
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)


late_fusion_optimizer = optim.Adam(late_fusion_lstm_model.parameters(), lr=0.001)
criterion_combined = nn.CrossEntropyLoss()

num_epochs_late_fusion = 10  # Set the number of epochs as needed

# Splitting data into train and validation sets
video_train, video_val, audio_train, audio_val, text_train, text_val, label_train, label_val = train_test_split(
    video_tensor, audio_tensor, text_tensor, text_labels_tensor, test_size=0.2, random_state=42)

# Creating separate datasets and data loaders for train and validation
train_dataset = TensorDataset(video_train, audio_train, text_train, label_train)
val_dataset = TensorDataset(video_val, audio_val, text_val, label_val)

batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Training loop
for epoch in range(num_epochs_late_fusion):
    late_fusion_lstm_model.train()
    
    for batch_idx, (video_batch, audio_batch, text_batch, label_batch) in enumerate(train_loader):
        # Move each batch to device
        video_inputs = video_batch.to(device)
        audio_inputs = audio_batch.to(device)
        text_inputs = text_batch.to(device)
        labels = label_batch.to(device).view(-1)  # Assuming labels are already in the correct shape
        
        late_fusion_optimizer.zero_grad()
        
        video_outputs = video_lstm_model(video_inputs)
        audio_outputs = audio_lstm_model(audio_inputs)
        text_outputs = text_lstm_model(text_inputs)
        
        # Combine outputs of individual models
        combined_outputs = late_fusion_lstm_model(video_outputs, audio_outputs, text_outputs)
        
        # Calculate loss and optimize
        loss_late_fusion = criterion_combined(combined_outputs, labels)
        loss_late_fusion.backward()
        late_fusion_optimizer.step()

    # Validation loop
    late_fusion_lstm_model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    
    for batch_idx, (video_batch, audio_batch, text_batch, label_batch) in enumerate(val_loader):
        with torch.no_grad():
            # Move each batch to device
            video_inputs = video_batch.to(device)
            audio_inputs = audio_batch.to(device)
            text_inputs = text_batch.to(device)
            labels = label_batch.to(device).view(-1)  # Assuming labels are already in the correct shape
            
            video_outputs = video_lstm_model(video_inputs)
            audio_outputs = audio_lstm_model(audio_inputs)
            text_outputs = text_lstm_model(text_inputs)
            
            # Combine outputs of individual models
            combined_outputs = late_fusion_lstm_model(video_outputs, audio_outputs, text_outputs)
            
            # Calculate loss
            val_loss += criterion_combined(combined_outputs, labels).item()
            
            # Calculate accuracy
            _, predicted = combined_outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    val_loss /= len(val_loader)
    accuracy = 100 * correct / total

    print(f"Epoch [{epoch+1}/{num_epochs_late_fusion}] | Validation Loss: {val_loss:.4f} | Accuracy: {accuracy:.2f}%")


In [None]:
print(text_inputs[0][0])

In [None]:
from sklearn.metrics import confusion_matrix, f1_score, classification_report
import matplotlib.pyplot as plt
import seaborn as sns

# Define the plot_metrics function
def plot_metrics(train_losses, test_losses, accuracies):
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(test_losses, label='Test Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Training and Test Loss')

    plt.subplot(1, 2, 2)
    plt.plot(accuracies, label='Test Accuracy', color='orange')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.title('Test Accuracy')

    plt.tight_layout()
    plt.show()

# Your previous code here..

# Plotting Loss and Accuracy
plot_metrics(train_losses, test_losses, accuracies)

# Calculate F1 Score
late_fusion_lstm_model.eval()
all_predicted = []
all_targets = []

for video_inputs, audio_inputs, text_inputs, targets in test_loader:
    video_inputs, audio_inputs, text_inputs, targets = (
        video_inputs.to(device),
        audio_inputs.to(device),
        text_inputs.to(device),
        targets.to(device),
    )
    
    outputs = late_fusion_lstm_model(video_inputs, audio_inputs, text_inputs)
    _, predicted = torch.max(outputs, 1)
    
    all_predicted.extend(predicted.cpu().numpy())
    all_targets.extend(targets.cpu().numpy())

# Calculate accuracy
accuracy = sum([1 for i, j in zip(all_predicted, all_targets) if i == j]) / len(all_targets)

f1 = f1_score(all_targets, all_predicted, average='weighted')
print(f"F1 Score: {f1:.4f}")

# Classification Report
report = classification_report(all_targets, all_predicted)
print(f"Accuracy: {accuracy:.4f}")
print("Report :\n", report)

# Confusion Matrix
labels_list = label_encoder.classes_
conf_matrix = confusion_matrix(all_targets, all_predicted)
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=labels_list, yticklabels=labels_list)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

In [None]:
# Define Loss and Optimizer for Individual Models
criterion_video = nn.CrossEntropyLoss()
optimizer_video = optim.Adam(video_lstm_model.parameters(), lr=0.001)

criterion_audio = nn.CrossEntropyLoss()
optimizer_audio = optim.Adam(audio_lstm_model.parameters(), lr=0.001)

criterion_text = nn.CrossEntropyLoss()
optimizer_text = optim.Adam(text_lstm_model.parameters(), lr=0.001)

# Training Loops for Individual Models
num_epochs = 10  # Set the number of epochs as needed

for epoch in range(num_epochs):
    # Training Loop for Video Model
    video_lstm_model.train()
    for video_inputs, video_labels in video_loader:
        optimizer_video.zero_grad()
        video_outputs = video_lstm_model(video_inputs)
        loss_video = criterion_video(video_outputs.squeeze(), video_labels)
        loss_video.backward()
        optimizer_video.step()

    # Training Loop for Audio Model
    audio_lstm_model.train()
    for audio_inputs, audio_labels in audio_loader:
        optimizer_audio.zero_grad()
        audio_outputs = audio_lstm_model(audio_inputs)
        loss_audio = criterion_audio(audio_outputs.squeeze(), audio_labels)
        loss_audio.backward()
        optimizer_audio.step()

    # Training Loop for Text Model
    text_lstm_model.train()
    for text_inputs, text_labels in text_loader:
        optimizer_text.zero_grad()
        text_outputs = text_lstm_model(text_inputs)
        loss_text = criterion_text(text_outputs.squeeze(), text_labels)
        loss_text.backward()
        optimizer_text.step()

# Training the Late Fusion Model
late_fusion_optimizer = optim.Adam(late_fusion_lstm_model.parameters(), lr=0.001)

num_epochs_late_fusion = 10  # Set the number of epochs as needed

for epoch in range(num_epochs_late_fusion):
    late_fusion_lstm_model.train()
    for video_inputs, audio_inputs, text_inputs, labels in zip(video_loader, audio_loader, text_loader, label_loader):
        optimizer_late_fusion.zero_grad()
        video_outputs = video_lstm_model(video_inputs)
        audio_outputs = audio_lstm_model(audio_inputs)
        text_outputs = text_lstm_model(text_inputs)
        
        # Combine outputs of individual models
        combined_outputs = late_fusion_lstm_model(video_outputs, audio_outputs, text_outputs)
        
        # Calculate loss and optimize
        loss_late_fusion = criterion_combined(combined_outputs.squeeze(), labels)
        loss_late_fusion.backward()
        optimizer_late_fusion.step()
