In [1]:
## Data Processing

In [2]:
#------------#------------#
# Get them Libraries
#------------#------------#

import numpy as np
import pandas as pd
import os
import time
import datetime

import matplotlib.pyplot as plt
import matplotlib as mpl
from matplotlib import cm
from matplotlib.ticker import MaxNLocator
from matplotlib.colors import TwoSlopeNorm

import math
from scipy import interpolate
from statistics import mean

import h5py

#------------#------------#
# Font Settings
#------------#------------#

# Set global font to Times New Roman and font size to 10pt
mpl.rcParams['font.family'] = 'Times New Roman'
mpl.rcParams['font.size'] = 10
plot_font_size = 20


In [3]:
#------------#------------#
# File Names and Paths
#------------#------------#


# Folder Name ## Only use for running in this file (not from main)
#xyz_folder_name = "A - 316L Rod"

# File Names ## Only use for running in this file (not from main)
#sample_name = 'A1 Pre-Galling Test.xyz'

# Code File Location
code_dir = os.getcwd()

# Source File relative to code file
csv_path = os.path.join(code_dir, "..", "xyz Files", xyz_folder_name, sample_name)

# Create the output file name based on the input file name
output_name = sample_name.replace('.xyz', '_piv.txt')

# Replace the file extension for the output CSV file
saving_h5_name = sample_name.replace('.xyz', '_processed.h5')

# Output Folder
target_folder = xyz_folder_name

# Find the file path within the target folder
output_path = os.path.join(code_dir, "..", "Processed Files", target_folder, output_name)


# Find the file path within the target folder
saving_h5_path = os.path.join(code_dir, "..", "Processed Files", target_folder, saving_h5_name)

In [4]:
#------------#------------#
# Profilometer Scan Parameters
#------------#------------#

%run Profilometer_Scan_Parameters.ipynb {csv_path}


Check parameter selection, skip_rows =  3
Edge size / mm:  0.1


In [6]:
start_time = time.time()

def process_raw_data(file_path, output_path, saving_h5_path):

#------------#------------#
# Step 0: Define Analysis Parameters
#------------#------------#

# Filtering parameters
    moving_average = 15 - 1 # must be even?????
    std_range = 1 #Number of standard deviations from mean which is accepted


# Min data points in a row/column for physical centre locating
    min_points = 250 # Minimum number of data points in a row/column that we'll consider as being a relevant row/column for calculating the mid-point


#------------#------------#
# Step 1: Read, convert, and save
#------------#------------#

# Read the CSV file
    unprocessed_data = pd.read_csv(file_path, sep=col_sep, skiprows=skip_rows, usecols=[0,1,2])
    print('Read the CSV file')
    
# Remove last row (if erroneous)
    if not isinstance(unprocessed_data.iloc[-1, 0], (int, float)):
        unprocessed_data = unprocessed_data.iloc[:-1]
        
# Replace "No" with NaN
    if unprocessed_data.iloc[0,-1] == "No":
        unprocessed_data.replace("No", np.nan, inplace=True)
        
# Assuming the DataFrame has the structure [X, Y, Z]
    unprocessed_data.columns = ['X', 'Y', 'Z']  # Assign column names if needed
    unprocessed_data['X'] = unprocessed_data['X'].astype('float64') *unit_conversion_xy
    unprocessed_data['Y'] = unprocessed_data['Y'].astype('float64') *unit_conversion_xy
    unprocessed_data['Z'] = unprocessed_data['Z'].astype('float64') *unit_conversion_z

# Pivot the DataFrame to create the 2D map
    pivot_table = unprocessed_data.pivot_table(index='Y', columns='X', values='Z')
    print('Pivoted the DataFrame')

# Save the cleaned and pivoted DataFrame to the new file
    np.savetxt(output_path, pivot_table, delimiter=",")

    save_time = time.time()
    elapsed_time = save_time - start_time
    print(f"Read, Convert, and Save time: {elapsed_time:.2f} seconds")

#------------#------------#
# Step 2: Read the data
#------------#------------#

# Read data
    A = pd.read_csv(output_path)

# This grabs all the real data from our now 2D array
    raw_data = A.iloc[:,1:]
    
#------------#------------# 
# Step 3: Find the mid point
#------------#------------#

#Find Edges
    def find_edge(data, minimum_points): #Finds 'top' edge
        data = pd.DataFrame(data)
        for i in range(data.shape[0]): # shape gives [row, col]
            data_slice = data.iloc[i, :].astype(np.float64) #slice of just one row
            if np.isnan(data_slice.values).sum() < data.shape[1]-(min_points-1):
                return i
        
    top_edge = find_edge(raw_data, min_points)
    bottom_edge = raw_data.shape[0]-find_edge(np.flipud(raw_data), min_points)-1
    left_edge = find_edge(np.transpose(raw_data), min_points)
    right_edge = raw_data.shape[1] - find_edge(np.flipud(np.transpose(raw_data)), min_points)-1

#Transpose to match physical specimen
    raw_data = (pd.DataFrame(data=raw_data).T).astype(float)
    raw_data = pd.DataFrame(data=raw_data)
    raw_data = raw_data*1

#Define sample centres: X & Y apparently swapped due to dataframe being transposed
    Y0 = int(round(mean((left_edge, right_edge)),0)) #since the dataset has been transposed
    X0 = int(round(mean((top_edge, bottom_edge)),0)) #since the dataset has been transposed

#Define X & Y axes from dataset
    X = (np.linspace(0, raw_data.shape[1]-1, raw_data.shape[1])-X0)*X_res/1000 # px
    Y = (np.linspace(0, raw_data.shape[0]-1, raw_data.shape[0])-Y0)*Y_res/1000 # px

#------------#------------#
# Step 4: Cut off the edges of the data before filtering
#------------#------------#

#Removing edges using Y axis
    edge_removal = 1*raw_data

#Construction of outer and inner radii to remove edge effects

#Outside of left outer edge (i.e. i<Xoutneg)
    for i in range(0, int(X0-outer_radius)+1, 1):
        edge_removal.iloc[i,:] = np.nan

#Outside of right outer edge (i.e. i>Xoutneg)
    for i in range(int(X0+outer_radius)+1, raw_data.shape[0], 1):
        edge_removal.iloc[i,:] = np.nan

#Outside of lower outer edge (i.e. i<Y_outer_negative)
    for i in range(0, int(Y0-outer_radius)+1, 1):
        edge_removal.iloc[:, i] = np.nan

#Outside of upper outer edge (i.e. i>Y_outer_negative)
    for i in range(int(Y0+outer_radius)+1, raw_data.shape[1], 1):
        edge_removal.iloc[:, i] = np.nan
     
    
#Between Xout and Xin (for both sides)
    for i in range(int(X0-outer_radius)+1, int(X0)+1, 1):
        Y_outer_positive = int(Y0 + math.sqrt(outer_radius**2 - (i-X0)**2))
        Y_outer_negative = int(Y0 - math.sqrt(outer_radius**2 - (i-X0)**2))
    
        edge_removal.iloc[i, 0:Y_outer_negative] = np.nan
        edge_removal.iloc[i, Y_outer_positive:edge_removal.shape[1]-1] = np.nan

    for i in range(int(X0), int(X0+outer_radius), 1):
        Y_outer_positive = int(Y0 + math.sqrt(outer_radius**2 - (i-X0)**2))
        Y_outer_negative = int(Y0 - math.sqrt(outer_radius**2 - (i-X0)**2))

        edge_removal.iloc[i, 0:Y_outer_negative] = np.nan
        edge_removal.iloc[i, Y_outer_positive:edge_removal.shape[1]-1] = np.nan

#Between Xinneg and Xinpos (hole in middle as a single section)
    for i in range(int(X0-inner_radius+1), int(X0+inner_radius-1), 1):
    #Outer portion
        Y_outer_positive = int(Y0 + math.sqrt(outer_radius**2 - (i-X0)**2))
        Y_outer_negative = int(Y0 - math.sqrt(outer_radius**2 - (i-X0)**2))
    
        edge_removal.iloc[i,0:Y_outer_negative] = np.nan
        edge_removal.iloc[i, Y_outer_positive:edge_removal.shape[1]-1] = np.nan
        
    #Inner portion
        Y_inner_positive = int(Y0 + math.sqrt(inner_radius**2 - (i-X0)**2))
        Y_inner_negative = int(Y0 - math.sqrt(inner_radius**2 - (i-X0)**2))
    
        edge_removal.iloc[i, Y_inner_negative:Y_inner_positive] = np.nan

    edgeless_data = edge_removal

#------------#------------#
# Step 5: Filter noise
#------------#------------#

# Filtering of data
# Excluding data outside of mean+x*standarddeviations, where x = std (below) and the mean is found using a moving average

    def filter_from_moving_average(data, window, std_range):
    # flaten to ease rolling averages
        flat_data = data.to_numpy().flatten()
        series_data = pd.Series(flat_data)
    # take rolling mean and std of moving window width
        rolling_mean = series_data.rolling(window=window, min_periods=1).mean()
        rolling_std = series_data.rolling(window=window, min_periods=1).std()
    
        for i in range(window // 2, len(flat_data) - window // 2):
            if abs(rolling_mean[i] - flat_data[i]) >= abs(std_range * rolling_std[i]): # if rolling average is out of standard deviation
                flat_data[i] = np.nan
        return pd.DataFrame(flat_data.reshape(data.shape))

    filtered_edgeless_data = filter_from_moving_average(edgeless_data, moving_average, std_range)

    filter_time = time.time()
    print('Total Filtering Time (s): ', filter_time-start_time)
    
    
#------------#------------#    
# Step 6: Interpolate small to large in XY direction
#------------#------------#

#Interpolate across whole sample (without edges)
    def interpolate_data(data):
        interpolated = data.copy()
        for limit in [15, 30, 60, 120, 250]:
            interpolated = interpolated.interpolate(method='linear', axis=1, limit=limit)
            interpolated = interpolated.interpolate(method='linear', axis=0, limit=limit)
        return interpolated

    interpolated_data = interpolate_data(filtered_edgeless_data)
    
#------------#------------#    
#Step 6.5: Re-remove edge filled in due to interpolation
#------------#------------#

#Second edge removal
    edge_removal = 1*interpolated_data

#Outside of left outer edge (i.e. i<Xoutneg)
    for i in range(0, int(X0-outer_radius)+1, 1):
        edge_removal.iloc[i,:] = np.nan

#Outside of right outer edge (i.e. i>Xoutneg)
    for i in range(int(X0+outer_radius)+1, raw_data.shape[0], 1):
        edge_removal.iloc[i,:] = np.nan

    
#Outside of lower outer edge (i.e. i<Y_outer_negative)
    for i in range(0, int(Y0-outer_radius)+1, 1):
        edge_removal.iloc[:, i] = np.nan

#Outside of upper outer edge (i.e. i>Y_outer_negative)
    for i in range(int(Y0+outer_radius)+1, raw_data.shape[1], 1):
        edge_removal.iloc[:, i] = np.nan
    
    
    
#Between Xout and Xin (for both sides)
    for i in range(int(X0-outer_radius)+1, int(X0)+1, 1):
        Y_outer_positive = int(Y0 + math.sqrt(outer_radius**2 - (i-X0)**2))
        Y_outer_negative = int(Y0 - math.sqrt(outer_radius**2 - (i-X0)**2))
    
        edge_removal.iloc[i, 0:Y_outer_negative] = np.nan
        edge_removal.iloc[i, Y_outer_positive:edge_removal.shape[1]-1] = np.nan

    for i in range(int(X0), int(X0+outer_radius), 1):
        Y_outer_positive = int(Y0 + math.sqrt(outer_radius**2 - (i-X0)**2))
        Y_outer_negative = int(Y0 - math.sqrt(outer_radius**2 - (i-X0)**2))

        edge_removal.iloc[i, 0:Y_outer_negative] = np.nan
        edge_removal.iloc[i, Y_outer_positive:edge_removal.shape[1]-1] = np.nan

#Between Xinneg and Xinpos (hole in middle as a single section)
    for i in range(int(X0-inner_radius+1), int(X0+inner_radius-1), 1):
    #Outer portion
        Y_outer_positive = int(Y0 + math.sqrt(outer_radius**2 - (i-X0)**2))
        Y_outer_negative = int(Y0 - math.sqrt(outer_radius**2 - (i-X0)**2))
    
        edge_removal.iloc[i,0:Y_outer_negative] = np.nan
        edge_removal.iloc[i, Y_outer_positive:edge_removal.shape[1]-1] = np.nan
        
    #Inner portion
        Y_inner_positive = int(Y0 + math.sqrt(inner_radius**2 - (i-X0)**2))
        Y_inner_negative = int(Y0 - math.sqrt(inner_radius**2 - (i-X0)**2))
    
        edge_removal.iloc[i, Y_inner_negative:Y_inner_positive] = np.nan

    edgeless_interpolated_data = edge_removal


    interpolate_time = time.time()
    print('Interpolation Time (s): ', interpolate_time-start_time)
    
#------------#------------#
# Step 7: Tilt correction
#------------#------------#


    def tilt_correction(data, X, Y):
    
    # Tilt Correction
        F = 1 * data
        data_flat = np.ndarray.flatten(np.asarray(data))
        data_mean = np.nanmean(data_flat)
        data_std = np.nanstd(data_flat)
    
    # Remove data out of STD range
        F[(F > data_mean + data_std)] = np.nan
        F[(F < data_mean - data_std)] = np.nan
    
        G = 1 * F
        f_flat = np.ndarray.flatten(np.asarray(F))
        f_mean = np.nanmean(f_flat)
        f_std = 2 * np.nanstd(f_flat)
    
    # Remove data out of STD range again
        G[(G > f_mean + f_std)] = np.nan
        G[(G < f_mean - f_std)] = np.nan
        G_flat = np.ndarray.flatten(np.asarray(G))
    
        X_rep = np.tile(X, len(Y))
        Y_rep = np.repeat(Y, len(X))
    
    # Finding X slope
        LR_X = pd.DataFrame({'X_rep': X_rep, 'G_flat': G_flat})
        LR_X_clean = LR_X.dropna(axis=0)
        slope_x, intercept_x = np.polyfit(LR_X_clean['X_rep'], LR_X_clean['G_flat'], 1)
    
    # Correcting X tilt
        X_corr = slope_x * X
        data_X = data.subtract(X_corr, axis='columns')
    
    # Define X Tilt using correction factor
        X_Tilt = math.cos(np.arctan(np.nanmean(slope_x) / 1000))
    
    # Finding Y slope
        LR_Y = pd.DataFrame({'Y_rep': Y_rep, 'G_flat': G_flat})
        LR_Y_clean = LR_Y.dropna(axis=0)
        slope_y, intercept_y = np.polyfit(LR_Y_clean['Y_rep'], LR_Y_clean['G_flat'], 1)
    
    # Correcting Y tilt
        Y_corr = slope_y * Y
        data_YX = data_X.subtract(Y_corr, axis='rows')
    
    # Define Y Tilt using correction factor
        Y_Tilt = math.cos(np.arctan(np.nanmean(slope_y) / 1000))
    
    # Define new X & Y positions
        X_new = X / X_Tilt
        Y_new = Y / Y_Tilt
    
        return data_YX, X_new, Y_new, X_Tilt, Y_Tilt

#Tilt Correction 1
    tilted_data_1, X_new, Y_new, X_Tilt_1, Y_Tilt_1 = tilt_correction(edgeless_interpolated_data, X, Y)

    tilt_1_finish = time.time()
    print('Time for first tilt correction (s): ', tilt_1_finish - start_time)
    
#Do it again Trust me bro
#Tilt Correction 2
    tilted_data_2, X_new, Y_new, X_Tilt_2, Y_Tilt_2 = tilt_correction(tilted_data_1, X_new, Y_new)

#Do it again Trust me bro
#Tilt Correction 3
    tilted_data_3, X_new, Y_new, X_Tilt_3, Y_Tilt_3 = tilt_correction(tilted_data_2, X_new, Y_new)

    tilt_3_finish = time.time()
    print('Time for third tilt correction (s): ', tilt_3_finish - start_time)


#------------#------------#
# Step 8: Translating so that the most populus plane is at zero
#------------#------------#

#Initial translation so all data >=0, which makes finding the modal bin easier
    data_min = np.amin(tilted_data_3.min()) #Getting the min hieght value
    zerod_data = tilted_data_3-data_min #Translating so minimimum is at Z=0 

#Fill in NaN cells as 0 (otherwise binning/histogram doesn't work)
    filled_zerod_data = zerod_data.fillna(0)

#Z-Correction: Finding the zero-plane and translating this
    interval_bins = 2
    A = np.ndarray.flatten(np.asarray(filled_zerod_data))
    Amax = int(np.ceil(np.amax(A)))+50
#Set bins. Don't want to start_time at 0 since this will now contain what were previously NaN elements (ALL of them!) 
    bins = list(range(interval_bins, Amax + interval_bins, interval_bins))
    x, y = np.histogram(A, bins) #Need dataframe to be a 1D array (NOT 2D!!)


#Finding POSITION of modal bin i.e. where should zero-plane be
#NB: np.argmax(x) finds number of bin (NB: start_timeing at bin number 0) that max population is in
    print('Position of modal bin (counts): ', interval_bins*np.argmax(x), '-', interval_bins*(np.argmax(x)+1))
    print('Z-correction factor: ', interval_bins*(np.argmax(x)+0.5))


#Correcting Z such that the modal plane is at zero
    translated_data = zerod_data - interval_bins*(np.argmax(x)+0.5)

    
    Z_sort_nan = np.sort(np.ndarray.flatten(np.asarray(translated_data))) #Flattening the dataframe into a 1D array & ordering (doesn't matter but easy to do)
    Z_sort = Z_sort_nan[~np.isnan(Z_sort_nan)] #Removing the NaN cells (enabling further operations to be performed), -ve sign to remove NaN

#Finding the area of the data shown (i.e. not NaN), named 'Reduced Area' (RArea for short)
#Size of each pixel for corrected sample
    X_new_res = mean_res/(1000*X_Tilt_1*X_Tilt_2*X_Tilt_3) # mm
    Y_new_res = mean_res/(1000*Y_Tilt_1*Y_Tilt_2*Y_Tilt_3) # mm
    new_mean_res = (X_new_res + Y_new_res)/2  # mm
    
    R_Area = (1000*X_new_res)*(1000*Y_new_res)*len(Z_sort) #Multiplied by 1000 (twice) since Z is in um andd X/Ynewsize are in mm
    
#------------#------------#
# Step 9: Save to HDF5 File so it won't have to be processed again
#------------#------------#
    
    # pixel size given in mm^2
    pixel_size = X_new_res * Y_new_res
    print(f'Pixel size is {pixel_size:.4g} mm^2')

# Save data to HDF5 file
    with h5py.File(saving_h5_path, 'w') as f:
        f.create_dataset('X_new', data=X_new)
        f.create_dataset('Y_new', data=Y_new)
        f.create_dataset('translated_data', data=translated_data)
        f.create_dataset('raw_data', data=raw_data)
        f.create_dataset('tilted_data_3', data=tilted_data_3)
        f.attrs['pixel_size'] = pixel_size
        
    func_finish = time.time()
    print('Time for full function (s): ', func_finish - start_time)

#------------#------------#
# Step 10: De-Bugging
#------------#------------#  
    
    def check_if_good_processing():
        print(f"Raw Data (size: {raw_data.shape}): ", raw_data)
        print('-----------------------------------------')
        print("Left Edge:", left_edge, "Right Edge:", right_edge, 
          "Top Edge:", top_edge, "Bottom Edge:", bottom_edge)
        print('X0: ', X0)
        print('Y0: ', Y0)
        print('-----------------------------------------')
        print('Moving Point Average / - : ', moving_average+1)
        print('Number of Standard Deviations from Mean Allowed: ', std_range)
        print('-----------------------------------------')
        print('Filtered Max height / um: ', np.amax(filtered_edgeless_data.max()))
        print('Filtered Min height / um: ', np.amin(filtered_edgeless_data.min()))
        print('Edges Rt / um', np.amax(filtered_edgeless_data.max())-np.amin(filtered_edgeless_data.min()))
        print('-----------------------------------------')
        print('Corrected Edgeless Max height / um: ', np.amax(translated_data.max()))
        print('Corrected Edgless Min height / um: ', np.amin(translated_data.min()))
        print('Corrected Edgless Rt / um: ', np.amax(translated_data.max())-np.amin(translated_data.min()))
        print('-------------------------------------------')
        #------------#------------#
        # Compare Raw Data and Processed Data
        #------------#------------#

        # Create a figure with two subplots
        fig, ax = plt.subplots(1, 2, figsize=(12, 6))

        # Plot the original data on the first subplot
        c1 = ax[0].contourf(X_new, Y_new, raw_data, 256, vmin=np.amin(raw_data), vmax=np.amax(raw_data))
        ax[0].set_title('Original Data')
        ax[0].axis('equal')
        fig.colorbar(c1, ax=ax[0], ticks=range(-800, -100, 100), label='Z Position / um')
        c1.set_clim(vmax=np.amax(raw_data), vmin=np.amin(raw_data))
        ax[0].set_xlabel('X Position / mm')
        ax[0].set_ylabel('Y Position / mm')
        ax[0].spines["top"].set_visible(False)
        ax[0].spines["right"].set_visible(False)

        # Plot the corrected data on the second subplot
        norm_cor = TwoSlopeNorm(vmin=np.amin(translated_data), vcenter=0, vmax=np.amax(translated_data))
        c2 = ax[1].contourf(X_new, Y_new, translated_data, 256, norm=norm_cor)
        ax[1].set_title('Corrected Data')
        ax[1].axis('equal')
        fig.colorbar(c2, ax=ax[1], ticks=range(-500, 500, 50), label='Z Position / um')
        c2.set_clim(vmax=np.amax(translated_data), vmin=np.amin(translated_data))
        ax[1].set_xlabel('X Position / mm')
        ax[1].set_ylabel('Y Position / mm')
        ax[1].spines["top"].set_visible(False)
        ax[1].spines["right"].set_visible(False)

        plt.tight_layout(pad=0.4, w_pad=0.5, h_pad=1.0) # Spread out graphs to avoid overlap

        # Resizing the plot area
        fig_size = plt.rcParams["figure.figsize"]
        fig_size[0] = 12 # X-axis size
        fig_size[1] = 12 # Y-axis size

        plt.show()

# Use if needing to de-bug
    '''check_if_good_processing()'''
        
#------------#------------#
# End Function
    #return X_new_res, Y_new_res, R_Area
#------------#------------#

# Run function
process_raw_data(csv_path, output_path, saving_h5_path)

print('Finished!')
end = time.time()
print('Time elapsed: ', end - start_time, ' seconds')


Read the CSV file
Pivoted the DataFrame
Read, Convert, and Save time: 10.21 seconds
Total Filtering Time (s):  47.218563079833984
Interpolation Time (s):  67.81396889686584
Time for first tilt correction (s):  70.96553587913513
Time for third tilt correction (s):  77.06639385223389
Position of modal bin (counts):  92 - 94
Z-correction factor:  93.0
Pixel size is 6.636e-12 mm^2
Time for full function (s):  79.82726693153381
Finished!
Time elapsed:  79.88720393180847  seconds
