In [None]:
# Import necessary libraries
import torch
from torch.utils.data import Dataset
from PIL import Image
import os
import xml.etree.ElementTree as ET
import torchvision
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import torchvision.transforms as T
import numpy as np
from tqdm.notebook import tqdm


# Maps the class name found in an annotation's <name> tag to the integer
# class id used in the training targets. Ids start at 1; id 0 is left unused
# (the detection head in this file is built with one extra class for
# background).
label_mapping = {
    "with_mask": 1,
    "without_mask": 2,
    "mask_weared_incorrect": 3,
}

class MaskedFaceDataset(Dataset):
    """Object-detection dataset of images with side-by-side XML annotations.

    Every ``.png`` / ``.jpg`` file in ``image_dir`` is one sample. Its
    annotation is the file with the same stem and an ``.xml`` extension in the
    same directory; each ``<object>`` element contributes one box (read from
    ``<bndbox>``) and one label (read from ``<name>`` and converted through the
    module-level ``label_mapping``).

    Args:
        image_dir: Directory holding both the images and their XML files.
        transform: Optional callable applied to the PIL image only. If it
            changes the image size (e.g. a resize), the boxes are rescaled by
            the same per-axis factors. Other geometric transforms (crop, flip,
            rotation) are NOT reflected in the boxes.
    """

    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        # os.listdir returns entries in arbitrary order; sort by stem so that
        # an index always refers to the same file across runs and machines.
        self.images = sorted(
            (
                os.path.join(image_dir, name)
                for name in os.listdir(image_dir)
                if name.endswith(('.png', '.jpg'))
            ),
            key=lambda path: os.path.splitext(path)[0],
        )

    def __len__(self):
        """Return the number of images found in ``image_dir``."""
        return len(self.images)

    @staticmethod
    def _image_size(image):
        """Return ``(width, height)`` of a tensor or PIL image, else ``None``."""
        if isinstance(image, torch.Tensor):
            height, width = image.shape[-2:]
            return int(width), int(height)
        if isinstance(image, Image.Image):
            return image.size
        return None

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            ``(image, target)`` where ``target["boxes"]`` is a float32 tensor
            of shape ``(N, 4)`` in ``(xmin, ymin, xmax, ymax)`` order and
            ``target["labels"]`` is an int64 tensor of shape ``(N,)``.

        Raises:
            KeyError: If an object's name is not a key of ``label_mapping``.
        """
        img_path = self.images[idx]
        image = Image.open(img_path).convert('RGB')
        original_size = image.size  # (width, height) before any transform

        # Swap only the file extension; a plain str.replace would also rewrite
        # '.png' / '.jpg' occurring in a directory name.
        annotation_path = os.path.splitext(img_path)[0] + '.xml'
        root = ET.parse(annotation_path).getroot()

        boxes = []
        labels = []
        for member in root.findall('object'):
            bbox = member.find('bndbox')
            boxes.append([
                float(bbox.find(tag).text)
                for tag in ('xmin', 'ymin', 'xmax', 'ymax')
            ])
            labels.append(label_mapping[member.find('name').text])

        # reshape keeps the (N, 4) layout even when the image has no objects;
        # without it an empty list becomes a tensor of shape (0,).
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)

        if self.transform:
            image = self.transform(image)
            new_size = self._image_size(image)
            if new_size is not None and new_size != original_size:
                # Keep the boxes in the coordinate frame of the resized image.
                scale_x = new_size[0] / original_size[0]
                scale_y = new_size[1] / original_size[1]
                boxes = boxes * boxes.new_tensor(
                    [scale_x, scale_y, scale_x, scale_y]
                )

        target = {"boxes": boxes, "labels": labels}
        return image, target


def get_model(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) with a fresh box-prediction head.

    Args:
        num_classes: Number of outputs of the new classifier head, counting
            the background class (e.g. 4 for 3 object classes + background).

    Returns:
        The pre-trained detection model whose ``roi_heads.box_predictor`` has
        been replaced by a ``FastRCNNPredictor`` with ``num_classes`` outputs.
    """
    # Load a pre-trained model for object detection
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights='FasterRCNN_ResNet50_FPN_Weights.DEFAULT')

    # Replace the classifier head. The caller's num_classes is honoured here;
    # it used to be overwritten with a hard-coded 4.
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    return model

def count_bounding_boxes(predictions, num_categories=3, score_threshold=0.5):
    """Count detected boxes per category for each image.

    Args:
        predictions: Iterable of per-image dicts. Each must hold a ``'labels'``
            tensor of integer class ids and may hold a ``'scores'`` tensor of
            the same length.
        num_categories: Number of categories; ids are assumed to be
            ``1, 2, ..., num_categories``.
        score_threshold: Detections scoring below this value are ignored, so
            low-confidence boxes do not inflate the counts. Pass ``None`` to
            count every detection. Predictions without a ``'scores'`` entry
            are never filtered.

    Returns:
        A list with one ``[count_1, ..., count_num_categories]`` list per image.
    """
    counts = []
    for prediction in predictions:
        labels = prediction['labels']
        scores = prediction.get('scores')
        if score_threshold is not None and scores is not None:
            labels = labels[scores >= score_threshold]
        counts.append([
            (labels == category).sum().item()
            for category in range(1, num_categories + 1)
        ])
    return counts


def parse_xml_for_counts(image_dir, num_categories=3, category_names=None):
    """Count annotated objects per category in every XML file of a directory.

    Args:
        image_dir: Directory containing the ``.xml`` annotation files.
        num_categories: Length of each per-file count list.
        category_names: Names matched against each object's ``<name>`` text;
            the position of a name is the index of its counter. Defaults to
            the placeholders ``'category1' ... 'category<num_categories>'``,
            so real datasets must pass their own names.

    Returns:
        One ``[count, ...]`` list per XML file, ordered by file stem. Objects
        whose name is missing or not in ``category_names`` are ignored.
    """
    if category_names is None:
        category_names = [f'category{i}' for i in range(1, num_categories + 1)]

    # First occurrence wins, matching list.index semantics for duplicates.
    index_of = {}
    for position, name in enumerate(category_names):
        index_of.setdefault(name, position)

    # os.listdir order is arbitrary; sort so results are reproducible and can
    # be lined up with any other listing sorted the same way.
    xml_files = sorted(
        (name for name in os.listdir(image_dir) if name.endswith('.xml')),
        key=lambda name: os.path.splitext(name)[0],
    )

    actual_counts = []
    for xml_file in xml_files:
        root = ET.parse(os.path.join(image_dir, xml_file)).getroot()

        image_counts = [0] * num_categories
        for member in root.findall('object'):
            # findtext returns None when <name> is absent instead of raising.
            index = index_of.get(member.findtext('name'))
            if index is not None:
                image_counts[index] += 1

        actual_counts.append(image_counts)

    return actual_counts


# Example usage in a validation loop
def validate_model(model, val_loader, image_dir):
    """Run the detector over a loader and gather predicted vs. annotated counts.

    Args:
        model: Detection model; called with a list of image tensors it must
            return one dict of tensors (including ``'labels'``) per image.
        val_loader: Iterable yielding ``(images, targets)`` batches; the
            targets are not used.
        image_dir: Directory whose XML files provide the ground-truth counts.

    Returns:
        ``(all_counts, actual_counts)``: per-image predicted category counts in
        loader order, and per-XML-file annotated counts in the order produced
        by ``parse_xml_for_counts``.
    """
    model.eval()

    # Use the device the model actually lives on instead of relying on a
    # module-level ``device`` being defined before this function is called.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device('cpu')

    # Category names ordered by class id, so counter index == class id - 1
    # (label_mapping ids are 1..N).
    category_names = sorted(label_mapping, key=label_mapping.get)
    num_categories = len(category_names)

    all_counts = []
    with torch.no_grad():
        for images, _ in val_loader:
            images = [image.to(model_device) for image in images]
            outputs = model(images)

            outputs = [{k: v.to('cpu') for k, v in t.items()} for t in outputs]
            all_counts.extend(count_bounding_boxes(outputs, num_categories))

    # Pass the real class names: the placeholder defaults ('category1', ...)
    # never match the annotations and would yield all-zero counts.
    # NOTE(review): all_counts follows the loader's sample order while
    # actual_counts follows the XML listing order; confirm both enumerate the
    # files identically before comparing element-wise.
    actual_counts = parse_xml_for_counts(
        image_dir,
        num_categories=num_categories,
        category_names=category_names,
    )
    return all_counts, actual_counts

# Image preprocessing: resize to 224x224, then convert to a tensor.
# NOTE(review): this transform touches the image only; confirm the boxes
# returned by the dataset are expressed in the resized coordinate frame.
transform = T.Compose([T.Resize((224, 224)), T.ToTensor()])

# Dataset locations
path_to_train_dataset = "/content/drive/MyDrive/MaskedFace/train"
path_to_val_dataset = "/content/drive/MyDrive/MaskedFace/val"

# Datasets
train_dataset = MaskedFaceDataset(path_to_train_dataset, transform=transform)
val_dataset = MaskedFaceDataset(path_to_val_dataset, transform=transform)

# Data loaders. The collate function turns a batch of (image, target) pairs
# into a tuple of images and a tuple of targets instead of stacking them.
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=lambda batch: tuple(zip(*batch)),
)
val_loader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=4,
    shuffle=False,
    collate_fn=lambda batch: tuple(zip(*batch)),
)

# Pick the GPU when one is available, then build the model and move it there.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = get_model(num_classes=4)
model.to(device)

# Training Logic
num_epochs = 28
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    batch_count = 0
    for images, targets in tqdm(train_loader,desc='progress',leave=False):
        images = list(img.to(device) for img in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # Called with targets, the model returns a dict of loss terms; the
        # training objective is their sum.
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        running_loss += losses.item()
        batch_count += 1

    # Report the mean loss over the epoch rather than the last mini-batch's
    # loss, which is noisy and undefined when the loader yields no batches.
    epoch_loss = running_loss / batch_count if batch_count else float('nan')
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss}")


# Validation: predicted per-image counts and the counts read from the XML files
predicted_counts, actual_counts = validate_model(
    model=model,
    val_loader=val_loader,
    image_dir="/content/drive/MyDrive/MaskedFace/val",
)

# Side-by-side printout of predicted and actual counts
def compare_counts(predicted, actual):
    """Print each image's predicted counts next to the actual counts.

    Images are numbered from 1 in the order of ``predicted``; ``actual`` is
    indexed at the same position.
    """
    for number, predicted_row in enumerate(predicted, start=1):
        print(f"Image {number}:")
        actual_row = actual[number - 1]
        print(f"Predicted: {predicted_row}, Actual: {actual_row}")

compare_counts(predicted=predicted_counts, actual=actual_counts)

In [None]:
def calculate_mape(actual_counts, predicted_counts):
    """Return the mean absolute percentage error between count lists.

    For each image the per-category error is ``|a - p| / max(a, 1) * 100``
    (the ``max`` avoids dividing by zero when a category is absent); these are
    averaged over the image's categories, then over the images.

    Args:
        actual_counts: Per-image lists of ground-truth counts.
        predicted_counts: Per-image lists of predicted counts, same order.

    Returns:
        The MAPE in percent, averaged over the image pairs actually compared
        (pairing stops at the shorter input). Returns ``0.0`` when there is
        nothing to compare; an image with no categories contributes ``0.0``.
    """
    image_errors = []
    for actual_row, predicted_row in zip(actual_counts, predicted_counts):
        if not actual_row:
            image_errors.append(0.0)
            continue
        row_error = sum(
            abs(a - p) / max(a, 1) * 100
            for a, p in zip(actual_row, predicted_row)
        )
        image_errors.append(row_error / len(actual_row))

    if not image_errors:
        return 0.0
    return sum(image_errors) / len(image_errors)

def count_masks(model, val_loader, image_dir):
    """Validate the model and summarise the result.

    Runs ``validate_model`` once, scores its output with ``calculate_mape``
    and converts the predicted counts to an int64 NumPy array.

    Returns:
        ``(predicted_counts_array, mape_score)``.
    """
    predicted, actual = validate_model(model, val_loader, image_dir)
    score = calculate_mape(actual, predicted)
    return np.array(predicted, dtype=np.int64), score

# Example usage: count masks on the validation set and report the MAPE
predicted_counts_np, mape_score = count_masks(
    model,
    val_loader,
    "/content/drive/MyDrive/MaskedFace/val",
)
print(f"MAPE Score: {mape_score}%")
