## Generating metrics utility
This is a utility tool created by willim to allow easy exploration of the work we have done and the metrics which correspond to the solutions we have found.

Good science should be replicable. Running this file from A to Z will create a dataset containing the raw images of each scene, and the corresponding output of each cleaning method we found.  

The structure of the resulting pickle will be:
level 1 : method or raw
level 2: scene

The resulting arrays are normalized to the [0, 1] range and stored in the pickle, ready for data analysis.

In [2]:
import os
import pickle

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch

**Run all of this next section for the rest to go smoothly.**

In the next cell, enter the folder in which you have placed your dataset, then run the entire notebook. For data analysis open data_manipulation.ipynb, or image_enhancement.ipynb for aesthetic manipulation. 


In [None]:
# Folder that holds the .hdf5 scene and calibration files read by the cells
# below. Edit this path to match your own machine before running the notebook.
dataset_location = r"C:\Users\WillQuantique\OneDrive - univ-lyon2.fr\Fac\M1\DataChallege\git\M1_S8_dataChallenge_24"

In [None]:
# Load the three scene recordings and the three car-scene recordings from the
# dataset folder. os.path.join inserts the separator of the host OS, so the
# paths resolve on Linux/macOS as well as on Windows (a hard-coded "\" only
# works on Windows).
scene_df1 = pd.read_hdf(os.path.join(dataset_location, 'scene.hdf5'))
scene_df2 = pd.read_hdf(os.path.join(dataset_location, 'scene2.hdf5'))
scene_df3 = pd.read_hdf(os.path.join(dataset_location, 'scene3.hdf5'))
car_scene_df1 = pd.read_hdf(os.path.join(dataset_location, 'car_scene.hdf5'))
car_scene_df2 = pd.read_hdf(os.path.join(dataset_location, 'car_scene2.hdf5'))
car_scene_df3 = pd.read_hdf(os.path.join(dataset_location, 'car_scene3.hdf5'))

In [None]:
# Load the two calibration sets and the two car calibration sets from the
# dataset folder. os.path.join inserts the separator of the host OS, so the
# paths resolve on Linux/macOS as well as on Windows (a hard-coded "\" only
# works on Windows).
calibration_df1 = pd.read_hdf(os.path.join(dataset_location, "calibration_set_1.hdf5"))
calibration_df2 = pd.read_hdf(os.path.join(dataset_location, "calibration_set_2.hdf5"))
car_calibration_df1 = pd.read_hdf(os.path.join(dataset_location, "car_calibration_set_1.hdf5"))
car_calibration_df2 = pd.read_hdf(os.path.join(dataset_location, "car_calibration_set_2.hdf5"))

In [None]:
def normalize(arr, min_val=0.0, max_val=1.0):
    """
    Linearly rescale the values of an array to the range [min_val, max_val].

    The smallest input value maps to ``min_val`` and the largest to
    ``max_val``. Floating-point inputs keep their precision; other dtypes are
    promoted to float by the division.

    Args:
        arr (numpy.ndarray): Input array to be normalized.
        min_val (float): Minimum value of the normalized range (default: 0.0).
        max_val (float): Maximum value of the normalized range (default: 1.0).

    Returns:
        numpy.ndarray: Normalized array with values in the specified range.
        A constant input (max == min) is mapped entirely to ``min_val`` and
        an empty input is returned as an empty float array.
    """
    arr = np.asarray(arr)
    # dtype used for the two short-circuit results below, chosen to match what
    # the regular path produces for the same input.
    out_dtype = arr.dtype if np.issubdtype(arr.dtype, np.floating) else np.float64

    # np.min / np.max raise on empty input; there is nothing to rescale anyway.
    if arr.size == 0:
        return arr.astype(out_dtype)

    arr_min = np.min(arr)
    arr_max = np.max(arr)
    value_range = arr_max - arr_min

    # A flat array has no spread: dividing by zero would fill the result with
    # NaN, so map every element to the bottom of the target range instead.
    if value_range == 0:
        return np.full(arr.shape, min_val, dtype=out_dtype)

    normalized_arr = (arr - arr_min) / value_range
    return normalized_arr * (max_val - min_val) + min_val

### 1. Calibration matching
Here is the method for calibration matching.

In [None]:
def find_closest_calibration_image(scene_temp, calibration_data):
    """
    Return the calibration image whose 't_fpa' is nearest to ``scene_temp``.

    Args:
        scene_temp: 't_fpa' value of the scene frame being corrected.
        calibration_data (pandas.DataFrame): Table with 't_fpa' and 'image'
            columns.

    Returns:
        The 'image' entry of the row with the smallest absolute 't_fpa'
        difference; on a tie the first such row wins.
    """
    temperature_gaps = np.abs(calibration_data['t_fpa'].values - scene_temp)
    nearest_row = calibration_data.iloc[temperature_gaps.argmin()]
    return nearest_row['image']

def denoise_image(scene_image, calibration_image):
    """
    Subtract a calibration image from a scene image.

    Args:
        scene_image: Scene frame to correct.
        calibration_image: Calibration frame to remove from it.

    Returns:
        The element-wise difference ``scene_image - calibration_image``.
    """
    # NOTE(review): if both images are unsigned-integer arrays this
    # subtraction wraps around instead of going negative — confirm the dtype
    # stored in the dataframes.
    difference = scene_image - calibration_image
    return difference

def calibration_matching(scene_df, calibration_df):
    """
    Correct every scene frame with its temperature-matched calibration image.

    For each row of ``scene_df`` the calibration image with the closest
    't_fpa' is looked up and subtracted from the row's 'image'.

    Args:
        scene_df (pandas.DataFrame): Scene table with 't_fpa' and 'image'
            columns.
        calibration_df (pandas.DataFrame): Calibration table with 't_fpa' and
            'image' columns.

    Returns:
        numpy.ndarray: The corrected frames, stacked in row order.
    """
    corrected_frames = [
        denoise_image(
            frame['image'],
            find_closest_calibration_image(frame['t_fpa'], calibration_df),
        )
        for _, frame in scene_df.iterrows()
    ]
    return np.array(corrected_frames)

### 2. Linear regression

Here is the linear regression method (best results overall), created for challenge two.

In [None]:

# Run the tensor maths on the first CUDA GPU when PyTorch reports one,
# otherwise fall back to the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

After loading the device we have the parameter estimation function.

Note that all calibration df returned the same parameters (to the best of my observation)

In [None]:
def estimate_parameters(calibration_df):
    """
    Fit a per-pixel linear model of the calibration images against temperature.

    Every pixel is modelled, in the least-squares sense, as
    ``pixel = a_fpa * t_fpa + a_cn * t_cn + offset`` over all rows of
    ``calibration_df``.

    Args:
        calibration_df (pandas.DataFrame): Table with 'image', 't_fpa' and
            't_cn' columns; the images are stacked, so they must share one
            2-D shape.

    Returns:
        tuple: ``(offsets, responsivities)`` as NumPy arrays. ``offsets`` has
        shape (height, width); ``responsivities`` has shape
        (height, width, 2) with the last axis ordered [t_fpa, t_cn].
    """
    # Stack the per-row images into one (n_images, height, width) float array
    # and pair up the two temperature columns as (n_images, 2).
    image_stack = np.array(calibration_df['image'].values.tolist(), dtype=np.float32)
    temperature_pairs = np.array(
        np.vstack((calibration_df['t_fpa'].values, calibration_df['t_cn'].values)).T,
        dtype=np.float32,
    )

    images = torch.tensor(image_stack, dtype=torch.float32).to(device)
    temperatures = torch.tensor(temperature_pairs, dtype=torch.float32).to(device)

    height, width = images.shape[1:]

    # One row per calibration image, one column per pixel.
    flat_images = images.view(images.shape[0], -1)

    # Design matrix [t_fpa, t_cn, 1]: the trailing column of ones carries the
    # intercept of the regression.
    bias_column = torch.ones(temperatures.size(0), 1).to(device)
    design = torch.cat([temperatures, bias_column], dim=1)

    solution = torch.linalg.lstsq(design, flat_images).solution

    # The last row of the solution holds the intercepts, the rows before it
    # the temperature slopes; fold both back onto the image grid.
    offsets = solution[-1, :].view(height, width).cpu().numpy()
    responsivities = solution[:-1, :].T.view(height, width, -1).cpu().numpy()

    return offsets, responsivities


This function applies the array of parameters to one image

In [None]:
def apply_correction(scene_image, offsets, responsivities, scene_temp, avg_black_body_temp):
    """
    Remove the temperature-predicted response from one scene image.

    The expected pixel value is
    ``offsets + responsivities[..., 0] * scene_temp
    + responsivities[..., 1] * avg_black_body_temp`` and is subtracted from
    ``scene_image``.

    Args:
        scene_image: 2-D image to correct.
        offsets: Per-pixel intercepts, as returned by ``estimate_parameters``.
        responsivities: Per-pixel slopes with a trailing axis of at least two
            entries; index 0 multiplies ``scene_temp`` and index 1 multiplies
            ``avg_black_body_temp``.
        scene_temp: Scalar temperature paired with the index-0 coefficient.
        avg_black_body_temp: Scalar temperature paired with the index-1
            coefficient.

    Returns:
        numpy.ndarray: The corrected image as a float32 array.

    Raises:
        ValueError: If ``scene_image`` is not 2-D.
    """
    scene_image_tensor = torch.tensor(scene_image, dtype=torch.float32).to(device)
    # Same guard the former ``height, width = ...shape`` unpacking provided,
    # now stated explicitly.
    if scene_image_tensor.dim() != 2:
        raise ValueError(
            f"scene_image must be 2-D, got shape {tuple(scene_image_tensor.shape)}"
        )

    offsets_tensor = torch.tensor(offsets, dtype=torch.float32).to(device)
    responsivities_tensor = torch.tensor(responsivities, dtype=torch.float32).to(device)
    scene_temp_tensor = torch.tensor(scene_temp, dtype=torch.float32).to(device)
    avg_black_body_temp_tensor = torch.tensor(avg_black_body_temp, dtype=torch.float32).to(device)

    # Value the linear model predicts for every pixel at these temperatures.
    expected_value = (
        offsets_tensor
        + responsivities_tensor[:, :, 0] * scene_temp_tensor
        + responsivities_tensor[:, :, 1] * avg_black_body_temp_tensor
    )

    # No pre-allocated output buffer is needed: the subtraction creates the
    # result tensor itself.
    corrected_image_tensor = scene_image_tensor - expected_value

    return corrected_image_tensor.cpu().numpy()

This function is all one needs to run to estimate the parameters from a calibration df and apply them to a scene df.  

In [None]:
def applier(calibration_df,scene_df):
    """
    Estimate the linear-regression parameters and correct a whole scene.

    Args:
        calibration_df (pandas.DataFrame): Calibration table passed to
            ``estimate_parameters``; its 't_cn' mean is used as the black-body
            temperature for every scene frame.
        scene_df (pandas.DataFrame): Scene table with 'image' and 't_fpa'
            columns.

    Returns:
        numpy.ndarray: The corrected frames, stacked in row order.
    """
    offset, gain = estimate_parameters(calibration_df)
    avg_black_body_temp = calibration_df['t_cn'].mean()

    corrected_frames = [
        apply_correction(frame['image'], offset, gain, frame['t_fpa'], avg_black_body_temp)
        for _, frame in scene_df.iterrows()
    ]
    return np.array(corrected_frames)

### 3. Pseudo-calibration

The method we found for challenge 3.

*slicing arrays into block of n pixels, using stride < block size to create overlap*

In [None]:
def _block_starts(length, block, stride):
    """Return the block start offsets along one axis, border block included."""
    last_start = length - block
    starts = list(range(0, last_start + 1, stride))
    # When (length - block) is not a multiple of stride the regular grid stops
    # short of the border; anchor one extra block on the border so the
    # trailing rows/columns are not left without any block.
    if starts and starts[-1] != last_start:
        starts.append(last_start)
    return starts


def slice_array(arr, num=10, stride=8):
    """
    Cut a (t, x, y) array into spatial blocks that keep the full t axis.

    Each block spans ``x // num`` rows and ``y // num`` columns. Block origins
    advance by ``stride`` along both axes, so a stride smaller than the block
    size makes neighbouring blocks overlap. A final block is anchored on the
    bottom/right border whenever the regular grid would not reach it.

    Args:
        arr (np.ndarray): Input 3D array of shape (t, x, y).
        num (int): Divisor applied to x and y to obtain the block size
            (default: 10).
        stride (int): Step between consecutive block origins (default: 8).

    Returns:
        list: ``(block, (x_start, x_end, y_start, y_end))`` tuples, where
        ``block`` is ``arr[:, x_start:x_end, y_start:y_end]``.
    """
    _, x, y = arr.shape
    block_x = x // num
    block_y = y // num

    slices = []
    for i in _block_starts(x, block_x, stride):
        for j in _block_starts(y, block_y, stride):
            bounds = (i, i + block_x, j, j + block_y)
            slices.append((arr[:, i:i + block_x, j:j + block_y], bounds))

    return slices


*rebuilding the arrays*

In [None]:
def rebuild_array(slices, original_shape, num=10):
    """
    Reassemble 2-D blocks into one (x, y) image, averaging where they overlap.

    Args:
        slices: Iterable of ``(block, (x_start, x_end, y_start, y_end))``
            pairs; each block is added onto the region its bounds describe.
        original_shape: ``(t, x, y)`` shape of the array the blocks came from;
            only x and y are used.
        num (int): Not used by this function (default: 10).

    Returns:
        np.ndarray: Array of shape (x, y) holding, for each pixel, the mean of
        all blocks covering it. Pixels covered by no block stay at 0.
    """
    _, rows, cols = original_shape

    accumulated = np.zeros((rows, cols))
    hit_count = np.zeros((rows, cols))

    for block, (x0, x1, y0, y1) in slices:
        accumulated[x0:x1, y0:y1] += block
        hit_count[x0:x1, y0:y1] += 1

    # Pixels no block touched would divide by zero; divide those by 1 instead.
    divisor = np.where(hit_count == 0, 1, hit_count)
    return accumulated / divisor

*sort images in order of increasing variance; this lets us reorder blocks independently of time and gives a better pick for the mean*

In [None]:
def sorted_uniform_pictures(arr):
    """
    Reorder the t slices of a (t, x, y) array by increasing spatial variance.

    Parameters:
    arr (np.ndarray): Input 3D array of shape (t, x, y).

    Returns:
    np.ndarray: A 3D array with the same slices, the lowest-variance (most
    uniform) slice first and the highest-variance slice last.
    """
    # Unpacking doubles as a check that the input really is three-dimensional.
    _frames, _rows, _cols = arr.shape

    # One variance per slice, computed over both spatial axes.
    per_slice_variance = np.var(arr, axis=(1, 2))
    order = np.argsort(per_slice_variance)

    return arr[order]

*pick one out of n images in the given sequence and return the corresponding array*

I advise to use `int(len(arr)/n)` for the sake of looping (see the end of the file)  

In [None]:
def pick_regular_slices(arr, num):
    """
    Pick t slices at a regular interval from a (t, x, y) array.

    The interval is ``t // num``, so roughly ``num`` slices are returned
    (slightly more when ``t`` is not a multiple of ``num``). When ``num`` is
    larger than ``t`` the interval is clamped to 1 and every slice is kept.

    Parameters:
    arr (np.ndarray): Input 3D array of shape (t, x, y).
    num (int): Approximate number of slices to keep; must be a positive
        integer.

    Returns:
    np.ndarray: A 3D array holding the slices at indices 0, step, 2 * step, ...

    Raises:
    ZeroDivisionError: If ``num`` is 0.
    """
    t, _, _ = arr.shape

    # t // num is 0 as soon as num exceeds t, and np.arange rejects a zero
    # step; fall back to taking every slice in that case.
    step = max(t // num, 1)
    indices = np.arange(0, t, step)

    return arr[indices]

**The one function to run for pseudo calibration, which will include all the previous steps.**

In [None]:
def pseud_calibration(arr, block_size, reg=50, stride=4):
    """
    Build one pseudo-calibration image from a (t, x, y) sequence.

    The sequence is cut into spatial blocks; within each block the frames are
    ordered by increasing variance, sampled at a regular interval and averaged
    over time. The averaged blocks are then merged back into a single image.

    Args:
        arr (np.ndarray): Input 3D array of shape (t, x, y).
        block_size (int): Forwarded as the ``num`` argument of ``slice_array``
            and ``rebuild_array``.
        reg (int): Forwarded as the ``num`` argument of
            ``pick_regular_slices`` (default: 50).
        stride (int): Step between block origins (default: 4).

    Returns:
        np.ndarray: The array returned by ``rebuild_array`` for the averaged
        blocks.
    """
    averaged_blocks = []
    for block, bounds in slice_array(arr, block_size, stride):
        by_uniformity = sorted_uniform_pictures(block)
        sampled = pick_regular_slices(by_uniformity, reg)
        averaged_blocks.append((np.mean(sampled, axis=0), bounds))

    frames, rows, cols = arr.shape
    return rebuild_array(averaged_blocks, (frames, rows, cols), block_size)

### 4. loop and record data for evaluation

In [None]:

def save_dict_of_dicts_pickle(data, filename):
    """
    Serialize ``data`` with pickle into the file ``filename``.

    Args:
        data: Object to pickle.
        filename: Path of the file to write; it is opened in binary write
            mode, so an existing file is overwritten.
    """
    with open(filename, mode='wb') as pickle_file:
        pickle.dump(data, pickle_file)

Prepare the dictionary

In [None]:
# Level 1 of the result: one (initially empty) scene dictionary per cleaning
# method, plus one for the raw images.
data_for_metrics = {
    level_one_key: {}
    for level_one_key in ("method_1", "method_2", "method_3", "raw")
}

Loop around for method 1, 2, 3:  
- return calibrated sequence as an array
- normalizes the array
- store it into the corresponding level two part of the dictionary  

Note that each method takes increasingly more time.

In [None]:

# initialisation for the loops of the calibration and scene images
# Scene 1 stacked into one array, and scene 1 corrected with calibration
# set 1. NOTE(review): neither name is read by the loops that follow in this
# notebook — confirm whether they are still needed.
cal_scene1_images = np.stack(scene_df1["image"].values)
calibrated_scene = calibration_matching(scene_df1, calibration_df1)

# Scenes and their dictionary keys, kept in the same order so that position i
# of one list matches position i of the other.
all_scene_df = [
    scene_df1, scene_df2, scene_df3,
    car_scene_df1, car_scene_df2, car_scene_df3,
]
all_scenes_keys = [
    "scene_1", "scene_2", "scene_3",
    "car_scene_1", "car_scene_2", "car_scene_3",
]
# Index 0 serves the plain scenes, index 1 the car scenes.
all_calib = [calibration_df2, car_calibration_df2]

In [None]:
# Method 1 (calibration matching) for every scene.
for i, scene_df in enumerate(all_scene_df):
    # Positions 0-2 hold the plain scenes, 3-5 the car scenes.
    calibration_df = all_calib[1] if i >= 3 else all_calib[0]
    calibrated = calibration_matching(scene_df, calibration_df)
    data_for_metrics["method_1"][all_scenes_keys[i]] = normalize(calibrated)

In [None]:
# Method 2 (linear regression) for every scene.
for i, scene_df in enumerate(all_scene_df):
    # Positions 0-2 hold the plain scenes, 3-5 the car scenes.
    calibration_df = all_calib[1] if i >= 3 else all_calib[0]
    calibrated = applier(calibration_df, scene_df)
    data_for_metrics["method_2"][all_scenes_keys[i]] = normalize(calibrated)

In [None]:
# Method 3 (pseudo-calibration) for every scene; no calibration set is used.
# NOTE(review): pseud_calibration returns what rebuild_array builds, i.e. one
# 2-D image per scene rather than a corrected sequence — confirm this is what
# the analysis notebooks expect under "method_3".
for i, scene_df in enumerate(all_scene_df):
    arr = np.stack(scene_df["image"].values)
    calibrated = pseud_calibration(arr, block_size=32, reg=int(len(arr) / 5), stride=4)
    data_for_metrics["method_3"][all_scenes_keys[i]] = normalize(calibrated)

For storing raw images

In [None]:
# Raw images: stack each scene as-is and store its normalized version.
for i, scene_df in enumerate(all_scene_df):
    arr = np.stack(scene_df["image"].values)
    data_for_metrics["raw"][all_scenes_keys[i]] = normalize(arr)

**And finally -- this takes a minute or two AND the resulting file is around 14,7 GB.**

In [None]:
# Write the complete nested dictionary to "data_for_metrics.pkl"; the path is
# relative, so the file lands in the current working directory.
save_dict_of_dicts_pickle(data=data_for_metrics, filename="data_for_metrics.pkl")