In [1]:
%run "../Notebooks/Visualization_functions.ipynb"

loaded variables: 
myparams, myconfiguration_file
(255, 180.03, 0)


## data preparation

In [None]:
def preprocess_point_cloud(pcd_down, voxel_size,pprint_statements = False):
    
    #pcd_down = pcd.voxel_down_sample(voxel_size)
    radius_normal = voxel_size * 2
    radius_feature = voxel_size * 5
    
    pcd_down.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=230))
    
    pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd_down,
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature, max_nn=230))
    
    if pprint_statements== True: 
        print("\n Downsample with a voxel size %.3f." % voxel_size)
        print("Estimate normal with search radius %.3f." % radius_normal)
        print("Compute FPFH feature with search radius %.3f." % radius_feature)

    return pcd_down, pcd_fpfh


def prepare_dataset(source,target,voxel_size, trans_init = None,mytitle = "", print_statements = False):
    """
    source and target are already downsampled 
    """

    source_down, source_fpfh = preprocess_point_cloud(source, voxel_size)
    target_down, target_fpfh = preprocess_point_cloud(target, voxel_size)

    
    #outlier removal
    if print_statements== True:
        print ("removing outliers")
    processed_source, outlier_index = source.remove_radius_outlier(
                                              nb_points=25,
                                              radius=0.5)

    processed_target, outlier_index = target.remove_radius_outlier(
                                              nb_points=25,
                                              radius=0.5)

    return source, target, source_down, target_down, source_fpfh, target_fpfh, trans_init

## global and icp

In [None]:
def execute_global_registration(source_down, target_down, source_fpfh,
                                target_fpfh, voxel_size,
                                max_iteration,
                                max_validation,
                                print_statements = False,
                               ):
    distance_threshold = voxel_size *1.5
    
    if print_statements== True: 
        print("\nGLOBAL REGISTRATION: RANSAC registration on downsampled point clouds.")
        #print("   Since the downsampling voxel size is %.3f," % voxel_size)
        #print("   we use a liberal dista¢nce threshold %.3f." % distance_threshold)
    result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        source_down, target_down, source_fpfh, target_fpfh, True,
        distance_threshold,
        o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
        3, [
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(
                0.99),
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(
                distance_threshold)
        ], o3d.pipelines.registration.RANSACConvergenceCriteria(max_iteration,max_validation))
    

    return result


#icp
def refine_registration(source, target, 
                        source_fpfh, target_fpfh, 
                        voxel_size,
                        mytranformation = None,
                        print_statements = False
                       ):
    
    distance_threshold = voxel_size *2
    
    if print_statements== True: 
        print("\nPOINT-TO-PLANE ICP registration is applied on original point")
        print("distance threshold %.3f." % distance_threshold)
        
    #if type(mytranformation) != "numpy.ndarray":
        #mytranformation = np.identity(4)
        
    
    radius_normal = voxel_size * 5
    
    source.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=230))
    target.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=230))
    
    result = o3d.pipelines.registration.registration_icp(
        source, target, distance_threshold, mytranformation,
        o3d.pipelines.registration.TransformationEstimationPointToPlane())
    return result


## Helper functions

In [None]:
def get_num_points(list_pointclouds, print_statement = False):
    num_points = []
    n_pc = list(range(len(list_pointclouds)))
    for i in n_pc:
        points = len(np.asarray(list_pointclouds[i].points))
        num_points.append(points)
    #print("")
    if print_statement == True:
        #print("")
        print("number of points in clouds")
        print (*zip(n_pc,num_points), sep = "\n")
    return num_points

In [None]:

def save_registration_result(source, target, transformation,
                             
                             #visualization parameters
                             title = "", mytuples = None, 
                             params = None, #camera parameters,json file (P)
                             fov_step  = None, 
                             configuration_file = None, #object properties ,json file (O)
                             rotate = False,
                             
                             save_result = True,
                             visualize_result = False):
    
    dt_string = mytimestamp()
    filename = dt_string+'-stitch_'+title+'.pcd'
    
    # apply the chosen transformation to source and target
    source_temp = copy.deepcopy(source)
    target_temp = copy.deepcopy(target)
    source_temp.paint_uniform_color([1, 0.706, 0])
    target_temp.paint_uniform_color([0, 0.651, 0.929])
    source_temp.transform(transformation)
    
    # combine them and create the newpoint cloud
    newpointcloud = source_temp + target_temp
    #newpointcloud.paint_uniform_color([0,0.5,0.1])
    
    #save
    if save_result == True: 
        o3d.io.write_point_cloud(filename, newpointcloud)
    
    #visualize
    if visualize_result == True:
        #o3d.visualization.draw_geometries([newpointcloud],
                                         # width=1000, height=800,
                                         #window_name=title)
        
        
        custom_draw_geometry(newpointcloud,
                     mytitle = title,
                     params = params,  # parameter for camera point view, json file via pressing P
                     configuration_file = configuration_file, # configuration file for properties, json file via pressing o
                     rotate = rotate
                            )
        
        
        
    return newpointcloud


## Main

In [5]:
def stitch_two_point_clouds(source, target, 
                            mytitle,dt_string, 
                            voxel_size,
                            calculate_global = True,
                            calculate_icp = True, # mark False if you want to skip this step
                            trans_init = None,
                            pprint_statements = False,
                            save_statements = True,
                            visualization_on = False,
                            final_vis_on = True,
                            mmax_iteration = 10**7,
                            mmax_validation = 0.999,
                            myoverlapping_factor = 0.7,
                            maxnumattempts = 100):
    #list of transformations
    trasformations_list = []
    
    # print 
    print ("stitching : %s" %mytitle)
    
    # Use different colors on the two point clouds
    source.paint_uniform_color([1, 0.706, 0])    #source is yellow
    target.paint_uniform_color([0, 0.651, 0.929])#target is blue
    
    # visualize
    geometry_list = [source,target]
    if visualization_on == True:
        o3d.visualization.draw_geometries(geometry_list,
                                          width=1000, height=800,
                                          window_name='Open3D-original %s'%mytitle)
        
    #Outlier removal for two point clouds separately
    if pprint_statements == True:
        print ("\noutlier removal")
    processed_source, outlier_index_source = source.remove_radius_outlier(
                                                  nb_points=16,
                                                  radius=0.45)

    processed_target, outlier_index_target = target.remove_radius_outlier(
                                                  nb_points=16,
                                                  radius=0.5)
    
    if visualization_on == True:
        o3d.visualization.draw_geometries([processed_source,processed_target],
                                          width=1000, height=800,
                                          window_name='Open3D-processed %s'%mytitle)


    # note: here we are re-defining the source and target as processed
    if pprint_statements == True:
        print ("\ndataset preparation")
    source, target, source_down, target_down, source_fpfh, target_fpfh,trans_init = prepare_dataset(processed_source,
                                                                                             processed_target,
                                                                                             voxel_size, 
                                                                                             trans_init,
                                                                                             mytitle =mytitle,
                                                                                             print_statements = pprint_statements
                                                                                              )
    # append the just obtained one as trasformations_list
    trasformations_list.append(trans_init)
    
    #-----------------------------------------------------------------------------------------------------
    #being in sequence the overlapping in a proper stitching should be high
    #Therefore we put a condition:
    #> while result_icp.correspondence_set < 0.7*(len(np.asarray(source.points))+len(np.asarray(target.points)))
    #> do execute_global_registration
    
    #points_source = len(np.asarray(source.points))
    #points_target = len(np.asarray(target.points))
    
    points_source,points_target = get_num_points([source,target],print_statement = pprint_statements)
    overlapping_points = np.zeros(1) # initialization 
    numattempts = 0
    
    while (numattempts <= maxnumattempts) and (len(np.asarray(overlapping_points)) < myoverlapping_factor*(points_target)):
    #-----------------------------------------------------------------------------------------------------    
        
        if calculate_global== True: 
            #global registration: execute ransac
            result_ransac = execute_global_registration(source_down, target_down,
                                                        source_fpfh, target_fpfh,
                                                        voxel_size,
                                                        max_iteration = mmax_iteration,
                                                        max_validation = mmax_validation,
                                                        print_statements = pprint_statements
                                                       )
            #print result
            if pprint_statements == True:
                print("Fit is:")
                print (type(result_ransac))
                print(result_ransac)
                print("Transformation is:")
                print(result_ransac.transformation)

            #save it
            if save_statements == True: 
                with open(dt_string+'-transformation_result_ransac-'+mytitle+'.pkl','wb') as f:
                    pkl.dump(result_ransac.transformation, f)
                if pprint_statements == True:
                    print("saved")

            threshold = 0.01

            if visualization_on == True: 
                draw_registration_result(source, target, 
                                         result_ransac.transformation,
                                         title = "global registration-%s"%mytitle,
                                        )

            # append the just obtained one as trasformations_list
            trasformations_list.append(result_ransac.transformation)

            points_source,points_target = get_num_points([source,target],print_statement = pprint_statements)
            overlapping_points = result_ransac.correspondence_set
            if pprint_statements == True:
                print ("overlapping points : " ,len(np.asarray(overlapping_points)))
                
            numattempts += 1
        #-----------------------------------------------------------------------------------------------------
        if calculate_icp== True: 
            #execute icp
            result_icp = refine_registration(source, target, 
                                             source_fpfh, target_fpfh,
                                             voxel_size,
                                             mytranformation =trasformations_list[-1],
                                             print_statements = pprint_statements
                                            )
            # print result
            if pprint_statements == True:
                print("Fit is:")
                print(result_icp)
                print("Transformation is:")
                print(result_icp.transformation)
            #save it
            if save_statements == True: 
                with open(dt_string+'-transformation_result_icp-'+mytitle+'.pkl','wb') as f:
                    pkl.dump(result_icp.transformation, f)
                if pprint_statements == True:
                    print("saved")

            #append the just obtained one as trasformations_list
            trasformations_list.append(result_icp.transformation)


            points_source,points_target = get_num_points([source,target],print_statement = pprint_statements)
            overlapping_points = result_icp.correspondence_set
            if pprint_statements == True:
                print ("overlapping points : " ,len(np.asarray(overlapping_points)))

        numattempts += 1
    #-----------------------------------------------------------------------------------------------------
    if final_vis_on == True: 
        draw_registration_result(source, target, 
                                 trasformations_list[-1],
                                 title = "last fit registration-%s"%mytitle
                                )
        

    #-----------------------------------------------------------------------------------------------------
    
    #save it
    newpointcloud = save_registration_result(source, target, trasformations_list[-1], 
                                             mytitle, 
                                             save_result = save_statements,
                                             visualize_result = False)
    
    return newpointcloud,trasformations_list


In [None]:
def stitch_sequences(processed_source,processed_target,
                     source_label,target_label,
                     mmax_iteration = 10**7,
                     mmax_validation = 0.999,
                     voxel_size = 0.1,
                     print_statements = False,
                     visualization_on = False,
                     myoverlapping_factor = 0.5,
                     maxnumattempts = 10
                     ):
    mytitle = "allpc_downsampled_%s_%s" %(source_label,target_label)
    print(mytitle)


    # prepare dataset
    new_source, new_target, source_down, target_down, source_fpfh, target_fpfh,trans_init = prepare_dataset(processed_source,
                                                                                                 processed_target,
                                                                                                 voxel_size=voxel_size, 
                                                                                                 #trans_init,
                                                                                                 mytitle =mytitle,
                                                                                                 print_statements = print_statements
                                                                                         )

    # plot
    if  visualization_on == True: 
        custom_draw_geometry((processed_target+processed_source),
                         mytitle = mytitle,
                         params = myparams,  # parameter for camera point view, json file via pressing P
                         configuration_file = myconfiguration_file, # configuration file for properties, json file via pressing o
                         rotate = True)
    
    
    # quality condition initialization 
    points_source,points_target = get_num_points([processed_source,processed_target],print_statement = print_statements)
    overlapping_points = np.zeros(1) 
    numattempts = 0
    valuetoreach = myoverlapping_factor*(points_target)
    print ("valuetoreach: ",valuetoreach)
    all_results = list()
    
    while (numattempts <= maxnumattempts) and (len(np.asarray(overlapping_points)) < valuetoreach):
        print (f"ATTEMPT {numattempts}")
        # GLOBAL 
        result_ransac = execute_global_registration(source_down, target_down,
                                                    source_fpfh, target_fpfh,
                                                    voxel_size= voxel_size,
                                                    print_statements = print_statements,
                                                    max_iteration = mmax_iteration,
                                                    max_validation = mmax_validation 
                                                   )

        overlapping_points = result_ransac.correspondence_set

        if print_statements == True :
            print("Transformation is:")
            print(result_ransac.transformation)
            print ("overlapping points : " ,len(np.asarray(overlapping_points)))
            print (result_ransac)

        #plot
        if visualization_on == True :
            draw_registration_result(new_source, new_target, 
                                 result_ransac.transformation,
                                 title = "%global registration"
                                )

        

        # ICP
        result_icp = refine_registration(new_source, new_target, 
                                                     source_fpfh, target_fpfh,
                                                     voxel_size,
                                                     mytranformation =result_ransac.transformation,
                                                     print_statements = print_statements
                                                    )

        overlapping_points = result_icp.correspondence_set


        if print_statements == True :
            print("Transformation is:")
            print(result_icp.transformation)
            print ("overlapping points : " ,len(np.asarray(overlapping_points)))
            print (result_icp)

        if visualization_on == True :
            draw_registration_result(new_source, new_target, 
                                     result_icp.transformation,
                                     title = "icp registration"
                                     )
            
        numattempts += 1
        all_results.append(result_icp)
    
    
    #save the last one
    ## TO DO: save the best one!
    newpointcloud = save_registration_result(source, target, all_results[-1].transformation, 
                                             mytitle, 
                                             save_result = save_statements,
                                             visualize_result = False)
    
    return new_source, new_target, newpointcloud ,all_results


In [None]:
def evaluation_by_manual_registration(source,target,
                            
                             threshold = 0.03,
                             picked_id_source = None,
                             picked_id_target = None,
                                      
                             #statements
                             print_statements = False,
                             visualization_on = False
                             ):
    """
    transf,picked_id_source,picked_id_target = evaluation_by_manual_registration
    
    """
    
    

    if picked_id_source is None: 
        # pick points from two point clouds and builds correspondences
        picked_id_source = pick_points(source)
        
    if picked_id_target is None:
        picked_id_target = pick_points(target)
    
    print (picked_id_source)
    assert (len(picked_id_source) >= 3 and len(picked_id_target) >= 3)
    assert (len(picked_id_source) == len(picked_id_target))
    corr = np.zeros((len(picked_id_source), 2))
    corr[:, 0] = picked_id_source
    corr[:, 1] = picked_id_target

    # estimate rough transformation using correspondences
    if print_statements == True:
        print("Compute a rough transform using the correspondences given by user")
    p2p = o3d.pipelines.registration.TransformationEstimationPointToPoint()
    trans_init = p2p.compute_transformation(source, target,
                                            o3d.utility.Vector2iVector(corr))

    # point-to-point ICP for refinement
    if print_statements == True:
        print("Perform point-to-point ICP refinement")
    threshold = threshold  # 3cm distance threshold
    reg_p2p = o3d.pipelines.registration.registration_icp(
        source, target, threshold, trans_init,
        o3d.pipelines.registration.TransformationEstimationPointToPoint())
    transf = reg_p2p.transformation 
    
    print (reg_p2p)
    if visualization_on == True: 
        draw_registration_result(source, target, transf)
        
    if print_statements == True:
        print("")
        print (reg_p2p)
        print(transf) 
    return transf,picked_id_source,picked_id_target,reg_p2p

In [None]:
def full_evaluation_pipeline(
                             #input 
                             list_stitches,
                             labels_stitches = ["model_pc","stitched_pc"],
                             color_stitches = [[1, 0.706, 0],[0, 0.651, 0.929]],
                             
                             #plotting 
                             params = myparams,  # parameter for camera point view, json file via pressing P
                             configuration_file = myconfiguration_file, # configuration file for properties, json file via pressing o
                             take_screen_shot = True,
                             rotate = False,
                             onewindow = True,
                             
                             #scaling
                             scale_factor = 1000,
                             voxels = [0.1,0.2],
                             
                             #evaluation 
                             threshold = 0.03,
                             picked_id_source = None,
                             picked_id_target = None,
    
                             #statements
                             print_statements = False,
                             visualization_on = False
                            ):
    """
    list_stitches = [pcd,st_pcd]
        pcd = original point cloud of the model
        st_pcd = our stitched point cloud that we want to evaluate against the model 
    
    
    """
    # check input size
    assert len(list_stitches) == len(labels_stitches) 
    assert len(labels_stitches) == len(color_stitches)
    assert len(color_stitches) == 2
    
    #
    pcd,st_pcd = list_stitches

    ## assign colors
    list_stitches[0].paint_uniform_color(color_stitches[0])
    list_stitches[1].paint_uniform_color(color_stitches[1])
    
    # mytitle
    mytitle = f"{labels_stitches[0]}_{get_color_name(color_stitches[0])}-{labels_stitches[1]}_{get_color_name(color_stitches[1])}"
    
    
    if visualization_on == True:
        # plot a list of geometries
        custom_draw_geometry(list_stitches[0]+list_stitches[1],
                             mytitle = mytitle+"before_resizing",
                             params = params,  # parameter for camera point view, json file via pressing P
                             configuration_file = configuration_file, # configuration file for properties, json file via pressing o
                             take_screen_shot = take_screen_shot,
                             rotate = rotate,
                             onewindow = onewindow)
    
    #define scaling function 
    trans_scale = np.asarray([[scale_factor, 0.0, 0.0, 0.0], 
                             [0.0, scale_factor, 0.0, 0.0],
                             [0.0, 0.0, scale_factor, 0.0], 
                             [0.0, 0.0, 0.0, 1.0]])
    
    # apply scaling function
    temp_pcd = copy.deepcopy(pcd)
    temp_pcd.transform(trans_scale)
    list_stitches[0] = temp_pcd
    
    if visualization_on == True:
        # plot a list of geometries
        custom_draw_geometry(list_stitches[0]+list_stitches[1],
                             mytitle = mytitle+"after_resizing",
                             params = params,  # parameter for camera point view, json file via pressing P
                             configuration_file = configuration_file, # configuration file for properties, json file via pressing o
                             take_screen_shot = take_screen_shot,
                             rotate = rotate,
                             onewindow = onewindow)
        
    #downsampling
    down_pcd = temp_pcd.voxel_down_sample(voxel_size=voxels[0])
    down_st_pcd = st_pcd.voxel_down_sample(voxel_size=voxels[1])
    downsampled_stitches = [down_pcd,down_st_pcd]
    
    if print_statements == True:
        print (len(down_pcd.points))
        print (len(down_st_pcd.points))
        
    #set source and target
    source = copy.deepcopy(downsampled_stitches[0])
    target = copy.deepcopy(downsampled_stitches[1])
    
    # evaluation by manual icp registration 
    transf,new_picked_id_source,new_picked_id_target, registration = evaluation_by_manual_registration(source,target,
                                                                             threshold = 0.03,
                                                                             picked_id_source = picked_id_source,
                                                                             picked_id_target = picked_id_target,
                                                                             visualization_on = visualization_on
                                                                            )
    
    return transf,new_picked_id_source,new_picked_id_target,registration,list_stitches