In [6]:
import requests
import time
from pathlib import Path
import os, glob, re
import cv2
from typing import List
import micasense.capture as capture
import micasense.imageutils as imageutils
import skimage
import numpy as np
from skimage.transform import warp,matrix_transform,resize,FundamentalMatrixTransform,estimate_transform,ProjectiveTransform

In [None]:
# IP address of the camera
CAMERA_IP = "192.168.1.83"

# HTTP endpoints exposed by the camera, all rooted at the address above
CAPTURE_URL = "http://{}/capture".format(CAMERA_IP)
CAPTURE_CONF_URL = "http://{}/config".format(CAMERA_IP)
DETECT_PANNEL_URL = "http://{}/detect_panel".format(CAMERA_IP)

class TeddyFlight:
    """Small HTTP client that configures the camera and triggers captures.

    All requests go to the module-level ``CAPTURE_URL``, ``CAPTURE_CONF_URL`` and
    ``DETECT_PANNEL_URL`` endpoints.
    """

    # Upper bound on consecutive attempts to leave panel-detection mode.
    MAX_ABORT_ATTEMPTS = 10

    def __init__(self, capture_interval: float = 0.9) -> None:
        """Remember the timer period that :meth:`set_config` sends to the camera.

        Args:
            capture_interval: value sent as ``timer_period`` in the configuration.
        """
        self.capture_interval = capture_interval

    def detect_pannel(self):
        """Request panel-detection captures until the camera stops reporting an error.

        Returns:
            The JSON payload of the first capture whose ``status`` is not ``"error"``.
        """
        # Iterating (rather than recursing) keeps a long wait for the panel from
        # ever hitting the interpreter's recursion limit.
        while True:
            cresponse = self.capture_image(args={"detect_panel": True})
            if cresponse.get("status") != "error":
                return cresponse
            print("Camera failed to detect a pannel\nRetrying in 3s............")
            time.sleep(3)

    def set_config(self):
        """Send the timer-capture configuration to the camera's config endpoint."""
        capture_conf = {
            "auto_cap_mode": "timer",
            "timer_period": self.capture_interval,
            "audio_enable": True,
        }
        # Configuration belongs on the config endpoint, not on the capture endpoint.
        response = requests.post(CAPTURE_CONF_URL, json=capture_conf)
        if response.status_code == 200:
            print("Configuration set successfully")
        else:
            print("Failed to configure")

    def _abort_panel_detection(self) -> None:
        """Ask the camera to leave panel-detection mode, retrying on HTTP failures.

        Raises:
            AssertionError: the request succeeded but the camera does not report
                ``detect_panel`` as false.
            RuntimeError: every one of ``MAX_ABORT_ATTEMPTS`` requests failed.
        """
        for _ in range(self.MAX_ABORT_ATTEMPTS):
            presponse = requests.post(DETECT_PANNEL_URL, json={"abort_detect_panel": True})
            if presponse.status_code == 200:
                # Raised explicitly so the check also runs under ``python -O``.
                if presponse.json().get("detect_panel") != False:  # noqa: E712
                    raise AssertionError(
                        "Failed to abort the pannel detection mode even though the request was successful\nProbably an internal error"
                    )
                return
            print("Failed to abort the pannel detection mode")
        raise RuntimeError("Failed to abort the pannel detection mode")

    def capture_image(self, args: dict = None):
        """Abort any running panel detection, then trigger exactly one capture.

        Args:
            args: optional JSON body for the capture request. When empty or
                omitted, a plain GET is issued instead of a POST.

        Returns:
            The decoded JSON body of the capture response.
        """
        # Only the abort step is retried; the capture itself is issued once and
        # always with the caller's arguments.
        self._abort_panel_detection()

        if not args:
            response = requests.get(CAPTURE_URL)
        else:
            response = requests.post(CAPTURE_URL, json=args)

        if response.status_code == 200:
            print("Images captured successfully")
        else:
            print("Failed to capture images")
        return response.json()

    def launch(self):
        """Configure the capture timer, then trigger a first capture."""
        #self.detect_pannel()
        self.set_config()
        self.capture_image()

        # The following approach offers more flexibility on each capture but may increase the delta time between each capture
        #while True:
        #    self.capture_image()
        #    time.sleep(self.capture_interval)

# Start a flight with the default capture interval when this cell/script is run directly.
if __name__ == "__main__":
    flight = TeddyFlight()
    flight.launch()


In [13]:
def check_exst_warp_matrices(warp_matrices_filename: str):
    """Load previously saved warp matrices if the file exists.

    Args:
        warp_matrices_filename: path of the ``.npy`` file; relative paths are
            resolved against the current working directory.

    Returns:
        A list with one ``float32`` array per stored matrix, or ``False`` when
        the file does not exist.
    """
    # Use the same path for the existence check and for the load, so absolute
    # paths are found as well.
    matrices_path = Path(warp_matrices_filename)
    if not matrices_path.is_file():
        print("No existing warp matrices found.")
        return False

    print("Found existing warp matrices for camera")
    # NOTE: the matrices are stored as an object array, which needs
    # allow_pickle=True; unpickling can execute arbitrary code, so only load
    # files produced by a trusted source.
    stored_matrices = np.load(matrices_path, allow_pickle=True)
    warp_matrices = [matrix.astype('float32') for matrix in stored_matrices]
    print("Warp matrices successfully loaded.")
    return warp_matrices


def save_warp_matrices(warp_matrices : list, warp_matrices_filename : str, rewrite : bool = True):
    """Persist warp matrices to a ``.npy`` file.

    Args:
        warp_matrices: one entry per band, each either a numpy array or a
            ``ProjectiveTransform`` (its ``params`` matrix is stored).
        warp_matrices_filename: destination path; relative paths are resolved
            against the current working directory.
        rewrite: when ``False``, an existing file is left untouched.

    Raises:
        TypeError: an entry is neither an array nor a ``ProjectiveTransform``.
            Dropping it silently would shift the remaining matrices onto the
            wrong positions.
    """
    # Use the same path for the existence check and for the write, so absolute
    # paths are handled too.
    matrices_path = Path(warp_matrices_filename)
    if matrices_path.is_file() and not rewrite:
        print("Matrices already exist at", matrices_path.resolve())
        return

    temp_matrices = []
    for entry in warp_matrices:
        if isinstance(entry, np.ndarray):
            temp_matrices.append(entry)
        elif isinstance(entry, ProjectiveTransform):
            # Public class imported at the top of the file, rather than the
            # private ``skimage.transform._geometric`` module path.
            temp_matrices.append(entry.params)
        else:
            raise TypeError(f"Unsupported warp matrix type: {type(entry).__name__}")

    np.save(matrices_path, np.array(temp_matrices, dtype=object), allow_pickle=True)
    print("Saved to", matrices_path.resolve())
    return


def save_aligned_images(im_aligned, thecapture : capture.Capture, output_dir : str, stacked : bool = True):
    """Write an aligned band stack to ``output_dir``.

    Args:
        im_aligned: aligned image stack indexed as ``[row, column, band]``.
        thecapture: capture the stack was produced from; supplies the band
            names, the capture uuid and the per-band source file names.
        output_dir: destination directory, created if it does not exist.
        stacked: when ``True`` a single 8-bit true-colour ``IMG_<uuid>.tif`` is
            written; otherwise every band is written under the file name of the
            corresponding source image.

    Raises:
        OSError: OpenCV reported that a file could not be written.
    """
    os.makedirs(output_dir, exist_ok=True)

    if stacked:
        band_names = thecapture.band_names_lower()
        rgb_band_indices = [band_names.index(name) for name in ('red', 'green', 'blue')]

        # Shared contrast limits across the three bands keep the "white balance"
        # of the calibrated image; adjust the percentiles to change contrast.
        rgb_stack = im_aligned[:, :, rgb_band_indices]
        im_min = np.percentile(rgb_stack, 0.5)
        im_max = np.percentile(rgb_stack, 99.5)

        # cv2.imwrite expects channels in B, G, R order, so the normalized bands
        # are stacked blue-first; writing them as R, G, B swaps red and blue.
        bgr = np.dstack([imageutils.normalize(im_aligned[:, :, i], im_min, im_max)
                         for i in reversed(rgb_band_indices)])
        # Clip before the cast so out-of-range values cannot wrap around in uint8.
        bgr_8bit = (np.clip(bgr, 0.0, 1.0) * 255).astype(np.uint8)

        stacked_rgb_img_name = f"IMG_{thecapture.uuid}.tif"
        filepath = os.path.join(output_dir, stacked_rgb_img_name)
        if not cv2.imwrite(filepath, bgr_8bit):
            raise OSError(f"Could not write {filepath}")
    else:
        band_images_names = [Path(img.path).name for img in thecapture.images]
        num_bands = im_aligned.shape[2]
        for i in range(num_bands):
            filepath = os.path.join(output_dir, band_images_names[i])
            if not cv2.imwrite(filepath, im_aligned[:, :, i]):
                raise OSError(f"Could not write {filepath}")
    return


def _group_band_files(images_path, panels_ids):
    """Group ``IMG_<capture>_<band>.tif`` files of a directory by capture number.

    Returns a ``(captures, panels)`` pair of dicts mapping a capture number to
    the POSIX paths of its band files; capture numbers listed in ``panels_ids``
    go to ``panels``, all others to ``captures``.
    """
    captures, panels = {}, {}
    # Sorted so the processing order does not depend on the file system.
    for image_path in sorted(images_path.glob('IMG_*.tif')):
        match = re.match(r'IMG_(\d+)_\d\.tif', image_path.name)
        if not match:
            continue
        capture_number = match.group(1)
        # Panel image or regular capture image
        target = panels if capture_number in panels_ids else captures
        target.setdefault(capture_number, []).append(image_path.as_posix())
    return captures, panels


def realign_images(set_root_path: str = ".data/0000SET/000", 
                   panels_ids : List[str] = ["0000", "0001", "0002", "0003", "0004", "0005", "0006"],
                   regenerate_matrices : bool = True, 
                   save_as_stack : bool = True):
    """Align the bands of every complete capture found in ``set_root_path``.

    Args:
        set_root_path: directory holding ``IMG_<capture>_<band>.tif`` files.
            NOTE(review): the default starts with ``.data`` while the notebook
            calls this with ``./data`` - possibly a typo, left unchanged.
        panels_ids: capture numbers that are panel shots rather than regular
            captures.
        regenerate_matrices: when ``True`` the warp matrices are recomputed (and
            the saved file overwritten) for every capture; otherwise matrices
            saved for the camera serial are reused when present.
        save_as_stack: forwarded to ``save_aligned_images`` as ``stacked``.

    Returns:
        None. Aligned images are written into ``set_root_path`` itself.
        NOTE(review): with ``save_as_stack=False`` the band files are written
        under their source file names, i.e. over the input images.
    """
    images_path = Path(set_root_path)
    panels_ids = panels_ids or []
    if not images_path.is_dir():
        print(f"Le répertoire {images_path} n'existe pas ou n'est pas un répertoire.")
        return

    captures, panels = _group_band_files(images_path, panels_ids)

    # Only captures for which all five band files are present are processed.
    valid_captures = {k: v for k, v in captures.items() if len(v) == 5}
    valid_panels = {k: v for k, v in panels.items() if len(v) == 5}

    for capture_number, imageNames in valid_panels.items():
        # NOTE(review): the panel capture is loaded but not used afterwards -
        # presumably intended for reflectance calibration; confirm.
        panel_capture = capture.Capture.from_filelist(imageNames)

    if not valid_captures:
        return

    # Alignment settings are needed by the crop/warp steps below even when the
    # matrices are reused from disk, so they are defined outside that branch.
    pyramid_levels = 0 # for images with RigRelatives, setting this to 0 or 1 may improve alignment
    max_alignment_iterations = 20
    match_index = 1
    warp_mode = cv2.MOTION_HOMOGRAPHY # MOTION_HOMOGRAPHY or MOTION_AFFINE. For Altum images only use HOMOGRAPHY

    for capture_number, imageNames in valid_captures.items():
        thecapture = capture.Capture.from_filelist(imageNames)
        if valid_panels or thecapture.dls_present():
            img_type = "reflectance"
        else:
            img_type = "radiance"

        warp_matrices_filename = str(thecapture.camera_serial) + "_warp_matrices_opencv.npy"
        warp_matrices = check_exst_warp_matrices(warp_matrices_filename)
        if warp_matrices is False or regenerate_matrices is True:
            st = time.time()
            print(f"Aligning images of capture {capture_number}")
            warp_matrices, _ = imageutils.align_capture(thecapture,
                                                        ref_index = match_index,
                                                        max_iterations = max_alignment_iterations,
                                                        warp_mode = warp_mode,
                                                        pyramid_levels = pyramid_levels)
            # The elapsed time only exists (and only makes sense) when an
            # alignment was actually run.
            print(f"Finished Aligning after {int(time.time() - st)} seconds")
            save_warp_matrices(warp_matrices, warp_matrices_filename=warp_matrices_filename, rewrite=regenerate_matrices)
        else:
            print("Using existing warp matrices...")

        cropped_dimensions, edges = imageutils.find_crop_bounds(thecapture, warp_matrices, warp_mode=warp_mode, reference_band=match_index)
        im_aligned = thecapture.create_aligned_capture(warp_matrices=warp_matrices, motion_type=warp_mode, img_type=img_type)
        save_aligned_images(im_aligned, thecapture, set_root_path, stacked=save_as_stack)
            


In [12]:
# Realign every complete capture of the first image set; captures 0000-0006 are panel shots.
realign_images(
    set_root_path="./data/0000SET/000",
    panels_ids=["0000", "0001", "0002", "0003", "0004", "0005", "0006"],
    regenerate_matrices=True,
    save_as_stack=True,
)

No existing warp matrices found.
Aligning images of capture 0007
Finished aligning band 1


error: OpenCV(4.8.1) D:\bld\libopencv_1698890405962\work\modules\video\src\ecc.cpp:574: error: (-7:Iterations do not converge) NaN encountered. in function 'cv::findTransformECC'


In [None]:
import cv2
import numpy as np

def align_image(source_image, template, iterations, eps, motion_type=cv2.MOTION_EUCLIDEAN):
    """Align ``source_image`` onto ``template`` with OpenCV's ECC algorithm.

    Args:
        source_image: image to warp; 2-D, or 3-D (converted with BGR2GRAY for
            the estimation only).
        template: reference image, handled the same way.
        iterations: maximum number of ECC iterations.
        eps: convergence threshold of the ECC termination criteria.
        motion_type: one of the ``cv2.MOTION_*`` constants. Homography is
            supported in addition to the 2x3 motion models.

    Returns:
        ``(aligned_image, warp_matrix)``: the warped source image and the
        estimated matrix (3x3 for homography, 2x3 otherwise).
    """
    # ECC needs a 3x3 initial matrix for homography and a 2x3 one for the
    # translation / euclidean / affine models; both start from identity.
    is_homography = motion_type == cv2.MOTION_HOMOGRAPHY
    initial_warp = np.eye(3 if is_homography else 2, 3, dtype=np.float32)

    # Convert images to grayscale if they are not already
    if len(source_image.shape) == 3:
        source_gray = cv2.cvtColor(source_image, cv2.COLOR_BGR2GRAY)
    else:
        source_gray = source_image.copy()

    if len(template.shape) == 3:
        template_gray = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY)
    else:
        template_gray = template.copy()

    # Find ECC transformation
    criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, iterations, eps)
    cc, warp_matrix = cv2.findTransformECC(template_gray, source_gray, initial_warp, motion_type, criteria=criteria)

    # Apply the found warp_matrix to the (original, possibly colour) source image
    height, width = source_image.shape[:2]
    warp_flags = cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP
    if is_homography:
        # A 3x3 matrix cannot be applied with warpAffine.
        aligned_image = cv2.warpPerspective(source_image, warp_matrix, (width, height), flags=warp_flags)
    else:
        aligned_image = cv2.warpAffine(source_image, warp_matrix, (width, height), flags=warp_flags)

    return aligned_image, warp_matrix
