In [1]:
from typing import List
import numpy as np
import math
from openptv_python.tracking_frame_buf import Frame
from openptv_python.parameters import VolumePar, ControlPar
from openptv_python.calibration import Calibration
from openptv_python.correspondences import Correspond #, match_pairs
from openptv_python.epi import Candidate, Coord2d, epi_mm
from openptv_python.find_candidate import find_start_point
#from openptv_python.correspondences import  match_pairs
# from openptv_python.tracking_frame_buf import Target
MAXCAND = 1000
# import pytest
# from openptv_python.find_candidate import find_candidate

from openptv_python.calibration import Calibration, read_calibration
from openptv_python.constants import MAXCAND
from openptv_python.correspondences import (
    consistent_pair_matching,
    match_pairs,
    # py_correspondences,
    safely_allocate_adjacency_lists,
    safely_allocate_target_usage_marks,
)
from openptv_python.epi import Coord2d
from openptv_python.imgcoord import img_coord
from openptv_python.parameters import ControlPar, read_control_par, read_volume_par
from openptv_python.tracking_frame_buf import (
    Frame,
    Target,
    # TargetArray,
    # match_coords,
    # matched_coords_as_arrays,
    n_tupel,
    # read_targets,
)
from openptv_python.trafo import correct_brown_affine, dist_to_flat, metric_to_pixel, pixel_to_metric

In [2]:
def read_all_calibration(num_cams: int = 4) -> list[Calibration]:
    """Read all calibration files."""
    ori_tmpl = "./testing_fodder/cal/sym_cam%d.tif.ori"
    added_name = "./testing_fodder/cal/cam1.tif.addpar"

    calib = []

    for cam in range(num_cams):
        ori_name = ori_tmpl % (cam + 1)
        calib.append(read_calibration(ori_name, added_name))

    return calib

In [3]:
def correct_frame(
    frm: Frame, calib: list[Calibration], cpar: ControlPar, tol: float
) -> list[list[Coord2d]]:
    """
    Perform the transition from pixel to metric to flat coordinates.

    and x-sorting as required by the correspondence code.

    Arguments:
    ---------
    frm - target information for all cameras.
    cpar - parameters of image size, pixel size etc.
    tol - tolerance parameter for iterative flattening phase, see
        trafo.h:correct_brown_affine_exact().
    """
    corrected = [[Coord2d()] * frm.num_targets[i_cam] for i_cam in range(cpar.num_cams)]

    for cam in range(cpar.num_cams):
        row = corrected[cam]

        for part in range(frm.num_targets[cam]):
            x, y = pixel_to_metric(
                frm.targets[cam][part].x, frm.targets[cam][part].y, cpar
            )
            x, y = dist_to_flat(x, y, calib[cam], tol)

            row[part] = Coord2d(frm.targets[cam][part].pnr, x, y)

            # print(
            #     f"frm.targets:{cam},{part},{frm.targets[cam][part].x},{frm.targets[cam][part].y},{frm.targets[cam][part].pnr}"
            # )
            # print(
            #     f"corrected: {row[part].x} {row[part].y} {row[part].pnr}"
            # )

        # This is expected by find_candidate()
        row.sort(key=lambda Coord2d: Coord2d.x)

        # for r in row:
        #     print(r.pnr, r.x, r.y)

    # corrected[cam] = row

    return corrected

In [4]:
def generate_test_set(calib: list[Calibration], cpar: ControlPar) -> Frame:
    """
    Generate data for targets on N cameras.

    The targets are organized on a 4x4 grid, 10 mm apart.
    """
    frm = Frame(num_cams=cpar.num_cams, max_targets=16)

    # Four cameras on 4 quadrants looking down into a calibration target.
    # Calibration taken from an actual experimental setup
    for cam in range(cpar.num_cams):
        # fill in only what's needed
        frm.num_targets[cam] = 16
        frm.targets[cam] = [Target() for _ in range(frm.num_targets[cam])]

        # Construct a scene representing a calibration target, generate
        # targets for it, then use them to reconstruct correspondences.
        for cpt_horz in range(4):
            for cpt_vert in range(4):
                cpt_ix = cpt_horz * 4 + cpt_vert
                if cam % 2:
                    cpt_ix = 15 - cpt_ix  # Avoid symmetric case

                targ = frm.targets[cam][cpt_ix]
                targ.pnr = cpt_ix

                tmp = np.r_[cpt_vert * 10, cpt_horz * 10, 0]
                targ.x, targ.y = img_coord(tmp, calib[cam], cpar.mm)
                targ.x, targ.y = metric_to_pixel(targ.x, targ.y, cpar)

                # These values work in check_epi, so used here too
                targ.n = 25
                targ.nx = targ.ny = 5
                targ.sumg = 10

    return frm

In [5]:
# import ipytest
# ipytest.autoconfig()

In [6]:
# vpar = VolumePar()
# vpar.set_eps0(0.1)

# out = find_start_point([Coord2d(1, 2, 3), Coord2d(4, 5, 6), Coord2d(7, 8, 9)], 3, 6.0, vpar)
# assert out == 1

# out = find_start_point([Coord2d(1, 2, 3), Coord2d(4, 5, 6), Coord2d(7, 8, 9)], 5, 1.0, vpar)
# assert out == 0

# out = find_start_point([Coord2d(1, 2, 3), Coord2d(4, 5, 6), Coord2d(7, 8, 9)], 3, 12.0, vpar)
# assert out == 1

In [7]:
# def test_two_camera_matching():
"""Setup is the same as the 4-camera test, targets are darkened in.

two cameras to get 16 pairs.
"""
cpar = read_control_par("./testing_fodder/parameters/ptv.par")
vpar = read_volume_par("./testing_fodder/parameters/criteria.par")

cpar.num_cams = 2

vpar.z_min_lay[0] = -1
vpar.z_min_lay[1] = -1
vpar.z_max_lay[0] = 1
vpar.z_max_lay[1] = 1

calib = read_all_calibration(cpar.num_cams)
frm = generate_test_set(calib, cpar)

corrected = correct_frame(frm, calib, cpar, 0.0001)


In [8]:
cpar

ControlPar(num_cams=2, img_base_name=['dumbbell/cam1_Scene77_4085', 'dumbbell/cam2_Scene77_4085', 'dumbbell/cam3_Scene77_4085', 'dumbbell/cam4_Scene77_4085'], cal_img_base_name=['cal/cam1.tif', 'cal/cam2.tif', 'cal/cam3.tif', 'cal/cam4.tif'], hp_flag=1, allCam_flag=0, tiff_flag=1, imx=1280, imy=1024, pix_x=0.017, pix_y=0.017, chfield=0, mm=MultimediaPar(nlay=1, n1=1.0, n2=[1.49], d=[5.0], n3=1.33))

In [9]:
vpar

VolumePar(x_lay=[-250.0, 250.0], z_min_lay=[-1, -1], z_max_lay=[1, 1], cn=0.01, cnx=0.3, cny=0.3, csumg=0.01, eps0=1.0, corrmin=33.0)

In [10]:
corrected

[[Coord2d(pnr=0, x=9.634104244327423, y=-0.3099888706384586),
  Coord2d(pnr=4, x=9.638280718193668, y=2.7900197624718657),
  Coord2d(pnr=8, x=9.652881478232118, y=5.890825171902248),
  Coord2d(pnr=12, x=9.67784985826524, y=8.993307588156474),
  Coord2d(pnr=1, x=11.834235975793156, y=-0.3040521136686557),
  Coord2d(pnr=5, x=11.838259741222911, y=2.736581190661918),
  Coord2d(pnr=9, x=11.852326629914815, y=5.777952612835549),
  Coord2d(pnr=13, x=11.876381978889428, y=8.820877575310746),
  Coord2d(pnr=2, x=13.951695411111832, y=-0.29833851839929915),
  Coord2d(pnr=6, x=13.955564416041577, y=2.685151603123312),
  Coord2d(pnr=10, x=13.96909028813525, y=5.669325168017283),
  Coord2d(pnr=14, x=13.992220489524248, y=8.654937200232952),
  Coord2d(pnr=3, x=15.99037521975221, y=-0.29283400914430263),
  Coord2d(pnr=7, x=15.994088594408465, y=2.635604287837789),
  Coord2d(pnr=11, x=16.007070434196716, y=5.564675112078112),
  Coord2d(pnr=15, x=16.02927042024007, y=8.495077247428911)],
 [Coord2d(pnr=

In [11]:

# print(f" Corrected ")
# for cam in range(cpar.num_cams):
#     print(f"{cam}")
#     for part in corrected[cam]:
#             print(part)

corr_lists = safely_allocate_adjacency_lists(cpar.num_cams, frm.num_targets)


In [12]:

corr_lists[0][1][0].p1, corr_lists[0][1][0].p2[-1]

(0, 0)

In [13]:
# def quality_ratio(a, b):
#     """Return the ratio of the smaller to the larger of the two numbers."""
#     if a == 0 and b == 0:
#         return 0
#     return float(min(a, b) / max(a, b))

quality_ratio = lambda a, b: a / b if a < b else b / a

In [14]:
quality_ratio(5,25), quality_ratio(25,5)

(0.2, 0.2)

In [15]:
def find_candidate(
    crd: List[Coord2d],
    pix: List[Target],
    num: int,
    xa: float,
    ya: float,
    xb: float,
    yb: float,
    n: int,
    nx: int,
    ny: int,
    sumg: int,
    vpar: VolumePar,
    cpar: ControlPar,
    cal: Calibration,
) -> List[Candidate]:
    """Search in the image space of the image all the candidates around the epipolar line.

    originating from another camera. It is a binary search in an x-sorted coord-set,
    exploits shape information of the particles.

    Args:
    ----
        crd: A list of `coord_2d` objects. Each object corresponds to the corrected
                coordinates of a target in an image.
        pix: A list of `target` objects. Each object corresponds to the target information
                (size, grey value, etc.) of a target.
        num: The number of targets in the image.
        xa: The x-coordinate of the start point of the epipolar line.
        ya: The y-coordinate of the start point of the epipolar line.
        xb: The x-coordinate of the end point of the epipolar line.
        yb: The y-coordinate of the end point of the epipolar line., &xmax, &ymax
        n: The total size of a typical target.
        nx: The x-size of a typical target.
        ny: The y-size of a typical target.
        sumg: The sum of the grey values of a typical target.
        cand: A list of `candidate` objects. Each object corresponds to a candidate target.
        vpar: A `volume_par` object.
        cpar: A `control_par` object.
        cal: A `Calibration` object.

    Returns:
    -------
        cand: list of candidates, or empty list if nothing is found
    """
    cand = []

    # The image space is the image plane of the camera. The image space is
    # given in millimeters of sensor size and the origin is in the center of the sensor.

    xmin = (-1) * cpar.pix_x * cpar.imx / 2
    xmax = cpar.pix_x * cpar.imx / 2
    ymin = (-1) * cpar.pix_y * cpar.imy / 2
    ymax = cpar.pix_y * cpar.imy / 2
    xmin -= cal.int_par.xh
    ymin -= cal.int_par.yh
    xmax -= cal.int_par.xh
    ymax -= cal.int_par.yh

    xmin, ymin = correct_brown_affine(xmin, ymin, cal.added_par)
    xmax, ymax = correct_brown_affine(xmax, ymax, cal.added_par)

    # line equation: y = m*x + b
    if xa == xb:  # the line is a point or a vertical line in this camera
        xb += 1e-10

    m = (yb - ya) / (xb - xa)
    b = ya - m * xa

    if xa > xb:
        xa, xb = xb, xa

    if ya > yb:
        ya, yb = yb, ya

    # If epipolar line out of sensor area, give up.
    if xb <= xmin or xa >= xmax or yb <= ymin or ya >= ymax:
        return cand

    j0 = find_start_point(crd, num, xa, vpar)

    for j in range(j0, num):
        # Since the list is x-sorted, an out of x-bound candidate is after the
        # last possible candidate, so stop.
        if crd[j].x > xb + vpar.eps0:
            return cand

        # Candidate should at the very least be in the epipolar search window
        # to be considered.
        if crd[j].y <= ya - vpar.eps0 or crd[j].y >= yb + vpar.eps0:
            continue
        if crd[j].x <= xa - vpar.eps0 or crd[j].x >= xb + vpar.eps0:
            continue

        # Only take candidates within a predefined distance from epipolar line.
        d = math.fabs((crd[j].y - m * crd[j].x - b) / math.sqrt(m * m + 1))
        if d >= vpar.eps0:
            continue

        p2 = crd[j].pnr
        
        if p2 >= num: 
            print(f"{p2} is larger than {num}")
            return cand
        

        # Quality of each parameter is a ratio of the values of the
        # size n, nx, ny and sum of grey values sumg.
        

        qn = quality_ratio(n, pix[p2].n)
        qnx = quality_ratio(nx, pix[p2].nx)
        qny = quality_ratio(ny, pix[p2].ny)
        qsumg = quality_ratio(sumg, pix[p2].sumg)

        # Enforce minimum quality values and maximum candidates.
        if qn < vpar.cn or qnx < vpar.cnx or qny < vpar.cny or qsumg < vpar.csumg:
            continue

        # Empirical correlation coefficient from shape and brightness
        # parameters.
        corr = 4 * qsumg + 2 * qn + qnx + qny

        # Prefer matches with brighter targets.
        corr *= (sumg + pix[p2].sumg)

        
        tmp = Candidate(pnr=j, tol=d, corr=corr)
        # print(f" good candidate {tmp} {tmp.pnr},{tmp.tol},{tmp.corr}")
        
        cand.append(tmp)
        
        if len(cand) >= MAXCAND:
            print(f"More candidates than {MAXCAND}: {len(cand)}\n")
            return cand        

        # cand[count].pnr = j
        # cand[count].tol = d
        # cand[count].corr = corr
        # count += 1

    return cand

In [22]:

# match_pairs(corr_lists, corrected, frm, vpar, cpar, calib)


# def match_pairs(
#     corr_list: List[List[List[Correspond]]], corrected, frm: Frame, vpar, cpar, calib
# ):
"""Match pairs of cameras.

**This function matches pairs of cameras by finding corresponding points in each camera.
The correspondences are stored in the `corr_lists` argument.**

**The following steps are performed:**

1. For each pair of cameras, the epipolar lines for the two cameras are calculated.
2. For each target in the first camera, the corresponding points in the second camera
are found by searching along the epipolar line.
3. The correspondences are stored in the `corr_lists` argument.

**The `corr_lists` argument is a list of lists of lists of `Correspond` objects.
Each inner list corresponds to a pair of cameras, and each inner-most list corresponds
to a correspondence between two points in the two cameras. The `Correspond` objects
have the following attributes:**

* `p1`: The index of the target in the first camera.
* `p2`: The index of the target in the second camera.
* `corr`: The correspondence score.
* `dist`: The distance between the two points.

**The following are the arguments for the function:**

* `corr_lists`: A list of lists of lists of `Correspond` objects. Each inner list
corresponds to a pair of cameras, and each inner-most list corresponds to a
correspondence between two points in the two cameras.

* `corrected`: A list of lists of `coord_2d` objects. Each inner list corresponds to a
camera, and each inner-most object corresponds to the corrected coordinates of a target in
that camera.
* `frm`: A `frame` object.
* `vpar`: A `volume_par` object.
* `cpar`: A `control_par` object.
* `calib`: A list of `Calibration` objects.

**The function returns None.**
"""
for i1 in range(cpar.num_cams - 1):
    for i2 in range(i1 + 1, cpar.num_cams):
        for i in range(frm.num_targets[i1]):
            # if corrected[i1][i].x == PT_UNUSED: # no idea why it's here
            #     continue

            xa12, ya12, xb12, yb12 = epi_mm(
                corrected[i1][i].x,
                corrected[i1][i].y,
                calib[i1],
                calib[i2],
                cpar.mm,
                vpar,
            )
            
            # print(xa12, ya12, xb12, yb12)
            

            # origin point in the corr_list
            corr_lists[i1][i2][i].p1 = i
            pt1 = corrected[i1][i].pnr
            
            print("looking for dot")
            print(corr_lists[i1][i2][i].p1, pt1)

            # search for a conjugate point in corrected[i2]
            # cand = [Correspond() for _ in range(MAXCAND)]
            cand = find_candidate(
                corrected[i2],
                frm.targets[i2],
                frm.num_targets[i2],
                xa12,
                ya12,
                xb12,
                yb12,
                frm.targets[i1][pt1].n,
                frm.targets[i1][pt1].nx,
                frm.targets[i1][pt1].ny,
                frm.targets[i1][pt1].sumg,
                vpar,
                cpar,
                calib[i2],
            )
            
            print("after find_candidate")
            print(i1, i2, pt1, cand)

            # write all corresponding candidates to the preliminary corr_list of correspondences
            count = len(cand)
            if count > MAXCAND:
                count = MAXCAND
                cand = cand[0:count]

            for j in range(count):
                corr_lists[i1][i2][i].p2[j] = cand[j].pnr
                corr_lists[i1][i2][i].corr[j] = cand[j].corr
                corr_lists[i1][i2][i].dist[j] = cand[j].tol

            corr_lists[i1][i2][i].n = count
            
            print(f"set {count} values in corr_lists")
            # print(corr_lists[i1][i2][i].p2[j])
            # print(corr_lists[i1][i2][i].corr[j])
            # print(corr_lists[i1][i2][i].dist[j])





looking for dot
0 0
after find_candidate
0 1 0 [Candidate(pnr=3, tol=1.6089045281581617e-06, corr=160.0)]
set 1 values in corr_lists
3
160.0
1.6089045281581617e-06
looking for dot
1 4
after find_candidate
0 1 4 [Candidate(pnr=2, tol=1.4474616517248957e-05, corr=160.0)]
set 1 values in corr_lists
2
160.0
1.4474616517248957e-05
looking for dot
2 8
after find_candidate
0 1 8 [Candidate(pnr=1, tol=3.0513958614011064e-05, corr=160.0)]
set 1 values in corr_lists
1
160.0
3.0513958614011064e-05
looking for dot
3 12
after find_candidate
0 1 12 []
set 0 values in corr_lists
0
0.0
0.0
looking for dot
4 1
after find_candidate
0 1 1 [Candidate(pnr=7, tol=1.5899338909423245e-06, corr=160.0)]
set 1 values in corr_lists
7
160.0
1.5899338909423245e-06
looking for dot
5 5
after find_candidate
0 1 5 [Candidate(pnr=6, tol=1.4306325666103258e-05, corr=160.0)]
set 1 values in corr_lists
6
160.0
1.4306325666103258e-05
looking for dot
6 9
after find_candidate
0 1 9 [Candidate(pnr=5, tol=3.0176761971479098e-05

In [17]:

# Assert each target has the real matches as candidates
for cam in range(cpar.num_cams - 1):
    for subcam in range(cam + 1, cpar.num_cams):
        for part in range(frm.num_targets[cam]):
            correct_pnr = (
                corrected[cam][corr_lists[cam][subcam][part].p1].pnr
                if (subcam - cam) % 2 == 0
                else 15 - corrected[cam][corr_lists[cam][subcam][part].p1].pnr
            )

            print(cam, subcam, part, correct_pnr)

            found_correct_pnr = False
            for cand in range(MAXCAND):
                print(
                    cand,
                    corrected[subcam][
                        corr_lists[cam][subcam][part].p2[cand]
                    ].pnr,
                )
                if (
                    corrected[subcam][
                        corr_lists[cam][subcam][part].p2[cand]
                    ].pnr
                    == correct_pnr
                ):
                    found_correct_pnr = True
                    break

            assert found_correct_pnr is True

# continue to the consistent_pair matching test
con = [n_tupel() for _ in range(4 * 16)]
tusage = safely_allocate_target_usage_marks(cpar.num_cams)

# high accept corr bcz of closeness to epipolar lines.
matched = consistent_pair_matching(
    corr_lists, cpar.num_cams, frm.num_targets, 10000.0, con, 4 * 16, tusage
)

print(f" matched = {matched}")

assert matched == 16

0 1 0 15
0 15
0 1 1 11
0 11
0 1 2 7
0 7
0 1 3 3
0 3
0 1 4 14
0 14
0 1 5 10
0 10
0 1 6 6
0 6
0 1 7 2
0 3
1 3
2 3
3 3
4 3
5 3
6 3
7 3
8 3
9 3
10 3
11 3
12 3
13 3
14 3
15 3
16 3
17 3
18 3
19 3
20 3
21 3
22 3
23 3
24 3
25 3
26 3
27 3
28 3
29 3
30 3
31 3
32 3
33 3
34 3
35 3
36 3
37 3
38 3
39 3
40 3
41 3
42 3
43 3
44 3
45 3
46 3
47 3
48 3
49 3
50 3
51 3
52 3
53 3
54 3
55 3
56 3
57 3
58 3
59 3
60 3
61 3
62 3
63 3
64 3
65 3
66 3
67 3
68 3
69 3
70 3
71 3
72 3
73 3
74 3
75 3
76 3
77 3
78 3
79 3
80 3
81 3
82 3
83 3
84 3
85 3
86 3
87 3
88 3
89 3
90 3
91 3
92 3
93 3
94 3
95 3
96 3
97 3
98 3
99 3


AssertionError: 

In [None]:
# def test_match_pairs():
# Test case 1
corr_list = [[[Correspond() for _ in range(10)] for _ in range(10)] for _ in range(10)]
corrected = [[Coord2d() for _ in range(10)] for _ in range(10)]
frm = Frame(num_cams=2)
vpar = VolumePar()
cpar = ControlPar()
calib = [Calibration() for _ in range(10)]
match_pairs(corr_list, corrected, frm, vpar, cpar, calib)


In [None]:
corr_list[0][0][0].p2

In [None]:

assert corr_list[0][1][0].p1 == 0
assert corr_list[0][1][0].p2[0] == 1
assert corr_list[0][1][0].corr[0] == 0.5
assert corr_list[0][1][0].dist[0] == 1.0
assert corr_list[0][1][0].n == 1


In [None]:

# Test case 2
corr_list = [[[Correspond() for _ in range(5)] for _ in range(5)] for _ in range(5)]
corrected = [[Coord2d() for _ in range(5)] for _ in range(5)]
frm = Frame(4)
vpar = VolumePar()
cpar = ControlPar()
calib = [Calibration() for _ in range(5)]
match_pairs(corr_list, corrected, frm, vpar, cpar, calib)
assert corr_list[2][3][1].p1 == 1
assert corr_list[2][3][1].p2[0] == 2
assert corr_list[2][3][1].corr[0] == 0.8
assert corr_list[2][3][1].dist[0] == 1.2
assert corr_list[2][3][1].n == 1
