<a href="https://www.kaggle.com/code/anirudhrangu/cs6350?scriptVersionId=270734723" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# KITTI Dataset Visualization

In this step, we:
1. **Load** stereo image pairs (left and right camera) from the KITTI dataset.
2. **Read** the corresponding calibration text file to extract:
   - The **rotation matrix (R)**
   - The **translation vector (t)**
3. **Display** a few sample images with their R and t values to confirm that data loading is correct.

This helps us ensure that the image data and ground-truth poses are aligned before moving into keypoint detection and pose estimation.


In [None]:
import cv2
import matplotlib.pyplot as plt
import numpy as np
import os
from mpl_toolkits.mplot3d import Axes3D

In [None]:
base_dir = "/kaggle/input/kitti-dataset"  
left_dir = os.path.join(base_dir, "data_object_image_2/training/image_2/000000.png")  # left camera images
pose_file = os.path.join(base_dir, "data_object_label_2/training/label_2/000000.txt")  # pose file

In [None]:
import pandas as pd

def parse_kitti_label_file(label_path):

    columns = [
        "type", "truncated", "occluded", "alpha",
        "bbox_xmin", "bbox_ymin", "bbox_xmax", "bbox_ymax",
        "height", "width", "length",
        "pos_x", "pos_y", "pos_z",
        "rotation_y"
    ]
    
    data = []

    try:
        with open(label_path, 'r') as f:
            for line in f:
                parts = line.strip().split()
                if parts[0] == "DontCare":
                    continue  # skip non-labeled regions
                values = parts[:15]  # ensure only 15 fields are taken
                data.append(values)
        
        # Convert to DataFrame
        df = pd.DataFrame(data, columns=columns)
        
        # Convert numeric columns to float
        for col in columns[1:]:
            df[col] = df[col].astype(float)
        
        return df
    except:
        with exception as e:
            print("error:", e)

# Example usage:
label_file = "/kaggle/input/kitti-dataset/data_object_label_2/training/label_2/000001.txt"
df = parse_kitti_label_file(label_file)
print(df.head())


# Preprocessing

In [None]:
def preprocess_image(img_path):
    img_rgb = cv2.imread(img_path)
    
    # grayscale
    gray = cv2.cvtColor(img_rgb, cv2.COLOR_BGR2GRAY)
    
    # Gaussian Blur (reduces high-frequency noise)
    gray = cv2.GaussianBlur(gray, (5, 5), sigmaX=1.0)
    
    # CLAHE (Contrast Limited Adaptive Histogram Equalization)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    gray_eq = clahe.apply(gray)

    # Normalize pixel intensities (0–255 range)
    norm = cv2.normalize(gray_eq, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX)

    # edge enhancement to improve feature matching
    enhanced = cv2.bilateralFilter(norm, d=5, sigmaColor=50, sigmaSpace=50)

    return enhanced

In [None]:
def compute_disparity(left_img, right_img):
    """
    Compute disparity map from stereo pair
    Returns:
        disparity: Disparity map (float32)
    """
    min_disp = 0
    num_disp = 128  
    block_size = 5

    stereo = cv2.StereoSGBM_create(
        minDisparity=min_disp,
        numDisparities=num_disp,
        blockSize=block_size,
        P1=8 * 3 * block_size ** 2,
        P2=32 * 3 * block_size ** 2,
        disp12MaxDiff=1,
        uniquenessRatio=10,
        speckleWindowSize=100,
        speckleRange=32,
        preFilterCap=63,
        mode=cv2.STEREO_SGBM_MODE_SGBM_3WAY
    )

    disparity = stereo.compute(left_img, right_img).astype(np.float32) / 16.0
    
    return disparity

In [None]:
def compute_depth(disparity, P1, P2):
    """
    Convert disparity to depth using calibration matrices
    Returns:
        depth_map: Depth in meters
        focal_length: Focal length in pixels
        baseline: Baseline in meters
    """
    
    # Extract focal length from P1
    focal_length = P1[0, 0]
    
    # Calculate baseline from P1 and P2
    # baseline = |t_x| / f_x
    baseline = abs(P2[0, 3] - P1[0, 3]) / focal_length
    
    print(f"Focal length: {focal_length:.2f} px")
    print(f"Baseline: {baseline:.4f} m")
    
    # Compute depth: Z = (f * B) / d
    # Avoid division by zero
    disparity[disparity <= 0] = 0.1
    depth_map = (focal_length * baseline) / disparity
    
    # Filter unrealistic depth values
    depth_map[depth_map > 100] = 100  # Max 100 meters
    depth_map[depth_map < 0] = 0
    
    return depth_map, focal_length, baseline

In [None]:
def backproject_to_3d(depth_map, P1, mask=None):
    """
    Convert depth map to 3D point cloud
    Returns:
        points_3d: Nx3 array of 3D points
    """
    
    # Extract camera intrinsics
    fx = P1[0, 0]
    fy = P1[1, 1]
    cx = P1[0, 2]
    cy = P1[1, 2]
    
    h, w = depth_map.shape
    
    # Create pixel coordinate grid
    u, v = np.meshgrid(np.arange(w), np.arange(h))
    
    # Back-project to 3D
    X = (u - cx) * depth_map / fx
    Y = (v - cy) * depth_map / fy
    Z = depth_map
    
    # Stack into point cloud
    points_3d = np.stack((X, Y, Z), axis=-1)
    
    # Apply mask if provided
    if mask is not None:
        points_3d = points_3d[mask > 0]
    else:
        points_3d = points_3d.reshape(-1, 3)
    
    # Filter invalid points
    valid = (points_3d[:, 2] > 0) & (points_3d[:, 2] < 100)
    points_3d = points_3d[valid]
    
    return points_3d


In [None]:
def detect_and_match_features(img1, img2, method='ORB', max_features=5000):
    """
    Detect and match features between two frames
    Returns:
        pts1: Matched keypoints in image 1
        pts2: Matched keypoints in image 2
        matches: Good matches
    """
    
    if method == 'SIFT':
        detector = cv2.SIFT_create(nfeatures=max_features)
        matcher = cv2.BFMatcher(cv2.NORM_L2, crossCheck=False)
    else:  # ORB
        detector = cv2.ORB_create(nfeatures=max_features)
        matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
    
    # Detect keypoints and compute descriptors
    kp1, des1 = detector.detectAndCompute(img1, None)
    kp2, des2 = detector.detectAndCompute(img2, None)
    
    if des1 is None or des2 is None or len(kp1) < 10 or len(kp2) < 10:
        print("Insufficient features detected")
        return None, None, None
    
    # Match using KNN
    matches = matcher.knnMatch(des1, des2, k=2)
    
    # Apply Lowe's ratio test
    good_matches = []
    for match_pair in matches:
        if len(match_pair) == 2:
            m, n = match_pair
            if m.distance < 0.75 * n.distance:
                good_matches.append(m)
    
    print(f"Found {len(good_matches)} good matches")
    
    if len(good_matches) < 8:
        print("Not enough good matches")
        return None, None, None
    
    # Extract matched point coordinates
    pts1 = np.float32([kp1[m.queryIdx].pt for m in good_matches])
    pts2 = np.float32([kp2[m.trainIdx].pt for m in good_matches])
    
    return pts1, pts2, good_matches

In [None]:
def get_3d_points_for_matches(pts2d, depth_map, P1):
    """
    Get 3D coordinates for matched 2D points
    
    Args:
        pts2d: 2D points in image (Nx2)
        depth_map: Depth map
        P1: Camera projection matrix
    
    Returns:
        pts3d: 3D points (Mx3)
        valid_pts2d: Corresponding valid 2D points (Mx2)
    """
    
    fx = P1[0, 0]
    fy = P1[1, 1]
    cx = P1[0, 2]
    cy = P1[1, 2]
    
    pts3d = []
    valid_pts2d = []
    
    for pt in pts2d:
        u, v = int(pt[0]), int(pt[1])
        
        # Check bounds
        if 0 <= u < depth_map.shape[1] and 0 <= v < depth_map.shape[0]:
            Z = depth_map[v, u]
            
            # Valid depth check
            if 0.1 < Z < 100:
                X = (u - cx) * Z / fx
                Y = (v - cy) * Z / fy
                pts3d.append([X, Y, Z])
                valid_pts2d.append(pt)
    
    return np.array(pts3d, dtype=np.float32), np.array(valid_pts2d, dtype=np.float32)


In [None]:
def estimate_pose_pnp(pts3d_prev, pts2d_curr, K, dist_coeffs=None):
    """
    Estimate camera pose using 3D-2D correspondences
    
    Args:
        pts3d_prev: 3D points from previous frame (Nx3)
        pts2d_curr: 2D points in current frame (Nx2)
        K: Camera intrinsic matrix (3x3)
        dist_coeffs: Distortion coefficients (None for KITTI rectified)
    
    Returns:
        R: Rotation matrix (3x3)
        t: Translation vector (3x1)
        inliers: Inlier mask
    """
    
    if len(pts3d_prev) < 6:
        print("Not enough 3D-2D correspondences")
        return None, None, None
    
    # Solve PnP with RANSAC
    success, rvec, tvec, inliers = cv2.solvePnPRansac(
        pts3d_prev,
        pts2d_curr,
        K,
        dist_coeffs,
        iterationsCount=1000,
        reprojectionError=2.0,
        confidence=0.99,
        flags=cv2.SOLVEPNP_ITERATIVE
    )
    
    if not success or inliers is None or len(inliers) < 6:
        print("Pose estimation failed")
        return None, None, None
    
    # Convert rotation vector to matrix
    R, _ = cv2.Rodrigues(rvec)
    t = tvec
    
    print(f"PnP inliers: {len(inliers)}/{len(pts3d_prev)}")
    
    return R, t, inliers

In [None]:
def estimate_pose_essential(pts1, pts2, K):
    """
    Estimate pose using Essential Matrix (2D-2D correspondences)
    
    Args:
        pts1: 2D points in frame 1
        pts2: 2D points in frame 2
        K: Camera intrinsic matrix
    
    Returns:
        R: Rotation matrix
        t: Translation vector
        mask: Inlier mask
    """
    
    # Find Essential Matrix
    E, mask = cv2.findEssentialMat(
        pts1, pts2, K,
        method=cv2.RANSAC,
        prob=0.999,
        threshold=1.0
    )
    
    if E is None:
        print("Essential matrix computation failed")
        return None, None, None
    
    # Recover pose
    _, R, t, mask = cv2.recoverPose(E, pts1, pts2, K, mask=mask)
    
    inliers = np.sum(mask)
    print(f"Essential matrix inliers: {inliers}/{len(pts1)}")
    
    return R, t, mask


In [None]:
class StereoVisualOdometry:
    """Complete stereo visual odometry pipeline"""
    
    def __init__(self, P1, P2):
        """
        Initialize visual odometry
        
        Args:
            P1: Left camera projection matrix (3x4)
            P2: Right camera projection matrix (3x4)
        """
        self.P1 = P1
        self.P2 = P2
        self.K = P1[:, :3]  # Intrinsic matrix
        
        # Calculate baseline
        self.baseline = abs(P2[0, 3] - P1[0, 3]) / self.K[0, 0]
        
        # Current pose (world to camera transformation)
        self.R_total = np.eye(3)
        self.t_total = np.zeros((3, 1))
        
        # Trajectory storage
        self.trajectory = [np.zeros(3)]
        
        # Previous frame data
        self.prev_left = None
        self.prev_depth = None
        
    def process_stereo_pair(self, left_img, right_img):
        """
        Process a stereo pair and update pose
        
        Args:
            left_img: Preprocessed left grayscale image
            right_img: Preprocessed right grayscale image
        
        Returns:
            success: Whether pose was successfully estimated
        """
        
        # Step 1: Compute disparity and depth
        print("\n--- Computing disparity ---")
        disparity = compute_disparity(left_img, right_img)
        depth_map, _, _ = compute_depth(disparity, self.P1, self.P2)
        
        # First frame: just store
        if self.prev_left is None:
            self.prev_left = left_img
            self.prev_depth = depth_map
            print("First frame initialized")
            return True
        
        # Step 2: Feature matching between frames
        print("\n--- Matching features ---")
        pts_prev, pts_curr, matches = detect_and_match_features(
            self.prev_left, left_img, method='ORB'
        )
        
        if pts_prev is None or len(pts_prev) < 8:
            print("Failed: Not enough matches")
            return False
        
        # Step 3: Get 3D points from previous frame
        print("\n--- Getting 3D points ---")
        pts3d_prev, valid_pts_prev = get_3d_points_for_matches(
            pts_prev, self.prev_depth, self.P1
        )
        
        if len(pts3d_prev) < 6:
            print("Failed: Not enough 3D points")
            return False
        
        # Find corresponding current 2D points
        pts2d_curr = []
        pts3d_filtered = []
        
        for i, prev_pt in enumerate(pts_prev):
            for j, valid_pt in enumerate(valid_pts_prev):
                if np.allclose(prev_pt, valid_pt, atol=0.1):
                    pts2d_curr.append(pts_curr[i])
                    pts3d_filtered.append(pts3d_prev[j])
                    break
        
        pts2d_curr = np.array(pts2d_curr, dtype=np.float32)
        pts3d_filtered = np.array(pts3d_filtered, dtype=np.float32)
        
        if len(pts3d_filtered) < 6:
            print("Failed: Not enough valid correspondences")
            return False
        
        # Step 4: Estimate relative pose
        print("\n--- Estimating pose ---")
        R_rel, t_rel, inliers = estimate_pose_pnp(
            pts3d_filtered, pts2d_curr, self.K
        )
        
        if R_rel is None:
            print("Failed: Pose estimation unsuccessful")
            return False
        
        # Step 5: Update cumulative pose
        # T_new = T_old * T_rel
        self.t_total = self.t_total + self.R_total @ t_rel
        self.R_total = self.R_total @ R_rel
        
        # Store position
        position = self.t_total.flatten()
        self.trajectory.append(position.copy())
        
        print(f"Position: x={position[0]:.2f}, y={position[1]:.2f}, z={position[2]:.2f}")
        
        # Update for next iteration
        self.prev_left = left_img
        self.prev_depth = depth_map
        
        return True
    
    def get_trajectory(self):
        """Get trajectory as numpy array"""
        return np.array(self.trajectory)
    
    def get_current_pose(self):
        """Get current pose (R, t)"""
        return self.R_total, self.t_total


In [None]:
def visualize_disparity(disparity):
    """Visualize disparity map"""
    plt.figure(figsize=(10, 6))
    plt.imshow(disparity, cmap='jet')
    plt.colorbar(label='Disparity (pixels)')
    plt.title('Disparity Map')
    plt.axis('off')
    plt.tight_layout()
    plt.show()

def visualize_depth(depth_map):
    """Visualize depth map"""
    plt.figure(figsize=(10, 6))
    plt.imshow(depth_map, cmap='plasma', vmin=0, vmax=50)
    plt.colorbar(label='Depth (meters)')
    plt.title('Depth Map')
    plt.axis('off')
    plt.tight_layout()
    plt.show()

def visualize_matches(img1, img2, pts1, pts2, num_show=50):
    """Visualize feature matches"""
    h1, w1 = img1.shape
    h2, w2 = img2.shape
    
    # Create side-by-side image
    combined = np.zeros((max(h1, h2), w1 + w2), dtype=np.uint8)
    combined[:h1, :w1] = img1
    combined[:h2, w1:] = img2
    combined = cv2.cvtColor(combined, cv2.COLOR_GRAY2BGR)
    
    # Draw matches
    num_show = min(num_show, len(pts1))
    for i in range(num_show):
        pt1 = tuple(pts1[i].astype(int))
        pt2 = tuple((pts2[i] + [w1, 0]).astype(int))
        color = tuple(np.random.randint(0, 255, 3).tolist())
        cv2.circle(combined, pt1, 3, color, -1)
        cv2.circle(combined, pt2, 3, color, -1)
        cv2.line(combined, pt1, pt2, color, 1)
    
    plt.figure(figsize=(15, 6))
    plt.imshow(combined)
    plt.title(f'Feature Matches (showing {num_show})')
    plt.axis('off')
    plt.tight_layout()
    plt.show()

def visualize_trajectory(trajectory, ground_truth=None):
    """Visualize 2D trajectory (top-down view)"""
    plt.figure(figsize=(10, 10))
    
    traj = np.array(trajectory)
    plt.plot(traj[:, 0], traj[:, 2], 'b-o', label='Estimated', markersize=3)
    
    if ground_truth is not None:
        gt = np.array(ground_truth)
        plt.plot(gt[:, 0], gt[:, 2], 'r-', label='Ground Truth', linewidth=2)
    
    plt.xlabel('X (meters)')
    plt.ylabel('Z (meters)')
    plt.title('Camera Trajectory (Top-Down View)')
    plt.legend()
    plt.axis('equal')
    plt.grid(True)
    plt.tight_layout()
    plt.show()

def visualize_3d_points(points_3d, max_points=10000):
    """Visualize 3D point cloud"""
    # Subsample if too many points
    if len(points_3d) > max_points:
        indices = np.random.choice(len(points_3d), max_points, replace=False)
        points_3d = points_3d[indices]
    
    fig = plt.figure(figsize=(10, 8))
    ax = fig.add_subplot(111, projection='3d')
    
    ax.scatter(points_3d[:, 0], points_3d[:, 1], points_3d[:, 2], 
               c=points_3d[:, 2], cmap='viridis', s=1)
    
    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.set_zlabel('Z')
    ax.set_title('3D Point Cloud')
    plt.tight_layout()
    plt.show()

In [None]:
def main():
    """
    Main execution pipeline
    """
    
    # Parse calibration matrices (your KITTI values)
    P1 = np.array([
        [7.070493e+02, 0.000000e+00, 6.040814e+02, -3.797842e+02],
        [0.000000e+00, 7.070493e+02, 1.805066e+02, 0.000000e+00],
        [0.000000e+00, 0.000000e+00, 1.000000e+00, 0.000000e+00]
    ])
    
    P2 = np.array([
        [7.070493e+02, 0.000000e+00, 6.040814e+02, 4.575831e+01],
        [0.000000e+00, 7.070493e+02, 1.805066e+02, -3.454157e-01],
        [0.000000e+00, 0.000000e+00, 1.000000e+00, 4.981016e-03]
    ])
    
    print("=" * 60)
    print("STEREO VISUAL ODOMETRY PIPELINE")
    print("=" * 60)
    
    # Initialize visual odometry
    vo = StereoVisualOdometry(P1, P2)
    
    # Example: Process stereo image sequence
    # Replace with your actual image loading
    
    #for frame_id in range(num_frames):
    for frame_id in range(1):
        # Load stereo pair (preprocessed)
        left_img = preprocess_image("/kaggle/input/kitti-dataset/data_object_image_2/training/image_2/000000.png")
        right_img = preprocess_image("/kaggle/input/kitti-dataset/data_object_image_3/training/image_3/000000.png")
        
        # Process frame
        print(f"\n{'='*60}")
        print(f"FRAME {frame_id}")
        print(f"{'='*60}")
        
        success = vo.process_stereo_pair(left_img, right_img)
        
        if not success:
            print(f"Frame {frame_id} failed")
            continue
    
    # Get trajectory
    trajectory = vo.get_trajectory()
    
    # Visualize results
    visualize_trajectory(trajectory)
    
    # Save trajectory
    np.savetxt('estimated_trajectory.txt', trajectory)
    print("\nTrajectory saved to 'estimated_trajectory.txt'")
    
    
    # Demo with single stereo pair
    print("\nTo use this pipeline:")
    print("1. Load your preprocessed left and right images")
    print("2. Initialize StereoVisualOdometry with P1, P2")
    print("3. Call process_stereo_pair() for each frame pair")
    print("4. Use get_trajectory() to retrieve camera path")
    print("5. Visualize with provided visualization functions")

if __name__ == "__main__":
    main()