In [1]:
import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
from torch.optim.lr_scheduler import StepLR
import optuna

seed = 42
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = True
# Constants
IMG_SIZE = 64  # Resize all images to 64x64 pixels


# Load images and extract Y channel
def load_images_and_labels(image_dir, label):
    images = []
    labels = []
    for filename in os.listdir(image_dir):
        if filename.endswith(".jpg"):
            img_path = os.path.join(image_dir, filename)
            image = cv2.imread(img_path)
            if image is not None:
                # Resize image to IMG_SIZE x IMG_SIZE
                image = cv2.resize(image, (IMG_SIZE, IMG_SIZE))
                ycbcr_image = cv2.cvtColor(image, cv2.COLOR_BGR2YCrCb)
                Y, _, _ = cv2.split(ycbcr_image)
                images.append(Y)
                labels.append(label)
    return images, labels


# Directories
heavy_traffic_dir = 'Heavy_Traffic'
light_traffic_dir = 'Light_Traffic'
moderate_traffic_dir = 'Moderate_Traffic'
no_traffic_dir = 'No_Traffic'


# Load images and labels
heavy_images, heavy_labels = load_images_and_labels(heavy_traffic_dir, 3)
light_images, light_labels = load_images_and_labels(light_traffic_dir, 1)
moderate_images, moderate_labels = load_images_and_labels(moderate_traffic_dir, 2)
no_images, no_labels = load_images_and_labels(no_traffic_dir, 0)


# Combine and preprocess data
images = np.array(heavy_images + light_images + moderate_images + no_images, dtype=np.float32)
labels = np.array(heavy_labels + light_labels + moderate_labels + no_labels)


# Normalize the images
images = images / 255.0
images = images[:, np.newaxis, :, :]  # Add channel dimension


# Convert labels to integers
label_encoder = LabelEncoder()
labels = label_encoder.fit_transform(labels)


# Split data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)


# Data augmentation and normalization
train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])


test_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])


class TrafficDataset(Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image.squeeze())  # Remove channel dimension for transformation
        return image, label


# Create data loaders
train_dataset = TrafficDataset(X_train, y_train, transform=train_transforms)
test_dataset = TrafficDataset(X_test, y_test, transform=test_transforms)



def create_model(trial):
    class CNNModel(nn.Module):
        def __init__(self):
            super(CNNModel, self).__init__()
            self.conv1 = nn.Conv2d(1, trial.suggest_int('conv1_out_channels', 16, 64), kernel_size=3, padding=1)
            self.conv2 = nn.Conv2d(trial.suggest_int('conv1_out_channels', 16, 64), trial.suggest_int('conv2_out_channels', 32, 128), kernel_size=3, padding=1)
            self.conv3 = nn.Conv2d(trial.suggest_int('conv2_out_channels', 32, 128), trial.suggest_int('conv3_out_channels', 64, 256), kernel_size=3, padding=1)
            self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
            self.dropout = nn.Dropout(trial.suggest_float('dropout', 0.2, 0.7))
            self.fc1 = nn.Linear(trial.suggest_int('conv3_out_channels', 64, 256) * (IMG_SIZE // 8) * (IMG_SIZE // 8), trial.suggest_int('fc1_units', 128, 512))
            self.fc2 = nn.Linear(trial.suggest_int('fc1_units', 128, 512), 4)
        
        def forward(self, x):
            x = self.pool(torch.relu(self.conv1(x)))
            x = self.pool(torch.relu(self.conv2(x)))
            x = self.pool(torch.relu(self.conv3(x)))
            x = x.view(-1, trial.suggest_int('conv3_out_channels', 64, 256) * (IMG_SIZE // 8) * (IMG_SIZE // 8))
            x = torch.relu(self.fc1(x))
            x = self.dropout(x)
            x = self.fc2(x)
            return x


    return CNNModel()



def objective(trial):
    # Create the model with trial suggestions
    model = create_model(trial).float()
    model.to('cuda')
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=trial.suggest_loguniform('lr', 1e-5, 1e-3), weight_decay=trial.suggest_loguniform('weight_decay', 1e-6, 1e-4))
    scheduler = StepLR(optimizer, step_size=trial.suggest_int('step_size', 5, 15), gamma=trial.suggest_float('gamma', 0.1, 0.9))
    
    # Training the model with early stopping
    num_epochs = 50  # Use a lower number of epochs for faster optimization
    patience = 10
    best_val_loss = float('inf')
    patience_counter = 0
    
    train_loader = DataLoader(train_dataset, batch_size=trial.suggest_int('batch_size', 32, 128), shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=trial.suggest_int('batch_size', 32, 128), shuffle=False)
    
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for images, labels in train_loader:
            images = images.cuda() 
            labels = labels.cuda()
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        avg_train_loss = running_loss / len(train_loader)
        
        # Evaluate on validation set
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in test_loader:
                images = images.cuda() 
                labels = labels.cuda()
                outputs = model(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        
        avg_val_loss = val_loss / len(test_loader)
        accuracy = correct / total * 100
        
        # Check for early stopping
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                break
        
        scheduler.step()
    
    return accuracy


study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=4)


print('Best trial:')
trial = study.best_trial
print(f'  Value: {trial.value}')
print('  Params: ')
for key, value in trial.params.items():
    print(f'    {key}: {value}')


# Train the best model on the full dataset
best_model = create_model(trial).float()
best_model.to('cuda')
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(best_model.parameters(), lr=trial.params['lr'], weight_decay=trial.params['weight_decay'])
scheduler = StepLR(optimizer, step_size=trial.params['step_size'], gamma=trial.params['gamma'])


train_loader = DataLoader(train_dataset, batch_size=trial.params['batch_size'], shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=trial.params['batch_size'], shuffle=False)


# Train the model with the best hyperparameters
num_epochs = 200
patience = 20
best_val_loss = float('inf')
patience_counter = 0


for epoch in range(num_epochs):
    best_model.train()
    running_loss = 0.0
    for images, labels in train_loader:
        images = images.cuda() 
        labels = labels.cuda()
        optimizer.zero_grad()
        outputs = best_model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    avg_train_loss = running_loss / len(train_loader)
    
    # Evaluate on validation set
    best_model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.cuda() 
            labels = labels.cuda()
            outputs = best_model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    avg_val_loss = val_loss / len(test_loader)
    accuracy = correct / total * 100
    
    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Accuracy: {accuracy:.2f}%')
    
    # Check for early stopping
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        patience_counter = 0
        torch.save(best_model.state_dict(), 'model.pth')
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print(f"Early stopping at epoch {epoch+1}")
            break
    
    scheduler.step()

[I 2024-08-14 15:25:32,789] A new study created in memory with name: no-name-3edc30d0-2827-4cf6-8f79-3dc919dc4a13
  optimizer = optim.Adam(model.parameters(), lr=trial.suggest_loguniform('lr', 1e-5, 1e-3), weight_decay=trial.suggest_loguniform('weight_decay', 1e-6, 1e-4))
[I 2024-08-14 15:25:54,334] Trial 0 finished with value: 57.0945945945946 and parameters: {'conv1_out_channels': 16, 'conv2_out_channels': 63, 'conv3_out_channels': 104, 'dropout': 0.39639874736170944, 'fc1_units': 302, 'lr': 0.0003804158783009216, 'weight_decay': 3.644444827141247e-06, 'step_size': 15, 'gamma': 0.8961212415687746, 'batch_size': 115}. Best is trial 0 with value: 57.0945945945946.
[I 2024-08-14 15:26:36,082] Trial 1 finished with value: 59.797297297297305 and parameters: {'conv1_out_channels': 16, 'conv2_out_channels': 101, 'conv3_out_channels': 203, 'dropout': 0.5328428975143074, 'fc1_units': 455, 'lr': 0.00021121999261506532, 'weight_decay': 2.8547676815599713e-05, 'step_size': 6, 'gamma': 0.85941879

Best trial:
  Value: 60.13513513513513
  Params: 
    conv1_out_channels: 21
    conv2_out_channels: 61
    conv3_out_channels: 81
    dropout: 0.4633444097414257
    fc1_units: 165
    lr: 0.0007126942257014508
    weight_decay: 1.631944558398173e-06
    step_size: 7
    gamma: 0.7553860490108643
    batch_size: 117
Epoch 1/200, Train Loss: 1.3647, Val Loss: 1.3300, Accuracy: 28.04%
Epoch 2/200, Train Loss: 1.3296, Val Loss: 1.2970, Accuracy: 41.22%
Epoch 3/200, Train Loss: 1.2755, Val Loss: 1.2620, Accuracy: 40.20%
Epoch 4/200, Train Loss: 1.2768, Val Loss: 1.2354, Accuracy: 44.26%
Epoch 5/200, Train Loss: 1.2536, Val Loss: 1.2041, Accuracy: 42.91%
Epoch 6/200, Train Loss: 1.2241, Val Loss: 1.1873, Accuracy: 44.59%
Epoch 7/200, Train Loss: 1.1805, Val Loss: 1.1554, Accuracy: 45.27%
Epoch 8/200, Train Loss: 1.1464, Val Loss: 1.1216, Accuracy: 46.96%
Epoch 9/200, Train Loss: 1.1561, Val Loss: 1.1086, Accuracy: 52.36%
Epoch 10/200, Train Loss: 1.1302, Val Loss: 1.1006, Accuracy: 53.04%
