In [1]:
from posenet.model import PoseNet
import torch
import torch.nn as nn
import torch.functional as F
import sys
import os
from torch.utils.data import DataLoader
import torch.optim as optim
from dataset.tartanair import TartanAirDataset 


In [2]:
dataset = TartanAirDataset(root_dir="D:\KOC UNIVERSITY\COMP447\data\image_left")
torch.cuda.empty_cache()
train_loader = DataLoader(dataset, batch_size=8, shuffle=True)
device = torch.device("cuda")
pose_net = PoseNet().to(device)
images, translations, rotations = next(iter(train_loader))
images = images.to(device)
translations = translations.to(device)
rotations = rotations.to(device)

optimizer = optim.Adam(pose_net.parameters(), lr=1e-4)
for epoch in range(1):  
    total_epoch_loss = 0.0
    for images, translations, rotations in train_loader:
        images = images.to(device, non_blocking=True)
        translations = translations.to(device, non_blocking=True)
        rotations = rotations.to(device, non_blocking=True)

        optimizer.zero_grad()
        t_pred, q_pred = pose_net(images)

        loss = pose_net.pose_loss(t_pred, q_pred, translations, rotations)

        loss.backward()
        optimizer.step()

        total_epoch_loss += loss.item()

    if epoch % 1 == 0:
        print(f"Epoch {epoch}: Avg Loss = {total_epoch_loss / len(train_loader):.6f}")

print("Finished overfitting entire dataset.")

  dataset = TartanAirDataset(root_dir="D:\KOC UNIVERSITY\COMP447\data\image_left")


Epoch 0: Avg Loss = 0.005748
Finished overfitting entire dataset.


In [3]:
from nflownet.model import NFlowNet
from dataset.tartanair2 import PairedImageDataset
from tqdm import tqdm
from torch.utils.data import random_split

img_file_path = "D:\KOC UNIVERSITY\COMP447\\amusement_sample_P008\P008\image_left"
opt_flow_file_path = "D:\KOC UNIVERSITY\COMP447\\amusement_sample_P008\P008\\flow"

dataset = PairedImageDataset(img_file_path, opt_flow_file_path)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

img_file_path = "D:\KOC UNIVERSITY\COMP447\\amusement_sample_P008\P008\image_left"
opt_flow_file_path = "D:\KOC UNIVERSITY\COMP447\\amusement_sample_P008\P008\\flow"
dataset = PairedImageDataset(img_file_path, opt_flow_file_path)

train_size = int(0.8 * len(dataset))  # 80% for training
test_size = len(dataset) - train_size  # Remaining 20% for testing

train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

nflownet = NFlowNet().to(device)
optimizer = torch.optim.Adam(nflownet.parameters(), lr=1e-4)
criterion = nn.MSELoss()

train_losses = []
test_losses = []

num_epochs = 1
for epoch in range(num_epochs):
    nflownet.train()
    running_train_loss = 0.0
    batch_losses = []

    # Wrap train_loader with tqdm
    pbar = tqdm(train_loader, desc=f"Epoch [{epoch + 1}/{num_epochs}]")

    for paired_batch, normal_flow_batch in pbar:
        paired_batch, normal_flow_batch = paired_batch.to(device), normal_flow_batch.to(device)

        optimizer.zero_grad()
        outputs = nflownet(paired_batch)

        loss = criterion(outputs, normal_flow_batch)
        loss.backward()
        optimizer.step()

        running_train_loss += loss.item()
        batch_losses.append(loss.item())

        # Update progress bar with current batch loss
        pbar.set_postfix({'Batch Loss': loss.item()})

  img_file_path = "D:\KOC UNIVERSITY\COMP447\\amusement_sample_P008\P008\image_left"
  opt_flow_file_path = "D:\KOC UNIVERSITY\COMP447\\amusement_sample_P008\P008\\flow"
  img_file_path = "D:\KOC UNIVERSITY\COMP447\\amusement_sample_P008\P008\image_left"
  opt_flow_file_path = "D:\KOC UNIVERSITY\COMP447\\amusement_sample_P008\P008\\flow"
Epoch [1/1]: 100%|██████████| 65/65 [00:54<00:00,  1.18it/s, Batch Loss=22.7]


In [4]:
import torch
import torch.nn as nn

class CheiralityLayer(nn.Module):
    """
    Differentiable Cheirality Layer that enforces depth positivity constraint
    using normal flow estimates from NFlowNet.
    
    Args:
        max_iter (int): Maximum iterations for optimization
        tol (float): Tolerance for convergence
        lr (float): Learning rate for L-BFGS optimizer
    """
    def __init__(self, max_iter=100, tol=1e-20, lr=0.1):
        super(CheiralityLayer, self).__init__()
        self.max_iter = max_iter
        self.tol = tol
        self.lr = lr
        
    def forward(self, normal_flow, image_gradients, init_pose):
        """
        Args:
            normal_flow (torch.Tensor): Estimated normal flow from NFlowNet [B, H, W]
            image_gradients (torch.Tensor): Image spatial gradients [B, 2, H, W]
            init_pose (torch.Tensor): Initial pose estimate from PoseNet [B, 6] (3 translation, 3 rotation)
            
        Returns:
            torch.Tensor: Refined pose estimate [B, 6]
        """
        batch_size = init_pose.size(0)
        refined_poses = []
        
        # Process each sample in batch separately
        for i in range(batch_size):
            nf = normal_flow[i]  # [H, W]
            grad = image_gradients[i]  # [2, H, W]
            init_p = init_pose[i]  # [6]
            
            # Convert to numpy for L-BFGS (paper uses this approach)
            # Note: In practice you might want to keep everything in PyTorch
            refined_p = self.optimize_pose(nf, grad, init_p)
            refined_poses.append(refined_p)
            
        return torch.stack(refined_poses, dim=0)
    
    def optimize_pose(self, normal_flow, image_gradients, init_pose):
        """
        Optimize pose using cheirality constraint with L-BFGS
        
        Args:
            normal_flow (torch.Tensor): [H, W]
            image_gradients (torch.Tensor): [2, H, W]
            init_pose (torch.Tensor): [6]
            
        Returns:
            torch.Tensor: Refined pose [6]
        """
        # Convert to numpy for optimization (as done in paper)
        # Alternatively could implement fully in PyTorch using torch.optim.LBFGS
        import numpy as np
        from scipy.optimize import minimize
        
        # Normalize gradients to get direction (unit vectors)
        grad_norm = torch.norm(image_gradients, dim=0, keepdim=True)
        grad_dir = image_gradients / (grad_norm + 1e-6)  # [2, H, W]
        
        # Get image coordinates grid
        _, H, W = normal_flow.shape
        magnitude = torch.sqrt(normal_flow[0]**2 + normal_flow[1]**2)
        y_coords, x_coords = torch.meshgrid(torch.arange(H), torch.arange(W))
        coords = torch.stack([x_coords, y_coords], dim=0).float().to(normal_flow.device)  # [2, H, W]
        
        # Prepare data for optimization
        nf_np = magnitude.detach().cpu().numpy().flatten()
        grad_dir_np = grad_dir.detach().cpu().numpy().reshape(2, -1)  # [2, H*W]
        coords_np = coords.detach().cpu().numpy().reshape(2, -1)  # [2, H*W]
        init_pose_np = init_pose.detach().cpu().numpy()
        
        # Define objective function
        def objective(pose_params):
            V = pose_params[:3]  # translation
            Omega = pose_params[3:]  # rotation
            
            # Compute A and B matrices (from paper equations 9-10)
            x, y = coords_np
            ones = np.ones_like(x)
            zeros = np.zeros_like(x)
            
            # A matrix [2, 3] -> [H*W, 2, 3]
            A = np.stack([
                -ones, zeros, x,
                zeros, -ones, y
            ], axis=1).reshape(-1, 2, 3)
            
            # B matrix [2, 3] -> [H*W, 2, 3]
            B = np.stack([
                x*y, -(x**2 + 1), y,
                (y**2 + 1), -x*y, -x
            ], axis=1).reshape(-1, 2, 3)

            
            g_dot_A = np.matmul(grad_dir_np.T[:, np.newaxis, :], A).squeeze(1)  # [H*W, 3]
            g_dot_B = np.matmul(grad_dir_np.T[:, np.newaxis, :], B).squeeze(1)
            term1 = np.dot(g_dot_A, V)  # [H*W]
            term2 = nf_np - np.dot(g_dot_B, Omega)  # [H*W]
            
            rho = term1 * term2  # [H*W]
            
            # GELU activation (smooth ReLU approximation)
            # Negative GELU as in paper equation 13
            from scipy.special import erf
            loss = -0.5 * rho * (1.0 + erf(rho / np.sqrt(2.0)))  # GELU approximation
            return np.mean(loss)
        
        # Run L-BFGS optimization
        result = minimize(objective, 
                         init_pose_np,
                         method='L-BFGS-B',
                         options={'maxiter': self.max_iter,
                                'ftol': self.tol,
                                'gtol': self.tol})
        
        refined_pose = torch.from_numpy(result.x).float().to(init_pose.device)
        return refined_pose

In [5]:
import torch

def quaternion_to_euler_xyz(q: torch.Tensor) -> torch.Tensor:
    """
    Converts a quaternion to Euler angles (X-Y-Z order: roll-pitch-yaw)
    
    Args:
        q: [B, 4] tensor in [x, y, z, w] format.

    Returns:
        euler_angles: [B, 3] tensor with angles in radians: [roll(X), pitch(Y), yaw(Z)]
    """
    x, y, z, w = q[:, 0], q[:, 1], q[:, 2], q[:, 3]

    # Compute rotation matrix elements
    t0 = 2.0 * (w * x + y * z)
    t1 = 1.0 - 2.0 * (x * x + y * y)
    roll_x = torch.atan2(t0, t1)

    t2 = 2.0 * (w * y - z * x)
    t2 = torch.clamp(t2, -1.0, 1.0)
    pitch_y = torch.asin(t2)

    t3 = 2.0 * (w * z + x * y)
    t4 = 1.0 - 2.0 * (y * y + z * z)
    yaw_z = torch.atan2(t3, t4)

    euler_angles = torch.stack([roll_x, pitch_y, yaw_z], dim=1)  # [B, 3]
    return euler_angles


In [6]:
import torch

def euler_xyz_to_quaternion(euler: torch.Tensor) -> torch.Tensor:
    """
    Converts Euler angles (X-Y-Z order: roll-pitch-yaw) to quaternion.

    Args:
        euler: [B, 3] tensor with angles in radians: [roll(X), pitch(Y), yaw(Z)]

    Returns:
        q: [B, 4] tensor in [x, y, z, w] format.
    """
    roll, pitch, yaw = euler[:, 0], euler[:, 1], euler[:, 2]

    cr = torch.cos(roll * 0.5)
    sr = torch.sin(roll * 0.5)
    cp = torch.cos(pitch * 0.5)
    sp = torch.sin(pitch * 0.5)
    cy = torch.cos(yaw * 0.5)
    sy = torch.sin(yaw * 0.5)

    w = cr * cp * cy + sr * sp * sy
    x = sr * cp * cy - cr * sp * sy
    y = cr * sp * cy + sr * cp * sy
    z = cr * cp * sy - sr * sp * cy

    q = torch.stack([x, y, z, w], dim=1)  # [B, 4]
    return q


In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms

class DiffPoseNet(nn.Module):

    """
    Complete DiffPoseNet framework integrating:
    - NFlowNet for normal flow estimation
    - PoseNet for initial pose estimation
    - CheiralityLayer for pose refinement
    """
    def __init__(self, nflownet, posenet, cheirality_layer):
        super(DiffPoseNet, self).__init__()
        self.nflownet = nflownet
        self.posenet = posenet
        self.cheirality_layer = cheirality_layer
        
    def forward(self, img1, img2):
        """
        Args:
            img1 (torch.Tensor): First image [B, C, H, W]
            img2 (torch.Tensor): Second image [B, C, H, W]
            
        Returns:
            tuple: (refined_pose, init_pose, normal_flow)
        """
        resize_transform = transforms.Compose([
            transforms.Resize((480, 640))  # (height, width)
            ])
        img1_resized = resize_transform(img1)
        img2_resized = resize_transform(img2)
        # Compute normal flow between images
        with torch.no_grad():
            img = torch.cat((img1_resized, img2_resized), dim=1)
            normal_flow = self.nflownet(img)  # [B, H, W]
        
        # Compute image gradients (for cheirality layer)
        # Using Sobel filters as approximation
        sobel_x = torch.tensor([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]], dtype=torch.float32, device=img1.device).view(1, 1, 3, 3)
        sobel_y = torch.tensor([[-1, -2, -1], [0, 0, 0], [1, 2, 1]], dtype=torch.float32, device=img1.device).view(1, 1, 3, 3)
        
        # Compute gradients for both images and average
        grad_x1 = F.conv2d(img1_resized.mean(dim=1, keepdim=True), sobel_x, padding=1)
        grad_y1 = F.conv2d(img1_resized.mean(dim=1, keepdim=True), sobel_y, padding=1)
        grad_x2 = F.conv2d(img2_resized.mean(dim=1, keepdim=True), sobel_x, padding=1)
        grad_y2 = F.conv2d(img2_resized.mean(dim=1, keepdim=True), sobel_y, padding=1)
        
        image_gradients = 0.5 * (torch.cat([grad_x1, grad_y1], dim=1) + torch.cat([grad_x2, grad_y2], dim=1))  # [B, 2, H, W]
        
        # Get initial pose estimate
        t_pred, q_pred = self.posenet(torch.stack([img1, img2], dim=1))
        r_pred = quaternion_to_euler_xyz(q_pred)
        init_pose = torch.cat([t_pred, r_pred], dim=1)  # [B, 6]
        # Refine pose using cheirality layer
        refined_pose = self.cheirality_layer(normal_flow, image_gradients, init_pose)
        
        return refined_pose, init_pose, normal_flow

In [None]:
nflownet.eval()  # We won't train the flow networks
pose_net.train()  # Only train PoseNet
cheirality_layer = CheiralityLayer().to(device)

# Dataset and DataLoader
dataset = TartanAirDataset(root_dir="D:/KOC UNIVERSITY/COMP447/data/image_left")
train_loader = DataLoader(dataset, batch_size=1, shuffle=True)

# Optimizer (only for PoseNet parameters)
optimizer = optim.Adam(pose_net.parameters(), lr=1e-4)

# Training loop
num_epochs = 1
for epoch in range(num_epochs):
    total_loss = 0.0
    
    for batch_idx, (images, translations, rotations) in enumerate(train_loader):
        images = images.to(device)
        translations = translations.to(device)
        rotations = rotations.to(device)
        pose_net.train()
        nflownet.eval() 
        # Split images into pairs (assuming consecutive frames)
        img1 = images[:, 0]  # First image in pair
        img2 = images[:, 1]  # Second image in pair
        
        optimizer.zero_grad()

        t_pred, q_pred = pose_net(torch.stack([img1, img2], dim=1))

        t_pred_no_grad = t_pred
        q_pred_no_grad =q_pred
        # Compute normal flow between images
        with torch.no_grad():
            resize_transform = transforms.Compose([
            transforms.Resize((480, 640))  # (height, width)
            ])
            img1_resized = resize_transform(img1)
            img2_resized = resize_transform(img2)
            img = torch.cat((img1_resized, img2_resized), dim=1)
            normal_flow = nflownet(img)  # [B, H, W]
        
            # Compute image gradients (for cheirality layer)
            # Using Sobel filters as approximation
            sobel_x = torch.tensor([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]], dtype=torch.float32, device=img1.device).view(1, 1, 3, 3)
            sobel_y = torch.tensor([[-1, -2, -1], [0, 0, 0], [1, 2, 1]], dtype=torch.float32, device=img1.device).view(1, 1, 3, 3)
            
            # Compute gradients for both images and average
            grad_x1 = F.conv2d(img1_resized.mean(dim=1, keepdim=True), sobel_x, padding=1)
            grad_y1 = F.conv2d(img1_resized.mean(dim=1, keepdim=True), sobel_y, padding=1)
            grad_x2 = F.conv2d(img2_resized.mean(dim=1, keepdim=True), sobel_x, padding=1)
            grad_y2 = F.conv2d(img2_resized.mean(dim=1, keepdim=True), sobel_y, padding=1)
            
            image_gradients = 0.5 * (torch.cat([grad_x1, grad_y1], dim=1) + torch.cat([grad_x2, grad_y2], dim=1))  # [B, 2, H, W]
            
            r_pred = quaternion_to_euler_xyz(q_pred_no_grad)
            init_pose = torch.cat([t_pred_no_grad, r_pred], dim=1) 
        
            # Forward pass through DiffPoseNet
            refined_pose = cheirality_layer(normal_flow, image_gradients, init_pose)
            # Calculate loss between refined pose and ground truth
            # Assuming refined_pose contains both translation and rotation
            t_pred_no_grad = refined_pose[:, :3]  # Predicted translation
            euler_angles = refined_pose[:, 3:]  # [B, 3] -> [roll, pitch, yaw]
            q_pred_no_grad = euler_xyz_to_quaternion(euler_angles)
        # Use PoseNet's loss function
        loss = pose_net.pose_loss(t_pred, q_pred, t_pred_no_grad, q_pred_no_grad)
        print(f"Loss is calculated: {loss.item()}")
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        
        if batch_idx % 100 == 0:
            print(f"Epoch {epoch}, Batch {batch_idx}: Loss = {loss.item():.6f}")
    
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch} completed. Average Loss: {avg_loss:.6f}")

print("Training completed.")

tensor([[-0.0490, -0.0569,  0.3437]], device='cuda:0',
       grad_fn=<SliceBackward0>)
tensor([[-0.0490, -0.0569,  0.3437]], device='cuda:0')
tensor([[-0.2551,  0.5312, -0.6528,  0.4760]], device='cuda:0',
       grad_fn=<DivBackward0>)
tensor([[-0.2551,  0.5312, -0.6528,  0.4760]], device='cuda:0')
Loss is calculated: 0.0
Epoch 0, Batch 0: Loss = 0.000000
tensor([[-0.0063, -0.0573,  0.3954]], device='cuda:0',
       grad_fn=<SliceBackward0>)
tensor([[-0.0063, -0.0573,  0.3954]], device='cuda:0')
tensor([[-0.1010,  0.6814, -0.6568,  0.3068]], device='cuda:0',
       grad_fn=<DivBackward0>)
tensor([[-0.1010,  0.6814, -0.6568,  0.3068]], device='cuda:0')
Loss is calculated: 0.0
tensor([[ 0.0143, -0.0961,  0.1984]], device='cuda:0',
       grad_fn=<SliceBackward0>)
tensor([[ 0.0143, -0.0961,  0.1984]], device='cuda:0')
tensor([[-0.2131,  0.5060, -0.7341,  0.3996]], device='cuda:0',
       grad_fn=<DivBackward0>)
tensor([[-0.2131,  0.5060, -0.7341,  0.3996]], device='cuda:0')
Loss is calc

  ret = umr_sum(arr, axis, dtype, out, keepdims, where=where)
  df = fun(x) - f0
  rho = term1 * term2  # [H*W]
  loss = -0.5 * rho * (1.0 + erf(rho / np.sqrt(2.0)))  # GELU approximation


tensor([[ 0.0185, -0.1276,  0.2581]], device='cuda:0',
       grad_fn=<SliceBackward0>)
tensor([[ 1.8522e-02, -1.2763e-01,  8.9019e+09]], device='cuda:0')
tensor([[-0.0273,  0.2933, -0.8849,  0.3609]], device='cuda:0',
       grad_fn=<DivBackward0>)
tensor([[ 0.8118,  0.3629, -0.3730,  0.2651]], device='cuda:0')
Loss is calculated: 2.641442765215577e+19


KeyboardInterrupt: 