In [1]:
from dotenv import load_dotenv

# Load variables from a local .env file into the process environment
# (python-dotenv). A later cell reads EXA_API_KEY via os.getenv; the LLM
# client presumably picks up its key from the environment too — TODO confirm.
load_dotenv()


True

In [None]:
from pathlib import Path
import sys

import json
import re

from typing import Dict, List

from uuid import uuid4

from datetime import date, datetime

from langchain_google_genai import ChatGoogleGenerativeAI

# Make the repository root importable from inside the notebook directory.
# The root is taken to be the parent of the current working directory, so the
# kernel must be started from the notebook's own folder.
PROJECT_ROOT = Path().resolve().parent
if not any(entry == str(PROJECT_ROOT) for entry in sys.path):
    # Put the root first so project packages are found before anything else.
    sys.path.insert(0, str(PROJECT_ROOT))

from Models import artifact_db
from Models.artifact_models import (
    Artifact,
    DocumentExtraction,
    Person,
    Location,
    ContextChunk,
    Event,
    Milestone,
)


In [None]:
# Configure the artifact source and metadata.
# The path is relative, so it is resolved against the kernel's working directory.
ARTIFACT_FILE = Path("../Library/data.txt")
MAX_CHARS = 8000  # trim super long sources for notebook experimentation

# Title/author recorded on the Artifact row for this source.
ARTIFACT_TITLE = "Nikola Tesla - Test Manuscript"
ARTIFACT_AUTHOR = "Nikola Tesla"

# Prepare the artifact database before anything is written to it.
# NOTE(review): presumably creates missing tables and is safe to re-run — confirm in Models.artifact_db.
artifact_db.init_db()
# Metadata object that is embedded in the prompt and later re-attached to the LLM result.
artifact_metadata = Artifact(title=ARTIFACT_TITLE, author=ARTIFACT_AUTHOR)
# Show the absolute path so a wrong working directory is easy to spot.
print(f"Reading from: {ARTIFACT_FILE.resolve()}")


In [None]:
# JSON Schema of the DocumentExtraction model, pretty-printed so it can be
# pasted verbatim into the prompt as the target output shape.
SCHEMA_JSON = json.dumps(DocumentExtraction.model_json_schema(), indent=2)
# Instruction preamble that build_prompt places at the top of every prompt.
DOCUMENT_EXTRACTION_INSTRUCTIONS = """You are a historian data assistant. Read the artifact text and extract structured data.
Please follow these guidelines:
- Capture a concise artifact summary of all the key people, locations (with address + coordinates when present), events, and milestones.
- Map page numbers to `[start, end]` ranges where possible.
- Use ISO dates (`YYYY-MM-DD`) when you can infer an exact day. Otherwise provide the most precise partial you can (e.g., `1919`, `1903-05`).
- Person and location names should stay consistent so they can be re-linked later.
- Only include milestone or event participants when the text clearly states their involvement.
"""


def load_artifact_text(path: Path, max_chars: int) -> str:
    """Read a UTF-8 text file and return its leading ``max_chars`` characters.

    Args:
        path: Location of the artifact text file.
        max_chars: Upper bound applied as a slice end on the decoded text.

    Returns:
        The file contents, cut off after ``max_chars`` characters.
    """
    with path.open("r", encoding="utf-8") as handle:
        contents = handle.read()
    return contents[:max_chars]


def build_prompt(artifact: Artifact, text: str) -> str:
    """Assemble the extraction prompt sent to the LLM.

    The prompt consists of four sections separated by blank lines: the shared
    instructions, the artifact metadata as indented JSON, the target JSON
    schema, and the artifact text followed by a trailing newline.

    Args:
        artifact: Metadata object; serialised via ``model_dump()`` with
            non-JSON values stringified.
        text: The (already truncated) artifact text.

    Returns:
        The complete prompt string.
    """
    metadata_json = json.dumps(artifact.model_dump(), indent=2, default=str)
    sections = [
        DOCUMENT_EXTRACTION_INSTRUCTIONS,
        f"Artifact metadata:\n{metadata_json}",
        f"Target schema:\n{SCHEMA_JSON}",
        f"Artifact text:\n{text}\n",
    ]
    return "\n\n".join(sections)


In [None]:
# Gemini chat model used for extraction; temperature 0.0 asks for the least
# random sampling and failed requests are retried up to twice.
llm = ChatGoogleGenerativeAI(
    model="gemini-2.5-pro",
    temperature=0.0,
    max_retries=2,
)
# Wrap the model so responses are parsed into the DocumentExtraction model.
structured_llm = llm.with_structured_output(DocumentExtraction)
print("LLM ready")


In [None]:
# Read (and truncate) the source text, then build the full extraction prompt.
raw_text = load_artifact_text(ARTIFACT_FILE, MAX_CHARS)
prompt = build_prompt(artifact_metadata, raw_text)
# One call to the structured-output model configured in the previous cell.
extraction = structured_llm.invoke(prompt)
# enforce trusted artifact metadata so downstream IDs remain stable
# NOTE(review): this discards whatever artifact block the model returned.
# Whether the id is stable across kernel restarts depends on how the Artifact
# model generates its id — confirm.
extraction = extraction.model_copy(update={"artifact": artifact_metadata})
# Bare expression so the notebook displays the parsed result.
extraction


In [None]:
# Quick sanity summary of what the model extracted.
print(f"Artifact: {extraction.artifact.title} ({extraction.artifact.author})")
print(
    " | ".join(
        f"{label}: {len(getattr(extraction, field))}"
        for label, field in (
            ("People", "persons"),
            ("Locations", "locations"),
            ("Events", "events"),
            ("Milestones", "milestones"),
        )
    )
)
# Context chunks are only reported when at least one was extracted.
if extraction.context_chunks:
    print(f"Context chunks: {len(extraction.context_chunks)}")


In [None]:
def _json_or_null(value):
    if value in (None, [], {}):
        return None
    return json.dumps(value)


def _maybe_date(value: str | None):
    if not value:
        return None
    cleaned = value.strip()
    if not cleaned:
        return None
    normalized = cleaned.replace("XX", "01")
    for fmt in ("%Y-%m-%d", "%Y-%m", "%Y"):
        try:
            parsed = datetime.strptime(normalized, fmt)
            if fmt == "%Y":
                return date(parsed.year, 1, 1)
            if fmt == "%Y-%m":
                return date(parsed.year, parsed.month, 1)
            return parsed.date()
        except ValueError:
            continue
    match = re.search(r"(\d{4})", cleaned)
    if match:
        return date(int(match.group(1)), 1, 1)
    return None

def _write_extraction_rows(cur, extraction: DocumentExtraction) -> Dict[str, int | str]:
    """Insert every row for one extraction through ``cur``; no commit is done here."""
    artifact = extraction.artifact
    cur.execute(
        """
        INSERT OR REPLACE INTO artifacts (id, title, author, publication_year, time_period_start, time_period_end, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        (
            str(artifact.id),
            artifact.title,
            artifact.author,
            artifact.publication_year,
            artifact.time_period_start,
            artifact.time_period_end,
            artifact.created_at.isoformat(),
        ),
    )

    # Lower-cased, stripped name -> row id; used to resolve references by name
    # and to drop duplicate names within the same extraction.
    person_lookup: Dict[str, str] = {}
    for person_data in extraction.persons:
        key = person_data.name.strip().lower()
        if not key or key in person_lookup:
            continue
        person = Person(
            name=person_data.name,
            aliases=person_data.aliases,
            artifact_id=artifact.id,
            birth_year=person_data.birth_year,
            death_year=person_data.death_year,
        )
        cur.execute(
            """
            INSERT OR REPLACE INTO persons (id, name, aliases, artifact_id, birth_year, death_year, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (
                str(person.id),
                person.name,
                _json_or_null(person.aliases),
                str(person.artifact_id),
                person.birth_year,
                person.death_year,
                person.created_at.isoformat(),
            ),
        )
        person_lookup[key] = str(person.id)

    location_lookup: Dict[str, str] = {}
    for location_data in extraction.locations:
        key = location_data.name.strip().lower()
        if not key or key in location_lookup:
            continue
        location = Location(
            name=location_data.name,
            aliases=location_data.aliases,
            artifact_id=artifact.id,
            address=location_data.address,
            latitude=location_data.latitude,
            longitude=location_data.longitude,
        )
        cur.execute(
            """
            INSERT OR REPLACE INTO locations (id, name, aliases, artifact_id, address, latitude, longitude, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                str(location.id),
                location.name,
                _json_or_null(location.aliases),
                str(location.artifact_id),
                location.address,
                location.latitude,
                location.longitude,
                location.created_at.isoformat(),
            ),
        )
        location_lookup[key] = str(location.id)

    # Chunk label -> chunk id, so events can point at their context chunk.
    chunk_lookup: Dict[str, str] = {}
    context_chunks = extraction.context_chunks or []  # tolerate a missing list
    for chunk_data in context_chunks:
        chunk = ContextChunk(
            artifact_id=artifact.id,
            chunk_label=chunk_data.chunk_label,
            page_range=chunk_data.page_range,
            summary=chunk_data.summary,
            key_persons=chunk_data.key_persons,
            key_locations=chunk_data.key_locations,
        )
        cur.execute(
            """
            INSERT OR REPLACE INTO context_chunks (id, artifact_id, chunk_label, page_range, summary, key_persons, key_locations, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                str(chunk.id),
                str(chunk.artifact_id),
                chunk.chunk_label,
                _json_or_null(chunk.page_range),
                chunk.summary,
                _json_or_null(chunk.key_persons),
                _json_or_null(chunk.key_locations),
                chunk.created_at.isoformat(),
            ),
        )
        if chunk.chunk_label:
            chunk_lookup[chunk.chunk_label.strip().lower()] = str(chunk.id)

    for event_data in extraction.events:
        context_chunk_id = None
        if event_data.context_label:
            context_chunk_id = chunk_lookup.get(event_data.context_label.strip().lower())
        event = Event(
            description=event_data.description,
            artifact_id=artifact.id,
            page_range=event_data.page_range,
            context_chunk_id=context_chunk_id,
            event_type=event_data.event_type,
            event_date=_maybe_date(event_data.event_date),
        )
        cur.execute(
            """
            INSERT OR REPLACE INTO events (id, description, artifact_id, page_range, context_chunk_id, event_type, event_date, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                str(event.id),
                event.description,
                str(event.artifact_id),
                _json_or_null(event.page_range),
                str(event.context_chunk_id) if event.context_chunk_id else None,
                event.event_type,
                event.event_date.isoformat() if event.event_date else None,
                event.created_at.isoformat(),
            ),
        )
        # Names that do not match a persisted person/location are ignored.
        for person_name in event_data.person_names or []:
            key = person_name.strip().lower()
            person_id = person_lookup.get(key)
            if person_id:
                cur.execute(
                    "INSERT OR IGNORE INTO event_participants (event_id, person_id, role) VALUES (?, ?, ?)",
                    (str(event.id), person_id, None),
                )
        for location_name in event_data.location_names or []:
            key = location_name.strip().lower()
            location_id = location_lookup.get(key)
            if location_id:
                cur.execute(
                    "INSERT OR IGNORE INTO event_venues (event_id, location_id) VALUES (?, ?)",
                    (str(event.id), location_id),
                )

    milestones_written = 0
    for milestone_data in extraction.milestones:
        person_id = person_lookup.get((milestone_data.person_name or "").strip().lower())
        if not person_id:
            # A milestone needs a known person; skip it rather than store an orphan.
            continue
        milestone = Milestone(
            person_id=person_id,
            artifact_id=artifact.id,
            milestone_type=milestone_data.milestone_type,
            milestone_date=_maybe_date(milestone_data.milestone_date),
            description=milestone_data.description,
        )
        cur.execute(
            """
            INSERT OR REPLACE INTO milestones (id, person_id, artifact_id, milestone_type, milestone_date, description, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (
                str(milestone.id),
                str(milestone.person_id),
                str(milestone.artifact_id),
                milestone.milestone_type,
                milestone.milestone_date.isoformat() if milestone.milestone_date else None,
                milestone.description,
                milestone.created_at.isoformat(),
            ),
        )
        milestones_written += 1
        if milestone_data.location_name:
            location_id = location_lookup.get(milestone_data.location_name.strip().lower())
            if location_id:
                cur.execute(
                    "INSERT OR IGNORE INTO milestone_places (milestone_id, location_id) VALUES (?, ?)",
                    (str(milestone.id), location_id),
                )

    return {
        "artifact": artifact.title,
        "persons": len(person_lookup),
        "locations": len(location_lookup),
        "context_chunks": len(context_chunks),
        "events": len(extraction.events),
        # Count rows actually written, in line with persons/locations above.
        "milestones": milestones_written,
    }


def persist_document(extraction: DocumentExtraction) -> Dict[str, int | str]:
    """Store an extracted document and its linked entities in the artifact DB.

    Writes, in order: the artifact row, persons, locations, context chunks,
    events (with participant and venue links) and milestones (with place
    links). Persons and locations are de-duplicated by lower-cased, stripped
    name; events and milestones refer to them by that name. Milestones whose
    person is not among the persisted persons are skipped.

    Args:
        extraction: Parsed extraction result whose ``artifact`` field carries
            the metadata to store.

    Returns:
        A report with the artifact title and the number of persons,
        locations, context chunks, events and milestones written.

    Raises:
        Any exception raised while building or executing the inserts. The
        connection is closed without committing in that case; assuming a
        sqlite3-style connection, that discards the partial write — confirm
        against ``artifact_db.get_connection``.
    """
    conn = artifact_db.get_connection()
    try:
        report = _write_extraction_rows(conn.cursor(), extraction)
        conn.commit()
    finally:
        # Always release the connection, even when an insert fails midway.
        conn.close()
    return report


In [None]:
# Write the extraction to the database; the returned dict of per-entity counts
# is displayed as the cell output.
ingest_report = persist_document(extraction)
ingest_report


In [None]:
# NOTE(review): `stop` is not defined anywhere in this notebook, so running
# this cell raises NameError. Presumably a deliberate barrier so "Run All"
# halts before the EXA enrichment cell below — confirm.
stop

In [None]:
import os
import re
from typing import Optional, Tuple

# exa_py is a third-party dependency; if it is missing, re-raise the
# ImportError with an install hint.
try:
    from exa_py import Exa
except ImportError as exc:
    raise ImportError("Install exa_py to run EXA enrichment: pip install exa_py") from exc

# The API key comes from the environment; fail fast before any lookup is made.
EXA_API_KEY = os.getenv("EXA_API_KEY")
if not EXA_API_KEY:
    raise RuntimeError("Missing EXA_API_KEY environment variable for EXA lookups.")

exa_client = Exa(EXA_API_KEY)

# This connection stays open for the UPDATE loop at the end of the cell, which
# commits and closes it.
conn = artifact_db.get_connection()
# All stored locations, oldest first; rows are read by column name further down.
locations = conn.execute(
    "SELECT id, name, address, latitude, longitude FROM locations ORDER BY created_at ASC"
).fetchall()

def _get_value(obj, key, default=None):
    if isinstance(obj, dict):
        return obj.get(key, default)
    return getattr(obj, key, default)

def _extract_coordinates(text: str) -> Optional[Tuple[float, float]]:
    if not text:
        return None
    normalized = text.replace("Â°", " ")
    labeled = re.search(
        r"latitude\s*[:=]?\s*(-?\d{1,2}(?:\.\d+)?)\D{0,30}longitude\s*[:=]?\s*(-?\d{1,3}(?:\.\d+)?)",
        normalized,
        flags=re.IGNORECASE | re.DOTALL,
    )
    if labeled:
        lat, lon = map(float, labeled.groups())
        if abs(lat) <= 90 and abs(lon) <= 180:
            return (lat, lon)
    lowered = normalized.lower()
    if "coord" not in lowered and "lat" not in lowered:
        return None
    generic = re.search(
        r"(-?\d{1,2}(?:\.\d+)?)[^\d-]{0,8}?(-?\d{1,3}(?:\.\d+)?)",
        normalized,
    )
    if generic:
        lat, lon = map(float, generic.groups())
        if abs(lat) <= 90 and abs(lon) <= 180:
            return (lat, lon)
    return None

def _lookup_coordinates(name: str, address: Optional[str]) -> Optional[Tuple[float, float]]:
    """Search EXA for a location and return the first coordinates found.

    Builds the query ``"<name> [<address>] GPS coordinates"``, runs a neural
    search for up to five results, fetches their contents and scans each
    result's text and highlights with ``_extract_coordinates``.

    Args:
        name: Location name as stored in the database.
        address: Optional street address; ignored when empty or whitespace.

    Returns:
        ``(latitude, longitude)`` from the first result that yields a
        plausible pair, or ``None`` if no result does.

    Raises:
        Whatever the EXA client raises for network or API failures.
    """
    query_parts = [name]
    if address and address.strip():
        query_parts.append(address.strip())
    query_parts.append("GPS coordinates")
    query = " ".join(query_parts)
    search_response = exa_client.search(
        query,
        type="neural",
        use_autoprompt=True,
        num_results=5,
    )
    result_ids = []
    # `or []` covers a response whose `results` field is present but None.
    for result in _get_value(search_response, "results", []) or []:
        result_id = _get_value(result, "id")
        if result_id:
            result_ids.append(result_id)
    if not result_ids:
        return None
    contents = exa_client.get_contents(result_ids)
    for content in _get_value(contents, "results", []) or []:
        snippets = []
        text = _get_value(content, "text") or ""
        if text:
            snippets.append(text)
        highlights = _get_value(content, "highlights", []) or []
        for highlight in highlights:
            # Highlights may be plain strings or objects carrying a `snippet`
            # field; accept both so string highlights are not dropped.
            if isinstance(highlight, str):
                snippet = highlight
            else:
                snippet = _get_value(highlight, "snippet") or ""
            if snippet:
                snippets.append(snippet)
        coords = _extract_coordinates(" \n".join(snippets))
        if coords:
            return coords
    return None

# Fill in coordinates for every location that lacks a latitude or longitude.
updates = []
try:
    try:
        for row in locations:
            if row["latitude"] is not None and row["longitude"] is not None:
                continue  # already has both coordinates
            coords = _lookup_coordinates(row["name"], row["address"])
            if not coords:
                continue
            conn.execute(
                "UPDATE locations SET latitude = ?, longitude = ? WHERE id = ?",
                (coords[0], coords[1], row["id"]),
            )
            updates.append(
                {
                    "name": row["name"],
                    "address": row["address"],
                    "latitude": coords[0],
                    "longitude": coords[1],
                }
            )
    finally:
        # Each UPDATE is independent, so keep the coordinates already fetched
        # even if a later lookup raises.
        conn.commit()
finally:
    # Never leave the connection open, whatever happened above.
    conn.close()
{"enriched": len(updates), "details": updates}
