In [4]:
import os 
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
import pandas as pd
import os
import cv2
import numpy as np
import torch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# Load CSV dataset
csv_path = "training_data/training_data.csv"
image_dir = "training_data/images/"

df = pd.read_csv(csv_path)
print(f"✅ Loaded {len(df)} samples.")

# ✅ Check for missing values
df = df.dropna()  # Remove any rows with missing values

# Convert labels to tensors
labels = df["steering_output"].values

# ✅ Function to load and preprocess images
def load_and_preprocess_images(df, image_dir, img_size=(128, 128)):
    images = []
    labels = []
    
    for _, row in df.iterrows():
        img_path = os.path.join(image_dir, row["image_filename"])
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)

        if img is not None:
            img = cv2.resize(img, img_size)  # Resize for CNN input
            img = img / 255.0  # Normalize
            images.append(img)
            labels.append(int(row["steering_output"]))

    return np.array(images), np.array(labels)

# Load dataset
X, y = load_and_preprocess_images(df, image_dir)
print(f"✅ Loaded {len(X)} images with labels.")

# Convert to PyTorch tensors
X_tensor = torch.tensor(X).unsqueeze(1).float()  # Add channel dimension
y_tensor = torch.tensor(y).long()

# Verify shapes
print(f"Images Tensor Shape: {X_tensor.shape}")  # Expected: (num_samples, 1, 128, 128)
print(f"Labels Tensor Shape: {y_tensor.shape}")  # Expected: (num_samples,)


✅ Loaded 2666 samples.
✅ Loaded 2666 images with labels.
Images Tensor Shape: torch.Size([2666, 1, 128, 128])
Labels Tensor Shape: torch.Size([2666])


In [3]:
class LineFollowerDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# ✅ Split dataset into training & test sets (80%-20%)
train_size = int(0.8 * len(X_tensor))
test_size = len(X_tensor) - train_size

train_dataset, test_dataset = torch.utils.data.random_split(
    LineFollowerDataset(X_tensor, y_tensor), [train_size, test_size]
)

# Create PyTorch DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

print("✅ PyTorch DataLoaders Ready!")


✅ PyTorch DataLoaders Ready!


In [5]:


# ✅ Define CNN Model for Line Following
class LineFollowerCNN(nn.Module):
    def __init__(self):
        super(LineFollowerCNN, self).__init__()

        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.fc1 = nn.Linear(32 * 32 * 32, 128)  # 32x32 after pooling twice
        self.fc2 = nn.Linear(128, 4)  # 4 output classes (steering commands)

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        x = x.view(x.size(0), -1)  # Flatten
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)  # No activation function here (CrossEntropyLoss applies softmax internally)
        return x

# ✅ Initialize model, loss function, and optimizer
model = LineFollowerCNN()
criterion = nn.CrossEntropyLoss()  # Classification loss function
optimizer = optim.Adam(model.parameters(), lr=0.001)

print("✅ CNN Model Initialized!")


✅ CNN Model Initialized!


In [6]:
# ✅ Training Parameters
num_epochs = 10  # Adjust as needed
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# ✅ Training Loop
for epoch in range(num_epochs):
    total_loss = 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(train_loader):.4f}")

# ✅ Save the trained model
torch.save(model.state_dict(), "line_follower_cnn.pth")
print("✅ Model training complete and saved as 'line_follower_cnn.pth'.")


Epoch [1/10], Loss: 0.0551
Epoch [2/10], Loss: 0.0171
Epoch [3/10], Loss: 0.0070
Epoch [4/10], Loss: 0.0031
Epoch [5/10], Loss: 0.0072
Epoch [6/10], Loss: 0.0035
Epoch [7/10], Loss: 0.0006
Epoch [8/10], Loss: 0.0002
Epoch [9/10], Loss: 0.0001
Epoch [10/10], Loss: 0.0000
✅ Model training complete and saved as 'line_follower_cnn.pth'.


In [7]:
# ✅ Evaluation Mode
model.eval()

correct = 0
total = 0

with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)

        outputs = model(images)
        _, predicted = torch.max(outputs, 1)

        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f"✅ Model Accuracy on Test Data: {accuracy:.2f}%")


✅ Model Accuracy on Test Data: 100.00%
