In [None]:
import json
import pymupdf
from PIL import Image
import time

In [None]:
from google.cloud import vision
import io
from PIL import Image, ImageDraw


def ocr_image(image_path):
    """
    Run Google Cloud Vision text detection on an image file.

    Args:
        image_path (str): Path to the input image.

    Returns:
        list: One dictionary per detected word with the keys 'description'
            (the text) and 'bounding_box' (a list of (x, y) vertex tuples).
            Empty when nothing was detected.

    Raises:
        Exception: If the API response carries an error message.
    """
    annotator = vision.ImageAnnotatorClient()

    with open(image_path, 'rb') as image_file:
        payload = image_file.read()

    response = annotator.text_detection(image=vision.Image(content=payload))
    annotations = response.text_annotations

    if response.error.message:
        raise Exception(f'API Error: {response.error.message}')

    if not annotations:
        print('No text detected.')
        return []

    # The first annotation holds the entire detected text; the individual
    # words follow it, so the first entry is skipped.
    return [
        {
            'description': annotation.description,
            'bounding_box': [
                (vertex.x, vertex.y)
                for vertex in annotation.bounding_poly.vertices
            ],
        }
        for annotation in annotations[1:]
    ]

def merge_text_elements(text_elements, max_x_distance=10, max_y_distance=5):
    """
    Merges text elements that are horizontally close and vertically aligned.

    Elements are sorted by the first vertex (y, then x). The first remaining
    element is then compared once against every other remaining element and
    absorbs each one that is close enough; the absorbed elements are removed
    and the process repeats with what is left. Descriptions are concatenated
    without a separator, left element first. The input dictionaries are not
    modified.

    Args:
        text_elements (list): List of text elements with 'description' and 'bounding_box'.
        max_x_distance (int): Horizontal gap in pixels between facing box
            sides that must not be reached for a merge (strict comparison).
        max_y_distance (int): Maximum difference in pixels between the y
            values of the first vertices for a merge (inclusive comparison).

    Returns:
        list: A new list of merged text elements. Merged elements get an
            axis-aligned bounding box ordered top-left, top-right,
            bottom-right, bottom-left.
    """
    if not text_elements:
        return []

    # Sort text elements by top-left y-coordinate, then by left x-coordinate
    remaining = sorted(text_elements, key=lambda el: (el['bounding_box'][0][1], el['bounding_box'][0][0]))
    elements_to_keep = []

    while remaining:
        # Shallow copy: only top-level keys are reassigned below, so the
        # caller's dictionaries stay untouched.
        current_element = remaining[0].copy()
        not_merged = []

        for next_element in remaining[1:]:
            current_bbox = current_element['bounding_box']
            next_bbox = next_element['bounding_box']

            current_xs = [coord[0] for coord in current_bbox]
            next_xs = [coord[0] for coord in next_bbox]
            current_left_x, current_right_x = min(current_xs), max(current_xs)
            next_left_x, next_right_x = min(next_xs), max(next_xs)

            # Gap when next sits to the right of current, and when it sits to the left
            gap_to_the_right = next_left_x - current_right_x
            gap_to_the_left = current_left_x - next_right_x
            horizontally_close = (
                abs(gap_to_the_right) < max_x_distance
                or abs(gap_to_the_left) < max_x_distance
            )

            # Vertical alignment is judged on the first (top-left) vertex only
            vertical_distance = abs(next_bbox[0][1] - current_bbox[0][1])

            if not (horizontally_close and vertical_distance <= max_y_distance):
                not_merged.append(next_element)
                continue

            # Decide reading order from the left edges. Comparing the right
            # edge of one box with the left edge of the other put the text in
            # reverse order whenever the boxes touched or overlapped slightly.
            if current_left_x <= next_left_x:
                current_element['description'] += next_element['description']
            else:
                current_element['description'] = next_element['description'] + current_element['description']

            # The merged box is the axis-aligned hull of both boxes
            all_ys = [coord[1] for coord in current_bbox] + [coord[1] for coord in next_bbox]
            new_left_x = min(current_left_x, next_left_x)
            new_right_x = max(current_right_x, next_right_x)
            new_top_y = min(all_ys)
            new_bottom_y = max(all_ys)
            current_element['bounding_box'] = [
                (new_left_x, new_top_y),
                (new_right_x, new_top_y),
                (new_right_x, new_bottom_y),
                (new_left_x, new_bottom_y)
            ]

        elements_to_keep.append(current_element)
        # Only the elements that were not absorbed are considered again
        remaining = not_merged

    return elements_to_keep

def find_text(target_text, text_elements):
    """
    Look up the first text element whose description equals the target text.

    Both sides are compared after stripping surrounding whitespace,
    lower-casing, and removing every comma, space and period.

    Args:
        target_text (str): The text to search for.
        text_elements (list): List of text elements with 'description' and 'bounding_box'.

    Returns:
        dict or None: The first matching text element, or None if there is none.
    """
    ignored_chars = str.maketrans('', '', ', .')

    def canonical(raw):
        return raw.strip().lower().translate(ignored_chars)

    wanted = canonical(target_text)
    return next(
        (
            candidate
            for candidate in text_elements
            if canonical(candidate['description']) == wanted
        ),
        None,
    )


def compute_coordinates_lysvidde(target_element):
    """
    Derive three probe points from the position of a text element.

    The first vertex of the element's bounding box is used as the anchor
    (assumed to be the top-left corner of a rectangle whose vertices are
    ordered clockwise). All three probe points lie 1130 pixels to the right
    of the anchor, at vertical offsets of -20, +3 and +29 pixels.

    Args:
        target_element (dict): The text element containing the target text.

    Returns:
        dict: The anchor as 'x_target'/'y_target' and the probe points as
            'x_1'/'y_1', 'x_2'/'y_2' and 'x_3'/'y_3'.
    """
    anchor = target_element['bounding_box'][0]
    x_target, y_target = anchor[0], anchor[1]

    probe_x = x_target + 1130
    new_coords = {'x_target': x_target, 'y_target': y_target}
    for index, y_offset in enumerate((-20, 3, 29), start=1):
        new_coords[f'x_{index}'] = probe_x
        new_coords[f'y_{index}'] = y_target + y_offset

    return new_coords


def find_elements_containing_point(x, y, text_elements):
    """
    Collect every text element whose bounding box covers the point (x, y).

    The box is treated as the axis-aligned extent of its vertices, and points
    on the border count as inside.

    Args:
        x (int): The x-coordinate of the point.
        y (int): The y-coordinate of the point.
        text_elements (list): List of text elements with 'description' and 'bounding_box'.

    Returns:
        list: The matching text elements, in input order.
    """
    def covers_point(element):
        corners = element['bounding_box']
        xs = [corner[0] for corner in corners]
        ys = [corner[1] for corner in corners]
        left, right = min(xs), max(xs)
        top, bottom = min(ys), max(ys)
        return left <= x <= right and top <= y <= bottom

    return [element for element in text_elements if covers_point(element)]

def draw_image_with_bounding_boxes(image_path, text_elements, output_path='annotated_image.jpg'):
    """
    Save a copy of an image with the text elements' bounding boxes drawn in blue.

    Args:
        image_path (str): Path of the image to annotate.
        text_elements (list): Text elements with a 'bounding_box' vertex list.
        output_path (str): Where the annotated image is written.
    """
    canvas = Image.open(image_path)
    pen = ImageDraw.Draw(canvas)
    # NOTE(review): the vertices are handed to draw.line as given, so the edge
    # from the last vertex back to the first is presumably not drawn - confirm.
    for text_element in text_elements:
        pen.line(text_element['bounding_box'], fill='blue', width=2)
    canvas.save(output_path)

def perform_ocr(image_filename):
    """
    OCR an image, merge neighbouring words, and return the merged elements.

    Two debug images are written to the working directory on every call:
    'annotated_image.jpg' (raw OCR boxes) and 'annotated_image_merged.jpg'
    (boxes after merging). The number of merged elements is printed.
    NOTE(review): the debug file names are fixed, so concurrent calls
    presumably overwrite each other's images - confirm whether that matters.

    Args:
        image_filename (str): Path of the image to process.

    Returns:
        list: Merged text elements with 'description' and 'bounding_box'.
    """
    raw_elements = ocr_image(image_filename)
    # Offline sample of the OCR output, kept for debugging without the API:
    # texts_with_bounding_box = [{"description":"Fredrikstad","bounding_box":[[207,1084],[303,1083],[303,1101],[207,1102]]},{"description":"bru","bounding_box":[[309,1083],[336,1083],[336,1101],[309,1101]]},{"description":",","bounding_box":[[337,1083],[342,1083],[342,1100],[337,1100]]},{"description":"G","bounding_box":[[348,1083],[362,1083],[362,1100],[348,1100]]}]

    draw_image_with_bounding_boxes(image_filename, raw_elements, "annotated_image.jpg")

    # Join words that sit next to each other on the same line
    merged_texts = merge_text_elements(
        raw_elements,
        max_x_distance=10,
        max_y_distance=5,
    )
    print(len(merged_texts))

    draw_image_with_bounding_boxes(image_filename, merged_texts, "annotated_image_merged.jpg")
    return merged_texts


def find_lysvidde(ocr_elements, lighthouse_name):
    """
    Read the Lysvidde value that belongs to a lighthouse from OCR elements.

    The element matching the lighthouse name is located, three probe points
    are derived from its position, and every OCR element covering one of the
    probe points is parsed as a number (a decimal comma is accepted). The
    largest parsed number is returned.

    Args:
        ocr_elements (list): Text elements with 'description' and 'bounding_box'.
        lighthouse_name (str): Name of the lighthouse to look up.

    Returns:
        float or None: The largest number found at the probe points, or None
            when the name is not found or no numeric text is found there.
    """
    # Find the target text
    target_text = find_text(lighthouse_name, ocr_elements)

    if not target_text:
        print(f"Text '{lighthouse_name}' not found in the image.")
        return None

    # Compute coordinates for Lysvidde
    coords = compute_coordinates_lysvidde(target_text)
    probe_points = [
        (coords['x_1'], coords['y_1']),
        (coords['x_2'], coords['y_2']),
        (coords['x_3'], coords['y_3'])
    ]

    lysvidde = None
    for x, y in probe_points:
        for text in find_elements_containing_point(x, y, ocr_elements):
            try:
                value = float(text['description'].replace(",", "."))
            except ValueError:
                # Stray OCR fragments (e.g. a lone ',') are not values. Skip
                # them instead of raising, which previously aborted the
                # lookup for every lighthouse on the page.
                print(f"Ignoring non-numeric text '{text['description']}' while reading Lysvidde for '{lighthouse_name}'.")
                continue
            lysvidde = value if lysvidde is None else max(lysvidde, value)

    return lysvidde

In [10]:
import base64
import vertexai
from vertexai.generative_models import GenerativeModel, Part
import vertexai.generative_models as generative_models
import requests
import os
import json
import base64
import random

# Vertex AI regions; one is picked at random for every Gemini request.
locations = [
    "us-central1",
    "asia-east1",
    "asia-east2",
    "asia-northeast1",
    "asia-northeast3",
    "asia-south1",
    "asia-southeast1",
    "australia-southeast1",
    "europe-central2",
    "europe-north1",
    "europe-southwest1",
    "europe-west1",
    "europe-west2",
    "europe-west3",
    "europe-west4",
    "europe-west6",
    "europe-west8",
    "europe-west9",
    "me-central1",
    "me-central2",
    "me-west1",
    "northamerica-northeast1",
    "southamerica-east1",
    "us-east1",
    "us-east4",
    "us-east5",
    "us-south1",
    "us-west1",
    "us-west4",
]
    
def extract_lighthouses_using_gemini(image_file_name, location):
    """
    Ask Gemini (via Vertex AI) to extract lighthouse records from a page image.

    The image is sent together with a fixed prompt and a JSON response schema;
    the reply is parsed as JSON and its 'items' array is returned.

    Args:
        image_file_name (str): Path of the page image; it is sent with the
            MIME type image/png.
        location (str): Vertex AI region used for this request.

    Returns:
        list: The lighthouse dictionaries from the 'items' array of the reply.
    """
    # Use a context manager so the file handle is closed; it used to be leaked.
    with open(image_file_name, "rb") as image_file:
        image_bytes = image_file.read()

    vertexai.init(project="cognitedata-development", location=location)
    model = GenerativeModel(
        "gemini-1.5-pro-002",
    )
    chat = model.start_chat()

    image = Part.from_data(mime_type="image/png", data=image_bytes)

    # Temperature 0 plus a response schema, to keep the output reproducible
    # and machine readable.
    generation_config = {
        "max_output_tokens": 8192,
        "temperature": 0.0,
        "top_p": 0.95,
        "response_mime_type": "application/json",
        "response_schema": {
            'type_': 'OBJECT',
            'properties': {
                'items': {
                    'type_': 'ARRAY',
                    'items': {
                        'type_': 'OBJECT',
                        'properties': {
                            'latitude': {
                                'type_': 'OBJECT',
                                'properties': {
                                    'degrees': {
                                        'type_': 'INTEGER',
                                        'description': 'Degrees of latitude, ranging from -90 to 90.',
                                    },
                                    'minutes': {
                                        'type_': 'NUMBER',
                                        'description': 'Minutes of latitude, ranging from 0 to 60.',
                                    },
                                },
                                'required': [
                                    'degrees',
                                    'minutes',
                                ]
                            },
                            'longitude': {
                                'type_': 'OBJECT',
                                'properties': {
                                    'degrees': {
                                        'type_': 'INTEGER',
                                        'description': 'Degrees of longitude, ranging from -180 to 180.',
                                    },
                                    'minutes': {
                                        'type_': 'NUMBER',
                                        'description': 'Minutes of longitude, ranging from 0 to 60.',
                                    },
                                },
                                'required': [
                                    'degrees',
                                    'minutes',
                                ]
                            },
                            'pattern': {
                                'type_': 'STRING',
                                'description': 'Flash pattern. Called Karakter in the input.',
                            },
                            'description': {
                                'type_': 'STRING',
                            },
                            'heightOverGround': {
                                'type_': 'NUMBER',
                                'description': 'Height over ground. Must be smaller than or equal height.',
                            },
                            'height': {
                                'type_': 'NUMBER',
                                'description': 'Height over sea. Must be larger than or equal to heightOverGround.',
                            },
                            'sectors': {
                                'type_': 'ARRAY',
                                'items': {
                                    'type_': 'OBJECT',
                                    'properties': {
                                        'color': {
                                            'type_': 'STRING',
                                            'description': 'Color of sector. Typically R, G or W.',
                                        },
                                        'start': {
                                            'type_': 'NUMBER',
                                            'description': 'Start angle of sector [degrees in range 0-360].',
                                        },
                                        'stop': {
                                            'type_': 'NUMBER',
                                            # Was a copy of the 'start' text ("Start angle ...")
                                            'description': 'Stop angle of sector [degrees in range 0-360].',
                                        },
                                        'description': {
                                            'type_': 'STRING',
                                        },
                                    },
                                    'required': [
                                        'color',
                                        'start',
                                        'stop',
                                        'description',
                                    ]
                                },
                            },
                            'area': {
                                'type_': 'STRING',
                            },
                            'name': {
                                'type_': 'STRING',
                            },
                            'location': {
                                'type_': 'STRING'
                            }
                        },
                        'required': [
                            'latitude',
                            'longitude',
                            'height',
                            'sectors',
                            'name',
                            'area'
                        ]
                    },
                },
            },
            'required': [
                'items',
            ]
        },
    }

    # safety_settings = {
    #     generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: generative_models.HarmBlockThreshold.BLOCK_NONE,
    #     generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: generative_models.HarmBlockThreshold.BLOCK_NONE,
    #     generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: generative_models.HarmBlockThreshold.BLOCK_NONE,
    #     generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: generative_models.HarmBlockThreshold.BLOCK_NONE,
    # }

    response = chat.send_message(
      [image, "Generate JSON for these lighthouses. 'Karakter' is used for flash pattern. When reading sectors, use the sector color column with a single letter. Ensure that all sectors are included. All sector start/stop are positive numbers. If no sectors exist, or no maxRange, ignore the lighthouse."],
      generation_config=generation_config,
    )

    # Only the first candidate is used; its text is expected to be the JSON document.
    return json.loads(response.candidates[0].content.text)['items']


In [None]:
def convert_pdf_page_to_image(page, image_file_name, mask_lysvidde):
    """
    Render a PDF page at 200 dpi, paint vertical bands white, and save it.

    Args:
        page: The PDF page to render; must provide get_pixmap(dpi=...).
        image_file_name (str): Path the image is saved to.
        mask_lysvidde (bool): If true, the band x in [1300, 1432) is painted
            white. Otherwise the bands x in [1000, 1300) and [1432, 2300) are
            painted white, which leaves the band between them visible.
    """
    pix = page.get_pixmap(dpi=200)
    img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)

    if mask_lysvidde:
        masked_bands = [(1300, 1432)]
    else:
        masked_bands = [(1000, 1300), (1432, 2300)]

    white = (255, 255, 255)
    for left, right in masked_bands:
        # Clamp to the rendered width so a narrower page cannot fail, then
        # fill the whole band in one call instead of pixel by pixel.
        right = min(right, pix.width)
        if left < right:
            img.paste(white, (left, 0, right, pix.height))

    img.save(image_file_name, quality=100)

def validate_text_existence(page_text, page_number, lighthouse):
    """
    Check that the values extracted for a lighthouse occur in the page text.

    Each value is converted to the string form expected on the page (a decimal
    comma for height, sector angles and range; a whole-number range without
    decimals) and counted in the page text. A value shared by several fields
    must occur at least that many times.

    Args:
        page_text (str): Text of the PDF page.
        page_number (int): Zero-based page index; reported one-based.
        lighthouse (dict): Extracted lighthouse with 'name', 'latitude',
            'longitude', 'height' and 'sectors', and optionally 'pattern' and
            'range'. A 'range' of None is treated like a missing range.

    Returns:
        list: One error message per field whose value occurs too rarely.
    """
    # TODO: in case the same coordinates appear more than once, we can verify
    # that the value appears the right amount of times (or more).
    errors = []

    needles = {
        "latitude_minutes": str(lighthouse['latitude']['minutes']),
        "latitude_degrees": str(lighthouse['latitude']['degrees']),
        "longitude_minutes": str(lighthouse['longitude']['minutes']),
        "longitude_degrees": str(lighthouse['longitude']['degrees']),
        "height": str(lighthouse['height']).replace(".", ","),
    }

    for sector_index, sector in enumerate(lighthouse['sectors']):
        needles[f"{sector_index}_start"] = str(sector['start']).replace(".", ",")
        needles[f"{sector_index}_stop"] = str(sector['stop']).replace(".", ",")

    if "pattern" in lighthouse:
        needles["pattern"] = lighthouse["pattern"]

    # The range can be present but None when no value was found for the
    # lighthouse; int(None) used to raise TypeError here.
    range_value = lighthouse.get("range")
    if range_value is not None:
        if range_value == int(range_value):
            # maxRange will appear without decimal if it is an integer
            needles["range"] = str(int(range_value))
        else:
            needles["range"] = str(range_value).replace(".", ",")

    # How many fields share each value, i.e. how often it must occur at least
    minimum_value_count = {}
    for value in needles.values():
        minimum_value_count[value] = minimum_value_count.get(value, 0) + 1

    for key, value in needles.items():
        value_count_on_page = page_text.count(value)
        if value_count_on_page < minimum_value_count[value]:
            errors.append(f"Value {key} missing at least once for {lighthouse['name']} on page {page_number+1} (value is {value} should appear {minimum_value_count[value]} times)")
    return errors

def validate_extracted_lighthouses(page, page_number, lighthouses):
    """
    Validate every extracted lighthouse against the text of its PDF page.

    Args:
        page: The PDF page; must provide get_text().
        page_number (int): Zero-based page index, passed on for the messages.
        lighthouses (list): Extracted lighthouse dictionaries.

    Returns:
        list: All error messages, in lighthouse order.
    """
    page_text = page.get_text()
    return [
        problem
        for lighthouse in lighthouses
        for problem in validate_text_existence(page_text, page_number, lighthouse)
    ]

import re
import random
import time

def ensure_space_before_rgw(input_string):
    """
    Insert a space before an R, G or W that directly follows another letter.

    The language model sometimes drops the space (e.g. it returns QW where the
    page says Q W). No space is inserted when the preceding letter is itself
    R, G or W, so runs such as WRG stay together.
    """
    # Zero-width position: a letter behind, R/G/W ahead, and the letter behind
    # is not R/G/W.
    missing_space = re.compile(r'(?<=[a-zA-Z])(?=[RGW])(?<![RGW])')
    return missing_space.sub(' ', input_string)



def parse_lighthouses_for_page(document, page_number):
    """
    Extract and validate all lighthouses on one page of the PDF.

    The page is rendered twice with different masks. Gemini reads the image
    rendered with mask_lysvidde=True, and OCR on the other image supplies the
    'range' of each lighthouse. Gemini and OCR are retried together, with a
    randomised and growing delay and a random region per attempt, until they
    succeed or 120 seconds have passed. Known pattern misreadings are then
    corrected and the result is checked against the page text.

    Args:
        document: The opened PDF; must provide load_page(page_number).
        page_number (int): Zero-based page index.

    Returns:
        tuple: (lighthouses on the page, validation error messages).

    Raises:
        Exception: If no attempt has succeeded after 120 seconds.
    """
    lysvidde_masked_png = f"pages/page{page_number}_masked_lysvidde.png"
    rest_masked_png = f"pages/page{page_number}_masked_rest.png"
    page = document.load_page(page_number)

    convert_pdf_page_to_image(page, lysvidde_masked_png, mask_lysvidde=True)
    convert_pdf_page_to_image(page, rest_masked_png, mask_lysvidde=False)

    backoff_base = 1
    backoff_cap = 32000
    started_at = time.time()
    while True:
        if time.time() - started_at > 120:
            raise Exception("Failed requests after 120 seconds to Gemini")

        location = random.choice(locations)
        try:
            lighthouses_on_page = extract_lighthouses_using_gemini(
                image_file_name=lysvidde_masked_png,
                location=location,
            )

            # Attach the range read by OCR to every lighthouse Gemini found
            ocr_elements = perform_ocr(rest_masked_png)
            for lighthouse in lighthouses_on_page:
                # Sometimes Gemini returns a name with a newline in it
                first_line = lighthouse["name"].split("\n")[0]
                lighthouse["name"] = first_line
                lighthouse['range'] = find_lysvidde(ocr_elements, first_line)
        except Exception as e:
            print(f"Error calling gemini on {location}: {e}", flush=True)
            # Sleep a random number of ms; the base doubles after each failure
            jittered_ms = backoff_base + random.randint(1,1000)
            time.sleep(min(backoff_cap, jittered_ms) / 1000)
            backoff_base *= 2
        else:
            break

    for lighthouse in lighthouses_on_page:
        if "pattern" not in lighthouse:
            continue
        # Lower case L and upper case I look alike and the models confuse
        # them. FI is not a valid flash pattern, so it becomes Fl. A clipped
        # "lys" can also be read as "lvs".
        cleaned = lighthouse["pattern"].replace("FI", "Fl").replace("lvs", "lys")
        lighthouse["pattern"] = ensure_space_before_rgw(cleaned)

    errors = validate_extracted_lighthouses(page, page_number, lighthouses_on_page)
    return lighthouses_on_page, errors

# pdf_path = "Fyrliste_HeleLandet.pdf"
# document = pymupdf.open(pdf_path)
# lighthouses_on_page, errors = parse_lighthouses_for_page(document, 29)
# errors

In [None]:
def parse_page(document, page_number):
    """
    Parse one page if its text contains the lighthouse table headers.

    Args:
        document: The opened PDF; must provide load_page(page_number).
        page_number (int): Zero-based page index.

    Returns:
        tuple: (lighthouses, errors). Both are empty lists when the page text
            lacks any of "Lysvidde", "Fyrnr." or "Kartnr.".
    """
    required_headers = ("Lysvidde", "Fyrnr.", "Kartnr.")

    page_text = document.load_page(page_number).get_text()

    # Pages without all of the headers are skipped
    if any(header not in page_text for header in required_headers):
        return [], []
    return parse_lighthouses_for_page(document, page_number)

In [None]:
# Run the extraction for every page of the PDF on a thread pool and collect
# the lighthouses and validation errors of all pages.
import concurrent.futures
all_errors = []
all_lighthouses = []

pdf_path = "Fyrliste_HeleLandet.pdf"
document = pymupdf.open(pdf_path)

# Per-page counters, indexed by zero-based page number
lighthouses_per_page = [0] * len(document)
errors_per_page = [0] * len(document)

# NOTE(review): the same document object is used from up to 30 worker threads;
# confirm that PyMuPDF supports this kind of concurrent access.
with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
    # Map each future back to the page it was submitted for
    futures = {executor.submit(parse_page, document, page_number): page_number for page_number in range(len(document))}
    
    for future in concurrent.futures.as_completed(futures):
        page_number = futures[future]
        try:
            lighthouses_on_page, errors = future.result()
            if len(lighthouses_on_page) > 0:
                # One JSON file per page with results, named by one-based page number.
                # NOTE(review): assumes the "lighthouses" directory already exists.
                with open(f"lighthouses/page_{page_number+1}.json", "w", encoding="utf-8") as f:
                    json.dump(lighthouses_on_page, f, indent=2, ensure_ascii=False)
            print(f"Found {len(lighthouses_on_page)} lighthouses and {len(errors)} errors on page {page_number+1}")
            all_errors.extend(errors)
            all_lighthouses.extend(lighthouses_on_page)
            lighthouses_per_page[page_number] = len(lighthouses_on_page)
            errors_per_page[page_number] = len(errors)
            
        except Exception as e:
            # A failing page is reported and skipped; the other pages continue
            print(f"Error parsing page {page_number + 1}: {e}")


In [11]:
# Parse a single page (zero-based index 45), print the extracted lighthouses
# as JSON, and show how many were found.
pdf_path = "Fyrliste_HeleLandet.pdf"
document = pymupdf.open(pdf_path)
lighthouses_on_page, errors = parse_page(document, 45)
print(json.dumps(lighthouses_on_page, indent=2, ensure_ascii=False))
len(lighthouses_on_page)

98
[
  {
    "area": "Fredrikstad vestre løp",
    "height": 6.0,
    "latitude": {
      "degrees": 59,
      "minutes": 10.7749
    },
    "longitude": {
      "degrees": 10,
      "minutes": 52.3458
    },
    "name": "Gåsungene",
    "sectors": [
      {
        "color": "G",
        "description": "Fra innover Vikerlandet til 62m SØ av Krossnefjellet lanterne.",
        "start": 184.4,
        "stop": 214.8
      },
      {
        "color": "W",
        "description": "Til 107m V av nebbet S av Tankodden, klar V av grønnstakene ved Sturødgrunnen.",
        "start": 214.8,
        "stop": 217.0
      },
      {
        "color": "R",
        "description": "Til 71m SV av Kråka",
        "start": 217.0,
        "stop": 323.9
      },
      {
        "color": "G",
        "description": "Til 442m V av Måkekollflu stang",
        "start": 323.9,
        "stop": 359.5
      },
      {
        "color": "W",
        "description": "Til 205m Ø av Lille Marnet, klar Ø av BRB stake på Torgau

8

In [None]:
# Write the combined results of all pages to disk: the lighthouses and the
# validation error messages, each as one JSON file.
with open("parsed_lighthouses.json", "w") as f:
    json.dump(all_lighthouses, f)
with open("parsed_lighthouse_errors.json", "w") as f:
    json.dump(all_errors, f)

In [None]:
# Display the validation errors collected from all pages.
all_errors

In [None]:
# Ad-hoc check: run OCR and merging on one page image from the pages/ directory.
perform_ocr('pages/page41_masked_rest.png')