# Calibrate with particles 

The idea is to run PyPTV as usual, and check the box "Use only 4 frames". The result will be in the /res folder with only quadruplets as 3D and the respective indices of 2D targets per image

If we read this dataset into the proper format, we can now reproject every 3D point in rt_is back into the image and then optimize calibration with disparity between the position of the target as detected and the reprojected center. 

In [35]:
# from Yosef Meller's PBI

import shutil
import numpy as np
import numpy.random as rnd
from optv.calibration import Calibration
from optv.imgcoord import image_coordinates
from optv.transforms import convert_arr_metric_to_pixel
# from optv.segmentation import target_recognition
from optv.imgcoord import image_coordinates
from optv.transforms import convert_arr_metric_to_pixel
# from optv.orientation import match_detection_to_ref
# from optv.orientation import external_calibration, full_calibration
from optv.calibration import Calibration
from optv.tracking_framebuf import TargetArray

from pyptv.ptv import read_targets
from pyptv import parameters as par

In [36]:
import os
from pathlib import Path

present_folder = Path.cwd()

working_folder = Path("/home/user/Documents/repos/test_cavity")
par_path = working_folder / "parameters"
working_folder.exists(), par_path.exists()


(True, True)

In [37]:
def load_parameters(n_cams):
    
    from optv.parameters import (
    ControlParams,
    VolumeParams,
    TrackingParams,
    SequenceParams,
    TargetParams,
    MultimediaParams,
    )
    
    # Control parameters
    cpar = ControlParams(n_cams)
    cpar.read_control_par(b"ptv.par")

    # Sequence parameters
    spar = SequenceParams(num_cams=n_cams)
    spar.read_sequence_par(b"sequence.par", n_cams)

    # Volume parameters
    vpar = VolumeParams()
    vpar.read_volume_par(b"criteria.par")

    # Tracking parameters
    track_par = TrackingParams()
    track_par.read_track_par(b"track.par")

    # Target parameters
    tpar = TargetParams(n_cams)
    tpar.read(b"targ_rec.par")

    # Examine parameters, multiplane (single plane vs combined calibration)
    epar = par.ExamineParams(path = '../')
    epar.read()

    return cpar, spar, vpar, track_par, tpar, epar

In [38]:

# Read parameters

n_cams = 4
os.chdir(par_path)
cpar, spar, vpar, track_par, tpar, epar  = load_parameters(n_cams)


In [39]:

os.chdir(working_folder)
# Calibration parameters
cals = []
for i_cam in range(n_cams):
    cal = Calibration()
    tmp = cpar.get_cal_img_base_name(i_cam)
    cal.from_file(tmp + b".ori", tmp + b".addpar")
    cals.append(cal)



def backup_ori_files(n_cams):
    """backup ORI/ADDPAR files to the backup_cal directory"""
    calOriParams = par.CalOriParams(n_cams)
    calOriParams.read()
    for f in calOriParams.img_ori:
        print(f"Backing up {f}")
        shutil.copyfile(f, f + ".bck")
        g = f.replace("ori", "addpar")
        shutil.copyfile(g, g + ".bck")


# Backup is the first thing to do 
backup_ori_files(n_cams)


Backing up cal/cam1.tif.ori
Backing up cal/cam2.tif.ori
Backing up cal/cam3.tif.ori
Backing up cal/cam4.tif.ori


In [40]:
def get_pos(inters, R, angs):
    # Transpose of http://planning.cs.uiuc.edu/node102.html
    # Also consider the angles are reversed when moving from camera frame to
    # global frame.
    s = np.sin(angs)
    c = np.cos(angs)
    pos = inters + R*np.r_[ s[1], -c[1]*s[0], c[1]*c[0] ]
    return pos

def get_polar_rep(pos, angs):
    """
    Returns the point of intersection with zero Z plane, and distance from it.
    """
    s = np.sin(angs)
    c = np.cos(angs)
    zdir = -np.r_[ s[1], -c[1]*s[0], c[1]*c[0] ]
    
    c = -pos[2]/zdir[2]
    inters = pos + c*zdir
    R = np.linalg.norm(inters - pos)
    
    return inters[:2], R
    
def gen_calib(inters, R, angs, glass_vec, prim_point, radial_dist, decent):
    pos = get_pos(inters, R, angs)
    return Calibration(pos, angs, prim_point, radial_dist, decent, 
        np.r_[1, 0], glass_vec)

def fitness(solution, calib_targs, calib_detect, glass_vec, cpar):
    """
    Checks the fitness of an evolutionary solution of calibration values to 
    target points. Fitness is the sum of squares of the distance from each 
    guessed point to the closest neighbor.
    
    Arguments:
    solution - array, concatenated: position of intersection with Z=0 plane; 3 
        angles of exterior calibration; primary point (xh,yh,cc); 3 radial
        distortion parameters; 2 decentering parameters.
    calib_targs - a (p,3) array of p known points on the calibration target.
    calib_detect - a (d,2) array of d detected points in the calibration 
        target.
    cpar - a ControlParams object with image data.
    """
    # Breakdown of of agregate solution vector:
    inters = np.zeros(3)
    inters[:2] = solution[:2]
    R = solution[2]
    angs = solution[3:6] 
    prim_point = solution[6:9]
    rad_dist = solution[9:12]
    decent = solution[12:14]
        
    # Compare known points' projections to detections:
    cal = gen_calib(inters, R, angs, glass_vec, prim_point, rad_dist, decent)
    known_proj = image_coordinates(calib_targs, cal, 
        cpar.get_multimedia_params())
    known_2d = convert_arr_metric_to_pixel(known_proj, cpar)
    dists = np.linalg.norm(
        known_2d[None,:,:] - calib_detect[:,None,:], axis=2).min(axis=0)
    
    return np.sum(dists**2)


In [41]:
def g(f):
    """ Returns a line without white spaces """
    return f.readline().strip()

def read(filepath, n_img = 4):
    with open(filepath, "r") as f:

        fixp_name = Path(g(f))
        fixp_name.exists()

        img_cal_name = []
        img_ori = []
        for i in range(n_img):
            img_cal_name.append(g(f))
            img_ori.append(g(f))

        tiff_flag = int(g(f)) != 0  # <-- overwrites the above
        pair_flag = int(g(f)) != 0
        chfield = int(g(f))


    # test if files are present, issue warnings
    for i in range(n_img):
        Path(img_cal_name[i]).exists()
        Path(img_ori[i]).exists()

    return fixp_name, img_cal_name, img_ori, tiff_flag, pair_flag, chfield


In [43]:
# def read_rt_is_files_with_targets(path, n_cams):
"""Reads all rt_is.* files from the specified path and returns a list of 3D points and their corresponding targets."""
path = working_folder
rt_is_files = sorted(list((path / "res").rglob("rt_is.*")))

n_frames = len(rt_is_files)

# Initialize a list of lists to hold TargetArray objects
targets = [[TargetArray() for _ in range(n_cams)] for _ in range(n_frames)]
points = [[] for _ in range(n_frames)]
ids = [[] for _ in range(n_frames)]


for i, file in enumerate(rt_is_files):
    frame_num = int(file.name.split(".")[1])
    # print(f" frame_num {frame_num}")
    with open(file, "r") as f:
        num_points = int(f.readline().strip())
        for _ in range(num_points):
            data = f.readline().strip().split()
            points[i].append([float(data[1]), float(data[2]), float(data[3])])
            ids[i].append([int(data[4]), int(data[5]), int(data[6]), int(data[7])])

    for i_cam in range(n_cams):
        target_file = path / "img" / f"cam{i_cam+1:d}.{frame_num:d}"
        targets[i][i_cam] = read_targets(str(target_file))
        
        # return np.array(points), targets


 filename: /home/user/Documents/repos/test_cavity/img/cam1.10001_targets
 filename: /home/user/Documents/repos/test_cavity/img/cam2.10001_targets
 filename: /home/user/Documents/repos/test_cavity/img/cam3.10001_targets
 filename: /home/user/Documents/repos/test_cavity/img/cam4.10001_targets
 filename: /home/user/Documents/repos/test_cavity/img/cam1.10002_targets
 filename: /home/user/Documents/repos/test_cavity/img/cam2.10002_targets
 filename: /home/user/Documents/repos/test_cavity/img/cam3.10002_targets
 filename: /home/user/Documents/repos/test_cavity/img/cam4.10002_targets
 filename: /home/user/Documents/repos/test_cavity/img/cam1.10003_targets
 filename: /home/user/Documents/repos/test_cavity/img/cam2.10003_targets
 filename: /home/user/Documents/repos/test_cavity/img/cam3.10003_targets
 filename: /home/user/Documents/repos/test_cavity/img/cam4.10003_targets
 filename: /home/user/Documents/repos/test_cavity/img/cam1.10004_targets
 filename: /home/user/Documents/repos/test_cavity/i

In [None]:
from optv.transforms import convert_arr_metric_to_pixel
import matplotlib.pyplot as plt



def _residuals(cal):
    err = 0
    for i_frame in range(n_frames):
        # fig, ax = plt.subplots()
        for i_point in range(len(points[i_frame])):
            if ids[i_frame][i_point][i_cam] != -1:
                tmp = convert_arr_metric_to_pixel(
                            image_coordinates(np.atleast_2d(points[i_frame][i_point]), cal, cpar.get_multimedia_params()),
                            cpar
                        )
                # x, y  = targets[i_frame][i_cam][ids[i_frame][i_point][i_cam]].pos()


                # ax.scatter(x, y, marker='+', color = 'blue')
                # ax.scatter(tmp[0,0], tmp[0,1], marker='x', color = 'red')
                

                err += np.sum((tmp - targets[i_frame][i_cam][ids[i_frame][i_point][i_cam]].pos())**2)

    return err
        

for i_cam in range(n_cams):
    cal = cals[i_cam]
    print(f"Camera {i_cam+1} residuals: {_residuals(cal)}")
    # plt.show()

def _residuals_combined(x, cal):
    """Combined residuals  """

    cal.set_radial_distortion(x[:3])
    cal.set_decentering(x[3:5])
    cal.set_affine_trans(x[5:])

    return _residuals(cal)



Camera 1 residuals: 22559.155857370057
Camera 2 residuals: 33191.27163927646
Camera 3 residuals: 191706.998238307
Camera 4 residuals: 81596.81015315512


In [None]:
# recognized names for the flags:
NAMES = ["cc", "xh", "yh", "k1", "k2", "k3", "p1", "p2", "scale", "shear"]

op = par.OrientParams()
op.read()

flags = [name for name in NAMES if getattr(op, name) == 1]

for i_cam in range(n_cams):  # iterate over all cameras

    
    if any(flag in flags for flag in ['k1', 'k2', 'k3']):
        sol = minimize(_residuals_k,
                    cals[i_cam].get_radial_distortion(), 
                    args=(cals[i_cam], 
                            cal_points["pos"], 
                            targs,
                            cpar
                            ), 
                            method='Nelder-Mead', 
                            tol=1e-11,
                            options={'disp':True},
                            )
        radial = sol.x 
        cals[i_cam].set_radial_distortion(radial)
    else:
        radial = cals[i_cam].get_radial_distortion()
    
    if any(flag in flags for flag in ['p1', 'p2']):
        # now decentering
        sol = minimize(_residuals_p,
                    cals[i_cam].get_decentering(), 
                    args=(cals[i_cam], 
                            cal_points["pos"], 
                            targs,
                            cpar
                            ), 
                            method='Nelder-Mead', 
                            tol=1e-11,
                            options={'disp':True},
                            )
        decentering = sol.x 
        cals[i_cam].set_decentering(decentering)
    else:
        decentering = cals[i_cam].get_decentering()
    
    if any(flag in flags for flag in ['scale', 'shear']):
        # now affine
        sol = minimize(_residuals_s,
                    cals[i_cam].get_affine(), 
                    args=(cals[i_cam], 
                            cal_points["pos"], 
                            targs,
                            cpar
                            ), 
                            method='Nelder-Mead', 
                            tol=1e-11,
                            options={'disp':True},
                            )
        affine = sol.x 
        cals[i_cam].set_affine_trans(affine)

    else:
        affine = cals[i_cam].get_affine()
    


    # Now project and estimate full residuals
    _project_cal_points(i_cam)

    residuals = _residuals_combined(
                    np.r_[radial, decentering, affine],
                    cals[i_cam], 
                    cal_points["pos"], 
                    targs,
                    cpar
                    )

    residuals /= 100

    targ_ix = [t.pnr() for t in targs if t.pnr() != -999]
    # targ_ix = np.arange(len(all_detected))
    
    # save the results from cals[i_cam]
    _write_ori(i_cam, addpar_flag=True)


# end of the loop per camera
status_text = "Orientation from particles finished."


def _residuals_k(x, cal, XYZ, xy, cpar):
    """Residuals due to radial distortion

    Args:
        x (array-like): Array of parameters.
        cal (Calibration): Calibration object.
        XYZ (array-like): 3D coordinates.
        xy (array-like): 2D image coordinates.
        cpar (CPar): Camera parameters.


args=(cals[i_cam], 
                                    cal_points["pos"], 
                                    targs,
                                    cpar
                                    )


    Returns:
        residuals: Distortion in pixels
    """

    cal.set_radial_distortion(x)
    targets = convert_arr_metric_to_pixel(
        image_coordinates(XYZ, cal, cpar.get_multimedia_params()),
        cpar
    )
    xyt = np.array([t.pos() if t.pnr() != -999 else [np.nan, np.nan] for t in xy])
    residuals = np.nan_to_num(xyt - targets)
    # residuals = xy[:,1:] - targets
    return np.sum(residuals**2)

def _residuals_p(x, cal, XYZ, xy, cpar):
    """Residuals due to decentering """
    cal.set_decentering(x)
    targets = convert_arr_metric_to_pixel(
        image_coordinates(XYZ, cal, cpar.get_multimedia_params()),
        cpar
    )
    xyt = np.array([t.pos() if t.pnr() != -999 else [np.nan, np.nan] for t in xy])
    residuals = np.nan_to_num(xyt - targets)
    return np.sum(residuals**2)

def _residuals_s(x, cal, XYZ, xy, cpar):
    """Residuals due to decentering """
    cal.set_affine_trans(x)
    targets = convert_arr_metric_to_pixel(
        image_coordinates(XYZ, cal, cpar.get_multimedia_params()),
        cpar
    )
    xyt = np.array([t.pos() if t.pnr() != -999 else [np.nan, np.nan] for t in xy])
    residuals = np.nan_to_num(xyt - targets)
    return np.sum(residuals**2)    

def _residuals_combined(x, cal, XYZ, xy, cpar):
    """Combined residuals  """

    cal.set_radial_distortion(x[:3])
    cal.set_decentering(x[3:5])
    cal.set_affine_trans(x[5:])

    targets = convert_arr_metric_to_pixel(
        image_coordinates(XYZ, cal, cpar.get_multimedia_params()),
        cpar
    )
    xyt = np.array([t.pos() if t.pnr() != -999 else [np.nan, np.nan] for t in xy])
    residuals = np.nan_to_num(xyt - targets)
    return residuals           

def _write_ori(i_cam, addpar_flag=False):
    """Writes ORI and ADDPAR files for a single calibration result
    of i_cam
    addpar_flag is a boolean that allows to keep previous addpar
    otherwise external_calibration overwrites zeros
    """
    # protect ORI files from NaNs
    # Check for NaNs in cals[i_cam]
    tmp  = np.array([
        cals[i_cam].get_pos(),
        cals[i_cam].get_angles(),
        cals[i_cam].get_affine(),
        cals[i_cam].get_decentering(),
        cals[i_cam].get_radial_distortion(),
    ],dtype=object)

    if np.any(np.isnan(np.hstack(tmp))):
        raise ValueError(f"Calibration parameters for camera {i_cam} contain NaNs. Aborting write operation.")

    ori = calParams.img_ori[i_cam]
    if addpar_flag:
        addpar = ori.replace("ori", "addpar")
    else:
        addpar = "tmp.addpar"

    print("Saving:", ori, addpar)
    cals[i_cam].write(ori.encode(), addpar.encode())
    if epar.Examine_Flag and not epar.Combine_Flag:
        save_point_sets(i_cam)