## Import libraries

In [None]:
import os
import cv2
import numpy as np
import pickle
import tempfile
from moviepy import VideoFileClip

from pydub import AudioSegment
import librosa

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
from torch.cuda.amp import autocast, GradScaler
import torch.nn.functional as F

### Function: `block_difference`
- Compares two frames by dividing them into blocks
- Calculates the mean absolute difference between corresponding blocks 
- Returns the average block difference across the whole frame

In [2]:
def block_difference(frame1, frame2, block_size=16):
    """Return the average per-block mean absolute difference of two frames.

    Both frames are walked in ``block_size`` x ``block_size`` tiles (edge
    tiles may be smaller). A tile only contributes when the slices taken
    from both frames have the same shape.

    Args:
        frame1: 2-D array; its shape defines the tile grid.
        frame2: 2-D array compared against ``frame1``.
        block_size: Side length of a tile in pixels.

    Returns:
        The mean of the per-tile mean absolute differences, or 0 when no
        tile could be compared.
    """
    rows, cols = frame1.shape
    diff_sum = 0
    compared = 0

    for top in range(0, rows, block_size):
        bottom = top + block_size
        for left in range(0, cols, block_size):
            right = left + block_size
            tile_a = frame1[top:bottom, left:right]
            tile_b = frame2[top:bottom, left:right]
            if tile_a.shape != tile_b.shape:
                continue
            # Cast to int first so uint8 subtraction cannot wrap around.
            diff_sum += np.abs(tile_a.astype(int) - tile_b.astype(int)).mean()
            compared += 1

    if compared > 0:
        return diff_sum / compared
    return 0

### Function: `extract_keyframes_from_shot`
- Extracts keyframes from a list of frames using block difference
- Uses the first frame as a fixed reference and adds a later frame as a keyframe whenever its difference from that reference exceeds a threshold
- Resizes keyframes to 224x224
- Ensures exactly 15 keyframes by padding with the last frame if needed

In [3]:
def extract_keyframes_from_shot(frames, fps, block_threshold=20,
                                target_num_keyframes=15, frame_size=(224, 224)):
    """Pick a fixed number of resized keyframes from one shot.

    The first frame is always a keyframe and stays the reference for the
    whole shot. Every later frame whose block difference to that reference
    exceeds ``block_threshold`` is added until ``target_num_keyframes`` are
    collected. If fewer are found, the last keyframe is repeated as padding.

    Args:
        frames: Sequence of BGR frames belonging to one shot.
        fps: Frame rate of the shot. Kept for interface compatibility; it
            is not needed for the selection and may be 0.
        block_threshold: Minimum block difference for a frame to qualify.
        target_num_keyframes: Number of keyframes to return (expected >= 1).
        frame_size: ``(width, height)`` passed to ``cv2.resize``.

    Returns:
        A list of ``target_num_keyframes`` resized frames, or an empty list
        when ``frames`` is empty.
    """
    if len(frames) == 0:
        return []

    reference_gray = cv2.cvtColor(frames[0], cv2.COLOR_BGR2GRAY)
    keyframes = [cv2.resize(frames[0], frame_size)]

    for frame in frames[1:]:
        if len(keyframes) >= target_num_keyframes:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        if block_difference(reference_gray, gray) > block_threshold:
            keyframes.append(cv2.resize(frame, frame_size))

    # Pad with copies of the last keyframe so every shot has the same length.
    while len(keyframes) < target_num_keyframes:
        keyframes.append(keyframes[-1].copy())

    return keyframes


### Function: `extract_audio_shot`
- Extracts a segment of audio from a given audio file
- Takes `start_time` and `end_time` in seconds
- Saves the extracted audio segment as a `.wav` file

In [4]:
def extract_audio_shot(audio_path, start_time, end_time, save_path):
    """Cut ``[start_time, end_time)`` out of an audio file and save it as WAV.

    Args:
        audio_path: Path of the source audio file.
        start_time: Start of the segment in seconds.
        end_time: End of the segment in seconds.
        save_path: Destination path of the exported ``.wav`` file.
    """
    # AudioSegment slices are indexed in milliseconds.
    clip_bounds_ms = slice(int(start_time * 1000), int(end_time * 1000))
    full_track = AudioSegment.from_file(audio_path)
    full_track[clip_bounds_ms].export(save_path, format="wav")

### Function: `extract_audio_temp`
- Extracts the entire audio track from a video
- Saves it temporarily as a `.wav` file
- Returns the path of the temporary audio file

In [5]:
def extract_audio_temp(video_path):
    """Write a video's full audio track to a temporary ``.wav`` file.

    The caller owns the returned file and is responsible for deleting it.

    Args:
        video_path: Path of the video to read.

    Returns:
        Path of the temporary ``.wav`` file.

    Raises:
        ValueError: If the video has no audio track.
    """
    # The context manager closes the clip so its file handles and reader
    # resources are released even if writing the audio fails.
    with VideoFileClip(video_path) as video:
        if video.audio is None:
            raise ValueError(f"No audio track found in video: {video_path}")

        # Only the unique name is needed; the file is reopened by the writer.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio_file:
            temp_audio_path = temp_audio_file.name

        try:
            video.audio.write_audiofile(temp_audio_path)
        except Exception:
            # Do not leave a partially written temp file behind.
            if os.path.exists(temp_audio_path):
                os.remove(temp_audio_path)
            raise

    return temp_audio_path

### Function: `extract_shots_and_keyframes_with_audio`
- Splits a video into shots based on histogram difference
- Extracts **keyframes** from each shot using block difference
- Saves the **corresponding audio segment** of each shot as `.wav`
- Returns a list of samples, where each sample contains:  
  - keyframes  
  - audio file path  
  - start time and end time of the shot  

In [6]:
def _build_shot_sample(frames, shot_start_frame, fps, block_threshold,
                       video_path, audio_path, save_audio_dir):
    """Build the sample dict for one shot, or return None if it has no keyframes."""
    shot_end_frame = shot_start_frame + len(frames)
    start_time = shot_start_frame / fps
    end_time = shot_end_frame / fps

    keyframes = extract_keyframes_from_shot(frames, fps, block_threshold)
    if not keyframes:
        return None

    audio_filename = f"{os.path.basename(video_path)}_{int(start_time*1000)}_{int(end_time*1000)}.wav"
    audio_save_path = os.path.join(save_audio_dir, audio_filename)
    extract_audio_shot(audio_path, start_time, end_time, audio_save_path)

    return {
        "keyframes": keyframes,
        "audio_path": audio_save_path,
        "start_time": start_time,
        "end_time": end_time
    }


def extract_shots_and_keyframes_with_audio(video_path, hist_threshold=30, block_threshold=20, fps_cap=30, save_audio_dir="audio_shots"):
    """Split a video into shots and collect keyframes plus audio per shot.

    A new shot starts whenever the Bhattacharyya distance between the
    grayscale histograms of consecutive frames exceeds
    ``hist_threshold / 100``. The frames remaining after the last detected
    cut are emitted as the final shot.

    Args:
        video_path: Path of the video file.
        hist_threshold: Cut threshold in percent of the histogram distance.
        block_threshold: Threshold forwarded to the keyframe extraction.
        fps_cap: Upper bound applied to the reported frame rate; also used
            as the frame rate when the container reports none.
        save_audio_dir: Directory that receives one ``.wav`` file per shot
            (created if missing).

    Returns:
        A list of dicts with keys ``"keyframes"``, ``"audio_path"``,
        ``"start_time"`` and ``"end_time"`` (times in seconds).
    """
    os.makedirs(save_audio_dir, exist_ok=True)

    audio_path = extract_audio_temp(video_path)
    all_samples = []

    try:
        cap = cv2.VideoCapture(video_path)
        try:
            reported_fps = cap.get(cv2.CAP_PROP_FPS)
            # Some containers report 0 FPS; fall back to the cap so the
            # time computation below never divides by zero.
            # NOTE(review): frame indices are divided by the capped rate, so
            # for sources faster than fps_cap the computed times run ahead of
            # the real timeline -- confirm this is intended.
            fps = min(reported_fps, fps_cap) if reported_fps > 0 else fps_cap

            prev_hist = None
            frames = []
            shot_start_frame = 0

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                hist = cv2.calcHist([gray], [0], None, [256], [0, 256])
                hist = cv2.normalize(hist, hist).flatten()

                if prev_hist is not None:
                    diff = cv2.compareHist(prev_hist, hist, cv2.HISTCMP_BHATTACHARYYA)
                    if diff > hist_threshold / 100:
                        sample = _build_shot_sample(
                            frames, shot_start_frame, fps, block_threshold,
                            video_path, audio_path, save_audio_dir,
                        )
                        if sample is not None:
                            all_samples.append(sample)

                        shot_start_frame += len(frames)
                        frames = []

                frames.append(frame.copy())
                prev_hist = hist

            # Flush the trailing shot; without this the last shot (or the
            # whole video when no cut is detected) would be dropped.
            if frames:
                sample = _build_shot_sample(
                    frames, shot_start_frame, fps, block_threshold,
                    video_path, audio_path, save_audio_dir,
                )
                if sample is not None:
                    all_samples.append(sample)
        finally:
            cap.release()
    finally:
        # Always delete the temporary full-length audio track.
        os.remove(audio_path)

    return all_samples

### Function: `process_category_folder`
Processes all video files in a category folder, extracts shots with keyframes and audio, and assigns the given label

In [7]:
def process_category_folder(category_path, audio_folder_path, label):
    """Extract labelled shot samples from every video in a folder.

    Files are visited in sorted name order so the resulting sample order
    (and therefore any seeded split built from it) is reproducible.

    Args:
        category_path: Folder containing the videos of one category.
        audio_folder_path: Currently unused; kept so existing callers keep
            working. Shot audio is written by the shot extraction itself.
        label: Label stored under ``"label"`` in every returned sample.

    Returns:
        A list of sample dicts, one per extracted shot.
    """
    video_extensions = ('.mp4', '.avi', '.mov', '.mkv')
    samples = []

    for filename in sorted(os.listdir(category_path)):
        if not filename.lower().endswith(video_extensions):
            continue

        video_path = os.path.join(category_path, filename)
        print(f"Processing: {video_path}")
        for sample in extract_shots_and_keyframes_with_audio(video_path):
            sample["label"] = label
            samples.append(sample)

    return samples

### Main Script
Iterates over all categories, processes their videos, and collects all samples into a single list

In [None]:
base_path = r"C:\Users\LEGION\Desktop\Violence Detection-PyTorch"
categories = ["bloody", "explosions", "fight", "non-violence"]

# Each category name is both the sub-folder under base_path and the label
# attached to every sample extracted from that folder. The same folder is
# passed as video folder and audio folder.
all_samples = []
for category in categories:
    category_dir = os.path.join(base_path, category)
    all_samples += process_category_folder(category_dir, category_dir, category)

print("Total samples:", len(all_samples))

### Data Preparation (Images + Audio + Labels)

- Initializes empty lists for **image sequences**, **audio spectrograms**, and **labels**
- Ensures each sample has exactly **15 frames** (by truncating or padding with the last frame)
- Loads the corresponding **audio clip**, converts it into a **mel-spectrogram (128 mel bins)**, and pads/truncates it to a fixed length of **200 time steps**
- Appends processed image frames, spectrogram, and label to their lists
- Encodes labels into numeric form using `LabelEncoder`
- Converts all lists into NumPy arrays

In [None]:
X_images = []
X_audio = []
y_labels = []

time_steps = 15
mel_bins = 128
max_audio_len = 200

for sample in all_samples:
    # Work on a truncated copy so the sample's own keyframe list is never
    # modified, then pad with the last frame up to time_steps.
    frames = list(sample["keyframes"][:time_steps])
    if len(frames) < time_steps:
        frames += [frames[-1]] * (time_steps - len(frames))

    y_audio, sr = librosa.load(sample["audio_path"], sr=22050)
    mel = librosa.feature.melspectrogram(y=y_audio, sr=sr, n_mels=mel_bins)
    mel_db = librosa.power_to_db(mel, ref=np.max)

    if mel_db.shape[1] < max_audio_len:
        pad_width = max_audio_len - mel_db.shape[1]
        # With ref=np.max, 0 dB is the loudest value in the clip, so padding
        # with zeros would look like maximum energy. Pad with the clip's
        # floor instead so the padding reads as silence.
        pad_value = mel_db.min() if mel_db.size else 0.0
        mel_db = np.pad(
            mel_db,
            ((0, 0), (0, pad_width)),
            mode='constant',
            constant_values=pad_value,
        )
    else:
        mel_db = mel_db[:, :max_audio_len]

    X_images.append(np.array(frames))
    X_audio.append(mel_db)
    y_labels.append(sample["label"])

# Map the string labels to integer class ids.
encoder = LabelEncoder()
y_encoded = encoder.fit_transform(y_labels)

X_images = np.array(X_images)
X_audio = np.array(X_audio)
y_encoded = np.array(y_encoded)

In [10]:
# import joblib
# joblib.dump(encoder, "label_encoder.pkl")

In [None]:
# Hold out 15% of the samples for testing. The split is stratified on the
# encoded labels and seeded (random_state=1); images, spectrograms and labels
# are split together so they stay aligned.
# NOTE(review): samples are individual shots, so shots cut from the same video
# can land in both splits -- confirm this is acceptable for evaluation.
X_img_train, X_img_test, X_audio_train, X_audio_test, y_train, y_test = train_test_split(
    X_images, X_audio, y_encoded,
    test_size=0.15,
    stratify=y_encoded,
    random_state=1
)

In [12]:
# with open("train_data_with_audio_short.pkl", "wb") as f:
#     pickle.dump((X_img_train, X_audio_train, y_train), f)

# with open("test_data_with_audio_short.pkl", "wb") as f:
#     pickle.dump((X_img_test, X_audio_test, y_test), f)

In [None]:
# Load the cached train/test arrays from disk. This rebinds X_img_train,
# X_audio_train, y_train, X_img_test, X_audio_test and y_test, replacing any
# values produced by the split above.
# SECURITY: pickle.load can execute arbitrary code; only load files that were
# created locally by this notebook.
with open("train_data_with_audio_short.pkl", "rb") as f:
    X_img_train, X_audio_train, y_train = pickle.load(f)

with open("test_data_with_audio_short.pkl", "rb") as f:
    X_img_test, X_audio_test, y_test = pickle.load(f)

### `CartoonViolenceMultiModalDataset` Class

- Custom PyTorch `Dataset` for multimodal data (video frames + audio spectrograms + labels)
- Supports **data augmentation** for image frames (random flips & rotations) when `augment=True`
- `__len__`: returns the total number of samples
- `__getitem__`:  
  - Retrieves one sequence of frames, its spectrogram, and the label
  - Transforms each frame into a tensor (with or without augmentation)
  - Stacks all frames into a single tensor with shape `[T, C, H, W]` (time, channels, height, width)
  - Converts spectrogram into a tensor of shape `[1, n_mels, time]`
  - Returns `(frames_tensor, audio_tensor, label)`

In [None]:
class CartoonViolenceMultiModalDataset(Dataset):
    """Dataset yielding (frame sequence, mel-spectrogram, label) triples.

    Args:
        img_sequences: Indexable collection of frame sequences; each frame
            must be convertible by ``ToPILImage``.
        audio_spectrograms: Indexable collection of 2-D spectrograms.
        labels: Indexable collection of integer class ids.
        augment: When True, each sequence gets a random horizontal flip and
            a random rotation of up to +/-15 degrees. The same flip and
            angle are used for every frame of a sequence so the motion
            between frames stays consistent.
    """

    def __init__(self, img_sequences, audio_spectrograms, labels, augment=False):
        self.img_sequences = img_sequences
        self.audio_spectrograms = audio_spectrograms
        self.labels = labels
        self.augment = augment

        # Augmentation strength used by _augment_sequence.
        self.flip_prob = 0.5
        self.max_rotation = 15.0

        # Per-frame random pipeline. Kept for code that accesses it directly;
        # __getitem__ does not use it because independent random draws per
        # frame would give every frame of a sequence a different transform.
        self.transform = T.Compose([
            T.ToPILImage(),
            T.RandomHorizontalFlip(),
            T.RandomRotation(15),
            T.ToTensor()
        ])
        self.basic_transform = T.Compose([
            T.ToPILImage(),
            T.ToTensor()
        ])

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def _augment_sequence(self, img_seq):
        """Apply one randomly drawn flip/rotation to every frame of a sequence."""
        tf = T.functional
        flip = torch.rand(1).item() < self.flip_prob
        angle = torch.empty(1).uniform_(-self.max_rotation, self.max_rotation).item()

        frames = []
        for frame in img_seq:
            image = tf.to_pil_image(frame)
            if flip:
                image = tf.hflip(image)
            image = tf.rotate(image, angle)
            frames.append(tf.to_tensor(image))
        return frames

    def __getitem__(self, idx):
        """Return ``(frames [T, C, H, W], audio [1, n_mels, time], label)``."""
        img_seq = self.img_sequences[idx]
        audio_spec = self.audio_spectrograms[idx]
        label = self.labels[idx]

        if self.augment:
            frames = self._augment_sequence(img_seq)
        else:
            frames = [self.basic_transform(frame) for frame in img_seq]

        frames_tensor = torch.stack(frames)
        audio_tensor = torch.tensor(audio_spec, dtype=torch.float32)

        # as_tensor accepts both plain ints and tensors without the
        # copy-construct warning that torch.tensor(tensor) emits.
        label_tensor = torch.as_tensor(label, dtype=torch.long)

        return frames_tensor, audio_tensor.unsqueeze(0), label_tensor

### Class: `CNNLSTM_Audio`

- **Purpose**: A multimodal neural network that combines **image sequences** and **audio spectrograms** for classification.  

---

#### 🔹 Image Pathway
- Uses a series of **CNN layers** to extract spatial features from each frame.  
- Flattens the output and feeds it into an **LSTM** to capture temporal dependencies across frames.  
- Produces a final image representation vector (size 128).  

---

#### 🔹 Audio Pathway
- Applies **1D CNN layers** to the mel-spectrogram, treating the 128 mel bins as input channels and convolving along the time axis.  
- Passes the output through an **LSTM** to capture temporal audio features.  
- Produces a final audio representation vector (size 64).  

---

#### 🔹 Fusion and Classification
- Concatenates image and audio representations into a single feature vector `[128 + 64]`.  
- Applies a **dropout layer** and a **fully connected layer** to classify into `num_classes`.  

---

#### 🔹 Forward Pass
1. Processes image sequence through CNN → Flatten → LSTM → final frame output.  
2. Processes audio spectrogram through CNN → reshape → LSTM → final timestep output.  
3. Concatenates image and audio outputs.  
4. Passes fused vector through the classifier to get predictions.  


In [None]:
class CNNLSTM_Audio(nn.Module):
    """Two-branch CNN+LSTM classifier over frame sequences and spectrograms.

    The image branch runs a small CNN on every frame and an LSTM over the
    per-frame features; the audio branch runs 1-D convolutions along the
    spectrogram's time axis followed by an LSTM. The last LSTM output of
    each branch is concatenated and classified by a linear layer.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Image pathway: three conv/pool stages, each halving the resolution.
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )
        self.flatten = nn.Flatten()
        # 128 channels at 28x28 per frame feed the LSTM.
        self.lstm = nn.LSTM(128 * 28 * 28, 128, batch_first=True)

        # Audio pathway: 128 input channels, convolution along time.
        self.audio_conv = nn.Sequential(
            nn.Conv1d(in_channels=128, out_channels=64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),
            nn.Conv1d(64, 32, kernel_size=3, padding=1),
            nn.ReLU(),
        )
        self.audio_lstm = nn.LSTM(input_size=32, hidden_size=64, batch_first=True)

        # Fusion and classifier over the concatenated [128 + 64] features.
        self.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(128 + 64, num_classes),
        )

    def forward(self, img_seq, audio_spec):
        """Return class logits for a batch.

        Args:
            img_seq: Tensor unpacked as ``[B, T, C, H, W]``.
            audio_spec: Tensor whose dimension 1 is squeezed away before the
                1-D convolutions.

        Returns:
            Tensor of logits with one row per batch element.
        """
        batch_size, num_frames, channels, height, width = img_seq.shape

        # Image branch: fold time into the batch for the CNN, then unfold it
        # again for the LSTM and keep the last time step.
        per_frame = img_seq.view(-1, channels, height, width)
        frame_features = self.flatten(self.cnn(per_frame))
        image_steps, _ = self.lstm(frame_features.view(batch_size, num_frames, -1))
        image_repr = image_steps[:, -1, :]

        # Audio branch: [B, 128, time] -> conv -> [B, time', 32] -> LSTM.
        spectrogram = audio_spec.squeeze(1)
        audio_features = self.audio_conv(spectrogram).permute(0, 2, 1)
        audio_steps, _ = self.audio_lstm(audio_features)
        audio_repr = audio_steps[:, -1, :]

        # Fusion and classification.
        return self.fc(torch.cat((image_repr, audio_repr), dim=1))

In [None]:
# Convert the train/test label arrays to int64 (long) tensors.
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

In [None]:
# Augment only the training split; the test split is passed through unchanged.
train_dataset = CartoonViolenceMultiModalDataset(X_img_train, X_audio_train, y_train_tensor, augment=True)
test_dataset = CartoonViolenceMultiModalDataset(X_img_test, X_audio_test, y_test_tensor, augment=False)

In [None]:
# Pinned host memory only helps host-to-GPU copies; requesting it on a
# CPU-only machine just triggers a warning, so enable it only with CUDA.
# Training batches are shuffled, test batches keep their order.
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, pin_memory=torch.cuda.is_available())
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, pin_memory=torch.cuda.is_available())

In [None]:
# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# One output logit per distinct label value found in y_train.
model = CNNLSTM_Audio(num_classes=len(np.unique(y_train))).to(device)