# Empirical Law Estimator: 
## with highest peak width get $w_0$ and using Gradient algorithm and that, obtain $R_0$
This is a notebook to obtain the empirical law that follows the parameter $R_0$ given the estimation by the gravicenter algorithm and the theoretically known $w_0$ and an empirical Law to obtain the $w_0$ parameter if we know the width of the highest peak along the main axis of the CR ring.

The idea is the following:
 - **(1)**: Simulate many many CR rings with different $R_0$ and $w_0$ combinations, fixing the angle at, say, $\phi_{CR}=0$ and $nx=540$ so we get something similar in resolution to the Basler camera.
 
 
 - **(2)**: For each of the images compute the iX image and run the Gradient Algorithm to estimate the masked gravicenter (which shall be aligned with the gravicenter and the maximum of the ring and spoted in the Pogendorf dark ring). Record in a list the tuples ($D:=$dist(masked_grav,grav), w_0, R_0).
 
 
 - **(3)**: For each image, estimate the width of the highest peak above a certain tolerance of intensity to account for possible backgrounds (which in reality should not be present since they are simulated images, but ok). For this, look for the first pixel that is above the tolerance in the outside part of the big peak and then look for the pixel where there is a second trend change from there, by computing the difference for instance (the differential).
 
 
 - **(4)**: Run two fully connected and two simple regressors to fit each a model for our two objectives. Train the fully connecteds with gradient descent, while the simple multidimensional regressors (1 layer) using the normal equations if possible. Use the L1 norm to avoid unnecessary terms to have weight and get the simplest of the expressions (use also the L2 norm, in case we get a considerably better model).
    
    In order to compute the $w_0$ from the width of the highest peak, we could introduce also the data of the $D$ in order to give more flexibility to the model (try both things). For obtaining $R_0$ then use both the estimated $w_0$ and the $D$.
 
 
 - **(5)**: Save the trained models, and make a section where we can input experimental images and obtain automatically (or manually, since maybe the noise now gives us strange things) the width of the highest peak and with it $w_0$ and then $R_0$ with the gradient algorithm estimate.

In [1]:
import os
from GPU_Classes import *
from Image_Manager import *
from Polarization_Obtention_Algorithms import Gradient_Algorithm
import numpy as np
import json
import cv2
import pandas as pd
import matplotlib.pyplot as plt
experiment_name="540_Basler_like_phiCR_0" # "NOT_USING_INTERPOLATION_IN_iX" # "RECENTERING_AVERAGE_IMAGE_TO_iX_USING_INTERPOLATION" # "RECENTERING_AVERAGE_IMAGE_TO_iX_NOT_USING_INTERPOLATION" # los 4 con 540 y los 4 con el doble de resolucion y ver que sacamos de los resultados - note that this means only two simulation rounds are necessary
vig_path=f"./OUTPUT/EMPIRICAL_LAWS/{experiment_name}/VIGILANT_{experiment_name}.json"
os.chdir(f"/home/oiangu/Desktop/Conical_Refraction_Polarimeter/")

## Step 1: Run simulations for theoretical samples with ground-truths +
## Step 2: Compute the iX image and run the gradient algorithm, saving the results, ground-truths and Profiles along the main axis

In [2]:
# Set the PARAMETERS ############################################
##################################################################
randomization_seed=666
image_depth=8 # or 16 bit per pixel
pol_or_CR="pol"
image_shortest_side=540
saturation=1

# 1. SIMULATION ####################################################
# Ring parameters to test (each will be a different simulation)
phiCR_s=[0]
R0_s=np.linspace(70,180,40) # in pxels
#rho_0s=np.array([2,3,4,5,6,7,9,11,14,20])
w0_s=np.linspace(8,50,40)
rho_0s=R0_s/w0_s


resolution_side_nx=image_shortest_side # generated images will be resolution_side x resolution_side
# Other parameters
max_k=50
num_k=1200
sim_chunk_ax=image_shortest_side

# PROFILES
pix_spacing=3

# 4. GRAVICENTER iX and PROFILES ######################################
X=int(resolution_side_nx*1.4/2)
interpolation_flag= cv2.INTER_CUBIC #{"CUBIC":cv2.INTER_CUBIC, "LANCZOS":cv2.INTER_LANCZOS4}# "LINEAR":cv2.INTER_LINEAR, "AREA":cv2.INTER_AREA, "NEAREST":cv2.INTER_NEAREST}

# 5. GRADIENT ALGORITHM ###################################
# Each using both Fibonacci and Quadratic Fit Search

rad_min_Grav=3
rad_max_Grav=image_shortest_side/2-2
initial_guess_delta_pix=10

use_exact_gravicenter=True
precision_quadratic=1e-10
max_it_quadratic=100
cost_tolerance_quadratic=1e-14
precision_fibonacci=1e-10
max_points_fibonacci=100
cost_tolerance_fibonacci=1e-14

##################################################
##################################################

# General considerations
im_type=np.uint16 if image_depth==16 else np.uint8
max_intensity=65535 if image_depth==16 else 255
np.random.seed(randomization_seed)
polCR=1 if pol_or_CR=='CR' else 0.5

vig_path=f"./OUTPUT/EMPIRICAL_LAWS/{experiment_name}/VIGILANT_{experiment_name}.json"
os.makedirs(f"./OUTPUT/EMPIRICAL_LAWS/{experiment_name}/", exist_ok=True)

In [3]:
# GENERAL ROUTINES #################################
def compute_intensity_gravity_center(image):
    """
        Expects input image to be an array of dimensions [h, w].
        It will return an array of gravity centers [2(h,w)] in pixel coordinates
        Remember that pixel coordinates are set equal to numpy indices

    """
    # image wise total intensity and marginalized inensities for weighted sum
    intensity_in_w = np.sum(image, axis=0) # weights for x [raw_width]
    intensity_in_h = np.sum(image, axis=1) # weights for y [raw_height]
    total_intensity = intensity_in_h.sum()

    # Compute mass center for intensity
    # [2] (h_center,w_center)
    return np.nan_to_num( np.stack(
        (np.dot(intensity_in_h, np.arange(image.shape[0]))/total_intensity,
         np.dot(intensity_in_w, np.arange(image.shape[1]))/total_intensity)
        ) )

def compute_raw_to_centered_iX(image, X):

    g_raw = compute_intensity_gravity_center(image)
    # crop the iamges with size (X+1+X)^2 leaving the gravity center in
    # the central pixel of the image. In case the image is not big enough for the cropping,
    # a 0 padding will be made.
    centered_image = np.zeros( (2*X+1, 2*X+1),  dtype = image.dtype )

    # we round the gravity centers to the nearest pixel indices
    g_index_raw = np.rint(g_raw).astype(int) #[N_images, 2]

    # obtain the slicing indices around the center of gravity
    # TODO -> make all this with a single array operation by stacking the lower and upper in
    # a new axis!!
    # [ 2 (h,w)]
    unclipped_lower = g_index_raw[:]-X
    unclipped_upper = g_index_raw[:]+X+1
    # unclippde could get out of bounds for the indices, so we clip them
    lower_bound = np.clip( unclipped_lower, a_min=0, a_max=image.shape)
    upper_bound = np.clip( unclipped_upper, a_min=0, a_max=image.shape)
    # we use the difference between the clipped and unclipped to get the necessary padding
    # such that the center of gravity is left still in the center of the image
    padding_lower = lower_bound-unclipped_lower
    padding_upper = upper_bound-unclipped_upper

    # crop the image
    centered_image[padding_lower[0]:padding_upper[0] or None,
                                    padding_lower[1]:padding_upper[1] or None ] = \
                  image[lower_bound[0]:upper_bound[0],
                                      lower_bound[1]:upper_bound[1]]
    return centered_image
    '''
    else:
        # We compute the center of gravity of the cropped images, if everything was made allright
        # they should get just centered in the central pixels number X+1 (index X)
        g_centered = compute_intensity_gravity_center(centered_image)

        # We now compute a floating translation of the image so that the gravicenter is exactly
        # centered at pixel (607.5, 607.5) (exact center of the image in pixel coordinates staring
        # form (0,0) and having size (607*2+1)x2), instead of being centered at the beginning of
        # around pixel (607,607) as is now
        translate_vectors = X+0.5-g_centered #[ 2(h,w)]
        T = np.float64([[1,0, translate_vectors[1]], [0,1, translate_vectors[0]]])
        return cv2.warpAffine( centered_image, T, (X*2+1, X*2+1),
                    flags=interpolation_flag) # interpolation method
    '''

In [None]:
# Initialize the vigilant
try:
    phase_vigilant = json.load(open(vig_path))
except:
    phase_vigilant = {'stage':0, 'simulation_IDs':[], 'GT_R0s':[], 'GT_w0s':[], 'profiles':[], 'Ds':[], 'grav':[], 'masked_grav':[], 'angle_error':[], 'slope':[]}

# Set the objects ready ##################
# The image manager
image_loader = Image_Manager(mode=X, interpolation_flag=None)
# Define the Gradient algorithm
gradient_algorithm = Gradient_Algorithm(image_loader,
        rad_min_Grav, rad_max_Grav,
        initial_guess_delta_pix,
        use_exact_gravicenter)
image_container=np.zeros( (1, 2*X+1, 2*X+1), dtype=np.float64)
image_names=['a']
# The simulator object
simulator=RingSimulator_Optimizer_GPU( n=1.5, a0=1.0, max_k=max_k, num_k=num_k, nx=resolution_side_nx, sim_chunk_x=sim_chunk_ax, sim_chunk_y=sim_chunk_ax)

cols = np.broadcast_to( np.arange(X*2+1), (X*2+1,X*2+1)) #[h,w]
rows = cols.swapaxes(0,1) #[h,w]

# Execute the stuff #####################
i=1
for phi_CR in phiCR_s:
    for R0 in R0_s:
        for w0 in w0_s:
            ID=f"phiCR_{phi_CR}_R0_{R0}_w0_{w0}"
            if ID not in phase_vigilant['simulation_IDs']:
                # simulate image
                image=simulator.compute_CR_ring( CR_ring_angle=phi_CR, R0_pixels=R0, Z=0, w0_pixels=w0)
                # normalize the image to output datatype
                image = max_intensity*(image.astype(np.float64)/image.max())
                phase_vigilant['simulation_IDs'].append(ID)
                phase_vigilant['GT_R0s'].append(R0)
                phase_vigilant['GT_w0s'].append(w0)
                
                # get iX image
                image = np.where( image<=(max_intensity*saturation), image, max_intensity*saturation)
                image = compute_raw_to_centered_iX(image, X)
                
                # run gradient algorithm
                # charge the image
                image_container[0]=image.astype(np.float64)
                image_names[0]=ID
                # charge the image loader:
                image_loader.import_converted_images_as_array(image_container, image_names)
                
                optimal_masked_gravs={}
                optimal_radii={}
                optimal_angle={}
                grav=compute_intensity_gravity_center(image)
                # run both fibonacci and quadratic and then compute the average as the desired optimal
                gradient_algorithm.reInitialize(image_loader)
                gradient_algorithm.quadratic_fit_search(precision_quadratic, max_it_quadratic, cost_tolerance_quadratic)
                optimal_masked_gravs['quad'] = gradient_algorithm.masked_gravs[f"Quadratic_Search_{ID}"]
                optimal_radii['quad'] = gradient_algorithm.optimals[f"Quadratic_Search_{ID}"]
                optimal_angle['quad'] = gradient_algorithm.angles[f"Quadratic_Search_{ID}"]
                gradient_algorithm.reInitialize(image_loader)
                gradient_algorithm.fibonacci_ratio_search(precision_fibonacci, max_points_fibonacci, cost_tolerance_fibonacci)
                optimal_masked_gravs['fibo'] = gradient_algorithm.masked_gravs[f"Fibonacci_Search_{ID}"]
                optimal_radii['fibo'] = gradient_algorithm.optimals[f"Fibonacci_Search_{ID}"]
                optimal_angle['fibo'] = gradient_algorithm.angles[f"Fibonacci_Search_{ID}"]

                masked_grav=(optimal_masked_gravs['quad']+optimal_masked_gravs['fibo'])/2.0
                
                phase_vigilant['Ds'].append((optimal_radii['quad']+optimal_radii['fibo'])/2)
                phase_vigilant['grav'].append(grav.tolist())
                phase_vigilant['masked_grav'].append(masked_grav.tolist())
                # since we know that phiCR=0 we could input the exact profile of the main axis...but maybe for 
                # generality afterwards it will be nice to do it as there
                slope=(masked_grav[0]-grav[0])/(masked_grav[1]-grav[1])
                mask=(rows<( slope*(cols-masked_grav[1]) +masked_grav[0]+pix_spacing )) & (rows>( slope*(cols-masked_grav[1]) +masked_grav[0]-pix_spacing ))

                filtered_image=np.where(mask, image, 0)
                phase_vigilant['profiles'].append((np.sum(filtered_image,axis=0)).tolist())
                phase_vigilant['slope'].append(slope)
                
                # it is good to record the error performed in the angle as a measure of how accurate the gradient algorithm was detecting the main axis
                phase_vigilant['angle_error'].append(max(abs(optimal_angle['fibo']-phi_CR), abs(optimal_angle['quad']-phi_CR)))
                # we save the progess (in order to be able to quit and resume)
                json.dump(phase_vigilant, open( vig_path, "w"))
                print(f"{i}-th Simulated")
                i+=1

# We print the maximum error that happened in the angle detection, as a sanity check
print(f"Maximum error made in angle detection is {max(phase_vigilant['angle_error'])}")

# We pass to the next stage
if phase_vigilant['stage']==0:
    phase_vigilant['stage']=2
    json.dump(phase_vigilant, open( vig_path, "w"))


1-th Simulated
2-th Simulated
3-th Simulated
4-th Simulated
5-th Simulated
6-th Simulated
7-th Simulated
8-th Simulated
9-th Simulated
10-th Simulated
11-th Simulated
12-th Simulated
13-th Simulated
14-th Simulated
15-th Simulated
16-th Simulated
17-th Simulated
18-th Simulated
19-th Simulated
20-th Simulated
21-th Simulated
22-th Simulated
23-th Simulated
24-th Simulated
25-th Simulated
26-th Simulated
27-th Simulated
28-th Simulated
29-th Simulated
30-th Simulated
31-th Simulated
32-th Simulated
33-th Simulated
34-th Simulated
35-th Simulated
36-th Simulated
37-th Simulated
38-th Simulated
39-th Simulated
40-th Simulated
41-th Simulated
42-th Simulated
43-th Simulated
44-th Simulated
45-th Simulated
46-th Simulated
47-th Simulated
48-th Simulated
49-th Simulated
50-th Simulated
51-th Simulated
52-th Simulated
53-th Simulated
54-th Simulated
55-th Simulated
56-th Simulated
57-th Simulated
58-th Simulated
59-th Simulated
60-th Simulated
61-th Simulated
62-th Simulated
63-th Simulated
6

## Step 3: Estimate the width of the highest peak for each image's profile
For this, we could actually use the masked gravicenter, since the optimal is supposed to be exactly in the Pogendorf dark ring, but well, we will use a different approach

In [None]:
def plot_sanity_check_of_result(profile, first_increment_index, valley_min_index, slope):
    plt.rc('font', size=8) 
    fig = plt.figure(figsize=(2*4.5, 2*4.5))
    axes=fig.subplots(1,1)
    axes.plot(np.arange(len(profile))*np.sqrt(1+slope**2) , profile , markersize=1, label=f'Intensity profile along main axis')
    axes.plot([first_increment_index, valley_min_index], [0,0], 'or')
    axes.grid()
    axes.set_ylabel(f'Reduced Intensity profile along main axis')
    axes.set_xlabel("Pixels along the main axis") 

    plt.show()

In [None]:
tolerance=0.001
phase_vigilant['Ws']=[]

for k, profile in enumerate(phase_vigilant['profiles']):
    profile=np.array(profile)
    grav_x=round(phase_vigilant['grav'][k][1])
    if np.sum(profile[:grav_x])<np.sum(profile[grav_x:]): # then the bump is on the right
        profile=np.flip(profile)
    # At this point any profile has the bump in the left
    first_increment_index=np.argmax(profile>=tolerance) # it will stop in the first True it finds
    diff_prof=profile[1:]-profile[:-1]
    is_increasing=(diff_prof>tolerance) # we select the points whose next point is higher than them
    first_increment_index=np.argmax(is_increasing)
    peak_of_high_bump_index=first_increment_index+np.argmin(is_increasing[first_increment_index:])
    valley_min_index=peak_of_high_bump_index+np.argmax(is_increasing[peak_of_high_bump_index:])
    
    W = (valley_min_index-first_increment_index)*np.sqrt(1+phase_vigilant['slope'][k]**2) 
    phase_vigilant['Ws'].append(W)
    print(f"{k}-th W found: {W} with w0 {phase_vigilant['GT_w0s'][k]} and R0 {phase_vigilant['GT_R0s'][k]}")
    #plot_sanity_check_of_result(profile, first_increment_index, valley_min_index, phase_vigilant['slope'][k])


# We print the maximum difference between the R0 and the W, as a sanity check
print(f"Maximum R0 to W absolute difference found is {np.max(np.abs((2.4*np.array(phase_vigilant['GT_w0s'])-np.array(phase_vigilant['Ws']))))}")

# We pass to the next stage
if phase_vigilant['stage']==2:
    phase_vigilant['stage']=3
    #phase_vigilant['W']=[]
    json.dump(phase_vigilant, open( vig_path, "w"))

## Step 4: Run the Machine Learning to estimate the empirical laws. Save the fitted models.

We are going to train several models.

### a) Models to obtain $w_0$ from the $W$ and the $D$

### b) Models to obtain $R_0$ from the $W$ and the $D$ or the estimated $w_0$ and the $D$ - in the end it should be roughly the same -

### (c) Obtain $w_0, R_0$ from $D$, $slope$ and $Profile$ as a ten layer fc model



In [None]:
import torch #should be installed by default in any colab notebook
import numpy as np
import matplotlib.pyplot as plt
from IPython import display
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import matplotlib.pyplot as plt


assert torch.cuda.is_available(), "GPU is not enabled"

# use gpu if available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

### NORMAL EQUATIONS
### (a) Obtain $w_0$ from $W$ and $D$ as a Linear Model (the rule of thumb)
First define the design matrix:

In [None]:
phase_vigilant = json.load(open(vig_path))

# If we only include the W
y = np.array(phase_vigilant['GT_w0s'])
X = np.array(np.array([  phase_vigilant['Ws']])
                ).T # the potential arguments to the empirical function we are looking for
LS_w_1 = np.linalg.solve(X.T@X,X.T@y)

print(f"Least square solution is w_0=W*{LS_w_1[0]:5.5}, or W=w_0*{(1/LS_w_1[0]):5.5}, with average abs error {np.mean(np.abs(y-X@LS_w_1)):5.5} pix")

# If we have bias as well
X = np.array(np.array([ [1 for i in range(len(phase_vigilant['Ws']))], phase_vigilant['Ws']])
                ).T
LS_w_2 = np.linalg.solve(X.T@X,X.T@y)

print(f"\nLeast square solution is w_0={LS_w_2[0]:5.5}+W*{LS_w_2[1]:5.5}, or W=(w_0-{LS_w_2[0]:5.5})*{1/LS_w_2[1]:5.5}, with average abs error {np.mean(np.abs(y-X@LS_w_2)):5.5} pix")

# If we allow D to enter the equation
X = np.array(np.array([ [1 for i in range(len(phase_vigilant['Ws']))], phase_vigilant['Ws'],
                    phase_vigilant['Ds']])
                ).T
LS_w_3 = np.linalg.solve(X.T@X,X.T@y)

print(f"\nLeast square solution is w_0={LS_w_3[0]:5.5}+W*{LS_w_3[1]:5.5}+D*{LS_w_3[2]:5.5} with average abs error {np.mean(np.abs(y-X@LS_w_3)):5.5} pix")

# Id we allow to have crossed terms and bias
X = np.array(np.array([ [1 for i in range(len(phase_vigilant['Ws']))], phase_vigilant['Ws'],
                    phase_vigilant['Ds'],np.array(phase_vigilant['Ws'])*np.array(phase_vigilant['Ds'] )])
                ).T
LS_w_4 = np.linalg.solve(X.T@X,X.T@y)

print(f"\nLeast square solution is w_0={LS_w_4[0]:5.5}+W*{LS_w_4[1]:5.5}+D*{LS_w_4[2]:5.5}+{LS_w_4[3]:5.5}*WD with average abs error {np.mean(np.abs(y-X@LS_w_4)):5.5} pix")

# Last combination
X = np.array(np.array([  phase_vigilant['Ws'],
                    phase_vigilant['Ds']])
                ).T
LS_w_5 = np.linalg.solve(X.T@X,X.T@y)
print(f"\nLeast square solution is w_0=W*{LS_w_5[0]:5.5}+D*{LS_w_5[1]:5.5} with average abs error {np.mean(np.abs(y-X@LS_w_5)):5.5} pix")


Clearly the best rule of thumb is given by the simple relation with W. We could then check if regularization would say the same we will compute GD with an L1+L2 regularization appart from the MSE objective function.


In [None]:
estimated_w0=np.array(np.array([  phase_vigilant['Ws']])).T@LS_w_1

### (b.1) Obtain $R_0$ from $D$ and estimated $w_0$ with the other rule of thumb

In [None]:
phase_vigilant = json.load(open(vig_path))

# If we only include the W
y = np.array(phase_vigilant['GT_R0s'])
X = np.array(np.array([  estimated_w0 ])
                ).T # the potential arguments to the empirical function we are looking for
LS_w_1 = np.linalg.solve(X.T@X,X.T@y)

print(f"Least square solution is R_0=hat_w0*{LS_w_1[0]:5.5}, or hat_w0=R_0*{(1/LS_w_1[0]):5.5}, with average abs error {np.mean(np.abs(y-X@LS_w_1)):5.5} pix")

# If we have bias as well
X = np.array(np.array([ [1 for i in range(len(estimated_w0))], estimated_w0])
                ).T
LS_w_2 = np.linalg.solve(X.T@X,X.T@y)

print(f"\nLeast square solution is R_0={LS_w_2[0]:5.5}+hat_w0*{LS_w_2[1]:5.5}, or hat_w0=(R_0-{LS_w_2[0]:5.5})*{1/LS_w_2[1]:5.5}, with average abs error {np.mean(np.abs(y-X@LS_w_2)):5.5} pix")

# If we allow D to enter the equation
X = np.array(np.array([ [1 for i in range(len(estimated_w0))], estimated_w0,
                    phase_vigilant['Ds']])
                ).T
LS_w_3 = np.linalg.solve(X.T@X,X.T@y)

print(f"\nLeast square solution is R_0={LS_w_3[0]:5.5}+hat_w0*{LS_w_3[1]:5.5}+D*{LS_w_3[2]:5.5} with average abs error {np.mean(np.abs(y-X@LS_w_3)):5.5} pix")

# Id we allow to have crossed terms and bias
X = np.array(np.array([ [1 for i in range(len(estimated_w0))], estimated_w0,
                    phase_vigilant['Ds'],estimated_w0*np.array(phase_vigilant['Ds'] )])
                ).T
LS_w_4 = np.linalg.solve(X.T@X,X.T@y)

print(f"\nLeast square solution is R_0={LS_w_4[0]:5.5}+hat_w0*{LS_w_4[1]:5.5}+D*{LS_w_4[2]:5.5}+{LS_w_4[3]:5.5}*WD with average abs error {np.mean(np.abs(y-X@LS_w_4)):5.5} pix")

# Last combination
X = np.array(np.array([  estimated_w0,
                    phase_vigilant['Ds']])
                ).T
LS_w_5 = np.linalg.solve(X.T@X,X.T@y)
print(f"\nLeast square solution is R_0=hat_w0*{LS_w_5[0]:5.5}+D*{LS_w_5[1]:5.5} with average abs error {np.mean(np.abs(y-X@LS_w_5)):5.5} pix")


### (b.2) Obtain $R_0$ from $D$ and $W$

In [None]:
phase_vigilant = json.load(open(vig_path))

# If we only include the W
y = np.array(phase_vigilant['GT_R0s'])
X = np.array(np.array([  phase_vigilant['Ws']])
                ).T # the potential arguments to the empirical function we are looking for
LS_w_1 = np.linalg.solve(X.T@X,X.T@y)

print(f"Least square solution is R_0=W*{LS_w_1[0]:5.5}, or W=R_0*{(1/LS_w_1[0]):5.5}, with average abs error {np.mean(np.abs(y-X@LS_w_1)):5.5} pix")

# If we have bias as well
X = np.array(np.array([ [1 for i in range(len(phase_vigilant['Ws']))], phase_vigilant['Ws']])
                ).T
LS_w_2 = np.linalg.solve(X.T@X,X.T@y)

print(f"\nLeast square solution is R_0={LS_w_2[0]:5.5}+W*{LS_w_2[1]:5.5}, or W=(R_0-{LS_w_2[0]:5.5})*{1/LS_w_2[1]:5.5}, with average abs error {np.mean(np.abs(y-X@LS_w_2)):5.5} pix")

# If we allow D to enter the equation
X = np.array(np.array([ [1 for i in range(len(phase_vigilant['Ws']))], phase_vigilant['Ws'],
                    phase_vigilant['Ds']])
                ).T
LS_w_3 = np.linalg.solve(X.T@X,X.T@y)

print(f"\nLeast square solution is R_0={LS_w_3[0]:5.5}+W*{LS_w_3[1]:5.5}+D*{LS_w_3[2]:5.5} with average abs error {np.mean(np.abs(y-X@LS_w_3)):5.5} pix")

# Id we allow to have crossed terms and bias
X = np.array(np.array([ [1 for i in range(len(phase_vigilant['Ws']))], phase_vigilant['Ws'],
                    phase_vigilant['Ds'],np.array(phase_vigilant['Ws'])*np.array(phase_vigilant['Ds'] )])
                ).T
LS_w_4 = np.linalg.solve(X.T@X,X.T@y)

print(f"\nLeast square solution is R_0={LS_w_4[0]:5.5}+W*{LS_w_4[1]:5.5}+D*{LS_w_4[2]:5.5}+{LS_w_4[3]:5.5}*WD with average abs error {np.mean(np.abs(y-X@LS_w_4)):5.5} pix")

# Last combination
X = np.array(np.array([  phase_vigilant['Ws'],
                    phase_vigilant['Ds']])
                ).T
LS_w_5 = np.linalg.solve(X.T@X,X.T@y)
print(f"\nLeast square solution is R_0=W*{LS_w_5[0]:5.5}+D*{LS_w_5[1]:5.5} with average abs error {np.mean(np.abs(y-X@LS_w_5)):5.5} pix")


### NEURAL NETWORKS -Gradient Descent using L1 and MSE cost fct-
#### Define the  FC model classes#### Define the dataset and the FC model classes

In [None]:
from collections import OrderedDict

class FCModel(nn.Module):
    def __init__(self, input_size, n_layers, neurons_per_layer, use_relu_in_last=False): 
        # last layer number of neurons will be equal to the output size!
        super(FCModel, self).__init__()
        self.input_size = input_size

        sequence=[(f'Linear0', nn.Linear(input_size, neurons_per_layer[0])), (f'ReLU0', nn.ReLU())]
        for i in range(1,n_layers):
            sequence.append((f'Linear{i}', nn.Linear(neurons_per_layer[i-1], neurons_per_layer[i])))
            sequence.append((f'ReLU{i}', nn.ReLU()))
            
        if use_relu_in_last==False:
            sequence=sequence[:-1]
        self.network = nn.Sequential(OrderedDict( sequence ))

    def forward(self, x):
        x = x.view(-1, self.input_size)
        return self.network(x)
    
def createModel(input_size  = 28*28, 
                output_size = 10, n_layers=128, neurons_per_layer = 128*[30], use_relu_in_last=False,
                parameter_initialiser=None):

    torch.manual_seed(0) # seed for reproductibility

    model = FCModel(input_size, n_layers, neurons_per_layer, use_relu_in_last)

    # subroutine to count number of parameters in the model
    def get_n_params(model):
        np=0
        for p in list(model.parameters()):
            np += p.numel()
        return np

    print(f"Number of parameters in model {get_n_params(model)}")

    # move model to gpu if available
    model.to(device)
    if not parameter_initialiser==None:
        initialize_parameters(model)
    return model

#### The routines to validate and train

In [None]:
@torch.no_grad()  # prevent this function from computing gradients 
def validate(criterion, model, loader): #show_confusion_matrix = False):

    val_loss = 0
    max_abs_error = torch.Tensor([0]).to(device)
    mean_abs_error = 0
    preds = torch.Tensor().to(device)
    targets = torch.Tensor().to(device)

    model.eval()

    for data, target in loader:

        data, target = data.to(device), target.to(device)
        data = data.view(-1, model.input_size)
        output = model(data)
        target = target.view(output.shape)
        loss = criterion(output, target, model)
        val_loss += loss.item()                                                              
        #pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability
        #preds = torch.cat((preds, pred.view_as(target)))
        #targets= torch.cat((targets, target))                                                                 
        #correct += pred.eq(target.view_as(pred)).sum().item()
        max_abs_error = torch.maximum(torch.max(torch.abs(output-target), 0).values, max_abs_error)
        mean_abs_error += torch.sum(torch.abs(output-target), 0)

    val_loss /= len(loader.dataset)
    mean_abs_error /= len(loader.dataset)
    #accuracy = 100. * correct / len(loader.dataset)
    print(f'\nValidation set: Average loss: {val_loss:.4f}, Average Abs Error: {np.array(mean_abs_error.cpu())}, Maximum Abs Error: {np.array(max_abs_error.cpu())} \n')

    #if show_confusion_matrix:
    #    visualize_confusion_matrix(preds.to(torch.device('cpu')), targets.to(torch.device('cpu')))

    return val_loss


def train(epoch, criterion, model, optimizer, loader, print_loss_every_batches=100):
    
    total_loss = 0.0

    model.train()

    for batch_idx, (data, target) in enumerate(loader):
        
        optimizer.zero_grad()

        data, target = data.to(device), target.to(device)

        data = data.view(-1, model.input_size)

        output = model(data)
        loss = criterion(output, target, model)
        loss.backward()
        optimizer.step()
        
        # print loss every N batches
        if batch_idx % print_loss_every_batches == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(loader.dataset),
                print_loss_every_batches * batch_idx / len(loader), loss.item()))


        total_loss += loss.item()  #.item() is very important here
        # Why?-> In order to avoid having total_loss as a tensor in the gpu

    return total_loss / len(loader.dataset)

#### The full training loop

In [None]:
def full_training_loop(model, criterion, optimizer, train_loader, val_loader, epochs=10, print_loss_every_batches=100):
    losses = {"train": [], "val": []}
    for epoch in range(epochs):

        train_loss = train(epoch, criterion, model, optimizer, train_loader, print_loss_every_batches=print_loss_every_batches)
        val_loss = validate(criterion, model, val_loader)
        losses["train"].append(train_loss)
        losses["val"].append(val_loss)
        
        display.clear_output(wait=True)

        plt.plot(losses["train"], label="training loss")
        plt.plot(losses["val"], label="validation loss")
        plt.yscale('log')
        plt.legend()
        plt.pause(0.01)
        plt.show()   
    return losses

### (a) Obtain $w_0$ from $W$ and $D$ as a Linear One Layer Model (the rule of thumb)
First define the data:

In [None]:
class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, data, targets):
        self.data = data
        self.targets = targets
        
    def __getitem__(self, index):
        x = self.data[index]
        y = self.targets[index]
        return x, y
    
    def __len__(self):
        return len(self.data)

phase_vigilant = json.load(open(vig_path))
targets = torch.tensor(phase_vigilant['GT_w0s']).float()
data = torch.tensor(np.array([phase_vigilant['Ws'], phase_vigilant['Ds'], 
                     np.array(phase_vigilant['Ds'])*np.array(phase_vigilant['Ws'])])).transpose(0,1).float() # the potential arguments
                                                                # to the empirical function we are looking for

full_dataset = CustomDataset(data, targets)
train_size = int(0.95 * len(full_dataset))
test_size = len(full_dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, test_size])

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=200, shuffle=True, num_workers=2)
val_loader = torch.utils.data.DataLoader(test_dataset, batch_size=200, shuffle=True, num_workers=2)

Custom loss for having L1 regularization

In [None]:
def MSE_L1_loss(estimation, target, model, decay=1e-4):
    losser = nn.MSELoss()
    loss = losser(estimation, target) # mse computation
    #L1_reg = torch.tensor(0., requires_grad=True)
    #for name, param in model.named_parameters():
    #    if 'weight' in name:
    #        L1_reg = L1_reg + torch.norm(param, 1)
    #loss = loss + decay * L1_reg
    return loss

In [None]:
# We instantiate a model
thumb_model_w0 = createModel( input_size  = 3, n_layers=1, 
                    neurons_per_layer = [1],  use_relu_in_last=False,
                    parameter_initialiser=None) # que triste jajajajaja

# nn package also has different loss functions.
# we use mean square error for the regression task
criterion = lambda est,targ,model : MSE_L1_loss(est,targ, model) 

# we use the optim package to apply
# stochastic gradient descent for our parameter updates
#optimizer = torch.optim.SGD(thumb_model_w0.parameters(), lr=1e-8, momentum=0.9, weight_decay=0) # built-in L2 weight decay
optimizer = torch.optim.Adagrad(thumb_model_w0.parameters(), lr=0.1,lr_decay=0.01, weight_decay=0.3, initial_accumulator_value=0, eps=1e-10)

# Execute the training and validation
losses = full_training_loop(thumb_model_w0, criterion, optimizer, train_loader, val_loader, epochs=100, print_loss_every_batches=200)

print("\n\n\nFINAL VALIDATION! ####################################################\n\n")
validate(criterion, thumb_model_w0, val_loader)

In [None]:
print(thumb_model_w0)

print(thumb_model_w0.parameters())
# Print model's state_dict
print("\n\nModel's state_dict:")
for param_tensor in thumb_model_w0.state_dict():
    print(param_tensor, "\t", thumb_model_w0.state_dict()[param_tensor])

### (b.1) Obtain $R_0$ from $D$ and estimated $w_0$ with the other rule of thumb

In [None]:
(torch.cat([estimated_w0, torch.Tensor(phase_vigilant['Ds']).unsqueeze(1), 
                     torch.Tensor(phase_vigilant['Ds']).unsqueeze(1)*(estimated_w0)], 1)).shape

In [None]:
phase_vigilant = json.load(open(vig_path))
targets = torch.tensor(phase_vigilant['GT_R0s']).float()
estimated_w0=thumb_model_w0(data.to(device)).cpu().detach().clone()
data = torch.cat([estimated_w0, torch.Tensor(phase_vigilant['Ds']).unsqueeze(1), 
                     torch.Tensor(phase_vigilant['Ds']).unsqueeze(1)*(estimated_w0)], 1).float() # the potential arguments
                                                                # to the empirical function we are looking for

full_dataset = CustomDataset(data, targets)
train_size = int(0.95 * len(full_dataset))
test_size = len(full_dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, test_size])

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=200, shuffle=True, num_workers=2)
val_loader = torch.utils.data.DataLoader(test_dataset, batch_size=200, shuffle=True, num_workers=2)

In [None]:
# We instantiate a model
thumb_model_R0_1 = createModel( input_size  = 3, n_layers=1, 
                    neurons_per_layer = [1],  use_relu_in_last=False,
                    parameter_initialiser=None) # que triste jajajajaja

# nn package also has different loss functions.
# we use mean square error for the regression task
criterion = lambda est,targ,model : MSE_L1_loss(est,targ, model) 

# we use the optim package to apply
# stochastic gradient descent for our parameter updates
#optimizer = torch.optim.SGD(thumb_model_R0_1.parameters(), lr=1e-9, momentum=0.2, weight_decay=0) # built-in L2 weight decay
optimizer = torch.optim.Adagrad(thumb_model_R0_1.parameters(), lr=0.1, lr_decay=0.01, weight_decay=0.3, initial_accumulator_value=0, eps=1e-10)
#optimizer = torch.optim.Adam(thumb_model_R0_1.parameters(), lr=0.05, betas=(0.9, 0.999), eps=1e-08, weight_decay=0.7, amsgrad=True) 
#optimizer = torch.optim.RMSprop(thumb_model_R0_1.parameters(), lr=0.01, alpha=0.7, eps=1e-08, weight_decay=0.2, momentum=0.1, centered=False)

# Execute the training and validation
losses = full_training_loop(thumb_model_R0_1, criterion, optimizer, train_loader, val_loader, epochs=70, print_loss_every_batches=400)

print("\n\n\nFINAL VALIDATION! ####################################################\n\n")
validate(criterion, thumb_model_R0_1, val_loader)

In [None]:
print(thumb_model_R0_1)

print(thumb_model_R0_1.parameters())
# Print model's state_dict
print("\n\nModel's state_dict:")
for param_tensor in thumb_model_R0_1.state_dict():
    print(param_tensor, "\t", thumb_model_R0_1.state_dict()[param_tensor])

### (b.2) Obtain $R_0$ from $D$ and $W$

In [None]:
phase_vigilant = json.load(open(vig_path))
targets = torch.tensor(phase_vigilant['GT_R0s']).float()
data = torch.tensor(np.array([phase_vigilant['Ws'], phase_vigilant['Ds'], 
                     np.array(phase_vigilant['Ds'])*np.array(phase_vigilant['Ws'])])).transpose(0,1).float() # the potential arguments
                                                                # to the empirical function we are looking for

full_dataset = CustomDataset(data, targets)
train_size = int(0.95 * len(full_dataset))
test_size = len(full_dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, test_size])

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=200, shuffle=True, num_workers=2)
val_loader = torch.utils.data.DataLoader(test_dataset, batch_size=200, shuffle=True, num_workers=2)

In [None]:
# We instantiate a model
thumb_model_R0_2 = createModel( input_size  = 3, n_layers=1, 
                    neurons_per_layer = [1],  use_relu_in_last=False,
                    parameter_initialiser=None) # que triste jajajajaja

# nn package also has different loss functions.
# we use mean square error for the regression task
criterion = lambda est,targ,model : MSE_L1_loss(est,targ, model) 

# we use the optim package to apply
# stochastic gradient descent for our parameter updates
#optimizer = torch.optim.SGD(thumb_model_R0_2.parameters(), lr=1e-9, momentum=0.2, weight_decay=0) # built-in L2 weight decay
#optimizer = torch.optim.Adagrad(thumb_model_R0_2.parameters(), lr=0.1, lr_decay=0.01, weight_decay=0.3, initial_accumulator_value=0, eps=1e-10)
optimizer = torch.optim.RMSprop(thumb_model_R0_2.parameters(), lr=0.01, alpha=0.8, eps=1e-08, weight_decay=0.1, momentum=0.2, centered=False)

# Execute the training and validation
losses = full_training_loop(thumb_model_R0_2, criterion, optimizer, train_loader, val_loader, epochs=70, print_loss_every_batches=100)

print("\n\n\nFINAL VALIDATION! ####################################################\n\n")
validate(criterion, thumb_model_R0_2, val_loader)

In [None]:
print(thumb_model_R0_2)

print(thumb_model_R0_2.parameters())
# Print model's state_dict
print("\n\nModel's state_dict:")
for param_tensor in thumb_model_R0_2.state_dict():
    print(param_tensor, "\t", thumb_model_R0_2.state_dict()[param_tensor])

### (c) Obtain $w_0, R_0$ from $D$, $slope$ and $Profile$ as a ten layer fc model

In [None]:
phase_vigilant = json.load(open(vig_path))
targets = torch.tensor([phase_vigilant['GT_w0s'], phase_vigilant['GT_R0s']]).transpose(0,1).float()
data = torch.cat([torch.Tensor(phase_vigilant['slope']).unsqueeze(1), torch.Tensor(phase_vigilant['Ds']).unsqueeze(1), torch.Tensor(phase_vigilant['profiles'])],1).float()

full_dataset = CustomDataset(data, targets)
train_size = int(0.95 * len(full_dataset))
test_size = len(full_dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, test_size])

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=200, shuffle=True, num_workers=2)
val_loader = torch.utils.data.DataLoader(test_dataset, batch_size=200, shuffle=True, num_workers=2)

In [None]:
# We instantiate a model
fc_model_w0_R0 = createModel( input_size  = 2+len(phase_vigilant['profiles'][0]), n_layers =10, 
                    neurons_per_layer = [10,10,5,5,4,4,3,3,2,2],  use_relu_in_last=False,
                    parameter_initialiser=None) 

# nn package also has different loss functions.
# we use mean square error for the regression task
criterion = lambda est,targ,model : MSE_L1_loss(est,targ, model) 

# we use the optim package to apply
# stochastic gradient descent for our parameter updates
#optimizer = torch.optim.SGD(fc_model_w0_R0.parameters(), lr=1e-3, momentum=0.9, weight_decay=0) # built-in L2 weight decay
optimizer = torch.optim.Adagrad(fc_model_w0_R0.parameters(), lr=0.1, lr_decay=0.001, weight_decay=0.3, initial_accumulator_value=0, eps=1e-10)
#optimizer = optimizer = torch.optim.RMSprop(fc_model_w0_R0.parameters(), lr=0.02, alpha=0.8, eps=1e-08, weight_decay=0.1, momentum=0.1, centered=False)

# Execute the training and validation
losses = full_training_loop(fc_model_w0_R0, criterion, optimizer, train_loader, val_loader, epochs=100, print_loss_every_batches=300)

print("\n\n\nFINAL VALIDATION! ####################################################\n\n")
validate(criterion, fc_model_w0_R0, val_loader)

In [None]:
print(fc_model_w0_R0)
print(fc_model_w0_R0.parameters())
# Print model's state_dict
print("\n\nModel's state_dict:")
for param_tensor in fc_model_w0_R0.state_dict():
    print(param_tensor, "\t", fc_model_w0_R0.state_dict()[param_tensor])

### We save the models

In [None]:
torch.save({
            'thumb_model_w0': thumb_model_w0.state_dict(),
            'thumb_model_R0_1': thumb_model_R0_1.state_dict(),
            'thumb_model_R0_2': thumb_model_R0_2.state_dict(),
            'fc_model_w0_R0': fc_model_w0_R0.state_dict()
            }, f"./OUTPUT/EMPIRICAL_LAWS/{experiment_name}/ML_Models.pt")

## Step 5: Use the computed models to get estimates in experimental images
We first reload the trained models

In [None]:
thumb_model_w0 = createModel( input_size  = 3, n_layers=1, 
                    neurons_per_layer = [1],  use_relu_in_last=False,
                    parameter_initialiser=None)
thumb_model_R0_1 = createModel( input_size  = 3, n_layers=1, 
                    neurons_per_layer = [1],  use_relu_in_last=False,
                    parameter_initialiser=None)
thumb_model_R0_2 = createModel( input_size  = 3, n_layers=1, 
                    neurons_per_layer = [1],  use_relu_in_last=False,
                    parameter_initialiser=None)
fc_model_w0_R0 = createModel( input_size  = 2+len(phase_vigilant['profile'][0]), n_layers =10, 
                    neurons_per_layer = [5,5,5,4,4,3,3,2,2,2],  use_relu_in_last=False,
                    parameter_initialiser=None) 

checkpoint = torch.load(f"./OUTPUT/EMPIRICAL_LAWS/{experiment_name}/ML_Models.pt")
thumb_model_w0.load_state_dict(checkpoint['thumb_model_w0'])
thumb_model_R0_1.load_state_dict(checkpoint['thumb_model_R0_1'])
thumb_model_R0_2.load_state_dict(checkpoint['thumb_model_R0_2'])
fc_model_w0_R0.load_state_dict(checkpoint['fc_model_w0_R0'])

In [None]:
from ipyfilechooser import FileChooser
# Create and display a FileChooser widget
fc = FileChooser('/home/oiangu/Desktop/Conical_Refraction_Polarimeter')
display(fc)

In [None]:
image_full_path=fc.selected
im = cv2.imread(image_full_path, cv2.IMREAD_ANYDEPTH)
if im is None:
    print(f" Unable to import image {image_full_path}")
    raise ValueError
plt.imshow(im)
plt.show()

Compute Gradient algorithm (also Mirror and Rotation ya que estamos)

In [None]:
import os
import sys
from GPU_Classes import *
from Image_Manager import *
from Polarization_Obtention_Algorithms import Rotation_Algorithm, Mirror_Flip_Algorithm, Gradient_Algorithm
import numpy as np
import json
import cv2
import pandas as pd
import matplotlib.pyplot as plt

image=im.copy()
saturation=1
pol_or_CR="pol" 
deg_or_rad="deg" # for the final output
image_depth=8 # or 16 bit per pixel
image_shortest_side=540
randomization_seed=666
recenter_average_image=False


# 4. GRAVICENTER iX and PROFILES ######################################
X=int(image_shortest_side*1.4/2)
plot_3d_finnes=0.3 # value that should go in (0,1]. 1 means all the pixels will be ploted in the 3d plot, 0.5 only half of them



# 5. POLARIZATION RELATIVE ANGLES ###################################
# Mirror with affine interpolation & Rotation Algorithms will be employed
# Each using both Fibonacci and Quadratic Fit Search
# Also a gradient algorithm
theta_min_Rot=-np.pi
theta_max_Rot=np.pi
rad_min_Grav=3
rad_max_Grav=image_shortest_side
theta_min_Mir=0
theta_max_Mir=np.pi
initial_guess_delta_rad=0.1
initial_guess_delta_pix=10
use_exact_gravicenter=True
precision_quadratic=1e-10
max_it_quadratic=100
cost_tolerance_quadratic=1e-14
precision_fibonacci=1e-10
max_points_fibonacci=100
cost_tolerance_fibonacci=1e-14


##################################################################
##################################################################
im_type=np.uint16 if image_depth==16 else np.uint8
max_intensity=65535 if image_depth==16 else 255
np.random.seed(randomization_seed)
polCR=1 if pol_or_CR=='CR' else 0.5

# 4. GRAVICENTER iX ###############################
def compute_intensity_gravity_center(image):
    """
        Expects input image to be an array of dimensions [h, w].
        It will return an array of gravity centers [2(h,w)] in pixel coordinates
        Remember that pixel coordinates are set equal to numpy indices

    """
    # image wise total intensity and marginalized inensities for weighted sum
    intensity_in_w = np.sum(image, axis=0) # weights for x [raw_width]
    intensity_in_h = np.sum(image, axis=1) # weights for y [raw_height]
    total_intensity = intensity_in_h.sum()

    # Compute mass center for intensity
    # [2] (h_center,w_center)
    return np.nan_to_num( np.stack(
        (np.dot(intensity_in_h, np.arange(image.shape[0]))/total_intensity,
         np.dot(intensity_in_w, np.arange(image.shape[1]))/total_intensity)
        ) )

def compute_raw_to_centered_iX(image, X, interpolation_flag=None):

    g_raw = compute_intensity_gravity_center(image)
    # crop the iamges with size (X+1+X)^2 leaving the gravity center in
    # the central pixel of the image. In case the image is not big enough for the cropping,
    # a 0 padding will be made.
    centered_image = np.zeros( (2*X+1, 2*X+1),  dtype = image.dtype )

    # we round the gravity centers to the nearest pixel indices
    g_index_raw = np.rint(g_raw).astype(int) #[N_images, 2]

    # obtain the slicing indices around the center of gravity
    # TODO -> make all this with a single array operation by stacking the lower and upper in
    # a new axis!!
    # [ 2 (h,w)]
    unclipped_lower = g_index_raw[:]-X
    unclipped_upper = g_index_raw[:]+X+1
    # unclippde could get out of bounds for the indices, so we clip them
    lower_bound = np.clip( unclipped_lower, a_min=0, a_max=image.shape)
    upper_bound = np.clip( unclipped_upper, a_min=0, a_max=image.shape)
    # we use the difference between the clipped and unclipped to get the necessary padding
    # such that the center of gravity is left still in the center of the image
    padding_lower = lower_bound-unclipped_lower
    padding_upper = upper_bound-unclipped_upper

    # crop the image
    centered_image[padding_lower[0]:padding_upper[0] or None,
                                    padding_lower[1]:padding_upper[1] or None ] = \
                  image[lower_bound[0]:upper_bound[0],
                                      lower_bound[1]:upper_bound[1]]
    if interpolation_flag==None:
        return centered_image
    else:
        # We compute the center of gravity of the cropped images, if everything was made allright
        # they should get just centered in the central pixels number X+1 (index X)
        g_centered = compute_intensity_gravity_center(centered_image)

        # We now compute a floating translation of the image so that the gravicenter is exactly
        # centered at pixel (607.5, 607.5) (exact center of the image in pixel coordinates staring
        # form (0,0) and having size (607*2+1)x2), instead of being centered at the beginning of
        # around pixel (607,607) as is now
        translate_vectors = X+0.5-g_centered #[ 2(h,w)]
        T = np.float64([[1,0, translate_vectors[1]], [0,1, translate_vectors[0]]])
        return cv2.warpAffine( centered_image, T, (X*2+1, X*2+1),
                    flags=interpolation_flag) # interpolation method

image = np.where( image<=(max_intensity*saturation), image, max_intensity*saturation)
image = compute_raw_to_centered_iX(image, X)
# saturated and iX

# 6. POLARIZATION RELATIVE ANGLES ###################################
# Mirror with affine interpolation & Rotation Algorithms will be employed
# Each using both Fibonacci and Quadratic Fit Search
# Results will be gathered in a table and outputed as an excel csv
# Mock Image Loader
# Computar el angulo de cada uno en un dataframe donde una de las entradas sea results y haya un result per fibo qfs y per rotation y mirror affine. Y luego procesar en un 7º paso estos angulos para obtener los angulos relativos etc y perhaps hacer tablucha con ground truth menos el resulting delta angle medido por el algoritmo
image_loader = Image_Manager(mode=X, interpolation_flag=None)
# Define the ROTATION ALGORITHM
rotation_algorithm = Rotation_Algorithm(image_loader,
    theta_min_Rot, theta_max_Rot, None,
    initial_guess_delta_rad, use_exact_gravicenter, initialize_it=False)

# Define the Affine Mirror algorithm
mirror_algorithm = Mirror_Flip_Algorithm(image_loader,
    theta_min_Mir, theta_max_Mir, None,
    initial_guess_delta_rad, method="aff", left_vs_right=True, use_exact_gravicenter=use_exact_gravicenter, initialize_it=False)

# Define the Gradient algorithm
gradient_algorithm = Gradient_Algorithm(image_loader,
        rad_min_Grav, rad_max_Grav,
        initial_guess_delta_pix,
        use_exact_gravicenter)

# A dictionary to gather all the resulting angles for each image

individual_image_results = { 'polarization_method':[], 'optimization_1d':[], 'found_phiCR':[], 'predicted_opt_precision':[] }

def to_result_dict(result_dict, alg, alg_name, opt_name, im_names):
    for key, name in zip(alg.times.keys(), im_names):
        result_dict['polarization_method'].append(alg_name)
        result_dict['optimization_1d'].append(opt_name)
        result_dict['found_phiCR'].append(alg.angles[key])
        result_dict['predicted_opt_precision'].append(alg.precisions[key])
image_container=np.zeros( (1, 2*X+1, 2*X+1), dtype=np.float64)
image_names=[]
# charge the image
image_container[0]=image.astype(np.float64)
image_names.append(f"{fc.selected_filename}")

# charge the image loader:
image_loader.import_converted_images_as_array(image_container, image_names)
# Execute the Rotation and Mirror Algorithms:
# ROTATION ######
interpolation_flag=None
# the interpolation algorithm used in case we disbale its usage for the iX image obtention will be the Lanczos one
rotation_algorithm.interpolation_flag=interpolation_flag if interpolation_flag is not None else cv2.INTER_CUBIC
rotation_algorithm.reInitialize(image_loader)
rotation_algorithm.quadratic_fit_search(precision_quadratic, max_it_quadratic, cost_tolerance_quadratic)
to_result_dict( individual_image_results, rotation_algorithm, "Rotation", "Quadratic", image_names)
rotation_algorithm.reInitialize(image_loader)
rotation_algorithm.fibonacci_ratio_search(precision_fibonacci, max_points_fibonacci, cost_tolerance_fibonacci)
to_result_dict( individual_image_results, rotation_algorithm, "Rotation", "Fibonacci", image_names)

# MIRROR #######
mirror_algorithm.interpolation_flag=interpolation_flag if interpolation_flag is not None else cv2.INTER_CUBIC
mirror_algorithm.reInitialize(image_loader)
mirror_algorithm.quadratic_fit_search(precision_quadratic, max_it_quadratic, cost_tolerance_quadratic)
to_result_dict( individual_image_results, rotation_algorithm, "Mirror", "Quadratic", image_names)
mirror_algorithm.reInitialize(image_loader)
mirror_algorithm.fibonacci_ratio_search(precision_fibonacci, max_points_fibonacci, cost_tolerance_fibonacci)
to_result_dict( individual_image_results, rotation_algorithm, "Mirror", "Fibonacci", image_names)

# GRADIENT #######
optimal_masked_gravs={}
optimal_radii={}
grav=compute_intensity_gravity_center(image)

gradient_algorithm.interpolation_flag=interpolation_flag if interpolation_flag is not None else cv2.INTER_CUBIC
gradient_algorithm.reInitialize(image_loader)
gradient_algorithm.quadratic_fit_search(precision_quadratic, max_it_quadratic, cost_tolerance_quadratic)
to_result_dict( individual_image_results, gradient_algorithm, "Gradient", "Quadratic", image_names)
optimal_masked_gravs['quad'] = gradient_algorithm.masked_gravs[f"Quadratic_Search_{fc.selected_filename}"]
optimal_radii['quad'] = gradient_algorithm.optimals[f"Quadratic_Search_{fc.selected_filename}"]

gradient_algorithm.reInitialize(image_loader)
gradient_algorithm.fibonacci_ratio_search(precision_fibonacci, max_points_fibonacci, cost_tolerance_fibonacci)
to_result_dict( individual_image_results, gradient_algorithm, "Gradient", "Fibonacci", image_names)

optimal_masked_gravs['fibo'] = gradient_algorithm.masked_gravs[f"Fibonacci_Search_{fc.selected_filename}"]
optimal_radii['fibo'] = gradient_algorithm.optimals[f"Fibonacci_Search_{fc.selected_filename}"]

masked_grav=(optimal_masked_gravs['quad']+optimal_masked_gravs['fibo'])/2.0
optimal_radi = (optimal_radii['quad']+optimal_radii['fibo'])/2
print(f"\n\nOptimal masked gravs: {optimal_masked_gravs}\nOptimal radii: {optimal_radii}\n\n\n")
print(pd.DataFrame.from_dict(individual_image_results))

# 7. PROCESS FINAL RESULTS ##########################################
def angle_to_pi_pi( angle): # convert any angle to range ()-pi,pi]
    angle= angle%(2*np.pi) # take it to [-2pi, 2pi]
    return angle-np.sign(angle)*2*np.pi if abs(angle)>np.pi else angle    

average_found_phiCR=np.mean([angle_to_pi_pi(phi) for i,phi in enumerate(individual_image_results['found_phiCR']) if individual_image_results['polarization_method'][i]!='Gradient'])



Plot the Profiles

In [None]:
# PLOT THE PROFILES #################################################
pix_spacing=10
%matplotlib notebook

plt.rc('font', size=8) 

prof_x=image[int(grav[0])]
prof_y=image[:,int(grav[1])]
fig = plt.figure(figsize=(2*4.5, 2*4.5))
axes=fig.subplots(2,2)
cm=axes[0, 0].imshow(image, cmap='viridis')
axes[0,0].plot([grav[1],masked_grav[1]], [grav[0], masked_grav[0]], '-w', markersize=1)
axes[0,0].plot(grav[1], grav[0], 'or', markersize=4, label="Full image gravicenter")
axes[0,0].plot(masked_grav[1], masked_grav[0], 'ow', markersize=4, label="Masked circle gravicenter")
axes[0,0].legend()
axes[0,0].grid(True)
axes[0,1].scatter(prof_y, np.arange(len(prof_y)), s=1, label=f'Intensity profile in y')
axes[0,1].set_ylim((0,len(prof_y)))
axes[0,1].invert_yaxis()
axes[1,0].scatter(np.arange(len(prof_x)), prof_x, s=1, label=f'Intensity profile in x')
axes[1,0].set_xlim((0,len(prof_x)))
axes[1,0].invert_yaxis()
axes[0,0].set_xlabel("x (pixels)")
#axes[0,0].set_ylabel("y (pixels)")
axes[0,1].set_xlabel("Raw Intensity along Gravicenter")
axes[0,1].set_ylabel("y (pixels)")
axes[1,0].set_ylabel("Raw Intensity along Gravicenter")
axes[1,0].set_xlabel("x (pixels)")
axes[1,0].grid(True)
axes[0,1].grid(True)

cols = np.broadcast_to( np.arange(X*2+1), (X*2+1,X*2+1)) #[h,w]
rows = cols.swapaxes(0,1) #[h,w]
slope=(masked_grav[0]-grav[0])/(masked_grav[1]-grav[1])
mask=(rows<( slope*(cols-masked_grav[1]) +masked_grav[0]+pix_spacing )) & (rows>( slope*(cols-masked_grav[1]) +masked_grav[0]-pix_spacing ))

filtered_image=np.where(mask, image, 0)
filtered_line=np.where(mask, image, 200)
axes[0,0].imshow(filtered_line, alpha=0.1, label="Optimal Radious Mask")
prof_filt=np.sum(filtered_image,axis=0)
axes[1,1].scatter(np.arange(len(prof_filt))*np.sqrt(1+slope**2), prof_filt , s=1, label=f'Intensity profile along main axis')
axes[1,1].grid()
axes[1,1].set_ylabel(f'Reduced Intensity profile along main axis')
axes[1,1].set_xlabel("Pixels along the main axis") 
fig.suptitle(f"Raw Intesity Profiles for Image{fc.selected_filename}\nBest grav-masked_grav={optimal_radi:3.5} pix   Average found phiCR={average_found_phiCR:3.5} rad\nEstimated radious={optimal_radi/0.689095:4.9} pix")
cbax=fig.add_axes([0.1,0.91,0.4,0.01])
fig.colorbar(cm, ax=axes[0,0], cax=cbax, orientation='horizontal')

plt.show()

Run the ML models and the LS solutions on the image!

In [None]:
thumb_model_w0
thumb_model_R0_1 
thumb_model_R0_2
fc_model_w0_R0