# Calibrate cameras, create initial orthoimage

In [None]:
import os
from glob import glob
import numpy as np
from tqdm import tqdm
import pandas as pd
import cv2
import geopandas as gpd
from shapely.geometry import Point
import rioxarray as rxr
import xarray as xr
import matplotlib.pyplot as plt
from ast import literal_eval
import rasterio
import warnings
warnings.filterwarnings('ignore')

# Locate image files
data_folder = '/Users/rdcrlrka/Research/Soo_locks'
image_folder = os.path.join(data_folder, 'inputs', 'original_images')
image_list = sorted(glob(os.path.join(image_folder, '*.tiff')))
print(f"{len(image_list)} images located")

# Grab other input files
gcp_folder = os.path.join(data_folder, 'inputs', 'gcp')
refdem_file = os.path.join(data_folder, 'inputs', 'RLS_DSM_filled_cropped.tif')

# Define output folders
out_folder = os.path.join(data_folder, 'camera_calibration')
os.makedirs(out_folder, exist_ok=True)

## Merge GCP

In [None]:
gcp_merged_file = os.path.join(gcp_folder, '..', 'GCP_merged.gpkg')
if not os.path.exists(gcp_merged_file):
    gdf_list = []
    gcp_files = sorted(glob(os.path.join(gcp_folder, '*.csv')))
    for f in gcp_files:
        df = pd.read_csv(f, header=0)
        df['geometry'] = [Point(x,y) for (x,y) in df[['X', 'Y']].values]
        gdf = gpd.GeoDataFrame(df, geometry='geometry', crs="EPSG:32619")
        
        gdf['X'] = [x.coords.xy[0][0] for x in gdf['geometry']]
        gdf['Y'] = [x.coords.xy[1][0] for x in gdf['geometry']]

        # add channel column
        gdf['channel'] = 'ch' + os.path.basename(f).split('ch')[1][0:2]

        gdf_list += [gdf]
    df_merged = pd.concat(gdf_list).reset_index(drop=True)
    # rename image pixel columns
    df_merged = df_merged.rename(columns={
        'img_px': 'col_sample',
        'img_py': 'row_sample'
    })
    df_merged = df_merged[['channel', 'X', 'Y', 'Z', 'col_sample', 'row_sample', 'geometry']]

    gcp_merged = gpd.GeoDataFrame(df_merged, geometry='geometry', crs="EPSG:32619")
    gcp_merged.plot(kind='scatter', x='X', y='Y')
    plt.show()
    gcp_merged.to_file(gcp_merged_file, index=False)
    print(f'Merged GCP saved to:\n{gcp_merged_file}')

else:
    print('Merged GCP already exists in file, skipping.')
    

## Calibrate cameras using GCP

In [None]:
def calibrate_cameras(
    image_files: list,
    gcp_file: str,
    plot_results: bool = True,
):
    # Load GCPs
    gcp = gpd.read_file(gcp_file)

    # initialize strong vs. all GCP lists. 
    # Strong images used for intrinsics calibration. Then, all image extrinsics calibrated.
    strong_obj_pts = []
    strong_img_pts = []
    strong_files = []
    strong_channels = []

    all_obj_list = []
    all_img_list = []
    all_gcp_frames = []
    image_size = None

    # build image (pixel) and object (real-world) lists
    for image_file in image_files:
        ch = 'ch' + os.path.basename(image_file).split('ch')[1][0:2]
        gcp_image = gcp.loc[gcp['channel'] == ch].copy()

        obj = gcp_image[['X', 'Y', 'Z']].values.astype(np.float64)
        img = gcp_image[['col_sample', 'row_sample']].values.astype(np.float64)

        # store for extrinsic stage regardless of size
        all_obj_list.append(obj.reshape(1, -1, 3))
        all_img_list.append(img.reshape(1, -1, 2))
        all_gcp_frames.append(gcp_image)

        if image_size is None:
            im = cv2.imread(image_file, cv2.IMREAD_GRAYSCALE)
            image_size = (im.shape[1], im.shape[0])

        strong_obj_pts.append(obj.reshape(1, -1, 3))
        strong_img_pts.append(img.reshape(1, -1, 2))
        strong_files.append(image_file)
        strong_channels.append(ch)

    if len(strong_obj_pts) == 0:
        raise ValueError("No images with enough GCPs for intrinsic calibration. Lower min_points_intrinsic or add GCPs.")

    # init intrinsics
    fx = np.float64(1000)
    fy = np.float64(1200)
    K_init = np.array([
        [fx, 0, np.float64(image_size[0]) / 2],
        [0, fy, np.float64(image_size[1]) / 2],
        [0, 0, 1]
    ])
    D_init = np.zeros((4, 1), dtype=np.float64)
    calibration_flags = cv2.fisheye.CALIB_FIX_SKEW + cv2.fisheye.CALIB_RECOMPUTE_EXTRINSIC
    termination_criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 100, 1e-9)

    print(f"Using {len(strong_obj_pts)} images for intrinsic calibration.")

    # Calibrate camera fisheye intrinsics
    rms, K, D, rvecs, tvecs = cv2.fisheye.calibrate(
        strong_obj_pts,
        strong_img_pts,
        image_size,
        K_init,
        D_init,
        None,
        None,
        calibration_flags,
        termination_criteria
    )

    print(f"Intrinsic calibration done. RMS: {rms:.6f}")

    # Estimate K with full field of view (default results in cropped image)
    K_full = cv2.fisheye.estimateNewCameraMatrixForUndistortRectify(K, D, image_size, np.eye(3), balance=1.0)

    # Calibrate camera extrinsics for all images using solvePnP
    results = []
    for i, image_file in enumerate(tqdm(image_files, desc="Estimating extrinsics")):
        ch = 'ch' + os.path.basename(image_file).split('ch')[1][0:2]

        rvec = rvecs[i]
        tvec = tvecs[i]
        # gcp_image = all_gcp_frames[i].copy()
        # obj = all_obj_list[i].reshape(-1, 3)
        # img = all_img_list[i].reshape(-1, 2)

        # if obj.shape[0] < min_points_extrinsic:
        #     print(f"Skipping extrinsic solve for {os.path.basename(image_file)}: only {obj.shape[0]} points (< {min_points_extrinsic})")
        #     continue

        # # undistort to normalized coords
        # pts = img.astype(np.float32).reshape(1, -1, 2)
        # pts_norm = cv2.fisheye.undistortPoints(pts, K, D)  # shape: (1,N,2)

        # # convert normalized coords -> pixel coords using K_full
        # pts_norm = pts_norm.reshape(-1, 2)
        # pts_pix = cv2.convertPointsToHomogeneous(pts_norm).reshape(-1, 3)  # (x, y, 1)
        # pts_pix = (K_full @ pts_pix.T).T[:, :2]  # (N, 2)

        # retval, rvec, tvec = cv2.solvePnP(
        #     obj.astype(np.float64),
        #     pts_pix.astype(np.float64),
        #     K_full,
        #     None,
        #     flags=cv2.SOLVEPNP_ITERATIVE
        # )
        # if not retval:
        #     print(f"solvePnP failed for {image_file}")
        #     continue

        # store results
        r = {
            "channel": int(ch[2:]),
            "K": K.tolist(),
            "dist": D.tolist(),
            "K_full": K_full.tolist(),
            "rvec": rvec.reshape(-1).tolist(),
            "tvec": tvec.reshape(-1).tolist(),
        }
        results.append(r)

        # plot results
        if plot_results:
            image = cv2.imread(image_file, cv2.IMREAD_GRAYSCALE)
            # create undistort map and undistort
            map1, map2 = cv2.fisheye.initUndistortRectifyMap(K, D, np.eye(3), K_full, image_size, cv2.CV_16SC2)
            image_undistorted = cv2.remap(image, map1, map2, interpolation=cv2.INTER_LINEAR)
            mask = np.ones(image.shape, dtype=np.uint8) * 255
            mask_undistorted = cv2.remap(mask, map1, map2, interpolation=cv2.INTER_NEAREST)
            valid_mask = mask_undistorted > 0
            image_undistorted_nodata = image_undistorted.astype(np.float32)
            image_undistorted_nodata[~valid_mask] = np.nan

            # undistort points using fisheye.undistortPoints
            image_pts = img.astype(np.float32).reshape(1, -1, 2)
            undistorted_pts = cv2.fisheye.undistortPoints(image_pts, K, D, P=K_full).reshape(-1, 2)
            gcp_image['col_sample_undistorted'] = undistorted_pts[:, 0]
            gcp_image['row_sample_undistorted'] = undistorted_pts[:, 1]

            fig, ax = plt.subplots(1, 2, figsize=(10, 5))
            ax[0].imshow(image, cmap='gray')
            ax[0].plot(gcp_image['col_sample'], gcp_image['row_sample'], 'xr')
            ax[1].imshow(image_undistorted, cmap='gray')
            ax[1].plot(gcp_image['col_sample_undistorted'], gcp_image['row_sample_undistorted'], 'xr')
            fig.suptitle(os.path.basename(image_file))
            for a in ax:
                a.set_xticks([])
                a.set_yticks([])
            fig.tight_layout()
            plt.show()

    results_df = pd.DataFrame(results)
    return results_df


### CALIBRATE CAMERAS IN GROUPS ###
# Define groups
# group_images = [
#     image_list[0:8], 
#     image_list[8:15],
#     image_list[15:18],
#     image_list[18:22],
#     image_list[22:]
# ]
group_images = [[image] for image in image_list]

for i, g_images in enumerate(group_images):

    print(f'\nGROUP {i+1}:\n--------')

    try:
        # Calibrate cameras
        results_df = calibrate_cameras(
            image_files = g_images,
            gcp_file = gcp_merged_file, 
            plot_results = False
            )
        
        # Save calibrated camera specs
        results_file = os.path.join(out_folder, f"group{i+1}_calbration_params.csv")
        results_df.to_csv(results_file, index=False, header=True)
        print('Calibrated cameras saved to file')
    except:
        print("FAILED")
        continue

    i+=1

## Combine calibration parameters into one file

In [None]:
calib_files = sorted(glob(os.path.join(out_folder, '*.csv')))
calib_list = []
for calib_file in calib_files:
    calib_list += [pd.read_csv(calib_file)]
calib = pd.concat(calib_list).reset_index(drop=True)
calib
# Evaluate values
for k in ['K', 'dist', 'K_full', 'rvec', 'tvec']:
    calib[k] = calib[k].apply(literal_eval)
calib = calib.sort_values(by='channel')
out_file = os.path.join(out_folder, "original_calibrated_cameras.csv")
calib.to_csv(out_file, index=False)
print(f'Merged calibration parameters saved to:\n{out_file}')

In [None]:
# Plot camera centers

def camera_center_from_rt(rvec, tvec):
    rvec = np.asarray(rvec, dtype=float).reshape(3)
    tvec = np.asarray(tvec, dtype=float).reshape(3)

    R, _ = cv2.Rodrigues(rvec)

    C = -R.T @ tvec
    return C

# Apply to your dataframe
calib['cam_center'] = calib.apply(lambda row: camera_center_from_rt(row['rvec'], row['tvec']), axis=1)

# Extract coordinates into separate columns
calib[['cx','cy','cz']] = pd.DataFrame(calib['cam_center'].tolist(), index=calib.index)

calib.plot(x='cx', y='cy', kind='scatter')
plt.show()

In [None]:
def rt_from_camera_center(rvec, C):
    rvec = np.asarray(rvec, dtype=float).reshape(3)
    C = np.asarray(C, dtype=float).reshape(3)

    # rotation matrix
    R, _ = cv2.Rodrigues(rvec)

    # inverse of camera_center_from_rt:
    # C = -R.T @ t  â†’  t = -R @ C
    tvec = -R @ C

    return tvec

C1 = np.array([266504.559392, 4.691321e+06, 181.850910])
C2 = np.array([266499.233621, 4.691320e+06, 181.823417])
rvec = np.array([-3.0946134401757095, -0.21939730731184545, -0.12865256676878867])

tvec1 = rt_from_camera_center(rvec, C1)
tvec2 = rt_from_camera_center(rvec, C2)

print(tvec1)
print(tvec2)

In [None]:
23	[[2378.638636917316, 0.0, 2278.9747663716175], [0.0, 2378.929659808212, 1273.6454222674124], [0.0, 0.0, 1.0]]	[[-0.09827590766055995], [0.09173511868102373], [-0.06486618695342697], [0.007577770497528054]]	[[1382.2348473658133, 0.0, 2295.164359756932], [0.0, 1382.4039617386038, 1272.1228937442452], [0.0, 0.0, 1.0]]	[3.0971599447015805, 0.23554003658122544, 0.10409368288105521]	[-930559.17975548, 4604233.84936232, 120862.36654376]
24	[[2378.638636917316, 0.0, 2278.9747663716175], [0.0, 2378.929659808212, 1273.6454222674124], [0.0, 0.0, 1.0]]	[[-0.09827590766055995], [0.09173511868102373], [-0.06486618695342697], [0.007577770497528054]]	[[1382.2348473658133, 0.0, 2295.164359756932], [0.0, 1382.4039617386038, 1272.1228937442452], [0.0, 0.0, 1.0]]	[3.0971599447015805, 0.23554003658122544, 0.10409368288105521]	[-930553.78091779, 4604233.60295167, 120862.76205105]