In [11]:
import torch

# Probe accelerators in priority order: CUDA first, then Apple's MPS backend,
# and fall back to the CPU when neither is usable.
if torch.cuda.is_available():
    _backend, _banner = 'cuda', "cuda"
elif torch.backends.mps.is_available():
    _backend, _banner = "mps", "apple silicon"
else:
    _backend, _banner = "cpu", "no silicon, no cuda"

# `device` is the handle every later cell uses to place models and batches.
device = torch.device(_backend)
print(_banner)
del _backend, _banner


apple silicon


In [26]:
import os

  # Root directory
# One directory per dataset split, relative to the notebook's working directory.
train_dir, val_dir, test_dir = (
    './data/train',
    './data/val',
    './data/test',
)

In [27]:

from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Preprocessing for the training split: resize to 224x224, convert to a tensor
# and normalise each RGB channel. No random augmentation is applied here.
train_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Validation and test images go through the same deterministic pipeline.
val_test_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# One ImageFolder per split; class ids come from the sub-directory names.
train_dataset, val_dataset, test_dataset = (
    datasets.ImageFolder(root=split_dir, transform=split_transforms)
    for split_dir, split_transforms in (
        (train_dir, train_transforms),
        (val_dir, val_test_transforms),
        (test_dir, val_test_transforms),
    )
)

# Batched loaders: only the training split is shuffled.
batch_size = 32

train_loader, val_loader, test_loader = (
    DataLoader(split_dataset, batch_size=batch_size, shuffle=reshuffle)
    for split_dataset, reshuffle in (
        (train_dataset, True),
        (val_dataset, False),
        (test_dataset, False),
    )
)

# Show which folder name maps to which integer label
print("Class Mappings:", train_dataset.class_to_idx)

# Sanity check: pull a single batch and report the tensor shapes
for images, labels in train_loader:
    print("Image batch shape:", images.shape)
    print("Label batch shape:", labels.shape)
    break

Class Mappings: {'awake': 0, 'sleepy': 1}
Image batch shape: torch.Size([32, 3, 224, 224])
Label batch shape: torch.Size([32])


In [28]:
import torchvision.models as models
import torch.nn as nn

# ResNet-18 initialised with the ImageNet-1k (V1) weights
resnet = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)

# Nothing is frozen: every parameter stays trainable, so the whole network is
# fine-tuned. (Set this to False to train only the replacement head below.)
resnet.requires_grad_(True)

# Replace the final fully connected layer with a fresh 2-output classifier,
# one logit per class in the dataset's class mapping.
num_features = resnet.fc.in_features  # width of the features feeding the head
resnet.fc = nn.Linear(in_features=num_features, out_features=2)


In [29]:
import torch.optim as optim

def one_hot_encoding(targets, num_classes=2, device=None):
    """Return one-hot rows for integer class ids.

    Args:
        targets: Integer tensor (or any index accepted by tensor indexing)
            holding class ids in ``[0, num_classes)``.
        num_classes: Width of each one-hot row.
        device: Device on which to build the encoding. ``None`` (the default)
            follows ``targets`` when it is a tensor, so the result always
            lives next to the labels it encodes.

    Returns:
        Float tensor of shape ``targets.shape + (num_classes,)``.
    """
    if device is None and isinstance(targets, torch.Tensor):
        device = targets.device
    # Rows of the identity matrix are exactly the one-hot vectors; the size is
    # passed positionally because ``torch.eye`` has no ``num_classes`` keyword.
    return torch.eye(num_classes, device=device)[targets]

# Move the fine-tuning network onto the selected device once, up front.
model = resnet.to(device)

# Adam over every model parameter, with a small weight decay.
optimizer = optim.Adam(params=model.parameters(), lr=0.001, weight_decay=1e-4)

# Cross-entropy over the classifier logits.
criterion = nn.CrossEntropyLoss()

# Train function
def train(model, device, train_loader, optimizer, criterion=None):
    """Run one training epoch and report its loss and accuracy.

    Args:
        model: Classifier returning one row of logits per sample.
        device: Device the batches are moved to before the forward pass.
        train_loader: Iterable of ``(data, target)`` batches.
        optimizer: Optimizer stepping ``model``'s parameters.
        criterion: Loss callable ``criterion(output, target)``. Defaults to a
            fresh ``nn.CrossEntropyLoss()`` so the function does not depend on
            whatever a module-level ``criterion`` name currently points to.

    Returns:
        ``(avg_train_loss, train_accuracy)``: the mean of the per-batch losses
        and the fraction of correctly classified samples. ``(0.0, 0.0)`` when
        the loader yields no samples.
    """
    if criterion is None:
        criterion = nn.CrossEntropyLoss()

    model.train()
    train_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0
    for data, target in train_loader:
        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()

        # Forward pass, loss, backward pass, parameter update
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        # Track loss
        train_loss += loss.item()
        num_batches += 1

        # Track accuracy: the predicted class is the arg-max logit
        correct += (output.argmax(dim=1) == target).sum().item()
        total += target.size(0)

    # An empty loader would otherwise divide by zero below.
    if total == 0:
        return 0.0, 0.0

    avg_train_loss = train_loss / num_batches
    train_accuracy = correct / total

    return avg_train_loss, train_accuracy

# Test function
def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            
            # Calculate loss with CrossEntropyLoss
            loss = criterion(output, target)
            test_loss += loss.item()
            
            # Track accuracy
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
            total += target.size(0)
    
    avg_test_loss = test_loss / len(test_loader)
    test_accuracy = correct / total
    return avg_test_loss, test_accuracy

# Training loop: per-epoch metric histories, kept for later inspection
num_epochs = 50
train_losses, test_losses = [], []
train_accuracies, test_accuracies = [], []

for epoch in range(num_epochs):
    # One optimisation pass over the training split ...
    avg_train_loss, train_accuracy = train(model, device, train_loader, optimizer)
    # ... followed by an evaluation pass over the test split.
    avg_test_loss, test_accuracy = test(model, device, test_loader)

    train_losses.append(avg_train_loss)
    train_accuracies.append(train_accuracy)
    test_losses.append(avg_test_loss)
    test_accuracies.append(test_accuracy)

    print(
        f'Epoch {epoch}: \tTrain Loss: {avg_train_loss:.4f} \tTest Loss: {avg_test_loss:.4f}'
        f'\tTrain Accuracy: {train_accuracy:.4f}, \tTest Accuracy: {test_accuracy:.4f}'
    )

Epoch 0: 	Train Loss: 0.0702 	Test Loss: 0.0418	Train Accuracy: 0.9765, 	Test Accuracy: 0.9865
Epoch 1: 	Train Loss: 0.0530 	Test Loss: 0.0545	Train Accuracy: 0.9818, 	Test Accuracy: 0.9803
Epoch 2: 	Train Loss: 0.0505 	Test Loss: 0.0650	Train Accuracy: 0.9827, 	Test Accuracy: 0.9788
Epoch 3: 	Train Loss: 0.0453 	Test Loss: 0.0857	Train Accuracy: 0.9843, 	Test Accuracy: 0.9734
Epoch 4: 	Train Loss: 0.0423 	Test Loss: 0.0413	Train Accuracy: 0.9851, 	Test Accuracy: 0.9870
Epoch 5: 	Train Loss: 0.0392 	Test Loss: 0.0284	Train Accuracy: 0.9861, 	Test Accuracy: 0.9905
Epoch 6: 	Train Loss: 0.0363 	Test Loss: 0.0325	Train Accuracy: 0.9871, 	Test Accuracy: 0.9890
Epoch 7: 	Train Loss: 0.0337 	Test Loss: 0.0325	Train Accuracy: 0.9882, 	Test Accuracy: 0.9899
Epoch 8: 	Train Loss: 0.0329 	Test Loss: 0.0271	Train Accuracy: 0.9885, 	Test Accuracy: 0.9909
Epoch 9: 	Train Loss: 0.0312 	Test Loss: 0.0323	Train Accuracy: 0.9888, 	Test Accuracy: 0.9893
Epoch 10: 	Train Loss: 0.0301 	Test Loss: 0.0276	T

KeyboardInterrupt: 

In [44]:
import torch
import torch.nn as nn

# Layer-by-layer description of the convolutional backbone.
#   * tuple  -> one conv block: (kernel_size, out_channels, stride, padding)
#   * str    -> a 2x2 max-pool with stride 2
#   * list   -> [conv_tuple_a, conv_tuple_b, repeats]: the pair is repeated
# 3x3 convs use padding 1 and 1x1 convs use padding 0, so only the strided
# layers and the pools change the spatial size (448 -> 7).
architecture_config = [
    # kernel_size, channel, stride, padding
    (7, 64, 2, 3),
    "Maxpooling",
    (3, 192, 1, 1),
    "Maxpooling",
    (1, 128, 1, 0),  # 1x1 conv: no padding, otherwise the map grows by a zero border
    (3, 256, 1, 1),
    (1, 256, 1, 0),
    (3, 512, 1, 1),
    "Maxpooling",
    [(1, 256, 1, 0), (3, 512, 1, 1), 4],
    (1, 512, 1, 0),
    (3, 1024, 1, 1),
    "Maxpooling",
    [(1, 512, 1, 0), (3, 1024, 1, 1), 2],
    (3, 1024, 1, 1),
    (3, 1024, 2, 1),
    (3, 1024, 1, 1),
    (3, 1024, 1, 1),
]

class CNNBlock(nn.Module):
    """Bias-free convolution followed by batch norm and LeakyReLU(0.1).

    Extra keyword arguments (``kernel_size``, ``stride``, ``padding``, ...)
    are forwarded to ``nn.Conv2d`` unchanged.
    """

    def __init__(self, in_channels, out_channels, **kwargs) -> None:
        super().__init__()
        # bias=False on the conv: the batch norm that follows has its own shift.
        self.conv = nn.Conv2d(in_channels, out_channels, bias=False, **kwargs)
        self.batchnorm = nn.BatchNorm2d(out_channels)
        self.leakyrelu = nn.LeakyReLU(0.1)

    def forward(self, x):
        """Apply conv -> batch norm -> LeakyReLU to ``x``."""
        x = self.conv(x)
        x = self.batchnorm(x)
        return self.leakyrelu(x)
    
class Yolov1(nn.Module):
    """YOLOv1-style detector: a convolutional backbone plus a two-layer head.

    The backbone is built from the module-level ``architecture_config``; the
    keyword arguments (``split_size``, ``num_boxes``, ``num_classes``) are
    forwarded to ``_create_fcs`` and determine the size of the output vector.
    """

    def __init__(self, in_channels=3, **kwargs) -> None:
        super().__init__()
        self.architecture = architecture_config
        self.in_channels = in_channels
        self.darknet = self._create_conv_layers(self.architecture)
        self.fcs = self._create_fcs(**kwargs)

    def forward(self, x):
        """Return one flat prediction vector per image in ``x``."""
        features = self.darknet(x)
        flat_features = torch.flatten(features, start_dim=1)
        return self.fcs(flat_features)

    def _create_conv_layers(self, architecture):
        """Translate the architecture description into an ``nn.Sequential``.

        Tuples become a single ``CNNBlock``, strings become a 2x2 max-pool and
        lists ``[conv_a, conv_b, repeats]`` become ``repeats`` copies of the
        two-block pair. Entries of any other type are skipped.
        """

        def conv_block(channels_in, spec):
            # spec layout: (kernel_size, out_channels, stride, padding)
            return CNNBlock(
                channels_in,
                spec[1],
                kernel_size=spec[0],
                stride=spec[2],
                padding=spec[3],
            )

        stages = []
        channels = self.in_channels

        for entry in architecture:
            entry_type = type(entry)
            if entry_type is tuple:
                stages.append(conv_block(channels, entry))
                channels = entry[1]
            elif entry_type is str:
                stages.append(nn.MaxPool2d(kernel_size=2, stride=2))
            elif entry_type is list:
                first_spec, second_spec, repeats = entry[0], entry[1], entry[2]
                for _ in range(repeats):
                    stages.append(conv_block(channels, first_spec))
                    # The second block consumes the first block's output channels.
                    stages.append(conv_block(first_spec[1], second_spec))
                    channels = second_spec[1]

        return nn.Sequential(*stages)

    def _create_fcs(self, split_size, num_boxes, num_classes):
        """Build the head mapping 1024 x S x S features to S*S*(C + 5*B) outputs."""
        grid_cells = split_size * split_size
        values_per_cell = num_classes + num_boxes * 5

        head = [
            nn.Flatten(),
            nn.Linear(1024 * grid_cells, 496),
            nn.Dropout(0.0),
            nn.LeakyReLU(0.1),
            nn.Linear(496, grid_cells * values_per_cell),  # last Linear layer
        ]
        return nn.Sequential(*head)
    
def test(S = 7, B = 2, C = 20):
    """Smoke-check the detector's output shape on a random 448x448 batch.

    NOTE: this rebinds the name ``test``, which an earlier cell uses for the
    classifier evaluation helper ``test(model, device, test_loader)``; re-run
    that cell before re-running its training loop.
    """
    net = Yolov1(split_size=S, num_boxes=B, num_classes=C)
    dummy_batch = torch.randn((2, 3, 448, 448))
    prediction = net(dummy_batch)
    # Prints (batch, S * S * (C + 5 * B))
    print(prediction.shape)

test()

torch.Size([2, 1470])


### Make Loss Function

In [74]:
import torch
import torch.nn as nn

def intersection_over_union(box1, box2, box_format="corners"):
    """Element-wise IoU between two (broadcastable) sets of boxes.

    The four box coordinates are read from the LAST axis, so the inputs may be
    a single box of shape ``(4,)`` or any batch of shape ``(..., 4)``.

    Args:
        box1: Tensor of boxes, shape ``(..., 4)``.
        box2: Tensor of boxes, broadcastable against ``box1``.
        box_format: ``"corners"`` for ``(x1, y1, x2, y2)`` (the default) or
            ``"midpoint"`` for ``(x_center, y_center, width, height)``.

    Returns:
        Tensor of IoU values with the coordinate axis removed (a 0-dim tensor
        for two single boxes). Pairs whose union has no area get an IoU of 0.

    Raises:
        ValueError: If ``box_format`` is not one of the two supported names.
    """
    if box_format == "midpoint":
        b1_x1 = box1[..., 0] - box1[..., 2] / 2
        b1_y1 = box1[..., 1] - box1[..., 3] / 2
        b1_x2 = box1[..., 0] + box1[..., 2] / 2
        b1_y2 = box1[..., 1] + box1[..., 3] / 2
        b2_x1 = box2[..., 0] - box2[..., 2] / 2
        b2_y1 = box2[..., 1] - box2[..., 3] / 2
        b2_x2 = box2[..., 0] + box2[..., 2] / 2
        b2_y2 = box2[..., 1] + box2[..., 3] / 2
    elif box_format == "corners":
        b1_x1, b1_y1, b1_x2, b1_y2 = box1[..., 0], box1[..., 1], box1[..., 2], box1[..., 3]
        b2_x1, b2_y1, b2_x2, b2_y2 = box2[..., 0], box2[..., 1], box2[..., 2], box2[..., 3]
    else:
        raise ValueError(f"Unknown box_format: {box_format!r}")

    # Overlap rectangle; a negative extent means the boxes do not intersect.
    overlap_w = (torch.min(b1_x2, b2_x2) - torch.max(b1_x1, b2_x1)).clamp(min=0)
    overlap_h = (torch.min(b1_y2, b2_y2) - torch.max(b1_y1, b2_y1)).clamp(min=0)
    intersection = overlap_w * overlap_h

    # abs() keeps areas non-negative when a raw network output has w or h < 0.
    area1 = ((b1_x2 - b1_x1) * (b1_y2 - b1_y1)).abs()
    area2 = ((b2_x2 - b2_x1) * (b2_y2 - b2_y1)).abs()
    union = area1 + area2 - intersection

    # Element-wise guard against 0/0 (a Python `if union > 0` is ambiguous for
    # tensors with more than one element).
    has_area = union > 0
    safe_union = torch.where(has_area, union, torch.ones_like(union))
    return torch.where(has_area, intersection / safe_union, torch.zeros_like(union))


class YoloLoss(nn.Module):
    """Sum-of-squares YOLOv1 loss for a grid of S x S cells and two boxes per cell.

    Per-cell prediction layout (last axis, ``C + 5 * B`` values):
    ``[class scores (C), conf_1, x_1, y_1, w_1, h_1, conf_2, x_2, y_2, w_2, h_2]``.
    Per-cell target layout (at least ``C + 5`` values):
    ``[class one-hot (C), objectness, x, y, w, h]`` with boxes in midpoint format.

    Args:
        S: Grid size.
        B: Boxes predicted per cell; the loss is written for exactly two.
        C: Number of classes.

    Raises:
        ValueError: If ``B`` is not 2.
    """

    def __init__(self, S = 7, B = 2, C = 1):
        super().__init__()
        if B != 2:
            raise ValueError("YoloLoss supports exactly B=2 boxes per cell")
        self.mse = nn.MSELoss(reduction="sum")
        self.S = S
        self.B = B
        self.C = C
        self.lambda_noobj = 0.5
        self.lambda_coord = 5

    def forward(self, predictions, target):
        """Return the scalar loss for ``predictions`` against ``target``.

        Args:
            predictions: Network output, reshaped here to
                ``(N, S, S, C + 5 * B)``.
            target: Tensor of shape ``(N, S, S, >= C + 5)``.
        """
        C = self.C
        predictions = predictions.reshape(-1, self.S, self.S, C + self.B * 5)

        target_conf = target[..., C : C + 1]
        target_box = target[..., C + 1 : C + 5]
        pred_conf_1 = predictions[..., C : C + 1]
        pred_box_1 = predictions[..., C + 1 : C + 5]
        pred_conf_2 = predictions[..., C + 5 : C + 6]
        pred_box_2 = predictions[..., C + 6 : C + 10]

        # Pick, per cell, the predicted box that overlaps the target most.
        # The choice is a hard selection, so it is computed without gradients.
        with torch.no_grad():
            iou_b1 = intersection_over_union(pred_box_1, target_box, box_format="midpoint")
            iou_b2 = intersection_over_union(pred_box_2, target_box, box_format="midpoint")
            ious = torch.stack([iou_b1, iou_b2], dim=0)
            # 0 -> first box responsible, 1 -> second box; shape (N, S, S, 1)
            bestbox = ious.argmax(dim=0).unsqueeze(-1).to(predictions.dtype)
        exists_box = target_conf  # Iobj_i, shape (N, S, S, 1)

        # for box loss
        box_predictions = exists_box * (bestbox * pred_box_2 + (1 - bestbox) * pred_box_1)
        box_targets = exists_box * target_box

        # Take sqrt of width, height of boxes. sign/abs keep the square root
        # defined for negative raw predictions; 1e-6 avoids an infinite
        # gradient at exactly zero.
        pred_wh = torch.sign(box_predictions[..., 2:4]) * torch.sqrt(
            torch.abs(box_predictions[..., 2:4] + 1e-6)
        )
        target_wh = torch.sqrt(box_targets[..., 2:4])

        box_loss = self.mse(
            torch.cat([box_predictions[..., 0:2], pred_wh], dim=-1),
            torch.cat([box_targets[..., 0:2], target_wh], dim=-1),
        )

        # for object loss: confidence of the responsible box in occupied cells
        pred_conf = bestbox * pred_conf_2 + (1 - bestbox) * pred_conf_1
        object_loss = self.mse(exists_box * pred_conf, exists_box * target_conf)

        # for non-object loss: both confidences are pushed to the target in empty cells
        no_object = 1 - exists_box
        no_object_loss = self.mse(no_object * pred_conf_1, no_object * target_conf)
        no_object_loss = no_object_loss + self.mse(
            no_object * pred_conf_2, no_object * target_conf
        )

        # class loss
        class_loss = self.mse(
            exists_box * predictions[..., :C],
            exists_box * target[..., :C],
        )

        loss = (
            self.lambda_coord * box_loss  # first two rows in paper
            + object_loss  # third row in paper
            + self.lambda_noobj * no_object_loss  # forth row
            + class_loss  # fifth row
        )

        return loss

### Read File

In [75]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image

# Function to read YOLO label files
def read_yolo_label(label_path):
    """Parse a YOLO-format label file into a float tensor of boxes.

    Each non-blank line must hold ``class_id x_center y_center width height``.

    Args:
        label_path: Path to the ``.txt`` label file.

    Returns:
        ``float32`` tensor of shape ``(num_boxes, 5)``. A missing file or a
        file without any boxes yields an empty tensor of shape ``(0, 5)``.

    Raises:
        ValueError: If a non-blank line does not hold exactly five values or
            a value cannot be parsed.
    """
    boxes = []
    try:
        with open(label_path, 'r') as file:
            for line_number, line in enumerate(file, start=1):
                values = line.split()
                if not values:
                    # Blank lines (e.g. a trailing newline) carry no box.
                    continue
                if len(values) != 5:
                    raise ValueError(
                        f"{label_path}:{line_number}: expected 5 values, got {len(values)}"
                    )
                class_id = int(values[0])  # Class ID
                x_center, y_center, width, height = map(float, values[1:])  # YOLO format
                boxes.append([class_id, x_center, y_center, width, height])
    except FileNotFoundError:
        # Handle missing label files gracefully
        print(f"Label file not found: {label_path}. Returning empty boxes.")

    if not boxes:
        # Keep the column axis so callers can always rely on shape (N, 5).
        return torch.zeros((0, 5), dtype=torch.float32)
    return torch.tensor(boxes, dtype=torch.float32)

# Image preprocessing for the detection dataset. There is no resize step, so
# images keep their stored resolution and must all share one size to be
# stacked into a batch.
drive_train_transforms = transforms.Compose([
    transforms.ToTensor(),  # PIL image -> float tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # per-channel RGB normalisation
])

class DriveYoloDataset(Dataset):
    """Images with YOLO box labels, grouped in one sub-directory per eye state.

    ``root_dir`` is scanned for sub-directories; a directory named "open"
    (case-insensitive) gets eye-state label 0, every other directory label 1.
    An image is kept only when a ``.txt`` label file with the same stem sits
    next to it. Directories and files are visited in sorted order so that the
    sample at a given index is the same on every run.

    Args:
        root_dir: Directory containing the class sub-directories.
        transform: Optional callable applied to each loaded RGB image.
    """

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        self.label_paths = []
        self.labels = []

        # Populate image and label paths
        for class_name in sorted(os.listdir(root_dir)):
            class_dir = os.path.join(root_dir, class_name)
            if not os.path.isdir(class_dir):
                continue
            label = 0 if class_name.lower() == "open" else 1  # Assign numeric labels
            for file_name in sorted(os.listdir(class_dir)):
                # Accept ".jpg" in any letter case
                if not file_name.lower().endswith(".jpg"):
                    continue
                image_path = os.path.join(class_dir, file_name)
                label_path = os.path.splitext(image_path)[0] + ".txt"
                if os.path.exists(label_path):  # Skip images without a label file
                    self.image_paths.append(image_path)
                    self.label_paths.append(label_path)
                    self.labels.append(label)

    def __len__(self):
        """Number of images that have a matching label file."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, boxes, eye_state_label)`` for sample ``idx``."""
        # Load image; the context manager closes the file handle once the
        # pixel data has been copied by convert().
        with Image.open(self.image_paths[idx]) as raw_image:
            img = raw_image.convert("RGB")

        # Apply transforms to the image
        if self.transform is not None:
            img = self.transform(img)

        # Load bounding box labels
        boxes = read_yolo_label(self.label_paths[idx])

        # Fetch the open/closed label
        eye_state_label = self.labels[idx]

        return img, boxes, eye_state_label

# Detection training set. NOTE: this rebinds `train_dataset` and `batch_size`,
# names the classification cells above also use.
drive_train_dir = './drive_image_data/train'
train_dataset = DriveYoloDataset(drive_train_dir, transform=drive_train_transforms)

# Number of images per batch for the loader created below
batch_size = 32
def collate_fn(batch):
    """
    Merge ``(image, boxes, label)`` samples into one batch.

    Images are stacked into a single tensor and labels become a 1-D tensor.
    The per-image box tensors are returned as a tuple, untouched, because
    each image may carry a different number of boxes.
    """
    image_column, box_column, label_column = zip(*batch)
    stacked_images = torch.stack(image_column)
    label_tensor = torch.tensor(label_column)
    return stacked_images, box_column, label_tensor

# Shuffled loader; collate_fn keeps the per-image box tensors as a tuple.
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn,
)

# Inspect the first batch only
for images, boxes, labels in train_loader:
    print("Batch of images shape:", images.shape)  # (batch_size, 3, H, W); 448x448 in the recorded output
    print("Batch of bounding boxes:", boxes)  # Tuple of (num_boxes, 5) tensors, one per image
    print("Batch of labels (0=open, 1=closed):", labels)  # Tensor of eye state labels
    break

Batch of images shape: torch.Size([32, 3, 448, 448])
Batch of bounding boxes: (tensor([[0.0000, 0.6083, 0.4821, 0.1540, 0.1060],
        [0.0000, 0.7757, 0.5123, 0.0993, 0.1027]]), tensor([[0.0000, 0.1897, 0.6016, 0.1250, 0.0826],
        [0.0000, 0.3951, 0.5938, 0.1429, 0.0837]]), tensor([[0.0000, 0.3895, 0.6339, 0.1406, 0.0915],
        [0.0000, 0.5971, 0.6417, 0.1440, 0.0949]]), tensor([[0.0000, 0.5469, 0.5212, 0.1540, 0.1004],
        [0.0000, 0.7377, 0.5368, 0.1205, 0.0882]]), tensor([[0.0000, 0.4487, 0.4364, 0.1585, 0.1038],
        [0.0000, 0.6261, 0.4464, 0.1417, 0.0971]]), tensor([[0.0000, 0.1629, 0.6283, 0.1406, 0.0915],
        [0.0000, 0.3661, 0.6105, 0.1540, 0.0915]]), tensor([[0.0000, 0.3214, 0.7679, 0.1987, 0.1038],
        [0.0000, 0.5469, 0.7679, 0.1875, 0.1071]]), tensor([[0.0000, 0.6786, 0.2935, 0.1395, 0.0971],
        [0.0000, 0.8371, 0.3114, 0.1239, 0.0949]]), tensor([[0.0000, 0.2210, 0.6462, 0.1261, 0.0815],
        [0.0000, 0.4241, 0.6384, 0.1350, 0.0882]]), ten

## Train

In [76]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

# Device setup: resolved BEFORE the model is built so that `.to(device)` below
# never relies on a `device` left over from an earlier cell, and with a CPU
# fallback for machines that have neither CUDA nor MPS.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

# YOLOv1 model initialization
model = Yolov1(split_size=7, num_boxes=2, num_classes=1).to(device)

# Custom YOLO loss (the YoloLoss class defined in the loss cell above),
# configured with the same grid size / box count / class count as the model.
criterion = YoloLoss(S=7, B=2, C=1)

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)

def _encode_yolo_targets(boxes, S=7, C=1):
    """Rasterise per-image YOLO label rows into an (N, S, S, C + 5) target grid."""
    targets = torch.zeros((len(boxes), S, S, C + 5))
    for image_index, image_boxes in enumerate(boxes):
        # Rows are [class_id, x_center, y_center, width, height], all relative
        # to the image size. The explicit row count also handles an empty
        # tensor of shape (0,).
        rows = image_boxes.reshape(image_boxes.numel() // 5, 5).tolist()
        for class_id, x, y, w, h in rows:
            class_index = int(class_id)
            if not 0 <= class_index < C:
                raise ValueError(f"class id {class_index} is outside [0, {C})")
            # Grid cell containing the box centre (clamped so x == 1.0 stays inside).
            col = max(0, min(int(S * x), S - 1))
            row = max(0, min(int(S * y), S - 1))
            if targets[image_index, row, col, C] == 1:
                # One object per cell: keep the first box that claimed it.
                continue
            targets[image_index, row, col, C] = 1.0
            # Centre relative to the cell, size in units of one cell.
            targets[image_index, row, col, C + 1 : C + 5] = torch.tensor(
                [S * x - col, S * y - row, S * w, S * h]
            )
            targets[image_index, row, col, class_index] = 1.0
    return targets


# Training function
def train_model(model, dataloader, criterion, optimizer, num_epochs=10):
    """Train a YOLO-style detector and print the mean batch loss per epoch.

    Args:
        model: Network mapping an image batch to flat per-image predictions.
        dataloader: Iterable of ``(images, boxes, eye_state)`` batches, where
            ``boxes`` holds one ``(num_boxes, 5)`` tensor per image with rows
            ``[class_id, x_center, y_center, width, height]``. The eye-state
            labels are not part of the detection target and are ignored.
        criterion: Loss called as ``criterion(outputs, targets)`` with
            ``targets`` of shape ``(N, S, S, C + 5)``. Its ``S`` and ``C``
            attributes, when present, set the grid size and class count of
            the encoded targets (defaults: 7 and 1).
        optimizer: Optimizer stepping ``model``'s parameters.
        num_epochs: Number of passes over ``dataloader``.
    """
    # Batches follow the model's own device rather than a module-level name.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")
    grid_size = getattr(criterion, "S", 7)
    num_classes = getattr(criterion, "C", 1)

    model.train()  # Set the model to training mode
    for epoch in range(num_epochs):
        running_loss = 0.0
        num_batches = 0

        for images, boxes, _eye_state in dataloader:
            images = images.to(device)
            # The loss needs a per-cell grid built from the boxes, not the
            # image-level eye-state labels.
            targets = _encode_yolo_targets(boxes, grid_size, num_classes).to(device)

            # Zero the gradient
            optimizer.zero_grad()

            # Forward pass
            outputs = model(images)

            # Compute loss
            loss = criterion(outputs, targets)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            # Track running loss
            running_loss += loss.item()
            num_batches += 1

        # max(..., 1) keeps an empty dataloader from dividing by zero.
        epoch_loss = running_loss / max(num_batches, 1)
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss:.4f}")

    print("Training complete.")

# Launch detector training for a fixed number of passes over the loader
num_epochs = 10
train_model(
    model=model,
    dataloader=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=num_epochs,
)


RuntimeError: MPS backend out of memory (MPS allocated: 45.44 GB, other allocations: 397.33 MB, max allowed: 45.90 GB). Tried to allocate 105.12 MB on private pool. Use PYTORCH_MPS_HIGH_WATERMARK_RATIO=0.0 to disable upper limit for memory allocations (may cause system failure).