In [1]:
import torch
from transformers import Wav2Vec2Processor, Wav2Vec2Model
from pydub import AudioSegment

# Initialize Wav2Vec2 processor and model
processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h")
wav2vec2_model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base-960h")

# Move the model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
wav2vec2_model.to(device)

# Function to load and preprocess the audio file
def load_audio(audio_path):
    # Load the audio file (using pydub to handle .wav format)
    audio = AudioSegment.from_wav(audio_path)
    # Convert to mono and set the sample rate (Wav2Vec2 expects 16kHz)
    audio = audio.set_channels(1).set_frame_rate(16000)
    # Export to raw audio data
    raw_audio = audio.get_array_of_samples()
    return torch.tensor(raw_audio).float()

# Function to extract features using Wav2Vec2
def extract_audio_features(audio_path):
    audio_data = load_audio(audio_path)
    # Tokenize the audio data
    inputs = processor(audio_data, return_tensors="pt", sampling_rate=16000)
    # Move inputs to the same device as the model (GPU or CPU)
    inputs = {key: value.to(device) for key, value in inputs.items()}
    
    with torch.no_grad():
        # Extract features using Wav2Vec2 model
        outputs = wav2vec2_model(**inputs)
        # Extracting the embeddings (mean pooling across time steps)
        audio_features = outputs.last_hidden_state.mean(dim=1)  # Output size: (1, 768)
    
    # Debugging: Checking the output size
    print(f"Audio feature vector size: {audio_features.size()}")
    
    return audio_features

# Test the audio feature extraction
audio_file_path = "T:/0 datasets/dum4/audios/sarcastic/sea1_ep1_sc4_utt3.wav"
audio_features = extract_audio_features(audio_file_path)


Some weights of Wav2Vec2Model were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Audio feature vector size: torch.Size([1, 768])


In [2]:
from transformers import AutoTokenizer, AutoModel
import torch

# Initialize TinyBERT tokenizer and model
tokenizer = AutoTokenizer.from_pretrained("huawei-noah/TinyBERT_General_4L_312D")
tinybert_model = AutoModel.from_pretrained("huawei-noah/TinyBERT_General_4L_312D")

# Move the model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tinybert_model.to(device)

# Function to load and preprocess the text file
def load_text(text_path):
    with open(text_path, 'r') as file:
        text = file.read()
    return text

# Function to extract features using TinyBERT
def extract_text_features(text_path):
    text = load_text(text_path)
    # Tokenize the text (with padding and truncation to a fixed length)
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=128)
    # Move inputs to the same device as the model (GPU or CPU)
    inputs = {key: value.to(device) for key, value in inputs.items()}
    
    with torch.no_grad():
        # Extract features using TinyBERT model
        outputs = tinybert_model(**inputs)
        # Extract the [CLS] token (first token) representation
        text_features = outputs.last_hidden_state[:, 0, :]  # Output size: (1, 312)
        # Project to 768 dimensions
        text_features = torch.nn.Linear(312, 768).to(device)(text_features)  # Output size: (1, 768)
    
    # Debugging: Checking the output size
    print(f"Text feature vector size: {text_features.size()}")
    
    return text_features

# Test the text feature extraction
text_file_path = "T:/0 datasets/dum4/text/sarcastic/sea1_ep1_sc4_utt3.txt"
text_features = extract_text_features(text_file_path)


Text feature vector size: torch.Size([1, 768])


In [3]:
import torch
import cv2
from torchvision import models, transforms

# Initialize ResNet-18 and move to GPU if available
resnet18 = models.resnet18(pretrained=True)
resnet18 = torch.nn.Sequential(*list(resnet18.children())[:-1])  # Remove the final classification layer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
resnet18.to(device)

# Preprocessing transformations for video frames (resize and normalize)
preprocess = transforms.Compose([
    transforms.ToPILImage(),  # Convert the frame to PIL image
    transforms.Resize((224, 224)),  # Resize to 224x224 pixels
    transforms.ToTensor(),  # Convert to Tensor format
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize using ImageNet stats
])

# Function to extract 1 frame per second from the video
def extract_video_frames(video_path):
    video = cv2.VideoCapture(video_path)
    fps = video.get(cv2.CAP_PROP_FPS)  # Get frames per second (fps) of the video
    total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))  # Get total number of frames in the video
    
    print(f"FPS: {fps}, Total frames: {total_frames}")

    frame_count = 0
    frames = []
    
    while True:
        ret, frame = video.read()
        if not ret:
            break  # Stop if no more frames are available
        
        # Extract 1 frame per second based on FPS
        if frame_count % int(fps) == 0:  # Extract 1 frame every second
            frames.append(frame)
        
        frame_count += 1
    
    video.release()
    return frames

# Function to extract features using ResNet-18
def extract_video_features(video_path):
    frames = extract_video_frames(video_path)  # Extract 1 frame per second
    frame_features = []
    
    for frame in frames:
        frame = preprocess(frame)  # Resize and normalize the frame
        frame = frame.unsqueeze(0).to(device)  # Add batch dimension and move to GPU
        
        with torch.no_grad():
            feature = resnet18(frame)  # Extract features using ResNet-18
            feature = feature.view(feature.size(0), -1)  # Flatten the output to 1D
            frame_features.append(feature)

    # Take the mean of the frame features (to get a fixed-size feature vector)
    video_features = torch.mean(torch.stack(frame_features), dim=0)
    # Project to 768 dimensions and move the projection to the GPU
    video_features = torch.nn.Linear(512, 768).to(device)(video_features)  # Project from 512 to 768 dimensions
    
    # Debugging: Checking the output size
    print(f"Video feature vector size: {video_features.size()}")
    
    return video_features

# Test the video feature extraction
video_file_path = "T:/0 datasets/dum4/vids/sarcastic/sea1_ep1_sc4_utt3.mp4"
video_features = extract_video_features(video_file_path)




FPS: 23.976042590949422, Total frames: 67
Video feature vector size: torch.Size([1, 768])


In [4]:
import torch

# Assuming audio_features, text_features, and video_features are already obtained
# Move tensors to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Concatenate the features along the feature dimension (dim=1)
def concatenate_features(audio_features, text_features, video_features):
    # Ensure that all tensors are on the same device
    audio_features = audio_features.to(device)
    text_features = text_features.to(device)
    video_features = video_features.to(device)

    # Concatenate the features
    features = torch.cat([audio_features, text_features, video_features], dim=1)
    
    # Debugging: Check the shape after concatenation
    print(f"Concatenated Features shape: {features.size()}")  # Should print [batch_size, 2304]
    
    return features

# Example for concatenation (batch_size=32 in this case)
audio_features = torch.randn(32, 768).to(device)  # Example audio features
text_features = torch.randn(32, 768).to(device)   # Example text features
video_features = torch.randn(32, 768).to(device)  # Example video features

# Concatenate the features
features = concatenate_features(audio_features, text_features, video_features)


Concatenated Features shape: torch.Size([32, 2304])


In [5]:
import torch
import torch.nn as nn

class MultimodalSarcasmDetectionModel(nn.Module):
    def __init__(self):
        super(MultimodalSarcasmDetectionModel, self).__init__()
        
        # Define the first fully connected layer (from 2304 input features to 1024 output features)
        self.fc1 = nn.Linear(2304, 1024)  # First fully connected layer
        self.relu1 = nn.ReLU()  # ReLU activation
        
        # Additional hidden layers
        self.fc2 = nn.Linear(1024, 512)  # Second fully connected layer
        self.relu2 = nn.ReLU()  # ReLU activation
        
        self.fc3 = nn.Linear(512, 256)  # Third fully connected layer
        self.relu3 = nn.ReLU()  # ReLU activation
        
        # Output layer (binary classification)
        self.fc4 = nn.Linear(256, 1)  # Output layer
        self.sigmoid = nn.Sigmoid()  # Sigmoid activation for binary classification
        
        # Layer normalization (normalize the concatenated feature vector)
        self.layer_norm = nn.LayerNorm(2304)  # Normalize the concatenated feature vector

    def forward(self, features):
        """
        Forward pass of the model.
        - Accept a single concatenated feature vector.
        - Normalize the concatenated features.
        - Pass through the fully connected layers.
        """
        # Normalize the concatenated feature vector
        features = self.layer_norm(features)
        
        # Pass through the first fully connected layer (fc1)
        x = self.fc1(features)
        x = self.relu1(x)  # Apply ReLU activation
        
        # Pass through the second fully connected layer (fc2)
        x = self.fc2(x)
        x = self.relu2(x)  # Apply ReLU activation
        
        # Pass through the third fully connected layer (fc3)
        x = self.fc3(x)
        x = self.relu3(x)  # Apply ReLU activation
        
        # Output layer: This is the final layer that gives the classification score (one value per sample)
        x = self.fc4(x)
        x = self.sigmoid(x)  # Sigmoid for binary classification
        
        return x

# Device setup: Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Instantiate the model
model = MultimodalSarcasmDetectionModel()

# Move the model to GPU if available
model.to(device)

# Print the model architecture (optional)
print(model)


MultimodalSarcasmDetectionModel(
  (fc1): Linear(in_features=2304, out_features=1024, bias=True)
  (relu1): ReLU()
  (fc2): Linear(in_features=1024, out_features=512, bias=True)
  (relu2): ReLU()
  (fc3): Linear(in_features=512, out_features=256, bias=True)
  (relu3): ReLU()
  (fc4): Linear(in_features=256, out_features=1, bias=True)
  (sigmoid): Sigmoid()
  (layer_norm): LayerNorm((2304,), eps=1e-05, elementwise_affine=True)
)


In [6]:
import torch
import os
from torch.utils.data import Dataset
from transformers import Wav2Vec2Processor, AutoTokenizer
from pydub import AudioSegment
import cv2
from torchvision import transforms
from transformers import Wav2Vec2Model, AutoModel
import torch.nn as nn

class MultimodalSarcasmDataset(Dataset):
    def __init__(self, audio_paths, text_paths, video_paths, labels, audio_processor, text_tokenizer, video_transform, device):
        self.audio_paths = audio_paths
        self.text_paths = text_paths
        self.video_paths = video_paths
        self.labels = labels
        self.audio_processor = audio_processor
        self.text_tokenizer = text_tokenizer
        self.video_transform = video_transform
        self.device = device

    def __len__(self):
        return len(self.audio_paths)

    def __getitem__(self, idx):
        audio_path = self.audio_paths[idx]
        text_path = self.text_paths[idx]
        video_path = self.video_paths[idx]
        label = self.labels[idx]

        # Extract audio features
        audio_features = self.extract_audio_features(audio_path)
        
        # Extract text features
        text_features = self.extract_text_features(text_path)

        # Extract video features
        video_features = self.extract_video_features(video_path)
        
        # Move features to the same device as the model
        audio_features = audio_features.to(self.device)
        text_features = text_features.to(self.device)
        video_features = video_features.to(self.device)

        # Debugging: Checking dimensions of extracted features
        #print(f"Audio features size: {audio_features.size()}")
        #print(f"Text features size: {text_features.size()}")
        #print(f"Video features size: {video_features.size()}")
        
        # Return the features and label
        return audio_features, text_features, video_features, torch.tensor(label).to(self.device)

    def extract_audio_features(self, audio_path):
        audio = AudioSegment.from_wav(audio_path)
        audio = audio.set_channels(1).set_frame_rate(16000)
        audio_data = torch.tensor(audio.get_array_of_samples()).float()

        # Move the audio tensor to the same device as the model
        audio_data = audio_data.to(self.device)

        # Tokenize the audio data and ensure it is on the same device as the model
        inputs = self.audio_processor(audio_data, return_tensors="pt", sampling_rate=16000)
        inputs = {key: value.to(self.device) for key, value in inputs.items()}  # Move all inputs to the device

        with torch.no_grad():
            # Extract features using wav2vec2_model
            outputs = wav2vec2_model(**inputs)
            audio_features = outputs.last_hidden_state.mean(dim=1)  # Mean pooling
        #print(f"Extracted audio features size: {audio_features.size()}")  # Debugging
        return audio_features

    def extract_text_features(self, text_path):
        with open(text_path, 'r') as file:
            text = file.read()
        inputs = self.text_tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=128)
        inputs = {key: value.to(self.device) for key, value in inputs.items()}  # Ensure inputs are on the same device

        with torch.no_grad():
            outputs = tinybert_model(**inputs)
            text_features = outputs.last_hidden_state[:, 0, :]  # [CLS] token
            text_features = torch.nn.Linear(312, 768).to(self.device)(text_features)  # Project to 768
        #print(f"Extracted text features size: {text_features.size()}")  # Debugging
        return text_features

    def extract_video_features(self, video_path):
        video = cv2.VideoCapture(video_path)
        frames = []
        while True:
            ret, frame = video.read()
            if not ret:
                break
            frames.append(frame)
        video.release()

        frame_features = []
        for frame in frames:
            frame = self.video_transform(frame)  # Resize and normalize the frame
            frame = frame.unsqueeze(0).to(self.device)  # Add batch dimension and move to GPU
            
            with torch.no_grad():
                feature = resnet18(frame)  # Extract features
                feature = feature.view(feature.size(0), -1)  # Flatten
                frame_features.append(feature)

        video_features = torch.mean(torch.stack(frame_features), dim=0)
        video_features = torch.nn.Linear(512, 768).to(self.device)(video_features)  # Project to 768
        #print(f"Extracted video features size: {video_features.size()}")  # Debugging
        return video_features

# Example paths (replace with actual paths from your dataset)
audio_sarcastic_folder = "T:/0 datasets/dum4/audios/sarcastic"
audio_nonsarcastic_folder = "T:/0 datasets/dum4/audios/nonsarcastic"
text_sarcastic_folder = "T:/0 datasets/dum4/text/sarcastic"
text_nonsarcastic_folder = "T:/0 datasets/dum4/text/nonsarcastic"
video_sarcastic_folder = "T:/0 datasets/dum4/vids/sarcastic"
video_nonsarcastic_folder = "T:/0 datasets/dum4/vids/nonsarcastic"

audio_paths = []
text_paths = []
video_paths = []
labels = []

# Populate the paths for sarcastic data (1) and nonsarcastic data (0)
for audio_file in os.listdir(audio_sarcastic_folder):
    audio_paths.append(os.path.join(audio_sarcastic_folder, audio_file))
    text_paths.append(os.path.join(text_sarcastic_folder, audio_file.replace(".wav", ".txt")))
    video_paths.append(os.path.join(video_sarcastic_folder, audio_file.replace(".wav", ".mp4")))
    labels.append(1)

for audio_file in os.listdir(audio_nonsarcastic_folder):
    audio_paths.append(os.path.join(audio_nonsarcastic_folder, audio_file))
    text_paths.append(os.path.join(text_nonsarcastic_folder, audio_file.replace(".wav", ".txt")))
    video_paths.append(os.path.join(video_nonsarcastic_folder, audio_file.replace(".wav", ".mp4")))
    labels.append(0)

# Device setup: Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize models and processors
processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h")
tokenizer = AutoTokenizer.from_pretrained("huawei-noah/TinyBERT_General_4L_312D")
resnet18 = models.resnet18(pretrained=True).to(device)
resnet18 = torch.nn.Sequential(*list(resnet18.children())[:-1])  # Remove final classification layer

# Instantiate models for audio and text
wav2vec2_model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base-960h").to(device)
tinybert_model = AutoModel.from_pretrained("huawei-noah/TinyBERT_General_4L_312D").to(device)

# Preprocessing transformations for video frames (resize and normalize)
preprocess = transforms.Compose([
    transforms.ToPILImage(),  # Convert the frame to PIL image
    transforms.Resize((224, 224)),  # Resize to 224x224 pixels
    transforms.ToTensor(),  # Convert to Tensor format
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize using ImageNet stats
])

# Create the dataset
dataset = MultimodalSarcasmDataset(
    audio_paths=audio_paths,
    text_paths=text_paths,
    video_paths=video_paths,
    labels=labels,
    audio_processor=processor,
    text_tokenizer=tokenizer,
    video_transform=preprocess,
    device=device
)

# Create the DataLoader
from torch.utils.data import DataLoader

dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

# Testing the data loading (just a few iterations)
for batch in dataloader:
    audio_features, text_features, video_features, labels = batch
    print(audio_features.size(), text_features.size(), video_features.size(), labels.size())
    break


Some weights of Wav2Vec2Model were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


torch.Size([64, 1, 768]) torch.Size([64, 1, 768]) torch.Size([64, 1, 768]) torch.Size([64])


In [7]:
import torch
import torch.optim as optim
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score

# Instantiate the model and move it to the appropriate device (GPU/CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MultimodalSarcasmDetectionModel().to(device)

# Define the loss function (Binary Cross-Entropy Loss)
criterion = torch.nn.BCEWithLogitsLoss().to(device)  # Move loss function to GPU

# Define the optimizer (Adam optimizer)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training function with GPU support
def train_model(model, dataloader, criterion, optimizer, num_epochs=5):
    model.train()  # Set the model to training mode
    for epoch in range(num_epochs):
        running_loss = 0.0
        y_true = []
        y_pred = []
        
        for batch_idx, (audio_features, text_features, video_features, labels) in enumerate(dataloader):
            # Move data to the GPU
            audio_features = audio_features.to(device)
            text_features = text_features.to(device)
            video_features = video_features.to(device)
            labels = labels.to(device)

            # Squeeze the features to remove the singleton dimension
            audio_features = audio_features.squeeze(1)  # Shape: [batch_size, 768]
            text_features = text_features.squeeze(1)    # Shape: [batch_size, 768]
            video_features = video_features.squeeze(1)  # Shape: [batch_size, 768]

            # Concatenate the features along the feature dimension (dim=1)
            features = torch.cat([audio_features, text_features, video_features], dim=1)

            # Forward pass
            outputs = model(features)  # Forward pass through the model

            # Ensure labels have the same shape as outputs
            labels = labels.unsqueeze(1)  # Change labels to shape [batch_size, 1]
            labels = labels.float()  # Convert labels to Float type (required by BCEWithLogitsLoss)

            # Compute the loss
            loss = criterion(outputs, labels)

            # Backward pass
            optimizer.zero_grad()  # Reset gradients
            loss.backward()  # Compute gradients
            optimizer.step()  # Update model parameters
            
            # Track running loss
            running_loss += loss.item()

            # Convert output to predicted class (0 or 1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend((outputs > 0.5).cpu().numpy())  # Convert probability to binary class
        
        # Calculate average loss for the epoch
        avg_loss = running_loss / len(dataloader)
        
        # Calculate metrics
        accuracy = accuracy_score(y_true, y_pred)
        f1 = f1_score(y_true, y_pred)
        roc_auc = roc_auc_score(y_true, y_pred)
        
        print(f"Epoch {epoch+1}/{num_epochs}")
        print(f"Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}, F1-Score: {f1:.4f}, ROC-AUC: {roc_auc:.4f}")

# Train the model
train_model(model, dataloader, criterion, optimizer, num_epochs=5)


Epoch 1/5
Loss: 0.6949, Accuracy: 0.5008, F1-Score: 0.0033, ROC-AUC: 0.5008
Epoch 2/5
Loss: 0.6931, Accuracy: 0.5000, F1-Score: 0.0000, ROC-AUC: 0.5000
Epoch 3/5
Loss: 0.6931, Accuracy: 0.5000, F1-Score: 0.0000, ROC-AUC: 0.5000
Epoch 4/5
Loss: 0.6931, Accuracy: 0.5000, F1-Score: 0.0000, ROC-AUC: 0.5000
Epoch 5/5
Loss: 0.6931, Accuracy: 0.5000, F1-Score: 0.0000, ROC-AUC: 0.5000


In [None]:
# Save the entire model (architecture + weights) using .pt extension
torch.save(model, "T:/000 Models/dum4full_model.pt")


In [9]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters())

# Example: Print the number of parameters in your model
num_params = count_parameters(model)
print(f"Total number of parameters: {num_params}")


Total number of parameters: 3021313


In [10]:
def count_parameters_by_layer(model):
    for name, param in model.named_parameters():
        print(f"Layer: {name}, Parameters: {param.numel()}")
        
# Example: Print the number of parameters in each layer
count_parameters_by_layer(model)


Layer: fc1.weight, Parameters: 2359296
Layer: fc1.bias, Parameters: 1024
Layer: fc2.weight, Parameters: 524288
Layer: fc2.bias, Parameters: 512
Layer: fc3.weight, Parameters: 131072
Layer: fc3.bias, Parameters: 256
Layer: fc4.weight, Parameters: 256
Layer: fc4.bias, Parameters: 1
Layer: layer_norm.weight, Parameters: 2304
Layer: layer_norm.bias, Parameters: 2304
