In [1]:
## Imports
import math
import scipy.ndimage
import numpy as np

from PIL import Image

In [2]:
# Utils

def first(aList):
    return aList[0]

def last(aList):
    return aList[-1]

In [3]:
## Setting parameters

IMAGE_1 = '../images/Image 1a.png'
IMAGE_2 = '../images/Image 1b.png'
FRAMES_PER_IMAGE = 1

DEFAULT_PROOF = 0
DEFAULT_OVERLAP = 50
DEFAULT_INTERROGATION_WINDOW = 32

In [4]:
## Reading images
# Loading images as an IxJ matrix, containing the intensity of each pixel.
#
# Output: 
# Array with the following dimensions: 0 - Image; 1 - Height (Y); 2 - Width (X).

def load_images():
    images = []
    
    for IMAGE in [IMAGE_1, IMAGE_2]:
        imag = Image.open(IMAGE)
        grayscale_image = imag.convert("L")
        grayscale_array = np.asarray(grayscale_image)
        images += [np.array(grayscale_array)]
    
    return np.array(images)

def load_fake_images(x=100, y=None, total_images=5, mode='const'):
    if not y:
        y = x
    count = 1
    images = []
    for idx in range(total_images):
        if mode == 'rand':
            images += [(np.random.rand(x, y) * 100).astype(np.uint8)]
        elif mode == 'inc':
            images += [np.reshape(np.arange(count, count + x * y), [x, y])]
            count += x * y
        else:
            images += [np.ones((x, y), np.uint8) * (idx + 1)]
    return np.array(images)

In [5]:
## Single to double frame
# Combines images by 2, returning an array with two frames (one for each image). 
#
#   Input: 5 images with step 1.
#   Output: 4 double-framed images.
#      FrameA:  1  2  3  4
#      FrameB:  2  3  4  5
#
#   Input: 8 images with step 3.
#   Output: 5 doubled-framed images.
#      FrameA:  1  2  3  4  5
#      FrameB:  4  5  6  7  8
#
# This function also crops the image according to the provided Region of Interest (ROI), that must be passed as:
# ROI = [X-start X-end Y-start Y-end], for example: [1 100 1 50].
#
# Output:
# Array with the following dimensions: 0 - Image; 1 - Frame; 2 - Height (Y); 3 - Width (X).

def single_to_double_frame(images, step=1, roi=None):
    total_images = len(images)

    frameA_idx = list(range(0,total_images-step))
    frameB_idx = [idx+1 for idx in frameA_idx]

    height, width = first(images).shape
    mask = np.ones([height, width], np.uint8)

    images_double_framed = []
    for idx in frameA_idx:
        double_frame = (images[frameA_idx[idx]], images[frameB_idx[idx]])
            
        if roi and len(roi) == 4:
            size_y, size_x = double_frame[0].shape
            min_x, max_x = max(0, roi[0]-1), min(roi[1], size_x)
            min_y, max_y = max(0, roi[2]-1), min(roi[3], size_x)
            
            double_frame[0] = np.array(double_frame[0][min_y:max_y, min_x:max_x])
            double_frame[1] = np.array(double_frame[1][min_y:max_y, min_x:max_x])

        images_double_framed += [double_frame]
            
    return np.array(images_double_framed)

In [6]:
## Calculate phases
# Number of phases to reconstruct. If set to 1, no phase reconstruction is achieved.
#
# Output:
# A plain number representing the different phases and and a vector of phases.

def calculate_phases(images, cumulcross=True, acquisition_freq=1, actuation_freq=1, default_phases=1):
    if cumulcross:
        T_acquired = np.array(range(len(images))) / acquisition_freq
        
        if actuation_freq == 0:
            phases = T_acquired * 0 + 1
            total_phases = 1
        else:
            phases = np.floor(np.mod(T_acquired + 1 / (2 * actuation_freq), 1 / actuation_freq) * actuation_freq * default_phases) + 1
            total_phases = default_phases
    else:
        phases = np.array(range(len(images)))
        total_phases = len(phases)
        
    return total_phases, phases

In [7]:
## Prepare images for PIV
# Determine which indices must be used to create the interrogation windows. 
# It also add a padding dark color to the images.
#
# Output: Indexes for vectors (MinX, MaxX, MinY, MaxY), the padded images and the interrogation window indexes.

def prepare_piv_images(images, window_size, step):
    
    # Calculating vectors.
    min_x = 1 + math.ceil(step) # TODO: Check if shouldn't be 0 + step (because of Python vs Matlab start index).
    min_y = 1 + math.ceil(step) # TODO: Check if shouldn't be 0 + step (because of Python vs Matlab start index).
    size_y, size_x = first(images)[0].shape
    max_x = step * math.floor(size_x / step) - (window_size - 1) + math.ceil(step)
    max_y = step * math.floor(size_y / step) - (window_size - 1) + math.ceil(step)
    vectors_u = math.floor((max_x - min_x)/step + 1)
    vectors_v = math.floor((max_y - min_y)/step + 1)
    
    # Centering image grid.
    pad_x = size_x - max_x
    pad_y = size_y - max_y
    shift_x = max(0, round((pad_x - min_x) / 2))
    shift_y = max(0, round((pad_y - min_y) / 2))
    min_x += shift_x
    min_y += shift_y
    max_x += shift_x
    max_y += shift_y
    
    # Adding a dark padded border to images.
    padded_images = []
    for idx in range(len(images)):
        padded_images += [[]]
        for frame in range(2):
            image = images[idx][frame]
            padded_images[idx] += [np.pad(image, math.ceil(window_size-step), constant_values=image.min())]
        padded_images[idx] = np.array(padded_images[idx])
    padded_images = np.array(padded_images)
    
    # Interrogation window indexes for first frame.
    padded_size_y, padded_size_x = first(padded_images)[0].shape
    min_s0 = np.repmat(np.array(np.arange(min_y, max_y + 1, step) - 1).transpose(), 1, vectors_u)
    max_s0 = np.repmat(np.array(np.arange(min_x, max_x + 1, step) - 1) * padded_size_y, vectors_v, 1)
    s0 = np.asarray(min_s0 + max_s0).flatten()[..., np.newaxis, np.newaxis].transpose([1, 2, 0])

    min_s1 = np.repmat(np.array(np.arange(1, window_size + 1)).transpose(), 1, window_size)
    max_s1 = np.repmat(np.array(np.arange(1, window_size + 1) - 1) * padded_size_y, window_size, 1)
    s1 = min_s1 + max_s1

    indexes = np.tile(np.asarray(s1)[..., np.newaxis], [1, 1, s0.shape[2]]) + np.tile(s0, [window_size, window_size, 1]) - 1
    
    return min_x, max_x, min_y, max_y, padded_images, indexes

In [8]:
## Cumulative cross correlation
# Averages correlation maps from an image stack.
#
# TODO: This function isn't working properly! Matlab FFT ≠ Numpy FFT.
# Should fix the cross correlation calculation and also check the normalization (different shape expected).
#
# Output: A correlation matrix with the same size as the images input.

NORMALIZED_CORRELATION_RESOLUTION = 2**8
def cumulative_cross_correlation(images, indexes, int_window):
    
    total_correlation = 0
    for idx, image in enumerate(images):
        frame_a = image[0].take(indexes).astype(np.single)
        frame_b = image[1].take(indexes).astype(np.single)
        
        # Calculating cross correlation
        fft_a = np.fft.fft2(frame_a)
        fft_b = np.fft.fft2(frame_b)

        fft_shifting = np.real(np.fft.ifft(np.fft.ifft(np.conj(fft_a) * fft_b, window_size, 2), window_size, 1))
        correlation = np.fft.fftshift(np.fft.fftshift(fft_shifting, 2), 1)
        correlation[correlation < 0] = 0
        
        # Normalizing correlation
        min_corr = np.tile(correlation.min(0).min(0), [correlation.shape[0], correlation.shape[1], 1])
        max_corr = np.tile(correlation.max(0).max(0), [correlation.shape[0], correlation.shape[1], 1])
        norm_corr = (correlation - min_corr) / (max_corr - min_corr) * (NORMALIZED_CORRELATION_RESOLUTION - 1)
    
        total_correlation += norm_corr/len(images)
        
    return total_correlation

In [9]:
## Vector field determination
# No fucking clue what this does. Here it's where it happens the magic, calculating peaks and doing science shit.
#
# Output: OutputPIV object

S2N_FILTER = False
DEFAULT_S2N_THRESHOLD = 1
DEFAULT_RES_NORMALIZATION = 255
def vector_field_determination(correlation, int_window, step, min_x, max_x, min_y, max_y):
    
    # Normalize result
    squeezed_min_corr = correlation.min(0).min(0).squeeze()[:, np.newaxis, np.newaxis]
    squeezed_delta_corr = correlation.max(0).max(0).squeeze()[:, np.newaxis, np.newaxis] - squeezed_min_corr
    min_res = np.tile(squeezed_min_corr, [1, correlation.shape[0], correlation.shape[1]]).transpose([1, 2, 0])
    delta_res = np.tile(squeezed_delta_corr, [1, correlation.shape[0], correlation.shape[1]]).transpose([1, 2, 0])
    corr = ((correlation - min_res) / delta_res) * DEFAULT_RES_NORMALIZATION
    
    # Find peaks and S2N
    x1, y1, indexes1, x2, y2, indexes2, s2n = find_all_displacements(corr)
    
    # Sub-pixel determination
    pixel_offset = 1 if (int_window % 2 == 0) else 0.5
    vector = sub_pixel_gaussian(corr, int_window, x1, y1, indexes1, pixel_offset)
    
    # Create data
    x_range = np.arange(min_x, max_x, step)
    y_range = np.arange(min_y, max_y, step)
    output_x = np.tile(x_range + int_window / 2, [len(y_range), 1])
    output_y = np.tile(y_range[:, None] + int_window / 2, [1, len(x_range)])

    # TODO: Check flow, while testing vector had not enough elements to reshape.
    vector = np.reshape(vector, np.append(np.array(output_x.transpose().shape), 2), order='F').transpose([1, 0, 2])

    # Signal to noise filter
    s2n = s2n[np.reshape(np.array(range(output_x.size)), output_x.transpose().shape, order='F').transpose()]
    if S2N_FILTER:
        vector[:,:,0] = vector[:,:,0] * (s2n > DEFAULT_S2N_THRESHOLD)
        vector[:,:,1] = vector[:,:,1] * (s2n > DEFAULT_S2N_THRESHOLD)
    
    output_u = vector[:,:,0]
    output_v = vector[:,:,1]

    output_x -= int_window/2
    output_y -= int_window/2

    return OutputPIV(output_x, output_y, output_u, output_v, s2n)
    
    
## Gaussian sub-pixel mode
# No f*cking clue what this does.
#
# Output: A vector with a sub-pixel deviation - Maybe? I'm not sure. Its dimensions are Number-of-Correlations by 2. 

def sub_pixel_gaussian(correlation, int_window, x, y, indexes, pixel_offset):
    z = np.array(range(indexes.shape[0])).transpose()
    
    xi = np.nonzero(np.logical_not(np.logical_and(
        # Adjusting -1 to -2 according to Matlab/Python mapping.
        np.logical_and(x <= correlation.shape[1] - 2, y <= correlation.shape[0] - 2),
        np.logical_and(x >= 2, y >= 2)
    )))[0]

    x = np.delete(x, xi)
    y = np.delete(y, xi)
    z = np.delete(z, xi)
    x_max = correlation.shape[1]
    vector = np.ones((correlation.shape[2], 2)) * np.nan

    if len(x) > 0:
        ip = np.ravel_multi_index(np.array([x, y, z]), correlation.shape, order='F')
        flattened_correlation = correlation.flatten(order='F')

        f0 = np.log(flattened_correlation[ip])
        f1 = np.log(flattened_correlation[ip - 1])
        f2 = np.log(flattened_correlation[ip + 1])
        peak_y = y + (f1 - f2) / (2 * f1 - 4 * f0 + 2 * f2)

        f1 = np.log(flattened_correlation[ip - x_max])
        f2 = np.log(flattened_correlation[ip + x_max])
        peak_x = y + (f1 - f2) / (2 * f1 - 4 * f0 + 2 * f2)
    
        sub_pixel_x = peak_x - (int_window / 2) - pixel_offset
        sub_pixel_y = peak_y - (int_window / 2) - pixel_offset
    
        vector[z, :] = np.array([sub_pixel_x, sub_pixel_y]).transpose()
    
    return vector

    
## Find all displacements
# Find all integer pixel displacement in a stack of correlation windows.
#
# Output: Horizontal and vertical indexes of the first and second maximum for each slice of correlation in the third
# dimension (PeakX1, PeackY1, PeakX2, PeakY2), the absolute indexes of the correlation maximums (Idx1, Idx2) and the
# ratio between the first and second peack (S2N) - 0 indicates non confiable results.

def find_all_displacements(correlation):
    corr_size = correlation.shape[0]
    
    # Finding first peak
    peak1_val, peak1_x, peak1_y, peak_indexes1, peak_positions1 = find_peaks(correlation)

    # Finding second peak (1 extra point from Matlab size)
    filter_size = 10 if corr_size >= 64 else 5 if corr_size >= 32 else 4
    filtered = scipy.ndimage.correlate(peak_positions1, np.ones([filter_size, filter_size, 1]), mode='constant')
    correlation = (1 - filtered) * correlation
    peak2_val, peak2_x, peak2_y, peak_indexes2, _ = find_peaks(correlation)

    # Calculating Signal to Noise ratio
    signal_to_noise = np.zeros([peak1_val.shape[0]])
    signal_to_noise[peak2_val != 0] = peak1_val[peak2_val != 0] / peak2_val[peak2_val != 0]

    # Maximum at a border usually indicates that MAX took the first one it found, so we should put a bad S2N, like 0.
    signal_to_noise[peak1_y == 0] = 0
    signal_to_noise[peak1_x == 0] = 0
    signal_to_noise[peak1_y == (corr_size - 1)] = 0
    signal_to_noise[peak1_x == (corr_size - 1)] = 0
    signal_to_noise[peak2_y == 0] = 0
    signal_to_noise[peak2_x == 0] = 0
    signal_to_noise[peak2_y == (corr_size - 1)] = 0
    signal_to_noise[peak2_x == (corr_size - 1)] = 0
    
    return peak1_x, peak1_y, peak_indexes2, peak2_x, peak2_y, peak_indexes2, signal_to_noise
    
    
## Find peaks
# Find max values for each correlation.
#
# Output: The MAX peak, its coordinates (X and Y) and the indexes.
    
def find_peaks(correlation):
    corr_size = correlation.shape[0]
    corr_numbers = correlation.shape[2]
    max_peak = correlation.max(0).max(0)
    max_positions = correlation == np.tile(max_peak[np.newaxis, np.newaxis, ...], [corr_size, corr_size, 1])
    max_indexes = np.where(max_positions.transpose(2, 1, 0).flatten())[0]
    peak_y, peak_x, peak_z = np.unravel_index(max_indexes, (corr_size, corr_size, corr_numbers), order='F')

    # If two elements equals to the max we should check if they are in the same layer and take the first one.
    # Surely the second one will be the second highest peak. Anyway this would be a bad vector.
    unique_max_indexes = np.unique(peak_z)
    max_indexes = max_indexes[unique_max_indexes]
    peak_x = peak_x[unique_max_indexes]
    peak_y = peak_y[unique_max_indexes]
    
    return max_peak, peak_x, peak_y, max_indexes, max_positions

In [10]:
## PIV Output data model
# An ad-hoc object with the following fields: X, Y, U (X velocity), V (Y velocity) and S2N (signal to noise ratio).

class OutputPIV:
    def __init__(self, x, y, u, v, s2n):
        self.x = x
        self.y = y
        self.u = u
        self.v = v
        self.s2n = s2n    
    

## Calculate PIV
# Generate the PIV data from the images loaded with the input parameters.
#
# Output: OutputPIV object

def PIV(images, int_window, overlap=DEFAULT_OVERLAP):
    step = round(int_window * overlap / 100)
    min_x, max_x, min_y, max_y, padded_images, indexes = prepare_piv_images(images, int_window, step)
    correlation = cumulative_cross_correlation(padded_images, indexes, int_window)
    data = vector_field_determination(correlation, int_window, step, min_x, max_x, min_y, max_y)