# Alignment Error Visualization

This notebook collects COM data from the database and tries to quantify some alignment errors. The main results are shown in the plots at the end of the notebook.

In [2]:
import os
import sys
from pathlib import Path

import numpy as np
import pandas as pd
from collections import OrderedDict
from scipy.ndimage import affine_transform
from skimage import measure
import SimpleITK as sitk
import matplotlib.pyplot as plt


PIPELINE_ROOT = Path('./').absolute().parents[1]
PIPELINE_ROOT = PIPELINE_ROOT.as_posix()
sys.path.append(PIPELINE_ROOT)
print(PIPELINE_ROOT)


/home/eddyod/programming/pipeline/src


In [3]:
%load_ext autoreload
%autoreload 2

In [4]:
from library.controller.sql_controller import SqlController
from library.image_manipulation.filelocation_manager import FileLocationManager
from library.atlas.atlas_utilities import affine_transform_point, get_affine_transformation, \
fetch_coms, list_coms, compute_affine_transformation, affine_transform_volume
from library.atlas.brain_structure_manager import BrainStructureManager
from library.utilities.utilities_process import M_UM_SCALE, SCALING_FACTOR, random_string, \
read_image, write_image


In [5]:
def sum_square_com(com):
    ss = np.sqrt(sum([s**2 for s in com]))
    return ss

# ---------- Utilities ----------
def _to_homogeneous(P):
    """(N,3) -> (N,4) homogeneous coordinates"""
    return np.hstack([P, np.ones((P.shape[0], 1), dtype=P.dtype)])

def apply_transform(P, T):
    """
    Apply 4x4 transform T to Nx3 array P.
    Returns transformed Nx3 array.
    """
    Ph = _to_homogeneous(P)  # (N,4)
    Qh = Ph @ T.T            # (N,4)
    return Qh[:, :3] / Qh[:, 3:4]

# ---------- Rigid (Kabsch) ----------
def fit_rigid(A, B, weights=None, allow_reflection=False):
    """
    Best-fit rigid transform from A -> B (both Nx3).
    Returns 4x4 transform matrix T such that B ≈ apply_transform(A, T).

    Parameters
    ----------
    A, B : (N,3)
        Corresponding points (same ordering).
    weights : (N,) or None
        Optional non-negative weights per correspondence.
    allow_reflection : bool
        If False (default), forbids reflections (det(R)=+1).
    """
    A = np.asarray(A, dtype=np.float64)
    B = np.asarray(B, dtype=np.float64)
    assert A.shape == B.shape and A.shape[1] == 3

    if weights is None:
        w = np.ones(A.shape[0], dtype=np.float64)
    else:
        w = np.asarray(weights, dtype=np.float64)
        assert w.shape == (A.shape[0],)
        w = np.clip(w, 0, np.inf)

    W = w / (np.sum(w) + 1e-15)
    muA = np.sum(A * W[:, None], axis=0)
    muB = np.sum(B * W[:, None], axis=0)

    A0 = A - muA
    B0 = B - muB

    # Weighted covariance
    H = (A0 * W[:, None]).T @ B0  # 3x3

    U, S, Vt = np.linalg.svd(H)
    R = Vt.T @ U.T

    # Enforce proper rotation (det=+1) unless reflections are allowed
    if not allow_reflection and np.linalg.det(R) < 0:
        Vt[-1, :] *= -1
        R = Vt.T @ U.T

    t = muB - R @ muA

    T = np.eye(4)
    T[:3, :3] = R
    T[:3, 3] = t
    return T

# ---------- Similarity (Umeyama) ----------
def fit_similarity(A, B, allow_reflection=False):
    """
    Best-fit similarity (scale * rotation + translation) using Umeyama.
    Returns 4x4 transform with uniform scale baked into the linear part.
    """
    A = np.asarray(A, dtype=np.float64)
    B = np.asarray(B, dtype=np.float64)
    assert A.shape == B.shape and A.shape[1] == 3

    muA = A.mean(axis=0)
    muB = B.mean(axis=0)
    A0 = A - muA
    B0 = B - muB

    C = (A0.T @ B0) / A.shape[0]
    U, D, Vt = np.linalg.svd(C)
    S = np.eye(3)

    if not allow_reflection and np.linalg.det(U @ Vt) < 0:
        S[-1, -1] = -1

    R = U @ S @ Vt
    varA = np.mean(np.sum(A0**2, axis=1))
    scale = (np.sum(D * np.diag(S)) / (varA + 1e-15))

    t = muB - scale * (R @ muA)

    T = np.eye(4)
    T[:3, :3] = scale * R
    T[:3, 3] = t
    return T, scale

# ---------- Affine (12-DoF) ----------
def fit_affine(A, B):
    """
    Best-fit full affine transform A->B using linear least squares.
    Solves for 3x4 matrix [M|t] such that B ≈ A*M^T + t.
    Returns a 4x4 matrix T.
    """
    A = np.asarray(A, dtype=np.float64)
    B = np.asarray(B, dtype=np.float64)
    assert A.shape == B.shape and A.shape[1] == 3

    N = A.shape[0]
    Ah = _to_homogeneous(A)           # (N,4)
    # Solve Ah @ X^T ≈ B, where X is 3x4
    X, *_ = np.linalg.lstsq(Ah, B, rcond=None)  # X is (4,3)
    X = X.T                                   # (3,4)

    T = np.eye(4)
    T[:3, :4] = X
    return T

# ---------- RANSAC wrapper (optional) ----------
def ransac_fit(A, B, model='rigid', thresh=1.0, max_trials=1000, min_inliers=3, random_state=None):
    """
    Robustly estimate transform with RANSAC to handle outliers.

    Parameters
    ----------
    model : 'rigid' | 'similarity' | 'affine'
    thresh : float
        Inlier distance threshold in target space units.
    max_trials : int
    min_inliers : int
        Minimum inliers to accept a model.
    """
    rng = np.random.default_rng(random_state)
    A = np.asarray(A, dtype=np.float64)
    B = np.asarray(B, dtype=np.float64)
    assert A.shape == B.shape and A.shape[1] == 3

    if model == 'rigid' or model == 'similarity':
        sample_size = 3  # 3 non-collinear pairs
    elif model == 'affine':
        sample_size = 4  # 4 points for 12 params (overdetermined)
    else:
        raise ValueError("model must be 'rigid', 'similarity', or 'affine'")

    N = A.shape[0]
    best_T, best_inliers = None, np.array([], dtype=bool)

    for _ in range(max_trials):
        idx = rng.choice(N, size=sample_size, replace=False)
        A_s, B_s = A[idx], B[idx]

        try:
            if model == 'rigid':
                T = fit_rigid(A_s, B_s)
            elif model == 'similarity':
                T, _ = fit_similarity(A_s, B_s)
            else:  # affine
                T = fit_affine(A_s, B_s)
        except np.linalg.LinAlgError:
            continue

        B_pred = apply_transform(A, T)
        errs = np.linalg.norm(B - B_pred, axis=1)
        inliers = errs <= thresh

        if inliers.sum() > best_inliers.sum():
            best_inliers = inliers
            # Refit on all inliers for a refined model
            if inliers.sum() >= min_inliers:
                if model == 'rigid':
                    T = fit_rigid(A[inliers], B[inliers])
                elif model == 'similarity':
                    T, _ = fit_similarity(A[inliers], B[inliers])
                else:
                    T = fit_affine(A[inliers], B[inliers])
                best_T = T

    if best_T is None:
        raise RuntimeError("RANSAC failed to find a valid model. "
                           "Try increasing max_trials or thresh.")

    return best_T, best_inliers


In [10]:
moving_name = 'AtlasV8'
fixed_name = 'Allen'
moving_all = list_coms(moving_name, scaling_factor=1)
fixed_all = list_coms(fixed_name, scaling_factor=1)
common_keys = list(moving_all.keys() & fixed_all.keys())
bad_keys = ('RtTg', 'AP')
#bad_keys = ('RtTg',)
#bad_keys = ()
good_keys = set(common_keys) - set(bad_keys)
good_keys = ['SC', 'IC']

moving_src = np.array([moving_all[s] for s in good_keys])
fixed_src = np.array([fixed_all[s] for s in good_keys])
print(len(common_keys))

37


In [11]:
print(np.round(moving_src[0]))
print(np.round(fixed_src[0]))

[10949.  3924.  5199.]
[9140. 2388. 5692.]


In [12]:
#transformation_matrix = get_affine_transformation(moving_name=moving_name, fixed_name=fixed_name, 
#                                                  scaling_factor=1)
transformation_matrix = compute_affine_transformation(moving_src, fixed_src)
print(transformation_matrix)

[[ 1.08693395e+00 -3.08554504e-01 -2.98168012e-01 -4.79588311e-05]
 [-2.64341116e-02  2.68837747e-01  3.12088859e-01  5.62837284e-05]
 [ 4.20139649e-02  5.22616466e-01  6.12002297e-01  1.10885657e-04]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  1.00000000e+00]]


In [13]:
df_list = []
error = []
transformed_dict = {}
for structure in common_keys:
    moving0 = np.array(moving_all[structure])
    fixed0 = np.array(fixed_all[structure]) 
    transformed = affine_transform_point(moving0, transformation_matrix)
    difference = [a - b for a, b in zip(transformed, fixed0)]
    ss = sum_square_com(difference)
    row = [structure, np.round(moving0), np.round(fixed0), 
           np.round(transformed), np.round(difference), ss]
    df_list.append(row)
    error.append(ss)
    transformed_dict[structure] = transformed
rms = sum(error)/len(df_list)
print(f'RMS: {rms} observations: {len(df_list)}')
# MD589 to Allen RMS 260.0211852431133
# MD585 to Allen RMS 263.314352291951
# MD594 to Allen RMS 250.79820210419254
# AtlasV8 DB to Allen RMS 238.5831606646421
# MD585 to MD589 RMS 18.2658167690059

RMS: 2671.6313473166147 observations: 37


In [None]:
#transformation_matrix = np.hstack([transformation_matrix, t])
#transformation_matrix = np.vstack([transformation_matrix, np.array([0, 0, 0, 1])])
#print(transformation_matrix)
structure = 'SC'
try:
    com = np.array(moving_all[structure]).reshape(1,3)
except KeyError:
    structure = common_keys[0]
    com = np.array(moving_all[structure]).reshape(1,3)
#com = [1095, 392, 519]
print(f'{moving_name} {structure} non trans {np.round(np.array(com))}')
transformed_structure = apply_transform(com, transformation_matrix)

print(f'{moving_name} {structure} apply trans {np.round(transformed_structure/1)}')
print(f'{fixed_name} {structure} {np.round(np.array(fixed_all[structure]))}')
diff = transformed_structure - fixed_all[structure]
print(f'{moving_name}->{fixed_name} error {structure} {diff}')

In [None]:
columns = ['structure', moving_name, fixed_name, 'transformed', 'difference', 'sumsquares']
df = pd.DataFrame(df_list, columns=columns)
df.index.name = 'Index'
df = df.round(4)
df.sort_values(by=['sumsquares'], inplace=True)
#df.to_csv('/home/eddyod/programming/pipeline/docs/sphinx/source/_static/results.csv', index=False)
df.head(50)

In [None]:
plt.figure(figsize=(30, 5)) 
plt.axhline(y=rms, linestyle='--', linewidth=2, color='red', label='Mean')
plt.text(0, rms, f"Mean RMS={round(rms,2)}")
plt.bar(df['structure'], df['sumsquares'])

In [None]:
outpath = '/net/birdstore/Active_Atlas_Data/data_root/atlas_data/DK55/com'
for structure, com in moving_all.items():
    comfile = structure + '.txt'
    compath = os.path.join(outpath, comfile)
    np.savetxt(compath, com)

In [None]:
um = 50
registration_path = '/net/birdstore/Active_Atlas_Data/data_root/brains_info/registration'
base_com_path = '/net/birdstore/Active_Atlas_Data/data_root/atlas_data'
for brain in [moving_name, fixed_name]:
    brain_point_path = os.path.join(registration_path, brain, f'{brain}_{um}um_sagittal.pts')
    brain_com_path = os.path.join(base_com_path, brain, 'com')
    comfiles = sorted(os.listdir(brain_com_path))
    with open(brain_point_path, 'w') as f:
        f.write('point\n')
        f.write(f'{len(common_keys)}\n')
        for comfile in comfiles:
            structure = comfile.replace('.txt','')
            if structure in common_keys:
                #print(structure)
                compath = os.path.join(brain_com_path, comfile)
                x,y,z = np.loadtxt(compath)
                f.write(f'{round(x/um,4)} {round(y/um,4)} {round(z/um,4)}')
                f.write('\n')


In [None]:
def ants_3d_to_scipy_2d(affine_3d, plane='axial', slice_index=0):
    """
    Convert a 3D ANTs affine transformation matrix to a 2D affine transform
    suitable for scipy.ndimage.affine_transform.

    Parameters:
        affine_3d (np.ndarray): A 4x4 affine matrix from ANTs.
        plane (str): Plane to slice through ('axial', 'coronal', 'sagittal').
        slice_index (int): Index of the slice in the chosen plane.

    Returns:
        matrix_2d (np.ndarray): 2x2 affine transformation matrix.
        offset_2d (np.ndarray): Length-2 offset vector.
    """
    if affine_3d.shape != (4, 4):
        raise ValueError("Expected a 4x4 affine transformation matrix.")

    # Extract rotation+scaling and translation components
    rotation_scaling = affine_3d[:3, :3]
    translation = affine_3d[:3, 3]

    if plane == 'axial':
        matrix_2d = rotation_scaling[:2, :2]
        offset_2d = translation[:2] + rotation_scaling[:2, 2] * slice_index
    elif plane == 'coronal':
        matrix_2d = rotation_scaling[[0,2], :][:, [0,2]]
        offset_2d = translation[[0,2]] + rotation_scaling[[0,2], 1] * slice_index
    elif plane == 'sagittal':
        matrix_2d = rotation_scaling[1:3, 1:3]
        offset_2d = translation[1:3] + rotation_scaling[1:3, 0] * slice_index
    else:
        raise ValueError("Plane must be 'axial', 'coronal', or 'sagittal'.")

    return matrix_2d, offset_2d

In [None]:
reg_path = '/net/birdstore/Active_Atlas_Data/data_root/brains_info/registration'
matrix_path = os.path.join(reg_path, 'ALLEN771602/ALLEN771602_Allen_32.0x28.8x28.8um_sagittal.tfm')
transform = sitk.ReadTransform(matrix_path)

In [None]:
tx = sitk.AffineTransform(transform)
Rt = tx.GetMatrix()
R = np.array(Rt).reshape(3,3)
print(R)
print()
Rr = np.rot90(R, k=1)
print(Rr)

In [None]:
affine_transform = sitk.AffineTransform(3)
affine_transform.SetMatrix(Rr.flatten())
affine_transform.SetTranslation(t)
affine_transform.SetCenter(c)


In [None]:
# Example 3x3 array where each row is a 3D point
A = np.array([
    [1, 0, 0],
    [0, 1, 0],
    [0, 0, 1]
])

# Rotation matrix for 90 degrees around Y-axis (counter-clockwise)
theta = np.radians(90)
rotation_matrix = np.array([
    [np.cos(theta), 0, np.sin(theta)],
    [0,            1, 0],
    [-np.sin(theta), 0, np.cos(theta)]
])

# Apply rotation
rotated = A @ rotation_matrix.T
print(rotated)

In [None]:
rotation_matrix.T