In [5]:
import os
import cv2
import numpy as np
from pathlib import Path
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms
from torch.utils.data import Dataset, DataLoader

# Pick the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")


Using device: cpu


In [10]:
def extract_frames_from_video(video_path, output_folder, frames_per_video=20, resize=(224, 224)):
    """Sample frames at a fixed stride from a video and save them as JPEGs.

    Frames are written to ``<output_folder>/<video file stem>/frame_NNN.jpg``,
    where ``NNN`` is the zero-padded index of the saved frame.

    Args:
        video_path: Path to the video file (str or Path).
        output_folder: Directory under which the per-video folder is created.
        frames_per_video: Maximum number of frames to save.
        resize: Size passed to ``cv2.resize`` for every saved frame.

    Returns:
        None. Progress and warnings are reported with ``print``.
    """
    cap = cv2.VideoCapture(str(video_path))
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Treat any non-positive count as "nothing to read" (covers files that fail to open)
        if total_frames <= 0:
            print(f"[Warning] No frames in {video_path}")
            return

        # Stride between sampled frames; the inner max() avoids dividing by zero
        interval = max(1, total_frames // max(1, frames_per_video))
        video_id = Path(video_path).stem
        out_dir = Path(output_folder) / video_id
        out_dir.mkdir(parents=True, exist_ok=True)

        count = 0  # frames read so far
        saved = 0  # frames successfully written so far

        while cap.isOpened() and saved < frames_per_video:
            ret, frame = cap.read()
            if not ret:
                break
            if count % interval == 0:
                frame = cv2.resize(frame, resize)
                frame_path = out_dir / f"frame_{saved:03d}.jpg"
                # Only count frames that were actually written to disk
                if cv2.imwrite(str(frame_path), frame):
                    saved += 1
            count += 1
    finally:
        # Release the capture on every exit path, including the early return above
        cap.release()

    print(f"[INFO] Saved {saved} frames for {video_path}")


### Cell 4: Process Entire Dataset (Violence & NonViolence)

In [11]:
def process_dataset(video_dir, output_dir, frames_per_video=20):
    """Extract frames for every ``.mp4`` video in the two class folders.

    Looks for ``video_dir/Violence`` and ``video_dir/NonViolence`` and calls
    ``extract_frames_from_video`` for each ``*.mp4`` file, writing the frames
    under the matching ``output_dir/<label>`` folder.

    Args:
        video_dir: Root folder holding the ``Violence`` / ``NonViolence`` folders.
        output_dir: Root folder for the extracted frames (created if needed).
        frames_per_video: Maximum number of frames saved per video.

    Returns:
        None.
    """
    video_dir = Path(video_dir)
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    for label in ["Violence", "NonViolence"]:
        label_dir = video_dir / label
        output_label_dir = output_dir / label
        output_label_dir.mkdir(parents=True, exist_ok=True)

        # A mistyped dataset path would otherwise do nothing without any message
        if not label_dir.is_dir():
            print(f"[Warning] Missing class folder: {label_dir}")
            continue

        # sorted() makes the processing order independent of the filesystem
        for video_file in sorted(label_dir.glob("*.mp4")):
            extract_frames_from_video(
                video_file,
                output_label_dir,
                frames_per_video=frames_per_video
            )


### Run Frame Extraction Process

In [14]:
# Where the downloaded dataset lives and where the extracted frames are written
RAW_VIDEO_PATH = r"C:\Users\adita\Downloads\violence-detection-system\archive\Real Life Violence Dataset"
OUTPUT_FRAME_PATH = "data/frames"

# Extract up to 20 frames from every video in both class folders
process_dataset(
    video_dir=RAW_VIDEO_PATH,
    output_dir=OUTPUT_FRAME_PATH,
    frames_per_video=20,
)


[INFO] Saved 20 frames for C:\Users\adita\Downloads\violence-detection-system\archive\Real Life Violence Dataset\Violence\V_1.mp4
[INFO] Saved 20 frames for C:\Users\adita\Downloads\violence-detection-system\archive\Real Life Violence Dataset\Violence\V_10.mp4
[INFO] Saved 20 frames for C:\Users\adita\Downloads\violence-detection-system\archive\Real Life Violence Dataset\Violence\V_100.mp4
[INFO] Saved 20 frames for C:\Users\adita\Downloads\violence-detection-system\archive\Real Life Violence Dataset\Violence\V_1000.mp4
[INFO] Saved 20 frames for C:\Users\adita\Downloads\violence-detection-system\archive\Real Life Violence Dataset\Violence\V_101.mp4
[INFO] Saved 20 frames for C:\Users\adita\Downloads\violence-detection-system\archive\Real Life Violence Dataset\Violence\V_102.mp4
[INFO] Saved 20 frames for C:\Users\adita\Downloads\violence-detection-system\archive\Real Life Violence Dataset\Violence\V_103.mp4
[INFO] Saved 20 frames for C:\Users\adita\Downloads\violence-detection-system\

In [12]:
# Cell 5: Custom PyTorch Dataset for violence frame sequences

class ViolenceDataset(Dataset):
    """Dataset of per-video frame sequences stored as folders of JPEGs.

    Expected layout: ``root_dir/{Violence,NonViolence}/<video_id>/*.jpg``.
    Each item is ``(frames, label)``: ``frames`` stacks the video's frames in
    filename order and ``label`` is a long tensor, 1 for "Violence" and 0 for
    "NonViolence". Video folders without any ``.jpg`` file are skipped.

    Args:
        root_dir: Folder containing the two class folders.
        transform: Optional callable applied to each RGB PIL image; it is
            assumed to return a tensor so the frames can be stacked. When
            omitted, frames are converted to float ``[C, H, W]`` tensors
            scaled to ``[0, 1]``.
    """

    def __init__(self, root_dir, transform=None):
        self.root_dir = Path(root_dir)
        self.transform = transform
        self.samples = []  # list of (sorted frame paths, label index)

        for label in ["Violence", "NonViolence"]:
            label_path = self.root_dir / label
            label_idx = 1 if label == "Violence" else 0

            # sorted() keeps the sample order independent of the filesystem listing order
            for video_folder in sorted(label_path.iterdir()):
                if video_folder.is_dir():
                    frame_paths = sorted(video_folder.glob("*.jpg"))
                    if len(frame_paths) > 0:
                        self.samples.append((frame_paths, label_idx))

    def __len__(self):
        """Return the number of videos that have at least one frame."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load every frame of video ``idx`` and return ``(frames, label)``."""
        frame_paths, label = self.samples[idx]
        frames = []

        for frame_path in frame_paths:
            # The context manager closes the file handle as soon as the pixels are decoded
            with Image.open(frame_path) as img:
                image = img.convert("RGB")
            if self.transform:
                image = self.transform(image)
            else:
                # Without a transform, PIL images cannot be stacked; convert HWC uint8 -> CHW float
                image = torch.from_numpy(np.array(image)).permute(2, 0, 1).float().div(255)
            frames.append(image)

        frames = torch.stack(frames)  # Shape: [T, C, H, W]
        return frames, torch.tensor(label, dtype=torch.long)


In [15]:
# Cell 6: Define transforms and dataloaders

# Image transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],  # ImageNet standards
                         std=[0.229, 0.224, 0.225])
])

# Load the dataset
dataset = ViolenceDataset(OUTPUT_FRAME_PATH, transform=transform)

# Split into training and validation sets (80/20).
# The seeded generator keeps the split identical across kernel restarts, so
# validation numbers from different runs are measured on the same videos.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42),
)

# Create dataloaders
BATCH_SIZE = 4

# NOTE(review): num_workers > 0 starts worker processes that must be able to
# import ViolenceDataset; with a class defined inside the notebook this may
# fail on Windows -- confirm, or use num_workers=0 there.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

print(f"Train samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")


Train samples: 1560
Validation samples: 391


In [16]:
# Cell 7: Define CNN + LSTM model architecture

class CNNLSTM(nn.Module):
    """Frozen ResNet-18 frame encoder followed by an LSTM sequence classifier.

    Args:
        hidden_size: Hidden size of the LSTM.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits.

    Input to ``forward`` is ``[B, T, C, H, W]``; the output is
    ``[B, num_classes]`` logits computed from the LSTM output at the last
    time step.
    """

    def __init__(self, hidden_size=256, num_layers=1, num_classes=2):
        super(CNNLSTM, self).__init__()

        # Pretrained CNN (ResNet-18 without the final FC layer).
        # Uses the `weights=` API, like the other cells of this notebook,
        # instead of the deprecated `pretrained=True`.
        resnet = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        self.cnn = nn.Sequential(*list(resnet.children())[:-1])  # Remove the last FC layer
        self.cnn_output_size = resnet.fc.in_features  # 512 for resnet18

        # Freeze CNN weights (optional, for faster training)
        for param in self.cnn.parameters():
            param.requires_grad = False
        self.cnn.eval()

        # LSTM to process sequence of CNN features
        self.lstm = nn.LSTM(input_size=self.cnn_output_size,
                            hidden_size=hidden_size,
                            num_layers=num_layers,
                            batch_first=True)

        # Final classifier
        self.classifier = nn.Linear(hidden_size, num_classes)

    def train(self, mode=True):
        """Switch train/eval mode while keeping the frozen CNN in eval mode.

        BatchNorm layers update their running statistics whenever they are in
        training mode, even under ``torch.no_grad()``; pinning the backbone to
        eval mode keeps the "frozen" feature extractor truly fixed.
        """
        super().train(mode)
        self.cnn.eval()
        return self

    def forward(self, x):  # x shape: [B, T, C, H, W]
        B, T, C, H, W = x.size()

        # Encode all frames of all videos in one batched pass (no gradient for the CNN)
        with torch.no_grad():
            frame_feats = self.cnn(x.reshape(B * T, C, H, W)).flatten(1)  # Shape: [B*T, 512]

        # Restore the time axis
        features = frame_feats.reshape(B, T, self.cnn_output_size)  # Shape: [B, T, 512]

        # Pass through LSTM
        lstm_out, _ = self.lstm(features)  # Shape: [B, T, hidden]
        last_out = lstm_out[:, -1, :]  # Take last output
        logits = self.classifier(last_out)  # Shape: [B, num_classes]
        return logits


In [17]:
# Cell 8: Training function

def train_model(model, train_loader, criterion, optimizer, device):
    """Run one training epoch and return the mean per-batch loss.

    Args:
        model: Module to optimise; put into training mode here.
        train_loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer stepping the model's parameters.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Sum of the batch losses divided by ``len(train_loader)``.
    """
    model.train()
    batch_losses = []

    for inputs, labels in tqdm(train_loader, desc="Training"):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        loss = criterion(model(inputs), labels)
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())

    return sum(batch_losses) / len(train_loader)


In [18]:
# Cell 9: Validation function

def evaluate_model(model, val_loader, criterion, device):
    """Evaluate the model on a loader without tracking gradients.

    Args:
        model: Module to evaluate; put into eval mode here.
        val_loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(avg_loss, accuracy)``: the mean per-batch loss and the fraction of
        samples whose highest-scoring class equals the label.
    """
    model.eval()
    loss_sum = 0
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for inputs, labels in tqdm(val_loader, desc="Validation"):
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss_sum += criterion(outputs, labels).item()

            # Index of the highest score along the class dimension
            predicted = torch.max(outputs, 1)[1]
            n_correct += (predicted == labels).sum().item()
            n_seen += labels.size(0)

    avg_loss = loss_sum / len(val_loader)
    return avg_loss, n_correct / n_seen


In [19]:
# Cell 11: Setup ResNet18 as feature extractor
# Pretrained ResNet-18 with its final FC layer dropped, in eval mode, on `device`
resnet = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
resnet = nn.Sequential(*list(resnet.children())[:-1]).eval().to(device)

# Preprocessing pipeline: resize, convert to tensor, normalise per channel
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)


In [37]:
# Cell 12: Extract features from frames and save as .pt files
from glob import glob
from pathlib import Path
from PIL import Image
from tqdm import tqdm
import cv2
import torch

def extract_and_save_features(frame_root, feature_output_dir):
    """Encode every video's frames with the CNN and save one ``.pt`` file per video.

    For each class folder under ``frame_root`` and each video folder inside it,
    the ``*.jpg`` frames are read in filename order, preprocessed and encoded,
    and the resulting ``[num_frames, feature_dim]`` tensor is saved to
    ``feature_output_dir/<label>/<video folder name>.pt``. Unreadable images
    are skipped; videos with no readable frame produce no file.

    Relies on the notebook-level ``resnet`` (expected to already be in eval
    mode), ``transform`` and ``device``.

    Args:
        frame_root: Folder containing ``Violence`` / ``NonViolence`` frame folders.
        feature_output_dir: Destination root for the feature tensors.

    Returns:
        None.
    """
    frame_root = Path(frame_root)
    feature_output_dir = Path(feature_output_dir)
    feature_output_dir.mkdir(parents=True, exist_ok=True)

    for label in ["Violence", "NonViolence"]:
        label_path = frame_root / label
        out_label_path = feature_output_dir / label
        out_label_path.mkdir(parents=True, exist_ok=True)

        # Only directories are videos; stray files in the class folder are ignored
        video_folders = sorted(p for p in label_path.iterdir() if p.is_dir())

        for video_folder in tqdm(video_folders, desc=f"Processing {label}"):
            frame_tensors = []
            for frame_file in sorted(video_folder.glob("*.jpg")):
                # Read frame with OpenCV
                frame = cv2.imread(str(frame_file))
                if frame is None:
                    continue  # Skip unreadable images

                # OpenCV loads BGR; the transform expects an RGB PIL image
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame_tensors.append(transform(Image.fromarray(frame)))

            if not frame_tensors:
                continue

            # One forward pass per video instead of one per frame
            batch = torch.stack(frame_tensors).to(device)
            with torch.no_grad():
                video_features = resnet(batch).flatten(1).cpu()  # Shape: [num_frames, 512]
            torch.save(video_features, out_label_path / f"{video_folder.name}.pt")

# Input frames and output location for the cached CNN features
FRAME_ROOT = "data/frames"
FEATURE_SAVE_DIR = "data/features"

# Encode every extracted frame folder and write one .pt file per video
extract_and_save_features(frame_root=FRAME_ROOT, feature_output_dir=FEATURE_SAVE_DIR)


Processing Violence: 100%|██████████| 1000/1000 [13:19<00:00,  1.25it/s]
Processing NonViolence: 100%|██████████| 951/951 [12:02<00:00,  1.32it/s]


In [38]:
# Cell 13: Dataset for .pt feature tensors

class FeatureDataset(Dataset):
    """Dataset over cached per-video feature tensors saved as ``.pt`` files.

    Expected layout: ``feature_dir/{NonViolence,Violence}/*.pt``. Labels are
    0 for "NonViolence" and 1 for "Violence".

    Attributes:
        samples: Paths of the ``.pt`` files, sorted by name within each class.
        labels: Integer label for each entry of ``samples``.
    """

    def __init__(self, feature_dir):
        self.samples = []
        self.labels = []

        for label_idx, label in enumerate(["NonViolence", "Violence"]):
            folder = Path(feature_dir) / label
            # sorted() fixes the index -> file mapping, so a seeded split over
            # range(len(dataset)) selects the same videos on every machine
            for pt_file in sorted(folder.glob("*.pt")):
                self.samples.append(pt_file)
                self.labels.append(label_idx)

    def __len__(self):
        """Return the number of feature files found."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for sample ``idx``; ``label`` is a plain int."""
        feature_path = self.samples[idx]
        # Load onto the CPU regardless of the device the tensor was saved from;
        # the training loop moves batches to the target device itself
        features = torch.load(feature_path, map_location="cpu")  # Shape: [num_frames, 512]
        label = self.labels[idx]
        return features, label



In [39]:
# Cell 14: DataLoaders for extracted features

from sklearn.model_selection import train_test_split

# Location of the cached feature tensors (must match the save location)
FEATURE_PATH = "data/features"

# Every cached video, both classes
full_dataset = FeatureDataset(FEATURE_PATH)

# Stratified 80/20 split over sample indices, seeded for repeatability
train_idx, val_idx = train_test_split(
    list(range(len(full_dataset))),
    test_size=0.2,
    stratify=full_dataset.labels,
    random_state=42,
)

# Index-based views onto the full dataset
train_subset, val_subset = (
    torch.utils.data.Subset(full_dataset, indices) for indices in (train_idx, val_idx)
)

# Only the training loader shuffles
train_loader = DataLoader(train_subset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_subset, batch_size=16, shuffle=False)


In [40]:
# Cell 15: LSTM-only model using cached CNN features

class LSTMClassifier(nn.Module):
    """LSTM over a sequence of feature vectors, followed by a linear classifier.

    Args:
        input_size: Size of each feature vector in the sequence.
        hidden_size: Hidden size of the LSTM.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits.
    """

    def __init__(self, input_size=512, hidden_size=256, num_layers=1, num_classes=2):
        super().__init__()
        # Attribute names `lstm` and `fc` are part of the saved state_dict keys
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=num_classes)

    def forward(self, x):
        """Map ``[batch, seq_len, input_size]`` features to ``[batch, num_classes]`` logits."""
        sequence_out = self.lstm(x)[0]          # [batch, seq_len, hidden]
        final_step = sequence_out[:, -1, :]     # LSTM output at the last time step
        return self.fc(final_step)


In [42]:
# Cell 16: Training loop using precomputed CNN features

# Model, loss and optimizer for training on the cached features
model = LSTMClassifier().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

EPOCHS = 10

# Per-epoch history
train_losses, val_losses, val_accuracies = [], [], []

for epoch in range(EPOCHS):
    print(f"\nEpoch {epoch+1}/{EPOCHS}")

    # One pass over the training set, then a full validation pass
    train_loss = train_model(model, train_loader, criterion, optimizer, device)
    val_loss, val_accuracy = evaluate_model(model, val_loader, criterion, device)

    train_losses += [train_loss]
    val_losses += [val_loss]
    val_accuracies += [val_accuracy]

    print(f"Train Loss: {train_loss:.4f}")
    print(f"Val Loss: {val_loss:.4f} | Val Accuracy: {val_accuracy*100:.2f}%")



Epoch 1/10


Training: 100%|██████████| 98/98 [00:19<00:00,  5.15it/s]
Validation: 100%|██████████| 25/25 [00:04<00:00,  5.63it/s]


Train Loss: 0.4129
Val Loss: 0.1984 | Val Accuracy: 93.86%

Epoch 2/10


Training: 100%|██████████| 98/98 [00:02<00:00, 43.75it/s]
Validation: 100%|██████████| 25/25 [00:00<00:00, 92.00it/s]


Train Loss: 0.1761
Val Loss: 0.1471 | Val Accuracy: 94.63%

Epoch 3/10


Training: 100%|██████████| 98/98 [00:02<00:00, 44.43it/s]
Validation: 100%|██████████| 25/25 [00:00<00:00, 94.67it/s]


Train Loss: 0.1123
Val Loss: 0.1215 | Val Accuracy: 96.42%

Epoch 4/10


Training: 100%|██████████| 98/98 [00:02<00:00, 44.52it/s]
Validation: 100%|██████████| 25/25 [00:00<00:00, 90.59it/s]


Train Loss: 0.0895
Val Loss: 0.1021 | Val Accuracy: 96.42%

Epoch 5/10


Training: 100%|██████████| 98/98 [00:02<00:00, 44.93it/s]
Validation: 100%|██████████| 25/25 [00:00<00:00, 88.62it/s]


Train Loss: 0.0604
Val Loss: 0.0897 | Val Accuracy: 97.19%

Epoch 6/10


Training: 100%|██████████| 98/98 [00:02<00:00, 43.68it/s]
Validation: 100%|██████████| 25/25 [00:00<00:00, 89.96it/s]


Train Loss: 0.0381
Val Loss: 0.0859 | Val Accuracy: 96.93%

Epoch 7/10


Training: 100%|██████████| 98/98 [00:02<00:00, 44.42it/s]
Validation: 100%|██████████| 25/25 [00:00<00:00, 94.64it/s]


Train Loss: 0.0257
Val Loss: 0.0786 | Val Accuracy: 97.44%

Epoch 8/10


Training: 100%|██████████| 98/98 [00:02<00:00, 43.64it/s]
Validation: 100%|██████████| 25/25 [00:00<00:00, 92.72it/s]


Train Loss: 0.0134
Val Loss: 0.0809 | Val Accuracy: 97.70%

Epoch 9/10


Training: 100%|██████████| 98/98 [00:02<00:00, 45.04it/s]
Validation: 100%|██████████| 25/25 [00:00<00:00, 83.53it/s]


Train Loss: 0.0071
Val Loss: 0.0884 | Val Accuracy: 97.95%

Epoch 10/10


Training: 100%|██████████| 98/98 [00:02<00:00, 45.19it/s]
Validation: 100%|██████████| 25/25 [00:00<00:00, 91.47it/s]

Train Loss: 0.0100
Val Loss: 0.0841 | Val Accuracy: 97.70%





In [43]:
# Cell 17: Save model
# Persist only the learned parameters (state_dict) of the trained LSTM classifier
MODEL_SAVE_PATH = "violence_lstm_model.pth"
torch.save(obj=model.state_dict(), f=MODEL_SAVE_PATH)
print(f"Model saved to {MODEL_SAVE_PATH}")


Model saved to violence_lstm_model.pth


In [44]:
# CNN feature extractor for inference: pretrained ResNet18 minus its final classifier layer
cnn_model = nn.Sequential(
    *list(models.resnet18(weights=models.ResNet18_Weights.DEFAULT).children())[:-1]
)
# Move to the target device and switch to eval mode; the expression's value is the module itself
cnn_model.to(device).eval()


Sequential(
  (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (2): ReLU(inplace=True)
  (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (4): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Con

In [45]:
def predict_video(video_path, cnn_model, lstm_model, transform, device, frames_per_video=20):
    """Classify a single video as "Violence" or "Non-Violence" and print the result.

    Frames are extracted into a private temporary directory, encoded one by
    one with ``cnn_model``, and the feature sequence is classified by
    ``lstm_model``. The temporary frames are removed on every exit path.

    Args:
        video_path: Path to the video file.
        cnn_model: Frame encoder; called on a ``[1, C, H, W]`` tensor.
        lstm_model: Sequence classifier; called on a ``[1, T, F]`` tensor.
        transform: Callable turning an RGB PIL image into a tensor.
        device: Device on which both models run.
        frames_per_video: Maximum number of frames sampled from the video.

    Returns:
        None. The prediction (or an error message) is printed.
    """
    import tempfile
    from PIL import Image

    # Step 1: Extract frames into a fresh, uniquely named temp directory so
    # leftovers from an earlier run can never be picked up, and so cleanup
    # also happens on early returns and exceptions.
    with tempfile.TemporaryDirectory(prefix="temp_frames_") as temp_dir:
        extract_frames_from_video(video_path, temp_dir, frames_per_video=frames_per_video)

        # Step 2: The extractor writes into <temp_dir>/<video file stem>
        frame_folder = Path(temp_dir) / Path(video_path).stem
        if not frame_folder.is_dir():
            print("[ERROR] No extracted frame folders found.")
            return

        frame_paths = sorted(frame_folder.glob("*.jpg"))
        if not frame_paths:
            print("[ERROR] No frames found in extracted folder.")
            return

        # Step 3: Preprocess and extract CNN features
        features = []
        cnn_model.eval()
        with torch.no_grad():
            for frame_path in frame_paths:
                # Close the file before the temp directory is deleted
                with Image.open(frame_path) as img:
                    image = img.convert("RGB")
                image = transform(image).unsqueeze(0).to(device)
                feature = cnn_model(image).squeeze().cpu()
                features.append(feature)

    features = torch.stack(features).unsqueeze(0).to(device)  # Shape: (1, T, F)

    # Step 4: LSTM prediction
    lstm_model.eval()
    with torch.no_grad():
        output = lstm_model(features)
        prediction = torch.argmax(output, dim=1).item()

    print("🔍 Prediction:", "Violence" if prediction == 1 else "Non-Violence")


In [49]:
# Classify one held-out clip with the ResNet18 encoder and the trained LSTM
predict_video(
    "C:/Users/adita/Downloads/violence-detection-system/NV_981.mp4",
    cnn_model,
    model,
    transform,
    device,
)


[INFO] Saved 20 frames for C:/Users/adita/Downloads/violence-detection-system/NV_981.mp4
🔍 Prediction: Non-Violence
