In [None]:
import json
import numpy as np
import matplotlib.pyplot as plt
from PIL import  Image, ImageDraw
from sklearn.neighbors import KernelDensity
from sklearn.neighbors import NearestNeighbors
import scipy.ndimage as ndi
import cv2 as cv
from align import *
import pickle
import math
import tqdm
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics.pairwise import cosine_similarity
import seaborn as sns
import glob
sift = cv.SIFT_create()

In [None]:
def get_similar_neighbours(points1, points2, descriptors1, descriptors2, radius=150):
    '''
    Get all the similar neighbours of points1 in points2 in selected radius
    '''
    neigh = NearestNeighbors(radius=radius)
    neigh.fit(points2)
    neighboring_indices = neigh.radius_neighbors(points1, return_distance=False)
    similar_neighbours = []
    for i, neighbours in enumerate(neighboring_indices):
        if len(neighbours) == 0:
            similar_neighbours.append(np.empty((0)))
            continue
        similarities = cosine_similarity(descriptors1[i].reshape(1,-1), descriptors2[neighbours])[0]
        sorting = np.argsort(similarities)[::-1]
        similar_neighbours.append(neighbours[sorting])
    return similar_neighbours

In [None]:
with open('maps_data.pickle', 'rb') as handle:
    maps_data = pickle.load(handle)

In [None]:
selems = list()
selems.append(np.array([[0, 1, 0], [1, 1, 1], [0, 0, 0]]))
selems.append(np.array([[1, 0, 1], [0, 1, 0], [1, 0, 0]]))
selems.append(np.array([[1, 0, 1], [0, 1, 0], [0, 1, 0]]))
selems.append(np.array([[0, 1, 0], [1, 1, 0], [0, 0, 1]]))
selems.append(np.array([[0, 0, 1], [1, 1, 1], [0, 1, 0]]))
selems = [np.rot90(selems[i], k=j) for i in range(5) for j in range(4)]

selems.append(np.array([[0, 1, 0], [1, 1, 1], [0, 1, 0]]))
selems.append(np.array([[1, 0, 1], [0, 1, 0], [1, 0, 1]]))

In [None]:
sift_size = 19
radius = 175

#Intersections, sift for baseline
baseline = cv.imread("baseline.png", cv.IMREAD_GRAYSCALE)
baseline_branches = np.zeros_like(baseline, dtype=bool)
for selem in selems:
    baseline_branches |= ndi.binary_hit_or_miss(baseline, selem)

baseline_points = np.argwhere(baseline_branches)

baseline_keypoints = list()
for point in baseline_points:
    baseline_keypoints.append(cv.KeyPoint(float(point[1]), float(point[0]), sift_size))
baseline_keypoints, baseline_descriptors = sift.compute(baseline,baseline_keypoints)
baseline_points = [keypoint.pt for keypoint in baseline_keypoints]

for map_name in tqdm.tqdm(glob.glob("warped_maps_skeleton/*.png")):
    try:
        #Load the map image, search the intersections, and compute sift
        test_image = cv.imread(map_name, cv.IMREAD_GRAYSCALE)
        
        test_branches = np.zeros_like(test_image, dtype=bool)
        for selem in selems:
            test_branches |= ndi.binary_hit_or_miss(test_image, selem)
        test_points = np.argwhere(test_branches)

        test_keypoints = list()
        for point in test_points:
            test_keypoints.append(cv.KeyPoint(float(point[1]), float(point[0]), sift_size))
        test_keypoints, test_descriptors = sift.compute(test_image,test_keypoints)
        test_points = [keypoint.pt for keypoint in test_keypoints]

        s1 = get_similar_neighbours(test_points, baseline_points, test_descriptors, baseline_descriptors, radius=radius)
        s2 = get_similar_neighbours(baseline_points,test_points, baseline_descriptors,test_descriptors, radius=radius)

        src_pts = []
        dst_pts = []
        for i,s in enumerate(s1):
            if len(s) == 0:
                continue
            if i == s2[s[0]][0]:
                src_pts.append(test_points[i])
                dst_pts.append(baseline_points[s[0]])
        src_pts = np.array(src_pts)
        dst_pts = np.array(dst_pts)

        new_M, mask = cv.findHomography(src_pts, dst_pts, cv.RANSAC,5.0)
        
        test_image = cv.warpPerspective(test_image, new_M, (baseline.shape[1], baseline.shape[0]))
        map_name = map_name[21:]
        plt.imsave(f"realigned/{map_name}", test_image)
        
    except:
        print("error")
        continue

In [None]:
sift_size = 19
radius = 75

#Intersections, sift for baseline
baseline = cv.imread("warped_maps/baseline.png", cv.IMREAD_GRAYSCALE)
baseline_branches = np.zeros_like(baseline, dtype=bool)
for selem in selems:
    baseline_branches |= ndi.binary_hit_or_miss(baseline, selem)

baseline_points = np.argwhere(baseline_branches)

baseline_keypoints = list()
for point in baseline_points:
    baseline_keypoints.append(cv.KeyPoint(float(point[1]), float(point[0]), sift_size))
baseline_keypoints, baseline_descriptors = sift.compute(baseline,baseline_keypoints)
baseline_points = [keypoint.pt for keypoint in baseline_keypoints]

patch_size = 2000
original_height = baseline.shape[0]
original_width = baseline.shape[1]

for map_name, M, distance in tqdm.tqdm(maps_data):
    final = np.zeros_like(baseline)
    #Load the map image, search the intersections, and compute sift
    test_image = cv.imread(f"warped_maps/{map_name}.png", cv.IMREAD_GRAYSCALE)

    test_branches = np.zeros_like(test_image, dtype=bool)
    for selem in selems:
        test_branches |= ndi.binary_hit_or_miss(test_image, selem)
    test_points = np.argwhere(test_branches)
    
    for patch_x in range(0, baseline.shape[1], patch_size):
        for patch_y in range(0,baseline.shape[0], patch_size):
            
            if baseline.shape[1] - patch_x < patch_size or baseline.shape[0] - patch_y < patch_size:
                continue
            height = [patch_y, patch_y+patch_size]
            width = [patch_x, patch_x+patch_size]
            
            current_test_keypoints = list()
            for point in test_points:
                if (patch_x <= point[1] <= patch_x+patch_size) and (patch_y <= point[0] <= patch_y+patch_size):
                    current_test_keypoints.append(cv.KeyPoint(float(point[1]), float(point[0]), sift_size))

            current_test_keypoints, test_descriptors = sift.compute(test_image,current_test_keypoints)
            current_test_points = [keypoint.pt for keypoint in current_test_keypoints]

            try:
                s1 = get_similar_neighbours(current_test_points, baseline_points, test_descriptors, baseline_descriptors, radius=radius)
                s2 = get_similar_neighbours(baseline_points,current_test_points, baseline_descriptors,test_descriptors, radius=radius)

                src_pts = []
                dst_pts = []
                for i,s in enumerate(s1):
                    if len(s) == 0:
                        continue
                    if i == s2[s[0]][0]:
                        src_pts.append(current_test_points[i])
                        dst_pts.append(baseline_points[s[0]])
                src_pts = np.array(src_pts)
                dst_pts = np.array(dst_pts)

                new_M, mask = cv.findHomography(src_pts, dst_pts, cv.RANSAC,5.0)
                image_patch = test_image[max(0,height[0]-radius):min(height[1]+radius, original_height), max(0,width[0]-radius):min(width[1]+radius, original_width)]
                warped_patch = cv.warpPerspective(image_patch, new_M, (image_patch.shape[1], image_patch.shape[0]))
                final[max(0,height[0]-radius):min(height[1]+radius, original_height), max(0,width[0]-radius):min(width[1]+radius, original_width)] = warped_patch
            except:
                final[max(0,height[0]-radius):min(height[1]+radius, original_height), max(0,width[0]-radius):min(width[1]+radius, original_width)] = test_image[max(0,height[0]-radius):min(height[1]+radius, original_height), max(0,width[0]-radius):min(width[1]+radius, original_width)]
            #plt.imsave(f"test/after_{patch_x}_{patch_y}.png", warped_patch/2 + baseline[max(0,height[0]-radius):min(height[1]+radius, original_height), max(0,width[0]-radius):min(width[1]+radius, original_width)])
    plt.imsave(f"realigned_patch/{map_name}.png", final/2+baseline)

In [None]:
sift_sizes = [19]
radii = [175]
ameliorations = []
distances = []
for sift_size in tqdm.tqdm(sift_sizes):
    
    #Intersections, sift for baseline
    baseline = cv.imread("warped_maps_validation/baseline.png", cv.IMREAD_GRAYSCALE)
    baseline_branches = np.zeros_like(baseline, dtype=bool)
    for selem in selems:
        baseline_branches |= ndi.binary_hit_or_miss(baseline, selem)

    baseline_points = np.argwhere(baseline_branches)

    baseline_keypoints = list()
    for point in baseline_points:
        baseline_keypoints.append(cv.KeyPoint(float(point[1]), float(point[0]), sift_size))
    baseline_keypoints, baseline_descriptors = sift.compute(baseline,baseline_keypoints)
    baseline_points = [keypoint.pt for keypoint in baseline_keypoints]
    ameliorations_radius = [0]*len(radii)
    for map_name, M, distance in tqdm.tqdm(maps_data):
        try:
            with open(f"points_transformed/{map_name}.json", "r") as test_points_json:

                #Load test points and apply the previous homography
                test_points = json.load(test_points_json)
                pts = np.float32([[x,y] for x,y in zip(test_points["x"], test_points["y"])]).reshape(-1,1,2)
                targets = np.float32([[x,y] for x,y in zip(test_points["x_"], test_points["y_"])])
                p_transformed = cv.perspectiveTransform(pts,M)

                #Load the map image, search the intersections, and compute sift
                test_image = cv.imread(f"warped_maps_validation/{map_name}.png", cv.IMREAD_GRAYSCALE)
                test_branches = np.zeros_like(test_image, dtype=bool)
                for selem in selems:
                    test_branches |= ndi.binary_hit_or_miss(test_image, selem)
                test_points = np.argwhere(test_branches)

                test_keypoints = list()
                for point in test_points:
                    test_keypoints.append(cv.KeyPoint(float(point[1]), float(point[0]), sift_size))
                test_keypoints, test_descriptors = sift.compute(test_image,test_keypoints)
                test_points = [keypoint.pt for keypoint in test_keypoints]

                for index, radius in enumerate(radii):
                    s1 = get_similar_neighbours(test_points, baseline_points, test_descriptors, baseline_descriptors, radius=radius)
                    s2 = get_similar_neighbours(baseline_points,test_points, baseline_descriptors,test_descriptors, radius=radius)

                    src_pts = []
                    dst_pts = []
                    for i,s in enumerate(s1):
                        if len(s) == 0:
                            continue
                        if i == s2[s[0]][0]:
                            src_pts.append(test_points[i])
                            dst_pts.append(baseline_points[s[0]])
                    src_pts = np.array(src_pts)
                    dst_pts = np.array(dst_pts)

                    new_M, mask = cv.findHomography(src_pts, dst_pts, cv.RANSAC,5.0)
                    p_transformed_2 = cv.perspectiveTransform(p_transformed,new_M).reshape(-1,2)

                    distance_test = [math.sqrt((target_x - p_transformed_x)**2 + (target_y - p_transformed_y)**2) for ((target_x, target_y), (p_transformed_x, p_transformed_y)) in zip (targets, p_transformed_2)]
                    distances+=distance_test

        except:
            print("error")
            continue
            
    ameliorations.append(ameliorations_radius)

In [None]:
np.mean(distances)