In [37]:
import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt

import re
import os
from collections import defaultdict

In [38]:
def preprocess(img):
    img = cv.GaussianBlur(img, (3, 3), 0)
    img = cv.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)).apply(img)
    return img

In [39]:
def keypoints(I1, I2):
    sift = cv.SIFT_create()  # or cv.AKAZE_create() / cv.ORB_create() for speed
    kp1, des1 = sift.detectAndCompute(I1, None)
    kp2, des2 = sift.detectAndCompute(I2, None)
    return (kp1, des1), (kp2, des2)

In [40]:
def match(des1, des2):
    bf = cv.BFMatcher()
    matches = bf.knnMatch(des1, des2, k=2)

    good = []
    for m, n in matches:
        if m.distance < 0.75 * n.distance:  # Lowe's ratio test
            good.append(m)

    # print(f"Found {len(good)} good matches out of {len(matches)} total")
    return good

In [41]:
def ransac(kp1, kp2, good):
        pts1 = np.float32([kp1[m.queryIdx].pt for m in good])
        pts2 = np.float32([kp2[m.trainIdx].pt for m in good])

        M, inliers = cv.estimateAffinePartial2D(
            pts2, pts1,
            method=cv.RANSAC,
            ransacReprojThreshold=3.0,
            maxIters=2000,
            confidence=0.99
        )

        # print("Affine matrix:\n", M)
        return M, inliers

In [42]:
def display(img1_full, img1_gray, img2_full, img2_gray, show=False):

    blue_tinted_image = np.zeros_like(img1_full)
    blue_tinted_image[:, :, 2] = img1_gray
    red_tinted_image = np.zeros_like(img2_full)
    red_tinted_image[:, :, 0] = img2_gray

    overlay = cv.addWeighted(blue_tinted_image, 0.5, red_tinted_image, 0.5, 0)

    if show:
        plt.figure(figsize=(24,12))
        plt.subplot(1,1,1); plt.imshow(overlay); plt.title("Overlay after alignment")
        plt.show()

    return overlay

In [43]:
def register(img1_full, img2_full):

    img1_gray = cv.cvtColor(img1_full, cv.COLOR_BGR2GRAY)
    img2_gray = cv.cvtColor(img2_full, cv.COLOR_BGR2GRAY)

    I1, I2 = preprocess(img1_gray), preprocess(img2_gray)

    # --- 1. Detect and describe keypoints ---
    (kp1, des1), (kp2, des2) = keypoints(I1, I2)

    # --- 2. Match descriptors with ratio test ---
    if kp1 is not None and kp2 is not None and des1 is not None and des2 is not None:
        good = match(des1, des2)

        # --- 3. Estimate affine transform using RANSAC ---
        if len(good) >= 3:  # need at least 3 points for affine
            M, inliers = ransac(kp1, kp2, good)

            # If you need to enforce only translation, you can extract the translation components
            # from the estimated matrix and create a new purely translational matrix.
            # The translation components are in the last column of the affine matrix.
            tx = M[0, 2]
            ty = M[1, 2]

            if abs(int(tx)) < 325 and abs(int(ty)) < 325:
                translation_matrix = np.array([[1, 0, tx],
                                               [0, 1, ty]], dtype=np.float32)

                print("\nPurely Translational Matrix:")
                print(translation_matrix)

                rows, cols, _ = img2_full.shape
                int_tx = int(round(tx))
                int_ty = int(round(ty))
                return (int_tx, int_ty)

            else:
                print("Translation exceeds limits.")
        else:
            print("Not enough good matches for reliable registration.")
    else:
        print("Not enough descriptors for reliable registration.")

In [44]:
def pipeline(fname1, fname2):
    img1 = cv.imread(fname1)    # reference (earlier)
    img2 = cv.imread(fname2)    # moving (later)

    try:
        result = register(img1, img2)
        if result is not None:
            return result
        else:
            return False
    except:
        print("Error with registering images: " + fname1 + ", " + fname2)
        return False

In [45]:
def group_and_order_filenames(filenames, maxTP, maxLevel):
    grouped_files = defaultdict(lambda: defaultdict(lambda: [None] * maxTP))
    pattern = r'^(?P<plant>[^_]+)_(?P<tube>\d+)_(?P<level>\d+)_(?P<date>\d{4}-\d{2}-\d{2})_TP(?P<timepoint>\d+)\.png$'
    #plant_tube_depth_yyyy-mm-dd_TP#

    for fname in filenames:

        match = re.match(pattern, fname)
        if match:
            plant = match.group('plant')
            tube = int(match.group('tube'))
            level = int(match.group('level'))
            date = match.group('date')
            timepoint = int(match.group('timepoint'))
            if 1 <= timepoint <= maxTP:
                grouped_files[tube][level - 1][timepoint - 1] = fname

    return grouped_files

In [46]:
def accumulate(filename, translation, translations, references, all):
    if filename not in references:
        return translation

    ref = references[filename]
    if ref not in translations or ref not in all:
        ref_translation = (0,0)
        return tuple(map(sum, zip(translation, ref_translation)))
    else:
        ref_translation = translations[ref]
        all.remove(ref)
        return tuple(map(sum, zip(translation, accumulate(ref, ref_translation, translations, references, all))))

In [47]:
imgfilelist = [f for f in os.listdir("slu_data") if f.endswith(".png")]
print(f"Found {len(imgfilelist)} image files")

imgfilegroups = group_and_order_filenames(imgfilelist, 12, 7)
print(f"Found {len(imgfilegroups)} image groups")

Found 2194 image files
Found 27 image groups


In [48]:
for tube, depths in imgfilegroups.items():
    for depth, tp_files in depths.items():

        non_none_files = list(filter(None, tp_files))
        my_iterator = iter(non_none_files)
        translations = {}
        references = {}
        try:

            current_item = next(my_iterator)
            next_item = next(my_iterator)
            translations[current_item] = (0,0)
            while current_item and next_item:

                next_path = "slu_data/" + next_item
                current_path = "slu_data/" + current_item
                translation = pipeline(current_path, next_path)
                if translation:
                    translations[next_item] = translation
                    references[next_item] = current_item

                inner_iterator = iter(non_none_files)
                try:
                    new_item = next(inner_iterator)
                    while not translation and new_item:
                        if new_item == next_item or new_item == current_item:
                            new_item = next(inner_iterator)
                            continue

                        new_path = "slu_data/" + new_item
                        translation = pipeline(new_path, next_path)
                        if translation:
                            translations[next_item] = translation
                            references[next_item] = new_item
                        new_item = next(inner_iterator)

                except StopIteration:
                    current_item = next_item
                    next_item = next(my_iterator)
                    continue

                current_item = next_item
                next_item = next(my_iterator)

        except StopIteration:

            global_translations = {}
            for f_accumulate, t_accumulate in translations.items():
                try:
                    all_compared = non_none_files.copy()
                    global_t = accumulate(f_accumulate, t_accumulate, translations, references, all_compared)
                    global_translations[f_accumulate] = global_t
                except RecursionError:
                    print("f_accumulate: " + f_accumulate)

            global_x = [x for (dx, dy) in global_translations.values() for x in (dx, dx + 850)]
            global_y = [y for (dx, dy) in global_translations.values() for y in (dy, dy + 757)]
            if not global_x or not global_y:
                print("Error finding translations for plot ("+ str(tube) +") depth (" + str(depth) + ")")
                continue
            min_x = min(global_x)
            min_y = min(global_y)
            max_x = max(global_x)
            max_y = max(global_y)
            final_cols  = max_x - min_x
            final_rows = max_y - min_y
            offset_x = -min_x
            offset_y = -min_y

            for f, t in global_translations.items():
                f_no_ext, f_ext = os.path.splitext(f)
                p = "slu_data/" + f
                img = cv.imread(p)

                c_start = t[0] + offset_x
                r_start = t[1] + offset_y
                new_img = np.zeros((final_rows, final_cols, 3), dtype=img.dtype)

                try:
                    new_img[r_start:r_start + img.shape[0], c_start:c_start + img.shape[1]] = img
                except ValueError:
                    print("f: " + f)

                cv.imwrite("slu_data/" + f_no_ext + "_sift.png", new_img)

            for f_overlay, r_overlay in references.items():
                f_no_ext, f_ext = os.path.splitext(f_overlay)
                if os.path.exists("slu_data/" + f_no_ext + "_sift.png"):
                    f_path = "slu_data/" + f_no_ext + "_sift.png"
                else:
                    f_path = "slu_data/" + f_overlay
                r_no_ext, r_ext = os.path.splitext(r_overlay)
                if os.path.exists("slu_data/" + r_no_ext + "_sift.png"):
                    r_path = "slu_data/" + r_no_ext + "_sift.png"
                else:
                    r_path = "slu_data/" + r_overlay

                try:
                    img_f = cv.imread(f_path)
                    img_r = cv.imread(r_path)
                    img_f_gray = cv.cvtColor(img_f, cv.COLOR_BGR2GRAY)
                    img_r_gray = cv.cvtColor(img_r, cv.COLOR_BGR2GRAY)
                    img_f_gray = cv.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)).apply(img_f_gray)
                    img_r_gray = cv.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)).apply(img_r_gray)
                    f_overlay = display(img_r, img_r_gray, img_f, img_f_gray)
                    cv.imwrite("slu_data/" + f_no_ext + "_sift_overlay.png", f_overlay)
                except:
                    print("Error with overlaying images: " + r_path, f_path)

            continue


Purely Translational Matrix:
[[  1.         0.        19.005274]
 [  0.         1.       -12.649337]]

Purely Translational Matrix:
[[  1.         0.       -10.165524]
 [  0.         1.        66.34411 ]]

Purely Translational Matrix:
[[ 1.         0.        -4.0230994]
 [ 0.         1.        57.11301  ]]

Purely Translational Matrix:
[[  1.        0.      -21.40849]
 [  0.        1.      -49.14754]]

Purely Translational Matrix:
[[ 1.         0.        -7.171874 ]
 [ 0.         1.        -1.6973312]]
Translation exceeds limits.

Purely Translational Matrix:
[[   1.         0.      -119.45959]
 [   0.         1.       323.8768 ]]
Translation exceeds limits.
Translation exceeds limits.
Translation exceeds limits.
Translation exceeds limits.
Translation exceeds limits.
Translation exceeds limits.
Translation exceeds limits.

Purely Translational Matrix:
[[  1.         0.        14.746062]
 [  0.         1.       310.2824  ]]
Translation exceeds limits.
Translation exceeds limits.
Trans