<a href="https://colab.research.google.com/github/AfifaMasood/AfifaMasood/blob/main/Copy_of_Resnet%2Bcropped_images%2Byolo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os

from google.colab import drive
# Mount Google Drive under /content/drive so later cells can read and write
# files below /content/drive/MyDrive. force_remount=True asks Colab to mount
# again even if the drive is already mounted in this session.
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
import os
from PIL import Image, UnidentifiedImageError
from torch.utils.data import Dataset, DataLoader
import torch
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models

# Custom Dataset class to extract labels from filenames
class CustomDataset(Dataset):
    """Image dataset whose class labels are encoded in the file names.

    Every matching file below ``root_dir`` (searched recursively) is checked
    once at construction time: zero-byte files and files that Pillow cannot
    open/verify are skipped and recorded in ``corrupt_images``. The label is
    the part of the file name before the first ``-`` (for example
    ``"Shoplifting-1_frame0.jpg"`` -> ``"Shoplifting"``), converted to an
    integer through ``label_map``. Prefixes missing from ``label_map`` are
    mapped to ``0`` and reported with a warning.

    Directories and files are visited in sorted order so that the index ->
    sample mapping does not depend on the order the filesystem lists entries.

    Args:
        root_dir: Directory that is walked recursively for images.
        transform: Optional callable applied to each RGB image returned by
            ``__getitem__``.
        extensions: Case-sensitive file-name suffix (or iterable of suffixes)
            treated as images. Defaults to ``(".jpg",)``.

    Attributes:
        image_paths: Paths of the images that passed the integrity check.
        labels: Integer labels aligned with ``image_paths``.
        corrupt_images: File names that were skipped during loading.
        label_map: Mapping from file-name prefix to integer label.
    """

    def __init__(self, root_dir, transform=None, extensions=(".jpg",)):
        # A bare string would otherwise be split into single characters.
        if isinstance(extensions, str):
            extensions = (extensions,)

        self.root_dir = root_dir
        self.transform = transform
        self.extensions = tuple(extensions)
        self.image_paths = []
        self.labels = []
        self.corrupt_images = []  # Names of zero-byte / unverifiable files
        self.label_map = {'Shoplifting': 1, 'Normal': 0}

        unknown_prefixes = set()

        print(f"🔄 Loading dataset from: {root_dir}")

        for subdir, dirs, files in os.walk(root_dir):
            # Sorting in place makes os.walk descend in a deterministic order.
            dirs.sort()
            for file in sorted(files):
                if not file.endswith(self.extensions):
                    continue

                file_path = os.path.join(subdir, file)
                if not self._is_usable_image(file_path, file):
                    self.corrupt_images.append(file)
                    continue

                # Extract class label from filename (e.g., "Shoplifting-1_frame0.jpg")
                label = file.split('-')[0]
                if label not in self.label_map:
                    unknown_prefixes.add(label)

                self.image_paths.append(file_path)
                # Unknown prefixes keep the historical default of class 0.
                self.labels.append(self.label_map.get(label, 0))

        if unknown_prefixes:
            print(f"⚠️ Unrecognised label prefixes mapped to 0: {sorted(unknown_prefixes)}")

        print(f"✅ Dataset loaded: {len(self.image_paths)} images found (Skipped {len(self.corrupt_images)} corrupt images)")

    @staticmethod
    def _is_usable_image(file_path, file):
        """Return True when ``file_path`` is non-empty and passes Pillow's verify()."""
        if os.path.getsize(file_path) == 0:  # Zero-byte file check
            print(f"⚠️ Skipping zero-byte image: {file}")
            return False

        try:
            with Image.open(file_path) as img:
                img.verify()  # Verify image integrity
        except (UnidentifiedImageError, IOError):
            print(f"⚠️ Corrupt image detected & skipped: {file}")
            return False

        return True

    def __len__(self):
        """Return the number of usable images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is loaded as RGB and passed through ``transform`` when one
        was given. If the file can no longer be read, ``(None, None)`` is
        returned; callers must filter such samples themselves.
        NOTE(review): PyTorch's default collate function cannot batch ``None``
        values, so a DataLoader needs a custom ``collate_fn`` to survive this
        case - confirm whether that is required for this data.
        """
        img_name = self.image_paths[idx]

        try:
            # The context manager closes the file handle as soon as the pixel
            # data has been copied into the converted image.
            with Image.open(img_name) as img:
                image = img.convert("RGB")
        except (UnidentifiedImageError, IOError):
            print(f"⚠️ Skipping unreadable image: {img_name}")
            return None, None  # Return None to avoid crashes

        if self.transform:
            image = self.transform(image)

        return image, self.labels[idx]

# Preprocessing applied to every image: resize to 64x64, convert to a tensor,
# then normalise each channel with the mean/std values that torchvision
# documents for its ImageNet-pretrained models.
transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load the dataset (already cropped person images)
dataset_path = "/content/drive/MyDrive/split_dataset_3"

train_dataset = CustomDataset(root_dir=dataset_path + "/train", transform=transform)
test_dataset = CustomDataset(root_dir=dataset_path + "/test", transform=transform)

# Show the number of images per class in training data. The labels are already
# held in memory by the dataset, so they are counted directly instead of
# iterating the dataset (which would decode and transform every image).
train_labels = list(train_dataset.labels)
print(f"📊 Shoplifting count in train: {train_labels.count(1)}")
print(f"📊 Normal count in train: {train_labels.count(0)}")

# CustomDataset stores an integer label for every path it keeps (never None),
# so the datasets can be handed to the loaders without further filtering.
# Create DataLoader
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Load the ImageNet-pretrained ResNet34 and replace its final fully connected
# layer with a 2-way classifier (Shoplifting & Normal).
print("⏳ Loading ResNet34 model...")
model = models.resnet34(weights=models.ResNet34_Weights.IMAGENET1K_V1)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 2)  # 2 classes: Shoplifting, Normal
print("✅ ResNet34 model loaded and modified.")

# Move the model to the training device first so the optimizer is constructed
# from the parameters exactly as they will be trained.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
print("\n🚀 Starting Training...\n")

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for data, targets in train_loader:
        data = data.to(device)
        targets = targets.to(device)

        # Forward pass
        outputs = model(data)
        loss = criterion(outputs, targets)

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Mean batch loss for the epoch; max(..., 1) avoids dividing by zero when
    # the loader yields no batches.
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / max(len(train_loader), 1):.4f}')

# Save the trained model
torch.save(model.state_dict(), "/content/drive/MyDrive/resnet34_shoplifting.pth")
print("\n🎉 Training complete. Model saved as 'resnet34_shoplifting.pth' 🎉")


🔄 Loading dataset from: /content/drive/MyDrive/split_dataset_3/train
✅ Dataset loaded: 21438 images found (Skipped 0 corrupt images)
🔄 Loading dataset from: /content/drive/MyDrive/split_dataset_3/test
✅ Dataset loaded: 5000 images found (Skipped 0 corrupt images)
📊 Shoplifting count in train: 10719
📊 Normal count in train: 10719
⏳ Loading ResNet34 model...


Downloading: "https://download.pytorch.org/models/resnet34-b627a593.pth" to /root/.cache/torch/hub/checkpoints/resnet34-b627a593.pth
100%|██████████| 83.3M/83.3M [00:00<00:00, 192MB/s]


✅ ResNet34 model loaded and modified.

🚀 Starting Training...

Epoch [1/10], Loss: 0.2284
Epoch [2/10], Loss: 0.0815
Epoch [3/10], Loss: 0.0598
Epoch [4/10], Loss: 0.0515
Epoch [5/10], Loss: 0.0417
Epoch [6/10], Loss: 0.0752
Epoch [7/10], Loss: 0.0394
Epoch [8/10], Loss: 0.0273
Epoch [9/10], Loss: 0.0291
Epoch [10/10], Loss: 0.0251

🎉 Training complete. Model saved as 'resnet34_shoplifting.pth' 🎉


In [None]:
# Accuracy checking function
def check_accuracy(loader, model):
    """Return the classification accuracy of ``model`` over ``loader`` in percent.

    The model is switched to evaluation mode (and left in it) and gradients
    are disabled while predicting. Inputs are moved to the device that holds
    the model's parameters, so the function does not rely on a global
    ``device`` variable.

    Args:
        loader: Iterable yielding ``(inputs, targets)`` batches. Batches in
            which either element is ``None`` are skipped.
        model: ``torch.nn.Module`` producing per-class scores of shape
            ``(batch, num_classes)``.

    Returns:
        Accuracy as a float in ``[0, 100]``; ``0.0`` when no samples were
        evaluated (empty loader or only ``None`` batches).
    """
    num_correct = 0
    num_samples = 0
    model.eval()  # Set the model to evaluation mode

    # Follow the model's own placement; a parameter-less model leaves the
    # batches on whatever device they already are.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    with torch.no_grad():  # Disable gradient calculation during inference
        for x, y in loader:
            if x is None or y is None:  # Skip None batches
                continue

            if model_device is not None:
                x = x.to(model_device)
                y = y.to(model_device)

            # Index of the highest score along the class dimension.
            predictions = model(x).argmax(dim=1)
            num_correct += (predictions == y).sum().item()
            num_samples += y.size(0)

    if num_samples == 0:
        # Nothing was evaluated; avoid a ZeroDivisionError.
        return 0.0

    return float(num_correct) / num_samples * 100

# Reload the weights saved after training and evaluate them.
# weights_only=True restricts torch.load to tensors and plain containers
# instead of running the unrestricted pickle loader on the checkpoint file.
model.load_state_dict(
    torch.load(
        '/content/drive/MyDrive/resnet34_shoplifting.pth',
        map_location=device,
        weights_only=True,
    )
)
model.to(device)

# Train Accuracy (the first print marks the start of the evaluation pass)
print('Train Accuracy:')
train_accuracy = check_accuracy(train_loader, model)
print(f'Train Accuracy: {train_accuracy:.2f}%')

# Test Accuracy
print('Test Accuracy:')
test_accuracy = check_accuracy(test_loader, model)
print(f'Test Accuracy: {test_accuracy:.2f}%')


  model.load_state_dict(torch.load('/content/drive/MyDrive/resnet34_shoplifting.pth', map_location=device))


Train Accuracy:
