# Build & Train CNN Model from Scratch

**Goals:**
1. Define a simple CNN architecture for 3‑class mask classification  
2. Set up loss (`CrossEntropyLoss`), optimizer (`Adam`), and (optional) LR scheduler  
3. Write train/validation loops  
4. Track & plot loss/accuracy  
5. Save the best model


### Import Libraries

In [1]:
import os
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader,Dataset
from torchvision import datasets, transforms

import torchvision.transforms as transforms

import matplotlib.pyplot as plt
from PIL import Image
import numpy as np

In [2]:
# Pick the compute device. `device` must be bound on every path because the
# later cells call `.to(device)` unconditionally; without the CPU fallback a
# machine without CUDA fails with NameError.
if torch.cuda.is_available():
    device = torch.device("cuda")
    print(f'using GPU training :{torch.cuda.get_device_name(0)}')
else:
    device = torch.device("cpu")
    print("CUDA not available, training on CPU")

using GPU training :NVIDIA GeForce RTX 3060 Laptop GPU


In [3]:
import xml.etree.ElementTree as ET



In [4]:
# Side length (in pixels) images are resized to before entering the network.
IMG_SIZE = 224
# DataLoader settings: samples per batch and number of worker processes.
Batch_Size = 32
NUm_Workers = 0

# Class name -> integer label; indices follow the order of the tuple below.
CLASS_MAP = {
    class_name: class_index
    for class_index, class_name in enumerate(
        ("with_mask", "without_mask", "mask_weared_incorrect")
    )
}



In [5]:
def xml_to_label(xml_path):
    """Return the integer class label stored in an annotation XML file.

    Only the first ``object/name`` element is consulted. Its text is stripped
    of surrounding whitespace, lower-cased and looked up in ``CLASS_MAP``.

    Args:
        xml_path: Path of the annotation file (anything ``ET.parse`` accepts).

    Returns:
        The integer label that ``CLASS_MAP`` assigns to the class name.

    Raises:
        ValueError: If the file has no ``object/name`` element or it is empty.
        KeyError: If the class name is not a key of ``CLASS_MAP``.
    """
    name_node = ET.parse(xml_path).find('object/name')
    raw_name = name_node.text if name_node is not None else None
    if raw_name is None or not raw_name.strip():
        raise ValueError(f"no object/name entry found in {xml_path!r}")
    # Pretty-printed XML may pad the text with spaces/newlines; without the
    # strip such files would fail the dictionary lookup.
    return CLASS_MAP[raw_name.strip().lower()]


In [6]:
def collect_paths_and_labels(split, root="dataset", extensions=(".png",)):
    """Collect image paths and their labels for one dataset split.

    Images are read from ``<root>/<split>/images`` and paired with the
    annotation of the same stem in ``<root>/<split>/annotations``. Files with
    another extension, or without a matching ``.xml`` file, are skipped.

    Args:
        split: Name of the split sub-directory (e.g. ``'train'``).
        root: Dataset root directory; defaults to ``"dataset"``.
        extensions: Image filename suffixes to accept (compared
            case-insensitively); defaults to PNG only.

    Returns:
        A ``(paths, labels)`` pair of equally long lists, ordered by filename.

    Raises:
        FileNotFoundError: If the image directory does not exist
            (propagated from ``os.listdir``).
    """
    img_dir = f"{root}/{split}/images"
    xml_dir = f"{root}/{split}/annotations"
    suffixes = tuple(ext.lower() for ext in extensions)
    paths, labels = [], []
    # os.listdir returns entries in arbitrary order; sorting makes the sample
    # order (and therefore unshuffled evaluation runs) reproducible.
    for fn in sorted(os.listdir(img_dir)):
        if not fn.lower().endswith(suffixes):
            continue
        xml = os.path.join(xml_dir, fn.rsplit('.', 1)[0] + '.xml')
        if not os.path.exists(xml):
            continue
        paths.append(os.path.join(img_dir, fn))
        labels.append(xml_to_label(xml))
    return paths, labels



In [7]:
# Gather the (paths, labels) pair of every split, in train / val / test order.
(train_paths, train_labels), (val_paths, val_labels), (test_paths, test_labels) = (
    collect_paths_and_labels(split_name)
    for split_name in ("train", "val", "test")
)



In [8]:
# Training-time preprocessing: square resize, random horizontal flip as
# augmentation, tensor conversion, then per-channel normalisation
# (the mean/std values are presumably the usual ImageNet statistics).
_train_steps = [
    transforms.Resize(size=(IMG_SIZE, IMG_SIZE)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
]
train_transform = transforms.Compose(_train_steps)



In [9]:
# Evaluation-time preprocessing: the same resize and normalisation as for
# training, but without any random augmentation.
_val_steps = [
    transforms.Resize(size=(IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
]
val_transform = transforms.Compose(_val_steps)

In [10]:
class MaskDetaset(Dataset):
    """Map-style dataset pairing image files with integer class labels.

    Args:
        paths: Sequence of image file paths.
        labels: Sequence of labels aligned index-by-index with ``paths``.
        transform: Optional callable applied to the RGB ``PIL.Image`` before
            it is returned.

    Raises:
        ValueError: If ``paths`` and ``labels`` differ in length.
    """

    def __init__(self, paths, labels, transform=None):
        # Fail at construction time instead of with an IndexError in the
        # middle of an epoch.
        if len(paths) != len(labels):
            raise ValueError(
                f"paths and labels must have the same length "
                f"(got {len(paths)} and {len(labels)})"
            )
        self.paths = paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of samples."""
        return len(self.paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``."""
        # convert() yields an independent image, so the file handle can be
        # closed as soon as the block exits.
        with Image.open(self.paths[idx]) as raw_image:
            image = raw_image.convert('RGB')

        if self.transform is not None:
            image = self.transform(image)

        return image, self.labels[idx]


In [11]:
# Options shared by the three loaders.
_loader_options = dict(
    batch_size=Batch_Size,
    num_workers=NUm_Workers,
    pin_memory=True,
)

# Only the training split is shuffled; validation and test keep their order.
train_loader = DataLoader(
    MaskDetaset(train_paths, train_labels, transform=train_transform),
    shuffle=True,
    **_loader_options,
)
val_loader = DataLoader(
    MaskDetaset(val_paths, val_labels, transform=val_transform),
    shuffle=False,
    **_loader_options,
)
test_loader = DataLoader(
    MaskDetaset(test_paths, test_labels, transform=val_transform),
    shuffle=False,
    **_loader_options,
)

print(f"Trian batches:{len(train_loader)}")


Trian batches:19


In [12]:
class SimpleCNN(nn.Module):
    """Plain convolutional classifier built from scratch.

    ``features`` stacks seven ``Conv2d(3x3, padding=1) -> ReLU -> MaxPool2d(2)``
    stages (3 -> 16 -> ... -> 1024 channels). ``pool`` averages the resulting
    map down to 1x1 and ``classifier`` is an MLP
    (512-256-128-64-32, each followed by ReLU and Dropout(0.5)) ending in a
    ``num_classes``-way linear layer that outputs raw logits.

    Because of the global average pooling the classifier input size equals the
    channel count of the last convolution and does not depend on the image
    resolution. The seven 2x poolings need at least 128 pixels per side.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes=3):
        super().__init__()

        channels = [3, 16, 32, 64, 128, 256, 512, 1024]
        feature_layers = []
        for in_channels, out_channels in zip(channels, channels[1:]):
            feature_layers += [
                nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2, stride=2),
            ]
        self.features = nn.Sequential(*feature_layers)

        self.pool = nn.AdaptiveAvgPool2d((1, 1))

        # forward() pools to (C, 1, 1) before flattening, so the first linear
        # layer always sees C values. Measuring the un-pooled feature map
        # instead only matches when that map is already 1x1 and breaks for
        # larger inputs.
        previous_features = channels[-1]

        classifier_layers = [nn.Flatten()]
        for hidden_unit in (512, 256, 128, 64, 32):
            classifier_layers += [
                nn.Linear(previous_features, hidden_unit),
                nn.ReLU(),
                nn.Dropout(0.5),
            ]
            previous_features = hidden_unit
        classifier_layers.append(nn.Linear(previous_features, num_classes))
        self.classifier = nn.Sequential(*classifier_layers)

    def forward(self, x):
        """Return class logits for a batch of images ``x`` (N, 3, H, W)."""
        x = self.features(x)
        x = self.pool(x)
        x = self.classifier(x)
        return x


# Build the network with one logit per class, move it to the selected device
# and show its layer structure.
model = SimpleCNN(num_classes=len(CLASS_MAP))
model = model.to(device)
print(model)



        

SimpleCNN(
  (features): Sequential(
    (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU()
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (9): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (10): ReLU()
    (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (12): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU()
    (14): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (15): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
   

In [13]:
# Cross-entropy on the raw logits, optimised with Adam at a learning rate of 1e-3.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

In [14]:
from tqdm import tqdm

def train_one_epoch(model, loader, optimizer, criterion, device, show_progress=True):
    """Run one optimisation pass over ``loader``.

    Args:
        model: Network to train; switched to training mode.
        loader: Iterable yielding ``(images, labels)`` batches.
        optimizer: Optimiser stepping the parameters of ``model``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.
        show_progress: When true (default) wrap ``loader`` in a ``tqdm`` bar
            showing the running loss/accuracy; when false iterate silently.

    Returns:
        ``(epoch_loss, epoch_acc)``: the sample-weighted mean loss and the
        accuracy over the samples actually seen. ``(0.0, 0.0)`` if the loader
        yields no samples.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    pbar = tqdm(loader, desc="Training", leave=False) if show_progress else None
    batches = loader if pbar is None else pbar
    for images, labels in batches:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Weight the batch loss by its size so a smaller last batch does not
        # skew the epoch mean.
        batch_size = images.size(0)
        running_loss += loss.item() * batch_size
        total += batch_size
        correct += (outputs.argmax(dim=1) == labels).sum().item()

        if pbar is not None:
            pbar.set_postfix(loss=f"{running_loss / total:.4f}",
                             acc=f"{correct / total:.4f}")

    if total == 0:
        return 0.0, 0.0
    # Normalise loss and accuracy by the same count: the samples seen, which
    # can differ from len(loader.dataset) with drop_last or a custom sampler.
    return running_loss / total, correct / total


In [15]:
def validate(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without computing gradients.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        ``(epoch_loss, epoch_acc)``: the sample-weighted mean loss and the
        accuracy over the samples actually seen. ``(0.0, 0.0)`` if the loader
        yields no samples.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Weight the batch loss by its size so a smaller last batch does
            # not skew the epoch mean.
            batch_size = images.size(0)
            running_loss += loss.item() * batch_size
            total += batch_size
            correct += (outputs.argmax(dim=1) == labels).sum().item()

    if total == 0:
        return 0.0, 0.0
    # Normalise loss and accuracy by the same count: the samples seen, which
    # can differ from len(loader.dataset) with drop_last or a custom sampler.
    return running_loss / total, correct / total

In [None]:
# Per-epoch metric curves, one list per key.
history = {key: [] for key in ("train_loass", "val_loss", "train_acc", "val_acc")}

num_epochs = 10
best_val_acc = 0.0
best_model_path = "best_model.pth"

for epoch in range(1, num_epochs + 1):
    start = time.time()
    train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion, device)
    val_loss, val_acc = validate(model, val_loader, criterion, device)

    # Record this epoch's numbers.
    for key, value in (
        ("train_loass", train_loss),
        ("train_acc", train_acc),
        ("val_loss", val_loss),
        ("val_acc", val_acc),
    ):
        history[key].append(value)

    # Checkpoint only when validation accuracy strictly improves.
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), best_model_path)

    print(", ".join([
        f"Epoch {epoch}/{num_epochs} - Train Loss: {train_loss:.4f}",
        f"Train Acc: {train_acc:.4f}",
        f"Val Loss: {val_loss:.4f}",
        f"Val Acc: {val_acc:.4f}",
        f"Time: {time.time() - start:.2f}s",
    ]))

    # Reported after every epoch (this statement is inside the loop).
    print(f"Best Validation Accuracy: {best_val_acc:.4f}")

In [17]:
# %% 9.1 Imports & Test Transform
import cv2
import torch
from torchvision import transforms as T
from PIL import Image

# Inference-time preprocessing: resize to the network input size, convert to
# a tensor and normalise with the same mean/std used by the training cells.
_test_steps = [
    T.Resize(size=(IMG_SIZE, IMG_SIZE)),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
test_transform = T.Compose(_test_steps)


In [18]:
# %% 9.2 Load Model
# Rebuild the architecture with its default class count, restore the saved
# weights onto the current device and switch to inference mode.
_best_weights = torch.load("best_model.pth", map_location=device)
model = SimpleCNN()
model = model.to(device)
model.load_state_dict(_best_weights)
model.eval()


SimpleCNN(
  (features): Sequential(
    (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU()
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (9): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (10): ReLU()
    (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (12): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU()
    (14): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (15): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
   

In [19]:
def decode_predictions(grid, conf_thresh=0.4):
    """Decode a grid of per-cell predictions into boxes, scores and labels.

    Every cell ``grid[i, j]`` is read as
    ``[x_offset, y_offset, width, height, confidence, class scores...]``.
    Cells whose confidence is below ``conf_thresh`` are dropped. The box
    centre is the cell index plus the offset, divided by the grid size; width
    and height are used as stored.

    Args:
        grid: Array-like of shape ``(rows, cols, 5 + num_classes)``. The grid
            size is taken from the input rather than assumed to be 7x7.
        conf_thresh: Minimum confidence for a cell to be kept.

    Returns:
        ``(boxes, scores, labels)``: ``boxes`` is a ``(K, 4)`` tensor of
        ``(x1, y1, x2, y2)`` corners (``(0, 4)`` when nothing passes the
        threshold), ``scores`` a ``(K,)`` tensor of confidences and ``labels``
        a list of ``K`` class indices.
    """
    grid = torch.as_tensor(grid)
    rows, cols = grid.shape[0], grid.shape[1]

    boxes = []
    scores = []
    labels = []

    for i in range(rows):
        for j in range(cols):
            cell = grid[i, j]
            conf = cell[4]  # Confidence score
            if conf < conf_thresh:
                continue

            # Centre in normalised image coordinates.
            x_center = (j + cell[0]) / cols
            y_center = (i + cell[1]) / rows
            half_width = cell[2] / 2
            half_height = cell[3] / 2

            boxes.append(torch.stack((
                x_center - half_width,
                y_center - half_height,
                x_center + half_width,
                y_center + half_height,
            )))
            scores.append(conf.item())
            labels.append(cell[5:].argmax().item())

    # Keep a well-formed (0, 4) shape when no cell passes the threshold so
    # callers can index the coordinate axis unconditionally.
    if boxes:
        box_tensor = torch.stack(boxes).detach()
    else:
        box_tensor = grid.new_zeros((0, 4))

    return box_tensor, torch.tensor(scores), labels

In [20]:
import os
# NCCL-related environment settings for this process (presumably to switch
# off peer-to-peer transfers and bind NCCL to the loopback interface;
# confirm they are still needed).
os.environ.update({
    "NCCL_P2P_DISABLE": "1",
    "NCCL_SOCKET_IFNAME": "lo",
})


In [21]:
import cv2

def list_active_cameras(max_index=8, backend=None):
    """Return the camera indices below ``max_index`` that deliver a frame.

    Args:
        max_index: Indices ``0 .. max_index - 1`` are probed.
        backend: OpenCV capture backend constant. When ``None`` (default),
            DirectShow is used on Windows and OpenCV's automatic choice
            elsewhere, since DirectShow is a Windows-only backend.

    Returns:
        List of indices that could be opened and produced a frame.
    """
    if backend is None:
        import sys

        backend = cv2.CAP_DSHOW if sys.platform.startswith("win") else cv2.CAP_ANY

    active = []
    for idx in range(max_index):
        cap = cv2.VideoCapture(idx, backend)
        try:
            if cap.isOpened():
                ret, _ = cap.read()
                if ret:
                    active.append(idx)
        finally:
            # Always hand the device back, even if the read raises.
            cap.release()
    return active

# Probe the default index range and report which cameras delivered a frame.
active_cameras = list_active_cameras()
print("Active camera indices:", active_cameras)



Active camera indices: []


In [22]:
# %% One‐Cell Webcam Inference (CPU‐safe)
import os
os.environ["NCCL_P2P_DISABLE"] = "1"
os.environ["NCCL_SOCKET_IFNAME"] = "lo"

import cv2
import torch
import torch.nn.functional as F
from torchvision import transforms as T
from PIL import Image

# Hyperparameters
IMG_SIZE = 224
# The label indices must match the mapping used when the checkpoint was
# trained (with_mask=0, without_mask=1, mask_weared_incorrect=2); any other
# order displays the wrong class name for a prediction.
CLASS_MAP = {"with_mask": 0, "without_mask": 1, "mask_weared_incorrect": 2}
CLASS_NAMES = {v: k for k, v in CLASS_MAP.items()}
device = torch.device("cpu")  # force CPU

# Load trained model
model = SimpleCNN(num_classes=len(CLASS_MAP)).to(device)
model.load_state_dict(torch.load("best_model.pth", map_location=device))
model.eval()

# Preprocessing transform
transform = T.Compose([
    T.Resize((IMG_SIZE, IMG_SIZE)),
    T.ToTensor(),
    T.Normalize([0.485, 0.456, 0.406],
                [0.229, 0.224, 0.225]),
])

# Video source: a network stream URL (replace it with a camera index to use
# a local webcam). The URL must not carry stray whitespace.
add = "http://192.168.1.199:8080/video"
cap = cv2.VideoCapture(add)
if cap.isOpened():
    print("Press 'q' to quit.")
else:
    print(f"Could not open video source: {add}")

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # OpenCV delivers BGR frames; the model pipeline expects RGB.
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        inp = transform(Image.fromarray(rgb)).unsqueeze(0).to(device)

        with torch.no_grad():
            logits = model(inp)
            probs = F.softmax(logits, dim=1)
            conf, pred = probs.max(dim=1)

        label = CLASS_NAMES[pred.item()]
        text = f"{label}: {conf.item():.2f}"

        cv2.putText(frame, text, (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0,
                    (0, 255, 0), 2, cv2.LINE_AA)
        cv2.imshow("Mask Classification", frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release the stream and close the window even if inference raises.
    cap.release()
    cv2.destroyAllWindows()


Press 'q' to quit.
