In [2]:
from datasets import load_dataset
from torchvision.transforms import Compose, Resize, ToTensor, Lambda
import torchvision.transforms.functional as TF
import torch
from torch.utils.data import Dataset, DataLoader
import cv2
import numpy as np
from torch.nn.utils.rnn import pad_sequence
from torchvision.transforms.functional import to_pil_image


In [3]:
# Load the "jinmang2/ucf_crime" dataset, keep only its 'train' split and
# shuffle it with a fixed seed so the row order is reproducible between runs.
datasets = load_dataset("jinmang2/ucf_crime")['train'].shuffle(seed=42)

You can avoid this message in future by passing the argument `trust_remote_code=True`.
Passing `trust_remote_code=True` will be mandatory to load this dataset from the next major release of `datasets`.


In [4]:
# Stage 1: hold out 20% of the shuffled data as the test set.
train_test_split = datasets.train_test_split(test_size=0.2, seed=42)
test_dataset = train_test_split['test']

# Stage 2: split the remaining 80% once more (80/20) into train and validation.
train_val_split = train_test_split['train'].train_test_split(test_size=0.2, seed=42)
train_dataset, val_dataset = train_val_split['train'], train_val_split['test']


In [4]:
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import cv2

class VideoDataset(Dataset):
    """Dataset that turns each video into a sequence of per-frame CNN outputs.

    Indexing returns ``(features, label)``: ``features`` stacks the
    MobileNetV2 output of every sampled frame along dim 0, and ``label`` is
    the row's ``'anomaly'`` value.

    Args:
        dataset: Indexable collection whose rows expose the ``'video_path'``
            and ``'anomaly'`` keys.
        target_fps: Approximate number of frames to keep per second of video.
            Must be positive.
        transform: Optional callable applied to every RGB frame before it is
            fed to the feature extractor. Defaults to a 112x112 resize
            followed by tensor conversion and channel-wise normalisation.

    Raises:
        ValueError: If ``target_fps`` is not positive.
    """

    def __init__(self, dataset, target_fps=1, transform=None):
        if target_fps <= 0:
            raise ValueError(f"target_fps must be positive, got {target_fps!r}")
        self.dataset = dataset
        self.target_fps = target_fps
        self.transform = transform or transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((112, 112)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ])
        # torchvision's MobileNetV2 exposes its head as `classifier`; it has no
        # `fc` attribute, so assigning `fc = Identity()` would not change the
        # forward pass. The network is therefore used as-is and emits the full
        # classifier output (1000 values per frame). Swapping `classifier` for
        # an identity would change the feature width and break any consumer
        # sized for 1000 inputs.
        self.feature_extractor = models.mobilenet_v2(pretrained=True)
        self.feature_extractor.eval()

    def __len__(self):
        """Return the number of videos in the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for the video at position ``idx``.

        Raises:
            RuntimeError: If no frame could be decoded from the video.
        """
        # Fetch the row once; indexing the wrapped dataset may be expensive.
        row = self.dataset[idx]
        video_path = row['video_path']
        frames = self.load_video(video_path, self.target_fps)
        if not frames:
            # torch.stack([]) would fail with a message that hides the culprit.
            raise RuntimeError(f"No frames could be read from video {video_path!r}")

        features = []
        with torch.no_grad():
            for frame in frames:
                batch = self.transform(frame).unsqueeze(0)  # add a batch dim
                features.append(self.feature_extractor(batch).squeeze(0))
        return torch.stack(features), row['anomaly']

    def load_video(self, video_path, target_fps):
        """Decode ``video_path`` and return a list of RGB frames.

        Every ``round(native_fps / target_fps)``-th frame is kept (at least
        every frame). An unreadable video yields an empty list.
        """
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            native_fps = cap.get(cv2.CAP_PROP_FPS)
            # A missing FPS is reported as 0 (NaN also fails this comparison);
            # keep every frame in that case.
            if native_fps > 0:
                frame_ratio = max(1, round(native_fps / target_fps))
            else:
                frame_ratio = 1

            frame_idx = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_idx % frame_ratio == 0:
                    # OpenCV decodes to BGR; the transform expects RGB.
                    frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                frame_idx += 1
        finally:
            # Always free the decoder, even if colour conversion raises.
            cap.release()
        return frames


In [5]:
from torch.utils.data import DataLoader

# Wrap the raw splits so that indexing them yields (features, label) pairs.
train_dataset, val_dataset = VideoDataset(train_dataset), VideoDataset(val_dataset)

# Batches hold a single video; only the training loader is shuffled.
train_loader = DataLoader(dataset=train_dataset, batch_size=1, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=1, shuffle=False)



In [7]:
import torch.nn as nn

class LSTM(nn.Module):
    """Sequence classifier: an LSTM followed by a linear layer on its last hidden state.

    Args:
        input_dim: Size of the feature vector at each time step.
        hidden_dim: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits.

    The defaults (1000, 256, 1, 2) reproduce the previously hard-coded sizes,
    so ``LSTM()`` builds the same network as before.
    """

    def __init__(self, input_dim=1000, hidden_dim=256, num_layers=1, num_classes=2):
        super().__init__()
        self.input_dim = input_dim
        self.h = hidden_dim
        self.numOfLayers = num_layers
        self.numOfClasses = num_classes
        # `W` is created before `lstm` on purpose: keeping the creation order
        # keeps parameter initialisation identical under a fixed RNG seed.
        self.W = nn.Linear(self.h, self.numOfClasses)
        self.lstm = nn.LSTM(self.input_dim, self.h, self.numOfLayers, batch_first=True)

    def forward(self, inputs):
        """Return class logits for ``inputs`` of shape (batch, seq, feature)."""
        _, (hidden, _) = self.lstm(inputs)
        # hidden[-1] is the final hidden state of the top LSTM layer.
        return self.W(hidden[-1])



In [9]:
def validate(model, val_loader, device, return_misclassified=False):
    """Compute classification accuracy of ``model`` over ``val_loader``.

    Args:
        model: Module mapping a batch of inputs to per-class scores.
        val_loader: Iterable of ``(inputs, labels)`` batches.
        device: Device the batches are moved to before the forward pass.
        return_misclassified: When true, also return the misclassified samples.

    Returns:
        The accuracy as a fraction in [0, 1] (0.0 when the loader yields no
        samples). With ``return_misclassified=True`` a tuple
        ``(accuracy, misclassified_examples)`` is returned, where each example
        is ``(input_on_cpu, true_label, predicted_label)``.
    """
    # Remember the caller's mode so evaluation does not silently flip it.
    was_training = model.training
    model.eval()
    correct_predictions = 0
    total_samples = 0
    misclassified_examples = []

    try:
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                matches = predicted == labels
                correct_predictions += matches.sum().item()
                total_samples += labels.size(0)

                if return_misclassified:
                    wrong_indices = (~matches).nonzero(as_tuple=True)[0].tolist()
                    for sample_idx in wrong_indices:
                        misclassified_examples.append((
                            inputs[sample_idx].cpu(),
                            labels[sample_idx].item(),
                            predicted[sample_idx].item(),
                        ))
    finally:
        model.train(was_training)

    # An empty loader would otherwise divide by zero.
    accuracy = correct_predictions / total_samples if total_samples else 0.0

    if return_misclassified:
        return accuracy, misclassified_examples
    return accuracy


In [None]:
import matplotlib.pyplot as plt

def train(model, val_loader, computeLoss, optimizer, num_epochs, device,
          save_path='best_model_RNN.pth', train_data_loader=None):
    """Train ``model`` and checkpoint the weights with the best validation accuracy.

    Args:
        model: Network to optimise; moved to ``device``.
        val_loader: Loader passed to ``validate`` after every epoch.
        computeLoss: Callable ``(outputs, labels) -> loss``.
        optimizer: Optimiser already bound to ``model``'s parameters.
        num_epochs: Maximum number of passes over the training data.
        device: Device used for the forward/backward passes.
        save_path: File that receives the best-so-far ``state_dict``.
        train_data_loader: Loader yielding ``(inputs, labels)`` training
            batches. When omitted, a shuffled batch-size-1 loader is built
            over the notebook-level ``train_dataset``.

    Training stops early when validation accuracy falls below 90% of the
    previous epoch's value. The final weights are always written to
    ``final_model_RNN.pth``.
    """
    model = model.to(device)
    if train_data_loader is None:
        # Built once; a shuffling DataLoader reshuffles on every iteration.
        train_data_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
    previous_val_accuracy = 0
    best_val_accuracy = 0

    for epoch in range(num_epochs):
        model.train()
        total_correct = 0
        total_samples = 0
        batch_losses = []
        batch_accuracies = []

        for i, (inputs, labels) in enumerate(train_data_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)

            # Forward pass
            outputs = model(inputs)
            loss = computeLoss(outputs, labels)
            batch_losses.append(loss.item())

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Running accuracy over the current 100-step window
            _, predicted = torch.max(outputs, 1)
            total_correct += (predicted == labels).sum().item()
            total_samples += labels.size(0)

            if (i + 1) % 100 == 0:
                batch_accuracy = 100.0 * total_correct / total_samples
                batch_accuracies.append(batch_accuracy)
                print(f'Epoch {epoch+1}, Step {i+1}, Loss: {sum(batch_losses) / len(batch_losses):.4f}, '
                      f'Accuracy: {batch_accuracy:.2f}%')
                total_correct = 0
                total_samples = 0
                batch_losses = []

        val_accuracy = validate(model, val_loader, device)
        print(f'Epoch {epoch+1}: Validation Accuracy: {val_accuracy:.4f}')

        # Checkpoint on improvement. The metric is an accuracy, so higher is
        # better; the comparison must be `>` or nothing is ever saved.
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            torch.save(model.state_dict(), save_path)
            print(f'Saved best model to {save_path}')

        if val_accuracy < 0.9 * previous_val_accuracy:
            print("Stopping early: validation accuracy dropped by more than 10% "
                  "relative to the previous epoch.")
            break
        previous_val_accuracy = val_accuracy

        # Per-epoch training curve (one point per 100 training steps).
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(batch_accuracies, label='Accuracy per 100 examples')
        ax.set_title('Accuracy per 100 examples')
        ax.set_xlabel('Batch')
        ax.set_ylabel('Accuracy')
        ax.legend()
        plt.show()
        plt.close(fig)  # avoid accumulating open figures across epochs

    final_model_path = 'final_model_RNN.pth'
    torch.save(model.state_dict(), final_model_path)
    print(f'Saved final model state to {final_model_path}')

# Select the device first and report it before any training output appears.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

# Model, objective and optimiser for the LSTM classifier.
model = LSTM()
computeLoss = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

# Single training epoch, validated against `val_loader`.
train(model, val_loader, computeLoss, optimizer, num_epochs=1, device=device)



In [5]:
# NOTE: `VideoDataset3DCNN` is declared in a later cell of this notebook; that
# cell has to be executed first or this line raises NameError.
test_dataset = VideoDataset3DCNN(test_dataset)

# Evaluation does not benefit from shuffling; a fixed order (as used for the
# validation loader) keeps any per-sample output reproducible.
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)


NameError: name 'VideoDataset3DCNN' is not defined

In [None]:
# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# Cross-entropy loss used when reporting the test loss.
criterion = nn.CrossEntropyLoss()

def test(model, data_loader, device):
    model = model.to(device)
    model.eval() 
    total_loss = 0
    correct_predictions = 0
    total_predictions = 0
    counter = 0
    with torch.no_grad(): 
            for inputs, labels in data_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                total_loss += loss.item()

                _, predicted = torch.max(outputs, 1)
                correct_predictions += (predicted == labels).sum().item()
                total_predictions += labels.size(0)
                counter += 1
                print(predicted, labels, correct_predictions, total_predictions)
    
    avg_loss = total_loss / len(data_loader)
    accuracy = correct_predictions / total_predictions
    return avg_loss, accuracy

# Evaluate on the held-out test loader and report the averaged loss and accuracy.
# NOTE(review): `modelFinal` is not defined in any cell visible in this
# notebook — confirm where it is created/loaded before running this cell.
test_loss, test_accuracy = test(modelFinal, test_loader, device)
print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

In [6]:
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import cv2
import numpy as np

class VideoDataset3DCNN(Dataset):
    """Dataset yielding fixed-length frame clips laid out for a 3D CNN.

    Indexing returns ``(clip, label)``: ``clip`` is the stack of transformed
    frames permuted from (frames, channels, H, W) to (channels, frames, H, W),
    and ``label`` is the row's ``'anomaly'`` value.

    Args:
        dataset: Indexable collection whose rows expose the ``'video_path'``
            and ``'anomaly'`` keys.
        clip_length: Number of frames in every returned clip.
        transform: Optional callable applied to each RGB frame. Defaults to a
            224x224 resize followed by tensor conversion and channel-wise
            normalisation.
    """

    def __init__(self, dataset, clip_length=240, transform=None):
        self.dataset = dataset
        self.clip_length = clip_length
        self.transform = transform or transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    def __len__(self):
        """Return the number of videos in the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return ``(clip, label)`` for the video at position ``idx``.

        Raises:
            RuntimeError: If no frame could be decoded from the video.
        """
        # Fetch the row once; indexing the wrapped dataset may be expensive.
        row = self.dataset[idx]
        video_path = row['video_path']
        label = row['anomaly']
        frames = self.load_video(video_path, self.clip_length)
        if not frames:
            # torch.stack([]) would fail with a message that hides the culprit.
            raise RuntimeError(f"No frames could be read from video {video_path!r}")

        if self.transform:
            frames = [self.transform(frame) for frame in frames]

        frames_tensor = torch.stack(frames, dim=0)
        # Move the channel axis in front of the frame axis.
        frames_tensor = frames_tensor.permute(1, 0, 2, 3)

        return frames_tensor, label

    def load_video(self, video_path, clip_length):
        """
        Load a clip containing 'clip_length' frames from a video.

        Frame ``i`` of the clip is read at index ``int(fps) * i``, clamped to
        the last frame when the container reports a frame count. If reading
        stops early, the last decoded frame is repeated to fill the clip.
        Returns an empty list when nothing could be decoded.
        """
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            previous_index = None

            for i in range(clip_length):
                target_index = fps * i
                # Clamp only when the count is known; an unknown count (<= 0)
                # would otherwise turn every index into -1.
                if frame_count > 0:
                    target_index = min(target_index, frame_count - 1)

                if frames and target_index == previous_index:
                    # Same frame as the previous step: reuse it instead of
                    # seeking and decoding it again.
                    frames.append(frames[-1])
                    continue

                cap.set(cv2.CAP_PROP_POS_FRAMES, target_index)
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))  # Convert BGR to RGB
                previous_index = target_index

            # Pad a short clip by repeating the last decoded frame.
            if frames and len(frames) < clip_length:
                frames += [frames[-1]] * (clip_length - len(frames))
        finally:
            # Always free the decoder, even if decoding raises.
            cap.release()
        return frames


In [7]:
from torch.utils.data import DataLoader

# Wrap the splits for the 3D-CNN pipeline so indexing yields (clip, label).
# NOTE(review): this expects `train_dataset` / `val_dataset` to still be the
# raw splits; if the `VideoDataset` wrapping cell ran in the same kernel they
# are already wrapped — confirm the intended execution order.
train_dataset, val_dataset = VideoDataset3DCNN(train_dataset), VideoDataset3DCNN(val_dataset)

# Batches hold a single clip; only the training loader is shuffled.
train_loader = DataLoader(dataset=train_dataset, batch_size=1, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=1, shuffle=False)

In [15]:
import torch.nn as nn

class Conv3D(nn.Module):
    """Two-stage 3D convolutional classifier for video clips.

    Each stage is conv (kernel 3, padding 1, stride (1, 2, 2)) -> ReLU ->
    max-pool (kernel 2, stride 2); the flattened result feeds two linear layers.

    Args:
        num_classes: Number of output logits.
        input_shape: ``(frames, height, width)`` of the clips fed to
            ``forward``. It sizes the first linear layer; the default
            ``(240, 224, 224)`` gives the previously hard-coded 752640
            flattened features.

    Raises:
        ValueError: If ``input_shape`` is too small to survive both stages.
    """

    def __init__(self, num_classes, input_shape=(240, 224, 224)):
        super().__init__()

        self.conv1 = nn.Conv3d(3, 32, kernel_size=(3, 3, 3), stride=(1, 2, 2), padding=1)
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2))
        self.conv2 = nn.Conv3d(32, 64, kernel_size=(3, 3, 3), stride=(1, 2, 2), padding=1)

        num_features = self._flattened_size(input_shape)

        self.fc1 = nn.Linear(num_features, 256)
        self.fc2 = nn.Linear(256, num_classes)

    @staticmethod
    def _flattened_size(input_shape, out_channels=64):
        """Return the number of features left after both conv/pool stages."""
        def conv_out(size, stride):
            # kernel 3, padding 1: floor((size + 2*1 - 3) / stride) + 1
            return (size + 2 - 3) // stride + 1

        def pool_out(size):
            # kernel 2, stride 2: floor((size - 2) / 2) + 1
            return (size - 2) // 2 + 1

        total = out_channels
        # Strides per axis are (1, 2, 2) for (frames, height, width).
        for size, stride in zip(input_shape, (1, 2, 2)):
            for _ in range(2):
                size = pool_out(conv_out(size, stride))
            if size < 1:
                raise ValueError(f"input_shape {tuple(input_shape)!r} is too small for this network")
            total *= size
        return total

    def forward(self, inputs):
        """Return class logits for ``inputs`` of shape (batch, 3, frames, H, W)."""
        # Conv layer operations
        layer1 = self.pool(self.relu(self.conv1(inputs)))
        layer2 = self.pool(self.relu(self.conv2(layer1)))
        # Flatten the tensor for the fully connected layer
        feature_vector = layer2.view(layer2.size(0), -1)
        output1 = self.relu(self.fc1(feature_vector))
        output2 = self.fc2(output1)
        return output2



In [11]:
def CNNvalidate(model, val_loader, device):
    """Print and return the classification accuracy of ``model`` on ``val_loader``.

    Args:
        model: Module mapping a batch of inputs to per-class scores. It is
            switched to eval mode and left there.
        val_loader: Iterable of ``(inputs, labels)`` batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Accuracy as a fraction in [0, 1]; 0.0 when the loader yields no samples.
    """
    model.eval()
    total_correct = 0
    total_samples = 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total_correct += (predicted == labels).sum().item()
            total_samples += labels.size(0)

    # An empty loader would otherwise divide by zero.
    accuracy = total_correct / total_samples if total_samples else 0.0
    print(f'Validation Accuracy: {accuracy * 100:.2f}%')
    return accuracy


In [None]:
import matplotlib.pyplot as plt

def CNNtrain(model, val_loader, computeLoss, optimizer, num_epochs, device,
             save_path='best_model.pth', train_data_loader=None):
    """Train ``model`` and checkpoint the weights with the best validation accuracy.

    Args:
        model: Network to optimise; moved to ``device``.
        val_loader: Loader passed to ``CNNvalidate`` after every epoch.
        computeLoss: Callable ``(outputs, labels) -> loss``.
        optimizer: Optimiser already bound to ``model``'s parameters.
        num_epochs: Number of passes over the training data.
        device: Device used for the forward/backward passes.
        save_path: File that receives the best-so-far ``state_dict``.
        train_data_loader: Loader yielding ``(inputs, labels)`` training
            batches. When omitted, the notebook-level ``train_loader`` is used.

    After training, the accuracy of every 100-step window (across all epochs)
    is plotted and the final weights are written to ``final_model.pth``.
    """
    model = model.to(device)
    if train_data_loader is None:
        train_data_loader = train_loader  # notebook-level loader
    best_val_accuracy = 0
    # Collected across epochs so the plot after the loop covers the whole run
    # and the name exists even when num_epochs is 0.
    batch_accuracies = []

    for epoch in range(num_epochs):
        model.train()
        total_correct = 0
        total_samples = 0
        batch_losses = []

        for i, (inputs, labels) in enumerate(train_data_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)

            # Forward pass
            outputs = model(inputs)
            loss = computeLoss(outputs, labels)
            batch_losses.append(loss.item())

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Running accuracy over the current 100-step window
            _, predicted = torch.max(outputs.detach(), 1)
            total_correct += (predicted == labels).sum().item()
            total_samples += labels.size(0)

            if (i + 1) % 100 == 0:
                batch_accuracy = 100.0 * total_correct / total_samples
                batch_accuracies.append(batch_accuracy)
                print(f'Epoch {epoch+1}, Step {i+1}, Loss: {sum(batch_losses) / len(batch_losses):.4f}, '
                      f'Accuracy: {batch_accuracy:.2f}%')
                total_correct = 0
                total_samples = 0
                batch_losses = []

        # Validation after each epoch
        val_accuracy = CNNvalidate(model, val_loader, device)
        print(f'Epoch {epoch+1}: Validation Accuracy: {val_accuracy:.4f}')

        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            torch.save(model.state_dict(), save_path)
            print(f'Saved best model to {save_path}')

    # Plotting
    plt.figure(figsize=(10, 5))
    plt.plot(batch_accuracies, label='Accuracy per 100 examples')
    plt.title('Accuracy per 100 examples')
    plt.xlabel('Batch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()

    final_model_path = 'final_model.pth'
    torch.save(model.state_dict(), final_model_path)
    print(f'Saved final model state to {final_model_path}')


# Select the device first and report it before any training output appears.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

# Two-class 3D CNN with its objective and optimiser.
model = Conv3D(2)
computeLoss = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

# Five training epochs, validated against `val_loader` after each one.
CNNtrain(model, val_loader, computeLoss, optimizer, num_epochs=5, device=device)

