# Imports

In [1]:
from transformers import AutoProcessor, AutoModelForTokenClassification
import torch
import geopandas as gpd
import os
from pathlib import Path
from utils import sliding_window
from core import LayoutLMv3Interface, SQLite3Interface
# Project root: the parent of the current working directory (the notebook is
# expected to be started from a sub-directory of the project).
ROOT_DIR = Path(os.getcwd()).parent

# Presumably set to silence the HuggingFace tokenizers fork/parallelism
# warning — TODO confirm.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# LayoutLMv3 wrapper built from a local fine-tuned checkpoint and the public
# processor. It is not referenced again in the visible part of this file.
layoutlmv3_interface = LayoutLMv3Interface(
    model_path=os.path.join(ROOT_DIR, "models/layoutlmv3/checkpoint-400"),
    processor_path="microsoft/layoutlmv3-large",
    verbose=True
)

# Handle to the project SQLite database; the main loop queries the
# `dane_hasla` table through it. The name (double "l") is used as-is later.
sqllite3_interface = SQLite3Interface(
    database=os.path.join(ROOT_DIR, "ecclesia.db"),
)

# Data directories, all relative to ROOT_DIR.
SCHEMATISMS_DIR = os.path.join(ROOT_DIR, "data/schematyzmy")  # page scans + shapefile per schematism
OCR_SCHEMATISMS_DIR = os.path.join(ROOT_DIR, "data/ocr_schematyzmy")  # target directory handed to the OCR interface
RESULTS_DIR = os.path.join(ROOT_DIR, "data/results")

# Utils

In [2]:
def visualize_tokens_with_labels(image_path, words, bboxes, labels, output_path=None):
    """
    Draw labelled tokens on top of a page image.

    Tokens labelled "O" are skipped. Every other token gets its bounding box
    outlined and a "<label>: <word>" caption on a white patch placed directly
    above the box (clamped so it never starts above the top edge).

    Colours: 'building_material' red, 'dedication' orange, 'parish' blue,
    'deanery' green; any other label is drawn in black.

    Args:
        image_path: Path of the image to open.
        words: Token texts.
        bboxes: One box per token, passed straight to ``ImageDraw.rectangle``.
        labels: One label per token.
        output_path: When truthy, the annotated image is also saved there.

    Returns:
        The annotated RGB image.
    """
    from PIL import Image, ImageDraw, ImageFont

    # Outline / caption colour per label
    outline_colors = {
        'building_material': 'red',
        'dedication': 'orange',
        'parish': 'blue',
        'deanery': 'green',
    }

    with Image.open(image_path) as source:
        canvas = source.convert("RGB")
        painter = ImageDraw.Draw(canvas)

        # Fall back to PIL's built-in font when Arial cannot be loaded
        try:
            caption_font = ImageFont.truetype("Arial", 14)
        except IOError:
            caption_font = ImageFont.load_default()

        labelled_tokens = (
            (word, box, tag)
            for word, box, tag in zip(words, bboxes, labels)
            if tag != "O"
        )
        for word, box, tag in labelled_tokens:
            ink = outline_colors.get(tag, "black")
            painter.rectangle(box, outline=ink, width=2)

            # Measure the caption so the white background fits it exactly
            caption = f"{tag}: {word}"
            x0, y0, x1, y1 = painter.textbbox((0, 0), caption, font=caption_font)
            caption_w = x1 - x0
            caption_h = y1 - y0

            origin_x = box[0]
            origin_y = max(0, box[1] - caption_h)
            painter.rectangle(
                [origin_x, origin_y, origin_x + caption_w, origin_y + caption_h],
                fill=(255,255,255,180),
            )
            painter.text((origin_x, origin_y), caption, fill=ink, font=caption_font)

        if output_path:
            canvas.save(output_path)
        return canvas


# Prompts

In [3]:
# System prompt for the labelling LLM (passed to ``LLM`` as the system message).
# The few-shot OUTPUT blocks must stay strictly valid JSON, because
# ``LLM.process_response`` parses the model's reply with ``json.loads`` and the
# model imitates these examples. Line breaks inside JSON strings are therefore
# written as ``\\n`` so the model sees the two-character JSON escape instead of
# a raw newline, and every example tag has a matching closing tag because the
# main loop extracts spans with the regex ``<[^>]*>(.*?)</``.
system_prompt = """You are a specialized extraction assistant that identifies and labels specific information from 19th‑century Latin ecclesiastical schematisms (diocesan notices).

Your task is to extract labeled text spans from Latin‑Polish diocesan notices and return them as a JSON list with positional information. You must align Latin terms in the source text with their Polish equivalents in the ground‑truth table.

---

## Labels and Their Meanings

| Label                  | What to Extract                    | Latin Format Examples                                                                               | Polish Translation              |
| ---------------------- | ---------------------------------- | --------------------------------------------------------------------------------------------------- | ------------------------------- |
| **parish**             | Parish name                        | Usually capitalized; follows the ordinal number **or** appears as the first word in the notice line | Direct name (may vary slightly) |
| **page_number**       | Page number in the schematism      | Usually numeric, often first in notice                                                              | Same number                     |
| **dedication**         | Church's dedication / patron saint | `S.`, `SS.`, `B.M.V.`, `Nativ.` etc.                                                                | Full saint name in Polish       |
| **building_material** | Church construction material       | `mur.`, `mr`, `murata`, `ex muro` (masonry) — `lig.`, `l.`, `dr`, `lignea` (wood)                   | `murowany` / `drewniany`        |
| **deanery**            | Deanery name                       | Appears after `dekanat` or `decanatus`                                                              | Name in Polish                  |

---

## **Parsing Rules** (supersede earlier versions)

1. **Notice boundaries**
   *Ignore headings.* Textual headers such as `ECCLESIA Cathedr. …`, `PAROCHUS …`, etc. belong to the preceding context and **must be ignored** for extraction purposes. The *notice proper* begins at the first line that either:

   * starts with an ordinal number (`"1.", "2.", "—"`, etc.), **or**
   * starts with the parish name in its Polish nominative form followed by a comma.

2. **Parish name (form preference)**

   * Extract the **nominative form that appears verbatim** in the tokens, favouring the Polish spelling with diacritics (e.g., `Tarnów`).
   * Disregard Latinised genitive/locative endings such as `-ae`, `-i`, `-ensis`, etc. (`Tarnoviae`, `Cracoviae`) **unless** no Polish nominative form occurs anywhere in the notice.

3. **Ground‑truth alignment**

   * If the Polish nominative form is present, extract that form so that it matches the Polish ground‑truth table.
   * If only a Latin form is available, extract that Latin form **and** update the ground‑truth table accordingly (outside this assistant).

4. **Other extraction constraints** 

   1. Extract **only** within the boundaries of a single notice.
   2. Never split or merge tokens that are pre‑segmented in the input.
   3. Match Latin terminology with corresponding Polish concepts in the ground truth.
   4. Include positional information (word indices) for each extraction.
   5. Omit any text that doesn't match the specific labels.


---

## Few‑shot Examples
# EXAMPLE 1
### Ground Truth (Polish)
[{'deanery': 'Dąbrowa', 'parish': 'Bolesław', 'dedication': 'Wojciech Biskup Męczennik', 'material_type': 'mr'}]
### Input text
[TEXT_START]
In Circulo quondam Tarnoviensi.

1. Decanatus Dabrovaensis.

1. Bolesław, P. E. p. mur. — a. 1632 per Stanisl,
Ligęza e ligno aedif. 22. Oct. 1634 per Thom. Oborski
Ep. Laodicens. cons. dein desolata et combusta, — a. 1731
mur. et per Joann. Skarbek Arcbi-Epp. Leopoliens. cons.
Jam vero a. 1326 in opere per Aug. Theiner edito de hac
ecclesia et curato mentio fit. T. E. S. Adalbertus E. M,
Matr. ex a° 1648. Patr. T. D. Marianus Eques de Sroczyński.
[TEXT_END]
### OUTPUT
[
  {"label":"deanery", "text":"Decanatus Dabrovaensis.", "text_match_patch": "In Circulo quondam Tarnoviensi.\\n1. <DEANERY>Decanatus Dabrovaensis.</DEANERY>\\n1. Bolesław"},
  {"label":"building_material", "text":"mur.", "text_match_patch": "combusta, — a. 1731\\n<BUILDING_MATERIAL>mur.</BUILDING_MATERIAL> et per"},
  {"label":"parish", "text":"Bolesław", "text_match_patch": "Decanatus Dabrovaensis.\\n1. <PARISH>Bolesław</PARISH>, P. E."},
  {"label":"dedication", "text":"T. E. S. Adalbertus E. M,", "text_match_patch": "mentio fit. <DEDICATION>T. E. S. Adalbertus E. M,</DEDICATION>\\nMatr."}
]

# EXAMPLE 2
### Ground Truth (Polish)
[{'deanery': 'Dąbrowa', 'parish': 'Gręboszów', 'dedication': 'Najświętsza Maryja Panna Wniebowzięta', 'material_type': 'mr'},
{'deanery': 'Dąbrowa', 'parish': 'Dąbrowa', 'dedication': 'Wszyscy Święci', 'material_type': 'dr'}]
### Input text
[TEXT_START]
140. Mẹdrzychów Y, 1240. Kupienin ®/, m. 460. — Univ.
Cath. 6206. Jud, 435.

Capitaneatus districtualis et off. postale Dąbrowa.

2. Dabrowa, 0. E. p. lign. A. E. ign. olim praep.
cum proprio Promot. SS. Rosarii. Eccl. antiqua per Nicolaum
Spytek Ligęza Casteļlanum dotata, 1614 per Valerianum
Lubieniecki Epp. Bacoviensem cons. ob vetustatem desolata,
nova amplior 1774. per Cajetanum Potocki Canon. Cracov.
de ligno extructa, per Gregorium Thomam Ziegler Epp. Ty-
niec. 1824. cons. T. E. 00. SS, Matr. ant. ex a. 1611.
Patr. T. D. Eugenius de Jordan Stojowski.
Capitan. distr. et off. post. Dabrowa.

3. Greboszów, P. E. p. mur. A. E. ignot. ast
ante annum 1326, existens, juxta Theiner tom. I. p. 252.
Praesens eccl. a Francisco de Dembiany Dembiński Palat,
Cracov. 1650 aedificata, per Nicol. Oborski Epp. Laodicen.
1675 in honorem Assumptionis B. M. V. cons. Matr. ant.
Nator. -ex a. 165t. Patr. T. D. Sophia Comitissa Załuska,
[TEXT_END]
### OUTPUT
[
   {"label":"parish", "text":"Dabrowa", "text_match_patch": "postale Dąbrowa. 2. <PARISH>Dabrowa</PARISH>, 0. E."},
   {"label":"building_material", "text":"lign.", "text_match_patch": "E. p. <BUILDING_MATERIAL>lign.</BUILDING_MATERIAL> A. E. ign."},
   {"label":"dedication", "text":"T. E. 00. SS.", "text_match_patch": "1824. cons. <DEDICATION>T. E. 00. SS.</DEDICATION> Matr. ant."},
   {"label":"parish", "text":"Greboszów", "text_match_patch": "post. Dabrowa. 3.<PARISH>Greboszów</PARISH>, P. E."},
   {"label":"building_material", "text":"mur.", "text_match_patch": "E. p. <BUILDING_MATERIAL>mur.</BUILDING_MATERIAL> A. E. ignot."},
   {"label": "dedication", "text":"Assumptionis B. M. V.", "text_match_patch": "in honorem <DEDICATION>Assumptionis B. M. V.</DEDICATION> cons."}
]

## END OF EXAMPLES
"""


In [4]:
# Per-page user message template. ``LLM.get_labeles`` fills it with
# ``str.format(input_text=..., ground_truth=...)``, so the only braces allowed
# in this text are the two placeholders ``{ground_truth}`` and ``{input_text}``;
# any other literal brace would have to be doubled.
user_prompt = """
## User Prompt

Extract and label information from this Latin ecclesiastical schematism that corresponds to the provided ground truth table in Polish.

### Ground Truth (Polish)
{ground_truth}


### Instructions
1. Identify Latin terms in the tokens that correspond to the Polish ground truth information.
2. For "material_type", look for terms like "mur.", "mr", "murata" (for masonry) or "lig.", "dr", "lignea" (for wood).
3. For "dedication", find Latin abbreviations like "S.", "SS.", "B.M.V.", "Nativ." that represent the Polish saint/dedication.
4. The "parish" name ussually is the same as the one in the ground truth table. It is very important to match the name with the token sequence.
5. Return only a JSON array with labeled items - exclude anything that doesn't match the required labels.
6. Dont output the ''' json beggin''' and '''json end''' tags.

### Input text
[TEXT_START]
{input_text}
[TEXT_END]

I need a JSON output containing only the labeled information with text exactly as found in the tokens, remember to include the `text_match_patch` with valid text segment.. Response should include only the JSON array, nothing else. The ouput should be a valid JSON array.
Return **only** a JSON array where each element is an object with:

* `label`: one of the defined labels
* `text`: the exact span as it appears in the input tokens (subject to Rule 2)
* `text_match_patch`: the text span with `<LABEL>`…`</LABEL>` tags, sufficient to locate it in context, be sure to include the text before and after the match, **only one pair of tags per match**, should contain around 5 tokens before and after the match, but not more than 10 tokens in total.

"""

# Pytesseract Interface

# LLM Interface

In [5]:
from openai import OpenAI
from mistralai import Mistral
import logging

class LLM:
    """Chat wrapper that sends one page to an LLM and parses the JSON reply.

    Two client types are supported: ``openai.OpenAI`` (including
    OpenAI-compatible servers) and ``mistralai.Mistral``. The conversation
    always starts with the system prompt; every :meth:`get_labeles` call
    appends one user message to ``self.messages``.

    Args:
        client: An ``OpenAI`` or ``Mistral`` client instance.
        model_name: Model identifier forwarded to the client.
        system_prompt: Content of the initial system message.
        user_prompt: Template with ``{input_text}`` and ``{ground_truth}``
            placeholders, filled with ``str.format``.
        model_parameters: Extra keyword arguments forwarded verbatim to the
            client call (e.g. ``temperature``, ``stream``,
            ``response_format``). ``None`` means no extra parameters.
        stream: Fallback streaming switch, used only when
            ``model_parameters`` has no ``"stream"`` key.
        no_think: Append ``"/no_think"`` to every user message.
        verbose: Echo streamed chunks to stdout while they arrive.
    """

    def __init__(self, client, model_name: str,  system_prompt: str, user_prompt: str, model_parameters: dict = None, stream: bool = False, no_think: bool = False, verbose: bool = False):
        self.client = client
        self.model_name = model_name
        self.system_prompt = system_prompt
        self.user_prompt = user_prompt
        self.messages = [
            {"role": "system", "content": self.system_prompt},
        ]
        # Keep the caller's dict object (so later edits to it are still seen),
        # but never store None: the parameters are unpacked with ** below.
        self.model_parameters = model_parameters if model_parameters is not None else {}
        self.stream = stream
        self.no_think = no_think
        self.verbose = verbose

        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)

        # Attach a handler only once, otherwise every new instance would
        # duplicate each log line.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('[%(levelname)s] %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def _client_kind(self) -> str:
        """Return ``"openai"`` or ``"mistral"``; raise ``TypeError`` otherwise."""
        if isinstance(self.client, OpenAI):
            return "openai"
        if isinstance(self.client, Mistral):
            return "mistral"
        raise TypeError(f"Unsupported client type: {type(self.client).__name__}")

    def stream_response(self):
        """Request a streamed completion and return the concatenated text.

        Raises:
            TypeError: If the client is neither ``OpenAI`` nor ``Mistral``.
        """
        kind = self._client_kind()
        self.logger.info("\n" + "-"*20 + " Start of streamed response " + "-"*20)
        if kind == "openai":
            # The OpenAI client streams only when asked to; force the flag so
            # this method also works when "stream" is missing from the params.
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=self.messages,
                **{**self.model_parameters, "stream": True}
            )
        else:
            response = self.client.chat.stream(
                model=self.model_name,
                messages=self.messages,
                **self.model_parameters
            )

        pieces = []
        for streamed_response in response:
            # Mistral wraps each chunk in an event object exposing `.data`.
            payload = streamed_response if kind == "openai" else streamed_response.data
            choices = payload.choices
            # Some chunks carry no choices at all; treat them as empty.
            streamed_chunk = choices[0].delta.content if choices else None

            if streamed_chunk:
                if self.verbose:
                    print(streamed_chunk, end="", flush=True)
                pieces.append(streamed_chunk)
            elif self.verbose:
                print(" ", end="", flush=True)
        self.logger.info("\n" + "-"*20 + " End of streamed response " + "-"*20)

        return "".join(pieces)

    def completions_response(self):
        """Request a non-streamed completion, retrying up to three times.

        Returns:
            The message content of the first choice.

        Raises:
            TypeError: If the client is neither ``OpenAI`` nor ``Mistral``.
            Exception: Whatever the client raised on the third failed attempt.
        """
        kind = self._client_kind()
        self.logger.info("\n" + "-"*20 + " Start of non-streamed response " + "-"*20)
        if kind == "mistral":
            request = self.client.chat.complete
        else:
            request = self.client.chat.completions.create

        max_attempts = 3
        for attempt in range(1, max_attempts + 1):
            try:
                response = request(
                    model=self.model_name,
                    messages=self.messages,
                    **self.model_parameters
                )
                response_content = response.choices[0].message.content
                break
            except Exception as e:
                self.logger.error(f"Error generating response: {e}")
                if attempt == max_attempts:
                    raise
        self.logger.info(response_content)
        self.logger.info("\n" + "-"*20 + " End of non-streamed response " + "-"*20)
        return response_content

    def process_response(self, response):
        """Parse the model reply as JSON.

        ``<think>…</think>`` sections are removed first. If what remains is a
        single Markdown code fence (```` ``` ```` or ```` ```json ````), the
        fence is unwrapped as well.

        Raises:
            json.JSONDecodeError: If the cleaned reply is not valid JSON.
        """
        import re
        import json
        # get rid of the <think> and </think> tags
        json_response = re.sub(r"<think>.*?</think>", "", response, flags=re.DOTALL).strip()
        fenced = re.fullmatch(r"```(?:json)?\s*(.*?)\s*```", json_response, flags=re.DOTALL | re.IGNORECASE)
        if fenced:
            json_response = fenced.group(1)
        return json.loads(json_response)

    def generate(self):
        """Run one completion over ``self.messages`` and return the raw text.

        Streaming is used when ``model_parameters["stream"]`` is truthy; when
        that key is absent the ``stream`` constructor argument decides.
        """
        self.logger.info(f"Model name: {self.model_name} - {self.client.__class__.__name__}")
        if self.model_parameters.get("stream", self.stream):
            response = self.stream_response()
        else:
            response = self.completions_response()

        return response

    def get_labeles(self, page_ocr: str, ground_truth: str):
        """Label one page and return the parsed JSON reply.

        The formatted user message is appended to ``self.messages`` and is NOT
        removed afterwards; the caller is responsible for popping it before
        the next page.

        Args:
            page_ocr: OCR text of the page, inserted as ``{input_text}``.
            ground_truth: Ground-truth table, inserted as ``{ground_truth}``.
        """
        self.messages.append({"role": "user", "content": self.user_prompt.format(input_text=page_ocr, ground_truth=ground_truth)})
        if self.no_think:
            self.messages[-1]["content"] += "/no_think"
        response = self.generate()

        json_response = self.process_response(response)
        self.logger.info(f"Json response: {json_response}")
        return json_response

# Utils

In [6]:
import re
from rapidfuzz import fuzz

# Lowercase letters without a Unicode decomposition. NFKD cannot split them
# into base letter + accent, so the ASCII step would silently delete them
# (e.g. "bolesław" -> "bolesaw"). Map them to their usual ASCII spelling first.
_ASCII_FALLBACKS = str.maketrans({
    "ł": "l",
    "đ": "d",
    "ø": "o",
    "æ": "ae",
    "œ": "oe",
    "ß": "ss",
})


def normalize_text(text: str) -> str:
    """
    Normalize text for fuzzy comparison.

    Steps, in order:
    1. Convert to lowercase
    2. Replace letters that have no Unicode decomposition (ł, æ, œ, ...)
       with an ASCII equivalent
    3. Remove diacritics; any remaining non-ASCII character is dropped
    4. Collapse whitespace runs to a single space and trim
    5. Remove trailing punctuation (``. , ; :``) at the end of the string

    Args:
        text: Raw token or span text.

    Returns:
        The normalized ASCII string (possibly empty).
    """
    import unicodedata

    text = text.lower().translate(_ASCII_FALLBACKS)

    # Decompose accented letters and drop the combining marks
    text = unicodedata.normalize('NFKD', text).encode('ASCII', 'ignore').decode('ASCII')

    # Whitespace first, so trailing blanks cannot hide the final punctuation
    text = re.sub(r'\s+', ' ', text).strip()
    text = re.sub(r'[.,;:]+$', '', text).strip()

    return text

def find_fuzzy_span_indices(span, tokens, context=None, threshold=60,
                            context_threshold=70, context_penalty=20):
    """Find the run of consecutive tokens that best matches ``span``.

    The span is split on whitespace and every window of that many tokens is
    scored with ``fuzz.ratio``. Span words and tokens are normalized the same
    way (one by one, with ``normalize_text``), so the window length always
    equals the number of raw span words, even when a word normalizes to an
    empty string (e.g. a lone dash).

    Args:
        span: Text to locate.
        tokens: Token strings to search in.
        context: Optional ``(before, after)`` texts expected around the span.
            For a window that already reaches ``threshold``, each side whose
            similarity to the neighbouring tokens is below
            ``context_threshold`` lowers the score by ``context_penalty``.
        threshold: Minimum final score for a window to be accepted.
        context_threshold: Minimum similarity for a context side to count as
            matching.
        context_penalty: Score deducted per non-matching context side.

    Returns:
        The indices of the best window (the first one on ties), or ``[]`` when
        the span is empty, longer than ``tokens``, or no window qualifies.
    """
    span_words = span.split()
    n = len(span_words)
    if n == 0 or n > len(tokens):
        return []

    span_normalized = " ".join(normalize_text(w) for w in span_words)
    tokens_normalized = [normalize_text(t) for t in tokens]

    # Context is loop-invariant: normalize it once, word by word, so it is
    # comparable with the joined normalized tokens.
    before_words, after_words = [], []
    if context:
        before, after = context
        before_words = before.split()
        after_words = after.split()
    before_normalized = " ".join(normalize_text(w) for w in before_words)
    after_normalized = " ".join(normalize_text(w) for w in after_words)

    best_score = -1
    best_indices = []

    for i in range(len(tokens) - n + 1):
        window_normalized = " ".join(tokens_normalized[i:i + n])
        score = fuzz.ratio(span_normalized, window_normalized)
        if score < threshold:
            # Penalties only lower the score, so this window cannot qualify.
            continue

        if before_words:
            before_window = " ".join(tokens_normalized[max(0, i - len(before_words)):i])
            if fuzz.ratio(before_normalized, before_window) < context_threshold:
                score -= context_penalty

        if after_words:
            after_window = " ".join(tokens_normalized[i + n:i + n + len(after_words)])
            if fuzz.ratio(after_normalized, after_window) < context_threshold:
                score -= context_penalty

        if score > best_score and score >= threshold:
            best_score = score
            best_indices = list(range(i, i + n))

    return best_indices

def extract_context_from_patch(patch, span):
    """Return the text before and after the tagged ``span`` inside ``patch``.

    The patch is expected to contain ``<TAG>span</TAG>``. Everything in front
    of the opening tag and everything behind the closing tag is returned with
    whitespace runs collapsed to single spaces; no other normalization and no
    word limit is applied. When the span occurs more than once, the last
    tagged occurrence is used.

    Returns:
        ``(before, after)``; both are empty strings when no tagged span is
        found.
    """
    tagged = re.search(r'(.*)<[^>]*>%s</[^>]*>(.*)' % re.escape(span), patch, flags=re.S)
    if tagged is None:
        return "", ""

    leading, trailing = tagged.group(1), tagged.group(2)
    # split() without arguments trims and collapses all whitespace
    return " ".join(leading.split()), " ".join(trailing.split())

In [7]:

# OpenAI-compatible client pointed at a local server on port 1234.
# Only the commented-out LLM construction below refers to it.
lm_studio_client = OpenAI(
    base_url="http://localhost:1234/v1",
    api_key="lm-studio"       # LM Studio doesn’t enforce a real key
)




# Structured-output schema for the labeller: a non-empty JSON array of objects,
# each with a `label` (one of the five enum values), the extracted `text` and
# the tagged `text_match_patch`; all three keys are required.
labeler_schema = {
    "type": "json_schema",
    "json_schema": {
        "name": "annotations",
        "schema": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "label": {
                        "type": "string",
                        "enum": ["parish", "building_material", "dedication", "deanery", "page_number"]
                    },
                    "text": {"type": "string"},
                    "text_match_patch": {"type": "string"}
                },
                "required": ["label", "text", "text_match_patch"]
            },
            "minItems": 1
        }
    }
}

# Request parameters for the local model: streamed output constrained by
# `labeler_schema`. The sampling options are left commented out.
lm_studio_parameters = {
    # "temperature": 0.0,
    # "top_p": 0.9,
    # "stop": ["\n\n"],
    "stream": True,
    "response_format": labeler_schema,
}

# llm = LLM(client=lm_studio_client,
#           model_name="qwen3-30b-a3b-mlx",
#           system_prompt=system_prompt,
#           user_prompt=user_prompt,
#           model_parameters=lm_studio_parameters,
#           verbose=True,
#           no_think=True,
#           )





# Client setup

In [None]:
# Hosted Mistral client. The API key is read from the MISTRAL_API_KEY
# environment variable so it never has to be written into the notebook; when
# the variable is unset the previous value (an empty string) is used.
mistral_client = Mistral(api_key=os.environ.get("MISTRAL_API_KEY", ""))

# Request parameters forwarded verbatim to the Mistral chat call.
# NOTE(review): the prompts ask for a bare JSON *array* while the response
# format is "json_object" — confirm the API returns the array unwrapped.
mistral_parameters = {
    "temperature": 0.5,
    # "frequency_penalty": 1.2,  # Add this
    # "presence_penalty": 0.6, 
    # "top_p": 0.9,
    # "stop": ["\n\n"],
    "stream": True,
    "response_format": {
          "type": "json_object",
      }
}

# Labeller used by the main loop.
llm = LLM(client=mistral_client,
          model_name="mistral-large-latest",
          system_prompt=system_prompt,
          user_prompt=user_prompt,
          model_parameters=mistral_parameters,
          verbose=True,
          no_think=True,
          )

# Annotation specification

In [9]:
# Schematisms to annotate in this run. Each name is used both as the
# sub-directory under SCHEMATISMS_DIR and as the `skany` filter value in the
# database query of the main loop.
schematisms_to_evaluate = ["wloclawek_1872"]
# Root directory for the generated annotation JSON files (one sub-directory
# per schematism).
LLM_ANNOTATIONS_DIR = os.path.join(ROOT_DIR, "data/llm_annotations")

# Logger

In [10]:
import logging

# Logger used by the main loop. Level INFO means the loop's logger.debug
# calls are suppressed unless the level is lowered here.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Main loop

In [11]:
# Main annotation loop.
#
# For every schematism: load the ground-truth rows from the database, join
# them spatially with the page shapefile, then for every page image that has
# ground truth and no annotation file yet, ask the LLM for labelled spans, map
# them back onto the OCR tokens and write {words, bboxes, labels} as JSON.
import json  # used below; no module-level import of json is visible in this file

from shapely import wkt

for schematism in schematisms_to_evaluate:
    # NOTE(review): the filter value is quoted by hand; fine for the hard-coded
    # names above, but it should be parameterised if names ever come from
    # user input.
    geometries = sqllite3_interface.query(["id", "dekanat", "miejsce", "wezwanie", "material_typ", "the_geom" , "skany"], {"skany": f"'{schematism}'"}, table_name="dane_hasla")
    shapefile_path = os.path.join(SCHEMATISMS_DIR, schematism, "matryca/matryca.shp")

    shp_gdf = gpd.read_file(shapefile_path)

    # Distinct names are used for the row fields so the outer loop variable
    # `schematism` is never overwritten by the row's `skany` value.
    geom_list = []
    for _, deanery, parish, dedication, material_type, geom_wkt, row_schematism in geometries:
        try:
            geom = wkt.loads(geom_wkt)
        except Exception as e:
            logger.error(f"Error loading WKT: {geom_wkt}")
            logger.error(str(e))
            continue
        geom_list.append({
            "deanery": deanery,
            "parish": parish,
            "dedication": dedication,
            "material_type": material_type,
            "geom": geom,
            "schematism": row_schematism
        })
        logger.debug(f"Geometry loaded: {geom}")

    if not geom_list:
        # Without rows there is no "geom" column to build a GeoDataFrame from.
        logger.warning(f"No usable ground truth geometries for {schematism}, skipping")
        continue

    sql_gdf = gpd.GeoDataFrame(geom_list, geometry="geom", crs=shp_gdf.crs)

    joined_gdf = gpd.sjoin(shp_gdf, sql_gdf, how="inner", predicate="intersects")

    schematism_dir = os.path.join(SCHEMATISMS_DIR, schematism)
    annotations_dir = os.path.join(LLM_ANNOTATIONS_DIR, schematism)
    # Built on first use and then shared by all pages of this schematism.
    pytesseract_ocr = None

    for file_name in sorted(os.listdir(schematism_dir)):
        if not file_name.endswith(".jpg"):
            continue

        annotation_file = os.path.join(annotations_dir, file_name.replace(".jpg", ".json"))
        if os.path.exists(annotation_file):
            logger.info(f"Skipping existing annotation file: {annotation_file}")
            continue

        page_ground_truth = joined_gdf[joined_gdf["location"] == file_name]
        if page_ground_truth.empty:
            logger.debug(f"No ground truth data found for {file_name}")
            continue

        if pytesseract_ocr is None:
            # NOTE(review): PytesseractOCRInterface is neither defined nor
            # imported in the visible part of this file — confirm it is in
            # scope before running.
            pytesseract_ocr = PytesseractOCRInterface(
                schematisms_source_dir=SCHEMATISMS_DIR,
                schematisms_ocr_target_dir=OCR_SCHEMATISMS_DIR,
                schematism=schematism,
                langs=["pol", "lat"],
                force_ocr=False,
                verbose=True
            )

        image_path = os.path.join(schematism_dir, file_name)
        logger.info(f"\n{'='*80}\nProcessing {file_name}...\n{'='*80}")

        ocr_data = pytesseract_ocr.load_ocr_data(file_name)
        tokens = ocr_data["words"]
        labels = ["O"] * len(tokens)

        # Clean up ground truth data
        clean_ground_truth = page_ground_truth.drop(columns=["geometry", "location", "index_right", "schematism"]).to_markdown(index=False)
        logger.info(f"\nGround Truth:\n{'-'*40}\n{clean_ground_truth}\n{'-'*40}")
        logger.info(f"\nOCR Text:\n{'-'*40}\n{ocr_data['complete_text']}\n{'-'*40}")

        # get_labeles appends a user message; always restore the conversation
        # afterwards, also when the call fails, so the next page starts clean.
        n_messages = len(llm.messages)
        try:
            labels_json = llm.get_labeles(page_ocr=ocr_data["complete_text"], ground_truth=clean_ground_truth)
        except json.JSONDecodeError as e:
            # No annotation file is written, so a re-run retries this page.
            logger.error(f"Error decoding JSON response for {file_name}: {e}")
            continue
        finally:
            del llm.messages[n_messages:]

        if isinstance(labels_json, dict):
            # A JSON-object reply may wrap the array (or be one annotation).
            labels_json = next(
                (value for value in labels_json.values() if isinstance(value, list)),
                [labels_json],
            )

        for annotation in labels_json:
            if isinstance(annotation, str):
                try:
                    annotation = json.loads(annotation.strip())
                except json.JSONDecodeError as e:
                    logger.error(f"Error decoding JSON response: {e}")
                    continue

            try:
                label = annotation["label"]
                patch = annotation["text_match_patch"]
            except (KeyError, TypeError):
                logger.error(f"Error: Malformed annotation: {annotation}")
                continue

            span_match = re.search(r'<[^>]*>(.*?)</', patch, flags=re.S)
            if not span_match:
                logger.error(f"Error: No span found in patch: {patch}")
                continue
            span = span_match.group(1)

            # Extract context and find matching tokens
            before, after = extract_context_from_patch(patch, span)
            matching_indices = find_fuzzy_span_indices(span, tokens, context=(before, after))

            # Apply labels to matching tokens
            for idx in matching_indices:
                labels[idx] = label

        os.makedirs(annotations_dir, exist_ok=True)

        annotated_image = visualize_tokens_with_labels(image_path, tokens, ocr_data["bboxes"], labels, output_path=None)
        display(annotated_image)

        with open(annotation_file, "w", encoding="utf-8") as f:
            content = {
                "words": tokens,
                "bboxes": ocr_data["bboxes"],
                "labels": labels
            }
            json.dump(content, f, ensure_ascii=False, indent=4)
        logger.info(f"\nAnnotation file saved: {annotation_file}")


Query: SELECT id, dekanat, miejsce, wezwanie, material_typ, the_geom, skany FROM dane_hasla WHERE skany = 'wloclawek_1872'


DataSourceError: /Users/user/Projects/AI_Osrodek/data/schematyzmy/wloclawek_1872/matryca/matryca.shp: No such file or directory