In [1]:
import os
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torch.cuda.amp import GradScaler, autocast
from tqdm import tqdm


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F  # Import this for functional operations
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from torch.cuda.amp import GradScaler, autocast


In [3]:
# **Dataset Class with Preloading**
class PreloadedDataset(Dataset):
    def __init__(self, root_dir, categories, sequence_length=8, transform=None):
        """
        Args:
            root_dir (str): Root directory containing category folders.
            categories (list): List of category names (subfolder names).
            sequence_length (int): Number of consecutive frames in each sequence.
            transform (callable, optional): Transform to apply to each frame.
        """
        self.data = []
        self.labels = []
        self.sequence_length = sequence_length
        self.transform = transform

        for label, category in enumerate(categories):
            category_path = os.path.join(root_dir, category)
            if not os.path.exists(category_path):
                print(f"Category folder does not exist: {category_path}")
                continue

            # Load all PNGs into memory
            print(f"Preloading category: {category}")
            frame_files = sorted([f for f in os.listdir(category_path) if f.endswith(".png")])
            for file in frame_files:
                img = Image.open(os.path.join(category_path, file)).convert("RGB")
                if self.transform:
                    img = self.transform(img)
                self.data.append(img)  # Add preprocessed image
                self.labels.append(label)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Create sequences on-the-fly from preloaded data
        start_idx = idx
        end_idx = start_idx + self.sequence_length

        # Handle edge cases by padding with zeros
        if end_idx > len(self.data):
            sequence = self.data[start_idx:] + [torch.zeros_like(self.data[0])] * (end_idx - len(self.data))
        else:
            sequence = self.data[start_idx:end_idx]

        # Stack into tensor of shape (C, T, H, W)
        sequence = torch.stack(sequence, dim=1)
        label = self.labels[idx]
        return sequence, label

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DualAttention3DCNNWithDropout(nn.Module):
    def __init__(self, num_classes):
        super(DualAttention3DCNNWithDropout, self).__init__()
        
        # 3D Convolutional Layers
        self.conv1 = nn.Conv3d(3, 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm3d(32)
        self.pool1 = nn.MaxPool3d(kernel_size=2, stride=2)
        self.dropout1 = nn.Dropout3d(p=0.3)  # Dropout after first conv layer

        self.conv2 = nn.Conv3d(32, 64, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm3d(64)
        self.pool2 = nn.MaxPool3d(kernel_size=2, stride=2)
        self.dropout2 = nn.Dropout3d(p=0.3)  # Dropout after second conv layer

        self.conv3 = nn.Conv3d(64, 128, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm3d(128)
        self.pool3 = nn.MaxPool3d(kernel_size=2, stride=2)
        self.dropout3 = nn.Dropout3d(p=0.3)  # Dropout after third conv layer

        # Spatial Attention
        self.spatial_fc = nn.Conv3d(128, 1, kernel_size=1)  # Reduce feature maps to attention weights
        
        # Temporal Attention
        self.temporal_fc = nn.Linear(128, 1)  # Learn temporal attention weights

        # Fully Connected Layers
        self.fc1 = nn.Linear(128, 128)  # Intermediate FC layer
        self.dropout_fc = nn.Dropout(p=0.5)  # Dropout after FC layer
        self.fc2 = nn.Linear(128, num_classes)  # Final output layer

    def forward(self, x):
        # 3D CNN Layers
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.dropout1(x)  # Apply dropout after first conv layer
        
        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        x = self.dropout2(x)  # Apply dropout after second conv layer
        
        x = self.pool3(F.relu(self.bn3(self.conv3(x))))
        x = self.dropout3(x)  # Apply dropout after third conv layer

        # Spatial Attention
        spatial_weights = torch.sigmoid(self.spatial_fc(x))  # (B, 1, T, H, W)
        x = x * spatial_weights  # Apply spatial attention

        # Global Spatial Pooling
        x = x.mean(dim=[3, 4])  # Reduce spatial dimensions (B, 128, T)

        # Temporal Attention
        x = x.permute(0, 2, 1)  # (B, T, 128)
        temporal_weights = F.softmax(self.temporal_fc(x), dim=1)  # (B, T, 1)
        x = (x * temporal_weights).sum(dim=1)  # Weighted sum over time (B, 128)

        # Fully Connected Layers with Dropout
        x = F.relu(self.fc1(x))
        x = self.dropout_fc(x)  # Apply dropout after FC layer
        x = self.fc2(x)
        
        return x


In [5]:
# **Prepare Datasets and DataLoaders**
categories = ["Abuse", "Arson", "Assault", "Burglary", "Explosion", "Fighting",
               "RoadAccidents", "Robbery", "Shooting", "Shoplifting",
              "Stealing", "Vandalism"]

train_root = "Train"
test_root = "Test"

transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),  # Slight rotations to avoid distortion
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.Resize((112, 112)),  # Ensure the size is consistent
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])


In [6]:
from torch.utils.data import random_split, DataLoader

# Define dataset parameters
sequence_length = 16
val_split = 0.2  # Percentage of training data for validation
batch_size = 32



# Load the full train dataset
full_train_dataset = PreloadedDataset(root_dir=train_root, categories=categories, sequence_length=sequence_length, transform=transform)

# Compute sizes for train-validation split
val_size = int(len(full_train_dataset) * val_split)
train_size = len(full_train_dataset) - val_size

# Split dataset into train and validation
train_dataset, val_dataset = random_split(full_train_dataset, [train_size, val_size])

# Load the test dataset
test_dataset = PreloadedDataset(root_dir=test_root, categories=categories, sequence_length=sequence_length, transform=transform)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


Preloading category: Abuse
Preloading category: Arson
Preloading category: Assault
Preloading category: Burglary
Preloading category: Explosion
Preloading category: Fighting
Preloading category: RoadAccidents
Preloading category: Robbery
Preloading category: Shooting
Preloading category: Shoplifting
Preloading category: Stealing
Preloading category: Vandalism
Preloading category: Abuse
Preloading category: Arson
Preloading category: Assault
Preloading category: Burglary
Preloading category: Explosion
Preloading category: Fighting
Preloading category: RoadAccidents
Preloading category: Robbery
Preloading category: Shooting
Preloading category: Shoplifting
Preloading category: Stealing
Preloading category: Vandalism


In [12]:
# **Model Setup**
num_classes = len(categories)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.backends.cudnn.benchmark = True
model = DualAttention3DCNNWithDropout(num_classes=num_classes).to(device)

criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

optimizer = optim.AdamW(model.parameters(), lr=0.0005, weight_decay=1e-4)

scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

scaler = GradScaler()  # Mixed precision scaler


  scaler = GradScaler()  # Mixed precision scaler


In [9]:
# **Training Loop**
num_epochs = 5
save_dir = "models"  # Folder to save models
os.makedirs(save_dir, exist_ok=True)

In [10]:
for epoch in range(num_epochs):
    print(f"Starting Epoch {epoch + 1}/{num_epochs}")
    model.train()
    running_loss = 0.0
    train_correct = 0
    train_total = 0

    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs}")

    for batch_idx, (inputs, labels) in enumerate(progress_bar):
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()

        # Mixed precision training
        with autocast():
            outputs = model(inputs)
            loss = criterion(outputs, labels)

        # Backward pass
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item()
        _, preds = torch.max(outputs, 1)
        train_correct += (preds == labels).sum().item()
        train_total += labels.size(0)

        # Update progress bar
        progress_bar.set_postfix(loss=loss.item())

    train_accuracy = train_correct / train_total
    epoch_loss = running_loss / len(train_loader)
    print(f"Epoch {epoch + 1}/{num_epochs} completed with Train Loss: {epoch_loss:.4f}, Train Accuracy: {train_accuracy:.4f}")

    # Validation Phase
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            with autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                _, preds = torch.max(outputs, 1)
                val_correct += (preds == labels).sum().item()
                val_total += labels.size(0)

    val_accuracy = val_correct / val_total
    val_loss /= len(val_loader)
    print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")

    # Save model checkpoint
    torch.save(model.state_dict(), os.path.join(save_dir, f"simple_3dcnn_epoch_{epoch + 1}.pth"))
    print(f"Model saved for Epoch {epoch + 1}")

    # Step scheduler
    scheduler.step()

print("Training complete.")


Starting Epoch 1/5


  with autocast():
Epoch 1/5: 100%|██████████| 7305/7305 [28:22<00:00,  4.29it/s, loss=2.11]  
  with autocast():


Epoch 1/5 completed with Train Loss: 2.0874, Train Accuracy: 0.3204
Validation Loss: 1.8198, Validation Accuracy: 0.4435
Model saved for Epoch 1
Starting Epoch 2/5


Epoch 2/5: 100%|██████████| 7305/7305 [30:28<00:00,  3.99it/s, loss=1.99]  


Epoch 2/5 completed with Train Loss: 1.8954, Train Accuracy: 0.4125
Validation Loss: 1.6535, Validation Accuracy: 0.5221
Model saved for Epoch 2
Starting Epoch 3/5


Epoch 3/5: 100%|██████████| 7305/7305 [6:00:38<00:00,  2.96s/it, loss=1.35]      


Epoch 3/5 completed with Train Loss: 1.7751, Train Accuracy: 0.4719
Validation Loss: 1.5320, Validation Accuracy: 0.5846
Model saved for Epoch 3
Starting Epoch 4/5


Epoch 4/5: 100%|██████████| 7305/7305 [29:37<00:00,  4.11it/s, loss=1.67]  


Epoch 4/5 completed with Train Loss: 1.6850, Train Accuracy: 0.5155
Validation Loss: 1.4185, Validation Accuracy: 0.6335
Model saved for Epoch 4
Starting Epoch 5/5


Epoch 5/5: 100%|██████████| 7305/7305 [30:28<00:00,  3.99it/s, loss=1.68] 


Epoch 5/5 completed with Train Loss: 1.6151, Train Accuracy: 0.5511
Validation Loss: 1.3351, Validation Accuracy: 0.6740
Model saved for Epoch 5
Training complete.


In [11]:
model.eval()
test_loss = 0.0
test_correct = 0
test_total = 0

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        with autocast():
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            test_loss += loss.item()

            _, preds = torch.max(outputs, 1)
            test_correct += (preds == labels).sum().item()
            test_total += labels.size(0)

test_accuracy = test_correct / test_total
test_loss /= len(test_loader)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")


  with autocast():


Test Loss: 2.7687, Test Accuracy: 0.1663


In [13]:
torch.save({
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'scheduler_state_dict': scheduler.state_dict(),
    'epoch': epoch
}, os.path.join(save_dir, f"checkpoint_epoch_{epoch + 1}.pth"))


In [13]:
checkpoint = torch.load(os.path.join("models\checkpoint_epoch_5.pth"))
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
start_epoch = checkpoint['epoch'] + 1  # Resume from the next epoch


  checkpoint = torch.load(os.path.join("models\checkpoint_epoch_5.pth"))
  checkpoint = torch.load(os.path.join("models\checkpoint_epoch_5.pth"))


In [17]:
num_additional_epochs = 8
save_dir = "models" 
for epoch in range(start_epoch, start_epoch + num_additional_epochs):
    print(f"Resuming Training: Epoch {epoch + 1}/{start_epoch + num_additional_epochs}")
    model.train()
    running_loss = 0.0

    for batch_idx, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()

        # Forward pass
        with autocast():
            outputs = model(inputs)
            loss = criterion(outputs, labels)

        # Backward pass
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item()

    train_loss = running_loss / len(train_loader)
    print(f"Epoch {epoch + 1} completed with Train Loss: {train_loss:.4f}")

    # Validation phase
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            with autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

    val_loss /= len(val_loader)
    print(f"Validation Loss: {val_loss:.4f}")

    # Save checkpoint
    torch.save({
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler.state_dict(),
        'epoch': epoch
    }, os.path.join(save_dir, f"checkpoint_epoch_{epoch + 1}.pth"))

    scheduler.step()


Resuming Training: Epoch 6/13


  with autocast():


Epoch 6 completed with Train Loss: 1.5609


  with autocast():


Validation Loss: 1.2988
Resuming Training: Epoch 7/13
Epoch 7 completed with Train Loss: 1.5450
Validation Loss: 1.3034
Resuming Training: Epoch 8/13
Epoch 8 completed with Train Loss: 1.5394
Validation Loss: 1.2942
Resuming Training: Epoch 9/13
Epoch 9 completed with Train Loss: 1.5309
Validation Loss: 1.2801
Resuming Training: Epoch 10/13
Epoch 10 completed with Train Loss: 1.5231
Validation Loss: 1.2807
Resuming Training: Epoch 11/13
Epoch 11 completed with Train Loss: 1.5173
Validation Loss: 1.2630
Resuming Training: Epoch 12/13
Epoch 12 completed with Train Loss: 1.5164
Validation Loss: 1.2662
Resuming Training: Epoch 13/13
Epoch 13 completed with Train Loss: 1.5148
Validation Loss: 1.2652


MODEL EVALUATION

In [18]:
# Test Phase
model.eval()
test_loss = 0.0
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        test_loss += loss.item()

        # Calculate accuracy
        _, preds = torch.max(outputs, 1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

# Calculate average test loss and accuracy
test_loss /= len(test_loader)
test_accuracy = correct / total
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")


Test Loss: 2.7938, Test Accuracy: 0.1508


In [25]:
model.eval()  # Set the model to evaluation mode
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f"Test Accuracy: {accuracy:.2f}%")


Test Accuracy: 32.71%


LOADING MODEL


In [20]:
import torch

# Reinitialize the model
num_classes = 12  # Number of classes in your dataset
model = DualAttention3DCNNWithDropout(num_classes=num_classes)

# Load the saved model checkpoint
checkpoint_path = "models\checkpoint_epoch_13.pth"  # Path to your checkpoint file
checkpoint = torch.load(checkpoint_path)

# Load the model's state dictionary
model.load_state_dict(checkpoint['model_state_dict'])

# Move the model to the appropriate device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Set the model to evaluation mode
model.eval()
print("Model loaded and ready for inference.")


Model loaded and ready for inference.


  checkpoint_path = "models\checkpoint_epoch_13.pth"  # Path to your checkpoint file
  checkpoint = torch.load(checkpoint_path)


VIDEO PROCESSING PIPELINE

In [None]:
from torchvision import transforms
from PIL import Image
import cv2

def preprocess_video(video_path, sequence_length=16):
    """
    Preprocess a video file into a tensor suitable for the model.

    Args:
        video_path (str): Path to the video file.
        sequence_length (int): Number of frames to extract.

    Returns:
        torch.Tensor: Preprocessed video tensor of shape (1, C, T, H, W).
    """
    # Define transformations (resize, normalize)
    transform = transforms.Compose([
        transforms.Resize((112,112)),  # Resize frames to 224x224
        transforms.ToTensor(),         # Convert to tensor
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalize
    ])
    
    # Open video file using OpenCV
    cap = cv2.VideoCapture(video_path)
    frames = []
    frame_count = 0

    while cap.isOpened() and frame_count < sequence_length:
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB
        frame = Image.fromarray(frame)  # Convert to PIL image
        frame = transform(frame)  # Apply transformations
        frames.append(frame)
        frame_count += 1

    cap.release()

    # If the video has fewer frames than sequence_length, pad with black frames
    while len(frames) < sequence_length:
        frames.append(torch.zeros_like(frames[0]))

    # Stack frames into a single tensor: (C, T, H, W)
    video_tensor = torch.stack(frames, dim=1).unsqueeze(0)  # Add batch dimension
    return video_tensor


In [22]:
def predict_video(model, video_tensor, device):
    """
    Predict the class of a video using the trained model.

    Args:
        model (nn.Module): The trained model.
        video_tensor (torch.Tensor): Preprocessed video tensor of shape (1, C, T, H, W).
        device (torch.device): Device to run the model on.

    Returns:
        int: Predicted class index.
    """
    video_tensor = video_tensor.to(device)
    with torch.no_grad():
        outputs = model(video_tensor)  # Forward pass
        _, predicted_class = torch.max(outputs, 1)  # Get the predicted class index
    return predicted_class.item()

# Example usage
video_path = "output_folder_video\RoadAccidents001_x264.avi"  # Path to the input video
video_tensor = preprocess_video(video_path, sequence_length=16)
predicted_class = predict_video(model, video_tensor, device)

print(f"Predicted Class Index: {predicted_class}")


  video_path = "output_folder_video\RoadAccidents001_x264.avi"  # Path to the input video


Predicted Class Index: 10


In [23]:
# Define your class label mapping
class_labels = ["Abuse", "Arson", "Assault", "Burglary", "Explosion", 
                "Fighting", "RoadAccidents", "Robbery", "Shooting", 
                "Shoplifting", "Stealing", "Vandalism"]

# Print the predicted class label
print(f"Predicted Class: {class_labels[predicted_class]}")


Predicted Class: Stealing
