In [1]:
import pandas as pd
import os
import json
import matplotlib.pyplot as plt
import seaborn as sns
import cv2
import numpy as np
import math
import time
import torch

from PIL import Image, ImageDraw
from sklearn.metrics.pairwise import euclidean_distances
from matplotlib.patches import Rectangle
from gingerit.gingerit import GingerIt
from rapidfuzz.distance import Levenshtein

from transformers import AutoImageProcessor, AutoModelForObjectDetection

### One function for each step in the pipeline

In [2]:
def get_ocr_data(json_file):
    """Load the text lines of an OCR result file into a DataFrame.

    The file must contain a JSON object with a ``Blocks`` list. Every block
    whose ``BlockType`` is ``'LINE'`` is expected to carry a ``Text`` string
    and a ``Geometry.BoundingBox`` with ``Left``, ``Top``, ``Width`` and
    ``Height`` (this looks like the AWS Textract layout -- TODO confirm).
    Lines whose text has two characters or fewer are skipped.

    Parameters
    ----------
    json_file : str or path-like
        Path of the JSON file to read.

    Returns
    -------
    pandas.DataFrame
        One row per kept line with the columns ``text``, ``left``, ``top``,
        ``right`` and ``bottom``; ``right`` is ``Left + Width`` and ``bottom``
        is ``Top + Height``, in the same units as the file. The five columns
        are present even when no line qualifies, so downstream column access
        does not fail on empty OCR results.
    """
    columns = ['text', 'left', 'top', 'right', 'bottom']

    # JSON is UTF-8 by specification; do not depend on the platform default.
    with open(json_file, encoding='utf-8') as fp:
        data = json.load(fp)

    nodes = []

    for block in data['Blocks']:
        # BlockType is tested first so blocks without a 'Text' key are never read.
        if block['BlockType'] != 'LINE' or len(block['Text']) <= 2:
            continue

        box = block['Geometry']['BoundingBox']
        nodes.append({'text': block['Text'],
                      'left': box['Left'],
                      'top': box['Top'],
                      'right': box['Left'] + box['Width'],
                      'bottom': box['Top'] + box['Height']})

    return pd.DataFrame(nodes, columns=columns)

In [3]:
def open_image(filename):
    """Read an image from disk and return it in RGB channel order.

    Parameters
    ----------
    filename : str
        Path of the image file, passed to ``cv2.imread``.

    Returns
    -------
    numpy.ndarray
        The decoded image converted from OpenCV's BGR order to RGB.

    Raises
    ------
    OSError
        If OpenCV could not read the file. ``cv2.imread`` signals failure by
        returning ``None`` rather than raising, which would otherwise surface
        as an unrelated error inside ``cv2.cvtColor``.
    """
    img = cv2.imread(filename)

    if img is None:
        raise OSError(f'Could not read image file: {filename!r}')

    return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

In [4]:
def threshold_image(image):
    """Binarise an RGB image into a float mask with 1 for dark ("ink") pixels.

    The image is converted to grayscale. If its mean tone is below 128 the
    grayscale image is inverted, so the result always describes dark marks on
    a light background. The cut-off is 80% of the (possibly inverted) mean
    tone.

    Parameters
    ----------
    image : numpy.ndarray
        RGB image; it is not modified.

    Returns
    -------
    numpy.ndarray
        Float array holding 1.0 where the grayscale value is at or below the
        cut-off and 0.0 elsewhere.
    """
    # Going RGB -> gray directly weights the channels exactly like the
    # RGB -> BGR -> gray round trip.
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

    mean_tone = np.mean(gray)

    if mean_tone < 128:
        # Mostly dark picture: flip it so the strokes become the dark part.
        gray = 255 - gray
        mean_tone = np.mean(gray)

    cutoff = int(mean_tone * 0.8)

    binary = cv2.threshold(gray, cutoff, 255, cv2.THRESH_BINARY)[1]

    # THRESH_BINARY marks bright pixels with 255; invert and rescale to {0, 1}.
    return 1 - (binary / 255.)
    

In [5]:
def set_bounding_boxes_in_pixels(df, img):
    """Scale the box columns of ``df`` by the image size and round to integers.

    ``left`` and ``right`` are multiplied by the image width (``img.shape[1]``),
    ``top`` and ``bottom`` by the image height (``img.shape[0]``). Each result
    is rounded to the nearest integer and stored back as an integer column.

    The frame is modified in place and also returned, so calling the function
    twice on the same frame scales the coordinates twice.
    """
    img_height, img_width = img.shape[0], img.shape[1]

    scale_per_column = {'left': img_width,
                        'right': img_width,
                        'top': img_height,
                        'bottom': img_height}

    for column, scale in scale_per_column.items():
        df[column] = (df[column].astype(float) * scale).round().astype(int)

    return df

In [6]:
def get_font_size(df):
    """Add an integer ``font_size`` column derived from the box heights.

    The height ``bottom - top`` of every row is standardised (z-score over all
    rows), rounded to the nearest integer and shifted by 10, so a box of
    average height gets ``font_size == 10``.

    The frame is modified in place and also returned.

    A frame with a single row has no defined sample standard deviation
    (pandas returns NaN). The spread is then treated as zero, which gives that
    row the neutral value 10 instead of failing while rounding NaN.
    """
    heights = (df['bottom'] - df['top']).astype(float)

    spread = heights.std()
    if pd.isna(spread):
        spread = 0.0

    # The small constant keeps the division defined when all heights are equal.
    z_scores = (heights - heights.mean()) / (spread + 1e-6)

    df['font_size'] = (z_scores.round() + 10).astype(int)

    return df

In [7]:
def substract_bounding_boxes(df, img, erotion_percent = 0):
    """Return a copy of ``img`` with every box of ``df`` set to 0.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs integer pixel columns ``top``, ``bottom``, ``left``, ``right``.
    img : numpy.ndarray
        Image indexed as ``[row, column]``; it is not modified.
    erotion_percent : number, default 0
        Each box is shrunk on every side by this percentage of its own width
        (horizontally) and height (vertically) before being cleared.

    Returns
    -------
    numpy.ndarray
        The modified copy.
    """
    cleared = img.copy()

    for top, bottom, left, right in zip(df['top'], df['bottom'], df['left'], df['right']):

        margin_x = int(round(((right - left) * erotion_percent) / 100))
        margin_y = int(round(((bottom - top) * erotion_percent) / 100))

        cleared[top + margin_y : bottom - margin_y,
                left + margin_x : right - margin_x] = 0

    return cleared

In [8]:
def close_shape_gaps5(image, ocr,
                      dist_threshold_percent = 30, 
                      activation_lower_th = 40, 
                      activation_upper_th = 70):
    """Bridge small gaps in a stroke mask by drawing lines between nearby pixels.

    Candidate pixels are picked with a 3x3 filter applied to ``(1 - image) * 10``
    (centre weight 10, neighbours 1); a pixel is a candidate when its response
    lies strictly between ``activation_lower_th`` and ``activation_upper_th``.
    Assuming ``image`` is a 0/1 mask (TODO confirm), the defaults select
    foreground pixels with 5 or 6 background pixels among their 8 neighbours.

    Two candidates are joined with a 1-pixel line of value 1 when their
    distance is positive and below ``dist_threshold_percent`` percent of the
    height of the OCR box whose corner is closest to either candidate.

    Parameters
    ----------
    image : numpy.ndarray
        Mask to repair; it is not modified.
    ocr : pandas.DataFrame
        Boxes with integer pixel columns ``top``, ``bottom``, ``left``, ``right``.
    dist_threshold_percent : number
        Maximum gap length as a percentage of the reference box height.
    activation_lower_th, activation_upper_th : number
        Exclusive bounds on the filter response that select candidates.

    Returns
    -------
    numpy.ndarray
        Copy of ``image`` with the bridging lines drawn. When there are no OCR
        boxes or no candidate pixels the copy is returned unchanged (this used
        to raise).

    Notes
    -----
    A dense distance matrix between all candidates is built, so memory grows
    with the square of the number of candidate pixels.
    """
    img_out = image.copy()

    # Without OCR boxes there is no reference height to derive a threshold from.
    if len(ocr) == 0:
        return img_out

    inverted = (1 - image) * 10

    kernel = np.ones((3, 3), np.uint8)
    kernel[1, 1] = 10

    dst = cv2.filter2D(inverted, -1, kernel).astype(int)

    candidates = np.where((dst > activation_lower_th) & (dst < activation_upper_th))

    if len(candidates[0]) == 0:
        return img_out

    # One (row, column) pair per candidate pixel.
    points = np.column_stack((candidates[0], candidates[1]))

    top = ocr['top'].to_numpy()
    bottom = ocr['bottom'].to_numpy()
    left = ocr['left'].to_numpy()
    right = ocr['right'].to_numpy()

    # Four groups of len(ocr) corners each, so that a flat corner index modulo
    # len(ocr) gives the position of the box the corner belongs to.
    nodes_points = np.concatenate([np.column_stack((top, left)),
                                   np.column_stack((top, right)),
                                   np.column_stack((bottom, right)),
                                   np.column_stack((bottom, left))])

    # Positional heights: the box is looked up by position, so this also works
    # when the OCR frame does not carry a default 0..n-1 index.
    box_heights = bottom - top

    dist_matrix = euclidean_distances(points)
    dist_threshold = int((box_heights.max() * dist_threshold_percent) / 100)

    # Coarse pre-filter with the largest box; zero is the trivial self distance.
    below_th = np.where((dist_matrix < dist_threshold) & (dist_matrix > 0))

    for first, second in zip(below_th[0], below_th[1]):

        p1 = points[first]
        p2 = points[second]

        dist_to_nodes = euclidean_distances(np.stack([p1, p2]), nodes_points)
        closest_node = np.argmin(dist_to_nodes) % len(ocr)

        local_threshold = int((box_heights[closest_node] * dist_threshold_percent) / 100)

        if np.linalg.norm(p2 - p1) < local_threshold:

            # OpenCV points are (x, y) = (column, row) and must be plain ints.
            cv2.line(img_out,
                     (int(p1[1]), int(p1[0])),
                     (int(p2[1]), int(p2[0])),
                     (1, 1, 1), thickness=1)

    return img_out

In [9]:
def stamp_bounding_boxes_on_image(df, img, erotion_percent = 10):
    """Return a copy of ``img`` with every box of ``df`` set to 1.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs integer pixel columns ``top``, ``bottom``, ``left``, ``right``.
    img : numpy.ndarray
        Image indexed as ``[row, column]``; it is not modified.
    erotion_percent : number, default 10
        Each box is shrunk on every side by this percentage of its own width
        (horizontally) and height (vertically) before being filled.

    Returns
    -------
    numpy.ndarray
        The modified copy.
    """
    stamped = img.copy()

    for top, bottom, left, right in zip(df['top'], df['bottom'], df['left'], df['right']):

        margin_x = int(round(((right - left) * erotion_percent) / 100))
        margin_y = int(round(((bottom - top) * erotion_percent) / 100))

        stamped[top + margin_y : bottom - margin_y,
                left + margin_x : right - margin_x] = 1

    return stamped

In [10]:
def get_filled_shapes(img):
    """Return a mask in which every contour found in ``img`` is filled with 1.

    Contours are extracted from ``cv2.convertScaleAbs(img)`` with ``RETR_TREE``
    and drawn filled, one contour per call, onto a zero array with the shape
    and dtype of ``img``.
    """
    outlines, _hierarchy = cv2.findContours(cv2.convertScaleAbs(img),
                                            cv2.RETR_TREE,
                                            cv2.CHAIN_APPROX_SIMPLE)

    filled = np.zeros_like(img)

    # Each contour is drawn in its own call, so every one of them ends up as a
    # solid region.
    for outline in outlines:
        cv2.drawContours(filled, [outline], 0, (1, 1, 1), thickness=cv2.FILLED)

    return filled

In [11]:
def get_masks(img, max_iter=10):
    """Split a filled-shape mask into a node mask and an edge mask by erosion.

    The mask is eroded ``max_iter`` times with a 3x3 kernel, and the number of
    contours is recorded before each erosion. The chosen erosion level is the
    one right after the last level whose contour count is larger than the
    count at the final level (or the final level itself when no earlier level
    has more contours).

    Parameters
    ----------
    img : numpy.ndarray
        Input mask; it is not modified.
    max_iter : int, default 10
        Number of erosion steps examined.

    Returns
    -------
    nodes_mask : numpy.ndarray
        The mask eroded to the chosen level.
    edges_mask : numpy.ndarray
        ``img`` minus ``nodes_mask`` dilated ``level + 1`` times, with negative
        values clipped to 0.
    """
    kernel = np.ones((3, 3), np.uint8)

    eroded_levels = [img.copy()]
    contour_counts = []

    for _step in range(max_iter):
        current = eroded_levels[-1]
        found, _hierarchy = cv2.findContours(cv2.convertScaleAbs(current),
                                             cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
        contour_counts.append(len(found))
        eroded_levels.append(cv2.erode(current, kernel, iterations = 1))

    final_count = contour_counts[-1]
    chosen_level = len(contour_counts) - 1

    # Walk back from the deepest level to the last level that still had more
    # contours than the final one.
    for level in reversed(range(len(contour_counts))):
        if contour_counts[level] > final_count:
            chosen_level = level + 1
            break

    nodes_mask = eroded_levels[chosen_level]

    # Grow the nodes back by one step more than they were eroded before
    # subtracting them from the original mask.
    regrown_nodes = cv2.dilate(nodes_mask, kernel, iterations=chosen_level + 1)
    edges_mask = np.maximum((eroded_levels[0] - regrown_nodes), 0)

    return nodes_mask, edges_mask

In [12]:
def get_edges_endpoints(edges_mask, min_edge_length_percentage = 3):
    """Find the two end points of every sufficiently long contour in a mask.

    For each contour of ``cv2.convertScaleAbs(edges_mask)`` the left-most,
    right-most, top-most and bottom-most contour points are taken. The pair
    of those four points that lies farthest apart is reported as the end
    points of the edge. Contours whose farthest pair is not longer than
    ``min_edge_length_percentage`` percent of the mask height are dropped.

    Parameters
    ----------
    edges_mask : numpy.ndarray
        Mask containing the edge strokes.
    min_edge_length_percentage : number, default 3
        Minimum end-point distance as a percentage of ``edges_mask.shape[0]``.

    Returns
    -------
    edges : numpy.ndarray
        Array of shape ``(k, 2, 2)``: for each kept contour its two end points
        in OpenCV ``(x, y)`` order.
    selected_rows : numpy.ndarray
        Float array of shape ``(k, 3)`` with, per kept contour, its index in
        the contour list, its closed perimeter (``cv2.arcLength``) and its
        area divided by its number of contour points.

    When no contour qualifies, empty arrays of shape ``(0, 2, 2)`` and
    ``(0, 3)`` are returned (stacking an empty list used to raise).
    """
    min_edge_length_pixels = (min_edge_length_percentage / 100) * edges_mask.shape[0]

    contours, _hierarchy = cv2.findContours(cv2.convertScaleAbs(edges_mask),
                                            cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

    final_edges = []
    selected_rows = []

    for contour_id, contour in enumerate(contours):

        # OpenCV contours have shape (n_points, 1, 2); drop the middle axis.
        points = contour[:, 0, :]

        extreme_points = np.stack([points[points[:, 0].argmin()],
                                   points[points[:, 0].argmax()],
                                   points[points[:, 1].argmin()],
                                   points[points[:, 1].argmax()]], axis=0)

        dist_mat = euclidean_distances(extreme_points)

        if np.max(dist_mat) > min_edge_length_pixels:

            first, second = np.unravel_index(np.argmax(dist_mat), dist_mat.shape)
            final_edges.append([extreme_points[first], extreme_points[second]])

            perimeter = cv2.arcLength(contour, True)
            # max(..., 1) guards the division for a contour without points.
            thickness = cv2.contourArea(contour) / max(len(contour), 1)
            selected_rows.append([contour_id, perimeter, thickness])

    if not final_edges:
        return np.empty((0, 2, 2), dtype=np.int32), np.empty((0, 3), dtype=float)

    return np.stack(final_edges), np.array(selected_rows, dtype=float)

In [13]:
def get_nodes(ocr, nodes_mask, threshold_iou = 0.8):
    """Assign OCR text boxes to node shapes and merge the text of each node.

    Every contour of ``cv2.convertScaleAbs(nodes_mask)`` is a candidate node;
    its position in the contour list becomes the ``node_id``. A text box is
    assigned to the contour that covers the largest share of the box,
    provided that share exceeds ``threshold_iou``. Despite the parameter name
    the measure is ``covered box pixels / box area``, not a true
    intersection-over-union.

    Parameters
    ----------
    ocr : pandas.DataFrame
        Text boxes with ``text`` and integer pixel columns ``top``, ``bottom``,
        ``left``, ``right``; it is not modified.
    nodes_mask : numpy.ndarray
        Mask of the node shapes.
    threshold_iou : float, default 0.8
        Minimum covered share of a box for it to be assigned.

    Returns
    -------
    pandas.DataFrame
        One row per node that received at least one text box: the first such
        box of the node, with ``text`` replaced by the node's texts joined
        with newlines and an integer ``node_id`` column. The index is reset.

    Changes with respect to the previous implementation
    ---------------------------------------------------
    * No matching box no longer raises ``KeyError`` (``node_id`` always exists).
    * Rows are de-duplicated per ``node_id`` rather than per text, so two
      different nodes carrying the same label are both kept.
    * Each contour mask is drawn once instead of once per text box.
    * Boxes with a non-positive area are skipped instead of dividing by zero.
    """
    df = ocr.copy()
    nodes_contours, _hierarchy = cv2.findContours(cv2.convertScaleAbs(nodes_mask),
                                                  cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

    n_rows = len(df)
    best_share = np.zeros(n_rows)
    best_node = np.full(n_rows, -1)

    boxes = df[['top', 'bottom', 'left', 'right']].to_numpy()

    for i_node, contour in enumerate(nodes_contours):

        contour_mask = np.zeros_like(nodes_mask)
        cv2.drawContours(contour_mask, [contour], 0, (1, 1, 1), thickness=-1)

        for position, (top, bottom, left, right) in enumerate(boxes):

            area = (right - left) * (bottom - top)
            if area <= 0:
                continue

            share = contour_mask[top:bottom, left:right].sum() / area

            # Strict comparison: on ties the earlier contour wins.
            if share > best_share[position]:
                best_share[position] = share
                best_node[position] = i_node

    matched = best_share > threshold_iou
    df['node_id'] = np.where(matched, best_node, np.nan)

    df = df[df['node_id'].notna()].copy()
    df['node_id'] = df['node_id'].astype(int)

    if len(df) == 0:
        return df.reset_index(drop=True)

    df['text'] = df.groupby('node_id')['text'].transform(lambda x: '\n'.join(x))
    df = df.drop_duplicates('node_id').reset_index(drop=True)

    return df

In [14]:
def get_conections(nodes_df, edges_endpoints, img, dist_threshold_percentage = 5):
    """Turn edge end points into undirected node-to-node connections.

    NOTE: this notebook defines ``get_conections`` a second time further down
    with an extra ``edges_directionalities`` parameter. Once that later cell
    has run, it replaces this version.

    Each end point is attached to the node box closest to it (distance to the
    box outline, negative when the point is inside), provided that distance
    is below ``dist_threshold_percentage`` percent of the image height. An
    edge becomes a connection when both end points are attached to two
    different nodes.

    Parameters
    ----------
    nodes_df : pandas.DataFrame
        Nodes with ``node_id`` and integer pixel columns ``top``, ``bottom``,
        ``left``, ``right``.
    edges_endpoints : array-like of shape (n_edges, 2, 2)
        Two ``(x, y)`` end points per edge.
    img : numpy.ndarray
        Image the coordinates refer to; only its shape is used.
    dist_threshold_percentage : number, default 5
        Attachment radius as a percentage of ``img.shape[0]``.

    Returns
    -------
    pandas.DataFrame
        Columns ``node a`` and ``node b``, one row per distinct unordered pair,
        in order of first appearance. (The previous implementation passed a
        set of frozensets to ``pandas.DataFrame``, which made the row order and
        the a/b orientation arbitrary.)
    """
    nodes_contours = []
    nodes_ids = []

    for _, row in nodes_df.iterrows():

        box_mask = np.zeros_like(img, dtype=np.uint16)
        box_mask[ row.top : row.bottom, row.left : row.right ] = 1

        contour, _hierarchy = cv2.findContours(cv2.convertScaleAbs(box_mask),
                                               cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

        assert len(contour) == 1

        nodes_contours.append(contour[0])
        nodes_ids.append(row.node_id)

    dist_threshold_in_pixels = int((dist_threshold_percentage / 100) * img.shape[0])

    connections = []
    seen_pairs = set()

    for edge in edges_endpoints:

        connection = [None, None]

        for i_endpoint, endpoint in enumerate(edge):

            # pointPolygonTest takes a 2D point; plain floats avoid both the
            # uint16 wrap-around of the old cast and binding issues with
            # numpy scalars.
            point = (float(endpoint[0]), float(endpoint[1]))

            min_dist_to_node = float('inf')
            min_dist_node_n = -1

            for i_node, node in enumerate(nodes_contours):

                # Signed distance is positive inside the polygon; negate it so
                # that smaller means closer and inside is negative.
                min_dist = cv2.pointPolygonTest(node, point, True) * (-1)

                if min_dist < min_dist_to_node:
                    min_dist_to_node = min_dist
                    min_dist_node_n = nodes_ids[i_node]

            if min_dist_to_node < dist_threshold_in_pixels:
                connection[i_endpoint] = min_dist_node_n

        if connection[0] is None or connection[1] is None or connection[0] == connection[1]:
            continue

        pair = frozenset(connection)
        if pair not in seen_pairs:
            seen_pairs.add(pair)
            connections.append(connection)

    return pd.DataFrame(connections, columns=['node a', 'node b'])

In [15]:
def filter_image_nodes_from_annotated_df(df):
    """Drop annotated rows that describe images rather than text nodes.

    A row is removed when its ``text`` starts with ``'image'`` or, if the
    frame has a ``type`` column, when its ``type`` equals ``'image'``.

    Parameters
    ----------
    df : pandas.DataFrame
        Annotation frame with a string ``text`` column and optionally ``type``.

    Returns
    -------
    pandas.DataFrame
        The remaining rows with all original columns. An empty input keeps
        its columns: the mask is an explicit boolean array, so pandas cannot
        mistake an empty object-typed mask for a column selection.
    """
    keep = np.array([not text.startswith('image') for text in df['text']], dtype=bool)

    df = df[keep]

    if 'type' in df.columns:
        return df[df['type'] != 'image']

    return df

In [16]:
def join_close_nodes(ocr, vertical_distance_threshold_percent=25, horizontal_distance_threshold_percent=50):
    """Merge text boxes that are stacked directly on top of each other.

    Two rows ``a`` and ``b`` (``b`` having the larger index label) are merged
    when the gap ``|b.top - a.bottom|`` is below
    ``vertical_distance_threshold_percent`` percent of their mean height and
    ``|b.left - a.left|`` is below ``horizontal_distance_threshold_percent``
    percent of their mean width. The merged row keeps the position of ``a``
    and receives the joined text (separated by a space), ``b``'s bottom, the
    outer left/right limits and the mean ``font_size``. ``b`` is removed and
    the index is reset. The search restarts after every merge until no pair
    qualifies.

    Parameters
    ----------
    ocr : pandas.DataFrame
        Needs ``text``, ``top``, ``bottom``, ``left``, ``right`` and
        ``font_size``; it is not modified.

    Returns
    -------
    pandas.DataFrame
        The frame after all merges.
    """
    vertical_ratio = vertical_distance_threshold_percent / 100
    horizontal_ratio = horizontal_distance_threshold_percent / 100

    def find_mergeable_pair(frame):
        """Return ``(label_a, row_a, label_b, row_b)`` of the first mergeable pair, else ``None``."""
        for label_a, upper in frame.iterrows():
            for label_b, lower in frame.iterrows():

                if not label_b > label_a:
                    continue

                mean_height = ((upper.bottom - upper.top) + (lower.bottom - lower.top)) / 2
                mean_width  = ((upper.right - upper.left) + (lower.right - lower.left)) / 2

                vertically_close = abs(lower.top - upper.bottom) < vertical_ratio * mean_height
                left_aligned = abs(lower.left - upper.left) < horizontal_ratio * mean_width

                if vertically_close and left_aligned:
                    return label_a, upper, label_b, lower

        return None

    df = ocr.copy()

    while True:

        match = find_mergeable_pair(df)

        if match is None:
            return df

        label_a, upper, label_b, lower = match

        df.at[label_a, 'text'] = upper.text + ' ' + lower.text
        df.at[label_a, 'bottom'] = lower.bottom
        df.at[label_a, 'left'] = min(upper.left, lower.left)
        df.at[label_a, 'right'] = max(upper.right, lower.right)
        df.at[label_a, 'font_size'] = (upper.font_size + lower.font_size) / 2

        df = df.drop(label_b, axis=0)
        df.reset_index(drop=True, inplace=True)

In [17]:
# Shared GingerIt client used by spellcheck2.
parser = GingerIt()

def spellcheck2(text):
    """Return ``text`` after correction by the shared GingerIt ``parser``.

    Every ``'&'`` is replaced by ``'and'`` before the text is parsed. In the
    corrected result every ``' and '`` is replaced by ``' & '``.

    NOTE(review): the back-substitution also rewrites an ``' and '`` that was
    already spelled out in the input -- confirm this is intended.
    """
    corrected = parser.parse(text.replace('&', 'and'))['result']
    return corrected.replace(' and ', ' & ')

In [18]:
def pair_nodes_with_closest_annotations(pred_df, annotated_df, dist_threshold = 0.35):
    """Match every predicted node to the annotated node with the most similar text.

    Similarity is the normalized Levenshtein distance between the lower-cased
    texts. A prediction is kept only if some annotation is strictly closer
    than ``dist_threshold``. Among the annotations the first one with the
    smallest distance wins.

    Parameters
    ----------
    pred_df : pandas.DataFrame
        Predicted nodes with a string ``text`` column; it is not modified. It
        must not already contain the ``closest_ann_*`` columns.
    annotated_df : pandas.DataFrame
        Annotated nodes with ``text`` and ``node_id`` columns.
    dist_threshold : float, default 0.35
        Exclusive upper bound on the accepted distance.

    Returns
    -------
    pandas.DataFrame
        The matched predictions with three extra columns:
        ``closest_ann_text`` (inserted as the second column),
        ``closest_ann_node_id`` and ``closest_ann_dist``.
    """
    predicted_df = pred_df.copy()

    predicted_df.insert(column='closest_ann_text', loc=1, value=None)
    predicted_df.insert(column='closest_ann_node_id', loc=len(predicted_df.columns), value=-1)
    # Float from the start: distances are floats, and writing them into an
    # integer column relies on an implicit upcast that pandas is deprecating.
    predicted_df.insert(column='closest_ann_dist', loc=len(predicted_df.columns), value=1.0)

    # Loop-invariant annotation data, prepared once instead of once per pair.
    ann_texts = annotated_df['text'].tolist()
    ann_texts_lower = [text.lower() for text in ann_texts]
    ann_node_ids = annotated_df['node_id'].tolist()

    for i, predicted_text in predicted_df['text'].items():

        query = predicted_text.lower()

        min_dist = dist_threshold
        min_dist_text = None
        min_dist_node_id = -1

        for ann_text, ann_lower, ann_node_id in zip(ann_texts, ann_texts_lower, ann_node_ids):

            dist = Levenshtein.normalized_distance(query, ann_lower)
            if dist < min_dist:
                min_dist = dist
                min_dist_text = ann_text
                min_dist_node_id = ann_node_id

        predicted_df.at[i, 'closest_ann_text']    = min_dist_text
        predicted_df.at[i, 'closest_ann_node_id'] = min_dist_node_id
        predicted_df.at[i, 'closest_ann_dist']    = min_dist

    # Unmatched rows still carry exactly dist_threshold and are dropped here.
    predicted_df = predicted_df[predicted_df.closest_ann_dist < dist_threshold]

    return predicted_df

# 

## Functions to calculate metrics

In [19]:
def get_nodes_metrics(ann_nodes, nodes_df):
    """Compute F1, precision and recall of the retrieved node texts.

    Predicted nodes are first paired with annotations through
    ``pair_nodes_with_closest_annotations``. The lower-cased matched
    annotation texts form the prediction set and the lower-cased annotation
    texts form the reference set.

    Returns
    -------
    tuple of float
        ``(f1, precision, recall)``; a 1e-6 term in each denominator avoids
        division by zero.

    NOTE(review): the prediction set is built from ``closest_ann_text``, i.e.
    from annotation texts of matched nodes only. It is therefore a subset of
    the reference set, and precision is ~1 whenever anything matched;
    predicted nodes without a match never count as false positives. Confirm
    that this is the intended metric.
    """
    epsilon = 1e-6

    paired = pair_nodes_with_closest_annotations(nodes_df.copy(), ann_nodes)

    predictions = {text.lower() for text in paired['closest_ann_text']}
    annotations = {text.lower() for text in ann_nodes['text']}

    true_positives = annotations & predictions

    precision = len(true_positives) / (len(predictions) + epsilon)
    recall    = len(true_positives) / (len(annotations) + epsilon)
    f1        = (2 * precision * recall) / (precision+recall + epsilon)

    return f1, precision, recall

In [20]:
def get_edges_metrics(ann_nodes, nodes_df, ann_edges, connections_df):
    """Compute F1, precision and recall of the retrieved undirected edges.

    Annotated edges (``node_a`` / ``node_b`` ids in ``ann_edges``) and
    predicted connections (``node a`` / ``node b`` ids in ``connections_df``)
    are both translated into unordered pairs of lower-cased annotation texts
    and compared as sets. Annotated edges with a missing text are dropped. A
    predicted end point without a matched annotation is represented by the
    string ``'None'``.

    Returns
    -------
    tuple of float
        ``(f1, precision, recall)``; a 1e-6 term in each denominator avoids
        division by zero.
    """
    epsilon = 1e-6

    # Reference edges: resolve both node ids to annotation texts.
    annotated_pairs = (
        ann_edges
        .merge(ann_nodes, how='left', left_on='node_a', right_on='node_id')
        .merge(ann_nodes, how='left', left_on='node_b', right_on='node_id')
        [['text_x', 'text_y']]
        .dropna()
    )
    annotations = {frozenset([text_a.lower(), text_b.lower()])
                   for text_a, text_b in zip(annotated_pairs['text_x'], annotated_pairs['text_y'])}

    # Predicted edges: resolve both node ids to the matched annotation texts.
    paired_nodes = pair_nodes_with_closest_annotations(nodes_df, ann_nodes)
    predicted_pairs = (
        connections_df
        .merge(paired_nodes, how='left', left_on='node a', right_on='node_id')
        .merge(paired_nodes, how='left', left_on='node b', right_on='node_id')
        [['closest_ann_text_x', 'closest_ann_text_y']]
        .fillna('None')
    )
    predictions = {frozenset([text_a.lower(), text_b.lower()])
                   for text_a, text_b in zip(predicted_pairs['closest_ann_text_x'],
                                             predicted_pairs['closest_ann_text_y'])}

    true_positives = annotations & predictions

    precision = len(true_positives) / (len(predictions) + epsilon)
    recall    = len(true_positives) / (len(annotations) + epsilon)
    f1        = (2 * precision * recall) / (precision+recall + epsilon)

    return f1, precision, recall

In [21]:
def get_direction_metrics(ann_nodes, nodes_df, ann_edges, connections_df):
    """Compute F1, precision and recall of the retrieved edge directions.

    Annotated and predicted edges are both turned into
    (``node a``, ``node b``, ``destination node``) triples of annotation
    texts. An annotated edge counts as a true positive when at least one
    predicted triple has the same two nodes (in either order) and the same
    destination value.

    Parameters
    ----------
    ann_nodes : DataFrame with ``node_id`` and ``text``.
    nodes_df : DataFrame of predicted nodes (``node_id``, ``text``).
    ann_edges : DataFrame with ``node_a``, ``node_b``, ``destination_node``
        and a string ``direction`` column.
    connections_df : DataFrame with ``node a``, ``node b`` and
        ``destination node``.

    Returns
    -------
    tuple of float
        ``(f1, precision, recall)``. Precision divides by the number of
        predicted rows and recall by the number of annotated rows.
    """
    
    # Annotations: attach the annotated node rows for both ends of every edge.
    a = pd.merge(ann_edges, ann_nodes,  how='left', left_on='node_a', right_on='node_id')
    b = pd.merge(a, ann_nodes,  how='left', left_on='node_b', right_on='node_id')
    
    # An all-NaN destination column is replaced by -1 before the next merge
    # (presumably so the merge key has a usable dtype -- TODO confirm).
    if b['destination_node'].isna().all():
        b['destination_node'] = -1
        
    # Resolve the destination id to its text. 'text_x' / 'text_y' come from the
    # two earlier merges, the unsuffixed 'text' from this one.
    c = pd.merge(b, ann_nodes,  how='left', left_on='destination_node', right_on='node_id')
    annotations = c[['text_x', 'text_y',  'text']]
    annotations.columns = ['node a', 'node b', 'destination node']

    # Edges whose lower-cased direction is none of 'directed', 'direction' or
    # 'direct' get the destination -1.
    # NOTE(review): the mask is built from ann_edges and matched to
    # `annotations` by index label, so this assumes both share the same index.
    # `annotations` is also a column subset of `c`, so this assignment may emit
    # SettingWithCopyWarning -- verify.
    annotations.loc[(ann_edges.direction.apply(str.lower) != 'directed') & 
              (ann_edges.direction.apply(str.lower) != 'direction') & 
              (ann_edges.direction.apply(str.lower) != 'direct'), 'destination node'] = -1
    


    # Predictions: the same chain, using the annotation text each predicted
    # node was matched to.
    nodes_df = pair_nodes_with_closest_annotations(nodes_df.copy(), ann_nodes)

    a = pd.merge(connections_df, nodes_df,  how='left', left_on='node a', right_on='node_id')
    b = pd.merge(a, nodes_df,  how='left', left_on='node b', right_on='node_id')
    
    if b['destination node'].isna().all():
        b['destination node'] = -1
        
    c = pd.merge(b, nodes_df,  how='left', left_on='destination node', right_on='node_id')
    # Missing texts become the string 'None'; a missing destination is then
    # mapped to -1, the same value the annotation side uses.
    predictions = c[['closest_ann_text_x', 'closest_ann_text_y', 'closest_ann_text']].fillna('None')
    predictions.columns = ['node a', 'node b', 'destination node']
    predictions.loc[predictions['destination node']=='None', 'destination node'] = -1
    
      

    tp = 0
    for i, row in annotations.iterrows():
        # Same orientation as the annotation ...
        a = predictions[(predictions['node a'] == row['node a']) & 
                    (predictions['node b'] == row['node b']) &
                    (predictions['destination node'] == row['destination node'])]

        # ... or with 'node a' and 'node b' swapped.
        b = predictions[(predictions['node a'] == row['node b']) & 
                    (predictions['node b'] == row['node a']) &
                    (predictions['destination node'] == row['destination node'])]

        # An annotated edge is counted at most once, however many rows match.
        if len(a) + len(b) > 0:
            tp += 1

    # The 1e-6 terms keep the ratios defined for empty inputs.
    precision = tp / (len(predictions) + 1e-6)
    recall    = tp / (len(annotations) + 1e-6)
    f1        = (2 * precision * recall) / (precision+recall + 1e-6)

    return f1, precision, recall

In [22]:
def show_results(nodes_f1, nodes_pr, nodes_re, 
                 edges_f1, edges_pr, edges_re,
                 directions_f1, directions_pr, directions_re):
    """Plot F1, precision and recall per example and print the mean F1 scores.

    Three panels are drawn side by side (node, edge and directionality
    retrieval). In each panel the x axis is the 1-based position of the value
    in its sequence and the y axis is limited to [0, 1]. The mean F1 of each
    task is printed before the figure is shown.

    Parameters
    ----------
    *_f1, *_pr, *_re : sequence of float
        Per-example F1, precision and recall values for the respective task.
    """
    sns.set_theme()

    fig, ax = plt.subplots(1,3, figsize=(12,4))

    panels = [('Node retrieval', nodes_f1, nodes_pr, nodes_re),
              ('Edge retrieval', edges_f1, edges_pr, edges_re),
              ('Directionality retrieval', directions_f1, directions_pr, directions_re)]

    for axis, (title, f1_values, precision_values, recall_values) in zip(ax, panels):

        curves = [('F1', f1_values),
                  ('Precision', precision_values),
                  ('Recall', recall_values)]

        for label, values in curves:
            axis.plot(range(1, len(values) + 1), values, label=label)

        axis.set_ylim(0,1)
        axis.set_title(title)
        axis.legend()

    print(f'Mean F1-score for node retrieval: {np.mean(nodes_f1)}')
    print(f'Mean F1-score for edge retrieval: {np.mean(edges_f1)}')
    print(f'Mean F1-score for directionality retrieval: {np.mean(directions_f1)}')

    plt.show()

# 

## Get all annotated examples

In [23]:
# Directory whose entries are the annotated examples, relative to the notebook.
main_folder = '../annotated_examples'

# Entry names in sorted order, so examples are always visited in the same order.
ann_examples = os.listdir(main_folder)
ann_examples.sort()

n_examples = len(ann_examples)

print(f'number of annotated examples: {n_examples}')

number of annotated examples: 21


In [24]:
# Names of annotated examples to leave out -- presumably because they take too
# long to process (inferred from the variable name; TODO confirm).
to_exclude_to_time_complexity = [
    'Hipshot for inflation',
    'IMG_0300-1664016398.2217-scaled',
    'Mathematics Map',
    'erythrocytes L1_2',
]

#

# 

# Adding edge direction

In [25]:
def calculate_contours_thickness(contours):
    """Return a thickness estimate for every contour.

    The estimate is the contour area (``cv2.contourArea``) divided by the
    number of points stored in the contour. A contour without points is
    divided by 1 to avoid a division by zero.

    Parameters
    ----------
    contours : iterable of numpy.ndarray
        Contours as returned by ``cv2.findContours``.

    Returns
    -------
    list of float
        One value per contour, in input order.
    """
    return [cv2.contourArea(contour) / max(len(contour), 1) for contour in contours]

In [26]:
def preprocess_for_arrow_detection(img_gray,dilate,erode):
    """Return an edge map of ``img_gray`` prepared for contour extraction.

    The image is smoothed with a 5x5 Gaussian blur (sigma 1) and passed
    through Canny edge detection (both thresholds 50). The edges are then
    dilated ``dilate`` times and eroded ``erode`` times with a 3x3 kernel of
    ones.
    """
    structuring_element = np.ones((3, 3))

    blurred = cv2.GaussianBlur(img_gray, (5, 5), 1)
    edges = cv2.Canny(blurred, 50, 50)

    thickened = cv2.dilate(edges, structuring_element, iterations=dilate)

    return cv2.erode(thickened, structuring_element, iterations=erode)



def find_tip(points, convex_hull):
    """Locate the tip of an arrow-shaped polygon.

    ``points`` are the polygon vertices in contour order and ``convex_hull``
    holds the indices of the vertices on the convex hull. The vertices that
    are not on the hull are the concave ones. The tip is the vertex that lies
    two steps after one of the first two concave vertices and two steps
    before the other, walking cyclically along the polygon.

    Parameters
    ----------
    points : numpy.ndarray of shape (n, 2)
        Polygon vertices.
    convex_hull : array-like of int
        Indices into ``points`` of the hull vertices.

    Returns
    -------
    tuple or None
        The coordinates of the tip, or ``None`` when the polygon has fewer
        than two concave vertices or no vertex satisfies the condition.
    """
    length = len(points)

    indices = np.setdiff1d(np.arange(length), convex_hull)

    if len(indices) < 2:
        return None

    for i in range(2):

        # Cyclic step forward. The previous wrap-around (`length - j`) was only
        # right for j == length and pointed at the wrong vertex for
        # j == length + 1, so arrows whose tip is vertex 1 were missed.
        j = (indices[i] + 2) % length

        # The backward step relies on negative indexing to wrap around.
        if np.all(points[j] == points[indices[i - 1] - 2]):
            return tuple(points[j])

    return None
        
def find_arrow_tail(arrow_tip, contour):
    """Return the contour point that lies farthest from the arrow tip.

    Parameters
    ----------
    arrow_tip : sequence of two numbers
        Coordinates of the tip.
    contour : numpy.ndarray of shape (n, 1, 2)
        Contour points in the layout used by ``cv2.findContours``.

    Returns
    -------
    tuple
        Coordinates of the farthest contour point; on ties the first one.
    """
    tip = np.asarray(arrow_tip)

    # Euclidean distance of every contour vertex to the tip.
    tip_distances = [np.linalg.norm(tip - vertex[0]) for vertex in contour]

    farthest_vertex = contour[np.argmax(tip_distances)][0]

    return tuple(farthest_vertex)

def detect_arrows(img, dilate_max=5, erode_max=5, rounding_max = 0.05, rounding_step=0.002):
    """Detect arrow-shaped contours over a grid of preprocessing settings.

    Every combination of dilation count (``0 .. dilate_max - 1``), erosion
    count (``0 .. erode_max - 1``) and polygon-approximation tolerance
    (``0.001`` up to ``rounding_max`` in steps of ``rounding_step``, as a
    fraction of the contour perimeter) is tried. A contour is treated as an
    arrow candidate when its approximated polygon has a convex hull of 4 or 5
    vertices and exactly two more polygon vertices than hull vertices.

    A candidate is skipped when ``cv2.matchShapes`` reports a value below 1
    against any contour accepted earlier. Accepted contours are remembered
    even if no tip can be located for them.

    Parameters
    ----------
    img : numpy.ndarray
        Grayscale image passed to ``preprocess_for_arrow_detection``.
    dilate_max, erode_max : int
        Exclusive upper bounds of the dilation / erosion counts tried.
    rounding_max, rounding_step : float
        Range and step of the approximation tolerance.

    Returns
    -------
    arrow_origins, arrow_tips, arrow_lengths, arrow_thickness : numpy.ndarray
        One entry per arrow with a located tip: the contour point farthest
        from the tip, the tip, the tip-to-tail distance, and the contour area
        divided by its number of points. All four are empty one-dimensional
        arrays when nothing is detected.

    Notes
    -----
    Preprocessing and contour extraction depend only on the dilation and
    erosion counts, so they run once per (dilate, erode) pair rather than
    once per tolerance value. The visiting order of the combinations, and
    therefore the result, is unchanged.
    """
    arrow_contours = []

    arrow_tips = []
    arrow_origins = []
    arrow_lengths = []
    arrow_thickness = []

    roundings = np.arange(0.001, rounding_max, rounding_step)

    for dilate in range(0, dilate_max):
        for erode in range(0, erode_max):

            edges = preprocess_for_arrow_detection(img, dilate, erode)
            contours, _hierarchy = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
            perimeters = [cv2.arcLength(cnt, True) for cnt in contours]

            for rounding in roundings:
                for cnt, peri in zip(contours, perimeters):

                    approx = cv2.approxPolyDP(cnt, rounding * peri, True)
                    hull = cv2.convexHull(approx, returnPoints=False)
                    sides = len(hull)

                    if not (6 > sides > 3 and sides + 2 == len(approx)):
                        continue

                    # Skip shapes similar to an already accepted contour.
                    if any(cv2.matchShapes(known, cnt, 1, 0.0) < 1 for known in arrow_contours):
                        continue

                    arrow_contours.append(cnt)
                    arrow_tip = find_tip(approx[:, 0, :], hull.squeeze())

                    if not arrow_tip:
                        continue

                    arrow_tips.append(arrow_tip)

                    arrow_tail = find_arrow_tail(arrow_tip, cnt)
                    length = np.linalg.norm(np.array(arrow_tip) - np.array(arrow_tail))
                    arrow_lengths.append(length)

                    # max(..., 1) guards the division for a contour without points.
                    thickness = cv2.contourArea(cnt) / max(len(cnt), 1)
                    arrow_thickness.append(thickness)

                    # Origin: the contour point farthest away from the tip.
                    contour_points = np.squeeze(cnt)
                    dist_mat = euclidean_distances(np.expand_dims(arrow_tip, axis=0), contour_points)
                    arrow_origins.append(contour_points[np.argmax(dist_mat)])

    arrow_origins = np.array(arrow_origins)
    arrow_tips    = np.array(arrow_tips)
    arrow_lengths = np.array(arrow_lengths)
    arrow_thickness = np.array(arrow_thickness)

    return arrow_origins, arrow_tips,arrow_lengths,arrow_thickness

In [27]:
def get_edges_endpoints_directionality(edges_endpoints, tips, origins, dist_threshold=50):
    """Decide for every edge which of its two end points is an arrow tip.

    Each edge is flattened to ``[x0, y0, x1, y1]`` and compared, in that
    4-dimensional space, with every detected arrow written both as
    ``[tip, origin]`` and as ``[origin, tip]``. The closer of the two
    arrangements decides the direction, provided its distance is below
    ``dist_threshold``.

    Parameters
    ----------
    edges_endpoints : numpy.ndarray of shape (n_edges, 2, 2)
        Two end points per edge.
    tips, origins : numpy.ndarray of shape (n_arrows, 2)
        Tip and origin of every detected arrow, in matching order.
    dist_threshold : number, default 50
        Exclusive upper bound on the accepted distance.

    Returns
    -------
    list
        One entry per edge: ``0`` when end point 0 is at the arrow tip, ``1``
        when end point 1 is, and ``None`` when no arrow is close enough. With
        no edges or no arrows every entry is ``None`` (concatenating the empty
        arrays used to raise).
    """
    n_edges = len(edges_endpoints)

    tips = np.asarray(tips)
    origins = np.asarray(origins)

    if n_edges == 0 or tips.size == 0 or origins.size == 0:
        return [None] * n_edges

    flat_edges = edges_endpoints.reshape((n_edges, 4))

    tips_origins = np.concatenate([tips, origins], axis=1)
    origins_tips = np.concatenate([origins, tips], axis=1)

    # Row 0: best match with end point 0 as tip; row 1: end point 1 as tip.
    min_dist = np.stack([
        np.min(euclidean_distances(flat_edges, tips_origins), axis=1),
        np.min(euclidean_distances(flat_edges, origins_tips), axis=1),
    ])

    origins_or_tips = np.argmin(min_dist, axis=0)
    abs_min_dist = min_dist[origins_or_tips, np.arange(n_edges)]

    directions = [None] * n_edges

    for index in np.where(abs_min_dist < dist_threshold)[0]:
        directions[index] = origins_or_tips[index]

    return directions

In [28]:
def get_conections(nodes_df, edges_endpoints, edges_directionalities, img, dist_threshold_percentage = 5):
    """Turn edge end points into node-to-node connections with a destination.

    Each end point is attached to the node box closest to it (distance to the
    box outline, negative when the point is inside), provided that distance
    is below ``dist_threshold_percentage`` percent of the image height. An
    edge becomes a connection when both end points are attached to two
    different nodes.

    Parameters
    ----------
    nodes_df : pandas.DataFrame
        Nodes with ``node_id`` and integer pixel columns ``top``, ``bottom``,
        ``left``, ``right``.
    edges_endpoints : array-like of shape (n_edges, 2, 2)
        Two ``(x, y)`` end points per edge.
    edges_directionalities : sequence
        Per edge, the index (0 or 1) of the end point the edge points to, or
        ``None`` for an edge without direction.
    img : numpy.ndarray
        Image the coordinates refer to; only its shape is used.
    dist_threshold_percentage : number, default 5
        Attachment radius as a percentage of ``img.shape[0]``.

    Returns
    -------
    df_pre : pandas.DataFrame
        Columns ``node a``, ``node b`` and ``destination node`` (the node id
        at the pointed-to end, or ``None``), one row per connected edge.
        Duplicate pairs are not removed.
    edges_id : list of int
        Position in ``edges_endpoints`` of the edge behind each row.
    """
    nodes_contours = []
    nodes_ids = []

    for _, row in nodes_df.iterrows():

        box_mask = np.zeros_like(img, dtype=np.uint16)
        box_mask[ row.top : row.bottom, row.left : row.right ] = 1

        contour, _hierarchy = cv2.findContours(cv2.convertScaleAbs(box_mask),
                                               cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

        assert len(contour) == 1

        nodes_contours.append(contour[0])
        nodes_ids.append(row.node_id)

    dist_threshold_in_pixels = int((dist_threshold_percentage / 100) * img.shape[0])

    connections = []
    destination_nodes = []
    edges_id = []

    for edge_i, edge in enumerate(edges_endpoints):

        connection = [None, None]

        for i_endpoint, endpoint in enumerate(edge):

            # pointPolygonTest takes a 2D point; plain floats avoid both the
            # uint16 wrap-around of the old cast and binding issues with
            # numpy scalars.
            point = (float(endpoint[0]), float(endpoint[1]))

            min_dist_to_node = float('inf')
            min_dist_node_n = -1

            for i_node, node in enumerate(nodes_contours):

                # Signed distance is positive inside the polygon; negate it so
                # that smaller means closer and inside is negative.
                min_dist = cv2.pointPolygonTest(node, point, True) * (-1)

                if min_dist < min_dist_to_node:
                    min_dist_to_node = min_dist
                    min_dist_node_n = nodes_ids[i_node]

            if min_dist_to_node < dist_threshold_in_pixels:
                connection[i_endpoint] = min_dist_node_n

        if connection[0] is None or connection[1] is None or connection[0] == connection[1]:
            continue

        connections.append(connection)
        edges_id.append(edge_i)

        direction = edges_directionalities[edge_i]
        destination_nodes.append(connection[direction] if direction is not None else None)

    df_pre = pd.DataFrame(connections, columns=['node a', 'node b'])
    df_pre.insert(column='destination node', loc=2, value=destination_nodes)

    return df_pre,edges_id


## Load the detector (ViT) models

In [29]:
# Local checkpoint directories of the three detectors. An earlier,
# commented-out variant of this cell pointed all three at "peter9356/models".
drawing_checkpoint = "models/drawing"
edge_tip_checkpoint = "models/edge_tip"
node_checkpoint = "models/node"


def _load_detector(checkpoint):
    """Return the ``(image_processor, model)`` pair loaded from ``checkpoint``."""
    image_processor = AutoImageProcessor.from_pretrained(checkpoint)
    model = AutoModelForObjectDetection.from_pretrained(checkpoint)
    return image_processor, model


drawing_image_processor, drawing_model = _load_detector(drawing_checkpoint)
edge_tip_image_processor, edge_tip_model = _load_detector(edge_tip_checkpoint)
node_image_processor, node_model = _load_detector(node_checkpoint)

def _detect_objects(image, image_processor, model, threshold):
    """Run one object-detection model on an image array.

    The array is wrapped in a PIL image, preprocessed by ``image_processor``
    and passed through ``model``. The raw outputs are converted by
    ``post_process_object_detection`` to the original image size
    ``(height, width)``, keeping detections scoring above ``threshold``.

    The forward pass runs under ``torch.no_grad()``: this is inference only,
    so no autograd graph needs to be built and the returned tensors carry no
    gradient history.

    Returns
    -------
    The post-processed result for the single input image (the first element
    of the batch result). Callers in this notebook read its ``'boxes'`` and
    ``'scores'`` entries.
    """
    img     = Image.fromarray(image)
    inputs  = image_processor(images=img, return_tensors="pt")

    with torch.no_grad():
        outputs = model(**inputs)

    # PIL reports (width, height); the post-processor expects (height, width).
    target_sizes = torch.tensor([img.size[::-1]])
    return image_processor.post_process_object_detection(outputs, target_sizes=target_sizes, threshold=threshold)[0]


def get_drawings_predictions(image, threshold=0.1):
    """Detect drawings in ``image`` with the module-level drawing detector."""
    return _detect_objects(image, drawing_image_processor, drawing_model, threshold)


def get_edge_tip_predictions(image, threshold=0.1):
    """Detect edge tips in ``image`` with the module-level edge-tip detector."""
    return _detect_objects(image, edge_tip_image_processor, edge_tip_model, threshold)


def get_node_predictions(image, threshold=0.1):
    """Detect nodes in ``image`` with the module-level node detector."""
    return _detect_objects(image, node_image_processor, node_model, threshold)
    

In [30]:
# Sanity check: show which image-processor class the drawing checkpoint resolved to.
print(type(drawing_image_processor))

<class 'transformers.models.yolos.image_processing_yolos.YolosImageProcessor'>


In [31]:
def get_edges_endpoints_directionality_vit(edges_endpoints, image, edges_directionalities, score_threshold):    
    
    edge_tips_vit = get_edge_tip_predictions(image)
    
    tolerance_margin_pixels = 5
    min_score               = 0.3
    min_difference_score    = 0.1

    edges_directionalities_vit = []
    vit_scores                 = []

    for i, edge_endpoint in enumerate(edges_endpoints):

        x_0 = edge_endpoint[0][0]
        y_0 = edge_endpoint[0][1]

        x_1 = edge_endpoint[1][0]
        y_1 = edge_endpoint[1][1]

        scores_0 = [0]
        scores_1 = [0]

        for j, edge_tip_vit in enumerate(edge_tips_vit['boxes']):

            bb_x0 = edge_tip_vit[0] - tolerance_margin_pixels
            bb_x1 = edge_tip_vit[2] + tolerance_margin_pixels

            bb_y0 = edge_tip_vit[1] - tolerance_margin_pixels
            bb_y1 = edge_tip_vit[3] + tolerance_margin_pixels


            if (x_0 >= bb_x0 and x_0 <= bb_x1 and
                y_0 >= bb_y0 and y_0 <= bb_y1):

                scores_0.append(round(float(edge_tips_vit['scores'][j]), 2))


            if (x_1 >= bb_x0 and x_1 <= bb_x1 and
                y_1 >= bb_y0 and y_1 <= bb_y1):

                scores_1.append(round(float(edge_tips_vit['scores'][j]), 2))

        max_score_0 = np.max(scores_0)
        max_score_1 = np.max(scores_1)

        if max_score_0 > min_score and max_score_0 > max_score_1 + min_difference_score:
            edges_directionalities_vit.append(0)
            vit_scores.append(max_score_0)

        elif max_score_1 > min_score and max_score_1 > max_score_0 + min_difference_score:
            edges_directionalities_vit.append(1)
            vit_scores.append(max_score_1)

        else:
            edges_directionalities_vit.append(None)
            vit_scores.append(0)
    
    
    #compare vit and opencv approaches
    
    final_edges_directionalities = []
    
    for i in range(len(edges_directionalities)):
    
    
        if edges_directionalities[i] == edges_directionalities_vit[i]:
            
            final_edges_directionalities.append(edges_directionalities[i])
            
            
            
        elif edges_directionalities[i] == None and vit_scores[i] > score_threshold: 
            
            final_edges_directionalities.append(edges_directionalities_vit[i])
            
            
            
        elif edges_directionalities_vit[i] == None:
            
            final_edges_directionalities.append(edges_directionalities[i])
            
        
        else:
            
            if vit_scores[i] > score_threshold:
                final_edges_directionalities.append(edges_directionalities_vit[i])
            else:
                final_edges_directionalities.append(edges_directionalities[i])
    
    return final_edges_directionalities

In [32]:
def remove_repeated_connections(df):
    
    to_remove = []
    
    for i, row in df.iterrows():
        
        repeated_rows = df[(((df['node a'] == row['node a']) & (df['node b'] == row['node b'])) |
                          ((  df['node a'] == row['node b']) & (df['node b'] == row['node a'])))]
        
        
        if len(repeated_rows) > 1:
            
            flag_done = False
            
            for j, row2 in repeated_rows.iterrows():
            
                if np.isnan(row2['destination node']) == False:
                    
                    to_remove.extend(repeated_rows.index)
                    to_remove.remove(j)
                    flag_done = True
                    break
            
            if flag_done == False:
                to_remove.extend(repeated_rows.index[1:])
    
    
    df.drop(index=list(set(to_remove)), inplace=True)
    
    return df

In [33]:
all_nodes_df = []
all_connections_df = []
all_images = []

for index in range(len(ann_examples)):

    example_folder = ann_examples[index]
    
    if example_folder in to_exclude_to_time_complexity:
        continue
    
    print(example_folder)
    
    image_file_name = [f for f in os.listdir(os.path.join(main_folder, example_folder)) 
                        if f.lower().endswith('png') or f.lower().endswith('jpg')][0]
    
    
    ocr = get_ocr_data(os.path.join(main_folder, example_folder, example_folder, 'analyzeDocResponse.json'))

    image = []
    
    image_file_name = [f for f in os.listdir(os.path.join(main_folder, example_folder)) 
                        if f.lower().endswith('png') or f.lower().endswith('jpg')][0]
    
    image.append(open_image(os.path.join(main_folder, example_folder,image_file_name)))
    
    
    ocr = set_bounding_boxes_in_pixels(ocr, image[-1])
    ocr = get_font_size(ocr)
    ocr = join_close_nodes(ocr, vertical_distance_threshold_percent=25, horizontal_distance_threshold_percent=50)
  
    image_no_thresholding = cv2.convertScaleAbs(substract_bounding_boxes(ocr, image[0], 20))
    arrow_origins, arrow_tips, arrow_length,arrow_thickness = detect_arrows(image_no_thresholding, dilate_max=5, erode_max=5, rounding_max = 0.05, rounding_step=0.002)
    
    image.append(threshold_image(image[-1]))
    image.append(stamp_bounding_boxes_on_image(ocr, image[-1], erotion_percent = 10))
    #image.append(close_shape_gaps5(image[-1], ocr, dist_threshold_percent = 30))
    
    image.append(get_filled_shapes(image[-1]))
    
    nodes_mask, edges_mask = get_masks(image[-1], max_iter=5)
    
    image.append(nodes_mask)
    image.append(edges_mask)
    
    edges_endpoints, edges_endpoints_features = get_edges_endpoints(edges_mask, min_edge_length_percentage=1.5)
    
    edges_directionalities = get_edges_endpoints_directionality(edges_endpoints, arrow_tips, arrow_origins, dist_threshold=50)
    
    
    #uses vit to predict edge_tip and compares both opencv and vit approaches using a simple logic
    final_edges_directionalities = get_edges_endpoints_directionality_vit(edges_endpoints, image[0], 
                                                                          edges_directionalities, score_threshold=0.3)
    
    
    nodes_df = get_nodes(ocr, nodes_mask, threshold_iou = 0.3)
    
    nodes_df['text'] = nodes_df.text.apply(spellcheck2)

    connections_df,edges_id = get_conections(nodes_df, edges_endpoints, final_edges_directionalities, 
                                    image[-1], dist_threshold_percentage = 20)
    
    
    final_edges_feature = edges_endpoints_features[edges_id]
    df_edge_features = pd.DataFrame(final_edges_feature, columns=['id', 'length', 'thickness'])
    
    connections_df = pd.concat([connections_df, df_edge_features], axis=1)
    
    connections_df = remove_repeated_connections(connections_df)

    
    all_nodes_df.append(nodes_df)
    all_connections_df.append(connections_df)
    all_images.append(image[0])


104024FC-16A3-4D4B-8EEA-75055D623129-1661588297.695


IndexError: list index out of range

In [None]:
nodes_f1 = []
nodes_pr = []
nodes_re = []

edges_f1 = []
edges_pr = []
edges_re = []

directions_f1 = []
directions_pr = []
directions_re = []

counter = 0

for index in range(len(ann_examples)):

    example_folder = ann_examples[index]
    
    if example_folder in to_exclude_to_time_complexity:
        continue
    
    print(example_folder)

    ann_edges = pd.read_excel(os.path.join(main_folder, example_folder, 'annotated edges.xlsx'))
    
    ann_nodes = pd.read_excel(os.path.join(main_folder, example_folder, 'annotated nodes.xlsx'))
    ann_nodes = filter_image_nodes_from_annotated_df(ann_nodes)
    
    f1, pr, re = get_nodes_metrics(ann_nodes, all_nodes_df[counter])
    
    nodes_f1.append(f1)
    nodes_pr.append(pr)
    nodes_re.append(re)
    
    f1, pr, re = get_edges_metrics(ann_nodes, all_nodes_df[counter], ann_edges, all_connections_df[counter])
    
    edges_f1.append(f1)
    edges_pr.append(pr)
    edges_re.append(re)
    
    f1, pr, re = get_direction_metrics(ann_nodes, all_nodes_df[counter], ann_edges, all_connections_df[counter])
    
    directions_f1.append(f1)
    directions_pr.append(pr)
    directions_re.append(re)
    
    counter += 1
    
show_results(nodes_f1, nodes_pr, nodes_re, 
             edges_f1, edges_pr, edges_re,
             directions_f1, directions_pr, directions_re)