# Image processing: Contact angle measurement and line tracking for water and Tween
This notebook contains the image processing steps (Section 2.1.2), including
- RANSAC fit of interface and contact angle measurement (step iii)
- interface tracking (step iv).

Note that since the processing of the Novec results require extra steps, it is given in *image_processing_novec*.

In [None]:
from sklearn.linear_model import RANSACRegressor
from sklearn.metrics import mean_squared_error
import cv2
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import os
from os.path import join
import pickle
import pandas as pd
import math

from shared_functions import *

## Functions

In [None]:
def find_min_gray_value_along_horizontal_line(gray, pos_y, start_x, end_x):
    """
    Finds the minimum value (darkest pixel) along a horizontal line in a grayscale image.

    Args:
    gray (numpy.ndarray): The grayscale image.
    pos_y (int): The y-coordinate of the horizontal line.
    start_x (int): The starting x-coordinate of the line.
    end_x (int): The ending x-coordinate of the line.

    Returns:
    A tuple containing the minimum greyscale value found along the line and the x-coordinate of the minimum value.
    """
    # create line
    gray_line = gray[pos_y][start_x:end_x]
    
    # find minimum and x position
    min_gray = min(gray_line)
    x_min_gray = np.where(gray_line == min_gray)[0][0]+start_x # only first occurrence is detected

    return min_gray, x_min_gray

In [None]:
def create_estimator_object(fit_degree):
    """
    Creates an instance of the PolynomialRegression class with the specified degree for polynomial fitting.

    This wrapper function is necessary to ensure the right fitting degree.
    In sklearn 1.2.2 it is not possible to specify the correct fit degree in ransac.fit, 
    even though the instance was created with the correct degree.
    Therefore this class works as a workaround to force the degree.

    Args:
    fit_degree (int): degree for polynomial fitting.

    Returns:
    An instance of the PolynomialRegression class with the specified degree.
    """
    
    # create polynomial regression class with given degree, necessary as input for RANSAC fit
    class PolynomialRegression(object):
        def __init__(self, degree=fit_degree, coeffs=None):
            self.degree = degree
            self.coeffs = coeffs

        def fit(self, X, y):
            self.coeffs = np.polyfit(X.ravel(), y, self.degree)

        def get_params(self, deep=True):
            return {'coeffs': self.coeffs}

        def set_params(self, coeffs=None, random_state=None):
            self.coeffs = coeffs

        def predict(self, X):
            poly_eqn = np.poly1d(self.coeffs)
            y_result = poly_eqn(X.ravel())
            return y_result

        def score(self, X, y):
            return mean_squared_error(y, self.predict(X))
        
    return PolynomialRegression()

In [None]:
def fit_interface_RANSAC(x_vals, y_vals, polyfit_degree, residual_threshold, min_samples):
    """
    Fits the interface pixels using the RANSAC algorithm with polynomial regression.

    Args:
    x_vals (numpy.ndarray): given sample points.
    y_vals (numpy.ndarray): points to be fitted to sample points.
    polyfit_degree (int): degree for polynomial fitting.
    residual_threshold (float): maximum residual for a data point to be classified as an inlier.
    min_samples (int): The minimum number of samples required to fit the model.

    Returns:
    tuple: A tuple containing the following elements:
     - y_result (numpy.ndarray): predicted y-values of the contact line.
     - inlier_mask (numpy.ndarray): boolean mask indicating the inliers.
     - theta_top_deg (float): resulting contact angle at the interface top.
     - theta_bot_deg (float): resulting contact angle at the interface bottom.
    """

    # create regression object 
    estimator = create_estimator_object(polyfit_degree)
    ransac = RANSACRegressor(estimator,
                             residual_threshold= residual_threshold, 
                             random_state=0,
                             min_samples=min_samples)

    # fit data
    ransac.fit(np.expand_dims(x_vals, axis=1), y_vals)
    inlier_mask = ransac.inlier_mask_
    y_result = ransac.predict(np.expand_dims(x_vals, axis=1))
    params = ransac.estimator_.coeffs
    
    # compute first derivative of fitted function
    fitted_polynomial = np.poly1d(params)
    derivative_of_fittted_polynomial = fitted_polynomial.deriv() 
    points_of_polynomial_derivative = np.polyval(derivative_of_fittted_polynomial.c, np.sort(x_vals))
    
    # compute slope at the walls
    slope_top = points_of_polynomial_derivative[0]
    slope_bot = points_of_polynomial_derivative[-1]
    
    # compute contact angles     
    theta_top_deg = math.degrees(math.atan(slope_top))
    theta_bot_deg = math.degrees(math.atan(slope_bot))

    return y_result, inlier_mask, theta_top_deg, theta_bot_deg

## (iii) Contact angle measurements

**User input: selection of cases and general parameters**

In [None]:
# choose fluid
fluid =  "01_water" # available: 01_water, 02_tween

# choose cases that should be considered
# if empty --> all cases are used. insert 1,2,3 to choose case 1,2,3
case_selection=[]

# choose the step of processed images (1 --> every image is used)
step_images = 1

# choose which result to plot ("all", "last", "none") --> "all" leads to longer runtime.
plot_pictures = "last"

# choose at which point to end the analysis.
# use "when_cavities_are_reached" for contact angle measurements, "when_interface_leaves_roi" for line tracking.
end_analysis = "when_cavities_are_reached"

# choose paths
path_data_experiments = "../data_experiments"
path_df_postproc = join(path_data_experiments, "case_parameters.xlsx")
path_df_channel_edges = join(path_data_experiments, fluid, "02_channel_edges", "df_channel_edges.csv")
path_images = join(path_data_experiments, fluid, "01_images_preprocessed")
path_save = join(path_data_experiments, fluid, "03_contact_angle_measurement")

# create directory for saving results and plots
os.makedirs(path_save, exist_ok=True)

**User input: RANSAC fitting parameters.**
- fit_degree (int): degree for polynomial fitting.
- min_samples (int): minimum amount of considered inliers.
- residual_threshold (int): maximum residual for a data point to be classified as an inlier during RANSAC curve fitting of the interface.
- pre_filter_threshold (int): threshold of initial pre-filtering step.\
    The pre-filter removes outliers in the interface detection by removing points whose distance to both neighbors is > pre_filter_threshold.

In [None]:
fit_degree = 3
min_samples = 100
residual_threshold = 10
pre_filter_threshold = 10

**Run the analysis**

In [None]:
# set case selection based on user input
if case_selection == []:
    cases = [int(_) for _ in os.listdir(path_images) if len(_)<3]
else: 
    cases = case_selection
print(f"selected cases: {cases}")

# initialize lists for results
list_ca_ransac_top, list_ca_ransac_bot, list_rms_dist_to_fit_ransac  = ([] for i in range(3))

for case in cases:
    
    print(f"Case {case}")

    # get case data
    x_start, x_cavities, y_channelEdge_bottom, y_channelEdge_top, framerate, selected_images = get_case_parameters(fluid, case, path_df_channel_edges, path_df_postproc, path_images, step_images, end_analysis)    
    print(f'\t number of selected images: {len(selected_images)}')
    
    # initialize lists for results of this case
    theta_ransac_top_case = []
    theta_ransac_bot_case = []
    
    # rms_dist_to_fit_polyfit_case = []
    rms_dist_to_fit_ransac_case = []
    
    # loop over images
    for i in range(len(selected_images)):
        
        # Load the image and convert it to grayscale
        img_name = selected_images[i]
        image = cv2.imread(os.path.join(path_images, str(case), img_name))
        image_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        
        # --------------- detect interface: sample lowest grayscale values along rows
        
        # define region of interest (upstream of cavities)
        x_roi = [x_start, x_cavities-10] #safety distance
        y_roi = [y_channelEdge_top, y_channelEdge_bottom]
        
        # initialize lists
        y_sample_points = np.array(range(y_roi[0], y_roi[1])) 
        x_interface = np.empty(0)

        # sample darkest pixels (lowest grayscale values) along horizontal lines for each y sample point
        for y in y_sample_points:
            (min_gray, x_min_gray) = find_min_gray_value_along_horizontal_line(image_gray, y, x_roi[0], x_roi[1])
            x_interface = np.append(x_interface, x_min_gray)
      
        # -------------- pre-filter data
        
        # create filter: check neighbors for each sample point, remove sample points that are further 
        # away than pre_filter_threshold from both neighbors.
        mask_filter =  np.array(len(x_interface) * [True])
        for xi in range(1, len(x_interface)-1):
            if abs(x_interface[xi] - x_interface[xi-1]) > pre_filter_threshold:
                if abs(x_interface[xi] - x_interface[xi+1]) > pre_filter_threshold:
                    mask_filter[xi] = False
                    
        # special treatment for edge points
        mask_filter[0] = False if abs(x_interface[0] - x_interface[1]) > pre_filter_threshold else True
        mask_filter[-1] = False if abs(x_interface[-1] - x_interface[-2]) > pre_filter_threshold else True
        
        # apply filter
        x_filtered = np.array(x_interface)
        x_filtered[mask_filter==False]=0
            
        # -------------- contact angle detection using RANSAC
            
        # run ransac
        (x_fit, inlier_mask, theta_top, theta_bot) = fit_interface_RANSAC(y_sample_points, x_filtered, polyfit_degree=fit_degree, residual_threshold=residual_threshold, min_samples=min_samples)

        # transform contact angles to correct coordinate system and store them
        theta_ransac_top_case.append(90+theta_top)  
        theta_ransac_bot_case.append(90-theta_bot)  
        rms_dist_to_fit_ransac_case.append(np.sqrt(np.mean((x_fit[inlier_mask]-x_interface[inlier_mask])**2)))

        # ----------------plot
        plot_now = check_if_plot_now(plot_pictures, selected_images, i)       
        if plot_now == True:
            
            # plot the picture
            fig, ax = plt.subplots(figsize=(10,10))
            ax.imshow(image_gray, cmap='gray', vmin=0, vmax=255)

            # plot the ROI box
            ax.add_patch(patches.Rectangle([x_roi[0], y_roi[0]], x_roi[1]-x_roi[0], y_roi[1]-y_roi[0],  ls="--", lw=1, ec="k", fc="none", label='roi'))

            # plot sampled interface points
            ax.plot(x_interface[inlier_mask], y_sample_points[inlier_mask], 'b.', label='sampled points')

            # plot sampled outlier points
            ax.plot(x_interface[~mask_filter], y_sample_points[~mask_filter], '+', c='red', label='pre-filtered outliers')
            
            # plot sampled outlier points
            ax.plot(x_interface[~inlier_mask&mask_filter], y_sample_points[~inlier_mask&mask_filter], 'x', c='darkred', label='RANSAC outliers')

            # plot fits
            ax.plot(x_fit, y_sample_points, '-', c='cyan', label='interface fit')            
            
            # cosmetics
            ax.legend(title=f'case {case}, img {img_name[-8:-4]}, \nresulting contact angles: top {theta_top+90:.2f}°, bot {90-theta_bot:.2f}°')
            #fig.savefig(os.path.join(path_save, f"interface_{fluid}_{case}_{img_name[-8:-4]}.png"), bbox_inches='tight', dpi=300)

    # ---------------- store data in lists
    
    list_ca_ransac_top.append(theta_ransac_top_case)    
    list_ca_ransac_bot.append(theta_ransac_bot_case)
    list_rms_dist_to_fit_ransac.append(rms_dist_to_fit_ransac_case)

**Store data**

In [None]:
# create dataframe
df_measured_contactangles = pd.DataFrame(list(zip(list_ca_ransac_top, list_ca_ransac_bot, list_rms_dist_to_fit_ransac)), 
                                         index=cases, columns =['list_ca_ransac_top', 'list_ca_ransac_bot','list_rms_dist_to_fit_ransac'])
# store dataframe as csv
df_measured_contactangles.to_csv(join(path_save, f"df_measured_contactangles_poly{fit_degree}_{fluid}.csv"), index_label='case')

# disply dataframe
display(df_measured_contactangles)

## (iv) Interface tracking

**User input: selection of cases and general parameters**

In [None]:
# choose fluid
fluid =  "01_water" # available: 01_water, 02_tween

# choose cases that should be considered
# if empty --> all cases are used. insert 1,2,3 to choose case 1,2,3
case_selection = [] 

# choose the step of processed images (1 --> every image is used)
step_images = 1000

# choose which result to plot ("all", "last", "none") --> "all" leads to longer runtime.
plot_pictures = "none"

# choose at which point to end the analysis.
# use "when_cavities_are_reached" for contact angle measurements, "when_interface_leaves_roi" for line tracking.
end_analysis = "at_final_image"

# choose paths
path_data_experiments = "../data_experiments"
path_df_postproc = join(path_data_experiments, "case_parameters.xlsx")
path_df_channel_edges = join(path_data_experiments, fluid, "02_channel_edges", "df_channel_edges.csv")
path_images = join(path_data_experiments, fluid, "01_images_preprocessed")
path_save = join(path_data_experiments, fluid, "04_interface_tracking")

# create directory for saving results and plots
os.makedirs(path_save, exist_ok=True)

**User input: select position of sample lines** \
For water and Tween, the interface top and bottom are tracked at 0.1mm distance from the walls

In [None]:
pos_y_mm = 0.1

**Run analysis**

In [None]:
# set case selection based on user imput
if case_selection == []:
    cases = [int(_) for _ in os.listdir(path_images) if len(_)<3]
else: 
    cases = case_selection
print(f"selected cases: {cases}")

# initialize lists for results
list_time_range = []
list_x_min_gray_mm_top = []
list_x_min_gray_mm_bot = []
list_u = []

# loop over cases
for case in cases:
    print(f"Case {case}")
     
    # get case data
    x_start, x_cavities, y_channelEdge_bottom, y_channelEdge_top, framerate, selected_images = get_case_parameters(fluid, case, path_df_channel_edges, path_df_postproc, path_images, step_images, end_analysis)    
    print(f'\t number of selected images: {len(selected_images)}')
       
    # get velocity for storing in dataframe
    df_postproc =  pd.read_excel(path_df_postproc, sheet_name=fluid, index_col="case", skiprows=1)
    u = df_postproc['u'][case]

    # initialize lists for results of this case
    list_time_range_case = []
    list_x_min_gray_mm_top_case = []
    list_x_min_gray_mm_bot_case = []
    
    list_min_gray_top_case =  [0]*len(selected_images)
    list_x_min_gray_top_case =  [0]*len(selected_images)
    
    list_min_gray_bot_case =  [0]*len(selected_images)
    list_x_min_gray_bot_case =  [0]*len(selected_images)

    # scale sampling position from mm to px
    width_channel_mm = 2
    pos_y_top = int(y_channelEdge_top + pos_y_mm / width_channel_mm * abs(y_channelEdge_top - y_channelEdge_bottom))
    pos_y_bot = int(y_channelEdge_bottom - pos_y_mm / width_channel_mm * abs(y_channelEdge_top - y_channelEdge_bottom)) 
    
    #loop over images
    for i in range(len(selected_images)):
        
        # ----- Get Image 

        # Load the image and convert it to grayscale
        img_name = selected_images[i]
        image = cv2.imread(os.path.join(path_images, str(case), img_name))
        image_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        
        # --------------- track interface
        # define start and stop of sampling line
        x_sample = [x_start, len(image_gray)-x_start]

        # get min greyscale value
        (list_min_gray_top_case[i], list_x_min_gray_top_case[i]) = find_min_gray_value_along_horizontal_line(image_gray, pos_y_top, x_sample[0], x_sample[1])
        (list_min_gray_bot_case[i], list_x_min_gray_bot_case[i]) = find_min_gray_value_along_horizontal_line(image_gray, pos_y_bot, x_sample[0], x_sample[1])
        
        # ---------------- plot
        plot_now = check_if_plot_now(plot_pictures, selected_images, i)

        if plot_now == True:

            # plot the picture
            fig, ax = plt.subplots(figsize=(10,10))
            plt.imshow(image_gray, cmap='gray', vmin=0, vmax=255)

            # plot the sample lines
            ax.plot(x_sample, [pos_y_top, pos_y_top], '--k', label = 'sample lines')
            ax.plot(x_sample, [pos_y_bot, pos_y_bot], '--k')
            
            # plot results
            ax.plot(list_x_min_gray_top_case[i], pos_y_top, 'x', label = 'interface pixel top')
            ax.plot(list_x_min_gray_bot_case[i], pos_y_bot, 'x', label = 'interface pixel bottom')

            # cosmetics
            ax.legend(fancybox=False, framealpha=1.0, facecolor=[0.8,0.8,0.8], edgecolor='k', title=f"case {case}, img {img_name[-8:-4]}")
    
            # save figure
            #fig.savefig(join(path_save, f"interfaceTracking_{fluid}_{case}_{img_name[-8:-4]}.png"), bbox_inches='tight', dpi=300)

    # convert results to mm and seconds
    list_x_min_gray_mm_top_case = np.array(list_x_min_gray_top_case) * width_channel_mm / abs(y_channelEdge_top - y_channelEdge_bottom)
    list_x_min_gray_mm_bot_case = np.array(list_x_min_gray_bot_case) * width_channel_mm / abs(y_channelEdge_top - y_channelEdge_bottom)
    list_time_range_case = range(len(list_x_min_gray_mm_top_case))/framerate   
    
    # add data to lists
    list_x_min_gray_mm_top.append(list_x_min_gray_mm_top_case)    
    list_x_min_gray_mm_bot.append(list_x_min_gray_mm_bot_case)
    list_time_range.append(list_time_range_case)
    list_u.append(u)
    
    # plot interface tracking result
    fig2, ax2 = plt.subplots(figsize=(5,5))
    ax2.plot(list_time_range_case, list_x_min_gray_mm_top_case,'+', label= 'top')
    ax2.plot(list_time_range_case, list_x_min_gray_mm_bot_case,'+', label = 'bottom')

    # cosmetics
    ax2.set_xlabel('t (s)')
    ax2.set_ylabel('x (m)')
    ax2.legend()
    ax2.set_title(f"interface tracking result, case {case}")
    
    #save figure
    fig2.savefig(join(path_save, f"interfaceTrackingResult_{fluid}_{case}.png"), bbox_inches='tight', dpi=300)


**store data**

In [None]:
# create dataframe
df_interface_tracking = pd.DataFrame(list(zip(list_u, list_time_range, list_x_min_gray_mm_top, list_x_min_gray_mm_bot)), 
                                         index=cases, columns =['u', 'list_time_range', 'list_x_min_gray_mm_top', 'list_x_min_gray_mm_bot'])
#d isplay dataframe
display(df_interface_tracking)

# save dataframe
df_interface_tracking.to_csv(join(path_save, f"df_interface_tracking_{fluid}.csv"), index_label='case')
