In [180]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
import torchvision.models as models 
import albumentations as A
from albumentations.pytorch import ToTensorV2
import cv2
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import sys
from tqdm.notebook import tqdm
from PIL import Image

# print('System Version:', sys.version)
# print('PyTorch version', torch.__version__)
# print('Torchvision version', torchvision.__version__)
# print('Numpy version', np.__version__)
# print('Pandas version', pd.__version__)

In [181]:
config = {
    "lr": 1e-3,
    "epochs": 20,
    "image_size": (128, 128),
    "patience": 8, 
    "factor": 0.4

}

In [182]:
def get_train_transform():
    return A.Compose([
        A.Resize(height=128, width=128),
        A.HorizontalFlip(p=0.5),
        A.ShiftScaleRotate(shift_limit=0.0625, scale_limit=0.5, rotate_limit=10, p=1.0),
        A.Perspective(scale=0.0001, p=0.5),
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
        A.HueSaturationValue(hue_shift_limit=20, sat_shift_limit=30, val_shift_limit=20, p=0.5),
        ToTensorV2()
    ])

In [183]:
def get_valid_transform():
    return A.Compose([
        A.Resize(height=128, width=128),
        ToTensorV2()
    ])

In [184]:
class HeadlightDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data = ImageFolder(data_dir)
        self.transform = transform
    def __len__(self):
        return len(self.data)
    def __getitem__(self, idx):
        img_path, label = self.data.imgs[idx]
        image = Image.open(img_path).convert('RGB')

        if self.transform:
            image = self.transform(image)
        
        return image, label
    
    @property
    def classes(self):
        return self.data.classes

In [185]:
dataset = HeadlightDataset(
    data_dir='./dataset/train'
)

In [186]:
#len(dataset)

In [187]:
# image, label = dataset[600]
# print(label)
# image

In [188]:
data_dir = './dataset/train'
target_to_class = {v: k for k, v in ImageFolder(data_dir).class_to_idx.items()}
print(target_to_class)

{0: 'off', 1: 'on'}


In [189]:
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

data_dir = './dataset/train'
dataset = HeadlightDataset(data_dir, transform)

In [190]:
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)

In [191]:
# #mobilenet
class HeadlightClassifier(nn.Module):
    def __init__(self, num_classes=2):
        super(HeadlightClassifier, self).__init__()
        self.base_model = models.mobilenet_v3_large(pretrained=True)

        for param in self.base_model.parameters():
            param.requires_grad = False
        
        for param in self.base_model.features[-1].parameters():
            param.requires_grad = True

        in_features = self.base_model.classifier[0].in_features
        self.base_model.classifier = nn.Identity()

        self.additional_layers = nn.Sequential(
            nn.Conv2d(in_features, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Conv2d(512, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1, 1))
        )

        self.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes),
        )

    def forward(self, x):
        x = self.base_model.features(x)
        x = self.additional_layers(x)
        x = x.mean([2, 3])
        x = self.fc(x)
        return x

In [192]:
# #efficientnet
# import timm

# class HeadlightClassifier(nn.Module):
#     def __init__(self, num_classes=2):
#         super(HeadlightClassifier, self).__init__()
#         self.base_model = timm.create_model('efficientnetv2_rw_s', pretrained=True, num_classes=0)

#         for param in self.base_model.parameters():
#             param.requires_grad = False
        
#         for param in self.base_model.blocks[-1].parameters():
#             param.requires_grad = True

#         in_features = self.base_model.num_features
#         self.additional_layers = nn.Sequential(
#             nn.Conv2d(in_features, 512, kernel_size=3, padding=1),
#             nn.BatchNorm2d(512),
#             nn.ReLU(),
#             nn.Conv2d(512, 256, kernel_size=3, padding=1),
#             nn.BatchNorm2d(256),
#             nn.ReLU(),
#             nn.AdaptiveAvgPool2d((1, 1))
#         )

#         self.fc = nn.Sequential(
#             nn.Dropout(0.5),
#             nn.Linear(256, 512),
#             nn.ReLU(),
#             nn.Dropout(0.5),
#             nn.Linear(512, num_classes),
#         )

#     def forward(self, x):
#         x = self.base_model.forward_features(x)
#         x = self.additional_layers(x)
#         x = torch.flatten(x, 1)
#         x = self.fc(x)
#         return x

In [193]:
# #resnet
# class HeadlightClassifier(nn.Module):
#     def __init__(self, num_classes=2):
#         super(HeadlightClassifier, self).__init__()
#         self.base_model = models.resnet18(pretrained=True)

#         for param in self.base_model.parameters():
#             param.requires_grad = False
        
#         for param in self.base_model.layer4.parameters():
#             param.requires_grad = True

#         in_features = self.base_model.fc.in_features
#         self.base_model.fc = nn.Identity()

#         self.fc = nn.Sequential(
#             nn.Linear(in_features, 512),
#             nn.ReLU(),
#             nn.Dropout(0.5),
#             nn.Linear(512, num_classes),
#         )

#     def forward(self, x):
#         x = self.base_model(x)
#         x = torch.flatten(x, 1)
#         x = self.fc(x)
#         return x

In [194]:
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

train_folder = './dataset/train'
valid_folder = './dataset/val'
#test_folder = '../input/cards-image-datasetclassification/test/'

train_transform = get_train_transform()
valid_transform = get_valid_transform()

train_dataset = HeadlightDataset(train_folder, transform=transform)
val_dataset = HeadlightDataset(valid_folder, transform=transform)
#test_dataset = HeadlightDataset(test_folder, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)
#test_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

In [None]:
#training loop
num_epochs = 20
train_losses, val_losses = [], []

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

model = HeadlightClassifier(num_classes=2)
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=8, factor=0.4)

for epoch in range(num_epochs):
    # Training phase
    model.train()
    running_loss = 0.0
    for images, labels in tqdm(train_loader, desc='Training loop'):
        images, labels = images.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * labels.size(0)
    train_loss = running_loss / len(train_loader.dataset)
    train_losses.append(train_loss)
    
    # Validation phase
    model.eval()
    running_loss = 0.0
    with torch.no_grad():
        for images, labels in tqdm(val_loader, desc='Validation loop'):
            images, labels = images.to(device), labels.to(device)
         
            outputs = model(images)
            loss = criterion(outputs, labels)
            running_loss += loss.item() * labels.size(0)
    val_loss = running_loss / len(val_loader.dataset)
    val_losses.append(val_loss)
    print(f"Epoch {epoch+1}/{num_epochs} - Train loss: {train_loss}, Validation loss: {val_loss}")

In [None]:
plt.plot(train_losses, label='Training loss')
plt.plot(val_losses, label='Validation loss')
plt.legend()
plt.title("Loss over epochs")
plt.show()

In [None]:
import torch
import torchvision.transforms as transforms
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np

def preprocess_image(image_path, transform):
    image = Image.open(image_path).convert("RGB")
    return image, transform(image).unsqueeze(0)

def predict(model, image_tensor, device):
    model.eval()
    with torch.no_grad():
        image_tensor = image_tensor.to(device)
        # print(torch.max(image_tensor))
        # print(torch.min(image_tensor))
        # print(image_tensor.shape)
        outputs = model(image_tensor)
        probabilities = torch.nn.functional.softmax(outputs, dim=1)
    return probabilities.cpu().numpy().flatten()

def visualize_predictions(original_image, probabilities, class_names):
    fig, axarr = plt.subplots(1, 2, figsize=(14, 7))

    axarr[0].imshow(original_image)
    axarr[0].axis("off")

    axarr[1].barh(class_names, probabilities)
    axarr[1].set_xlabel("Probability")
    axarr[1].set_title("Class Predictions")
    axarr[1].set_xlim(0, 1)

    plt.tight_layout()
    plt.show()

test_image = "./dataset/test/on/frame_890_car_357.jpg"
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

original_image, image_tensor = preprocess_image(test_image, transform)
probabilities = predict(model, image_tensor, device)

class_names = dataset.classes 

print(class_names)
print(probabilities)

visualize_predictions(original_image, probabilities, class_names)

In [None]:
from glob import glob
test_images = glob('./dataset/test/*/*')
test_examples = np.random.choice(test_images, 20)

for example in test_examples:
    original_image, image_tensor = preprocess_image(example, transform)
    probabilities = predict(model, image_tensor, device)
    class_names = dataset.classes
    #print(probabilities)
    visualize_predictions(original_image, probabilities, class_names)

In [199]:
# #Saving img, graphs
# import os
# import numpy as np
# from glob import glob
# import matplotlib.pyplot as plt
# from PIL import Image
# import torch
# from torchvision import transforms

# output_dir = './predictions/resnet_graph'  # change name of output directory
# os.makedirs(output_dir, exist_ok=True)

# test_images = glob('./dataset/test/*/*')

# for example in test_images:
#     original_image, image_tensor = preprocess_image(example, transform)
#     probabilities = predict(model, image_tensor, device)
#     print(probabilities)
#     image_filename = os.path.basename(example)
#     save_path = os.path.join(output_dir, f"{image_filename.split('.')[0]}_prediction.png")

#     fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
#     # Display the image
#     ax1.imshow(original_image)
#     ax1.axis('off')
    
#     # Display the probabilities bar chart
#     ax2.barh(class_names, probabilities)
#     ax2.set_xlim(0, 1)
#     ax2.set_xlabel('Probability')
#     ax2.set_title('Class Predictions')

#     plt.savefig(save_path)
#     plt.close()


In [200]:
# #checkpoints
# import os

# checkpoint_dir = 'checkpoints'
# checkpoint_file = os.path.join(checkpoint_dir, 'mobilenet_checkpoint.pth')  #change name of checkpoint
# os.makedirs(checkpoint_dir, exist_ok=True)

# torch.save({
#     'epoch': epoch + 1,
#     'model_state_dict': model.state_dict(),
#     'optimizer_state_dict': optimizer.state_dict(),
#     'train_loss': train_losses,
#     'val_loss': val_losses
# }, checkpoint_file)

# print(f'Model checkpoint saved to {checkpoint_file}')

In [201]:
#Checking Accuracy
import os
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from datetime import datetime

validation_dataset = datasets.ImageFolder(root='./dataset/val', transform=transform)
test_dataset = datasets.ImageFolder(root='./dataset/test', transform=transform)

validation_loader = DataLoader(validation_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

def evaluate_model(model, dataloader, device):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in dataloader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = correct / total
    return accuracy

model_name = model.__class__.__name__ #not working, remove

validation_accuracy = evaluate_model(model, validation_loader, device)
test_accuracy = evaluate_model(model, test_loader, device)
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')


results = f"## Model: {model_name}\n"
results += f"**Date:** {current_time}\n\n"
results += f"**Validation Accuracy:** {validation_accuracy * 100:.2f}%\n\n"
results += f"**Test Accuracy:** {test_accuracy * 100:.2f}%\n\n\n\n"
output_path = './accuracy_results_check.md'


with open(output_path, 'a') as f:
    f.write(results)

print(f"Validation Accuracy: {validation_accuracy * 100:.2f}%")
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")
print(f"Results saved to {output_path}")


Validation Accuracy: 81.93%
Test Accuracy: 76.74%
Results saved to ./accuracy_results_check.md
