In [17]:
#here I simply took the images that I took and converted it into the same dataframe as the training images
import pandas as pd
import os

def images_to_parquet(image_folder, output_parquet="images.parquet"):
    data = []

    for filename in os.listdir(image_folder):
        if filename.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp')):
            filepath = os.path.join(image_folder, filename)
            with open(filepath, 'rb') as f:
                img_bytes = f.read()
            data.append({
                "image": {"bytes": img_bytes, "path": filepath},
                "filename": filename
            })

    df = pd.DataFrame(data)
    df.to_parquet(output_parquet, index=False)
    print(f"✅ Saved {len(df)} images to {output_parquet}")
    return df


images_to_parquet("/content/checkerboard_images", "my_calibration_images.parquet")


✅ Saved 14 images to my_calibration_images.parquet


Unnamed: 0,image,filename
0,{'bytes': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x...,pic8.jpg
1,{'bytes': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x...,pic3.jpg
2,{'bytes': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x...,pic4.jpg
3,{'bytes': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x...,pic2.jpg
4,{'bytes': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x...,pic12.jpg
5,{'bytes': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x...,pic10.jpg
6,{'bytes': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x...,pic6.jpg
7,{'bytes': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x...,pic1.jpg
8,{'bytes': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x...,pic11.jpg
9,{'bytes': b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x...,pic14.jpg


In [18]:
# Andrew Aquino Midterm take home Exam
import io
import numpy as np
import pandas as pd
from PIL import Image
import cv2
import torch
import warnings

# Added these warning filtering so that it wouldnt clutter my console
warnings.filterwarnings("ignore", category=RuntimeWarning)
warnings.filterwarnings("ignore", category=UserWarning)

#this defines the number of squares in the rows and columns of the patterns
ROWS = 6
COLS = 8
SQUARE_SIZE = 1.0  #initally I thought eh square size was necceasry
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DTYPE = torch.float64
CRITERIA = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)

#these three functions helped with taking in the checker board from the images
# and detect the corners for the calibartion
def make_checkerboard_points(rows=ROWS, cols=COLS, size=SQUARE_SIZE):

    objp = np.zeros((rows * cols, 3), dtype=np.float64)
    objp[:, :2] = np.mgrid[0:cols, 0:rows].T.reshape(-1, 2) * size
    return objp

def load_image_from_bytes(entry):

    b = entry['bytes'] if isinstance(entry, dict) and 'bytes' in entry else entry

    if isinstance(b, bytes):
        return np.array(Image.open(io.BytesIO(b)).convert('L'))
    else:
        pass

    if isinstance(entry, str):
         return cv2.imread(entry, cv2.IMREAD_GRAYSCALE)

    return None

#I attempted to do this manually and had no luck so I had to use OpenCV to detect the corners
def detect_corners_opencv(gray_img):
    """Detects and refines checkerboard corners."""
    if gray_img is None:
      return None

    # Corner detection pre-processing
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    enhanced_img = clahe.apply(gray_img)
    blurred_img = cv2.GaussianBlur(enhanced_img, (5, 5), 0)

    # Detection flags needed this for the wide angle images for testing
    flags = cv2.CALIB_CB_ADAPTIVE_THRESH | cv2.CALIB_CB_NORMALIZE_IMAGE | cv2.CALIB_CB_CLUSTERING

    ret, corners = cv2.findChessboardCorners(blurred_img, (COLS, ROWS), None, flags=flags)
    if not ret:
      return None

    corners_subpix = cv2.cornerSubPix(gray_img, corners, (11, 11), (-1, -1), CRITERIA)
    return corners_subpix.reshape(-1, 2)


#These functions are the for the DLT Calibration
#This function will normalize the points
def normalize_points_2d(points):
    pts = np.asarray(points)
    mean = pts.mean(axis=0) #mean
    std = pts.std(axis=0) #standard deviation
    s = np.sqrt(2) / (np.mean(std) + 1e-12) #scaling factor for average distance
    T = np.array([[s, 0, -s*mean[0]], [0, s, -s*mean[1]], [0, 0, 1]]) #standard 3x3 Transformation matrix
    pts_h = np.column_stack([pts, np.ones(len(pts))]).T
    pts_n = (T @ pts_h).T[:, :2]
    return pts_n, T

#this function will estimate the homography matrix between two sets of 2D Points
def estimate_homography_dlt(obj_pts_2d, img_pts):
    """Estimates Homography H using DLT with normalization."""
    obj_n, To = normalize_points_2d(obj_pts_2d)
    img_n, Ti = normalize_points_2d(img_pts)
    A = [] #this is to contruct the matrix A based on the DLT Equation
    for (X, Y), (x, y) in zip(obj_n, img_n):
        A.append([-X, -Y, -1, 0, 0, 0, x*X, x*Y, x])
        A.append([0, 0, 0, -X, -Y, -1, y*X, y*Y, y])
    A = np.array(A, dtype=np.float64)
    _, _, Vt = np.linalg.svd(A)#make changes to the last column
    h = Vt[-1, :].reshape(3, 3)
    H = np.linalg.inv(Ti) @ h @ To
    return H / H[2, 2]

#this estimates the instinsic matix K or Zhangs method
def estimate_intrinsics_from_homographies(H_list):
    def v_ij(H, i, j):
        # vector V (6x1)
        return np.array([H[0, i]*H[0, j], H[0, i]*H[1, j] + H[1, i]*H[0, j], H[1, i]*H[1, j],
                         H[2, i]*H[0, j] + H[0, i]*H[2, j], H[2, i]*H[1, j] + H[1, i]*H[2, j], H[2, i]*H[2, j]])

    V = []
    for H in H_list:
        H /= H[2, 2] #normaliztion
        V.append(v_ij(H, 0, 1))
        V.append(v_ij(H, 0, 0) - v_ij(H, 1, 1))

    _, _, Vt = np.linalg.svd(np.vstack(V))
    b = Vt[-1, :]
    B11, B12, B22, B13, B23, B33 = b #extract the elements of the symmetrix matrix

    # getting the intrinsic parameters (alphta, beta, gamna, u0 and v0)
    v0 = (B12*B13 - B11*B23) / (B11*B22 - B12**2)
    lam = B33 - (B13**2 + v0*(B12*B13 - B11*B23)) / B11
    alpha = np.sqrt(np.abs(lam / B11))
    beta = np.sqrt(np.abs(lam*B11 / (B11*B22 - B12**2)))
    gamma = -B12*alpha**2*beta / lam
    u0 = gamma*v0 / beta - B13*alpha**2 / lam

    return np.array([[alpha, gamma, u0], [0.0, beta, v0], [0.0, 0.0, 1.0]])

#then lastly this computs the rotation and translation matrix
def extrinsic_from_homography(H, K):
    Kinv = np.linalg.inv(K)
    h1, h2, h3 = H[:, 0], H[:, 1], H[:, 2]
    lam = 1 / np.linalg.norm(Kinv @ h1)

    #the parameters for the Rotation matrix
    r1, r2 = lam * (Kinv @ h1), lam * (Kinv @ h2)
    r3 = np.cross(r1, r2)
    t = lam * (Kinv @ h3)


    R = np.column_stack([r1, r2, r3]) #inital rotation matrix
    U, _, Vt = np.linalg.svd(R) #want to find the nearest orthogonalk matrix to R
    R = U @ Vt

    # this is just Rodrigues formula
    theta = np.arccos(np.clip((np.trace(R) - 1) / 2, -1.0, 1.0)) #calculate the angle of rotation
    if abs(theta) < 1e-8:
      return np.zeros(3), t #if no rotation happens

    sin_theta = np.sin(theta)
    if abs(sin_theta) < 1e-8: #samething here , if no rotation happens
      return np.zeros(3), t

    #putting everything together and scaling everything by the calculated theta
    r_vec = theta * np.array([R[2, 1] - R[1, 2], R[0, 2] - R[2, 0], R[1, 0] - R[0, 1]]) / (2*sin_theta)
    return r_vec, t

#this where I used Pytorch which would later be used for optimization
def project_points_torch(obj_pts, rvec, tvec, K, dist_coeffs):
    #just making sure everything is a Tensor
    if not isinstance(obj_pts, torch.Tensor):
        obj_pts = torch.tensor(obj_pts, dtype=rvec.dtype, device=rvec.device)

    #computing the Rodrigues Formula, same as before but with tensors
    theta = torch.norm(rvec)
    #this is if there is no rotation
    if theta.item() < 1e-12:
        R = torch.eye(3, device=DEVICE, dtype=DTYPE)
    else:
        k = rvec / theta
        K_skew = torch.zeros(3, 3, dtype=DTYPE, device=DEVICE)
        K_skew[0, 1], K_skew[0, 2] = -k[2], k[1]
        K_skew[1, 0], K_skew[1, 2] = k[2], -k[0]
        K_skew[2, 0], K_skew[2, 1] = -k[1], k[0]
        I = torch.eye(3, device=DEVICE, dtype=DTYPE)
        #rodrigues formula
        R = I + torch.sin(theta) * K_skew + (1 - torch.cos(theta)) * (K_skew @ K_skew)

    Pcam = (R @ obj_pts.T).T + tvec.unsqueeze(0)
    X = Pcam[:, 0] / (Pcam[:, 2] + 1e-12)
    Y = Pcam[:, 1] / (Pcam[:, 2] + 1e-12)

    # applying distortion (radial and tangential)
    r2 = X**2 + Y**2
    k1, k2, p1, p2, k3 = dist_coeffs[0], dist_coeffs[1], dist_coeffs[2], dist_coeffs[3], dist_coeffs[4]
    radial = 1 + k1*r2 + k2*r2**2 + k3*r2**3
    x_dist = X*radial + 2*p1*X*Y + p2*(r2 + 2*X**2)
    y_dist = Y*radial + p1*(r2 + 2*Y**2) + 2*p2*X*Y

    # applying intrinsics
    fx, skew, cx = K[0, 0], K[0, 1], K[0, 2]
    fy, cy = K[1, 1], K[1, 2]
    u = fx*x_dist + skew*y_dist + cx
    v = fy*y_dist + cy

    return torch.stack([u, v], dim=1)

#this function calls the one at the top of the script to helpfind the checkboard corners
def build_image_points_list(df):
    img_pts_list, objp = [], make_checkerboard_points()

    for idx, row in df.iterrows():
        gray = load_image_from_bytes(row.get('image', row.get('bytes')))
        corners = detect_corners_opencv(gray)

        if corners is not None and corners.shape[0] == ROWS * COLS:
            img_pts_list.append(corners)
            print(f"[{idx}] Detected {corners.shape[0]} corners successfully.")

    return img_pts_list, objp[:, :2], objp

#this is the main part of the code where is uses DLT/Zhangs method and
#then uses PyTorch gradient optimization
def calibrate_pipeline(df):
    #DLT Calibration
    img_pts_list, obj_pts_2d, obj_pts_3d = build_image_points_list(df)

    print(f"\n--- Starting Calibration with {len(img_pts_list)} valid views ---")

    #DLT Initilization. Calculating the Homography, Intrinsic Matrix and Extrinisic Parameters
    H_list = [estimate_homography_dlt(obj_pts_2d, pts) for pts in img_pts_list]
    K_init = estimate_intrinsics_from_homographies(H_list)
    extrinsics_init = [extrinsic_from_homography(H, K_init) for H in H_list]
    print("Initial K (DLT):\n", K_init)

    # this is the optimization using PyTorch
    # make sure everything is a tensor first
    obj_pts_t = torch.tensor(obj_pts_3d, dtype=DTYPE, device=DEVICE)
    img_pts_all = [torch.tensor(pts, dtype=DTYPE, device=DEVICE) for pts in img_pts_list]

    fx = torch.tensor(K_init[0, 0], dtype=DTYPE, device=DEVICE, requires_grad=True)
    fy = torch.tensor(K_init[1, 1], dtype=DTYPE, device=DEVICE, requires_grad=True)
    skew = torch.tensor(K_init[0, 1], dtype=DTYPE, device=DEVICE, requires_grad=True)
    cx = torch.tensor(K_init[0, 2], dtype=DTYPE, device=DEVICE, requires_grad=True)
    cy = torch.tensor(K_init[1, 2], dtype=DTYPE, device=DEVICE, requires_grad=True)
    dist = torch.zeros(5, dtype=DTYPE, device=DEVICE, requires_grad=True)
    rvecs = [torch.tensor(r, dtype=DTYPE, device=DEVICE, requires_grad=True) for r, _ in extrinsics_init]
    tvecs = [torch.tensor(t, dtype=DTYPE, device=DEVICE, requires_grad=True) for _, t in extrinsics_init]

    #combining eveything that needs optimization
    params = [fx, fy, skew, cx, cy, dist] + rvecs + tvecs
    optimizer = torch.optim.Adam(params, lr=1e-3)

    # This loop  is the optimization step, I used adam (gradient descent)
    print("\nStarting Non-Linear Refinement (Adam)")
    for epoch in range(3000):
        optimizer.zero_grad() #to clear previous gradients
        K_mat = torch.stack([
            torch.stack([fx, skew, cx]),
            torch.stack([torch.tensor(0.0, dtype=DTYPE, device=DEVICE), fy, cy]),
            torch.stack([torch.tensor(0.0, dtype=DTYPE, device=DEVICE), torch.tensor(0.0, dtype=DTYPE, device=DEVICE), torch.tensor(1.0, dtype=DTYPE, device=DEVICE)])
        ])

        loss = torch.tensor(0.0, dtype=DTYPE, device=DEVICE)
        #calculate the reporjection error like the opencv version
        for rvec, tvec, img_pts_t in zip(rvecs, tvecs, img_pts_all):
            proj = project_points_torch(obj_pts_t, rvec, tvec, K_mat, dist)
            loss = loss + torch.mean((proj - img_pts_t)**2) #loss function Mean Squared Error

        loss.backward()
        optimizer.step()

        #track progress as the code runs
        if (epoch + 1) % 200 == 0:
            rmse = torch.sqrt(loss / len(img_pts_all)).item()
            print(f"Epoch {epoch+1}: Current Loss = {loss:.4f}, Avg. Reprojection RMSE = {rmse:.4f} pixels")

    # Final Results
    with torch.no_grad():
        K_opt = K_mat.cpu().numpy()
        dist_opt = dist.cpu().numpy()
        final_errors = [torch.sqrt(torch.mean((project_points_torch(obj_pts_t, rvec, tvec, K_mat, dist) - img_pts_t)**2)).item()
                        for rvec, tvec, img_pts_t in zip(rvecs, tvecs, img_pts_all)]

    print("\n--- Final Optimized Parameters ---")
    print("Optimized K:\n", K_opt)
    print("Distortion coefficients (k1, k2, p1, p2, k3):\n", dist_opt)
    print(f"\nFinal Average RMSE: {np.mean(final_errors):.4f} pixels")
    print("Calibration successful!")

    return None

parquet_path = '/content/my_calibration_images.parquet'
df = pd.read_parquet(parquet_path)
print(f"Loaded {len(df)} images")
calibrate_pipeline(df)


Loaded 14 images
[4] Detected 48 corners successfully.
[12] Detected 48 corners successfully.

--- Starting Calibration with 2 valid views ---
Initial K (DLT):
 [[855.79818522  96.90466702 210.30484788]
 [  0.         863.00498405 112.08202518]
 [  0.           0.           1.        ]]

Starting Non-Linear Refinement (Adam)
Epoch 200: Current Loss = 1.2322, Avg. Reprojection RMSE = 0.7849 pixels
Epoch 400: Current Loss = 1.1731, Avg. Reprojection RMSE = 0.7659 pixels
Epoch 600: Current Loss = 1.1350, Avg. Reprojection RMSE = 0.7533 pixels
Epoch 800: Current Loss = 1.1112, Avg. Reprojection RMSE = 0.7454 pixels
Epoch 1000: Current Loss = 1.0958, Avg. Reprojection RMSE = 0.7402 pixels
Epoch 1200: Current Loss = 1.0853, Avg. Reprojection RMSE = 0.7367 pixels
Epoch 1400: Current Loss = 1.0774, Avg. Reprojection RMSE = 0.7340 pixels
Epoch 1600: Current Loss = 1.0707, Avg. Reprojection RMSE = 0.7317 pixels
Epoch 1800: Current Loss = 1.0661, Avg. Reprojection RMSE = 0.7301 pixels
Epoch 2000: