### Task

In [1]:
import os
import torch
import torchvision.transforms as transforms
from xy_dataset_copy import XYDataset
from torch.utils.data import DataLoader, random_split
import torchvision

# Path to your dataset folder
DATA_DIRECTORY = '/home/af/sd/final_record_2'

# Define transforms
TRANSFORMS = transforms.Compose([
    transforms.ColorJitter(0.2, 0.2, 0.2, 0.2),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

In [2]:
# Initialize dataset
dataset = XYDataset(DATA_DIRECTORY, transform=TRANSFORMS, random_hflip=True)

# Split into train, validation, and test sets
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

# Initialize DataLoaders
BATCH_SIZE = 8
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

### Model

In [4]:
# Model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
output_dim = 3  # Three classes: -28, 0, 28 degrees

# model = torchvision.models.resnet18(pretrained=True)
# model.fc = torch.nn.Linear(512, output_dim)
# model = model.to(device)

# model = torchvision.models.resnet34(pretrained=True)
# model.fc = torch.nn.Linear(512, output_dim)
# model = model.to(device)

# model = torchvision.models.resnet50(pretrained=True)
# model.fc = torch.nn.Linear(2048, output_dim)  # ResNet50 has a final fully connected layer with 2048 input features
# model = model.to(device)

model = torchvision.models.densenet121(pretrained=True)
model.classifier = torch.nn.Linear(1024, output_dim)  # DenseNet121 has a final classifier layer with 1024 input features
model = model.to(device)

# Optimizer and loss function
optimizer = torch.optim.Adam(model.parameters())
criterion = torch.nn.CrossEntropyLoss()

### Training

In [5]:
# Function for training or evaluation
def train_eval(model, optimizer, data_loader, device, is_training):
    if is_training:
        model.train()
    else:
        model.eval()

    total_loss = 0.0
    total_samples = 0
    correct_predictions = 0

    for images, _, angles in data_loader:
        images = images.to(device)
        angles = angles.to(device, dtype=torch.long)

        # Ensure angles has the correct shape
        if len(angles.shape) == 1:
            angles = angles.unsqueeze(1)

        if is_training:
            optimizer.zero_grad()

        with torch.set_grad_enabled(is_training):
            outputs = model(images)
            loss = criterion(outputs, angles.squeeze())
            
            if is_training:
                loss.backward()
                optimizer.step()

        total_loss += loss.item() * images.size(0)
        total_samples += images.size(0)
        _, preds = torch.max(outputs, 1)
        correct_predictions += torch.sum(preds == angles.squeeze())

    accuracy = correct_predictions.double() / total_samples
    return total_loss / total_samples, accuracy.item()

In [6]:
# Training loop
def train(model, optimizer, train_loader, val_loader, device, num_epochs=10):
    best_val_loss = float('inf')

    for epoch in range(num_epochs):
        train_loss, train_acc = train_eval(model, optimizer, train_loader, device, True)
        val_loss, val_acc = train_eval(model, optimizer, val_loader, device, False)

        print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'road_following_model_densenet121_100_2.pth')

    print("Training complete.")

In [6]:
# Start training
train(model, optimizer, train_loader, val_loader, device, num_epochs=100)

Epoch [1/100], Train Loss: 0.7150, Train Acc: 0.7164, Val Loss: 0.7244, Val Acc: 0.7442
Epoch [2/100], Train Loss: 0.5391, Train Acc: 0.7987, Val Loss: 0.4878, Val Acc: 0.8047
Epoch [3/100], Train Loss: 0.4792, Train Acc: 0.8225, Val Loss: 0.5042, Val Acc: 0.8093
Epoch [4/100], Train Loss: 0.4648, Train Acc: 0.8202, Val Loss: 0.4494, Val Acc: 0.8372
Epoch [5/100], Train Loss: 0.4240, Train Acc: 0.8428, Val Loss: 0.4779, Val Acc: 0.8140
Epoch [6/100], Train Loss: 0.3969, Train Acc: 0.8509, Val Loss: 0.3940, Val Acc: 0.8605
Epoch [7/100], Train Loss: 0.4013, Train Acc: 0.8521, Val Loss: 0.4819, Val Acc: 0.7907
Epoch [8/100], Train Loss: 0.3926, Train Acc: 0.8579, Val Loss: 0.3997, Val Acc: 0.8233
Epoch [9/100], Train Loss: 0.3735, Train Acc: 0.8573, Val Loss: 0.3939, Val Acc: 0.8372
Epoch [10/100], Train Loss: 0.3778, Train Acc: 0.8631, Val Loss: 0.3677, Val Acc: 0.8791
Epoch [11/100], Train Loss: 0.3351, Train Acc: 0.8747, Val Loss: 0.3959, Val Acc: 0.8558
Epoch [12/100], Train Loss: 0.

In [7]:
# Test loop
def test(model, test_loader, device):
    model.eval()
    test_loss, test_acc = train_eval(model, optimizer, test_loader, device, False)
    print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

In [8]:
# Evaluate on test set
model.load_state_dict(torch.load('road_following_model_densenet121_100_2.pth'))

test(model, test_loader, device)

Test Loss: 0.1841, Test Accuracy: 0.9120
