In [2]:
import os 
from PIL import Image 
import xml.etree.ElementTree as ET
import torch 
import torch.utils.data 
from torchvision import transforms as T 
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torch.utils.data import DataLoader, random_split
from tqdm import tqdm 

In [3]:
#custom dataset class
class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, img_dir, annotation_dir, transforms = None):
        self.img_dir = img_dir 
        self.annotation_dir = annotation_dir
        self.transforms = transforms

        self.images = sorted([f for f in os.listdir(img_dir) if f.endswith(('.jpg', '.png', '.jpeg'))])
        print(f"Found {len(self.images)} images")

    def __getitem__(self, idx):
        #load images
        img_path = os.path.join(self.img_dir, self.images[idx])
        img = Image.open(img_path).convert("RGB")

        #load annotation
        annotation_file = os.path.join(
            self.annotation_dir,
            self.images[idx].replace('.jpg' , '.xml').replace('.png', '.xml')
        )


        #Parse XML annotation

        tree = ET.parse(annotation_file)
        root = tree.getroot()
        boxes = []
        labels = []

        #extract bounding boxes and labels
        for obj in root.findall('object'):
            label = obj.find('name').text 

            if label == 'with_mask':
                label_id = 1 
            elif label == 'without_mask':
                label_id = 2
            else:
                label_id = 3

            
            #get bounding box
            bbox = obj.find('bndbox')
            xmin = float(bbox.find('xmin').text)
            ymin = float(bbox.find('ymin').text)
            xmax = float(bbox.find('xmax').text)
            ymax = float(bbox.find('ymax').text)

            boxes.append([xmin,ymin,xmax,ymax])
            labels.append(label_id)

        #convert to tensors
        boxes = torch.as_tensor(boxes, dtype = torch.float32)
        labels = torch.as_tensor(labels, dtype = torch.int64)

        target = {}
        target["boxes"] = boxes 
        target["labels"] = labels 

        #transforms
        if self.transforms is not None:
            img = self.transforms(img)
        return img, target 
    
    def __len__(self):
        return len(self.images)
    
transform = T.Compose([
    T.ToTensor(),
])

print("self done")


self done


In [4]:
dataset = CustomDataset(
    img_dir = r"C:\Users\wming\OneDrive\Desktop\kaggle\yolov8_detection\archive\images",
    annotation_dir= r"C:\Users\wming\OneDrive\Desktop\kaggle\yolov8_detection\archive\annotations",
    transforms=transform
)

def collate_fn(batch):
    return tuple(zip(*batch))


train_size = int(0.8*len(dataset))
valid_size = len(dataset) - train_size

train_dataset, valid_dataset = random_split(dataset, [train_size,valid_size])

if __name__ == '__main__':
    train_loader = DataLoader(
        train_dataset,
        batch_size=1,
        shuffle = True,
        num_workers=0,
        collate_fn=collate_fn
    )

    valid_loader = DataLoader(
        valid_dataset,
        batch_size = 1,
        shuffle = False,
        num_workers=0,
        collate_fn=collate_fn
    )

Found 853 images


In [5]:
model = fasterrcnn_resnet50_fpn(pretrained = True)
num_classes = 4 #background + 3 mask calsses
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)



In [6]:
# device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# print(f"using device: {device}")
# model = model.to(device)

# if torch.cuda.is_available():
#     print(f"GPU NAME: {torch.cuda.get_device_name(0)}")
#     print(f"Memory Allocated: {torch.cuda.memory_allocated(0)/(1024**2):.2f} MB")


device = torch.device("cpu")

In [7]:
#train settings
optimizer = torch.optim.SGD(
    [p for p in model.parameters() if p.requires_grad],
    lr = 0.001,
    momentum = 0.9,
    weight_decay= 0.0005
)

lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size = 5,
    gamma = 0.5
)

epochs = 5
best_loss = float('inf')

def train_one_epoch(model, optimizer, data_loader, device):
    model.train()
    total_loss = 0
    num_batches = 0

    with tqdm(data_loader, desc = "Training") as pbar:
        for images, targets in pbar:
            try:
                images = list(image.to(device) for image in images)
                targets = [{k: v.to(device) for k,v in t.items()} for t in targets]

                if num_batches == 0:
                    print(f"first batch - number of images: {len(images)}")
                    print(f"first image shape: {images[0].shape}")
                    print(f"first target boxes shape: {targets[0]['boxes'].shape}")

                optimizer.zero_grad()

                loss_dict = model(images, targets)
                losses = sum(loss for loss in loss_dict.values())

                if not torch.isfinite(losses):
                    print(f"loss is {losses}, skip batch")
                    continue 

                losses.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm =1.0)

                optimizer.step()

                total_loss += losses.item()
                num_batches += 1

                pbar.set_postfix({
                    'batch_loss': losses.item(),
                    'avg_loss' : total_loss / num_batches
                })

            except Exception as e:
                print(f"Error in batch: {e}")
                continue 

    return total_loss / num_batches if num_batches > 0 else float('inf')

In [7]:
for epoch in range(epochs):
    try:
        print(f"\nEpoch {epoch+1} / {epochs}")

        epoch_loss = train_one_epoch(model, optimizer, train_loader, device)

        lr_scheduler.step()

        print(f"average loss: {epoch_loss:.4f}")
        print(f"learning rate: {lr_scheduler.get_last_lr()[0]:.6f}")



        #save best model
        if epoch_loss < best_loss:
            best_loss = epoch_loss
            torch.save({
                'epoch':epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': epoch_loss,

            }, 'best_model.pth')
            print("save best model!")
    except KeyboardInterrupt:
        print("\nTraining interrupted by user")
        break 
    except Exception as e :
        print(f"Error in epoch {epoch+1}: {e}")
        continue 

print("\nTraining complete!")
print(f"best loss achieved: {best_loss:.4f}")


Epoch 1 / 5


Training:   0%|          | 0/682 [00:00<?, ?it/s]

first batch - number of images: 1
first image shape: torch.Size([3, 210, 400])
first target boxes shape: torch.Size([7, 4])


Training: 100%|██████████| 682/682 [24:04<00:00,  2.12s/it, batch_loss=0.129, avg_loss=0.524] 


average loss: 0.5238
learning rate: 0.001000
save best model!

Epoch 2 / 5


Training:   0%|          | 0/682 [00:00<?, ?it/s]

first batch - number of images: 1
first image shape: torch.Size([3, 400, 301])
first target boxes shape: torch.Size([1, 4])


Training: 100%|██████████| 682/682 [23:42<00:00,  2.09s/it, batch_loss=0.0647, avg_loss=0.349]


average loss: 0.3493
learning rate: 0.001000
save best model!

Epoch 3 / 5


Training:   0%|          | 0/682 [00:00<?, ?it/s]

first batch - number of images: 1
first image shape: torch.Size([3, 400, 267])
first target boxes shape: torch.Size([1, 4])


Training: 100%|██████████| 682/682 [23:41<00:00,  2.08s/it, batch_loss=0.232, avg_loss=0.309] 


average loss: 0.3095
learning rate: 0.001000
save best model!

Epoch 4 / 5


Training:   0%|          | 0/682 [00:00<?, ?it/s]

first batch - number of images: 1
first image shape: torch.Size([3, 374, 400])
first target boxes shape: torch.Size([4, 4])


Training: 100%|██████████| 682/682 [23:43<00:00,  2.09s/it, batch_loss=0.167, avg_loss=0.284] 


average loss: 0.2838
learning rate: 0.001000
save best model!

Epoch 5 / 5


Training:   0%|          | 0/682 [00:00<?, ?it/s]

first batch - number of images: 1
first image shape: torch.Size([3, 400, 267])
first target boxes shape: torch.Size([2, 4])


Training: 100%|██████████| 682/682 [23:52<00:00,  2.10s/it, batch_loss=0.0776, avg_loss=0.267]


average loss: 0.2671
learning rate: 0.000500
save best model!

Training complete!
best loss achieved: 0.2671


In [8]:
torch.save({
    'epoch': epochs,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': best_loss,
}, 'final_model.pth')

print('save final model')

save final model


In [11]:
import cv2
import numpy as np 
from PIL import Image, ImageDraw 
def evaluate_model(model, valid_loader, device):
    model.eval()

    label_map = {
        1: "with_mask",
        2: "without_mask",
        3: "mask_incorrect"
    }

#Test on validation dataset
with torch.no_grad(): #disable gradient calculations
    for images, targets in valid_loader:
        images = list(img.to(device) for img in images)

        predictions = model(images)

        for i , prediction in enumerate(predictions):
            image = image[i].cpu().permute(1,2,0).numpy()
            image = (image*255).astype(np.uint8)

            pil_image = Image.fromarray(image)
            draw = ImageDraw.Draw(pil_image)

            boxes = prediction['boxes'].cpu().numpy()
            labels = prediction['labels'].cpu().numpy()
            scores = prediction['scores'].cpu().numpy()

            for box, label, score in zip(boxes, labels, scores):
                if score > 0.5: #threshold
                    box = box.astype(np.int32)
                    draw.rectangle(
                        [(box[0], box[1], box[2], box[3])],
                        outline = 'red',
                        width = 2
                    )

                    label_text = f"{label_map[label]}: {score:.2f}"
                    draw.text((box[0], box[1] - 10,), label_text, fill = 'blue')
            pil_image.show() #for display
            pil_image.save(f"prediction_{i}.png")



            #print predictions

            print(f"\nImage {i} Predictions:")
            print("Boxes", boxes[scores>0.5])
            print("Labels:", [label_map[l] for l in labels[score > 0.5]])
            print("Scores:", scores[score > 0.5])
        break 


#use the function
print("staring evaluation...")
evaluate_model(model, valid_loader, device)


AssertionError: targets should not be none when in training mode

In [None]:
def visualize_predictions(mode, valid_loader, device, num_images = 120):

    model.eval()

    colors = {
        'with_mask': (0,255,0),
        'without_mask': (255,0,0),
        'mask_incorrect': (0,0,255)
    }

    label_map = {
        1: 'with_mask',
        2: 'without_mask',
        3: 'mask_incorrect'
    }

    with torch.no_grad():
        for batch_idx, (images,targets) in enumerate(valid_loader):
            # move images to device

            images = list(img.to(device) for img in images)

            predictions = model(images)

            for i , (image,prediction) in enumerate(zip(images, predictions)):

                if batch_idx * len(images) + i >= num_images:
                    return 
                
                img_np = image.cpu().permute(1,2,0).numpy()
                img_np = (img_np * 255).astype(np.uint8)
                img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)

                boxes = prediction['boxes'].cpu().numpy()
                labels = prediction['labels'].cpu().numpy()
                scores = prediction['scores'].cpu().numpy()

                for box, label, score in zip(boxes, labels, scores):
                    if score >0.5:
                        box = box.astype(int)
                        label_name = label_map[label]
                        color = colors[label_name]

                        cv2.rectangle(imp_np, (box[0], box[1]), (box[2], box[3]). color, 3)
                        label_text = f"{label_name}: {score:.2f}"
                        cv2.putText(img_np, label_text, (box[0], box[1] - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
                        
                cv2.imwrite(f"prediction_image_{batch_idx}_{i}.png" , img_np)
                print(f"saved prediction_image_{batch_idx}_{i}.png")

print("starting visualization...")
visualize_predictions(model, valid_loader, device, num_images= 120)

In [None]:
from IPython.display import Image, display 
import glob 

for image_path in sorted(glob.glob('prediction_image_*.png')):
    display(Image(filename = image_path))
    print(f"displaying {iamge_path}")
