In [19]:
import os
import cv2
import json
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from PIL import Image
from torchvision import transforms
# from torchvision.models import resnet18
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import Dataset, DataLoader, random_split

In [20]:
class ImitationDataset(Dataset):
    def __init__(self, rgb_dir, seg_dir, log_path, transforms=None):
        self.rgb_dir = rgb_dir
        self.seg_dir = seg_dir
        self.log_path = log_path
        self.transform = transforms
        
        with open(log_path, 'r') as f:
            self.log_data = json.load(f)
            
    def __len__(self):
        return len(self.log_data)
    
    def __getitem__(self, index):
        rgb_image_path = os.path.join(self.rgb_dir, f"{index:05d}.png")
        seg_image_path = os.path.join(self.seg_dir, f"{index:05d}.png")
        
        rgb_image = Image.open(rgb_image_path)
        
        if self.transform:
            rgb_tensor = self.transform(rgb_image)
        else:
            rgb_tensor = transforms.ToTensor()(rgb_image)
            
        seg_image = np.array(Image.open(seg_image_path))
        
        lane_mask = np.all(seg_image == [0, 255, 0], axis=2).astype(np.uint8)
        obs_mask = np.all(seg_image == [255, 0, 0], axis=2).astype(np.uint8)
        
        seg_tensor = torch.tensor(np.stack([lane_mask, obs_mask], axis=0), dtype=torch.float32)
        
        if seg_tensor.shape[1:] != rgb_tensor.shape[1:]:
            seg_tensor = F.interpolate(
                seg_tensor.unsqueeze(0),
                size=rgb_tensor.shape[1:],
                mode='nearest'
            ).squeeze(0)
        
        input_tensor = torch.cat([rgb_tensor, seg_tensor], dim=0)
                        
        control = self.log_data[index]
        control_tensor = torch.tensor([
            control['steer'],
            control['throttle'],
            control['brake']
        ], dtype=torch.float32)
        
        return input_tensor, control_tensor
        


In [21]:
class ImitationCNN(nn.Module):
    def __init__(self):
        super(ImitationCNN, self).__init__()
        
        self.conv1 = nn.Conv2d(5, 32, kernel_size=5, stride=2, padding=2)  
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1)  
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1) 
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1)

        self.fc = nn.Linear(256 * 23 * 40, 512) 
        
        # Output: steer, throttle, brake
        self.steer_head = nn.Linear(512, 1)
        self.throttle_head = nn.Linear(512, 1)  
        self.brake_head = nn.Linear(512, 1)    

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        
        x = x.view(x.size(0), -1)  
        x = F.relu(self.fc(x))
        
        steer = self.steer_head(x)
        throttle = self.throttle_head(x)
        brake = self.brake_head(x)
        
        return torch.cat([steer, throttle, brake], dim=1)


In [None]:
rgb_dir = 'rgb_image'
seg_dir = 'seg_image'
log_path = 'logs/logs.json'

os.makedirs('checkpoints', exist_ok=True)

dataset = ImitationDataset(rgb_dir, seg_dir, log_path, None)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_set, val_set = torch.utils.data.random_split(dataset, [train_size, val_size])
train_loader = DataLoader(train_set, 32, shuffle=True)
val_loader = DataLoader(val_set, 32, shuffle=False)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ImitationCNN().to(device)

optimizer = optim.Adam(model.parameters(), lr=1e-4)

num_epochs = 100

best_rmse = float('inf')
train_losses = []
val_steer, val_throttle, val_brake = [], [], []

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    
    for i, (inputs, targets) in enumerate(train_loader):
        inputs = inputs.to(device)
        targets = targets.to(device)
        
        optimizer.zero_grad()
        
        outputs= model(inputs)
        loss_steer = F.mse_loss(outputs[:, 0], targets[:, 0])
        loss_throttle = F.smooth_l1_loss(outputs[:, 1], targets[:, 1])
        loss_brake = F.smooth_l1_loss(outputs[:, 2], targets[:, 2])
        
        loss = loss_steer + 2.0 * (loss_throttle + loss_brake) 
        
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}")
    model.eval()
    total_se, total_th, total_br = 0, 0, 0
    n=0
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            outputs = model(inputs)
            
            total_se += F.mse_loss(outputs[:, 0], targets[:, 0], reduction='sum').item()
            total_th += F.mse_loss(outputs[:, 1], targets[:, 1], reduction='sum').item()
            total_br += F.mse_loss(outputs[:, 2], targets[:, 2], reduction='sum').item()
            n+= inputs.size(0)
            
    val_rmse = ((total_se +total_th + total_br)/(3*n)) **0.5
    
    if val_rmse < best_rmse:
        best_rmse = val_rmse
        torch.save(model.state_dict(), 'checkpoints/best_model.pth')
    
    torch.save(model.state_dict(), 'checkpoints/last_epoch.pth')
    
    val_st = (total_se/n)**0.5     
    val_th = (total_th/n)**0.5     
    val_br = (total_br/n)**0.5
    
    train_losses.append(running_loss/len(train_loader))
    val_steer.append(val_st)
    val_throttle.append(val_th)
    val_brake.append(val_br)
    
    
    print(f"[VAL] Steer MSE: {val_st:.4f}, Throttle RMSE: {val_th:.4f}, Brake RMSE: {val_br:.4f}")
    
    with open('logs/val_metrics.csv', 'a') as f:
        f.write(f"{epoch+1},{val_st:.4f},{val_th:.4f},{val_br:.4f}\n")


Epoch [1/100], Loss: 0.0408
[VAL] Steer MSE: 0.0501, Throttle RMSE: 0.0886, Brake RMSE: 0.0717
Epoch [2/100], Loss: 0.0166
[VAL] Steer MSE: 0.0433, Throttle RMSE: 0.0853, Brake RMSE: 0.0713
Epoch [3/100], Loss: 0.0132
[VAL] Steer MSE: 0.0426, Throttle RMSE: 0.0809, Brake RMSE: 0.0625
Epoch [4/100], Loss: 0.0106
[VAL] Steer MSE: 0.0402, Throttle RMSE: 0.0851, Brake RMSE: 0.0706
Epoch [5/100], Loss: 0.0081
[VAL] Steer MSE: 0.0397, Throttle RMSE: 0.0808, Brake RMSE: 0.0638
Epoch [6/100], Loss: 0.0056
[VAL] Steer MSE: 0.0403, Throttle RMSE: 0.0919, Brake RMSE: 0.0699
Epoch [7/100], Loss: 0.0040
[VAL] Steer MSE: 0.0380, Throttle RMSE: 0.0862, Brake RMSE: 0.0634
Epoch [8/100], Loss: 0.0030
[VAL] Steer MSE: 0.0380, Throttle RMSE: 0.0861, Brake RMSE: 0.0645
Epoch [9/100], Loss: 0.0021
[VAL] Steer MSE: 0.0374, Throttle RMSE: 0.0833, Brake RMSE: 0.0617
Epoch [10/100], Loss: 0.0015
[VAL] Steer MSE: 0.0378, Throttle RMSE: 0.0823, Brake RMSE: 0.0623
Epoch [11/100], Loss: 0.0012
[VAL] Steer MSE: 0.0

In [None]:
import matplotlib.pyplot as plt
plt.plot(train_losses, label='Train Loss')
plt.plot(val_steer, label='Val Steer RMSE')
plt.plot(val_throttle, label='Val Throttle RMSE')
plt.plot(val_brake, label='Val Brake RMSE')
plt.legend()
plt.savefig('training_curves.png')


In [5]:
torch.save(model.state_dict(), 'model.pth')
model.eval()  # Set the model to evaluation mode
steer_error = 0
throttle_error = 0
brake_error = 0

with torch.no_grad():  # No gradient calculation during evaluation
    for inputs, targets in train_loader:  # Using train_loader for demonstration
        inputs = inputs.to(device)
        targets = targets.to(device)
        outputs = model(inputs)

        steer_error += F.mse_loss(outputs[:, 0], targets[:, 0], reduction='sum').item()
        throttle_error += F.mse_loss(outputs[:, 1], targets[:, 1], reduction='sum').item()
        brake_error += F.mse_loss(outputs[:, 2], targets[:, 2], reduction='sum').item()

n = len(train_loader.dataset)  # Total number of samples in the dataset

print(f"Steer MSE: {steer_error/n:.4f}, Throttle MSE: {throttle_error/n:.4f}, Brake MSE: {brake_error/n:.4f}")




NameError: name 'model' is not defined

In [8]:
import json
from sklearn.model_selection import train_test_split

with open('logs/logs.json') as f:
    log = json.load(f)

train_log, val_log = train_test_split(log, test_size=0.2, random_state=42)

with open('train_log.json', 'w') as f:
    json.dump(train_log, f)
with open('val_log.json', 'w') as f:
    json.dump(val_log, f, indent=2)


In [10]:
from torch.utils.data import DataLoader
rgb_dir = 'rgb_image'
seg_dir = 'seg_image'
transform = None
# transform = transforms.Compose([
#     transforms.Resize((128, 128)),
#     transforms.ToTensor(),
# ])
train_dataset = ImitationDataset(rgb_dir, seg_dir, 'train_log.json', transform)
val_dataset = ImitationDataset(rgb_dir, seg_dir, 'val_log.json', transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)


In [15]:
model.eval()
steer_error = 0
throttle_error = 0
brake_error = 0

with torch.no_grad():
    for inputs, targets in val_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)
        outputs = model(inputs)

        steer_error += F.mse_loss(outputs[:, 0], targets[:, 0], reduction='sum').item()
        throttle_error += F.mse_loss(outputs[:, 1], targets[:, 1], reduction='sum').item()
        brake_error += F.mse_loss(outputs[:, 2], targets[:, 2], reduction='sum').item()

n = len(val_loader.dataset)
print(f"[VAL] Steer MSE: {steer_error/n:.6f}, Throttle MSE: {throttle_error/n:.6f}, Brake MSE: {brake_error/n:.6f}")


[VAL] Steer MSE: 0.007455, Throttle MSE: 0.121155, Brake MSE: 0.211377


In [16]:
steer_rmse = (steer_error / n) ** 0.5
throttle_rmse = (throttle_error / n) ** 0.5
brake_rmse = (brake_error / n) ** 0.5

print(f"[VAL] Steer RMSE: {steer_rmse:.6f}, Throttle RMSE: {throttle_rmse:.6f}, Brake RMSE: {brake_rmse:.6f}")


[VAL] Steer RMSE: 0.086340, Throttle RMSE: 0.348074, Brake RMSE: 0.459758


In [None]:
# seg_image_path = Image.open("seg_image/00000.png")
# rgb_image_path = Image.open("rgb_image/00000.png")

# # rgb_image = cv2.cvtColor(cv2.imread(rgb_image_path), cv2.COLOR_BGR2RGB)
# # rgb_image = Image.fromarray(rgb_image)


# # rgb_tensor = transforms.ToTensor()(rgb_image)
# rgb_image = np.array(rgb_image_path)

# # Convert RGB image to RGB format (if needed)
# rgb_image = cv2.cvtColor(rgb_image, cv2.COLOR_BGR2RGB)

# # Convert the numpy array to a PIL Image (if needed)
# rgb_image = Image.fromarray(rgb_image)

# # Convert RGB image to tensor
# rgb_tensor = transforms.ToTensor()(rgb_image)

# seg_image = np.array(seg_image_path)
# lane_mask = np.all(seg_image == [0, 255, 0], axis=2).astype(np.uint8)
# obs_mask = np.all(seg_image == [255, 0, 0], axis=2).astype(np.uint8)
# seg_tensor = torch.tensor(np.stack([lane_mask, obs_mask], axis=0), dtype=torch.float32)

# if seg_tensor.shape[1:] != rgb_tensor.shape[1:]:
#     seg_tensor = F.interpolate(
#         seg_tensor.unsqueeze(0),
#         size=rgb_tensor.shape[1:],
#         mode='nearest'
#     ).squeeze(0)

# input_tensor = torch.cat([rgb_tensor, seg_tensor], dim=0)