In [1]:
from PatchCutter.ImagePreprocessor import ImagePreprocessor
from WordSpotter.ModelWrapper import DeepSoloWrapper

from PIL import Image
import os

from Utils import result_reader as rr
from Utils import bezier_utils as butils
from Utils.visualizer import PolygonVisualizer
from Utils import sampler

import numpy as np

import matplotlib.pyplot as plt

# ---------------------------------------------------------------------------
# Input
# ---------------------------------------------------------------------------
# Image to process. File names noted by the author as alternatives:
# kiepert_1845.jpeg, vandevelde_1846.jpeg, test.jpg
img_path = 'Input/test.jpg'

# ---------------------------------------------------------------------------
# Output
# ---------------------------------------------------------------------------
# Every artefact of a run is written to Results/<image name without extension>/.
task_name = os.path.splitext(os.path.basename(img_path))[0]
output_dir = f'Results/{task_name}'
os.makedirs(output_dir, exist_ok=True)

# Step 1 output: detections of all pyramid layers stacked together.
stacked_detection_path = os.path.join(output_dir, 'stacked_detections.json')
# Step 2 output: detections after flattening.
flattened_detection_path = os.path.join(output_dir, 'flattened_detections.json')
# Step 3 intermediate: graph produced by the grouper.
grouper_graph_path = os.path.join(output_dir, 'grouper_graph.gexf')
# Step 3 output: detections grouped into toponyms.
toponym_detection_path = os.path.join(output_dir, 'toponym_detections.json')

# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------
model_cfg = 'WordSpotter/models/config_96voc.yaml'
model_weights = 'WordSpotter/models/finetune_v2/model.pth'
# Checkpoints noted by the author: grouper_model_epoch3.pth, grouper_model_v1_epoch2.pth
grouper_model_path = 'Grouper/grouper_model_epoch3.pth'
# Optional: only read by the "Word Style Representation" step.
deepfont_encoder_path = 'StyleEncoder/DeepFontEncoder_full.pth'

## Step 1: Word Spotting

### Functions

In [30]:
def pyramid_scan(img_path, output_path, spotter, num_layers = 2, save_visualization=False, batch_size = 8, rotations = None):
    '''
        Run the word spotter on every pyramid layer of an image and save the stacked detections.

        img_path: path of the image to scan
        output_path: file the detections are written to (via PolygonVisualizer.save_json)
        spotter: object providing load_batch(images, offset_xs, offset_ys) and
                 inference_batch(batch_size=..., rotations=...)
        num_layers: forwarded to ImagePreprocessor
        save_visualization: if True, a rendering of the detections is saved next to
                            output_path, with the extension replaced by '.jpg'
        batch_size: forwarded to spotter.inference_batch
        rotations: forwarded to spotter.inference_batch; None means [0]

        Returns the detections of all layers concatenated in layer order.
    '''
    if rotations is None:
        rotations = [0]

    # The context manager releases the image file handle when the scan is over
    # (it used to stay open until garbage collection).
    with Image.open(img_path) as image:
        image_preprocessor = ImagePreprocessor(image, overlapping_tolerance=0.3, num_layers=num_layers, min_patch_resolution=384, max_patch_resolution=384)
        image_preprocessor.process()
        print("preprocessing done")
        all_layer_results = []

        # The drawing canvas is assembled from the patches of layer 0.
        base_image_batch, base_offset_xs, base_offset_ys = image_preprocessor.get_image_patches(0)
        vis = PolygonVisualizer()
        vis.canvas_from_patches(base_image_batch, base_offset_xs, base_offset_ys)

        for i in range(image_preprocessor.num_layers):
            # If you want to save for each layer, uncomment the following line
            # image_preprocessor.save_patches(os.path.join(output_dir, f'layer_{i}_patches'), layer=i)

            image_batch, offset_xs, offset_ys = image_preprocessor.get_image_patches(i)
            spotter.load_batch(image_batch, offset_xs, offset_ys)
            results = spotter.inference_batch(batch_size=batch_size, rotations=rotations)
            all_layer_results.extend(results)

        print("Saving final results")
        if save_visualization:
            # Swap only the extension: str.replace would also rewrite a '.json' found in a
            # directory name, and would return output_path itself if it had no '.json'.
            visualization_path = os.path.splitext(output_path)[0] + '.jpg'
            vis.draw(all_layer_results).save(visualization_path)
        vis.save_json(all_layer_results, output_path)

    return all_layer_results

### Operations

In [31]:
# Build the word spotter from the configured model files.
spotter = DeepSoloWrapper(model_cfg, model_weights, score_threshold=0.4)

# Scan one pyramid layer; the detections and their rendering are written to disk,
# so the returned list is bound to `_` to keep it out of the cell output.
_ = pyramid_scan(
    img_path=img_path,
    output_path=stacked_detection_path,
    spotter=spotter,
    num_layers=1,
    save_visualization=True,
)

Cropping patches: [13]
preprocessing done
Rotating images by 0 degrees


100%|██████████| 22/22 [00:34<00:00,  1.55s/it]


Saving final results


## Step 2: Flattening

### Functions

In [32]:
def aggregate_closest_results_iter(results, sample_count = 20, evaluate_overlapping = "any", height_multiplier = 0.3):
    '''
        Gather the detections lying close to the first entry of `results`.

        results: list of entries with keys 'center_bezier_pts', 'center', 'text', 'score', 'avg_height', 'left', 'right'
        sample_count: number of candidates requested from sampler.sample
        evaluate_overlapping: forwarded to butils.bezier_distance
        height_multiplier: a candidate is kept when its bezier distance to the first entry
                           is below avg_height (of the first entry) * height_multiplier

        Returns (group, group_ids, ambiguous): the kept candidates, the indices
        sampler.sample reported for them, and one bool per kept candidate taken from
        the second value returned by butils.bezier_distance.
    '''
    anchor = results[0]
    max_distance = anchor['avg_height'] * height_multiplier

    # Candidate neighbours of the anchor, as chosen by the sampler
    candidates, candidate_ids = sampler.sample(anchor, results, sample_count)

    anchor_xs = [pt[0] for pt in anchor['center_bezier_pts']]
    anchor_ys = [pt[1] for pt in anchor['center_bezier_pts']]

    group, group_ids, ambiguous = [], [], []
    for idx, candidate in zip(candidate_ids, candidates):
        candidate_xs = [pt[0] for pt in candidate['center_bezier_pts']]
        candidate_ys = [pt[1] for pt in candidate['center_bezier_pts']]
        distance, is_ambiguous = butils.bezier_distance(anchor_xs, anchor_ys, candidate_xs, candidate_ys, evaluate_overlapping=evaluate_overlapping, samples = 40)

        if not (distance < max_distance):
            continue

        group.append(candidate)
        group_ids.append(idx)
        ambiguous.append(True if is_ambiguous else False)

    return group, group_ids, ambiguous

def aggregate_closest_results(results, sample_count = 20, evaluate_overlapping = "none", height_multiplier = 0.8):
    '''
        Partition detections into groups of mutually close entries.

        The entries are processed from the longest to the shortest (distance between
        'left' and 'right'); each round calls aggregate_closest_results_iter on the
        entries that are still ungrouped and removes the group it returns.

        results: list of entries (see aggregate_closest_results_iter); not modified
        sample_count, evaluate_overlapping, height_multiplier: forwarded to
            aggregate_closest_results_iter

        Returns (grouped_results, ambiguity): the list of groups and, for each group,
        the list of ambiguity flags of its members.
    '''
    def _span(entry):
        return np.linalg.norm(np.array(entry['right']) - np.array(entry['left']))

    # Work on a sorted copy so that the caller's list keeps its order.
    ungrouped_results = sorted(results, key=_span, reverse=True)
    grouped_results = []
    ambiguity = []

    while len(ungrouped_results) > 0:
        group, group_ids, ambiguous = aggregate_closest_results_iter(ungrouped_results, sample_count, evaluate_overlapping=evaluate_overlapping, height_multiplier=height_multiplier)

        if len(group_ids) == 0:
            # Nothing matched, not even the current entry (e.g. avg_height == 0).
            # Emit it as a group of its own, otherwise the loop would never end.
            group, group_ids, ambiguous = [ungrouped_results[0]], [0], [False]

        grouped_results.append(group)
        ambiguity.append(ambiguous)

        # Set lookup keeps the filtering linear in the number of remaining entries.
        taken = set(group_ids)
        ungrouped_results = [result for i, result in enumerate(ungrouped_results) if i not in taken]
        print(f"Grouped {len(group)} results. {len(ungrouped_results)} results remaining. Ambiguous: {ambiguous}")

    return grouped_results, ambiguity

In [33]:
def normalize_adhesive(_grouped_results, ambiguity, original_image):
    '''
        Reduce every group of detections to a single detection.

        _grouped_results: list of groups (lists of entries), not modified
        ambiguity: for each group, one flag per member
        original_image: not used by the active code; only the optional, commented-out
                        snippet extraction below would need it

        A group without any ambiguous member is represented by its longest member
        (distance between 'left' and 'right'). Otherwise the center curve of the first
        member is glued (butils.glue_bezier) with the curve of every ambiguous member
        and a new entry is built (butils.make_result) carrying the longest text and the
        mean 'avg_height' / 'score' of the group. Empty groups are skipped.

        Returns the list of resulting entries.
    '''
    def _span(entry):
        return np.linalg.norm(np.array(entry['right']) - np.array(entry['left']))

    result = []
    for group, ab in zip(_grouped_results, ambiguity):
        if len(group) == 0:
            # Nothing to represent; group[0] below would raise IndexError.
            continue

        if not any(ab):
            # max() returns the first longest member, the same entry the former
            # in-place descending sort put at index 0, without reordering the group.
            result.append(max(group, key=_span))
            continue

        center_bezier_pts = group[0]['center_bezier_pts']
        center_bezier_xs = [pt[0] for pt in center_bezier_pts]
        center_bezier_ys = [pt[1] for pt in center_bezier_pts]
        for w, a in zip(group, ab):
            if a:
                w_bezier_xs = [pt[0] for pt in w['center_bezier_pts']]
                w_bezier_ys = [pt[1] for pt in w['center_bezier_pts']]
                center_bezier_xs, center_bezier_ys = butils.glue_bezier(center_bezier_xs, center_bezier_ys, w_bezier_xs, w_bezier_ys)

        center_bezier_pts = [(x, y) for x, y in zip(center_bezier_xs, center_bezier_ys)]

        # Optional step to get word snippet
        #width = group[0]['avg_height']*0.5
        #snippet, transform = butils.get_center_bezier_bbox(original_image=original_image, center_bezier_pts=center_bezier_pts, width=width,scale=1.5)

        # Take the longest word in the group
        new_text = max([w['text'] for w in group], key=len)
        new_avg_height = np.mean([w['avg_height'] for w in group])
        new_score = np.mean([w['score'] for w in group])

        result.append(butils.make_result(center_bezier_pts, new_avg_height, new_text, new_score))

    return result

### Operations

In [34]:
# Load the stacked detections produced by Step 1.
result = rr.read_json(stacked_detection_path)

grouped_results, ambiguity = aggregate_closest_results(result, sample_count=15, evaluate_overlapping="any")

# Pass the configured input image: the previous hard-coded 'Input/kiepert_1845.jpeg'
# did not follow img_path and fails when that file is absent.
new_results = normalize_adhesive(grouped_results, ambiguity, Image.open(img_path))

rr.save_json(new_results, flattened_detection_path)

Grouped 6 results. 2106 results remaining. Ambiguous: [False, False, False, False, True, True]
Grouped 4 results. 2102 results remaining. Ambiguous: [False, False, False, False]
Grouped 5 results. 2097 results remaining. Ambiguous: [False, False, False, False, False]
Grouped 6 results. 2091 results remaining. Ambiguous: [False, False, False, False, False, False]
Grouped 4 results. 2087 results remaining. Ambiguous: [False, False, True, True]
Grouped 4 results. 2083 results remaining. Ambiguous: [False, False, False, False]
Grouped 1 results. 2082 results remaining. Ambiguous: [False]
Grouped 5 results. 2077 results remaining. Ambiguous: [False, False, False, False, False]
Grouped 2 results. 2075 results remaining. Ambiguous: [False, False]
Grouped 5 results. 2070 results remaining. Ambiguous: [False, False, False, False, False]
Grouped 4 results. 2066 results remaining. Ambiguous: [False, False, False, True]
Grouped 3 results. 2063 results remaining. Ambiguous: [False, False, False]
Gr

### Visualization

In [35]:
# Draw the flattened detections over the input image and store the picture
# next to the JSON file (same name, '.jpg' instead of '.json').
flattened_preview_path = flattened_detection_path.replace('.json', '.jpg')

vis = PolygonVisualizer()
vis.canvas_from_image(Image.open(img_path))

flattened_preview = vis.draw(new_results)
flattened_preview.save(flattened_preview_path)

## Optional: Word Style Representation

### Functions

In [5]:
from StyleEncoder.DeepFont import DeepFontEncoder, EncodeFontBatch, load_model

def generate_style_embeddings(results, original_image, deepfont_encoder:DeepFontEncoder):
    '''
        Attach a style embedding to every detection.

        results: list of entries with keys 'upper_bezier_pts' and 'lower_bezier_pts';
                 each entry receives a new key 'style_embedding' (modified in place)
        original_image: image the word snippets are cut from (butils.get_bezier_bbox)
        deepfont_encoder: encoder handed to EncodeFontBatch

        Returns the same `results` list.
    '''
    # One snippet per detection; the second value returned by get_bezier_bbox is not needed.
    snippets = [
        snippet
        for snippet, _ in (
            butils.get_bezier_bbox(original_image, entry['upper_bezier_pts'], entry['lower_bezier_pts'], scale=1.1)
            for entry in results
        )
    ]

    # Debug aid: save snippets
    #for i, snippet in enumerate(snippets):
    #    snippet.save(f'Snippets/test_snippet_{i}.jpg')

    # All snippets are encoded in a single call.
    for entry, embedding in zip(results, EncodeFontBatch(deepfont_encoder, snippets)):
        entry['style_embedding'] = embedding

    return results

### Operations

In [6]:
# Flattened detections from Step 2 are enriched with style embeddings and
# written back to the same file.
results = rr.read_json(flattened_detection_path)

deepfont_encoder = load_model(deepfont_encoder_path)
source_image = Image.open(img_path)

results = generate_style_embeddings(results, source_image, deepfont_encoder)

rr.save_json(results, flattened_detection_path)

  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)


## Step 3: Toponym Assignment

### Functions

In [2]:
import Grouper.GrouperCaller as GC
import importlib
importlib.reload(GC)

from Grouper.GrouperCaller import *
import networkx as nx
from tqdm import tqdm

def _cosine_similarity(a, b):
    '''
        Cosine similarity of two vectors.

        a, b: 1-D array-likes of equal length
        Returns dot(a, b) / (|a| * |b|), or 0.0 when either vector has zero norm
        (the plain division would yield NaN, which would then become an edge weight).
    '''
    norm_product = np.linalg.norm(a) * np.linalg.norm(b)
    if norm_product == 0:
        return 0.0
    return np.dot(a, b) / norm_product

def group_toponyms(results, grouper, sample_count = 15, use_style_embeddings = False):
    '''
        Build the directed grouping graph between detections.

        For every detection j, the sampler selects neighbouring detections, the grouper
        returns a sequence of positions within that neighbourhood, and an edge j -> i is
        recorded for every detection i of the resulting sequence.

        results: list of entries with keys 'upper_bezier_pts' and 'lower_bezier_pts'
                 (and 'style_embedding' when use_style_embeddings is True)
        grouper: object providing get_toponym_sequence2(features, 0)
        sample_count: number of neighbours requested from sampler.sample
        use_style_embeddings: if True the edge weight is the cosine similarity of the two
                              style embeddings; otherwise it counts how many times the
                              edge was proposed

        Returns (G, order_observations): the nx.DiGraph and the list of every proposed
        sequence holding more than one detection index.
    '''
    if use_style_embeddings:
        print("Using style embeddings")

    G = nx.DiGraph()
    order_observations = []

    for j in tqdm(range(len(results))):
        center_entry = results[j]

        # Neighbourhood of the current detection
        neighbours, neighbour_ids = sampler.sample(center_entry, results, sample_count)

        # One flat feature vector per neighbour: upper curve followed by the reversed lower curve
        features = [
            np.array(neighbour['upper_bezier_pts'] + neighbour['lower_bezier_pts'][::-1]).flatten()
            for _, neighbour in zip(neighbour_ids, neighbours)
        ]

        predicted_positions = grouper.get_toponym_sequence2(features, 0)

        # Drop repeated positions while keeping the first occurrence order
        unique_positions = list(dict.fromkeys(predicted_positions))

        # Translate neighbourhood positions into indices of `results`
        group_ids = [neighbour_ids[pos] for pos in unique_positions if pos < len(neighbour_ids)]

        if group_ids and use_style_embeddings:
            anchor_embedding = center_entry['style_embedding']
            for i in group_ids:
                similarity = _cosine_similarity(anchor_embedding, results[i]['style_embedding'])
                G.add_edge(j, i, weight=similarity)
        elif group_ids:
            for i in group_ids:
                if G.has_edge(j, i):
                    G[j][i]['weight'] += 1
                else:
                    G.add_edge(j, i, weight=1)

        if len(group_ids) > 1:
            order_observations.append(group_ids)

    return G, order_observations

In [3]:
def minimize_observation_error_sorting(observations, n):
    '''
        Order the items 0..n-1 so that the result agrees with the observed orderings.

        observations: list of sequences of item indices (each in 0..n-1); an item that
                      appears before another one in a sequence counts as one vote for
                      that relative order
        n: number of items

        Returns a list containing every index 0..n-1 exactly once. When the majority
        preferences are acyclic this is a topological order of the preference graph;
        when they contain a cycle, the items are ranked by their net number of won votes.
    '''
    # Initialize pairwise preference matrix
    pairwise_matrix = [[0] * n for _ in range(n)]

    # Update pairwise preferences based on observations
    for observation in observations:
        for position, earlier in enumerate(observation):
            for later in observation[position + 1:]:
                pairwise_matrix[earlier][later] += 1
                pairwise_matrix[later][earlier] -= 1

    # Construct weighted directed graph
    G = nx.DiGraph()
    for i in range(n):
        for j in range(n):
            if pairwise_matrix[i][j] > 0:
                G.add_edge(i, j, weight=pairwise_matrix[i][j])

    # Items without any strict preference have no edge; register them explicitly so
    # they are not missing from the returned order. Added after the edges so that the
    # insertion order of the already connected nodes is unchanged.
    G.add_nodes_from(range(n))

    # Attempt topological sorting
    try:
        order = list(nx.topological_sort(G))
    except nx.NetworkXUnfeasible:
        # The preferences are cyclic. Re-sorting a copy of the same graph (the former
        # fallback) raises again, so rank the items by net score instead: votes won
        # minus votes lost, ties resolved by index.
        net_score = [sum(row) for row in pairwise_matrix]
        order = sorted(range(n), key=lambda item: -net_score[item])

    return order

def toponym_from_graph_strong_component(results, G, order_observations):
    '''
        Turn every strongly connected component of the grouping graph into a group of detections.

        results: list of detections; graph nodes are indices into it (converted with int())
        G: directed graph handed to nx.strongly_connected_components
        order_observations: list of sequences of detection indices; the sequences lying
                            entirely inside a component are used to order its members

        Returns a list of groups (lists of entries of `results`). Every member of a
        component appears in its group, including members the ordering step left out.
    '''
    # For each connected component, create a group of results
    grouped_results = []
    for component in nx.strongly_connected_components(G):
        component = [int(i) for i in component]
        # Node -> position inside the component, replaces repeated list scans
        position = {node: k for k, node in enumerate(component)}

        observations = [ob for ob in order_observations if all(i in position for i in ob)]

        if len(observations) == 0:
            grouped_results.append([results[i] for i in component])
            continue

        local_observations = [[position[i] for i in ob] for ob in observations]
        order = list(minimize_observation_error_sorting(local_observations, len(component)))

        # The ordering may cover only part of the component; append the remaining
        # members so that no detection is silently dropped from its group.
        covered = set(order)
        order.extend(k for k in range(len(component)) if k not in covered)

        grouped_results.append([results[component[k]] for k in order])

    return grouped_results

def _split_community(subgraph:nx.Graph, threshold):
    '''
        Recursively bisect a graph until no part has more than `threshold` nodes.

        subgraph: graph to split
        threshold: largest number of nodes a returned part may have

        Returns the list of resulting (sub)graphs; the parts coming from the first half
        of a bisection precede those coming from the second half.
    '''
    # Small enough: keep as a single part
    if not len(subgraph) > threshold:
        return [subgraph]

    halves = nx.algorithms.community.kernighan_lin_bisection(subgraph)

    parts = []
    for half in (halves[0], halves[1]):
        parts = parts + _split_community(subgraph.subgraph(half), threshold)
    return parts

def toponym_from_graph_community_detection(results, G: nx.DiGraph, threshold = 10):
    '''
        Group detections by weakly connected component, bisecting the large components.

        results: list of detections; graph nodes are indices into it (converted with int())
        G: directed grouping graph
        threshold: components with more nodes than this are split with _split_community

        Returns a list of groups (lists of entries of `results`).
    '''
    components = list(nx.weakly_connected_components(G))

    # Split every component on its undirected version
    parts = []
    for members in components:
        undirected_component = G.subgraph(members).to_undirected()
        parts.extend(_split_community(undirected_component, threshold))

    # Translate node ids into detections
    return [[results[int(node)] for node in part] for part in parts]

### Operations

In [5]:
results = rr.read_json(flattened_detection_path)

# Style embeddings are used when the optional embedding step has been run.
# bool(results) guards the lookup: results[0] raises IndexError on an empty detection list.
use_style_embeddings = bool(results) and 'style_embedding' in results[0]

grouper = GrouperCaller(grouper_model_path)

directed_graph, order_observations = group_toponyms(results, grouper, use_style_embeddings=use_style_embeddings)

# The graph and the order observations are stored side by side (.gexf / .json).
rr.save_toponym_graph(directed_graph, grouper_graph_path)
rr.save_json_nested(order_observations, grouper_graph_path.replace('.gexf', '.json'))

100%|██████████| 5328/5328 [10:09<00:00,  8.75it/s]


In [7]:
# Reload everything from disk so this cell can run without repeating the grouping step.
order_observations_path = grouper_graph_path.replace('.gexf', '.json')

results = rr.read_json(flattened_detection_path)
directed_graph = rr.read_toponym_graph(grouper_graph_path)
order_observations = rr.read_json_nested(order_observations_path)

# One group of detections per strongly connected component of the graph
toponyms = toponym_from_graph_strong_component(results, directed_graph, order_observations)

rr.save_json_nested(toponyms, toponym_detection_path)

### Visualization

In [5]:
toponyms = rr.read_json_nested(toponym_detection_path)

# Rendering of the raw detection groups. It gets its own '_groups.jpg' file: the cell
# drawing the extracted toponyms writes '<name>.jpg', and used to overwrite this picture.
vis = PolygonVisualizer()
vis.canvas_from_image(Image.open(img_path))
vis.draw_multiple(toponyms).save(toponym_detection_path.replace('.json', '_groups.jpg'))

In [8]:
# Extract the final toponyms from the saved detection groups
toponyms = rr.read_json_nested(toponym_detection_path)
toponyms_final = rr.extract_toponyms_from_result_groups(toponyms)

# Number of extracted toponyms
print(len(toponyms_final))

# Draw them over the input image; the picture is stored next to the JSON file
toponym_preview_path = toponym_detection_path.replace('.json', '.jpg')

vis = PolygonVisualizer()
vis.canvas_from_image(Image.open(img_path))

toponym_preview = vis.draw_toponyms(toponyms_final)
toponym_preview.save(toponym_preview_path)


3264
