In [None]:
import json
import os
import re
from collections import Counter
from copy import deepcopy
from typing import List, Optional

import numpy as np
import tqdm as notebook_tqdm
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from sentence_transformers import SentenceTransformer
from sklearn.cluster import HDBSCAN
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import logging as transformers_logging

from dq_swirl.clients.async_llm_client import AsyncLLMClient
from dq_swirl.ingestion.rust_ingestion import smart_parse_batch
from dq_swirl.ingestion.structure_analyzer import StructuralAnalyzer

# Only surface errors from the `transformers` library (presumably to quiet model-loading
# noise from SentenceTransformer used further down).
transformers_logging.set_verbosity_error()

In [None]:
# Load environment variables (e.g. LLM_API_KEY, read in the LLM setup cell) from the secrets file.
load_dotenv("../secrets.env")

## Messy Data

In [None]:
# Heterogeneous raw inputs used for schema discovery: "Order ..." key=value strings with
# casing/spacing/typo noise (e.g. "Locadtion"), JSON user records with missing or
# inconsistently typed fields, and several log-line formats.
messy_data = [
    "Order 1001: Buyer=John Davis, Location=Columbus, OH, Total=$742.10, Items: laptop, hdmi cable",
    "Order 1004:   Buyer=  AMANDA SMITH ,Location=Seattle, WA,Total=$50.00, Items: desk lamp",
    "Order 1005: Buyer=Raj Patel, Total=1,200.50, Items: monitor, stand, cable",
    "Order 1006: total=$89.99, location=Miami, FL, buyer=Elena Rossi, Items: keyboard",
    "Order 1007: Buyer=Chris P., Location=Denver, CO, Total=$12.00, Items: stickers -- [DISCOUNT APPLIED]",
    "Order 1008: Buyer=O'Connor, S., Location=Portland, OR, Total=$0.00, Items: ",
    "Order 1011: Buyer=John Davis, Location=Columbus, OH, Total=$742.10, Items: laptop, hdmi cable",
    "Order 1012: Buyer=Sarah Liu, Location=Austin, TX, Total=$156.55, Items: headphones",
    "Order 1013: Buyer=Mike Turner, Location=Cleveland, OH, Total=$1299.99, Items: gaming pc, mouse",
    "Order 1014: Buyer=Rachel Kim, Locadtion=Seattle, WA, Total=$89.50, Items: coffee maker",
    "Order 1015: Buyer=Chris Myers, Location=Cincinnati, OH, Total=$512.00, Items: monitor, desk lamp",
    "Order=1016, Buyer=Jake Myers, Total=$1,512.00, Items: monitor,",
    '{"id": "usr_001", "name": "Alex Johnson", "role": "admin", "isActive": true, "createdAt": "2025-11-02T09:14:23Z"}',
    '{"id": "usr_002", "name": "Maria Lopez", "email": "maria.lopez@example.com", "role": "editor", "isActive": null, "createdAt": "2025-12-18T16:47:10Z", "lastLoginIp": "192.168.1.42"}',
    '{"id": "usr_003", "email": "samir.patel@example.com", "role": "viewer", "isActive": false, "createdAt": "08/05/2024"}',
    '{"id": 4, "name": "Chen Wei", "email": "chen.wei@example.com", "isActive": true, "createdAt": null}',
    '{"id": "usr_005", "name": "Broken Record", "email": "broken@example.com"}',
    "Order 1017: Buyer=Griffin Arora, Location=Columbia, SC, Total=$512.00, Items: monitor, desk lamp, Discount: yes",
    "Order=1018, Buyer=Jae Arora, Location=Dreher, FL, Total=$6.00, Items: chair, Discount: true, phone=123-456-789",
    "Order=1019, Buyer=Jae Kao, Location=Atlanta, GA, Total=$12.00, Items: desk, Discount: False, phone=123-456-789",
    "2026-01-30 14:22:01 INFO User login successful user_id=123",
    "2026-01-30 14:22:01 INFO User login successful",
    "level =INFO, user =Sam, id=1",
    "timestamp=2026-01-30T14:22:01Z level=INFO user=alice action=login success=true",
    "level=INFO cpu_usage=1,234.56 memory=512MB",
    '{"level":"INFO","service":"orders","order_id":1001,"status":"created"}',
    '[2026-01-31 17:11:22 +0000] [7] [INFO] 127.0.0.1:56718 - - [31/Jan/2026:17:11:22 +0000] "GET /health 1.1" 200 16 "-" "curl/8.14.1"',
    "2026-01-31 17:11:00 swirl [DEBUG] saq_worker.py:28 Running cron job health check",
]

## Data Preprocessing

In [None]:
#################################################################################
################################# Grammar Parsing ###############################
#################################################################################

# Route each raw message: lines wrapped in [...] or {...} are tried as JSON first,
# everything else goes straight to the grammar-based parser.
string_batch = []
string_json_batch = []
for msg in messy_data:
    if (msg.startswith("[") and msg.endswith("]")) or (
        msg.startswith("{") and msg.endswith("}")
    ):
        string_json_batch.append(msg)
    else:
        string_batch.append(msg)

print(f"\nUNSTRUCTURED STRING SAMPLES: {len(string_batch)}\n")
print(f"JSON STRING SAMPLES: {len(string_json_batch)}\n")


string_samples = smart_parse_batch(string_batch)

for msg, parsed in string_samples:
    print(f"Original: {msg}\nParsed: {parsed}\n")


json_samples = []
leftovers = []

for msg in string_json_batch:
    try:
        json_samples.append((msg, json.loads(msg)))
    except json.JSONDecodeError:
        leftovers.append((msg, None))

# Lines that merely *look* like JSON (e.g. "[INFO] ... [x]") used to be dropped here;
# give them a second chance through the grammar parser instead of losing them.
if leftovers:
    string_samples = string_samples + smart_parse_batch([msg for msg, _ in leftovers])

data_samples = string_samples + json_samples

print(
    f"\nTOTAL SAMPLES: {len(data_samples)}\n"
    f"JSON FALLBACK SAMPLES (re-parsed as text): {len(leftovers)}\n"
)

#################################################################################
############################### Structure Analyzer ##############################
#################################################################################


analyzer = StructuralAnalyzer(ignore_unparsed=False)

# signature hash -> number of records that share the layout
hash_counts = Counter()
# signature hash -> first fingerprint result observed for that hash
unique_structures = {}

for raw, parsed in data_samples:
    fingerprint = analyzer.generate_fingerprint(raw, parsed)
    sig = fingerprint["hash"]
    hash_counts[sig] += 1
    unique_structures.setdefault(sig, fingerprint)

print(
    f"Detected {len(unique_structures)} unique schemas across {len(data_samples)} records.\n"
)

for sig in sorted(hash_counts):
    print(f"Schema {sig} ({hash_counts[sig]} occurrences):")
    print(f"  Layout: {unique_structures[sig]['signature']}")
    print("-" * 30)


#################################################################################
############################# Structural Clustering #############################
#################################################################################


def conjoin_signatures(registry_output: dict):
    """Cluster structural signatures by the character n-grams of their field names.

    Each signature's keys are joined into one string, vectorised with a character
    3-5-gram TF-IDF and clustered with HDBSCAN (euclidean, ``min_cluster_size=2``).

    :param registry_output: mapping of signature hash -> entry holding a
        ``"signature"`` dict whose keys are field names (e.g. ``analyzer.signature_map``).
    :return: dict ordered by hash: ``{hash: {"cluster_id": int, "keys": [...],
        "is_outlier": bool}}``. HDBSCAN noise keeps ``cluster_id == -1``.
    """
    min_cluster_size = 2
    hashes = list(registry_output.keys())

    def _entry(h, cluster_id):
        # Build one output record; plain bool (not numpy.bool_) keeps it JSON-serialisable.
        return {
            "cluster_id": int(cluster_id),
            "keys": list(registry_output[h]["signature"].keys()),
            "is_outlier": bool(cluster_id == -1),
        }

    # TF-IDF cannot fit an empty corpus and HDBSCAN cannot cluster fewer samples than
    # min_cluster_size, so everything is noise in that case.
    if len(hashes) < min_cluster_size:
        return {h: _entry(h, -1) for h in sorted(hashes)}

    signatures_as_text = [
        " ".join(registry_output[h]["signature"].keys()) for h in hashes
    ]

    vectorizer = TfidfVectorizer(analyzer="char", ngram_range=(3, 5))
    matrix = vectorizer.fit_transform(signatures_as_text)

    clusterer = HDBSCAN(
        min_cluster_size=min_cluster_size,
        metric="euclidean",
        copy=True,
    )
    labels = clusterer.fit_predict(matrix.toarray())

    conjoined_map = {h: _entry(h, cluster_id) for h, cluster_id in zip(hashes, labels)}
    return dict(sorted(conjoined_map.items()))


#################################################################################
############################## Semantic Clustering ##############################
#################################################################################


def conjoin_signatures_semantic(
    registry_output: dict,
    embedding_model: str = "all-MiniLM-L6-v2",
    cache_dir: str = "./.models",
):
    """Cluster signatures by the sentence embedding of their sorted field names.

    ``_unparsed`` is ignored; each signature becomes ``"field:a field:b ..."`` (or
    ``"schema:empty_blob"``), is embedded with ``embedding_model`` and clustered with
    HDBSCAN on cosine distance. Outliers each get their own unique cluster id.

    :param registry_output: mapping of signature hash -> entry holding a
        ``"signature"`` dict (e.g. ``analyzer.signature_map``).
    :param embedding_model: SentenceTransformer model name.
    :param cache_dir: folder used to cache the downloaded model.
    :return: ``{hash: {"cluster_id": int, "keys": [...], "is_outlier": bool}}`` in
        input order; empty dict for empty input.
    """
    hashes = list(registry_output.keys())
    if not hashes:
        return {}

    def _keys(h):
        # Original (unfiltered) field names of a signature, in stored order.
        return list(registry_output[h]["signature"].keys())

    # HDBSCAN needs more than one sample; a lone signature is its own outlier cluster.
    if len(hashes) < 2:
        return {
            h: {"cluster_id": 400 + i, "keys": _keys(h), "is_outlier": True}
            for i, h in enumerate(hashes)
        }

    signatures_as_text = []
    for h in hashes:
        h_dict = dict(registry_output[h]["signature"])
        # remove the 'black hole' field that swallows everything
        h_dict.pop("_unparsed", None)

        # sort keys to ensure structural identity regardless of log order
        sorted_keys = sorted(h_dict.keys())

        if not sorted_keys:
            text_rep = "schema:empty_blob"
        else:
            # 'field:' prefix to define the role of the tokens
            text_rep = " ".join([f"field:{k}" for k in sorted_keys])

        signatures_as_text.append(text_rep)

    model = SentenceTransformer(embedding_model, cache_folder=cache_dir)
    embeddings = model.encode(signatures_as_text)
    X = np.ascontiguousarray(embeddings, dtype=np.float64)

    clusterer = HDBSCAN(
        min_cluster_size=2,
        min_samples=1,
        metric="cosine",
        cluster_selection_epsilon=0.08,
        cluster_selection_method="eom",
        allow_single_cluster=True,
        copy=True,
    )

    labels = clusterer.fit_predict(X)

    # Outliers get unique IDs so they don't group into one '-1' bucket. IDs start at
    # 400 as before, but never below (largest real label + 1), so they cannot collide.
    outlier_base = max(400, int(labels.max()) + 1)

    conjoined_map = {}
    for i, (h, cluster_id) in enumerate(zip(hashes, labels)):
        is_outlier = bool(cluster_id == -1)  # plain bool for JSON serialisation
        conjoined_map[h] = {
            "cluster_id": outlier_base + i if is_outlier else int(cluster_id),
            "keys": _keys(h),
            "is_outlier": is_outlier,
        }

    return conjoined_map


# run structure clustering
structure_cluster_map = conjoin_signatures(analyzer.signature_map)
structure_clusters = {}
for k, v in structure_cluster_map.items():
    cluster_id = v["cluster_id"]
    keys = v["keys"]
    is_outlier = bool(v["is_outlier"])
    structure_clusters[cluster_id] = structure_clusters.get(cluster_id, [])
    structure_clusters[cluster_id].append(
        {"signature_hash": k, "fields": keys, "is_outlier": is_outlier}
    )
print(f"Structural Clusters: \n{json.dumps(structure_clusters, indent=4)}\n")


# run semantic clustering
print(json.dumps(analyzer.signature_map, indent=4))
semantic_cluster_map = conjoin_signatures_semantic(analyzer.signature_map)
semantic_clusters = {}
for k, v in semantic_cluster_map.items():
    cluster_id = v["cluster_id"]
    keys = v["keys"]
    is_outlier = bool(v["is_outlier"])
    semantic_clusters[cluster_id] = semantic_clusters.get(cluster_id, [])
    semantic_clusters[cluster_id].append(
        {"signature_hash": k, "fields": keys, "is_outlier": is_outlier}
    )
print(f"Semantic Clusters: \n{json.dumps(semantic_clusters, indent=4)}\n")

In [None]:
# semantic cluster map to raw string, rough parsed dict, structure cluster, structure signature hash
# Signatures with no fields besides `_unparsed`, and records whose parsed dict is empty
# once `_unparsed` is dropped, are skipped. A cluster key is only created once it has a
# record, so no empty clusters reach the LLM stages.
cluster_dict = {}
for cluster_id, records in semantic_clusters.items():
    for rec in records:
        signature_hash = rec["signature_hash"]
        # ignore _unparsed
        fields_li = [f for f in rec["fields"] if f != "_unparsed"]
        if not fields_li:
            continue
        structure_cluster_id = structure_cluster_map[signature_hash].get("cluster_id")
        for r in analyzer.signature_map[signature_hash]["records"]:
            # Copy without `_unparsed` rather than popping it, so the analyzer's stored
            # records (shared with earlier cells) are not mutated.
            parsed_dict = {k: v for k, v in r["parsed"].items() if k != "_unparsed"}
            if not parsed_dict:
                continue
            cluster_dict.setdefault(cluster_id, []).append(
                {
                    "signature_hash": signature_hash,
                    "structure_cluster_id": structure_cluster_id,
                    "raw": r["raw"],
                    "parsed": parsed_dict,
                    "fields": fields_li,
                }
            )

print(json.dumps(cluster_dict, indent=4))

## LLM Client Setup

In [None]:
# llm connection
# API key comes from the environment (loaded from secrets.env); None if unset.
API_KEY = os.getenv("LLM_API_KEY")
# OpenRouter-style OpenAI-compatible endpoint and the model routed through it.
api_base_url = "https://openrouter.ai/api/v1"
MODEL = "openai/gpt-oss-120b:exacto"

In [None]:
# Async LLM client bound to MODEL and api_base_url. No API key is passed here; the
# graph nodes pass `api_key_override=API_KEY` on each chat call.
client = AsyncLLMClient(MODEL, api_base_url)

## LLM Prompts

In [None]:
# prompts
# System prompt for the exploratory single-shot schema generation: asks the LLM for
# Pydantic BaseModel code describing one cluster of raw samples.
PYDANTIC_SYSTEM_PROMPT = """You are a Data Architect. Your goal is to perform unsupervised schema inference on a sample of unstructured data.

Generate a Pydantic `BaseModel` class that represents the "Gold Standard" foundation for this data pattern. 

Instructions:
- Normalization: Suggest clean, snake_case keys for the identified fields.
- If you see a string value for a field that follows a consistent structure (e.g., "<city>, <state>") then make sure that structure is accurately typed in the BaseModel.
- Determine what fields should be required vs optional based on overall semantic meaning of the entity you are creating a BaseModel class for.

Constraints:
- Include a detailed description for each field using the `Field` class to explain what the field is and if there are any expected structural patterns (e.g., `state` should be two letters).
- Create supplemental BaseModel classes where necessary to preserve semantic clarity.
- Do NOT include any regex.
- You MUST wrap your code in a python block with the following start marking "```python" and end marking "```".
- If a field appears in some rows but not others, mark it as `Optional`.
- You are only allowed to use the following imports: "from typing import List, Dict, Optional; from pydantic import BaseModel, Field".
- Return ONLY the Pydantic class definitions (you are allowed to generate multiple as long as they are logically linked).
"""

# User prompt template; `{samples}` is filled with the raw strings of one cluster.
PYDANTIC_USER_PROMPT = """Please analyze the following representative samples of a new data pattern and generate the Pydantic 'Foundation' model.

### Data Samples:
{samples}
"""

## Generate Pydantic BaseModel Class

In [None]:
class ModelResponseStructure(BaseModel):
    # Structured LLM reply: generated source code plus the name of the model class to
    # load from it. Documented with comments rather than a docstring because pydantic v2
    # copies a class docstring into the JSON schema ("description") sent as response_format.
    code_string: str = Field(..., description="generated python code")
    entrypoint_class_name: str = Field(
        ..., description="name of entrypoint base model class in the code generated"
    )


def extract_python_code(text):
    """
    Return the body of the first ``` fenced block found in ``text``.

    An optional ``python`` tag right after the opening fence is skipped and the
    captured body is stripped of surrounding whitespace.

    Returns:
        str: The fenced source code, or "" when ``text`` has no complete fenced block.
    """
    fence = re.compile(r"```(?:python)?\s*(.*?)\s*```", re.DOTALL)
    found = fence.search(text)
    if found is None:
        return ""
    return found.group(1).strip()


for c_id, records in cluster_dict.items():
    string_li = [r["raw"] for r in records]
    messages = [
        {
            "role": "system",
            "content": PYDANTIC_SYSTEM_PROMPT,
        },
        {
            "role": "user",
            "content": PYDANTIC_USER_PROMPT.format(
                samples=string_li,
            ),
        },
    ]

    buffer = []
    response = await client.chat(
        messages=messages,
        stream=True,
        temperature=0.0,
        response_format=ModelResponseStructure,
    )
    async for chunk in response:
        if chunk.choices and chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            buffer.append(content)

    resp = "".join(buffer)
    resp: ModelResponseStructure = ModelResponseStructure(**json.loads(resp))

    if not resp.code_string.startswith("```python"):
        resp.code_string = f"```python\n{resp.code_string}\n```"

    code = extract_python_code(resp.code_string)

    namespace = {}
    exec(code, globals(), namespace)

    # access the function from the namespace dictionary
    cls = namespace.get(resp.entrypoint_class_name)
    cls.model_rebuild(_types_namespace=namespace)
    schema = cls.model_json_schema()

    print()
    break

## LangGraph Robustness and StateGraph

In [None]:
# Prompt for architect_node; `{samples}` receives a JSON dump of parsed records drawn
# from every structure variant in the semantic cluster.
ARCHITECT_PROMPT = """You are a Lead Data Architect.
Define a simple Pydantic v2 `BaseModel` that represents the "Gold Standard" foundation for the data pattern found in the input samples.

INPUT SAMPLES (Multiple variations):
{samples}

REQUIREMENTS:
1. Normalization: Suggest clean, snake_case keys for the identified fields.
2. Determine what fields should be required vs optional based on overall semantic meaning of the entity you are creating a BaseModel class for.
3. Include a detailed description for each field using the `Field` class to explain what the field is and if there are any expected structural patterns (e.g., `state` should be two letters).
4. Do NOT include any regex.
5. You MUST wrap your code in a python block with the following start marking "```python" and end marking "```".
6. Create supplemental BaseModel classes where necessary to preserve semantic clarity.
7. You are ONLY allowed to use the following imports: "from typing import List, Dict, Optional; from pydantic import BaseModel, Field".
8. Keep primary keys as type string.
9. Infer best data type from string value (e.g., money should be a float, "true/false" or "yes/no" fields should be a boolean, and fields that represent multiple entities should use a representative aggregate data structure type)
10. NEVER set potentially boolean fields as optional. Instead, when not explicitly declared, infer what the default value ought to be based on the semantic meaning of the field and how it appears in the samples that do provide it.
11. Perform semantic merging: Identify fields across structural variants that share the same intent and conjoin them under a single, definitive schema key to avoid redundancy (e.g., "location" vs "city", "state", "zip code")
12. Avoid information loss when it comes to key:value pairs in the sample data.

Return ONLY the Python code for the class. Include necessary imports (from pydantic import BaseModel, Field, etc.).
"""

# Prompt for coder_node; `{schema}` is the gold schema source, `{samples}` a JSON dump of
# the parsed records of one structure cluster.
CODER_PROMPT = """You are a Senior Data Engineer.
Your task is to write a concise but effective transformation function `transform_to_models(parsed_dict: list[dict]) -> list[dict]` that maps roughly parsed dictionaries into the provided pydantic v2 target schema base model definition.

TARGET SCHEMA (Python Pydantic v2 BaseModel):
{schema}

SOURCE SAMPLES:
{samples}

Logic Requirements:
1. Use a 'coalesce' approach: for each target field, check all possible source keys from the input dictionary samples.
2. The Target Schema is the gold standard so ensure that the transformation function maps and casts the data types of the input appropriately.
3. `parsed_dict` is a list of dictionaries: iterate over it, return exactly one output dictionary per input dictionary, and use `.get()` on each input dictionary for optional fields.
4. Infer best data type from string (e.g., "$120.00" should be a float, and "true" should be a boolean). 
5. ALL python code must be encapsulated by the `transform_to_models()` function -- if it's not in that function it will not be run.

Return ONLY the Python code for the function `transform_to_models`. Do not include the Pydantic class in your response.
"""

# Source template assembled by code_tester_node before exec(): shared imports, then the
# generated schema (`{schema}`), then the generated parser (`{parser_code}`).
CODE_EXECUTION = """
from pydantic import BaseModel, Field, ValidationError
from typing import *
import json, re

{schema}

{parser_code}
"""

In [None]:
from __future__ import annotations

import json
import operator
import traceback
from pathlib import Path
from typing import Annotated, Any, Dict, List, Literal, Optional, TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph
from pydantic import BaseModel, Field

from dq_swirl.utils.log_utils import get_custom_logger

# Project logger used by the graph nodes and routers defined in this cell.
logger = get_custom_logger()


def extract_python_code(text):
    """
    Pull the contents of the first ``` fenced block out of ``text``.

    A leading ``python`` language tag is skipped and the result is stripped.
    (Redefined here so this cell is self-contained.)

    Returns:
        str: The code inside the fence, or "" if no complete fenced block exists.
    """
    found = re.search(r"```(?:python)?\s*(.*?)\s*```", text, flags=re.DOTALL)
    return "" if found is None else found.group(1).strip()


class ModelResponseStructure(BaseModel):
    # Architect reply format (passed as response_format): the schema source code and the
    # name of its top-level model class. Comments instead of a docstring because pydantic
    # v2 would copy a docstring into the JSON schema's "description".
    code_string: str = Field(..., description="generated python code")
    entrypoint_class_name: str = Field(
        ..., description="name of entrypoint base model class in the code generated"
    )


def _take_latest(old, new):
    """State reducer: the most recently written value replaces the previous one."""
    return new


class MultiAgentState(TypedDict):
    # Inputs supplied by run_data_matrix for one (semantic, structure) cluster pair.
    semantic_id: str
    structure_cluster_id: str
    data_pairs_all: List[Dict[str, Any]]  # every record of the semantic cluster
    data_pairs_structure: List[Dict[str, Any]]  # records of this structure cluster only
    # Artifacts and routing signals produced by the nodes (last write wins).
    gold_schema: Annotated[Optional[ModelResponseStructure], _take_latest]
    parser_code: Annotated[Optional[str], _take_latest]
    feedback: Annotated[Optional[str], _take_latest]
    error_type: Annotated[
        Optional[Literal["SCHEMA_ISSUE", "CODE_ISSUE"]], _take_latest
    ]
    # Summed across node updates: each node returns how many attempts it consumed.
    attempts: Annotated[int, operator.add]
    export_map: Annotated[Optional[Dict[str, Any]], _take_latest]


async def architect_node(state: MultiAgentState) -> Dict[str, Any]:
    """Ask the LLM for the "gold standard" Pydantic schema of the semantic cluster.

    If a schema already exists and the last error was not a schema problem, nothing
    is regenerated (returns ``{"attempts": 0}``), so a schema shared between the
    structure clusters of one semantic cluster is reused.

    When re-entered after a failed schema test (``error_type == "SCHEMA_ISSUE"``), the
    previous schema and its traceback are appended to the prompt. Without this, a
    temperature-0 retry would very likely reproduce the same broken schema.

    :param state: graph state; reads ``gold_schema``, ``error_type``, ``feedback``,
        ``attempts`` and ``data_pairs_all``.
    :return: partial update with the new ``gold_schema``, one extra attempt, and
        cleared ``feedback``/``error_type``.
    """
    if state.get("gold_schema") and state.get("error_type") != "SCHEMA_ISSUE":
        return {"attempts": 0}

    logger.info(f"[Architect] Defining Semantic Goal: {state['attempts']}")

    # diversity is key to generalize
    samples = json.dumps([p["parsed"] for p in state["data_pairs_all"][:100]], indent=2)

    logger.debug(samples)

    prompt = ARCHITECT_PROMPT.format(
        samples=samples,
    )
    previous_error = state.get("feedback")
    previous_schema = state.get("gold_schema")
    if state.get("error_type") == "SCHEMA_ISSUE" and previous_error:
        previous_code = previous_schema.code_string if previous_schema else ""
        prompt += (
            "\nYOUR PREVIOUS SCHEMA:\n"
            f"```python\n{previous_code}\n```\n"
            "FAILED TO LOAD WITH THIS ERROR (fix it):\n"
            f"{previous_error}\n"
        )

    buffer = []
    response = await client.chat(
        messages=[
            {
                "role": "user",
                "content": prompt,
            }
        ],
        api_key_override=API_KEY,
        stream=True,
        temperature=0.0,
        response_format=ModelResponseStructure,
        num_retries=5,
    )
    async for chunk in response:
        if chunk.choices and chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            buffer.append(content)

    resp = ModelResponseStructure(**json.loads("".join(buffer)))
    # The structured reply's code_string may or may not be fenced; keep the raw text when
    # there is no fence instead of replacing the schema with "".
    resp.code_string = extract_python_code(resp.code_string) or resp.code_string.strip()
    print(resp.code_string)

    return {
        "gold_schema": resp,
        "attempts": 1,
        "feedback": None,
        "error_type": None,
    }


async def schema_tester_node(state: MultiAgentState) -> Dict[str, Any]:
    """Check that the generated gold schema executes and yields a JSON schema.

    The schema source is exec'd, the entrypoint class is looked up, forward references
    are resolved via ``model_rebuild`` and ``model_json_schema()`` is built.

    :param state: graph state; reads ``gold_schema`` and ``attempts``.
    :return: ``{"feedback": "SUCCESS"}`` on success, otherwise the traceback as
        ``feedback`` with ``error_type="SCHEMA_ISSUE"``.
    """
    logger.info(f"[Schema Tester] Validating Functional BaseModel: {state['attempts']}")

    env = {}
    try:
        gold_schema = state["gold_schema"]
        # NOTE(review): executes LLM-generated code in-process.
        exec(gold_schema.code_string, globals(), env)
        model = env[gold_schema.entrypoint_class_name]

        # resolve references to sibling classes defined in the same snippet
        model.model_rebuild(_types_namespace=env)
        model.model_json_schema()

        return {"feedback": "SUCCESS"}
    except Exception as e:
        err_msg = traceback.format_exc()
        logger.exception(e)
        return {
            "feedback": err_msg,
            "error_type": "SCHEMA_ISSUE",
        }


async def coder_node(state: MultiAgentState) -> Dict[str, Any]:
    """Ask the LLM for a ``transform_to_models`` function targeting the gold schema.

    Samples come from this structure cluster only (``data_pairs_structure``). When
    re-entered after a failed code test (``error_type == "CODE_ISSUE"``), the previous
    parser and its traceback are appended to the prompt so the retry can address the
    failure instead of regenerating the same code at temperature 0.

    :param state: graph state; reads ``gold_schema``, ``data_pairs_structure``,
        ``parser_code``, ``feedback``, ``error_type`` and ``attempts``.
    :return: partial update with the extracted ``parser_code``, one extra attempt,
        and cleared ``feedback``/``error_type``.
    """
    logger.info(f"[Coder] Parser for Gold Schema: {state['attempts']}")
    samples = json.dumps(
        [rec["parsed"] for rec in state["data_pairs_structure"]], indent=2
    )

    prompt = CODER_PROMPT.format(
        schema=state["gold_schema"].code_string, samples=samples
    )
    previous_error = state.get("feedback")
    if state.get("error_type") == "CODE_ISSUE" and previous_error:
        prompt += (
            "\nYOUR PREVIOUS `transform_to_models` IMPLEMENTATION:\n"
            f"```python\n{state.get('parser_code') or ''}\n```\n"
            "FAILED WITH THIS ERROR (fix it):\n"
            f"{previous_error}\n"
        )

    buffer = []
    response = await client.chat(
        messages=[
            {
                "role": "user",
                "content": prompt,
            }
        ],
        api_key_override=API_KEY,
        stream=True,
        temperature=0.0,
        num_retries=5,
    )
    async for chunk in response:
        if chunk.choices and chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            buffer.append(content)

    resp = "".join(buffer)
    # fall back to the raw reply when the model did not fence its code
    code = extract_python_code(resp) or resp.strip()

    return {
        "parser_code": code,
        "attempts": 1,
        "feedback": None,
        "error_type": None,
    }


async def code_tester_node(state: MultiAgentState) -> Dict[str, Any]:
    """Run the generated parser on this structure cluster and validate every output.

    Schema and parser are combined through ``CODE_EXECUTION`` and exec'd.
    ``transform_to_models`` is applied to all parsed records, and each output must
    pass ``model_validate`` on the entrypoint class. The parser must return exactly
    one output per input; otherwise an empty result would validate vacuously.

    :param state: graph state; reads ``gold_schema``, ``parser_code``,
        ``data_pairs_structure`` and ``attempts``.
    :return: ``{"feedback": "SUCCESS"}`` on success, otherwise the traceback (plus
        the failing record when known) as ``feedback`` with ``error_type="CODE_ISSUE"``.
    """
    logger.info(f"[Code Tester] Stress-testing parser: {state['attempts']}")
    full_code = CODE_EXECUTION.format(
        schema=state["gold_schema"].code_string,
        parser_code=state["parser_code"],
    )

    env = {}
    # last record handed to validation; stays None if we fail before the loop
    mapped_dict = None
    try:
        cls_name = state["gold_schema"].entrypoint_class_name
        # NOTE(review): executes LLM-generated code in-process.
        exec(full_code, globals(), env)
        func = env["transform_to_models"]
        model = env[cls_name]
        model.model_rebuild(_types_namespace=env)

        input_data = [pair["parsed"] for pair in state["data_pairs_structure"]]
        mapped_batch = list(func(input_data))
        if len(mapped_batch) != len(input_data):
            raise ValueError(
                f"transform_to_models returned {len(mapped_batch)} records for "
                f"{len(input_data)} inputs; expected exactly one output per input"
            )
        for mapped_dict in mapped_batch:
            model.model_validate(mapped_dict)
            logger.debug(f"Input: {mapped_dict} -- PASSED")
        return {
            "feedback": "SUCCESS",
        }
    except Exception as e:
        err_msg = traceback.format_exc()
        if mapped_dict is not None:
            logger.debug(f"Input: {mapped_dict} -- FAILED")
            # give the coder the concrete record that broke validation
            err_msg += f"\nFailing record: {mapped_dict!r}"
        logger.exception(e)
        return {
            "feedback": err_msg,
            "error_type": "CODE_ISSUE",
        }


async def exporter_node(state: MultiAgentState) -> Dict[str, Any]:
    """Write the validated schema and parser to disk and record them in the export map.

    Files go under ``sem_<semantic_id>-<entrypoint class name lowercased>/``. The
    base-model file holds the schema source. The parser file holds the exact program
    ``code_tester_node`` validated (``CODE_EXECUTION`` = shared imports + schema +
    parser), so it can later be imported on its own.

    :param state: graph state; reads ``semantic_id``, ``structure_cluster_id``,
        ``data_pairs_structure``, ``gold_schema``, ``parser_code`` and ``export_map``.
    :return: ``{"feedback": "DONE", "export_map": ...}``. Per semantic id the map holds
        the first ``base_model_fpath`` and a list of ``structure_clusters`` entries.
    """
    # get necessary fields
    semantic_cluster_id = state["semantic_id"]
    structure_cluster_id = state["structure_cluster_id"]
    data_pairs_structure = state["data_pairs_structure"]
    base_model_name = state["gold_schema"].entrypoint_class_name.lower()

    base_model_code_str = state["gold_schema"].code_string
    parser_code_str = state["parser_code"]
    # the key may be present with a None value, so `.get(key, {})` is not enough
    export_map = state.get("export_map") or {}

    # make the directory
    dir_path = Path(f"sem_{semantic_cluster_id}-{base_model_name}")
    dir_path.mkdir(exist_ok=True)

    # write the base model (paths kept as str so the export map stays JSON-serialisable)
    base_model_fpath = str(dir_path / f"{base_model_name}_base_model.py")
    with open(base_model_fpath, "w") as f:
        f.write(base_model_code_str)

    logger.info(f"Exported: {base_model_fpath}")

    # write parser: the bare parser code may rely on names (json, re, schema classes)
    # that were only in scope while testing, so export the full tested program.
    parser_fpath = str(
        dir_path / f"{base_model_name}_parser-struct_{structure_cluster_id}.py"
    )
    with open(parser_fpath, "w") as f:
        f.write(
            CODE_EXECUTION.format(
                schema=base_model_code_str,
                parser_code=parser_code_str,
            )
        )

    logger.info(f"Exported: {parser_fpath}")

    # update export map
    struct_records = [
        {rec["signature_hash"]: rec["fields"]} for rec in data_pairs_structure
    ]
    sem_entry = export_map.setdefault(semantic_cluster_id, {})
    sem_entry.setdefault("base_model_fpath", base_model_fpath)
    sem_entry.setdefault("structure_clusters", []).append(
        {
            "id": structure_cluster_id,
            "struct_records": struct_records,
            "parser_fpath": parser_fpath,
        }
    )

    return {
        "feedback": "DONE",
        "export_map": export_map,
    }


def schema_router(state: MultiAgentState) -> str:
    """Pick the next node after schema testing.

    Returns "coder" when the schema loaded, "architect" to retry while fewer than
    3 attempts have been spent, and "end" (after logging an error) otherwise.
    """
    if state.get("feedback") == "SUCCESS":
        return "coder"

    attempts = state.get("attempts", 0)
    if attempts < 3:
        return "architect"

    # too many failed attempts: stop this cluster's run
    logger.error(f"Schema failed after {attempts} attempts. Aborting.")
    return "end"


def code_router(state: MultiAgentState) -> str:
    """Pick the next node after parser testing.

    "exporter" on success; "end" once 6 or more attempts are spent; otherwise
    "architect" for a SCHEMA_ISSUE and "coder" for CODE_ISSUE or anything else.
    """
    if state.get("feedback") == "SUCCESS":
        return "exporter"

    if state.get("attempts", 0) >= 6:
        return "end"

    # route back to whichever stage produced the failure
    return "architect" if state.get("error_type") == "SCHEMA_ISSUE" else "coder"


## Define Graph
# architect -> schema_tester -> coder -> code_tester -> exporter, with conditional retry
# edges chosen by schema_router and code_router.
workflow = StateGraph(MultiAgentState)
for node_name, node_fn in (
    ("architect", architect_node),
    ("schema_tester", schema_tester_node),
    ("coder", coder_node),
    ("code_tester", code_tester_node),
    ("exporter", exporter_node),
):
    workflow.add_node(node_name, node_fn)

workflow.add_edge(START, "architect")
workflow.add_edge("architect", "schema_tester")
workflow.add_conditional_edges(
    "schema_tester",
    schema_router,
    {"architect": "architect", "coder": "coder", "end": END},
)

workflow.add_edge("coder", "code_tester")
workflow.add_conditional_edges(
    "code_tester",
    code_router,
    {"architect": "architect", "coder": "coder", "exporter": "exporter", "end": END},
)

workflow.add_edge("exporter", END)

app = workflow.compile(checkpointer=MemorySaver())

In [None]:
async def run_data_matrix(all_data: Dict[int, List[Dict]]) -> Dict[str, Any]:
    """Run the schema/parser graph once per (semantic cluster, structure cluster) pair.

    Records of each semantic cluster are grouped by ``structure_cluster_id``. The gold
    schema from one group is passed to the next group of the same semantic cluster, and
    the export map is threaded through every invocation.

    :param all_data: semantic cluster id -> records carrying at least
        ``structure_cluster_id``, ``parsed``, ``signature_hash`` and ``fields``
        (the shape built into ``cluster_dict``).
    :return: the accumulated export map.
    """
    import uuid

    shared_export_map = {}
    # `app` uses a MemorySaver checkpointer. Reusing a thread_id resumes the old
    # checkpoint, including its summed `attempts`, so re-running this cell would hit
    # the retry limits at once. A per-call suffix keeps every run independent.
    run_id = uuid.uuid4().hex[:8]

    for sem_id, records in all_data.items():
        struct_groups = {}
        for rec in records:
            struct_groups.setdefault(rec["structure_cluster_id"], []).append(rec)

        shared_gold_schema = None

        for struct_id, pairs in struct_groups.items():
            config = {
                "configurable": {
                    "thread_id": f"sem_{sem_id}_str_{struct_id}_{run_id}",
                }
            }

            initial_state = {
                "semantic_id": str(sem_id),
                "structure_cluster_id": str(struct_id),
                "data_pairs_all": records,
                "data_pairs_structure": pairs,
                "gold_schema": shared_gold_schema,
                "export_map": shared_export_map,
                "feedback": None,
                "attempts": 0,
            }

            final_output = await app.ainvoke(initial_state, config)
            shared_gold_schema = final_output.get("gold_schema")
            # keep the running map if a run ends without returning one
            shared_export_map = final_output.get("export_map") or shared_export_map
            logger.info(
                f"--- Finished Sem{sem_id}-Struct{struct_id} ---\n{json.dumps(shared_export_map, indent=4)}"
            )

    return shared_export_map


# Top-level `await` is allowed here because IPython runs cells inside an event loop.
code_export_map = await run_data_matrix(cluster_dict)

## Testing Generated Parsers on Unseen Data

In [None]:
# Show where each semantic cluster's base model and per-structure parsers were written.
print(json.dumps(code_export_map, indent=4))

In [None]:
# New order strings not used for schema/parser generation. The second one carries a
# malformed total ("$213,00,00.55").
unseen_messy_data = [
    "Order=2011, Buyer=Gemma Claude, Location=Atlanta, GA, Total=$1,356.00, Items: hairpin, wallet, keys, Discount: True, phone=123-456-789",
    "Order 2005: Buyer=Anthropic Google, Location=Austin, TX, Total=$213,00,00.55, Items: headphones",
]

In [None]:
## get map of structure signature -> code
signature_map = {}
for semantic_cluster_id, export_dict in code_export_map.items():
    base_model_fpath = export_dict["base_model_fpath"]
    structure_clusters = export_dict["structure_clusters"]
    for cluster_dict in structure_clusters:
        structure_id = cluster_dict["id"]
        structure_records = cluster_dict["struct_records"]
        parser_fpath = cluster_dict["parser_fpath"]
        for struct_dict in structure_records:
            signature_hash = list(struct_dict.keys())[0]
            fields = struct_dict[signature_hash]
            signature_map[signature_hash] = signature_map.get(signature_hash, {})
            signature_map[signature_hash]["base_model_fpath"] = signature_map[
                signature_hash
            ].get("base_model_fpath", base_model_fpath)
            signature_map[signature_hash]["parser_fpath"] = signature_map[
                signature_hash
            ].get("parser_fpath", parser_fpath)
            signature_map[signature_hash]["fields"] = signature_map[signature_hash].get(
                "fields", fields
            )


signature_map

In [None]:
# Preprocess the unseen samples with the same pipeline as the training data. All names
# are prefixed `unseen_` so this cell does not overwrite `messy_data` or the other
# preprocessing globals; re-running the earlier cell still processes the original data.

#################################################################################
################################# Grammar Parsing ###############################
#################################################################################

unseen_string_batch = []
unseen_json_batch = []
for msg in unseen_messy_data:
    if (msg.startswith("[") and msg.endswith("]")) or (
        msg.startswith("{") and msg.endswith("}")
    ):
        unseen_json_batch.append(msg)
    else:
        unseen_string_batch.append(msg)

print(f"\nUNSTRUCTURED STRING SAMPLES: {len(unseen_string_batch)}\n")
print(f"JSON STRING SAMPLES: {len(unseen_json_batch)}\n")


unseen_string_samples = smart_parse_batch(unseen_string_batch)

for msg, parsed in unseen_string_samples:
    print(f"Original: {msg}\nParsed: {parsed}\n")


unseen_json_samples = []
unseen_leftovers = []

for msg in unseen_json_batch:
    try:
        unseen_json_samples.append((msg, json.loads(msg)))
    except json.JSONDecodeError:
        unseen_leftovers.append((msg, None))

# bracketed lines that are not valid JSON get a second chance via the grammar parser
if unseen_leftovers:
    unseen_string_samples = unseen_string_samples + smart_parse_batch(
        [msg for msg, _ in unseen_leftovers]
    )

unseen_data_samples = unseen_string_samples + unseen_json_samples

print(
    f"\nTOTAL SAMPLES: {len(unseen_data_samples)}\n"
    f"JSON FALLBACK SAMPLES (re-parsed as text): {len(unseen_leftovers)}\n"
)

#################################################################################
############################### Structure Analyzer ##############################
#################################################################################


new_analyzer = StructuralAnalyzer(ignore_unparsed=False)

unseen_hash_counts = Counter()
unseen_unique_structures = {}

for raw, parsed in unseen_data_samples:
    result = new_analyzer.generate_fingerprint(raw, parsed)
    signature_hash = result["hash"]
    unseen_hash_counts[signature_hash] += 1
    unseen_unique_structures.setdefault(signature_hash, result)

print(
    f"Detected {len(unseen_unique_structures)} unique schemas across {len(unseen_data_samples)} records.\n"
)

for h, count in sorted(unseen_hash_counts.items()):
    print(f"Schema {h} ({count} occurrences):")
    print(f"  Layout: {unseen_unique_structures[h]['signature']}")
    print("-" * 30)

In [None]:
# Report, for every layout in the unseen data, whether an exported parser maps to it.
for hash_signature, sig_entry in new_analyzer.signature_map.items():
    try:
        known = signature_map[hash_signature]
        logger.debug(
            f"Parsed Records: {[rec['parsed'] for rec in sig_entry['records']]}"
        )
        logger.debug(f"Found Hash Dict: {known}")
    except KeyError:
        logger.exception(f"Hash Signature: {hash_signature} Does Not Exist!")

In [None]:
import importlib.util
from pathlib import Path


def load_function(file_path, function_name) -> callable:
    """Import a Python source file as a throwaway module and return one attribute.

    The module is not added to ``sys.modules``, so every call re-executes the file.

    :param file_path: path (str or Path) to a ``.py`` file.
    :param function_name: name of the top-level attribute to return.
    :return: the attribute, typically a function.
    :raises ImportError: if no module spec/loader can be created for ``file_path``
        (e.g. its suffix is not a Python source suffix).
    :raises AttributeError: if the module defines no ``function_name``.
    """
    path = Path(file_path)

    # the filename without `.py` serves as the module name
    spec = importlib.util.spec_from_file_location(path.stem, str(path))
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load a Python module from {path}")

    module = importlib.util.module_from_spec(spec)
    # run the file's top-level code so its definitions exist
    spec.loader.exec_module(module)

    return getattr(module, function_name)


# Run each unseen layout's records through the parser exported for that layout.
for hash_signature, records in new_analyzer.signature_map.items():
    hash_dict = signature_map.get(hash_signature)
    if hash_dict is None:
        # layout never seen during generation: there is no parser to apply
        logger.error(f"Hash Signature: {hash_signature} has no exported parser; skipping")
        continue
    transform_func = load_function(hash_dict["parser_fpath"], "transform_to_models")
    raw_records = [r["raw"] for r in records["records"]]
    parsed_records = [r["parsed"] for r in records["records"]]
    logger.debug(f"BEFORE: {json.dumps(raw_records, indent=2)}")
    result = transform_func(parsed_records)
    # default=str: transformed values are not guaranteed to be JSON-native types
    logger.debug(f"AFTER: {json.dumps(result, indent=2, default=str)}")