In [6]:
import yaml
import os

In [2]:
import re
# Read the whole HTML file into memory as one string; the table parsing
# further down works on `constructed_html`.
html_path = "../data/labels/victor/NL-HaNA_2.10.50_45_0091.jpg.html"
with open(html_path, mode='r', encoding='utf-8') as html_file:
    constructed_html = html_file.read()

In [3]:
from bs4 import BeautifulSoup

soup = BeautifulSoup(constructed_html, 'html.parser')

rows = soup.find_all('tr')

# --- Step 1: make a grid keeping track of rowspans ---
# `logical_rows[i]` is the list of cell dicts that are visible in <tr> number i,
# ordered by column. A cell with rowspan > 1 is repeated (same dict object) in
# every row it covers.
logical_rows = []
rowspans = {}  # {start_col: [remaining_rows, cell_data]} for cells spanning down

for r_idx, tr in enumerate(rows):
    # (column, cell_data) pairs for this row; sorted by column at the end so
    # that carried-down cells land at their real position instead of always
    # being placed in front of the row's own cells.
    positioned = []
    # Every column taken by a carried-down cell in THIS row. It is computed
    # before expired spans are dropped, so a span that ends on this row still
    # blocks its columns here.
    occupied = set()

    # Fill carried-down cells first
    for start_col in list(rowspans):  # list(): entries are deleted while looping
        entry = rowspans[start_col]
        carried = entry[1]
        positioned.append((start_col, carried))
        # A carried cell blocks all the columns it spans, not only the first.
        occupied.update(range(start_col, start_col + max(carried['colspan'], 1)))
        entry[0] -= 1
        if entry[0] <= 0:
            del rowspans[start_col]

    # Now add new cells from this row
    c_idx = 0
    for td in tr.find_all('td'):
        while c_idx in occupied:  # skip columns occupied by carried cells
            c_idx += 1
        text = td.get_text(" ", strip=True)
        cell_id = td.get('id')
        # Explicit `row` / `col` attributes on the <td> win over the computed
        # position. Separate names keep the loop counter `r_idx` untouched.
        cell_row = int(td.get('row', r_idx))
        cell_col = int(td.get('col', c_idx))
        rowspan = int(td.get('rowspan', 1))
        colspan = int(td.get('colspan', 1))

        cell_data = {
            'text': text,
            'id': cell_id,
            'row': cell_row,
            'col': cell_col,
            'rowspan': rowspan,
            'colspan': colspan
        }

        # Place cell in current row
        positioned.append((cell_col, cell_data))

        # Store for future rows if rowspan > 1
        if rowspan > 1:
            rowspans[cell_col] = [rowspan - 1, cell_data]

        c_idx = cell_col + colspan

    # Stable sort: cells sharing a column keep their insertion order.
    positioned.sort(key=lambda pair: pair[0])
    logical_rows.append([cell for _, cell in positioned])

# TODO: update the code, so that it can run for more than one person / row

In [None]:

def generate_text(logical_rows, temp_path="../data/temp/", row_index=1):
    """
    Write one table row as plain text plus an index of per-cell character spans.

    Two files are written into `temp_path`:
    - "row.txt": the text of every cell of the selected row, one cell per
      line (each cell text is followed by a newline).
    - "table_cells.yaml": a list of {id, start, end, text} dicts where
      row.txt[start:end] == text for every cell.

    Parameters
    - logical_rows: list of rows; each row is a list of dicts with at least
      the keys "text" and "id".
    - temp_path: output directory, created if missing.
    - row_index: index into `logical_rows` of the row to export (default 1).

    Returns None. If `row_index` is not a valid non-negative index into
    `logical_rows`, no file is written (existing files are left untouched).
    """
    import yaml

    os.makedirs(temp_path, exist_ok=True)

    if not 0 <= row_index < len(logical_rows):
        return

    cell_spans = []     # list of dicts: {id, start, end, text}
    pieces = []         # "<cell text>\n" chunks, joined into row.txt
    cursor = 0          # char offset in row.txt where the next cell starts

    for cell in logical_rows[row_index]:
        text = cell["text"]

        start = cursor
        end = start + len(text)

        cell_spans.append({
            "id": cell["id"],
            "start": start,
            "end": end,
            "text": text
        })

        pieces.append(text + '\n')
        # +1 steps over the newline written after this cell; without it every
        # later span would be shifted left relative to row.txt.
        cursor = end + 1

    row_text = "".join(pieces)

    with open(os.path.join(temp_path, "table_cells.yaml"), "w", encoding="utf-8") as f:
        yaml.dump(cell_spans, f, allow_unicode=True, sort_keys=False)

    # Same encoding as the YAML index so offsets and characters agree on
    # every platform.
    with open(os.path.join(temp_path, "row.txt"), "w+", encoding="utf-8") as f:
        f.write(row_text)

generate_text(logical_rows)

In [10]:
import subprocess

def run_ontogpt(input_path="row.txt",
                template="personbasicinfo.yaml",
                model="ollama/llama3",
                output="person.yaml",
                cwd=None,
                env=None):
    """
    Invoke `ontogpt extract -i <input_path> -t <template> -m <model> -o <output>`.

    - cwd: directory the command is started in; relative paths such as
      row.txt and the template are resolved against it (e.g. "../data/temp")
    - env: optional environment mapping handed to the child process

    On exit code 0 the captured stdout is printed and the
    subprocess.CompletedProcess is returned. On any other exit code the
    captured stderr is printed and RuntimeError is raised.
    """
    argv = ["ontogpt", "extract"]
    for flag, value in (("-i", input_path),
                        ("-t", template),
                        ("-m", model),
                        ("-o", output)):
        argv += [flag, value]

    result = subprocess.run(argv, cwd=cwd, env=env, capture_output=True, text=True)

    if result.returncode == 0:
        print(result.stdout)
        return result

    print("ontogpt failed (rc={}):".format(result.returncode))
    print(result.stderr)
    raise RuntimeError("ontogpt command failed")

# Example usage: run where generate_text wrote row.txt (default "../data/temp/")
run_ontogpt(cwd="../data/temp")




CompletedProcess(args=['ontogpt', 'extract', '-i', 'row.txt', '-t', 'personbasicinfo.yaml', '-m', 'ollama/llama3', '-o', 'person.yaml'], returncode=0, stdout='', stderr='ERROR:root:Cannot find slot for here_are_the_extracted_entities_in_the_requested_format in Here are the extracted entities in the requested format:\nERROR:root:Cannot find slot for here_is_the_text_split_into_fields_in_the_specified_format in Here is the text split into fields in the specified format:\nERROR:root:Cannot find slot for note in Note: There is no mention of a Rochell entity in the provided text, which suggests that this might be an incorrect or incomplete entry.\n')

In [12]:
def map_text_spans_to_cell(yaml_path, cells_path: str, output_path=None):
    """
    Annotate every named entity in a YAML file with the table cell it came from.

    Parameters
    - yaml_path: YAML file holding a mapping with a "named_entities" list;
      each entity may carry "original_spans", a list of "start:end" strings.
    - cells_path: YAML file holding a list of dicts with "id", "start" and
      "end" keys (character offsets of each cell).
    - output_path: where to write the annotated YAML. Defaults to `yaml_path`,
      i.e. the input file is overwritten in place.

    Each entity gets a new key "cell":
    - None when it has no parseable span,
    - a single cell id when it has exactly one parseable span,
    - a list of cell ids (one per parseable span) otherwise.
    A span whose start lies in no cell contributes None.
    """
    def find_cell_for_span(start, spans):
        """
        Return the id of the first cell whose range contains 'start'
        (cell.start <= start < cell.end), or None if there is none.
        Only the start offset of the entity span is looked at.
        """
        for item in spans:
            if item["start"] <= start < item["end"]:
                return item["id"]
        return None

    # ------------------------------
    # 1. Load the entity file
    # ------------------------------
    with open(yaml_path, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f)

    # `or []`: the key may be present with a null value.
    named_entities = data.get("named_entities") or []

    # ------------------------------
    # 2. Load cell_spans index
    # ------------------------------
    with open(cells_path, "r", encoding="utf-8") as f:
        cell_spans = yaml.safe_load(f) or []  # empty file loads as None

    # ------------------------------
    # 3. For each named entity, map span to cell id
    # ------------------------------
    for ent in named_entities:
        spans = ent.get("original_spans") or []
        ent_cells = []

        for span_str in spans:
            try:
                start, end = map(int, span_str.split(":"))
            except (ValueError, AttributeError):
                # Not a "start:end" string (wrong shape, non-numeric parts, or
                # not a string at all): skip this span, keep the others.
                continue

            ent_cells.append(find_cell_for_span(start, cell_spans))

        # Add new slot "cell"
        if ent_cells:
            # if only one span, store a single value
            ent["cell"] = ent_cells[0] if len(ent_cells) == 1 else ent_cells
        else:
            ent["cell"] = None

    # ------------------------------
    # 4. Save updated YAML
    # ------------------------------
    if output_path is None:
        output_path = yaml_path
    with open(output_path, "w", encoding="utf-8") as f:
        yaml.dump(data, f, sort_keys=False, allow_unicode=True)

    print("✓ Updated YAML saved to", output_path)

map_text_spans_to_cell("../data/temp/person.yaml", "../data/temp/table_cells.yaml")

✓ Updated YAML saved to ../data/temp/person.yaml


In [14]:
def convert_yaml_to_json(yaml_path="../data/temp/person.yaml",
                         json_out="../data/temp/person.json"):
    """
    Convert an extraction YAML file into a JSON file of persons with provenance.

    The YAML at `yaml_path` is expected to be a mapping with
    - "extracted_object": mapping of field name -> value (values may be
      nested mappings), and
    - "named_entities": list of dicts with "id", "label" and optionally
      "cell" / "original_spans".

    Every leaf value becomes {"value", "cell", "original_spans"}: when the
    value is a string equal to some entity's "id", the entity's label, cell
    and spans are used; otherwise the raw value is kept with cell and spans
    set to None. The result {"persons": [<person>]} is written to `json_out`
    and also printed. Returns None.

    The work runs on every call; it is not restricted to the main module.
    """
    import yaml
    import json
    from copy import deepcopy


    def load_yaml(path):
        with open(path, "r", encoding="utf-8") as f:
            return yaml.safe_load(f)


    def get_entity_for_value(raw_value, named_entities):
        """
        Given a raw value like 'AUTO:Wageningen',
        return the first named_entity dict whose "id" equals it,
        or None if the value is not a string or nothing matches.
        """
        if not isinstance(raw_value, str):
            return None

        for ent in named_entities:
            if ent.get("id") == raw_value:
                return ent

        return None


    def process_field(value, named_entities):
        """
        Converts a YAML value into normalized JSON format with:
        - value
        - cell
        - original_spans
        Nested dicts are processed recursively, key by key.
        Lists are NOT descended into: a list is kept whole as a raw value.
        """

        # Case 1: nested dict → process each subfield recursively
        if isinstance(value, dict):
            return {k: process_field(v, named_entities) for k, v in value.items()}

        # Case 2: literal field or entity reference
        ent = get_entity_for_value(value, named_entities)

        if ent is not None:
            # match found: use the entity's label + provenance
            return {
                "value": ent.get("label"),
                "cell": ent.get("cell"),
                "original_spans": ent.get("original_spans")
            }

        # No entity match → raw literal value
        return {
            "value": value,
            "cell": None,
            "original_spans": None
        }


    def convert_yaml_to_person_json(data):
        # `or`: a missing key and a key with a null value are treated alike.
        # deepcopy keeps the returned structure independent of `data`.
        extracted = deepcopy(data.get("extracted_object") or {})
        named_entities = data.get("named_entities") or []

        # Process all fields dynamically
        person = {
            key: process_field(value, named_entities)
            for key, value in extracted.items()
        }

        return {"persons": [person]}


    def write_json(path, obj):
        with open(path, "w+", encoding="utf-8") as f:
            json.dump(obj, f, indent=2, ensure_ascii=False)


    data = load_yaml(yaml_path)
    result = convert_yaml_to_person_json(data)
    write_json(json_out, result)

    print(json.dumps(result, indent=2, ensure_ascii=False))
