In [69]:
import os
import re
import csv
import ast
import pandas as pd
from PIL import Image

from api_ocr import call_mistral_ocr
from local_ocr import call_local_ocr

In [70]:
# -------------------
# STEP 1: Read config.txt and api_key.txt
# -------------------
def read_config(config_path="config.txt"):
    """Parse a simple ``key=value`` text file into a dictionary.

    Each usable line is split on its first ``=``, so values may themselves
    contain ``=``. Keys and values are stripped of surrounding whitespace.
    Blank lines, lines without ``=``, and comment lines starting with ``#``
    are ignored. If a key appears more than once, the last occurrence wins.

    Args:
        config_path: Path of the configuration file to read.

    Returns:
        dict mapping each key (str) to its value (str).

    Raises:
        OSError: If the file cannot be opened (e.g. FileNotFoundError).
    """
    config = {}
    with open(config_path, "r") as f:
        for raw_line in f:
            line = raw_line.strip()
            # Skip blanks and comments so a note such as "# language=fr"
            # is not mistaken for a real setting.
            if not line or line.startswith("#"):
                continue
            if "=" not in line:
                continue
            key, value = line.split("=", 1)
            config[key.strip()] = value.strip()
    return config

config = read_config()

# The variant is used below to look up the Gamma document and to name the
# results file, so fail early with a clear message when it is missing.
variant = config.get("variant")
if not variant:
    raise ValueError("config.txt must define 'variant'")

# Expected image size, written as WIDTHxHEIGHT (the separator may be "x" or "X").
raw_image_size = config.get("input_image_size", "")
try:
    input_image_size = tuple(int(part) for part in raw_image_size.lower().split("x"))
except ValueError:
    raise ValueError(
        f"config.txt 'input_image_size' must look like WIDTHxHEIGHT, got {raw_image_size!r}"
    ) from None
# Exactly two positive dimensions are required: the size check divides by each
# expected dimension, so zero would raise and extra parts would be ignored.
if len(input_image_size) != 2 or min(input_image_size) <= 0:
    raise ValueError(
        f"config.txt 'input_image_size' must be two positive integers, got {raw_image_size!r}"
    )

image_dir = config.get("image_dir", "input")
output_dir = config.get("output_dir", "output")
# Optional language filter: a language prefix, "all", or empty to infer the
# language from each image filename.
config_lang = config.get("language", "").lower()
os.makedirs(output_dir, exist_ok=True)

# Check for API key
api_key_path = "api_key.txt"

try:
    with open(api_key_path, "r") as f:
        api_key = f.read().strip()
except FileNotFoundError:
    print("API key file not found. Using local model...")
    api_key = None
else:
    # An empty key file cannot authenticate; say so instead of silently
    # falling back to the local model.
    if not api_key:
        print("API key file is empty. Using local model...")
        api_key = None


In [71]:
# -------------------
# STEP 2: Load var_ref.py to get Gamma document
# -------------------
def load_var_ref(file_path="var_ref.py"):
    """Load the literal assigned in ``file_path`` without executing the file.

    The file is parsed as Python source and the value of the assignment to
    ``var_ref`` is evaluated with ``ast.literal_eval``. If no target has that
    name, the first assignment in the file is used. Locating the assignment
    in the syntax tree (rather than splitting the text on the first ``=``)
    keeps comments or docstrings that contain ``=`` from breaking the load.

    If the file is not valid Python as a whole, the text after the first
    ``=`` is evaluated instead.

    Args:
        file_path: Path of the Python file holding the assignment.

    Returns:
        The evaluated literal (the rest of the notebook uses it as a dict).

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If the file contains no assignment, or the assigned
            value is not a plain literal.
    """
    with open(file_path, "r") as f:
        content = f.read()

    try:
        module = ast.parse(content, filename=file_path)
    except SyntaxError:
        # Fallback: evaluate whatever follows the first "=" in the raw text.
        _, separator, literal_source = content.partition("=")
        if not separator:
            raise ValueError(f"No assignment found in {file_path}") from None
        return ast.literal_eval(literal_source.strip())

    # Collect (target names, value node) for every top-level assignment.
    assignments = []
    for node in module.body:
        if isinstance(node, ast.Assign):
            names = [t.id for t in node.targets if isinstance(t, ast.Name)]
            assignments.append((names, node.value))
        elif isinstance(node, ast.AnnAssign) and node.value is not None:
            names = [node.target.id] if isinstance(node.target, ast.Name) else []
            assignments.append((names, node.value))

    if not assignments:
        raise ValueError(f"No assignment found in {file_path}")

    # Prefer the assignment explicitly named var_ref; otherwise take the first.
    for names, value_node in assignments:
        if "var_ref" in names:
            return ast.literal_eval(value_node)
    return ast.literal_eval(assignments[0][1])

var_ref = load_var_ref()
gamma_file = var_ref.get(variant)
# Without this check an unknown variant yields None here and os.path.join
# fails with a TypeError that says nothing about the real cause.
if not gamma_file:
    raise ValueError(f"Variant {variant!r} has no Gamma document listed in var_ref.py")
gamma_path = os.path.join("gamma-master", gamma_file)

In [72]:
# -------------------
# STEP 3: Load Gamma CSV
# -------------------
# Read every column as text. The ids are matched against strings taken from
# filenames and the other cells are compared with OCR text, so letting pandas
# infer numeric dtypes would rewrite values (e.g. "5" in a column with blanks
# becomes 5.0) and make them impossible to match. Empty cells still load as NaN.
df = pd.read_csv(gamma_path, dtype=str)

In [73]:
# -------------------
# STEP 4: Check image size tolerance
# -------------------
def check_image_size(image_path, expected_size, tolerance=0.02):
    """Check whether an image's dimensions are close to the expected ones.

    Args:
        image_path: Path of the image file to inspect.
        expected_size: Expected ``(width, height)`` in pixels.
        tolerance: Maximum allowed relative deviation per dimension
            (inclusive); the default 0.02 allows 2 %.

    Returns:
        True when every dimension deviates from its expected value by at most
        ``tolerance`` (relative to the expected value), otherwise False.
    """
    # The context manager closes the underlying file as soon as the size has
    # been read, so handles do not pile up while looping over many images.
    with Image.open(image_path) as img:
        actual_size = img.size  # (width, height)
    return all(abs(a - e) / e <= tolerance for a, e in zip(actual_size, expected_size))


In [74]:
# -------------------
# STEP 5: Extract buic id & language from filename
# -------------------
def parse_filename(filename):
    """Pull the BUIC id and language code out of an image filename.

    The name is matched from its start, case-insensitively, against
    ``<anything>_<digits>_<digits>_<letters>.png``.

    Args:
        filename: File name to inspect.

    Returns:
        ``(buic_id, lang)`` where ``buic_id`` is the ``<digits>_<digits>``
        part and ``lang`` is the letter group in lowercase, or
        ``(None, None)`` when the name does not match.
    """
    pattern = r".*_(\d+_\d+)_([A-Z]+)\.png"
    found = re.match(pattern, filename, re.IGNORECASE)
    if found is None:
        return None, None
    buic_id, language = found.groups()
    return buic_id, language.lower()

In [75]:
# -------------------
# STEP 6: Process images
# -------------------
results = []
mismatch_counter = {}  # number of FAIL results per language prefix

# Expected values that mark text filled in at runtime; these are not compared.
DYNAMIC_PLACEHOLDERS = ("{#1}", "(#1)")

# The first column is only the lookup key. Keep it out of the language
# columns so its value is never treated as expected text (in "all" mode any
# name starting with two letters would otherwise qualify).
id_column = df.columns[0]
text_columns = list(df.columns[1:])


def _normalize_whitespace(text):
    """Collapse every run of whitespace in ``text`` to a single space."""
    return " ".join(text.split())


def _columns_for_language(language):
    """Return the text columns whose name starts with ``language``, ignoring case."""
    return [c for c in text_columns if c.lower().startswith(language)]


# The configured language does not depend on the image, so resolve it once.
if config_lang == "all":
    configured_cols = [c for c in text_columns if re.match(r"[a-z]{2}", c, re.I)]
elif config_lang:
    configured_cols = _columns_for_language(config_lang)
else:
    configured_cols = None  # decided per image from the filename

# Sorted so the order of the result rows is the same on every run.
for filename in sorted(os.listdir(image_dir)):
    if not filename.lower().endswith(".png"):
        continue

    image_path = os.path.join(image_dir, filename)

    # Extract the id (and the language, used only when the config sets none)
    buic_id, lang = parse_filename(filename)
    if not buic_id:
        continue

    # Row lookup: the first row whose id matches is used
    matching_rows = df[df[id_column] == buic_id]
    if matching_rows.empty:
        continue
    record = matching_rows.iloc[0]

    # Determine language columns
    if configured_cols is not None:
        lang_cols = configured_cols
    else:
        # fallback: infer from filename
        if not lang:
            continue
        lang_cols = _columns_for_language(lang)

    if not lang_cols:
        continue

    # Combine the non-empty expected texts of each language prefix
    grouped_langs = {}
    for col in lang_cols:
        prefix_match = re.match(r"[a-z]+", col, re.I)
        if prefix_match is None:
            # Column name does not start with a letter: no language prefix.
            continue
        cell = record[col]
        expected_text = "" if pd.isna(cell) else str(cell).strip()
        if not expected_text:
            continue
        grouped_langs.setdefault(prefix_match.group(0).lower(), []).append(expected_text)

    if not grouped_langs:
        continue

    row_result = {"Id": buic_id}

    # Handle size mismatch: every language fails without running OCR
    if not check_image_size(image_path, input_image_size):
        for prefix, expected_list in grouped_langs.items():
            row_result.update({
                f"{prefix} Expected": " ".join(expected_list),
                f"{prefix} Extracted": "Faulty image",
                f"{prefix} Result": "FAIL"
            })
            mismatch_counter[prefix] = mismatch_counter.get(prefix, 0) + 1
        results.append(row_result)
        continue

    # OCR Handling
    if api_key:  # Use Mistral OCR API
        ocr_response = call_mistral_ocr(api_key, image_path)
    else:  # Use local OCR
        ocr_response = call_local_ocr(image_path)

    # Join the markdown of all pages into one whitespace-normalized string
    # (empty when the response has no pages).
    pages = ocr_response.get("pages", [])
    extracted_text = _normalize_whitespace(
        " ".join(page.get("markdown", "") for page in pages)
    )

    # For each language (combine columns)
    for prefix, expected_list in grouped_langs.items():
        combined_expected = " ".join(expected_list)

        if any(e in DYNAMIC_PLACEHOLDERS for e in expected_list):
            row_result.update({
                f"{prefix} Expected": combined_expected,
                f"{prefix} Extracted": "dynamic input",
                f"{prefix} Result": "SKIP"
            })
            continue

        # The OCR text has its whitespace collapsed, so collapse the expected
        # text the same way; otherwise a double space or line break inside a
        # cell could never match.
        if _normalize_whitespace(combined_expected) in extracted_text:
            result = "PASS"
        else:
            result = "FAIL"
            mismatch_counter[prefix] = mismatch_counter.get(prefix, 0) + 1

        row_result.update({
            f"{prefix} Expected": combined_expected,
            f"{prefix} Extracted": extracted_text,
            f"{prefix} Result": result
        })

    results.append(row_result)

In [76]:
# -------------------
# STEP 7: Save results CSV
# -------------------
if not results:
    print("No results to save.")
else:
    # Header order: "Id" first, then every other key in first-seen order.
    fieldnames = list(
        dict.fromkeys(["Id"] + [key for record in results for key in record])
    )

    # Pick a filename that is not taken yet: the base name first, then the
    # same name with " (1)", " (2)", ... inserted before the extension.
    base_file = os.path.join(output_dir, f"{variant}_ocr_results.csv")
    stem, extension = os.path.splitext(base_file)
    output_file = base_file
    attempt = 0
    while os.path.exists(output_file):
        attempt += 1
        output_file = f"{stem} ({attempt}){extension}"

    # Footer row: the mismatch total of each language goes under its
    # "<prefix> Result" column; all other columns stay empty.
    mismatch_row = {fieldnames[0]: "Total Mismatches"}
    mismatch_row.update(
        (column, mismatch_counter.get(column.split()[0].lower(), 0))
        for column in fieldnames[1:]
        if column.endswith("Result")
    )

    # Write header, one line per image, then the footer row.
    with open(output_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)
        writer.writerow(mismatch_row)

    print(f"Results saved to {output_file}")


Results saved to output\C1AHSEVO_AMDC_ocr_results (1).csv
