In [1]:
# GENERAL PYTHON IMPORTS
import os
import glob
import numpy as np
from tqdm.notebook import tqdm
from itertools import product
from concurrent.futures import ThreadPoolExecutor

# IMAGE IMPORTS
import cv2
from PIL import Image

# GIS IMPORTS
import fiona
import rasterio as rio
from rasterio.crs import CRS
from affine import Affine
from shapely.geometry import shape, mapping

# PLOTTING IMPORTS
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# PYTORCH IMPORTS
import torch
import torch.nn as nn
import torchvision.transforms as transforms

# CUSTOM UTILITIES
from WorldFileUtils import *
from icp import *

# Raise Pillow's pixel-count ceiling so very large images can be opened.
# NOTE(review): per the Pillow docs this is the decompression-bomb threshold;
# confirm 933,120,000 pixels covers the largest input this notebook expects.
Image.MAX_IMAGE_PIXELS = 933120000

In [2]:
# Directory holding the reference template image and its world file.
# The trailing slash is required: paths below are built by plain string concatenation.
templates_dir = "data/templates/"
# Directory for the intermediate images this notebook reads and writes.
tempfiles_dir = "tempfiles/"

In [3]:
def prep(image):
    """
    Convert an image to an inverted 8-bit array.

    Args:
        image:       Anything np.asarray accepts (e.g. a PIL image or an array)

    Returns:
        uint8 array of the same shape with every value v replaced by 255 - v
    """
    pixels = np.asarray(image).astype(np.uint8)
    # Bitwise NOT on uint8 is exactly 255 - v
    return np.invert(pixels)

def prepedges(image, dilate_iterations=30):
    """
    Extract Canny edges from an image and thicken them by dilation.

    Args:
        image:             Anything np.asarray accepts; cast to uint8 before
                           edge detection
        dilate_iterations: Number of dilation passes with a 3x3 kernel of ones

    Returns:
        uint8 array holding the dilated edge map
    """
    pixels = np.asarray(image).astype(np.uint8)

    # Canny hysteresis thresholds are fixed at 50 (low) and 100 (high)
    edges = cv2.Canny(pixels, 50, 100)

    kernel = np.ones((3, 3), np.uint8)
    thick_edges = cv2.dilate(edges, kernel, iterations=dilate_iterations)
    return np.asarray(thick_edges).astype(np.uint8)

# REFERENCE IMAGE FROM QGIS
# Only the first channel of the inverted template is kept for matching.
with Image.open(f"{templates_dir}Harris_Boundary_hollow.png") as template_file:
    template = prep(template_file)[:,:,0]
cv2.imwrite(f"{tempfiles_dir}template.png", template)

# TILE INDEX TO TEST ON
# Decode the mask once and reuse the pixels for both edge maps.
with Image.open(f"{tempfiles_dir}masks_black.png") as mask_file:
    mask_pixels = np.asarray(mask_file)

# Heavily dilated edges (default 30 passes); written to disk and not used
# again in the cells visible here.
coarse_edges = prepedges(mask_pixels)
cv2.imwrite(f"{tempfiles_dir}coarsematching.png", coarse_edges)

# Undilated edges: this is what the global `image` holds from here on, and
# therefore what wrapPatternMatch matches against.
image = prepedges(mask_pixels, dilate_iterations=0)
cv2.imwrite(f"{tempfiles_dir}border.png", image)

True

In [4]:
class ReturnValues:
    """
    Plain container for the outputs of one pattern matching run.

    Attributes are stored exactly as passed in:
        result:           value passed as _result
        mask:             value passed as _mask
        rotated_template: value passed as _rotated_template
        scale:            value passed as _scale
        angle:            value passed as _angle
    """

    def __init__(self, _result, _mask, _rotated_template, _scale, _angle):
        self.result, self.mask = _result, _mask
        self.rotated_template = _rotated_template
        self.scale, self.angle = _scale, _angle

class InputValues:
    """
    One (scale, angle) combination to try during pattern matching.

    Bundled into a single object so it can be passed as the sole argument
    of a function mapped over an executor.
    """

    def __init__(self, scale, angle):
        self.angle = angle
        self.scale = scale

    def __str__(self):
        # Show both parameters (previously the scale was printed twice
        # and the angle never appeared).
        return f"{self.scale} {self.angle}"
        
def wrapPatternMatch(inputvalue):
    """
    Single-argument adapter around patternMatch for use with executor.map.

    The image and template are not passed in; they are read from the
    module-level names `image` and `template` at call time.

    Args:
        inputvalue:  Object exposing .scale and .angle (see InputValues)

    Returns:
        RVs          Whatever patternMatch returns for that scale and angle
    """
    trial_scale = inputvalue.scale
    trial_angle = inputvalue.angle
    return patternMatch(image, template, trial_scale, trial_angle)

def patternMatch(image, template, scale, angle, method=cv2.TM_SQDIFF):
    """
    Match a rescaled, rotated copy of the template against an image.

    Args:
        image:       Image to search in (2D array)
        template:    Template to search for (2D array)
        scale:       Multiplicative resize factor applied to both template axes
        angle:       Rotation of the resized template about its centre,
                     passed unchanged to cv2.getRotationMatrix2D
        method:      cv2 template matching method

    Returns:
        RVs          ReturnValues holding the raw match surface, the mask,
                     the transformed template, and the scale and angle used
    """

    # RESIZE THE TEMPLATE TO APPROPRIATE SCALE
    resized = cv2.resize(template, None, fx=scale, fy=scale)
    height, width = resized.shape[:2]

    # ROTATE ABOUT THE CENTRE; OUTPUT CANVAS KEEPS THE RESIZED DIMENSIONS
    centre = (width / 2, height / 2)
    rotation = cv2.getRotationMatrix2D(centre, angle, 1.0)
    rotated_template = cv2.warpAffine(resized, rotation, (width, height))

    # MASK IS 255 WHERE THE TEMPLATE IS NON-ZERO, 0 ELSEWHERE
    mask = (rotated_template != 0).astype(np.uint8) * np.uint8(255)

    # RUN PATTERNMATCHING
    result = cv2.matchTemplate(image, rotated_template, method, mask=mask)
    return ReturnValues(result, mask, rotated_template, scale, angle)

def postprocess_results(result_list, scales, angles, opt_max, thresh, template_shape=None):
    """
    Turn the per-trial match surfaces into one location, scale and angle
    estimate by letting the best-scoring cells vote.

    Every cell (across all surfaces) whose score falls in the best `thresh`
    percent is a voting member. Each trial's scale and angle are weighted by
    how many voting members its surface contains, and the location is the
    median position of all voting members.

    Args:
        result_list:    List of 2D arrays containing results from pattern matching
        scales:         Scale used for each entry of result_list
        angles:         Angle used for each entry of result_list (must broadcast
                        against the per-trial vote counts)
        opt_max:        Flag indicating whether best is Max or Min. False = Min
        thresh:         Percentile width (0-100) of the voting tail, e.g. 0.1
                        keeps the best 0.1 percent of all scores
        template_shape: Optional (height, width) of the unscaled template used
                        to compute the bottom right corner. Defaults to the
                        shape of the module-level `template`.

    Returns:
        x:           Median ROW index (array axis 0) of the voting members
        y:           Median COLUMN index (array axis 1) of the voting members
        br:          Tuple (column, row) of the predicted bottom right corner
        rf:          Predicted rescale factor (vote-weighted mean of scales)
        ang:         Predicted angle (vote-weighted mean of angles)
    """
    if template_shape is None:
        # Backward-compatible fallback to the notebook-level template
        template_shape = template.shape
    template_height, template_width = template_shape[0], template_shape[1]

    scales = np.asarray(scales)
    angles = np.asarray(angles)

    # FLATTEN ALL ARRAYS TO CALCULATE CUTOFF VALUES FOR CANDIDATES
    all_scores = np.hstack([r.ravel() for r in result_list])

    # CUTOFF VALUES FOR CANDIDATES
    if opt_max:
        # np.percentile works on a 0-100 scale, so the top tail starts at
        # 100 - thresh (not 1 - thresh, which selected nearly everything)
        cutoff = np.percentile(all_scores, 100 - thresh)
        print(f"Voting members from {cutoff:.2f} to {np.max(all_scores):.2f}")
        loc_list = [r >= cutoff for r in result_list]
    else:
        cutoff = np.percentile(all_scores, thresh)
        print(f"Voting members from {cutoff} to {np.min(all_scores)}")
        loc_list = [r <= cutoff for r in result_list]

    # VOTES ON DIFFERENT PATTERN MATCHING AT DIFFERENT VALUES
    votes = np.array([np.count_nonzero(members) for members in loc_list])
    total_votes = np.sum(votes)

    rf  = np.sum(votes * scales) / total_votes
    ang = np.sum(votes * angles) / total_votes

    # ROW AND COLUMN INDICES OF EVERY VOTING MEMBER (ONE np.where PER SURFACE)
    member_coords = [np.where(members) for members in loc_list]
    row_list = np.hstack([coords[0] for coords in member_coords])
    col_list = np.hstack([coords[1] for coords in member_coords])

    # RETURN MEDIAN OF VALUES
    x = int(np.median(row_list))
    y = int(np.median(col_list))

    # BOTTOM RIGHT VALUE, AS (COLUMN, ROW)
    br = (int(y + template_width * rf), int(x + template_height * rf))

    return x, y, br, rf, ang

In [5]:
def SearchOptimalMatch(initial_guess, perturbance,
                       angle=0,
                       change_ang=None,
                       retall=True,
                       rngpert=2,
                       voting_thresh=0.1):
    """
    Grid-search template scale (and optionally angle) around a starting guess
    and return the voted estimate from postprocess_results.

    Args:
        initial_guess: Centre of the scale search
        perturbance:   Half-width of the scale search window
        angle:         Centre of the angle search; used as the single angle
                       when change_ang is None
        change_ang:    Half-width of the angle search window, sampled in steps
                       of change_ang / 2. None = do not vary the angle
        retall:        True returns the full postprocess_results tuple,
                       False returns only the rescale factor
        rngpert:       Number of scale steps on each side of initial_guess
                       (step size is perturbance / rngpert)
        voting_thresh: Percentile threshold forwarded to postprocess_results

    Returns:
        (x, y, br, rf, ang) from postprocess_results when retall is True,
        otherwise rf alone
    """

    # IS BEST MAX OR MIN? FALSE = MIN
    opt_max = False

    # WHICH ANGLES AND SCALES TO SEARCH
    # The 1e-5 pad makes np.arange include the upper end of the window
    scale_grid = np.arange(initial_guess-perturbance, initial_guess+perturbance+1e-5, perturbance/rngpert)
    if change_ang is None:
        # Single fixed angle. It is recorded per trial below so the voted
        # angle reflects the angle actually matched (it used to be reported
        # as 0 regardless of `angle`).
        angle_grid = [angle]
    else:
        angle_grid = np.arange(angle-change_ang, angle+change_ang+1e-5, change_ang/2)

    inputs = [InputValues(trial_scale, trial_angle)
              for trial_scale, trial_angle in product(scale_grid, angle_grid)]
    scales = [iv.scale for iv in inputs]
    angles = [iv.angle for iv in inputs]

    result_list = list()

    with ThreadPoolExecutor(max_workers=8) as executor:
        for retvalue in tqdm(executor.map(wrapPatternMatch, inputs), total=len(inputs)):

            result = retvalue.result
            if not opt_max:
                # Normalise the match surface by the number of unmasked
                # template pixels and take the root, so trials with
                # different template sizes are comparable
                result = np.sqrt(result / np.count_nonzero(retvalue.mask))

            result_list.append(result)
            min_val, max_val, _min_loc, _max_loc = cv2.minMaxLoc(result)
            best_val = max_val if opt_max else min_val

            print(f"{retvalue.scale:.3f} {retvalue.angle} {best_val:3f}")

    postprocess = postprocess_results(result_list, scales, angles, opt_max, voting_thresh)

    if not retall:
        return postprocess[3]
    return postprocess

In [6]:
# Coarse-to-fine scale search: each pass re-centres on the previous estimate
# and halves the search window.
curr_guess = 0.8
perturbations = [0.1, 0.05, 0.025, 0.0125]

for pert in perturbations:
    # x is the matched row and y the matched column (see postprocess_results)
    x, y, br, rescale_factor, angle = SearchOptimalMatch(curr_guess, pert)
    curr_guess = rescale_factor
    print(f"New Guess: {curr_guess} {angle}")

#ang_search = 1
#x, y, br, rescale_factor, angle = SearchOptimalMatch(curr_guess, perturbations[-1], 
#                                                     change_ang=ang_search,
#                                                     voting_thresh=1e-3
#                                                    )
#curr_guess = rescale_factor

#print(f"New Guess: {curr_guess} {angle}")
#print(rescale_factor)

  0%|          | 0/5 [00:00<?, ?it/s]

0.700 0 240.085251
0.750 0 239.914212
0.800 0 239.908004
0.850 0 239.957525
0.900 0 239.941018
Voting members from 240.1567513922422 to 239.90800434548157
New Guess: 0.7928748543602308 0.0


  0%|          | 0/5 [00:00<?, ?it/s]

0.743 0 240.172268
0.768 0 240.277662
0.793 0 240.151674
0.818 0 240.088293
0.843 0 240.058947
Voting members from 240.38073206138156 to 240.0589466566056
New Guess: 0.8083063140764478 0.0


  0%|          | 0/5 [00:00<?, ?it/s]

0.783 0 240.095956
0.796 0 240.158452
0.808 0 240.083841
0.821 0 240.135929
0.833 0 240.013730
Voting members from 240.34794207398767 to 240.0137301010282
New Guess: 0.8210904670819122 0.0


  0%|          | 0/5 [00:00<?, ?it/s]

0.809 0 240.072309
0.815 0 240.133632
0.821 0 240.077449
0.827 0 240.107634
0.834 0 240.089163
Voting members from 240.36239231269835 to 240.07230894816567
New Guess: 0.8234305545085288 0.0


In [7]:
# Replicate the single-band edge image into three bands so a coloured box can be drawn on it
three_band = np.dstack((image,) * 3)
# cv2 points are (column, row): y holds the matched column and x the matched row.
# The rectangle is drawn in place; the return value is discarded to keep the cell output empty.
_ = cv2.rectangle(img=three_band, pt1=(y, x), pt2=br, color=(0, 255, 0), thickness=5)

In [8]:
# World file stored next to the template image.
tfw_file        = f"{templates_dir}Harris_Boundary_hollow.pgw"
# Affine for the template, built from its world file.
# NOTE(review): both helpers come from the star imports above and are not visible here;
# confirm what get_geotransform_from_tfw returns (only element [0] is used).
template_affine = get_affine_from_geotransform(get_geotransform_from_tfw(tfw_file)[0])
# Scale by 1 / rescale_factor with offsets (-y, -x), composed with a rotation by `angle`.
# NOTE(review): the offsets are added AFTER the 1 / rescale_factor scaling, i.e. they are not
# divided by rescale_factor. If (y, x) is the match position in image pixels, mapping image
# pixels to template pixels would need -y / rescale_factor and -x / rescale_factor; confirm.
# NOTE(review): Affine.rotation rotates about the origin unless a pivot is given, whereas
# patternMatch rotates the template about its centre; confirm before enabling the angle search.
calc_affine     = Affine(1 / rescale_factor, 0, -y, 0, 1 / rescale_factor, -x) * Affine.rotation(angle)

In [9]:
# out_affine = combineAffine(template_affine, calc_affine)
# Compose the two transforms: calc_affine is applied to a point first, then template_affine.
out_affine = template_affine * calc_affine
# Write the composed transform out as a world file.
# NOTE(review): the output path is hard-coded to a single tile; confirm before reusing this cell for other inputs.
write_world_file_from_affine(out_affine, "data/TileIndices/48201CIND0_0992.tfw")