# Settings

In [5]:
import os
import json
import numpy as np
import torch 
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import cv2
import mss
from sklearn.model_selection import train_test_split
from torchvision import transforms, datasets
from PIL import Image

In [2]:
# --- Shared configuration for dataset preparation, inference, evaluation and export ---
num_classes = 5
img_width = 224
img_height = 224

random_state = 42
# Per-channel normalization statistics applied after ToTensor()
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

lane_classes = {0: "Lane 1", 1: "Lane 2", 2: "Lane 3", 3: "Lane 4", 4: "Lane 5"}  # Update with your actual classes

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Model architecture (must match train.py); weights are loaded later from model_path
model_path = "model.pth"
model = models.efficientnet_b0(weights=None)

# Classification head (must match train.py)
model.classifier = nn.Sequential(
    nn.Dropout(p=0.2, inplace=True),
    nn.Linear(1280, num_classes)
)

# Image transformations (must match train.py).
# torchvision's Resize expects (height, width); the previous (width, height)
# order only worked because both dimensions are currently equal.
transform = transforms.Compose([
    transforms.Resize((img_height, img_width)),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std)
])

# ONNX export settings
opset_version = 15
ubuntu_user = "brandon"

# 1. Dataset

In [None]:
'''
    PREPARING /test, /train, /val, labels.json
'''
# Step 1: Define paths and parameters
current_dir = os.getcwd()
output_folder = os.path.join(current_dir, "dataset")  # Use dataset/ as the output folder
dataset_path = os.path.join(output_folder, "annotation")  # Root dataset annotation folder
output_json = os.path.join(output_folder, "labels.json")  # JSON file inside dataset folder

train_ratio = 0.7
val_ratio = 0.15
test_ratio = 0.15
target_size = (img_width, img_height)  # (width, height), the order cv2.resize expects

print("Preparing dataset. . .\n")
# Step 2: Create a JSON file with image-label mappings
labels = {}
# Iterate through each class folder (/0, /1, /2, ...)
for class_folder in sorted(os.listdir(dataset_path)):  # Ensure consistent order
    class_path = os.path.join(dataset_path, class_folder)

    if not os.path.isdir(class_path) or not class_folder.isdigit():  # Skip non-numeric folders
        continue

    class_label = int(class_folder)  # Folder name is the label (0, 1, 2, ...)
    # Sort the file listing too: os.listdir order is arbitrary, and the order of
    # `labels` feeds train_test_split, so an unsorted listing makes the
    # random_state-seeded split differ between machines.
    for image_name in sorted(os.listdir(class_path)):
        if not image_name.lower().endswith((".jpg", ".png", ".jpeg")):  # Only process image files
            continue

        # The mapping is keyed by bare filename, so the same name in two class
        # folders collapses to one entry. Keep the existing last-wins behavior
        # but make the collision visible instead of silent.
        previous_label = labels.get(image_name)
        if previous_label is not None and previous_label != class_label:
            print(
                f"Warning: {image_name} exists in class {previous_label} and class {class_label}; "
                f"keeping class {class_label}"
            )
        labels[image_name] = class_label  # Store only the image filename

# Save the JSON file inside dataset/
with open(output_json, "w", encoding="utf-8") as f:
    json.dump(labels, f, indent=4)
print(f"JSON file saved to {output_json}")

# Step 3: Split the dataset into train, val, and test sets
image_paths = list(labels.keys())  # Only filenames
image_labels = list(labels.values())  # Corresponding labels
# First carve off the test set ...
train_paths, test_paths, train_labels, test_labels = train_test_split(
    image_paths, image_labels, test_size=test_ratio, random_state=random_state
)
# ... then split the remainder; val_ratio is rescaled because it is now a
# fraction of (train + val) rather than of the whole dataset.
train_paths, val_paths, train_labels, val_labels = train_test_split(
    train_paths, train_labels, test_size=val_ratio / (train_ratio + val_ratio), random_state=random_state
)

# Step 4: Organize images into train, val, and test folders inside dataset/
def preprocess_image(image_path, target_size):
    """
    Load an image from disk, resize it, and return it in RGB channel order.

    No pixel normalization is performed here.

    Args:
        image_path (str): Path to the image file.
        target_size (tuple): Target size passed to cv2.resize as (width, height).

    Returns:
        numpy.ndarray: The resized image with channels in RGB order. Note that
        OpenCV writers such as cv2.imwrite expect BGR, so convert back before
        saving with OpenCV.

    Raises:
        ValueError: If OpenCV cannot read the file (cv2.imread returned None).
    """
    # cv2.imread signals failure by returning None instead of raising
    bgr_pixels = cv2.imread(image_path)
    if bgr_pixels is None:
        raise ValueError(f"Image not found or unable to load: {image_path}")

    # Resize first, then flip OpenCV's BGR channel order to RGB
    return cv2.cvtColor(cv2.resize(bgr_pixels, target_size), cv2.COLOR_BGR2RGB)

def organize_images(image_paths, image_labels, split_name):
    """
    Write resized copies of the given images into dataset/<split_name>/class_<label>/.

    Reads the module-level `output_folder`, `dataset_path` and `target_size`.
    Missing sources and per-image failures are reported with print() and
    skipped so that one bad file does not abort the whole split.

    Args:
        image_paths (list): Image filenames (not full paths).
        image_labels (list): Integer class label for each filename, same order.
        split_name (str): Name of the split sub-folder, e.g. "train", "val", "test".
    """
    split_folder = os.path.join(output_folder, split_name)  # dataset/train, dataset/val, dataset/test
    os.makedirs(split_folder, exist_ok=True)  # create if missing; existing contents are kept

    for image_name, label in zip(image_paths, image_labels):
        class_folder = os.path.join(split_folder, f"class_{label}")
        os.makedirs(class_folder, exist_ok=True)  # create if missing; existing contents are kept

        # Construct source and destination paths
        src_path = os.path.join(dataset_path, str(label), image_name)
        dst_path = os.path.join(class_folder, image_name)

        # Preprocess and save the image
        if os.path.exists(src_path):
            try:
                rgb_image = preprocess_image(src_path, target_size)

                # preprocess_image returns RGB, but cv2.imwrite interprets the
                # array as BGR. Without converting back, every saved file would
                # have its red and blue channels swapped.
                bgr_image = cv2.cvtColor(rgb_image, cv2.COLOR_RGB2BGR)

                # cv2.imwrite reports failure via its return value, not an exception
                if not cv2.imwrite(dst_path, bgr_image):
                    print(f"Error processing {src_path}: could not write {dst_path}")
            except Exception as e:
                print(f"Error processing {src_path}: {e}")
        else:
            print(f"Warning: {src_path} not found!")

# Create train, val, and test folders inside dataset/
# Materialize each split on disk as dataset/<split>/class_<label>/<image>
for split_name, split_paths, split_labels in (
    ("train", train_paths, train_labels),
    ("val", val_paths, val_labels),
    ("test", test_paths, test_labels),
):
    organize_images(split_paths, split_labels, split_name)

print(f"Dataset organized into {output_folder}/train, {output_folder}/val, {output_folder}/test.")
print("Finished preparing dataset.")

In [None]:
'''
    DELETES /test, /train, /val, labels.json
'''
# NOTE: this cell is disabled on purpose (everything below is commented out).
# To use it, uncomment the code and add `import shutil` first; shutil is not
# among the imports in the Settings cell.
# def delete_directories(dir_paths):
#     """Deletes the specified directories and their contents if they exist."""
#     for dir_path in dir_paths:
#         if os.path.exists(dir_path) and os.path.isdir(dir_path):
#             shutil.rmtree(dir_path)
#             print(f"Deleted directory or file: {dir_path}")
#         else:
#             print(f"Directory or file does not exist: {dir_path}")

# directories = ["dataset/train", "dataset/test", "dataset/val"]
# delete_directories(directories)

# if os.path.exists("dataset/labels.json"):
#     os.remove("dataset/labels.json")
#     print("File deleted successfully: dataset/labels.json")
# else:
#     print("File not found: dataset/labels.json")

# 2. Training

- train.py

# 3. Inference

In [3]:
# Load the trained weights. map_location remaps tensors saved on another device
# (e.g. a CUDA checkpoint) onto the device selected in the Settings cell, so the
# checkpoint also loads on a CPU-only machine.
checkpoint = torch.load(model_path, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
model.to(device)
# Inference mode: disables dropout and uses the stored BatchNorm statistics
model.eval()

EfficientNet(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): SiLU(inplace=True)
    )
    (1): Sequential(
      (0): MBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
            (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
            (2): SiLU(inplace=True)
          )
          (1): SqueezeExcitation(
            (avgpool): AdaptiveAvgPool2d(output_size=1)
            (fc1): Conv2d(32, 8, kernel_size=(1, 1), stride=(1, 1))
            (fc2): Conv2d(8, 32, kernel_size=(1, 1), stride=(1, 1))
            (activation): SiLU(inplace=True)
            (scale_activation): Sigmoid()
          )
          (2): Conv2dNormActivat

In [None]:
'''
    INFERENCE
'''
window_title = "Lane Classification - Press Q to quit"
min_confidence = 0.5  # Predictions below this are not drawn on the frame

# try/finally guarantees the OpenCV window is closed even when the loop is
# stopped by an exception or a kernel interrupt rather than by pressing 'q'.
try:
    # Start full-screen capture
    with mss.mss() as sct:
        monitor = sct.monitors[1]  # Capture the primary screen

        while True:
            # Capture the full screen
            screenshot = sct.grab(monitor)
            img_original = np.array(screenshot)  # Keep original image for display

            # Convert from BGRA to RGB
            img_rgb = cv2.cvtColor(img_original, cv2.COLOR_BGRA2RGB)

            # Convert to PIL Image for transformation
            pil_img = Image.fromarray(img_rgb)

            # Apply transformations
            input_tensor = transform(pil_img)
            input_batch = input_tensor.unsqueeze(0)  # Add batch dimension

            # Perform inference
            with torch.no_grad():
                logits = model(input_batch.to(device))
                probabilities = F.softmax(logits, dim=1)
                # One reduction yields both the top probability and its class index
                top_prob, top_class = probabilities.max(dim=1)
                pred_class = top_class.item()
                confidence = top_prob.item()

            # Get prediction info
            label = f"{lane_classes[pred_class]}: {confidence:.2f}"

            # Display prediction on screen
            if confidence >= min_confidence:
                cv2.putText(img_original, label, (50, 50), cv2.FONT_HERSHEY_SIMPLEX,
                            1.5, (0, 255, 0), 3, cv2.LINE_AA)

            # Display output in a window
            cv2.imshow(window_title, img_original)

            # Press 'q' to exit
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    # Cleanup
    cv2.destroyAllWindows()

# 4. Evaluation

In [6]:
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import os
from PIL import Image
import numpy as np

class TestDataset(Dataset):
    """
    Image dataset read from a directory laid out as <test_dir>/class_<N>/<image>.

    The integer label of each image is parsed from its folder name (the text
    between the first and second underscore), so labels do not depend on which
    class folders happen to be present.

    Args:
        test_dir (str): Root directory containing the class_<N> folders.
        transform (callable, optional): Applied to each PIL image in __getitem__.
    """

    def __init__(self, test_dir, transform=None):
        self.test_dir = test_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []

        # Collect all images and their labels from class folders.
        # Listings are sorted because os.listdir order is arbitrary; sorting
        # makes the sample order reproducible.
        for class_folder in sorted(os.listdir(test_dir)):
            class_path = os.path.join(test_dir, class_folder)
            # Skip stray files and unrelated folders instead of crashing on them
            if not class_folder.startswith('class_') or not os.path.isdir(class_path):
                continue

            label_text = class_folder.split('_')[1]
            if not label_text.isdigit():  # e.g. "class_tmp": no numeric label to use
                continue
            label = int(label_text)

            for img_name in sorted(os.listdir(class_path)):
                if img_name.lower().endswith(('.jpg', '.png', '.jpeg')):
                    self.image_paths.append(os.path.join(class_path, img_name))
                    self.labels.append(label)

    def __len__(self):
        """Return the number of images found."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return (image, label) for index idx; the image is loaded as RGB."""
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label

# Evaluation preprocessing: resize to the model input size, convert to a tensor,
# and normalize with the shared mean/std.
test_transform = transforms.Compose([
    transforms.Resize((img_height, img_width)),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std),
])

# NOTE: ImageFolder assigns label indices by sorted folder name, so
# class_0..class_4 map to 0..4 only when every class folder exists in dataset/test.
test_dataset = datasets.ImageFolder("dataset/test", transform=test_transform)
test_loader = DataLoader(
    test_dataset,
    batch_size=64,
    shuffle=False,
    # Do not spawn more workers than the machine has cores
    num_workers=min(8, os.cpu_count() or 1),
    # Pinned host memory only helps host-to-GPU copies; skip it on CPU-only runs
    pin_memory=device.type == "cuda",
)


def evaluate_model(model, test_loader, device):
    """
    Compute top-1 classification accuracy of a model over a data loader.

    The model is switched to eval mode and left in that mode.

    Args:
        model: Callable module mapping a batch of images to per-class scores
            with classes along dimension 1.
        test_loader: Iterable yielding (images, labels) tensor batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        float: Accuracy as a percentage in [0, 100]. Returns 0.0 when the
        loader yields no samples (previously a ZeroDivisionError).
    """
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)

            # Predicted class = index of the highest score per sample
            predicted = model(images).argmax(dim=1)

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        return 0.0
    return 100 * correct / total

# Evaluate the model
# Score the loaded model on the test loader and report the result as a percentage
test_accuracy = evaluate_model(model=model, test_loader=test_loader, device=device)
print(f"Test Accuracy: {test_accuracy:.2f}%")

Test Accuracy: 99.21%


# 5. Exporting

In [None]:
# Dummy batch of one image used only to trace the model for export.
# Named dummy_input (not `input`) so the builtin input() is not shadowed for
# the rest of the notebook session.
dummy_input = torch.randn(1, 3, img_height, img_width).to(device)

onnx_targets = [
    # Export to ONNX (current working directory)
    "model.onnx",
    # Export to WSL (\\wsl.localhost\Ubuntu\home\<ubuntu_user>)
    fr"\\wsl.localhost\Ubuntu\home\{ubuntu_user}\model.onnx",
]
for onnx_path in onnx_targets:
    torch.onnx.export(model, dummy_input, onnx_path, opset_version=opset_version)

# WSL Command Prompt - Note: Change Paths Accordingly
# source export/bin/activate
# onnx2tf -i model.onnx
# cp -r /home/brandon/saved_model /mnt/c/Users/Brandon/Desktop/FYP2/Code/cleadr/tflite/current_lane_number
# deactivate
# or
# ./export.sh
