In [None]:

import copy
import glob
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import optuna
import pandas as pd
import PIL.Image
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.models as models
import torchvision.transforms as transforms
from sklearn.metrics import mean_absolute_error
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import Dataset, DataLoader, random_split
from tqdm import tqdm
#from torch2trt import torch2trt, TRTModule
#import tensorrt as trt



In [None]:


class RoadDataset(Dataset):
    """Image dataset whose (x, y) regression target is encoded in each file name.

    File names are expected to look like ``<prefix>_<x>_<y>[_<suffix>].jpg``
    (or ``.png``), where ``<x>`` and ``<y>`` are integers. Each coordinate is
    normalised as ``(v - image_size / 2) / (image_size / 2)`` with
    ``image_size = 224``.

    Args:
        directory: Folder scanned (non-recursively) for image files.
        mode: ``'train'`` prepends colour-jitter and Gaussian-blur augmentation
            to the base transform; any other value uses the base transform only.

    Raises:
        ValueError: If ``directory`` is not a directory, or if it contains no
            file whose name yields a valid (x, y) pair.
    """

    # Extensions accepted by the directory scan (matched case-insensitively).
    IMAGE_EXTENSIONS = ('.jpg', '.png')

    def __init__(self, directory, mode='train'):
        self.directory = directory
        self.mode = mode
        self.image_size = 224
        self.samples = []

        if not os.path.isdir(directory):
            raise ValueError(f"Catalogue does not exist: {directory}")

        half_size = self.image_size / 2
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # sample order (and therefore any seeded split) stable between runs.
        for filename in sorted(os.listdir(directory)):
            if not filename.lower().endswith(self.IMAGE_EXTENSIONS):
                continue
            try:
                coords = self._parse_xy(filename)
            except ValueError as e:
                print(f"error  {filename} : {str(e)}")
                continue
            if coords is None:
                print(f"Skip file {filename}: ")
                continue

            x, y = coords
            self.samples.append((
                os.path.join(directory, filename),
                ((x - half_size) / half_size, (y - half_size) / half_size)
            ))

        if len(self.samples) == 0:
            raise ValueError(f"No valid images found in {directory}")

        print(f"Valid images founded {len(self.samples)} ")
        self._check_distribution()

        self.transform = self._get_transform()

    @staticmethod
    def _parse_xy(filename):
        """Extract the integer (x, y) pair from an image file name.

        The extension is stripped before splitting on ``_`` so that a name
        without a trailing suffix (``xy_100_200.jpg``) parses as well as one
        with a suffix (``xy_100_200_abcd.jpg``).

        Returns:
            ``(x, y)`` as ints, or ``None`` when the name has fewer than three
            ``_``-separated fields.

        Raises:
            ValueError: If the second or third field is not an integer.
        """
        stem = os.path.splitext(filename)[0]
        parts = stem.split('_')
        if len(parts) < 3:
            return None
        return int(parts[1]), int(parts[2])

    def _get_transform(self):
        """Build the torchvision pipeline for the current ``mode``."""
        base_transforms = [
            transforms.Resize((self.image_size, self.image_size)),
            transforms.ToTensor(),
            transforms.Normalize(
                [0.485, 0.456, 0.406],
                [0.229, 0.224, 0.225]
            )
        ]
        if self.mode != 'train':
            return transforms.Compose(base_transforms)

        # Photometric augmentations only: they do not move pixels, so the
        # (x, y) label stays valid.
        augmentations = [
            transforms.ColorJitter(
                brightness=0.2,
                contrast=0.2,
                saturation=0.2,
                hue=0.1
            ),
            transforms.GaussianBlur(3, sigma=(0.1, 0.2))
        ]
        return transforms.Compose(augmentations + base_transforms)

    def _check_distribution(self):
        """Print range/mean of the normalised labels and warn on left/right imbalance."""
        x_coords = [coords[0] for _, coords in self.samples]
        y_coords = [coords[1] for _, coords in self.samples]

        print("\nData distribution statistics:")
        print(f"X range: [{min(x_coords):.2f}, {max(x_coords):.2f}]")
        print(f"Y range: [{min(y_coords):.2f}, {max(y_coords):.2f}]")
        print(f"X Mean: {sum(x_coords)/len(x_coords):.2f}")
        print(f"Y Mean: {sum(y_coords)/len(y_coords):.2f}")

        num_left = sum(1 for x in x_coords if x < 0)
        num_right = sum(1 for x in x_coords if x > 0)
        print(f"\ndistribution:")
        print(f"Number of samples on the left side: {num_left}")
        print(f"Number of samples on the right side: {num_right}")
        # Warn when the two sides differ by more than 20% of all samples.
        if abs(num_left - num_right) > len(x_coords) * 0.2:
            print("There may be a left-right imbalance in the data")

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(image_tensor, label)`` where label is a float32 tensor ``[x, y]``.

        Raises:
            FileNotFoundError: If the image file has disappeared since the scan.
            ValueError: If the decoded image has a zero width or height.
        """
        img_path, (x, y) = self.samples[idx]

        if not os.path.exists(img_path):
            raise FileNotFoundError(f"File does not exist: {img_path}")

        # The context manager releases the file handle as soon as the pixels
        # have been copied into the converted image.
        with PIL.Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')

        if image.size[0] == 0 or image.size[1] == 0:
            raise ValueError(f"Invalid image size: {img_path}")

        if self.transform:
            image = self.transform(image)

        label = torch.tensor([x, y], dtype=torch.float32)

        return image, label


### Split dataset into train and test sets
Once the dataset has been read, we split it into a training set and a test set. In this example we use a 90%/10% train/test split. The test set is used to verify the accuracy of the model we train.

In [None]:
# Load every labelled image. RoadDataset defaults to mode='train', so both
# halves of the split below share the training-time augmentation pipeline.
dataset = RoadDataset('road_following/dataset_xy')

test_percent = 0.1
num_test = int(test_percent * len(dataset))
# Seed the split: an unseeded random_split picks a different hold-out set on
# every kernel restart, which makes reported test losses incomparable.
split_generator = torch.Generator().manual_seed(42)
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset,
    [len(dataset) - num_test, num_test],
    generator=split_generator
)

In [None]:


# Run on the GPU when one is visible, otherwise on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f'Use of cuda: {device}')

# Hyper-parameters used for this training run.
best_batch_size = 8
best_lr = 0.0011029876079911261
best_weight_decay = 0.0003940097627656374
NUM_EPOCHS = 30
BEST_MODEL_PATH = 'best_road_model.pth'

data_directory = 'road_following/dataset_xy'

# Training images get augmentation (mode='train'); the validation folder does not.
train_dataset = RoadDataset(directory=data_directory, mode='train')
test_dataset = RoadDataset(directory='road_following/val', mode='test')

loader_options = dict(batch_size=best_batch_size, num_workers=0)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_options)

# ImageNet-pretrained ResNet-34 with a 2-output regression head for (x, y).
model = models.resnet34(weights=models.ResNet34_Weights.DEFAULT)
model.fc = nn.Linear(512, 2)
model = model.to(device)

optimizer = optim.Adam(
    model.parameters(),
    lr=best_lr,
    weight_decay=best_weight_decay
)
criterion = nn.MSELoss()

best_loss = float('inf')
train_losses, test_losses = [], []

for epoch in range(NUM_EPOCHS):
    print(f'Epoch {epoch + 1}/{NUM_EPOCHS}')
    print('-' * 10)

    # ---- training pass ----
    model.train()
    train_total = 0.0
    for images, labels in tqdm(train_loader, desc='训练'):
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        batch_loss = criterion(model(images), labels)
        batch_loss.backward()
        optimizer.step()

        # Weight by batch size so the epoch figure is a per-sample mean.
        train_total += batch_loss.item() * images.size(0)
    train_epoch_loss = train_total / len(train_dataset)
    train_losses.append(train_epoch_loss)
    print(f'loss: {train_epoch_loss:.4f}')

    # ---- validation pass ----
    model.eval()
    val_total = 0.0
    with torch.no_grad():
        for images, labels in tqdm(test_loader, desc='validate'):
            images, labels = images.to(device), labels.to(device)
            batch_loss = criterion(model(images), labels)
            val_total += batch_loss.item() * images.size(0)
    val_epoch_loss = val_total / len(test_dataset)
    test_losses.append(val_epoch_loss)
    print(f'validation loss: {val_epoch_loss:.4f}')

    # Keep only the weights with the lowest validation loss seen so far.
    if val_epoch_loss < best_loss:
        best_loss = val_epoch_loss
        torch.save(model.state_dict(), BEST_MODEL_PATH)
        print(f'The new best model is saved in {BEST_MODEL_PATH} validation loss: {best_loss:.4f}')

print('Done')


In [None]:

test_percent = 0.1
num_test = int(test_percent * len(dataset))
# Seed the split so every hyper-parameter trial (and every re-run of this
# cell) is scored on the same hold-out samples.
split_generator = torch.Generator().manual_seed(42)
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset,
    [len(dataset) - num_test, num_test],
    generator=split_generator
)

def objective(trial):
    """Optuna objective: fine-tune ResNet-34 for 10 epochs and return the best test MSE.

    Reads ``train_dataset`` and ``test_dataset`` from the notebook's global
    scope. Samples ``batch_size``, ``lr`` and ``weight_decay`` from ``trial``,
    trains with Adam, reports the test loss after every epoch so the study can
    prune, and writes the best weights of the trial to
    ``best_model_trial_<number>.pth``.

    Args:
        trial: The ``optuna`` trial supplying hyper-parameter suggestions.

    Returns:
        The lowest per-sample test MSE observed during the trial.

    Raises:
        optuna.exceptions.TrialPruned: When the study's pruner stops the trial.
    """
    batch_size = trial.suggest_categorical('batch_size', [8, 16, 32, 64])
    # suggest_float(..., log=True) replaces the deprecated suggest_loguniform.
    lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
    weight_decay = trial.suggest_float('weight_decay', 1e-5, 1e-1, log=True)

    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=0
    )

    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=0
    )

    # Same weights API as the baseline training cell; `pretrained=True` is deprecated.
    model = models.resnet34(weights=models.ResNet34_Weights.DEFAULT)
    model.fc = torch.nn.Linear(512, 2)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    NUM_EPOCHS = 10
    BEST_MODEL_PATH = f'best_model_trial_{trial.number}.pth'
    best_loss = float('inf')

    for epoch in range(NUM_EPOCHS):
        model.train()
        train_loss = 0.0
        for images, labels in train_loader:
            images = images.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = F.mse_loss(outputs, labels)
            loss.backward()
            optimizer.step()
            # Weight by batch size: a per-batch mean over-weights a short last
            # batch, and by a different amount for each sampled batch_size,
            # which would make trials incomparable.
            train_loss += loss.item() * images.size(0)
        train_loss /= len(train_dataset)

        model.eval()
        test_loss = 0.0
        with torch.no_grad():
            for images, labels in test_loader:
                images = images.to(device)
                labels = labels.to(device)
                outputs = model(images)
                loss = F.mse_loss(outputs, labels)
                test_loss += loss.item() * images.size(0)
        test_loss /= len(test_dataset)

        print(f'Trial {trial.number}, Epoch {epoch}, Train Loss: {train_loss}, Test Loss: {test_loss}')

        if test_loss < best_loss:
            best_loss = test_loss
            torch.save(model.state_dict(), BEST_MODEL_PATH)

        # Let the study's pruner stop unpromising trials early.
        trial.report(test_loss, epoch)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return best_loss


# Search for the hyper-parameters that minimise the objective's return value.
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=50)

# Summarise the winning trial.
print('best trial:')
trial = study.best_trial

print(f'  Minimum loss value: {trial.value}')
print('  Optimal hyperparameters: ')
for param_name, param_value in trial.params.items():
    print(f'    {param_name}: {param_value}')


In [None]:
class RoadFollowingModel(nn.Module):
    """ImageNet-pretrained ResNet-34 with an MLP regression head.

    ``layer1`` and ``layer2`` of the backbone are frozen. The original ``fc``
    layer is replaced by Linear -> BatchNorm -> ReLU -> Dropout(0.3), repeated
    for widths 512 and 256, followed by a final Linear to ``num_outputs``.

    Args:
        num_outputs: Number of regression targets (2 by default).
    """

    def __init__(self, num_outputs=2):
        super().__init__()

        # Same weights API as the baseline training cell; `pretrained=True` is deprecated.
        self.model = models.resnet34(weights=models.ResNet34_Weights.DEFAULT)

        # Freeze the first two residual stages. Only layer1 and layer2 are
        # frozen here; every other parameter keeps requires_grad=True.
        for frozen_stage in (self.model.layer1, self.model.layer2):
            for param in frozen_stage.parameters():
                param.requires_grad = False

        num_ftrs = self.model.fc.in_features
        self.model.fc = nn.Sequential(
            nn.Linear(num_ftrs, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, num_outputs)
        )

    def forward(self, x):
        """Run ``x`` through the backbone and the regression head."""
        return self.model(x)
model = RoadFollowingModel()

The ResNet model ends in a fully connected (``fc``) layer with 512 ``in_features``. Because we are training a regression on the (x, y) target, the replacement head has 2 ``out_features``.

Finally, we transfer our model for execution on the GPU

In [None]:
# RoadFollowingModel already builds its 2-output head on `self.model.fc`, and
# its forward() only calls `self.model`. Assigning `model.fc` on the wrapper
# would just register an extra, never-used layer, so no head is attached here.
# Fall back to the CPU when CUDA is unavailable instead of failing.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

In [None]:
def train_model(model, train_loader, val_loader, num_epochs=50, device='cuda'):
    """Fine-tune ``model`` with AdamW, LR-on-plateau scheduling and early stopping.

    Only ``model.model.layer3``, ``model.model.layer4`` and ``model.model.fc``
    are handed to the optimizer (each with its own learning rate). Every epoch
    runs a training pass and a validation pass; the validation loss drives the
    scheduler, best-weights tracking and early stopping (patience 10).

    Side effects: writes ``best_steering_model_ResNet34_xy.pth`` whenever the
    validation loss improves, a checkpoint and a training-curve image under
    ``checkpoints/`` every 5 epochs, and ``checkpoints/training_history.npy``
    at the end.

    Args:
        model: Module exposing ``model.layer3``, ``model.layer4`` and ``model.fc``
            under a ``.model`` attribute. It is not moved to ``device`` here.
        train_loader: Loader yielding ``(inputs, targets)`` batches for training.
        val_loader: Loader yielding ``(inputs, targets)`` batches for validation.
        num_epochs: Maximum number of epochs.
        device: Device (string or ``torch.device``) the batches are moved to.

    Returns:
        ``(model, train_losses, val_losses, train_mae, val_mae)`` where ``model``
        carries the weights with the lowest validation loss.
    """
    print("開始訓練模型...")

    # Mixed precision is CUDA-only: GradScaler cannot scale CPU losses, so both
    # the scaler and autocast are disabled when training does not run on a GPU.
    use_amp = torch.cuda.is_available() and torch.device(device).type == 'cuda'
    # Fully qualified so the function does not rely on a separate GradScaler import.
    scaler = torch.cuda.amp.GradScaler(enabled=use_amp)
    criterion = nn.MSELoss()

    optimizer = optim.AdamW([
        {'params': model.model.layer3.parameters(), 'lr': 1e-4},
        {'params': model.model.layer4.parameters(), 'lr': 2e-4},
        {'params': model.model.fc.parameters(), 'lr': 5e-4}
    ], weight_decay=0.01)
    # Gradient clipping must cover exactly the parameters the optimizer updates:
    # scaler.unscale_ only unscales those.
    optimized_params = [p for group in optimizer.param_groups for p in group['params']]

    # `verbose` is deprecated in recent PyTorch releases; the learning rate is
    # printed at the end of every epoch below instead.
    scheduler = ReduceLROnPlateau(
        optimizer,
        mode='min',
        factor=0.5,
        patience=3,
        min_lr=1e-6
    )

    train_losses = []
    val_losses = []
    train_mae = []
    val_mae = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float('inf')

    os.makedirs('checkpoints', exist_ok=True)

    early_stopping_patience = 10
    early_stopping_counter = 0

    # Defined up front so the history file can be written even if num_epochs == 0.
    current_lr = optimizer.param_groups[0]['lr']

    def _checkpoint(epoch, weights, loss_value):
        """Bundle everything needed to resume or inspect training at `epoch`."""
        return {
            'epoch': epoch,
            'model_state_dict': weights,
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict(),
            'loss': loss_value,
            'train_losses': train_losses,
            'val_losses': val_losses,
            'train_mae': train_mae,
            'val_mae': val_mae,
            'scaler_state_dict': scaler.state_dict()
        }

    if torch.cuda.is_available():
        print(f"使用 GPU: {torch.cuda.get_device_name(0)}")

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                model.train()
                current_loader = train_loader
            else:
                model.eval()
                current_loader = val_loader

            running_loss = 0.0
            all_targets = []
            all_outputs = []

            with tqdm(current_loader, desc=phase) as pbar:
                for inputs, targets in pbar:
                    inputs = inputs.to(device)
                    targets = targets.to(device)

                    if is_train:
                        # Clear gradients on the whole model, not just the
                        # optimizer's groups: parameters that still require
                        # grad but are not optimized would otherwise
                        # accumulate gradients across steps.
                        model.zero_grad(set_to_none=True)

                    # Only the forward pass and the loss belong inside autocast.
                    with torch.set_grad_enabled(is_train):
                        with torch.cuda.amp.autocast(enabled=use_amp):
                            outputs = model(inputs)
                            loss = criterion(outputs, targets)

                    if is_train:
                        scaler.scale(loss).backward()
                        # Unscale before clipping; otherwise max_norm is compared
                        # against gradients multiplied by the loss scale.
                        scaler.unscale_(optimizer)
                        torch.nn.utils.clip_grad_norm_(optimized_params, max_norm=1.0)
                        scaler.step(optimizer)
                        scaler.update()

                    # .float(): autocast may emit half-precision outputs.
                    all_outputs.append(outputs.detach().float().cpu())
                    all_targets.append(targets.detach().cpu())
                    running_loss += loss.item() * inputs.size(0)

                    pbar.set_postfix({'loss': f'{loss.item():.4f}'})

            epoch_loss = running_loss / len(current_loader.dataset)
            all_outputs = torch.cat(all_outputs).numpy()
            all_targets = torch.cat(all_targets).numpy()
            epoch_mae = mean_absolute_error(all_targets, all_outputs)

            print(f'{phase} Loss: {epoch_loss:.4f} MAE: {epoch_mae:.4f}')

            if is_train:
                train_losses.append(epoch_loss)
                train_mae.append(epoch_mae)
            else:
                val_losses.append(epoch_loss)
                val_mae.append(epoch_mae)

                scheduler.step(epoch_loss)

                if epoch_loss < best_loss:
                    best_loss = epoch_loss
                    best_model_wts = copy.deepcopy(model.state_dict())
                    early_stopping_counter = 0

                    torch.save(
                        _checkpoint(epoch, best_model_wts, best_loss),
                        'best_steering_model_ResNet34_xy.pth'
                    )
                else:
                    early_stopping_counter += 1

        # After the phase loop `epoch_loss` holds the validation loss.
        if (epoch + 1) % 5 == 0:
            torch.save(
                _checkpoint(epoch, model.state_dict(), epoch_loss),
                f'checkpoints/checkpoint_epoch_{epoch+1}.pth'
            )

            # NOTE(review): plot_training_results is not defined in the part of
            # the notebook visible here - confirm it is defined before this runs.
            plot_training_results(
                train_losses,
                val_losses,
                train_mae,
                val_mae,
                save_path=f'checkpoints/training_curves_epoch_{epoch+1}.png'
            )

        current_lr = optimizer.param_groups[0]['lr']
        print(f'Current learning rate: {current_lr:.2e}')
        print()

        if early_stopping_counter >= early_stopping_patience:
            break

    # Hand back the best weights rather than those of the last epoch.
    model.load_state_dict(best_model_wts)

    np.save('checkpoints/training_history.npy', {
        'train_losses': train_losses,
        'val_losses': val_losses,
        'train_mae': train_mae,
        'val_mae': val_mae,
        'final_learning_rate': current_lr,
        'best_loss': best_loss
    })

    return model, train_losses, val_losses, train_mae, val_mae

In [None]:
# Fine-tune for at most 50 epochs; the loader named `test_loader` serves as
# the validation loader.
training_outputs = train_model(model, train_loader, test_loader, num_epochs=50, device=device)
model, train_losses, val_losses, train_mae, val_mae = training_outputs

In [None]:
import matplotlib.pyplot as plt

# Each curve gets an x axis of its own length. The loss lists are not
# guaranteed to have NUM_EPOCHS entries (train_model may stop early or run a
# different number of epochs), and a length mismatch makes plot() raise.
# NOTE(review): after the train_model cell has run, `train_losses` comes from
# train_model while `test_losses` still comes from the earlier baseline loop -
# confirm whether `val_losses` should be plotted here instead.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(range(len(train_losses)), train_losses, label='Training Loss', color='blue')
ax.plot(range(len(test_losses)), test_losses, label='Testing Loss', color='orange')
ax.set_title('Training and Testing Loss Over Epochs')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.legend()
ax.grid()
plt.show()