In [1]:
import os
import sys

# Clone or pull part
repo_url = "https://github.com/fraco03/6D_pose.git"
repo_dir = "/content/6D_pose"
branch = "pose_rgb"

if not os.path.exists(repo_dir):
    !git clone -b {branch} {repo_url}
    print(f"Cloned {repo_url}")
else:
    %cd {repo_dir}
    !git fetch origin
    !git checkout {branch}
    !git reset --hard origin/{branch}
    %cd ..
    print(f"Updated {repo_url}")

if repo_dir not in sys.path:
    sys.path.insert(0, repo_dir)

%cd 6D_pose

/content/6D_pose
Already on 'pose_rgb'
Your branch is up to date with 'origin/pose_rgb'.
HEAD is now at 53b7b93 Update: Pointnet preload
/content
Updated https://github.com/fraco03/6D_pose.git
/content/6D_pose


In [None]:
%cd ..
!gdown --fuzzy https://drive.google.com/file/d/1qQ8ZjUI6QauzFsiF8EpaaI2nKFWna_kQ/view?usp=sharing -O Linemod_preprocessed.zip
!unzip Linemod_preprocessed.zip
%cd 6D_pose

In [6]:
from google.colab import drive
from utils.load_data import mount_drive

# Local copy of the dataset (extracted by the download cell).
# Alternative location on Drive: "/content/drive/MyDrive/Linemod_preprocessed"
dataset_root = "/content/Linemod_preprocessed"

# Mount Google Drive via the project helper.
mount_drive()

print("\n‚úÖ Setup complete!")
print("üìÅ Dataset path: " + dataset_root)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
‚úÖ Drive mounted at /content/drive

‚úÖ Setup complete!
üìÅ Dataset path: /content/Linemod_preprocessed


In [7]:
!pip install plyfile



In [8]:
from src.pose_rgb.pointcloud_dataset import LineModPointCloudDataset
from src.pose_rgb.pointnet_model import PointNetPose
from src.pose_rgb.loss import AutomaticWeightedLoss
from torch.utils.data import DataLoader
import torch
from tqdm import tqdm

# Prefer the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'
print("üî• Using device: {}".format(DEVICE))

üî• Using device: cuda


## Notes on the Loss Function
Differences between the PointNet and RGB approaches:

- RGB (ResNet + TranslationNet):

  * Predicts [dx, dy, log(z)] → then applies pinhole projection → [X, Y, Z]

  * Uses DisentangledTranslationLoss: separates XY from Z

  This makes sense because XY depend on the pinhole geometry, whereas Z is independent

- PointNet:

  * Predicts [X, Y, Z] directly from the point cloud

  * Uses a unified loss for translation: treats X, Y, and Z symmetrically

  * Since there is no pinhole projection, separating XY from Z is unnecessary

## ⚡ Performance Optimizations

**Optimizations applied to accelerate training:**

1. **Point reduction**: 1024 → 512 points per point cloud (~2x speed-up).
2. **Cached YAML files**: `linemod_config` caches `info.yml` and `gt.yml` instead of opening them for every iteration (~10-20x speed-up!).
    - **Critical on Google Drive**: I/O latency is extremely high, making caching essential.
3. **Torch sampling**: Used `torch.randperm` instead of `np.random.choice` (~1.5x speed-up).
4. **Mixed precision**: Automatic FP16/FP32 training with `torch.cuda.amp` (~1.5x speed-up).

**Estimated total speed-up: ~20-30x** compared to the initial version! 🚀

**Note**: The first batch may take longer to load and cache all YAML files; subsequent iterations will be significantly faster.

In [None]:
# Build the point-cloud datasets for both splits.
# The original note here stated that ONLY rotation is predicted and that the
# translation is recovered from depth + bbox via pinhole geometry.
# NOTE(review): the training cell in this notebook also consumes a translation
# prediction from the model — confirm which description is current.

# Settings shared by the train and test splits.
shared_dataset_kwargs = dict(root_dir=dataset_root, num_points=512, use_rgb=True)

train_dataset = LineModPointCloudDataset(split='train', **shared_dataset_kwargs)
test_dataset = LineModPointCloudDataset(split='test', **shared_dataset_kwargs)

for split_label, split_dataset in (("Train", train_dataset), ("Test", test_dataset)):
    print(f"{split_label} samples: {len(split_dataset)}")

‚úÖ LineModConfig initialized: /content/Linemod_preprocessed
üîÑ Preloading YAML files...
‚úÖ Preloaded YAML data for 13 objects
üìä Loaded 2373 samples for train split (preload_images=False)
üîÑ Preloading YAML files...
‚úÖ Preloaded YAML data for 13 objects
üìä Loaded 13407 samples for test split (preload_images=False)
Train samples: 2373
Test samples: 13407


In [None]:
# Inspect a single training sample.
sample = train_dataset[0]

print("Sample keys:", sample.keys())
print(f"Point cloud shape: {sample['point_cloud'].shape}")  # (512, 6)
print(f"Rotation shape: {sample['rotation'].shape}")        # (4,)
# The sample dict has no 'depth_z' entry (see the printed keys); report the
# ground-truth translation instead.
print(f"Translation shape: {sample['translation'].shape}")  # (3,)
print(f"\nRotation (quat): {sample['rotation']}")
print(f"Translation (m): {sample['translation']}")

Sample keys: dict_keys(['point_cloud', 'bbox_info', 'rotation', 'translation', 'object_id', 'img_id', 'cam_K', 'bbox'])
Point cloud shape: torch.Size([512, 6])
Rotation shape: (4,)
Translation shape: torch.Size([3])

Rotation (quat): [ 0.33261785  0.64730227  0.6364495  -0.2555329 ]
Translation (m): tensor([-0.1036, -0.0498,  1.0251])


In [11]:
# DataLoaders, tuned for throughput.
# batch_size=64 was raised from 32 for better GPU utilisation;
# pin_memory=True speeds up CPU->GPU transfers.
shared_loader_kwargs = dict(batch_size=64, num_workers=2, pin_memory=True)

# Only the training split is shuffled.
train_loader = DataLoader(train_dataset, shuffle=True, **shared_loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **shared_loader_kwargs)

In [21]:
# Import visualization utilities
from utils.projection_utils import visualize_pose_comparison
from utils.linemod_config import get_linemod_config
from utils.projection_utils import setup_projection_utils
import matplotlib.pyplot as plt
import cv2
import numpy as np
from pathlib import Path

# Configuration used to load the 3D object models, plus initialisation of the
# projection utilities for the same dataset root.
# NOTE(review): presumably setup_projection_utils prepares state that
# visualize_pose_comparison relies on — confirm in utils/projection_utils.
config = get_linemod_config(dataset_root)
setup_projection_utils(dataset_root)

def _to_numpy(value):
    """Return ``value`` unchanged unless it exposes ``.numpy()`` (e.g. a CPU tensor), in which case convert it."""
    return value.numpy() if hasattr(value, 'numpy') else value


def visualize_training_sample(model, dataset, dataset_root, checkpoint_dir, epoch, device='cuda'):
    """
    Render ground-truth vs. predicted pose for one random sample and save the image.

    The figure is first written to /tmp and then copied into
    ``<checkpoint_dir>/visualizations``.

    Args:
        model: Network called as ``model(point_cloud, bbox_info)`` and returning
            ``(rotation, translation)`` batches.
        dataset: Indexable dataset whose samples are dicts with the keys
            'object_id', 'img_id', 'point_cloud', 'bbox_info', 'rotation',
            'translation' and 'cam_K'.
        dataset_root: Dataset root; the RGB image is read from
            ``<dataset_root>/data/<obj_id:02d>/rgb/<img_id:04d>.png``.
        checkpoint_dir: Run directory that receives the ``visualizations`` folder.
        epoch: Epoch number used verbatim in the file names and in the log line.
        device: Device on which inference is run.

    Raises:
        FileNotFoundError: If the RGB image of the sampled item cannot be read.
    """
    import shutil

    # Remember the current mode so it can be restored even if something fails.
    was_training = model.training
    model.eval()
    try:
        # Random sample
        idx = np.random.randint(0, len(dataset))
        sample = dataset[idx]

        obj_id = sample['object_id']
        img_id = sample['img_id']

        # Load the original image. cv2.imread returns None instead of raising
        # when the file is missing or unreadable, so check explicitly.
        img_path = Path(dataset_root) / "data" / f"{obj_id:02d}" / "rgb" / f"{img_id:04d}.png"
        image = cv2.imread(str(img_path))
        if image is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Prepare a batch of size one.
        point_cloud = sample['point_cloud'].unsqueeze(0).to(device)
        bbox_info = sample['bbox_info'].unsqueeze(0).to(device)

        # Inference
        with torch.no_grad():
            pred_rot, pred_trans = model(point_cloud, bbox_info)

        # Convert everything to NumPy.
        pred_rot = pred_rot.squeeze(0).cpu().numpy()
        pred_trans = pred_trans.squeeze(0).cpu().numpy()
        gt_rot = _to_numpy(sample['rotation'])
        gt_trans = _to_numpy(sample['translation'])
        cam_K = _to_numpy(sample['cam_K'])

        # Draw ground truth and prediction on the image.
        img_vis = visualize_pose_comparison(
            image, obj_id, cam_K,
            gt_rot, gt_trans,
            pred_rot, pred_trans
        )

        # Errors. The quaternion angle formula is only valid for unit
        # quaternions, so normalise both before taking the dot product;
        # abs() treats q and -q as the same rotation.
        trans_error = np.linalg.norm(gt_trans - pred_trans)
        pred_q = pred_rot / max(np.linalg.norm(pred_rot), 1e-12)
        gt_q = gt_rot / max(np.linalg.norm(gt_rot), 1e-12)
        dot_product = np.clip(np.abs(np.dot(pred_q, gt_q)), -1.0, 1.0)
        angle_error = 2 * np.arccos(dot_product) * 180 / np.pi

        # Save to /tmp first (local disk; presumably faster than writing to Drive).
        tmp_path = f"/tmp/vis_epoch_{epoch:03d}_obj_{obj_id:02d}.png"
        fig, ax = plt.subplots(figsize=(12, 10))
        ax.imshow(img_vis)
        ax.axis('off')
        ax.set_title(f"Object {obj_id} | Sample {idx} | Trans Err: {trans_error:.4f}m | Angle Err: {angle_error:.2f}¬∞")
        fig.savefig(tmp_path, bbox_inches='tight', dpi=100)
        # Close this specific figure so repeated calls do not accumulate figures.
        plt.close(fig)

        # Then copy the file into the run directory (synchronous copy).
        vis_dir = Path(checkpoint_dir) / "visualizations"
        vis_dir.mkdir(parents=True, exist_ok=True)
        drive_path = vis_dir / f"epoch_{epoch:03d}_obj_{obj_id:02d}.png"
        shutil.copy(tmp_path, str(drive_path))

        print(f"‚úÖ Saved: epoch {epoch} | Trans Err: {trans_error:.4f}m | Angle Err: {angle_error:.2f}¬∞")
    finally:
        # Put the model back into whichever mode the caller had it in.
        model.train(was_training)



In [13]:
import os
import json
from datetime import datetime
from itertools import islice
import matplotlib.pyplot as plt
from torch.amp import autocast, GradScaler

# ------------------------------------------
# Training configuration
# ------------------------------------------
LEARNING_RATE = 1e-4
NUM_EPOCHS = 50
USE_MIXED_PRECISION = True  # autocast forward passes + gradient scaling

# Every run gets its own time-stamped directory on Drive.
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
CHECKPOINT_DIR = '/content/drive/MyDrive/runs/pointnet_' + timestamp
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

# PointNet model; 6 input channels because each point is [x, y, z, r, g, b].
model = PointNetPose(input_channels=6, use_batch_norm=True).to(DEVICE)

# Loss used for PointNet (non-disentangled translation term).
criterion = AutomaticWeightedLoss(use_disentangled=False).to(DEVICE)

# A single Adam optimizer updates the model and the criterion's own parameters.
trainable_params = [*model.parameters(), *criterion.parameters()]
optimizer = torch.optim.Adam(trainable_params, lr=LEARNING_RATE)

# Gradient scaler is only created when mixed precision is enabled.
if USE_MIXED_PRECISION:
    scaler = GradScaler('cuda')
else:
    scaler = None

# Bookkeeping filled in by the training loop.
train_losses, val_losses = [], []
best_val_loss, best_epoch = float('inf'), 0

print(f"\nüî• STARTING POINTNET TRAINING on {DEVICE}...")
print(f"üìÅ Checkpoints: {CHECKPOINT_DIR}")
print("‚öôÔ∏è  Loss mode: Unified Translation (no XY/Z separation)")
print(f"‚ö° Mixed Precision: {USE_MIXED_PRECISION}")


üî• STARTING POINTNET TRAINING on cuda...
üìÅ Checkpoints: /content/drive/MyDrive/runs/pointnet_20251217_222800
‚öôÔ∏è  Loss mode: Unified Translation (no XY/Z separation)
‚ö° Mixed Precision: True


## 🎯 Rotation-Only Model Architecture

**Key Insight**: With depth available, we only need to predict **rotation**. Translation is computed directly using pinhole geometry:

$$X = \frac{(c_x - c_x^{intr}) \cdot Z}{f_x}, \quad Y = \frac{(c_y - c_y^{intr}) \cdot Z}{f_y}, \quad Z = \text{depth}[c_x, c_y]$$

**Advantages:**
- Simpler model (no translation head)
- Faster training (only rotation loss)
- More stable (depth provides ground truth Z)
- Cleaner optimization (one prediction target instead of two)

**Dataset returns:**
- `point_cloud`: Local point cloud (N×3 or N×6 with RGB)
- `rotation`: Target quaternion
- `depth_z`: Z coordinate from depth at bbox center (for inference)

In [16]:
# Display the full contents of the current `sample` dict for inspection.
sample

{'point_cloud': tensor([[-7.6477e-02, -7.7854e-02,  1.1150e+00,  7.4510e-02,  9.4118e-02,
           1.6863e-01],
         [-9.1665e-02, -8.2356e-02,  1.0040e+00,  5.4118e-01,  2.7059e-01,
           1.5686e-01],
         [-9.4889e-02, -7.6875e-02,  1.0010e+00,  5.2941e-01,  2.7059e-01,
           1.6863e-01],
         ...,
         [-1.2922e-01,  1.7211e-03,  1.0380e+00,  3.6863e-01,  4.1961e-01,
           5.4510e-01],
         [-7.3431e-02, -8.9170e-05,  1.0440e+00,  7.2549e-01,  6.7059e-01,
           6.3529e-01],
         [-1.3987e-01, -8.5092e-02,  1.1080e+00,  4.7059e-01,  4.3529e-01,
           3.2157e-01]]),
 'bbox_info': tensor([0.4180, 0.4563, 0.0672, 0.1167]),
 'rotation': array([ 0.33261785,  0.64730227,  0.6364495 , -0.2555329 ], dtype=float32),
 'translation': tensor([-0.1036, -0.0498,  1.0251]),
 'object_id': 1,
 'img_id': 4,
 'cam_K': tensor([[572.4114,   0.0000, 325.2611],
         [  0.0000, 573.5704, 242.0490],
         [  0.0000,   0.0000,   1.0000]]),
 'bbox': ten

In [24]:
# ==========================================
# TRAINING LOOP (with Mixed Precision)
# ==========================================
def _forward_loss(batch):
    """Move one batch to DEVICE, run the model and return the criterion's 4-tuple.

    The callers below unpack the result as (total loss, rotation loss,
    translation loss, <unused>).
    """
    point_clouds = batch['point_cloud'].to(DEVICE, non_blocking=True)  # (B, N, 6)
    bbox_info = batch['bbox_info'].to(DEVICE, non_blocking=True)       # (B, 4)
    gt_rot = batch['rotation'].to(DEVICE, non_blocking=True)           # (B, 4)
    gt_trans = batch['translation'].to(DEVICE, non_blocking=True)      # (B, 3) in meters

    # With enabled=False the context manager is a no-op, so one code path
    # serves both the mixed-precision and the full-precision case.
    with autocast('cuda', enabled=USE_MIXED_PRECISION):
        pred_rot, pred_trans = model(point_clouds, bbox_info)
        return criterion(pred_rot, gt_rot, pred_trans, gt_trans)


for epoch in range(NUM_EPOCHS):

    # --- TRAIN PHASE ---
    model.train()
    running_train_loss = 0.0

    pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{NUM_EPOCHS} [Train]")

    for batch in pbar:
        optimizer.zero_grad()

        loss, l_r, l_t, _ = _forward_loss(batch)

        if USE_MIXED_PRECISION:
            # Scaled backward pass; the scaler steps the optimizer and then
            # updates its scale factor.
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            loss.backward()
            optimizer.step()

        # .item() forces a device sync; read the total loss once per step.
        loss_value = loss.item()
        running_train_loss += loss_value

        pbar.set_postfix({
            'L_Tot': f"{loss_value:.2f}",
            'L_Rot': f"{l_r.item():.2f}",
            'L_Trans': f"{l_t.item():.3f}"
        })

    avg_train_loss = running_train_loss / len(train_loader)
    train_losses.append(avg_train_loss)

    # --- VALIDATION PHASE ---
    # Only the first `val_batches_limit` batches of the (unshuffled) test
    # loader are evaluated, to keep each epoch short.
    model.eval()
    running_val_loss = 0.0
    val_batches_limit = 50
    # Never announce more batches to tqdm than the loader can deliver.
    num_val_batches = min(val_batches_limit, len(test_loader))
    count_batches = 0

    with torch.no_grad():
        val_iterator = islice(test_loader, num_val_batches)
        val_pbar = tqdm(val_iterator, total=num_val_batches, desc="Validating")

        for batch in val_pbar:
            loss, _, _, _ = _forward_loss(batch)
            running_val_loss += loss.item()
            count_batches += 1

    avg_val_loss = running_val_loss / count_batches if count_batches > 0 else 0.0
    val_losses.append(avg_val_loss)

    # --- REPORT & SAVE ---
    print(f"üìä Epoch {epoch+1}: Train={avg_train_loss:.4f} | Val={avg_val_loss:.4f}")

    # --- VISUALIZE RANDOM SAMPLE ---
    if (epoch + 1) % 5 == 0:
        print(f"üé® Visualizing random validation sample...")
        # Pass the 1-based epoch so file names and the "Saved" log line match
        # the epoch number reported above.
        visualize_training_sample(model, test_dataset, dataset_root, CHECKPOINT_DIR, epoch + 1, DEVICE)

    # Save best model
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        best_epoch = epoch + 1

        torch.save({
            # 1-based, consistent with `best_epoch` and the final checkpoint below.
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'criterion_state_dict': criterion.state_dict(),
            'val_loss': best_val_loss
        }, os.path.join(CHECKPOINT_DIR, "best_model.pth"))

        print(f"üèÜ New Best Model! (Loss: {best_val_loss:.4f})")

    # Save last checkpoint (written only after the final epoch)
    if (epoch + 1) == NUM_EPOCHS:
        torch.save({
            'epoch': epoch+1,
            'model_state_dict': model.state_dict(),
            'criterion_state_dict': criterion.state_dict(),
        }, os.path.join(CHECKPOINT_DIR, f"checkpoint_ep{epoch+1}.pth"))

print("\nüéâ TRAINING COMPLETE!")

Epoch 1/50 [Train]: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 38/38 [00:35<00:00,  1.08it/s, L_Tot=0.26, L_Rot=0.27, L_Trans=0.017]
Validating: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 50/50 [00:46<00:00,  1.07it/s]


üìä Epoch 1: Train=0.3383 | Val=0.2694
üèÜ New Best Model! (Loss: 0.2694)


Epoch 2/50 [Train]: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 38/38 [00:33<00:00,  1.12it/s, L_Tot=0.55, L_Rot=0.55, L_Trans=0.033]
Validating: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 50/50 [00:45<00:00,  1.09it/s]


üìä Epoch 2: Train=0.3344 | Val=0.2511
üèÜ New Best Model! (Loss: 0.2511)


Epoch 3/50 [Train]: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 38/38 [00:36<00:00,  1.05it/s, L_Tot=0.33, L_Rot=0.36, L_Trans=0.012]
Validating: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 50/50 [00:47<00:00,  1.06it/s]


üìä Epoch 3: Train=0.3171 | Val=0.2505
üèÜ New Best Model! (Loss: 0.2505)


Epoch 4/50 [Train]: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 38/38 [00:34<00:00,  1.10it/s, L_Tot=0.35, L_Rot=0.39, L_Trans=0.014]
Validating: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 50/50 [00:47<00:00,  1.06it/s]


üìä Epoch 4: Train=0.3071 | Val=0.2296
üèÜ New Best Model! (Loss: 0.2296)


Epoch 5/50 [Train]: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 38/38 [00:35<00:00,  1.06it/s, L_Tot=0.41, L_Rot=0.45, L_Trans=0.015]
Validating: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 50/50 [00:47<00:00,  1.06it/s]


üìä Epoch 5: Train=0.2963 | Val=0.2109
üé® Visualizing random validation sample...
‚úÖ Saved: epoch 4 | Trans Err: 0.3328m | Angle Err: 112.54¬∞
üèÜ New Best Model! (Loss: 0.2109)


Epoch 6/50 [Train]: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 38/38 [00:34<00:00,  1.10it/s, L_Tot=0.29, L_Rot=0.33, L_Trans=0.020]
Validating:  44%|‚ñà‚ñà‚ñà‚ñà‚ñç     | 22/50 [00:21<00:27,  1.03it/s]


KeyboardInterrupt: 

In [None]:
# Plot training history: linear scale on the left, log scale on the right.
# train_losses and val_losses may differ in length if training was interrupted
# between the train and validation phases; each curve is plotted as-is.
fig, axes = plt.subplots(1, 2, figsize=(12, 6))

# (axis, y-label, title, use log scale)
panels = [
    (axes[0], 'Loss', 'PointNet Training History', False),
    (axes[1], 'Loss (log scale)', 'PointNet Training History (Log)', True),
]

for ax, y_label, panel_title, use_log in panels:
    ax.plot(train_losses, label='Train Loss', marker='o', alpha=0.7)
    ax.plot(val_losses, label='Validation Loss', marker='s', alpha=0.7)
    if best_epoch > 0:
        # best_epoch is 1-based while the x axis is 0-based.
        ax.axvline(x=best_epoch-1, color='r', linestyle='--', alpha=0.5,
                   label=f'Best Epoch ({best_epoch})')
    ax.set_xlabel('Epoch')
    ax.set_ylabel(y_label)
    if use_log:
        ax.set_yscale('log')
    ax.set_title(panel_title)
    ax.legend()
    ax.grid(True, alpha=0.3)

fig.tight_layout()
fig.savefig(os.path.join(CHECKPOINT_DIR, 'training_history.png'), dpi=150)
plt.show()

print(f"\nüìä Training Statistics:")
print(f"   Best epoch: {best_epoch}")
print(f"   Best val loss: {best_val_loss:.6f}")
# Guard against an empty history (training stopped before the first epoch finished).
if train_losses:
    print(f"   Final train loss: {train_losses[-1]:.6f}")
else:
    print("   Final train loss: n/a (no completed epoch)")

# Save history
history = {
    'train_losses': [float(x) for x in train_losses],
    'val_losses': [float(x) for x in val_losses],
    'best_epoch': int(best_epoch),
    # Without a best epoch the value is still float('inf'), which json.dump
    # would write as the non-standard token `Infinity`; store null instead.
    'best_val_loss': float(best_val_loss) if best_epoch > 0 else None,
    'timestamp': timestamp
}

with open(os.path.join(CHECKPOINT_DIR, 'history.json'), 'w') as f:
    json.dump(history, f, indent=2)

## Visualize Point Cloud Sample

Visualizing the point cloud of a fixed sample (index 100) from the training set.

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

# Pick a fixed training sample and pull out its point cloud.
sample = train_dataset[100]
pc = sample['point_cloud'].numpy()  # (512, 6) [x, y, z, r, g, b]

# One 3D axis; the first three columns are coordinates, the last three colours.
fig, ax = plt.subplots(figsize=(10, 8), subplot_kw={'projection': '3d'})
ax.scatter(*pc[:, :3].T, c=pc[:, 3:6], s=2, alpha=0.6)

ax.set(
    xlabel='X (m)',
    ylabel='Y (m)',
    zlabel='Z (m)',
    title=f'Point Cloud - Object {sample["object_id"]}',
)
plt.show()

# Ground-truth pose of the displayed sample.
print(f"Object ID: {sample['object_id']}")
print(f"Rotation (quat): {sample['rotation']}")
print(f"Translation (m): {sample['translation']}")