## Step 1: Setup and GPU Check

In [None]:
# Report the PyTorch build and choose the device used by the rest of the notebook
import torch

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")

if torch.cuda.is_available():
    # A CUDA device is visible: describe GPU 0 and run on it
    device = torch.device("cuda")
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
else:
    # No CUDA device: fall back to the CPU
    device = torch.device("cpu")

print(f"\nUsing device: {device}")

## Step 2: Upload Your Dataset

Upload your `dataset_v2.zip` file containing the 5 session folders.

Each session folder should have:
- `frames/` - camera images (PNG)
- `velodyne/` - LiDAR point clouds (BIN)

In [None]:
# Upload dataset zip file
from google.colab import files
import zipfile
import os

print("Please upload your dataset_v2.zip file...")
uploaded = files.upload()

# Extract the zip file
zip_filename = list(uploaded.keys())[0]
print(f"\nExtracting {zip_filename}...")

with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
    zip_ref.extractall('data')

print("Extraction complete!")
print("\nContents:")
!ls -la data/

In [None]:
# Find the dataset directory (handle nested folders)
import os
from pathlib import Path

# Folder the uploaded archive was extracted into
data_root = Path('data')


def find_sessions(root):
    """Return every directory below ``root`` that looks like a session folder.

    A directory qualifies when it directly contains both a ``frames`` and a
    ``velodyne`` entry. Matches are returned in the order ``rglob`` yields them.
    """
    required_entries = ('frames', 'velodyne')
    return [
        candidate
        for candidate in root.rglob('*')
        if candidate.is_dir()
        and all((candidate / entry).exists() for entry in required_entries)
    ]

# Discover the session folders and report how many files each one holds
session_dirs = find_sessions(data_root)
print(f"Found {len(session_dirs)} sessions:")
for s in session_dirs:
    num_frames = len(list((s / 'frames').glob('*.png')))
    num_lidar = len(list((s / 'velodyne').glob('*.bin')))
    print(f"  {s.name}: {num_frames} images, {num_lidar} LiDAR files")

# Later cells read DATA_DIR unconditionally. Stop here with an actionable
# message instead of leaving it undefined and failing later with a NameError.
if not session_dirs:
    raise FileNotFoundError(
        f"No session folders (containing both 'frames' and 'velodyne') found under {data_root}"
    )

# Set DATA_DIR to parent of sessions
DATA_DIR = session_dirs[0].parent
print(f"\nDATA_DIR set to: {DATA_DIR}")

## Step 3: Define Dataset and Model

In [None]:
import numpy as np
from pathlib import Path
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
import matplotlib.pyplot as plt
from tqdm import tqdm
import time

# Session configuration: folder names used for training
TRAIN_SESSIONS = [
    "4th_floor_hallway_20251206_132136",
    "4th_floor_lounge_20251206_154822",
    "5th_floor_hallway_20251206_161536",
    "3rd_floor_hallway_20251206_162223",
]

# Session held out for evaluation
TEST_SESSIONS = [
    "Mlab_20251207_112819",
]

# Hyperparameters
BATCH_SIZE = 32
NUM_EPOCHS = 15
LEARNING_RATE = 0.001
IMAGE_SIZE = 224

# Fix the random seeds so shuffling, augmentation, dropout and the new head's
# initialisation are repeatable across runs (GPU kernels may still introduce
# small non-deterministic differences).
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

print("Configuration loaded!")
print(f"  Batch size: {BATCH_SIZE}")
print(f"  Epochs: {NUM_EPOCHS}")
print(f"  Learning rate: {LEARNING_RATE}")
print(f"  Image size: {IMAGE_SIZE}x{IMAGE_SIZE}")
print(f"  Random seed: {SEED}")

In [None]:
class CameraLiDARDataset(Dataset):
    """Dataset that pairs camera images with LiDAR-derived targets.

    Each sample is ``(image, target)``, where ``target`` is the mean Euclidean
    distance of the points in the LiDAR scan whose file stem matches the image.
    All targets are computed once, while the dataset is constructed.

    Args:
        session_names: Names of the session folders to load.
        data_dir: Directory searched recursively for those folders.
        transform: Optional callable applied to the RGB image in ``__getitem__``.
        point_dim: Number of float32 values stored per LiDAR point. Defaults to
            5, the layout this notebook was written for.

    Raises:
        ValueError: If ``point_dim`` is smaller than 3, or a scan's length is not
            a multiple of ``point_dim``.
    """

    def __init__(self, session_names, data_dir, transform=None, point_dim=5):
        if point_dim < 3:
            raise ValueError("point_dim must be at least 3 (x, y, z)")

        self.data_dir = Path(data_dir)
        self.transform = transform
        self.point_dim = point_dim
        self.samples = []  # List of (image_path, target)
        skipped = 0

        for session_name in session_names:
            session_dir = self._find_session_dir(session_name)
            if session_dir is None:
                print(f"  Warning: Session {session_name} not found")
                continue

            image_dir = session_dir / "frames"
            velodyne_dir = session_dir / "velodyne"
            if not velodyne_dir.exists():
                print(f"  Warning: Missing data in {session_name}")
                continue

            # Pair every image with the scan that shares its file stem
            for img_path in sorted(image_dir.glob("*.png")):
                lidar_path = velodyne_dir / f"{img_path.stem}.bin"
                if not lidar_path.exists():
                    continue

                mean_distance = self._mean_point_distance(lidar_path)
                if mean_distance is None:
                    # A NaN target would poison the loss, so drop the frame
                    skipped += 1
                    continue

                self.samples.append((img_path, mean_distance))

        if skipped:
            print(f"  Skipped {skipped} frames with empty or invalid LiDAR scans")
        print(f"  Loaded {len(self.samples)} samples")

    def _find_session_dir(self, session_name):
        """Return the first directory named ``session_name`` that has a ``frames`` folder, else None."""
        for path in self.data_dir.rglob(session_name):
            if path.is_dir() and (path / 'frames').exists():
                return path
        return None

    def _mean_point_distance(self, lidar_path):
        """Return the mean distance of the finite points in a scan, or None if there are none."""
        raw = np.fromfile(str(lidar_path), dtype=np.float32)
        if raw.size == 0:
            return None
        if raw.size % self.point_dim != 0:
            raise ValueError(
                f"{lidar_path}: {raw.size} float32 values is not a multiple of "
                f"point_dim={self.point_dim}"
            )

        # The first three values of each point are read as x, y, z
        xyz = raw.reshape(-1, self.point_dim)[:, :3]
        # Ignore points with NaN/inf coordinates so they cannot turn the mean into NaN
        xyz = xyz[np.isfinite(xyz).all(axis=1)]
        if xyz.shape[0] == 0:
            return None
        return float(np.sqrt((xyz ** 2).sum(axis=1)).mean())

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        img_path, target = self.samples[idx]
        # convert() loads the pixel data, so the file can be closed right after
        with Image.open(img_path) as img:
            image = img.convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(target, dtype=torch.float32)


class ResNetRegressor(nn.Module):
    """ResNet18 modified for regression.

    The backbone's final fully connected layer is replaced by a small head,
    Linear(in_features, 256) -> ReLU -> Dropout(0.3) -> Linear(256, 1), so the
    network emits one scalar per input image.

    Args:
        pretrained: Load the ``IMAGENET1K_V1`` weights when True, otherwise
            start from random initialisation.
    """

    def __init__(self, pretrained=True):
        super().__init__()

        # Load pretrained ResNet18
        self.resnet = models.resnet18(weights='IMAGENET1K_V1' if pretrained else None)

        # Replace final FC layer for regression
        num_features = self.resnet.fc.in_features
        self.resnet.fc = nn.Sequential(
            nn.Linear(num_features, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 1)
        )

    def forward(self, x):
        """Return one prediction per image, shaped ``(batch,)``."""
        # Drop only the trailing size-1 feature axis. A bare squeeze() would also
        # collapse the batch axis for a single-image batch, yielding a 0-d tensor
        # that no longer matches the (1,)-shaped targets.
        return self.resnet(x).squeeze(-1)

print("Dataset and Model classes defined!")

## Step 4: Load Data

In [None]:
# Data transforms: light augmentation for training, deterministic for testing
train_transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

print("Loading training data...")
train_dataset = CameraLiDARDataset(TRAIN_SESSIONS, DATA_DIR, transform=train_transform)

print("\nLoading test data...")
test_dataset = CameraLiDARDataset(TEST_SESSIONS, DATA_DIR, transform=test_transform)

# The statistics below divide by the sample counts and take min/max of the
# targets, so an empty split would fail with an unrelated-looking error.
if len(train_dataset) == 0 or len(test_dataset) == 0:
    raise RuntimeError(
        f"Empty split (train={len(train_dataset)}, test={len(test_dataset)} samples); "
        f"check that the session folders exist under {DATA_DIR}"
    )

# Create data loaders. Pinned host memory is only requested when a GPU is present.
use_pinned_memory = torch.cuda.is_available()
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2, pin_memory=use_pinned_memory)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2, pin_memory=use_pinned_memory)

print("\n" + "="*50)
print("TRAIN/TEST SPLIT")
print("="*50)
total = len(train_dataset) + len(test_dataset)
print(f"Train: {len(train_dataset)} samples ({100*len(train_dataset)/total:.1f}%)")
print(f"Test:  {len(test_dataset)} samples ({100*len(test_dataset)/total:.1f}%)")

# Target statistics
train_targets = [t for _, t in train_dataset.samples]
test_targets = [t for _, t in test_dataset.samples]
print(f"\nTrain distance range: [{min(train_targets):.2f}, {max(train_targets):.2f}] meters")
print(f"Test distance range:  [{min(test_targets):.2f}, {max(test_targets):.2f}] meters")

## Step 5: Visualize Sample Data

In [None]:
# Show sample images with their targets
fig, axes = plt.subplots(2, 4, figsize=(16, 8))

sample_stride = 100  # Sample every 100th frame

for i, ax in enumerate(axes.flat):
    # Hide the axis frame on every panel, including ones left without an image
    ax.axis('off')

    # Bounds-check the index that is actually used (i * stride, not i), so short
    # datasets leave trailing panels blank instead of raising IndexError.
    sample_idx = i * sample_stride
    if sample_idx >= len(train_dataset):
        continue

    img_path, target = train_dataset.samples[sample_idx]
    with Image.open(img_path) as img:
        ax.imshow(img)
    ax.set_title(f"Distance: {target:.2f}m", fontsize=12)

plt.suptitle("Sample Camera Images with Mean LiDAR Distance", fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig('sample_images.png', dpi=150, bbox_inches='tight')
plt.show()
print("Saved: sample_images.png")

## Step 6: Create Model

In [None]:
# Build the pretrained regressor and place it on the selected device
model = ResNetRegressor(pretrained=True).to(device)

# Objective and optimiser: mean squared error, Adam over every model parameter
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Describe the network and report its size
print("="*50)
print("MODEL ARCHITECTURE")
print("="*50)
print("ResNet18 (pretrained on ImageNet)")
print("Modified final layer: FC(512) → FC(256) → ReLU → Dropout(0.3) → FC(1)")
print(f"\nTotal parameters: {sum(p.numel() for p in model.parameters()):,}")
print(f"Trainable parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")

## Step 7: Training

In [None]:
def train_epoch(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``dataloader`` and return the mean batch loss.

    Args:
        model: Network to train; it is switched to training mode.
        dataloader: Iterable yielding ``(images, targets)`` batches.
        criterion: Loss callable taking ``(outputs, targets)``.
        optimizer: Optimiser stepped once per batch.
        device: Device the batches are moved to.

    Returns:
        The batch losses averaged over the number of batches.

    Raises:
        ValueError: If ``dataloader`` yields no batches.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0

    for images, targets in dataloader:
        images = images.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        # Give the predictions the targets' shape so a (B, 1) or 0-d output is
        # never silently broadcast against (B,)-shaped targets inside the loss.
        outputs = model(images).reshape(targets.shape)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        raise ValueError("dataloader yielded no batches")

    return total_loss / num_batches


def evaluate(model, dataloader, criterion, device):
    """Score ``model`` on ``dataloader`` without updating its weights.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        dataloader: Iterable yielding ``(images, targets)`` batches.
        criterion: Loss callable taking ``(outputs, targets)``.
        device: Device the batches are moved to.

    Returns:
        A dict with ``loss`` (batch losses averaged over the number of batches),
        ``mae``, ``rmse`` and ``r2`` computed over all samples, plus the flat
        ``predictions`` and ``targets`` arrays in iteration order.

    Raises:
        ValueError: If ``dataloader`` yields no batches.
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0
    pred_chunks = []
    target_chunks = []

    with torch.no_grad():
        for images, targets in dataloader:
            images = images.to(device)
            targets = targets.to(device)

            # Match the targets' shape: a 0-d output (single-sample batch) cannot
            # be iterated, and a (B, 1) output would broadcast against (B,).
            outputs = model(images).reshape(targets.shape)
            loss = criterion(outputs, targets)

            total_loss += loss.item()
            num_batches += 1
            pred_chunks.append(outputs.reshape(-1).cpu().numpy())
            target_chunks.append(targets.reshape(-1).cpu().numpy())

    if num_batches == 0:
        raise ValueError("dataloader yielded no batches")

    all_preds = np.concatenate(pred_chunks)
    all_targets = np.concatenate(target_chunks)

    mae = np.abs(all_preds - all_targets).mean()
    rmse = np.sqrt(((all_preds - all_targets) ** 2).mean())

    # Coefficient of determination; reported as 0 when the targets have no variance
    ss_res = ((all_targets - all_preds) ** 2).sum()
    ss_tot = ((all_targets - all_targets.mean()) ** 2).sum()
    r2 = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0

    return {
        'loss': total_loss / num_batches,
        'mae': mae,
        'rmse': rmse,
        'r2': r2,
        'predictions': all_preds,
        'targets': all_targets
    }

print("Training functions defined!")

In [None]:
# Training loop
print("="*60)
print(f"TRAINING ResNet18 ({NUM_EPOCHS} epochs)")
print("="*60)

# Per-epoch history, read again by the plotting and summary cells
train_losses, val_losses, val_maes = [], [], []
best_val_loss, best_epoch = float('inf'), 0

# NOTE: test_loader doubles as the validation set here, so the checkpoint kept
# in best_model.pth is the one that scored best on the test session itself.
start_time = time.time()

for epoch in range(NUM_EPOCHS):
    epoch_start = time.time()

    # One optimisation pass over the training data
    train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
    train_losses.append(train_loss)

    # Score the updated weights
    val_results = evaluate(model, test_loader, criterion, device)
    val_losses.append(val_results['loss'])
    val_maes.append(val_results['mae'])

    # Keep the weights with the lowest validation loss seen so far
    improved = val_results['loss'] < best_val_loss
    if improved:
        best_val_loss, best_epoch = val_results['loss'], epoch + 1
        torch.save(model.state_dict(), 'best_model.pth')

    epoch_time = time.time() - epoch_start
    print(f"Epoch {epoch+1:2d}/{NUM_EPOCHS} | "
          f"Train Loss: {train_loss:.4f} | "
          f"Val Loss: {val_results['loss']:.4f} | "
          f"Val MAE: {val_results['mae']:.4f}m | "
          f"Time: {epoch_time:.1f}s")

total_time = time.time() - start_time
print(f"\nTraining completed in {total_time/60:.1f} minutes")
print(f"Best epoch: {best_epoch} (Val Loss: {best_val_loss:.4f})")

## Step 8: Results Visualization

In [None]:
# Load best model and get final predictions.
# map_location places the checkpoint tensors on the active device, so the load
# also works when the runtime's device differs from the one the file was saved on.
model.load_state_dict(torch.load('best_model.pth', map_location=device))
final_results = evaluate(model, test_loader, criterion, device)

print("="*50)
print("FINAL TEST RESULTS")
print("="*50)
print(f"MAE:  {final_results['mae']:.4f} meters")
print(f"RMSE: {final_results['rmse']:.4f} meters")
print(f"R²:   {final_results['r2']:.4f}")

In [None]:
# Plot 1: Training History
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Build each x-axis from the history that was actually recorded rather than
# from NUM_EPOCHS, so the plot still works after an interrupted or partial run.
train_epochs = range(1, len(train_losses) + 1)
val_epochs = range(1, len(val_losses) + 1)
mae_epochs = range(1, len(val_maes) + 1)

# Loss curves
ax1 = axes[0]
ax1.plot(train_epochs, train_losses, 'b-', label='Training Loss', linewidth=2, marker='o')
ax1.plot(val_epochs, val_losses, 'r-', label='Validation Loss', linewidth=2, marker='s')
ax1.axvline(x=best_epoch, color='green', linestyle='--', label=f'Best Epoch ({best_epoch})')
ax1.set_xlabel('Epoch', fontsize=12)
ax1.set_ylabel('Loss (MSE)', fontsize=12)
ax1.set_title('Training and Validation Loss', fontsize=14)
ax1.legend()
ax1.grid(True, alpha=0.3)

# MAE curve
ax2 = axes[1]
ax2.plot(mae_epochs, val_maes, 'g-', linewidth=2, marker='o')
ax2.axvline(x=best_epoch, color='green', linestyle='--', label=f'Best Epoch ({best_epoch})')
ax2.set_xlabel('Epoch', fontsize=12)
ax2.set_ylabel('MAE (meters)', fontsize=12)
ax2.set_title('Validation MAE (Mean Absolute Error)', fontsize=14)
ax2.legend()
ax2.grid(True, alpha=0.3)

plt.suptitle('ResNet18 Training Progress\n(Predicting Mean LiDAR Distance from Camera Images)', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig('training_history.png', dpi=150, bbox_inches='tight')
plt.show()
print("Saved: training_history.png")

In [None]:
# Plot 2: Predictions Analysis
targets = final_results['targets']
predictions = final_results['predictions']
errors = predictions - targets


def _decorate(ax, xlabel, ylabel, title, with_legend=True):
    """Apply the axis labels, title, optional legend and light grid shared by all panels."""
    ax.set_xlabel(xlabel, fontsize=12)
    ax.set_ylabel(ylabel, fontsize=12)
    ax.set_title(title, fontsize=14)
    if with_legend:
        ax.legend()
    ax.grid(True, alpha=0.3)


fig, axes = plt.subplots(2, 2, figsize=(14, 12))
(ax_scatter, ax_hist), (ax_resid, ax_timeline) = axes

# Top-left: predicted vs actual, with the identity line for reference
ax_scatter.scatter(targets, predictions, alpha=0.5, s=30, c='blue', edgecolors='navy')
lo = min(targets.min(), predictions.min())
hi = max(targets.max(), predictions.max())
ax_scatter.plot([lo, hi], [lo, hi], 'r--', lw=2, label='Perfect prediction')
_decorate(ax_scatter, 'Actual Mean Distance (m)', 'Predicted Mean Distance (m)',
          'Predicted vs Actual (Test Set)')

# Top-right: histogram of signed errors, marking zero and the mean error
ax_hist.hist(errors, bins=30, color='green', alpha=0.7, edgecolor='darkgreen')
ax_hist.axvline(x=0, color='r', linestyle='--', lw=2, label='Zero error')
ax_hist.axvline(x=errors.mean(), color='orange', linestyle='-', lw=2, label=f'Mean: {errors.mean():.3f}m')
_decorate(ax_hist, 'Prediction Error (m)', 'Frequency', 'Error Distribution')

# Bottom-left: residuals against the predicted value (no legend on this panel)
ax_resid.scatter(predictions, errors, alpha=0.5, s=30, c='purple', edgecolors='indigo')
ax_resid.axhline(y=0, color='r', linestyle='--', lw=2)
_decorate(ax_resid, 'Predicted Distance (m)', 'Residual (Pred - Actual)', 'Residual Plot',
          with_legend=False)

# Bottom-right: actual and predicted values in sample order
frame_index = np.arange(len(targets))
ax_timeline.plot(frame_index, targets, 'b-', alpha=0.7, label='Actual', linewidth=1)
ax_timeline.plot(frame_index, predictions, 'r-', alpha=0.7, label='Predicted', linewidth=1)
_decorate(ax_timeline, 'Frame Index', 'Mean Distance (m)',
          'Prediction Timeline (Test Session - Mlab)')

plt.suptitle(f'ResNet18 Test Results\nMAE: {final_results["mae"]:.4f}m | RMSE: {final_results["rmse"]:.4f}m | R²: {final_results["r2"]:.4f}',
             fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig('prediction_results.png', dpi=150, bbox_inches='tight')
plt.show()
print("Saved: prediction_results.png")

In [None]:
# Plot 3: Sample Predictions with Images
fig, axes = plt.subplots(2, 5, figsize=(20, 8))

# Read the predictions straight from final_results so this cell does not depend
# on a variable left behind by the previous plotting cell.
test_predictions = final_results['predictions']

# Get some test samples, evenly spread over the test set
sample_indices = np.linspace(0, len(test_dataset)-1, 10, dtype=int)

for idx, ax in zip(sample_indices, axes.flat):
    # test_loader does not shuffle, so prediction idx belongs to sample idx
    img_path, actual = test_dataset.samples[idx]
    predicted = test_predictions[idx]
    error = abs(predicted - actual)

    with Image.open(img_path) as img:
        ax.imshow(img)

    # Color based on error: under 0.1 m green, under 0.3 m orange, otherwise red
    color = 'green' if error < 0.1 else ('orange' if error < 0.3 else 'red')
    ax.set_title(f"Actual: {actual:.2f}m\nPred: {predicted:.2f}m\nError: {error:.2f}m",
                 fontsize=10, color=color)
    ax.axis('off')

plt.suptitle('Sample Predictions on Test Set (Green=Good, Orange=OK, Red=Poor)',
             fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig('sample_predictions.png', dpi=150, bbox_inches='tight')
plt.show()
print("Saved: sample_predictions.png")

## Step 9: Summary

In [None]:
# Print a plain-text recap of the run. It reads train_dataset, test_dataset,
# the hyperparameter constants, total_time, best_epoch and final_results, all of
# which are created by earlier cells.
# NOTE: the session counts in the text ("4 indoor sessions", "1 held-out session
# (Mlab)") are written out literally rather than derived from TRAIN_SESSIONS /
# TEST_SESSIONS, so they must be edited by hand if those lists change.
print("="*70)
print("FINAL SUMMARY")
print("="*70)
print(f"""
DATASET INFORMATION:
  - Source: Unitree Go1 Robot with RoboSense Helios-16 LiDAR
  - Data: 100% REAL sensor measurements (NO synthetic data)
  - Train: {len(train_dataset)} frames from 4 indoor sessions
  - Test:  {len(test_dataset)} frames from 1 held-out session (Mlab)

TASK:
  - Cross-modal learning: Predict LiDAR depth from camera RGB images
  - Input:  Camera image (resized to {IMAGE_SIZE}x{IMAGE_SIZE})
  - Output: Mean distance of LiDAR points (meters)

MODEL:
  - Architecture: ResNet18 (pretrained on ImageNet)
  - Fine-tuned head: FC(512) → FC(256) → ReLU → Dropout(0.3) → FC(1)
  - Training: {NUM_EPOCHS} epochs, batch size {BATCH_SIZE}, Adam optimizer (lr={LEARNING_RATE})
  - Training time: {total_time/60:.1f} minutes

RESULTS:
  - Best Epoch: {best_epoch}
  - Test MAE:   {final_results['mae']:.4f} meters
  - Test RMSE:  {final_results['rmse']:.4f} meters  
  - Test R²:    {final_results['r2']:.4f}

GENERATED PLOTS:
  - sample_images.png (dataset visualization)
  - training_history.png (loss curves)
  - prediction_results.png (4-panel analysis)
  - sample_predictions.png (visual predictions)

INTERPRETATION:
  - The model learns to predict scene depth from visual appearance
  - This demonstrates cross-modal learning (Camera → LiDAR)
  - Transfer learning from ImageNet provides useful visual features
  - Train/test split by session tests generalization to new environments
""")
print("="*70)

In [None]:
# Download all results
from google.colab import files
import os
import shutil

# Files produced by the earlier cells; any that were never generated are skipped
result_files = ['sample_images.png', 'training_history.png', 'prediction_results.png',
                'sample_predictions.png', 'best_model.pth']
available_files = [f for f in result_files if os.path.exists(f)]

# Create results zip from the result files only. Archiving '.' would bundle the
# whole working directory, including the extracted dataset under data/.
staging_dir = 'resnet_results'
os.makedirs(staging_dir, exist_ok=True)
for f in available_files:
    shutil.copy2(f, staging_dir)
shutil.make_archive('resnet_results', 'zip', root_dir=staging_dir)

print("Downloading results...")
for f in available_files:
    files.download(f)
    print(f"  Downloaded: {f}")