In [1]:
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import os
import cv2
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt

## Prepare Dataset and Dataloader

#### 1. Define Dataset class

In [7]:
# Class (folder) name -> integer training label; the position in the tuple is the label.
encode = {
    class_name: class_index
    for class_index, class_name in enumerate((
        'background', 'dilmah', 'g7', 'jack-jill', 'karo', 'nestea_atiso',
        'nestea_chanh', 'nestea_hoaqua', 'orion', 'tipo', 'y4',
    ))
}

import PIL
from PIL import Image

class MiniPrjDataset(Dataset):
    """Image-classification dataset stored as one sub-folder per class.

    Every regular file found in ``root_dir/<class name>`` becomes one sample
    labelled with that class's integer index.

    Args:
        root_dir: Folder that contains the per-class sub-folders.
        transform: Optional callable applied to the RGB ``PIL.Image`` before it
            is returned by ``__getitem__``.
        class_to_idx: Optional mapping from class (folder) name to integer
            label. When omitted, the notebook-level ``encode`` dictionary is used.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, root_dir, transform=None, class_to_idx=None):
        self.root_dir = root_dir
        self.transform = transform
        # Parallel lists: image_paths[i] is labelled labels[i].
        self.image_paths = []
        self.labels = []
        # The global default is only looked up when no explicit mapping is passed.
        self.class_to_idx = encode if class_to_idx is None else class_to_idx

        for cls_name, cls_idx in self.class_to_idx.items():
            cls_dir = os.path.join(root_dir, cls_name)
            print(f"Checking directory: {cls_dir}")  # Debug line
            if not os.path.isdir(cls_dir):
                # A class without a folder simply contributes no samples.
                continue
            # os.listdir() returns entries in arbitrary order; sorting makes
            # the index -> sample mapping reproducible between runs.
            for img_name in sorted(os.listdir(cls_dir)):
                img_path = os.path.join(cls_dir, img_name)
                if not os.path.isfile(img_path):
                    # Nested directories cannot be opened as images.
                    continue
                self.image_paths.append(img_path)
                self.labels.append(cls_idx)

    def __len__(self):
        """Return the number of collected samples."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``.

        The image is decoded as RGB and passed through ``transform`` when one
        was supplied; ``label`` is the integer class index.
        """
        # The context manager closes the file handle as soon as the pixels
        # have been decoded by convert().
        with Image.open(self.image_paths[idx]) as img:
            image = img.convert("RGB")
        label = self.labels[idx]

        if self.transform is not None:
            image = self.transform(image)

        return image, label


#### 2. Create Dataset object from data path

In [10]:
# The whole labelled set is used for training (no split is made here).
# Images are converted to tensors and standardised per channel with the
# mean/std commonly used for ImageNet-pretrained torchvision models.
total_datatransform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Change root_dir if the labelled frames live somewhere else on your machine.
all_dataset = MiniPrjDataset(
    root_dir=r'..\frames_224\total_data_labelled',
    transform=total_datatransform,
)

Checking directory: ..\frames_224\total_data_labelled\background
Checking directory: ..\frames_224\total_data_labelled\dilmah
Checking directory: ..\frames_224\total_data_labelled\g7
Checking directory: ..\frames_224\total_data_labelled\jack-jill
Checking directory: ..\frames_224\total_data_labelled\karo
Checking directory: ..\frames_224\total_data_labelled\nestea_atiso
Checking directory: ..\frames_224\total_data_labelled\nestea_chanh
Checking directory: ..\frames_224\total_data_labelled\nestea_hoaqua
Checking directory: ..\frames_224\total_data_labelled\orion
Checking directory: ..\frames_224\total_data_labelled\tipo
Checking directory: ..\frames_224\total_data_labelled\y4


In [11]:
# Sanity check: total number of samples collected across all class folders.
len(all_dataset)

21717

#### 3. Create class weights
The dataset is imbalanced, so these class weights rebalance how much each class contributes to the loss.

In [12]:
import torch.nn.functional as F

# Loss weight forced onto the background class after normalisation; it replaces
# the (much larger) inverse-frequency weight that class would otherwise get.
BACKGROUND_LOSS_WEIGHT = 0.05

# Count the samples of every class in one pass; minlength keeps a slot for
# classes that have no samples at all.
class_counts = torch.bincount(
    torch.tensor(all_dataset.labels, dtype=torch.long),
    minlength=len(encode),
).tolist()
print(f"Number of data samples in each class: {class_counts}")

total_samples = len(all_dataset)
# Inverse-frequency weights; an empty class gets weight 0 instead of dividing by zero.
class_weight = torch.tensor([total_samples / count if count > 0 else 0 for count in class_counts], dtype=torch.float32)
# L1-normalise so the weights sum to 1 (before the background override below).
class_weight = F.normalize(class_weight, p=1, dim=0)
class_weight[encode['background']] = BACKGROUND_LOSS_WEIGHT
print(f"Class weights: {class_weight}")

Number of data samples in each class: [396, 2725, 2637, 1734, 1577, 2387, 2036, 2233, 2743, 1527, 1722]
Class weights: tensor([0.0500, 0.0493, 0.0510, 0.0775, 0.0852, 0.0563, 0.0660, 0.0602, 0.0490,
        0.0880, 0.0781])


#### 4. Create Dataloader for SGD

Each sample is an image, so the full dataset is too large to load into device memory at once.

Training therefore uses mini-batch SGD with a small batch size.

Adjust the batch size to fit the memory of the training machine.

In [13]:
# Serve the full dataset in shuffled mini-batches of 32 samples.
total_trainloader = DataLoader(
    dataset=all_dataset,
    batch_size=32,
    shuffle=True,
)

## Model blueprint

In [15]:
class Resnet50_Lite(nn.Module):
    """ResNet-50 backbone whose final FC layer is replaced by a new linear classifier.

    Args:
        num_classes: Number of output logits produced by the classification head.
        pretrained: When True, the backbone is initialised with torchvision's
            ImageNet weights; when False it is randomly initialised.
    """

    def __init__(self, num_classes=1000, pretrained=True):
        super().__init__()
        # torchvision >= 0.13 deprecates `pretrained=` in favour of `weights=`;
        # IMAGENET1K_V1 is the checkpoint that `pretrained=True` used to load.
        # Older releases have no ResNet50_Weights, so fall back to the legacy flag.
        weights_enum = getattr(torchvision.models, "ResNet50_Weights", None)
        if weights_enum is not None:
            backbone = torchvision.models.resnet50(
                weights=weights_enum.IMAGENET1K_V1 if pretrained else None
            )
        else:
            backbone = torchvision.models.resnet50(pretrained=pretrained)

        # Width of the pooled feature vector (2048 for ResNet-50); read it from
        # the backbone instead of hard-coding it.
        feature_dim = backbone.fc.in_features
        # With the FC layer replaced by Identity the backbone returns the
        # pooled features of shape (N, feature_dim).
        backbone.fc = nn.Identity()

        self.resnet50_features = backbone
        self.classification_head = nn.Sequential(
            nn.Linear(feature_dim, num_classes)
        )

    def forward(self, x):
        """Return class logits of shape (N, num_classes) for an image batch ``x``."""
        features = self.resnet50_features(x)
        classification = self.classification_head(features)

        return classification

## Train model

#### 1. Train function

In [16]:
def last_train(model, train_loader, epochs, optimizer, criterion, device, epoch_debug=False):
    """Train ``model`` on every batch of ``train_loader`` for ``epochs`` epochs.

    Intended for the final training run: no validation pass is performed.

    Args:
        model: Network to optimise; it is moved to ``device`` and put in train mode.
        train_loader: Iterable yielding ``(data, label)`` tensor batches. It is
            iterated once per epoch and does not need to support ``len()``.
        epochs: Number of passes over ``train_loader``.
        optimizer: Optimizer already bound to the parameters to update.
        criterion: Callable ``criterion(logits, label)`` returning a scalar loss tensor.
        device: Device on which the model and the batches are placed.
        epoch_debug: When True, print the mean loss after every epoch.

    Returns:
        list[float]: Mean per-batch training loss, one entry per epoch.

    Raises:
        ValueError: If ``train_loader`` yields no batches during an epoch.
    """
    model.train()
    model.to(device)

    total_cost = []
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        # Counted by hand so loaders without __len__ work too.
        num_batches = 0

        for data, label in train_loader:
            data = data.to(device)
            label = label.to(device)
            # Removes dim 1 only when it has size 1; a no-op for (N, 3, H, W) batches.
            data = data.squeeze(1)

            optimizer.zero_grad()
            class_logits = model(data)
            loss = criterion(class_logits, label)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            num_batches += 1

        if num_batches == 0:
            raise ValueError("train_loader yielded no batches; nothing to train on")

        # Log the mean batch loss of this epoch.
        total_cost.append(running_loss / num_batches)

        if epoch_debug:
            print(
                f"Epoch {epoch + 1}/{epochs}: "
                f"Train Total Loss: {total_cost[-1]:.4f}, "
            )
    return total_cost

#### 2. Train the model
The class weights computed above are passed to `nn.CrossEntropyLoss()`, which is used as the training loss.

In [17]:
# Train on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [18]:
# 11-way classifier on top of the pretrained ResNet-50 backbone.
lite50_model = Resnet50_Lite(num_classes=11, pretrained=True)

# Freeze the whole backbone, then re-enable gradients for its last stage
# (layer4) only; the classification head stays trainable.
lite50_model.resnet50_features.requires_grad_(False)
lite50_model.resnet50_features.layer4.requires_grad_(True)

# The loss weights must live on the same device as the logits.
class_weight = class_weight.to(device)
criterion_lite50 = nn.CrossEntropyLoss(weight=class_weight)
optimizer = optim.SGD(
    lite50_model.parameters(),
    lr=0.001,
    momentum=0.9,
    weight_decay=0.00005,
)

lite50_cost = last_train(
    model=lite50_model,
    train_loader=total_trainloader,
    epochs=10,
    optimizer=optimizer,
    criterion=criterion_lite50,
    device=device,
    epoch_debug=True,
)



Epoch 1/10: Train Total Loss: 0.1571, 
Epoch 2/10: Train Total Loss: 0.0059, 
Epoch 3/10: Train Total Loss: 0.0033, 
Epoch 4/10: Train Total Loss: 0.0022, 
Epoch 5/10: Train Total Loss: 0.0017, 
Epoch 6/10: Train Total Loss: 0.0015, 
Epoch 7/10: Train Total Loss: 0.0011, 
Epoch 8/10: Train Total Loss: 0.0009, 
Epoch 9/10: Train Total Loss: 0.0011, 
Epoch 10/10: Train Total Loss: 0.0007, 


In [19]:
# Pickle the whole module (architecture + weights) into the parent directory.
# The path is built with os.path.join so it contains no backslash escape
# sequence and resolves to the parent directory on non-Windows systems too.
torch.save(lite50_model, os.path.join('..', 'model.pth'))

## Create simple pipeline to apply model on a single video

#### 1. Apply model on a single video frame

In [13]:
import getBbox
import myutils

# Device used by the inference helpers below: GPU when CUDA is available, else CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
### Test on video
def boxAndClass_singleFrame(img, model):
    """Classify one frame and compute its bounding box as fractions of 224.

    Args:
        img: Frame to analyse; it is passed unchanged to ``myutils.crop_and_resize``.
        model: Classifier returning one row of class logits per input image.

    Returns:
        tuple: ``(class_name, (xmin_ratio, ymin_ratio, w_ratio, h_ratio), x_pixels_pad)``.
        ``class_name`` is the ``encode`` key of the highest-scoring class, the
        ratios are the ``getBbox.getBbox`` values divided by 224, and
        ``x_pixels_pad`` is the second value returned by ``myutils.crop_and_resize``.
    """
    # Side length the frame is resized to (see the "To size 224x224" step below).
    frame_size = 224

    # To size 224x224
    resized_frame, x_pixels_pad = myutils.crop_and_resize(img)

    # Bounding box in pixels of the resized frame, converted to ratios.
    xmin, ymin, w, h = getBbox.getBbox(resized_frame)
    xmin_ratio = xmin / frame_size
    ymin_ratio = ymin / frame_size
    w_ratio = w / frame_size
    h_ratio = h / frame_size

    # Same preprocessing as used for training.
    # NOTE(review): training images are loaded as RGB through PIL, while frames
    # read by OpenCV are normally BGR -- confirm that crop_and_resize returns
    # RGB, otherwise the channels are swapped at inference time.
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    batch = transform(resized_frame).unsqueeze(0).to(device)

    model.eval()
    model.to(device)
    with torch.no_grad():
        # Call the module itself rather than .forward() so that any registered
        # forward hooks are executed as well.
        output = model(batch)

    # Index of the largest logit for the single image in the batch.
    predicted_class = output.argmax(dim=1).item()

    # Invert the name -> index mapping to recover the class name.
    class_name = {v: k for k, v in encode.items()}[predicted_class]

    return class_name, (xmin_ratio, ymin_ratio, w_ratio, h_ratio), x_pixels_pad

#### 2. Apply the model to every frame of a video and count the objects of each class

In [14]:
def _commit_tracked_object(object_tracker, counter, persistence_threshold):
    """Count the most frequent tracked class if it persisted long enough, then zero the tracker in place."""
    # Only real object classes can be counted; 'background' is not in `counter`.
    best_class = max(counter, key=lambda name: object_tracker[name])
    # max(..., 1) prevents an empty tracker from being counted as an object.
    if object_tracker[best_class] >= max(persistence_threshold, 1):
        counter[best_class] += 1
    for key in object_tracker:
        object_tracker[key] = 0


def boxAndClass_video(video_path, model, output_path, persistence_threshold=16,
                      background_threshold=20, min_box_ratio=0.2):
    """
    Processes a video to predict the bounding box and class label for each frame and saves the output video,
    while counting the appearance of each class, using persistence tracking.

    An object is considered finished after ``background_threshold`` consecutive
    background frames, or when the video ends. The class seen most often while
    it was in view is then counted, provided it was seen at least
    ``persistence_threshold`` times.

    Args:
        video_path (str): The path to the input video.
        model (torch.nn.Module): The pre-trained model used for prediction.
        output_path (str): The path to save the output video with bounding boxes and class labels.
        persistence_threshold (int): The number of frames an object must persist to be counted.
        background_threshold (int): Number of consecutive background frames that ends the current object.
        min_box_ratio (float): Frames whose box width or height ratio is below this value are treated as background.

    Raises:
        IOError: If ``video_path`` cannot be opened.
    """
    # Per-class object counts (background is never counted).
    counter = {name: 0 for name in encode if name != 'background'}

    # Per-class frame votes for the object currently in view; the 'background'
    # entry holds the current run of consecutive background frames.
    object_tracker = {name: 0 for name in encode}
    object_tracker['background'] = 0

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f"Could not open video: {video_path}")

    out = None
    try:
        # Get video properties
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Keep the fractional frame rate (e.g. 29.97) so the output duration
        # matches the input; fall back to 30 when the container reports none.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 30.0

        # Define the codec and create VideoWriter object
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

        # Read until video is completed
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            saved_frame = frame.copy()

            # Get the predicted class and bounding box
            class_name, bbox, x_pixels_pad = boxAndClass_singleFrame(frame, model)
            xmin_ratio, ymin_ratio, w_ratio, h_ratio = bbox

            # Avoid drawing a box on background frames: a tiny box means no object.
            if w_ratio < min_box_ratio or h_ratio < min_box_ratio:
                class_name = 'background'

            object_tracker[class_name] += 1

            # Display the class name on the frame
            cv2.putText(saved_frame, class_name, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

            if class_name != 'background':
                # An object is visible, so the background run restarts from 0.
                object_tracker['background'] = 0
                top_left, bottom_right = myutils.topLeftBottomRight(xmin_ratio, ymin_ratio, w_ratio, h_ratio, x_pixels_pad, frame_height)
                saved_frame = cv2.rectangle(saved_frame, top_left, bottom_right, (0, 255, 0), 1)
            elif object_tracker['background'] >= background_threshold:
                # Long enough background run -> the object has left the view:
                # count its most frequent class and reset the tracker.
                _commit_tracked_object(object_tracker, counter, persistence_threshold)

            # Display the counters along the bottom edge, starting at the left
            counter_text = ', '.join([f'{key}: {value}' for key, value in counter.items()])
            text_position = (10, frame_height - 10)  # Adjust text position
            cv2.putText(saved_frame, counter_text, text_position, cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

            # Write the frame into the output video
            out.write(saved_frame)

        # The video may end while an object is still in view (or before the
        # background run is long enough); count that last object as well.
        _commit_tracked_object(object_tracker, counter, persistence_threshold)
    finally:
        # Release video capture and writer objects even if a frame failed
        cap.release()
        if out is not None:
            out.release()

    # Output the final class counts
    print("Class counts in video:")
    for class_name, count in counter.items():
        print(f"{class_name}: {count}")


## Done! Test on test video

In [15]:
# Run the full pipeline on the test video; the annotated copy is written to
# the current working directory.
# NOTE: the input path is specific to the author's machine -- adjust it locally.
boxAndClass_video(
    video_path=r'C:\Users\Admin\Documents\[ARS_SP1]14_Mini_Project_1\raw_data\video_test_1.avi',
    model=lite50_model,
    output_path='output_video_lite50_model.avi',
)

Class counts in video:
dilmah: 2
g7: 1
jack-jill: 3
karo: 2
nestea_atiso: 1
nestea_chanh: 0
nestea_hoaqua: 1
orion: 1
tipo: 0
y4: 3
