In [None]:
import os
import cv2
import gc
import matplotlib.pyplot    as plt
import numpy                as np
import matplotlib.patches   as mpatches
import plotly.graph_objects as go
import networkx             as nx
from scipy.stats       import skew, kurtosis, mode
from sklearn.cluster   import DBSCAN
from scipy.ndimage     import convolve, generic_filter
from scipy.signal import convolve2d

In [None]:
# Force a full garbage-collection pass; as the last expression of the cell,
# the number of unreachable objects found is shown as the cell output.
gc.collect()

In [None]:
class FileParser:
    """Locate the per-part acquisition files below a root directory.

    Every directory whose name starts with ``Box`` is inspected for an
    ``AdaptiveZ_10mm`` sub-folder; each sub-directory of that folder whose name
    contains ``Part`` yields one result tuple::

        (part_folder, component_parameters_path, lidar2xrf_path,
         bpc_path, intensity_path)

    A path is ``None`` when the part folder holds no file with the matching
    suffix, so callers can filter incomplete parts with ``None not in paths``.
    """

    # File-name suffixes, in the order their paths appear in the result tuple.
    # A file is assigned to the first suffix it matches.
    _SUFFIXES = (
        ".component_parameters.txt",
        ".lidar2xrf",
        ".bpc",
        "_intensity.png",
    )

    def __init__(self, root_dir):
        """Remember the directory to scan; no file-system access happens here."""
        self.root_dir = root_dir
        self.box_folders = []

    def _collect_part_files(self, part_path):
        """Return the four file paths of one part folder, ``None`` where absent."""
        # Every slot starts as None for *each* part. Previously the intensity
        # slot was never reset, so it either raised UnboundLocalError or leaked
        # the path found in an earlier part folder.
        found = {suffix: None for suffix in self._SUFFIXES}
        for file_name in os.listdir(part_path):
            for suffix in self._SUFFIXES:
                if file_name.endswith(suffix):
                    # If several files share a suffix the last one listed wins.
                    found[suffix] = os.path.join(part_path, file_name)
                    break
        return tuple(found[suffix] for suffix in self._SUFFIXES)

    def find_folders(self):
        """Scan ``root_dir`` and rebuild ``self.box_folders`` from scratch."""
        # Start from an empty list so repeated scans do not append duplicates.
        self.box_folders = []

        for root, dirs, _files in os.walk(self.root_dir):
            for dir_name in dirs:
                if not dir_name.startswith("Box"):
                    continue

                adaptive_z_folder = os.path.join(root, dir_name, "AdaptiveZ_10mm")
                if not os.path.isdir(adaptive_z_folder):
                    continue

                for part_folder in os.listdir(adaptive_z_folder):
                    full_part_path = os.path.join(adaptive_z_folder, part_folder)
                    if not (os.path.isdir(full_part_path) and "Part" in part_folder):
                        continue

                    self.box_folders.append(
                        (part_folder, *self._collect_part_files(full_part_path))
                    )

    def get_box_folders(self):
        """Scan the tree and return the list of per-part path tuples."""
        self.find_folders()
        return self.box_folders


# Scan the acquisition share and collect one tuple of file paths per part folder.
# NOTE(review): the root is a hard-coded UNC network path, so this cell only
# works on a machine that can reach that share.
parser = FileParser(r"\\192.168.1.100\CoreScan3-2\Acquisitions\RnD\XRF\CH\Macassa_clearance")
paths_list = parser.get_box_folders()

# Print what was found for each part; a path that is None (or empty) is skipped.
for name, component_parameters_path, lidar2xrf_path, bpc_path, intensity_path in paths_list:
    print(f"Part: {name}")
    if component_parameters_path:
        print(f"  Component Parameters: {component_parameters_path}")
    if lidar2xrf_path:
        print(f"  LIDAR to XRF: {lidar2xrf_path}")
    if bpc_path:
        print(f"  BPC File: {bpc_path}")
    if intensity_path:
        print(f"  Intensity File: {intensity_path}")


In [None]:
def get_vectors(data, y_span=10):
    """Build one feature vector per column of every point cloud.

    Parameters
    ----------
    data : iterable
        Entries indexed as produced by ``get_point_cloud``: ``[0]`` name,
        ``[1]`` 2-D height grid (rows = y, columns = x), ``[3]`` sequence giving
        the x value of each column index, ``[4]`` mapping y value -> row index.
    y_span : number, default 10
        Only rows whose y value satisfies ``abs(y) <= y_span`` contribute.

    Returns
    -------
    dict
        ``{(name, x): get_props(column values)}``; columns whose feature vector
        contains a NaN are omitted.
    """
    vectors = {}

    for entry in data:
        name, cloud = entry[0], entry[1]
        i_to_x, y_to_i = entry[3], entry[4]

        # Row indices of the central band |y| <= y_span.
        band_rows = [row for y_value, row in y_to_i.items() if abs(y_value) <= y_span]

        for column in range(len(cloud[0])):
            x = i_to_x[column]
            profile = [cloud[row][column] for row in band_rows]
            features = get_props(profile)
            if np.any(np.isnan(features)):
                continue
            vectors[(name, x)] = features

    return vectors

def get_vectors_combined(data1, data2, x_values_dict, y_span=10):
    """Build combined raw + convolved feature vectors for selected columns.

    Parameters
    ----------
    data1 : iterable
        Point-cloud entries: ``[0]`` name, ``[1]`` 2-D height grid
        (rows = y, columns = x), ``[2]`` mapping x value -> column index,
        ``[4]`` mapping y value -> row index.
    data2 : iterable
        Entries paired position-wise with ``data1``; only ``[1]`` (the convolved
        grid, same layout as the raw grid) is read.
    x_values_dict : dict
        ``{name: iterable of x values}`` selecting the columns to process.
        A name that is missing from the dict is treated as "no columns"
        instead of raising ``KeyError``.
    y_span : number, default 10
        Only rows whose y value satisfies ``abs(y) <= y_span`` contribute.

    Returns
    -------
    dict
        ``{(name, x): get_props_convolved(...) + get_props(...)}`` - the
        convolved features come first, followed by the raw features. Vectors
        containing a NaN are omitted.
    """
    vectors = {}

    for point_cloud, convolved_entry in zip(data1, data2):
        name = point_cloud[0]

        # Parts without any selected column simply contribute nothing.
        x_values = x_values_dict.get(name, ())

        cloud = point_cloud[1]
        convolved_cloud = convolved_entry[1]

        x_to_i = point_cloud[2]
        y_to_i = point_cloud[4]

        # Row indices of the central band |y| <= y_span.
        y_indices = [index for value, index in y_to_i.items() if abs(value) <= y_span]

        for x in x_values:
            i = x_to_i[x]
            distribution = [cloud[j][i] for j in y_indices]
            convolved_distribution = [convolved_cloud[j][i] for j in y_indices]

            properties = get_props(distribution)
            convolved_properties = get_props_convolved(convolved_distribution)

            # Both helpers return plain lists, so "+" concatenates them.
            vector = convolved_properties + properties
            if not np.any(np.isnan(vector)):
                vectors[(name, x)] = vector

    return vectors

def get_props_convolved(distribution):
    """Return a 7-element feature list describing ``distribution``.

    Layout of the returned list:

    * ``[0:3]`` variance, skewness and kurtosis, divided by their joint
      Euclidean norm when that norm is greater than zero;
    * ``[3]`` square root of the mean;
    * ``[4]`` square root of the maximum;
    * ``[5]`` ``arctan2([1], [2])`` wrapped into ``[0, 2*pi)``;
    * ``[6]`` ``arctan([0]) / (pi / 2)``.
    """
    root_mean = np.sqrt(np.mean(distribution))
    root_peak = np.sqrt(np.max(distribution))

    shape = [np.var(distribution), skew(distribution), kurtosis(distribution)]

    # Scale the three moments to unit length; a zero (or NaN) norm leaves
    # them untouched.
    length = np.linalg.norm(shape)
    if length > 0:
        shape = [component / length for component in shape]

    first, second, third = shape
    full_turn = 2 * np.pi

    wrapped_angle = (np.arctan2(second, third) + full_turn) % full_turn
    scaled_arctan = np.arctan(first) / (np.pi / 2)

    return shape + [root_mean, root_peak, wrapped_angle, scaled_arctan]

def get_props(distribution):
    """Return a 4-element feature list describing ``distribution``.

    The first three entries are variance, skewness and kurtosis, divided by
    their joint Euclidean norm when that norm is greater than zero. The fourth
    entry is ``arccos`` of the first entry, divided by ``pi / 2``.
    """
    moments = [np.var(distribution), skew(distribution), kurtosis(distribution)]

    # Scale to unit length; a zero (or NaN) norm leaves the moments untouched.
    magnitude = np.linalg.norm(moments)
    if magnitude > 0:
        moments = [moment / magnitude for moment in moments]

    scaled_arccos = np.arccos(moments[0]) / (np.pi / 2)
    return moments + [scaled_arccos]



In [None]:
def get_point_cloud(paths, y_window = 25, upsample_ratio = 4):
    """Load one part's point cloud and intensity image and rasterise both.

    Parameters
    ----------
    paths : sequence
        ``[0]`` part name (used in messages and returned unchanged),
        ``[1]`` text file containing the ``XRAY_DPP[Acquisition]#0.X.Start:``
        and ``...X.Stop:`` entries,
        ``[2]`` text file holding a comma-separated numeric matrix,
        ``[3]`` binary file of float32 triples (one point per triple),
        ``[4]`` image file read with ``cv2.IMREAD_ANYDEPTH``.
    y_window : number, default 25
        Points with ``abs(y) > y_window`` (after the transform) are dropped.
    upsample_ratio : number, default 4
        Multiplier applied to the column density along x.

    Returns
    -------
    list
        ``[name, point_array, x_value_dict, x_values, y_value_dict, y_values,
        intensity_array]`` where both arrays have shape
        ``(len(y_values), number of x columns)``, ``x_value_dict`` maps
        x value -> column index and ``y_value_dict`` maps y value -> row index.
    """

    # Read the X scan limits from the component-parameters file.
    # NOTE(review): if either key is missing, x_start / x_stop stay unbound and
    # the mask below fails with UnboundLocalError.
    with open(paths[1]) as file:
            lines = file.readlines()
            for line in lines:
                if "XRAY_DPP[Acquisition]#0.X.Start:" in line:
                    x_start = (float)(line.split("XRAY_DPP[Acquisition]#0.X.Start:")[1].strip())
                if "XRAY_DPP[Acquisition]#0.X.Stop:" in line:
                    x_stop = (float)(line.split("XRAY_DPP[Acquisition]#0.X.Stop:")[1].strip())

    # One matrix row per text line, values separated by commas.
    with open(paths[2]) as file:
        lines = file.readlines()
        transformation_matrix = np.array([list(map(float, line.strip().split(","))) for line in lines])

    imread = lambda fn: cv2.imread(fn, cv2.IMREAD_ANYDEPTH)
    
    point_cloud  = np.fromfile(paths[3], dtype=np.float32).reshape(-1, 3) 
    intensity_map = imread(paths[4])
    
    print(f"{paths[0]} is loaded. \n# of points {point_cloud.shape[0]}")
    
    # Pair every point's first two coordinates with one intensity pixel.
    # assumes the image has exactly one pixel per point, in the same order as
    # the point file - TODO confirm (hstack fails if the counts differ).
    intensity_values = np.reshape(intensity_map, (-1, 1))
    intensity_cloud = np.hstack((point_cloud[:,:2], intensity_values))

    # Append a column of ones, multiply by the transposed matrix and keep the
    # first three columns of the result.
    # NOTE(review): the same matrix is applied to the intensity cloud, so its
    # third column (the intensity) is transformed like a z coordinate - confirm
    # this is intended.
    point_cloud = (np.hstack((point_cloud, np.ones((point_cloud.shape[0], 1)))) @ transformation_matrix.T)[:,:3]
    intensity_cloud = (np.hstack((intensity_cloud, np.ones((intensity_cloud.shape[0], 1)))) @ transformation_matrix.T)[:,:3]

    # Keep points with x_stop <= x <= x_start and |y| <= y_window.
    mask = (point_cloud[:,0] <= x_start) & (point_cloud[:,0] >= x_stop) & (np.abs(point_cloud[:,1]) <= y_window)
    point_cloud = point_cloud[mask]
    intensity_cloud = intensity_cloud[mask]

    # Fill values used further down for grid cells that have no point.
    # NOTE(review): despite the "min_" names these hold the NaN-ignoring
    # MAXIMUM of each third column - confirm which one is intended.
    min_intensity = np.nanmax(intensity_cloud[:,2])
    min_z = np.nanmax(point_cloud[:,2])
    
    print(f"trimmed to {point_cloud.shape[0]} points")

    # Shift x so that the point closest to x_stop sits at x = 0.
    minimum_x = point_cloud[np.argmin(np.abs(point_cloud[:,0] - x_stop)),0]
    point_cloud[:,0] -= minimum_x
    intensity_cloud[:,0] -= minimum_x

    x_values = np.unique(point_cloud[:,0])
    y_values = np.unique(point_cloud[:,1])

    # Columns per x unit: inverse of the median spacing between distinct x
    # values, multiplied by the upsampling factor.
    index_ratio = (1/np.median(np.diff(x_values))) * upsample_ratio 
    x_range = (int)(np.max(x_values) * index_ratio) + 1

    y_value_dict = {y: index for index,y in enumerate(y_values)}
    # int() truncates, so this is the column each measured x value falls into.
    # NOTE(review): two x values closer than one column would share an index.
    x_value_dict = {x: int(x*index_ratio) for x in x_values}

    known_indices = np.array(list(x_value_dict.values()), dtype=int)
    known_x_values = np.array(list(x_value_dict.keys()), dtype=float)
    all_indices = np.arange(x_range)

    # x coordinate of every column, linearly interpolated between the columns
    # that correspond to measured x values.
    x_values = np.array(np.interp(all_indices,known_indices,known_x_values))

    # From here on x_value_dict maps (interpolated) x value -> column index.
    x_value_dict = {x: index for index,x in enumerate(x_values)}
    
    point_array = np.full((len(y_values), x_range),np.nan)
    intensity_array = np.full((len(y_values), x_range),np.nan)

    # Lookup tables keyed by the exact (x, y) floats of each point; only the
    # stored third-column value (element [1]) is read below.
    point_dictionary = {(row[0],row[1]): (index, row[2]) for index,row in enumerate(point_cloud)}
    intensity_dictionary = {(row[0],row[1]): (index, row[2]) for index,row in enumerate(intensity_cloud)}

    # Fill both grids cell by cell. The lookup is an exact float match, so a
    # cell only receives a measured value when its (x, y) equals a point's
    # coordinates bit for bit; every other cell gets the fill value.
    for x in range (x_range):
        for y in range (len(y_values)):
            x_val = x_values[x]
            y_val = y_values[y]

            point_z = point_dictionary.get((x_val,y_val))
            intensity_z = intensity_dictionary.get((x_val,y_val))

            if point_z is not None:
                point_array[y,x] = point_z[1]
            else:
                point_array[y,x] = min_z
            if intensity_z is not None:
                intensity_array[y,x] = intensity_z[1]
            else:
                intensity_array[y,x] = min_intensity

    print(f"arrays built")

    # Rebuild every interior row as the mean of rows y-1, y and y+1, each first
    # interpolated along x from the measured columns only. The first and last
    # rows are left as filled above.
    # Rows are overwritten in place, so when row y is processed its y-1
    # neighbour has already been replaced by its smoothed version.
    for y in range(1,len(y_values)-1):
        interpolated_z_values= []
        interpolated_i_values = []

        for dy in [-1,0,1]:
            y_dy = y + dy
            known_z = point_array[y_dy,known_indices]

            known_i = intensity_array[y_dy,known_indices]
            known_x = x_values[known_indices]
            # The NaN mask is derived from the height row only and is reused
            # for the intensity row.
            mask = ~np.isnan(known_z)

            interpolated_z_values.append(np.interp(x_values, known_x[mask], known_z[mask]))
            interpolated_i_values.append(np.interp(x_values, known_x[mask], known_i[mask]))
        
        point_array[y, :] = np.mean(interpolated_z_values, axis=0) 
        intensity_array[y, :] = np.mean(interpolated_i_values, axis=0) 

    print(f"upsampled to {point_array.size} points")
    print(f"{paths[0]} finished \n")

    return [paths[0],point_array, x_value_dict, x_values, y_value_dict, y_values, intensity_array]


In [None]:
# Load and rasterise every part for which all required files were found.
point_clouds = []

for paths in paths_list:
    # Skip parts with at least one missing file path.
    if None in paths:
        continue
    point_clouds.append(get_point_cloud(paths, y_window=16))


In [None]:
# Two stacked panels per part: the height grid on top, the intensity grid below.
fig = plt.figure(figsize=(150, 7 * len(point_clouds)), dpi=150)
gs = fig.add_gridspec(len(point_clouds) * 2, 1, hspace=0.6)

for (i, point_cloud) in enumerate(point_clouds):
    name, cloud_1, cloud_2 = point_cloud[0], point_cloud[1], point_cloud[6]

    # Height grid, flipped vertically for display.
    ax = fig.add_subplot(gs[2 * i, 0])
    ax.imshow(np.flipud(cloud_1), cmap='jet', interpolation='nearest', alpha=1)
    ax.set(xlabel="X index", ylabel="Y index")
    ax.set_title(f"Point Cloud: {name}", fontsize=30)

    # Intensity grid, same orientation.
    ax = fig.add_subplot(gs[2 * i + 1, 0])
    ax.imshow(np.flipud(cloud_2), cmap='binary', interpolation='nearest', alpha=1)
    ax.set(xlabel="X index", ylabel="Y index")


plt.show()


In [None]:
# Create convolved copies of the point clouds.

convolved_clouds = []
canny_clouds = []

# kernel_s is a 3x3 kernel, kernel_y a 5x1 column and kernel_x a 1x5 row;
# only kernel_y is used in this cell.
kernel_s = np.array([[1, 2, 1],
                     [2, 4, 2],
                     [1, 2, 1]])
kernel_y = np.array([[-1], [-2], [0], [2], [1]])
kernel_x = np.array([[-1, -2, 0, 2, 1]])


for point_cloud in point_clouds:
    # Absolute response of the height grid to the column kernel (along rows).
    convolved_cloud = np.abs(convolve(point_cloud[1], kernel_y))
    # Same layout as a point-cloud entry for indices 0-5, with the grid replaced.
    convolved_clouds.append([point_cloud[0], convolved_cloud, *point_cloud[2:6]])


In [None]:
# Get vectors for the initial clustering: one get_props() feature list per
# (part name, x) column, restricted to rows with |y| <= 10.

v_dict = get_vectors(point_clouds, y_span = 10)
# Rows follow the insertion order of v_dict, so row k belongs to the k-th key.
v_array = np.array(list(v_dict.values()))

In [None]:
# Initial clustering on feature columns 1 and 3 of the get_props() vectors
# (the normalised skewness and the scaled arccos term).
dimensions = [1, 3]

dbscan = DBSCAN(eps=0.035,
                min_samples=125,
                metric='chebyshev',
                n_jobs=-1)

labels1 = dbscan.fit_predict(v_array[:, dimensions])

# (name, x) -> cluster label, in the same order as the rows of v_array.
label_dictionary = dict(zip(v_dict.keys(), labels1))

# Columns assigned to cluster 1 are handed to the secondary clustering.
secondary_cluster_indices = np.where(labels1 == 1)[0]
key_list = list(v_dict.keys())
secondary_keys = [key_list[i] for i in secondary_cluster_indices]

# Group the selected x values by part name.
secondary_x_values = {}

for name, x in secondary_keys:
    secondary_x_values.setdefault(name, []).append(x)



In [None]:
# Scatter plot of the two features used by the initial clustering,
# coloured by cluster label (noise points, label < 0, are drawn in cyan).
fig = plt.figure(figsize=(10, 4), dpi=300)

colors = [(0, 1, 1, 1) if label < 0 else f"C{label}" for label in labels1]

gs = fig.add_gridspec(1, 1)
ax = fig.add_subplot(gs[0, 0])

# Feature 1 on the horizontal axis, feature 3 on the vertical axis.
x_vals = [vector[1] for vector in v_dict.values()]
y_vals = [vector[3] for vector in v_dict.values()]
ax.set_xlabel("skew")
ax.set_ylabel("Azimuthal angle")


ax.scatter(x_vals, y_vals, color=colors, s=1)

In [None]:
# Build combined (convolved + raw) feature vectors for the columns that the
# initial clustering assigned to cluster 1.
# NOTE(review): this prints the complete list of selected (name, x) keys, which
# can be very long; consider printing len(secondary_keys) instead.
print(secondary_keys)
v_dict_convolved = get_vectors_combined(point_clouds,convolved_clouds,secondary_x_values, y_span = 10)
# Rows follow the insertion order of v_dict_convolved.
v_array_convolved = np.array(list(v_dict_convolved.values()))

In [None]:
# Secondary clustering on the combined vectors.
# Each vector is get_props_convolved(...) (indices 0-6) followed by
# get_props(...) (indices 7-10), so the selected columns are:
#   3  -> sqrt(mean) of the convolved profile
#   4  -> sqrt(max) of the convolved profile
#   8  -> normalised skewness of the raw profile
#   10 -> scaled arccos term of the raw profile
dimensions = [3,4,8,10]
# NOTE(review): the variable is called hdbscan_refined but it is a plain DBSCAN.
hdbscan_refined = DBSCAN(min_samples = 140,
                             metric = "chebyshev",
                             eps= 0.065,
                             n_jobs=-1)

labels2 = hdbscan_refined.fit_predict(v_array_convolved[:,dimensions])

# Overwrite the labels of the re-clustered columns in place: secondary
# cluster 0 keeps label 1, every other secondary label becomes -1.
label_dictionary.update({key: 1 if label == 0 else -1 for key,label in zip(v_dict_convolved.keys(),labels2)})

In [None]:
# Secondary clustering plot: 3-D scatter of three of the four clustered
# features, coloured by the secondary cluster label.
#
# Column layout of the combined vectors (convolved features first, then raw):
#   0-2  normalised variance / skewness / kurtosis of the convolved profile
#   3    sqrt(mean) of the convolved profile
#   4    sqrt(max) of the convolved profile
#   5-6  angle terms of the convolved profile
#   7-9  normalised variance / skewness / kurtosis of the raw profile
#   10   arccos(normalised variance) / (pi / 2) of the raw profile
vectors = np.array(list(v_dict_convolved.values()))
x = vectors[:,4]
y = vectors[:,10]
z = vectors[:,3]


fig = go.Figure(data=[go.Scatter3d(
    x=x, y=y, z=z,
    mode='markers',
    marker=dict(size=1.3, color=labels2, opacity=1),
)])

# The axis titles describe the columns that are actually plotted; the earlier
# titles ("mean", "skew", "azimuthal angle") did not match columns 4, 10 and 3.
fig.update_layout(
    title="3D Scatter of Vectors",
    scene=dict(
        xaxis_title="sqrt(max), convolved",
        yaxis_title="arccos(norm. variance) / (pi/2), raw",
        zaxis_title="sqrt(mean), convolved",
        camera=dict(projection=dict(type="orthographic"))
    ),
    width=1000,
    height=1000
)

fig.show()

In [None]:
# Clustering results: one panel per part showing the height grid with a
# translucent bar per column, coloured by that column's final label.
fig = plt.figure(figsize=(135, 5 * len(point_clouds)), dpi=75)
gs = fig.add_gridspec(len(point_clouds), 1, hspace=0.2)

# Label 3 is the colour for columns that have no entry in label_dictionary
# or whose label is outside -1..2.
label_colors = {-1: "C0", 0: "C1", 1: "C2", 2: "C3", 3: (0, 0, 0, 1)}

for i, point_cloud in enumerate(point_clouds):
    name, cloud, x_to_i, x_vals = point_cloud[:4]
    y_vals = point_cloud[5]

    labels = [label_dictionary.get((name, x), 3) for x in x_vals]
    colors = [label_colors.get(label, label_colors[3]) for label in labels]

    ax = fig.add_subplot(gs[i, 0])

    # Draw the grid in data coordinates so the bars line up with the columns.
    ax.imshow(
        cloud,
        cmap='binary_r',
        interpolation='nearest',
        extent=[min(x_vals), max(x_vals), min(y_vals), max(y_vals)],
        origin='lower',
    )

    ax.bar(x_vals, height=20, bottom=-10, color=colors, width=1, alpha=0.45)

    # Label every 100th column and every 25th row.
    x_ticks = x_vals[::100]
    ax.set_xticks(x_ticks)
    ax.set_xticklabels([f"{x:.2f}" for x in x_ticks])

    y_ticks = y_vals[::25]
    ax.set_yticks(y_ticks)
    ax.set_yticklabels([f"{y:.2f}" for y in y_ticks])

    ax.set(xlabel="X Coordinate", ylabel="Y Coordinate")
    ax.set_title(f"Point Cloud: {name}", fontsize=20)

legend_patches = [
    mpatches.Patch(color=color, label=f"Label {label}")
    for label, color in label_colors.items()
]
fig.legend(handles=legend_patches, loc="upper center", bbox_to_anchor=(0.5, 0.91), ncol=16, fontsize=40)

plt.show()
