# B-splineデモ

Noshita, Koji <noshita@morphometrics.jp>

できるだけシンプルなB-spline fittingによるcurve fragmentの後処理のデモンストレーション

In [7]:
import glob, pickle
import pathlib, os

import numpy as np
import scipy as sp
import pandas as pd

from scipy.interpolate import make_lsq_spline
import open3d as o3d
from sklearn.decomposition import PCA

import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go

In [8]:
def cvt_polar(x,y):
    """Convert to polar coordinate system
    x, y -> r, theta
    """
    r = np.sqrt(x**2+y**2)
    theta = np.sign(y)*np.arccos(x/np.sqrt(x**2+y**2))
    return r, theta

def generate_knots(start, end, n_interval, d):
    """Generate knot vector
    Parameters
    ==============
    start: float
    end: float
        start and end of knots
    n_interval: int
        number of interval
    d: int
        degree of B-spline
    
    Returns
    ==============
    knots: array, shape(n_interval+2*dim + 1)
        knot vector
    
    """
    step = (end-start)/n_interval
    knots_ = [i for i in np.arange(start, end+step, step)]
    knots_p = [knots_[0] for i in range(d)]
    knots_a = [knots_[-1] for i in range(d)]
    
    knots = np.array(knots_p + knots_ + knots_a)
    return knots

In [9]:
from scipy.optimize import curve_fit
from scipy.interpolate import BSpline

def generate_knots_circular_bspline(start, end, n_interval, d):
    
    step = (end-start)/n_interval
    knots_ = [i for i in np.arange(start, end+step, step)]
    knots_p = [knots_[i+len(knots_)-2] - (knots_[-1] - knots_[0]) for i in range(1-d, 1)]
    knots_a = [knots_[i-len(knots_)]+ (knots_[-1]-knots_[0]) for i in range(len(knots_)+1,len(knots_)+d+2)]
    
    knots = knots_p + knots_ + knots_a
    
    return np.array(knots)

degree = 3
knots = generate_knots_circular_bspline(-np.pi, np.pi, 16, 3)

def bspline_wrapper(theta, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15, c16, c17, c18, c19):
    bspline = BSpline(knots, 
                      [c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15, c16, c17, c18, c19, 0], 
                      degree, extrapolate="periodic")
    return bspline(theta)

In [10]:
def get_pathes(folder="../opt_curve_fragment"):
    file_pathes = glob.glob(folder + "/*/*/*/*_cfs.pickle")
    return file_pathes

def load_cfs(file_path):
    with open(file_path, "rb") as f:
        curve_fragments = pickle.load(f)
    return curve_fragments

def make_output_folder(file_path):
    p_file_path = pathlib.Path(file_path)
    ind = p_file_path.parent.name
    noise = p_file_path.parent.parent.name
    occ = p_file_path.parent.parent.parent.name
    output_folder = os.path.join("marged_curve", occ, noise, ind)
    os.makedirs(output_folder, exist_ok=True)
    return output_folder

def make_output_file_name(file_path, output_folder):
    p_file_path = pathlib.Path(file_path)
    p_output_folder = pathlib.Path(output_folder)
    output_file = pathlib.Path("marged_" + p_file_path.name)
    return p_output_folder/output_file
    
def marge_cfs(curve_fragments, output_name):
    N_INTERVAL = 16
    DEGREE = 3

    theta_list = []
    curve_list = []

    for i, cf in enumerate(curve_fragments):
        coord_3d = np.concatenate(cf)

        pca = PCA(n_components=2)
        coord_2d = pca.fit_transform(coord_3d)

        _, theta = cvt_polar(coord_2d[:,0], coord_2d[:,1])
        #theta = np.array([t+2*np.pi if t < -np.pi/2 else t for t in theta])
        
        degree = 3
        knots = generate_knots_circular_bspline(-np.pi, np.pi, 16, 3)

        sort_flag = True
        theta_sorted = None
        coord_3d_sorted = None
        while sort_flag:
            theta_r = theta + (10**-8)*np.random.random(len(theta))
            idx_sorted = np.argsort(theta_r)
            theta_sorted = theta_r[idx_sorted]
            coord_3d_sorted = coord_3d[idx_sorted]
            # ここまで一致

            if len(theta) == len(np.unique(theta_sorted)):
                sort_flag = False
                
        popt_x, _ = curve_fit(bspline_wrapper, theta_sorted, coord_3d_sorted[:,0], bounds=(-2,2), ftol=10**-8)
        popt_y, _ = curve_fit(bspline_wrapper, theta_sorted, coord_3d_sorted[:,1], bounds=(-2,2), ftol=10**-8)
        popt_z, _ = curve_fit(bspline_wrapper, theta_sorted, coord_3d_sorted[:,2], bounds=(-2,2), ftol=10**-8)

        theta_recon = np.linspace(-np.pi, np.pi, 360)
        coord_3d_recon = np.stack([
            bspline_wrapper(theta_recon, *popt_x), 
            bspline_wrapper(theta_recon, *popt_y), 
            bspline_wrapper(theta_recon, *popt_z), 
            ]).T
        theta_list.append(theta_recon)
        curve_list.append(coord_3d_recon)
    
    with open(output_name, 'wb') as f:
        pickle.dump(curve_list, f)

In [11]:
file_path = "../known_opt_curve_fragment/cfs.pickle"
output_file_name = "../known_opt_curve_fragment/marged_cfs.pickle"

In [12]:
curve_fragments = load_cfs(file_path)
marge_cfs(curve_fragments, output_file_name)

In [24]:
file_path = "marged_curve/q_a/0mm/q_1_a/marged_128_cfs.pickle"
with open(file_path, "rb") as f:
    coord_3d_recons = pickle.load(f)

In [25]:
x,y,z = coord_3d_recons[3].T

fig = go.Figure(data=[go.Scatter3d(
    x=x,
    y=y,
    z=z,
    mode='markers',
    marker=dict(
        size=1,
        color=z,                
        colorscale='Viridis',   
        opacity=0.8
    )
)])

fig.update_layout(
    margin=dict(l=0, r=0, b=0, t=0), 
    scene={"aspectmode":"data"}
)

## 差の計算
[Shapely](https://shapely.readthedocs.io/en/stable/)を使って，Fréchet距離を計算する

In [27]:
from shapely import linearrings, frechet_distance, hausdorff_distance

In [28]:
def edges_to_lineset(mesh, edges, color):
    ls = o3d.geometry.LineSet()
    ls.points = mesh.vertices
    ls.lines = edges
    colors = np.empty((np.asarray(edges).shape[0], 3))
    colors[:] = color
    ls.colors = o3d.utility.Vector3dVector(colors)
    return ls

def rotation_mat(angle):
    Rx = np.array([[1,0,0],
                 [0, np.cos(angle[0]), -np.sin(angle[0])],
                 [0, np.sin(angle[0]), np.cos(angle[0])]])

    Ry = np.array([[np.cos(angle[1]), 0, np.sin(angle[1])],
                 [0,1,0],
                 [-np.sin(angle[1]), 0, np.cos(angle[1])]])

    Rz = np.array([[np.cos(angle[2]), -np.sin(angle[2]), 0],
                 [np.sin(angle[2]), np.cos(angle[2]), 0],
                 [0,0,1]])
    return Rz@Rx@Ry

angle = [np.pi, np.pi, 0] # pcd.rotate()
angle = [-np.pi/2, np.pi/2, np.pi/2]
R = rotation_mat(angle)
R_mirror = np.array([[-1,0,0],
                    [0,1,0],
                    [0,0,1]])

def get_pts(mesh, edges, max_point_num=360):
    vertices = np.asarray(mesh.vertices)
    edges = np.asarray(edges)
    dist = 0
    for i in range(len(vertices[edges])):
        start = vertices[edges][i][0]
        end = vertices[edges][i][1]
        dist_part = np.sqrt(np.sum(np.square(start - end)))
        dist += dist_part
    unit_dist = dist/max_point_num
    next_start_length = 0
    next_start_partation = 0
    pts_list = []
    for i in range(len(vertices[edges])):
        start = vertices[edges][i][0]
        end = vertices[edges][i][1]
        next_st_pt = start + (end - start)*next_start_partation
        dist_part = np.sqrt(np.sum(np.square(start - end)))
        dist_nx = np.sqrt(np.sum(np.square(next_st_pt - end)))
        pts_num = int(dist_nx/unit_dist)+1
        pts = next_st_pt + np.repeat(np.arange(pts_num),3).reshape((pts_num,3)) * (end - next_st_pt)/(pts_num-1)
        end_pts = pts[-1]
        next_start_length = pts_num*unit_dist - dist_part + next_start_length
        next_start_partation = next_start_length/dist_part
        pts_list.append(pts)

    pts_array = np.concatenate(pts_list)

    return pts_array

def PCA_sort(points):
    pca = PCA(n_components=2)
    coord_2d = pca.fit_transform(points)
    r, theta = cvt_polar(coord_2d[:,0], coord_2d[:,1])
    #theta = np.array([t+2*np.pi if t < -np.pi/2 else t for t in theta])
    idx_sorted = np.argsort(theta)
    coord_3d_sorted = points[idx_sorted]
    return coord_3d_sorted

def closest_points(S1, S2):
    # S1とS2の全ての点の距離行列を計算する
    dist_matrix = np.sqrt(np.sum((S1[:, np.newaxis, :] - S2) ** 2, axis=2))

    # 距離行列から最も近い点のインデックスを計算する
    idx1, idx2 = np.unravel_index(np.argmin(dist_matrix), dist_matrix.shape)

    # 最も近い点のインデックスを返す
    return idx1, idx2

def sort_curves(S1, S2, idx1, idx2):
    # S1とS2を最も近い点のインデックスを元に順序を揃える
    S1_sorted = np.roll(S1, -idx1, axis=0)
    S2_sorted = np.roll(S2, -idx2, axis=0)

    # S2_sortedを反転させて順序を揃える
    if np.sum(np.abs(S1_sorted - S2_sorted)) > np.sum(np.abs(S1_sorted - np.flip(S2_sorted, axis=0))):
        S2_sorted = np.flip(S2_sorted, axis=0)

    return S1_sorted, S2_sorted

def matching(x1, x2):
    idx1, idx2 = closest_points(x1, x2)
    x1, x2 = sort_curves(x1, x2, idx1, idx2)
    return x1, x2

def fd(x1, x2):
    return frechet_distance(linearrings(x1), linearrings(x2))

def cal_frechet(ind, s_curve_list):
    rng = np.random.default_rng(123)
    mesh = o3d.io.read_triangle_mesh("polygon/quan_e_a/{}.ply".format(ind))
    y_frechet = []
    x_area = []
    for i in range(np.max(np.asarray(mesh.cluster_connected_triangles()[0]))+1):
        mesh = o3d.io.read_triangle_mesh("polygon/quan_e_a/{}.ply".format(ind))
        mesh.triangles = o3d.utility.Vector3iVector(np.asarray(mesh.triangles)[np.asarray(mesh.cluster_connected_triangles()[0])==i])

        _, _, area = mesh.cluster_connected_triangles()
        edges = mesh.get_non_manifold_edges(allow_boundary_edges=False)
        x2 = get_pts(mesh, edges)
        x2 = PCA_sort(x2[:360])
        frechet_list = []
        for j in range(8):
            x1 = s_curve_list[i]
            pcd = o3d.geometry.PointCloud()
            pcd.points = o3d.utility.Vector3dVector(x1)
            pcd.rotate(np.linalg.inv(R),(0,0,0))
            pcd.rotate(R_mirror,(0,0,0))
            pcd.scale(10,(0,0,0))
            x1 = np.asarray(pcd.points)
            x1, x2 = matching(x1, x2)
            
            d = fd(x1, x2)
            frechet_list.append(d)
        frechet = np.min(frechet_list)
        if frechet > 1:
            print(i)
        
        y_frechet.append(frechet)
        x_area.append(area[0])
    return y_frechet, x_area

In [29]:
occs = ["q_a", "q_aothin", "q_aothick"]
noises = ["0mm", "1mm", "3mm"]
inds = ["q_1_a", "q_2_a", "q_3_a"]
img_nums = [128,64,32]

In [30]:
path_list = []
for occ in occs:
    for noise in noises:
        for ind in inds:
            for img_num in img_nums:
                p = pathlib.Path("marged_curve/{}/{}/{}/marged_{}_cfs.pickle".format(occ,noise,ind,img_num))
                path_list.append(p)

In [31]:
for path in path_list:
    with open(str(path), 'rb') as f:
        s_curve_list = pickle.load(f)
    #print(path)
    img_num = path.name.split("_")[1]
    ind = path.parent.name
    noise = path.parent.parent.name
    occ = path.parent.parent.parent.name
    y_frechet, x_area = cal_frechet(ind, s_curve_list)
    np.savetxt('frechet_csv/{}_{}_{}_{}_area.csv'.format(occ,noise,ind,img_num), x_area, delimiter=',')
    np.savetxt('frechet_csv/{}_{}_{}_{}_frechet.csv'.format(occ,noise,ind,img_num), y_frechet, delimiter=',')

In [39]:
i = 7
path = pathlib.Path("marged_curve/q_aothin/3mm/q_3_a/marged_32_cfs.pickle")
with open(str(path), 'rb') as f:
        s_curve_list = pickle.load(f)
img_num = path.name.split("_")[1]
ind = path.parent.name
noise = path.parent.parent.name
occ = path.parent.parent.parent.name
mesh = o3d.io.read_triangle_mesh("polygon/quan_e_a/{}.ply".format(ind))
mesh.triangles = o3d.utility.Vector3iVector(np.asarray(mesh.triangles)[np.asarray(mesh.cluster_connected_triangles()[0])==i])
_, _, area = mesh.cluster_connected_triangles()
edges = mesh.get_non_manifold_edges(allow_boundary_edges=False)
x2 = get_pts(mesh, edges)
x2 = PCA_sort(x2[:360])


x1 = s_curve_list[i]
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(x1)
pcd.rotate(np.linalg.inv(R),(0,0,0))
pcd.rotate(R_mirror,(0,0,0))
pcd.scale(10,(0,0,0))

x1 = np.asarray(pcd.points)
x1, x2 = matching(x1, x2)
o3d.visualization.draw_geometries([pcd, mesh])
fd(x1, x2)



1.670720646018234