In [1]:
import json
import os
import re
from collections import Counter
from copy import deepcopy
from typing import List, Optional

import tqdm as notebook_tqdm
from dotenv import load_dotenv
from pydantic import BaseModel, Field, constr, field_validator
from sentence_transformers import SentenceTransformer
from sklearn.cluster import HDBSCAN
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import logging as transformers_logging

from dq_swirl.clients.async_llm_client import AsyncLLMClient
from dq_swirl.ingestion.structure_analyzer import StructuralAnalyzer
from dq_swirl.rust_ingestion import smart_parse_batch

transformers_logging.set_verbosity_error()

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Load environment variables from the local secrets file into the process
# environment (presumably including LLM_API_KEY, which a later cell reads via
# os.getenv — confirm against the secrets file).
load_dotenv("../secrets.env")

True

## Messy Data

In [3]:
# Heterogeneous raw records used as the notebook's test corpus: key/value order
# strings (with deliberate irregularities such as the misspelled "Locadtion",
# reordered keys and missing fields), JSON user objects, and assorted log lines.
messy_data = [
    "Order 1001: Buyer=John Davis, Location=Columbus, OH, Total=$742.10, Items: laptop, hdmi cable",
    "Order 1004:   Buyer=  AMANDA SMITH ,Location=Seattle, WA,Total=$50.00, Items: desk lamp",
    "Order 1005: Buyer=Raj Patel, Total=1,200.50, Items: monitor, stand, cable",
    "Order 1006: total=$89.99, location=Miami, FL, buyer=Elena Rossi, Items: keyboard",
    "Order 1007: Buyer=Chris P., Location=Denver, CO, Total=$12.00, Items: stickers -- [DISCOUNT APPLIED]",
    "Order 1008: Buyer=O'Connor, S., Location=Portland, OR, Total=$0.00, Items: ",
    "Order 1011: Buyer=John Davis, Location=Columbus, OH, Total=$742.10, Items: laptop, hdmi cable",
    "Order 1012: Buyer=Sarah Liu, Location=Austin, TX, Total=$156.55, Items: headphones",
    "Order 1013: Buyer=Mike Turner, Location=Cleveland, OH, Total=$1299.99, Items: gaming pc, mouse",
    "Order 1014: Buyer=Rachel Kim, Locadtion=Seattle, WA, Total=$89.50, Items: coffee maker",
    "Order 1015: Buyer=Chris Myers, Location=Cincinnati, OH, Total=$512.00, Items: monitor, desk lamp",
    "Order=1016, Buyer=Jake Myers, Total=$1,512.00, Items: monitor,",
    '{"id": "usr_001", "name": "Alex Johnson", "role": "admin", "isActive": true, "createdAt": "2025-11-02T09:14:23Z"}',
    '{"id": "usr_002", "name": "Maria Lopez", "email": "maria.lopez@example.com", "role": "editor", "isActive": null, "createdAt": "2025-12-18T16:47:10Z", "lastLoginIp": "192.168.1.42"}',
    '{"id": "usr_003", "email": "samir.patel@example.com", "role": "viewer", "isActive": false, "createdAt": "08/05/2024"}',
    '{"id": 4, "name": "Chen Wei", "email": "chen.wei@example.com", "isActive": true, "createdAt": null}',
    '{"id": "usr_005", "name": "Broken Record", "email": "broken@example.com"}',
    "Order 1017: Buyer=Griffin Arora, Location=Columbia, SC, Total=$512.00, Items: monitor, desk lamp, Discount: yes",
    # Disabled samples (kept for reference; not part of the corpus):
    # "Order=1018, Buyer=Jae Arora, Location=Dreher, FL, Total=$6.00, Items: chair, Discount: true, phone=123-456-789" ,
    # "Order=1019, Buyer=Jae Kao, Location=Atlanta, GA, Total=$12.00, Items: desk, Discount: False, phone=123-456-789" ,
    "2026-01-30 14:22:01 INFO User login successful user_id=123",
    "2026-01-30 14:22:01 INFO User login successful",
    "level =INFO, user =Sam, id=1",
    "timestamp=2026-01-30T14:22:01Z level=INFO user=alice action=login success=true",
    "level=INFO cpu_usage=1,234.56 memory=512MB",
    '{"level":"INFO","service":"orders","order_id":1001,"status":"created"}',
    '[2026-01-31 17:11:22 +0000] [7] [INFO] 127.0.0.1:56718 - - [31/Jan/2026:17:11:22 +0000] "GET /health 1.1" 200 16 "-" "curl/8.14.1"',
    "2026-01-31 17:11:00 swirl [DEBUG] saq_worker.py:28 Running cron job health check",
]

## Data Preprocessing

In [4]:
#################################################################################
################################# Grammar Parsing ###############################
#################################################################################

# Route every raw message: text wrapped in [...] or {...} is a JSON candidate,
# everything else is handed to the grammar parser.
string_batch = []
string_json_batch = []
for msg in messy_data:
    is_bracketed = msg.startswith("[") and msg.endswith("]")
    is_braced = msg.startswith("{") and msg.endswith("}")
    if is_bracketed or is_braced:
        string_json_batch.append(msg)
    else:
        string_batch.append(msg)

print(f"\nUNSTRUCTURED STRING SAMPLES: {len(string_batch)}\n")
print(f"JSON STRING SAMPLES: {len(string_json_batch)}\n")


# Grammar-parse the free-form strings and echo each (original, parsed) pair.
string_samples = smart_parse_batch(string_batch)

for i, (msg, parsed) in enumerate(string_samples):
    print(f"Original: {msg}\nParsed: {parsed}\n")


# Decode the JSON candidates; anything that fails to decode is set aside in
# `leftovers` and excluded from `data_samples`.
json_samples = []
leftovers = []

for msg in string_json_batch:
    try:
        data = json.loads(msg)
    except Exception:
        leftovers.append((msg, None))
    else:
        json_samples.append((msg, data))


data_samples = string_samples + json_samples

print(f"\nTOTAL SAMPLES: {len(data_samples)}\nERROR SAMPLES: {len(leftovers)}\n")

#################################################################################
############################### Structure Analyzer ##############################
#################################################################################


# Fingerprint every (raw, parsed) pair, counting how often each signature hash
# occurs and remembering the first fingerprint seen for each hash.
analyzer = StructuralAnalyzer(ignore_unparsed=False)

unique_structures = {}
hash_counts = Counter()

for raw, parsed in data_samples:
    result = analyzer.generate_fingerprint(raw, parsed)
    signature_hash = result["hash"]
    hash_counts.update([signature_hash])
    # setdefault keeps the first fingerprint recorded for a hash
    unique_structures.setdefault(signature_hash, result)

print(
    f"Detected {len(unique_structures)} unique schemas across {len(data_samples)} records.\n"
)

# Report the schemas in hash order.
divider = "-" * 30
for h in sorted(hash_counts):
    count = hash_counts[h]
    print(f"Schema {h} ({count} occurrences):")
    print(f"  Layout: {unique_structures[h]['signature']}")
    print(divider)


#################################################################################
############################# Structural Clustering #############################
#################################################################################


def conjoin_signatures(registry_output: dict):
    """Cluster structure signatures by the character-level similarity of their keys.

    The field names of each signature are joined into one string, vectorised
    with character n-gram TF-IDF (3-5 grams) and clustered with HDBSCAN
    (``min_cluster_size=2``, euclidean metric).

    Args:
        registry_output: Mapping of signature hash -> entry. Each entry must
            contain a ``"signature"`` mapping whose keys are the field names.

    Returns:
        dict: Mapping of signature hash -> ``{"cluster_id": int, "keys":
        list, "is_outlier": bool}``, sorted by hash. ``cluster_id`` is ``-1``
        (and ``is_outlier`` is ``True``) for signatures that belong to no
        cluster. An empty registry yields an empty dict.
    """
    hashes = list(registry_output)
    keys_by_hash = {h: list(registry_output[h]["signature"].keys()) for h in hashes}

    if len(hashes) < 2:
        # Fewer than two signatures can never form a cluster of
        # min_cluster_size=2, so report them as outliers without fitting.
        labels = [-1] * len(hashes)
    else:
        signatures_as_text = [" ".join(keys_by_hash[h]) for h in hashes]

        vectorizer = TfidfVectorizer(analyzer="char", ngram_range=(3, 5))
        matrix = vectorizer.fit_transform(signatures_as_text)

        clusterer = HDBSCAN(
            min_cluster_size=2,
            metric="euclidean",
            copy=True,
        )
        labels = clusterer.fit_predict(matrix.toarray())

    conjoined_map = {}
    for h, label in zip(hashes, labels):
        cluster_id = int(label)
        conjoined_map[h] = {
            "cluster_id": cluster_id,
            "keys": keys_by_hash[h],
            # plain Python bool so the result is JSON-serialisable as-is
            "is_outlier": cluster_id == -1,
        }

    return dict(sorted(conjoined_map.items()))


#################################################################################
############################## Semantic Clustering ##############################
#################################################################################


def conjoin_signatures_semantic(
    registry_output: dict,
    embedding_model: str = "all-MiniLM-L6-v2",
    cache_dir: str = "./.models",
):
    """Cluster structure signatures by the semantic similarity of their keys.

    The field names of each signature (excluding the ``"_unparsed"`` key) are
    joined into one comma-separated string, embedded with a
    ``SentenceTransformer`` model and clustered with HDBSCAN using a cosine
    metric.

    Args:
        registry_output: Mapping of signature hash -> entry. Each entry must
            contain a ``"signature"`` mapping whose keys are the field names.
            The registry is not modified.
        embedding_model: Name of the sentence-transformers model to load.
        cache_dir: Directory passed to ``SentenceTransformer`` as ``cache_folder``.

    Returns:
        dict: Mapping of signature hash -> ``{"cluster_id": int, "keys":
        list, "is_outlier": bool}``, sorted by hash. ``keys`` lists every
        signature key, including ``"_unparsed"`` when present. ``cluster_id``
        is ``-1`` for outliers. An empty registry yields an empty dict.
    """
    hashes = list(registry_output)
    keys_by_hash = {h: list(registry_output[h]["signature"].keys()) for h in hashes}

    if len(hashes) < 2:
        # Fewer than two signatures can never form a cluster of
        # min_cluster_size=2; skip loading the model and report outliers.
        labels = [-1] * len(hashes)
    else:
        # Build the text from a filtered view of the keys instead of
        # deep-copying the whole registry just to drop "_unparsed".
        signatures_as_text = [
            ", ".join(key for key in keys_by_hash[h] if key != "_unparsed")
            for h in hashes
        ]

        model = SentenceTransformer(embedding_model, cache_folder=cache_dir)
        embeddings = model.encode(signatures_as_text)

        clusterer = HDBSCAN(
            min_cluster_size=2,
            min_samples=1,
            metric="cosine",
            cluster_selection_epsilon=0.18,
            cluster_selection_method="eom",
            copy=True,
        )
        labels = clusterer.fit_predict(embeddings.astype("float64"))

    conjoined_map = {}
    for h, label in zip(hashes, labels):
        cluster_id = int(label)
        conjoined_map[h] = {
            "cluster_id": cluster_id,
            "keys": keys_by_hash[h],
            # plain Python bool so the result is JSON-serialisable as-is
            "is_outlier": cluster_id == -1,
        }

    return dict(sorted(conjoined_map.items()))


def _group_signatures_by_cluster(cluster_map: dict) -> dict:
    """Invert a hash -> cluster-info map into cluster_id -> list of member signatures.

    Args:
        cluster_map: Mapping of signature hash -> dict with ``"cluster_id"``,
            ``"keys"`` and ``"is_outlier"`` entries.

    Returns:
        dict: ``cluster_id`` -> list of ``{"signature_hash", "fields",
        "is_outlier"}`` dicts, in the iteration order of ``cluster_map``.
    """
    grouped = {}
    for signature_hash, info in cluster_map.items():
        grouped.setdefault(info["cluster_id"], []).append(
            {
                "signature_hash": signature_hash,
                "fields": info["keys"],
                # coerce to a plain bool so json.dumps accepts the value
                "is_outlier": bool(info["is_outlier"]),
            }
        )
    return grouped


# run structure clustering
structure_cluster_map = conjoin_signatures(analyzer.signature_map)
structure_clusters = _group_signatures_by_cluster(structure_cluster_map)
print(f"Structural Clusters: \n{json.dumps(structure_clusters, indent=4)}\n")


# run semantic clustering
print(json.dumps(analyzer.signature_map, indent=4))
semantic_cluster_map = conjoin_signatures_semantic(analyzer.signature_map)
semantic_clusters = _group_signatures_by_cluster(semantic_cluster_map)
print(f"Semantic Clusters: \n{json.dumps(semantic_clusters, indent=4)}\n")




UNSTRUCTURED STRING SAMPLES: 20

JSON STRING SAMPLES: 6

Original: Order 1001: Buyer=John Davis, Location=Columbus, OH, Total=$742.10, Items: laptop, hdmi cable
Parsed: {'order': '1001', 'buyer': 'John Davis', 'location': 'Columbus, OH', 'total': '$742.10', 'items': 'laptop, hdmi cable'}

Original: Order 1004:   Buyer=  AMANDA SMITH ,Location=Seattle, WA,Total=$50.00, Items: desk lamp
Parsed: {'order': '1004', 'buyer': 'AMANDA SMITH', 'location': 'Seattle, WA', 'total': '$50.00', 'items': 'desk lamp'}

Original: Order 1005: Buyer=Raj Patel, Total=1,200.50, Items: monitor, stand, cable
Parsed: {'order': '1005', 'buyer': 'Raj Patel', 'total': '1,200.50', 'items': 'monitor, stand, cable'}

Original: Order 1006: total=$89.99, location=Miami, FL, buyer=Elena Rossi, Items: keyboard
Parsed: {'order': '1006', 'total': '$89.99', 'location': 'Miami, FL', 'buyer': 'Elena Rossi', 'items': 'keyboard'}

Original: Order 1007: Buyer=Chris P., Location=Denver, CO, Total=$12.00, Items: stickers -- [DIS

Loading weights: 100%|██████████| 103/103 [00:00<00:00, 1945.13it/s, Materializing param=pooler.dense.weight]                             


Semantic Clusters: 
{
    "0": [
        {
            "signature_hash": "28d9f3b14d0e5516a186062212502d0c",
            "fields": [
                "order",
                "buyer",
                "locadtion",
                "total",
                "items"
            ],
            "is_outlier": false
        },
        {
            "signature_hash": "461a895ef9c5046dd2cb5026b6a62de0",
            "fields": [
                "order",
                "buyer",
                "location",
                "total",
                "items",
                "discount"
            ],
            "is_outlier": false
        },
        {
            "signature_hash": "50eb97a85647221ecc7f65f74d68d156",
            "fields": [
                "order",
                "buyer",
                "total",
                "items"
            ],
            "is_outlier": false
        },
        {
            "signature_hash": "fd116cd512d5ecd2e59edf12fc258b32",
            "fields": [
           

In [5]:
# For every semantic cluster, collect one entry per analyzed record carrying its
# signature hash, its structural cluster id, the raw string and the rough parse.
cluster_dict = {}
for cluster_id, records in semantic_clusters.items():
    members = cluster_dict.setdefault(cluster_id, [])
    for rec in records:
        signature_hash = rec["signature_hash"]
        analyzer_records = analyzer.signature_map[signature_hash]["records"]
        members.extend(
            {
                "signature_hash": signature_hash,
                "structure_cluster_id": structure_cluster_map[signature_hash].get(
                    "cluster_id"
                ),
                "raw": r["raw"],
                "parsed": r["parsed"],
            }
            for r in analyzer_records
        )

cluster_dict

{0: [{'signature_hash': '28d9f3b14d0e5516a186062212502d0c',
   'structure_cluster_id': 1,
   'raw': 'Order 1014: Buyer=Rachel Kim, Locadtion=Seattle, WA, Total=$89.50, Items: coffee maker',
   'parsed': {'order': '1014',
    'buyer': 'Rachel Kim',
    'locadtion': 'Seattle, WA',
    'total': '$89.50',
    'items': 'coffee maker'}},
  {'signature_hash': '461a895ef9c5046dd2cb5026b6a62de0',
   'structure_cluster_id': 1,
   'raw': 'Order 1017: Buyer=Griffin Arora, Location=Columbia, SC, Total=$512.00, Items: monitor, desk lamp, Discount: yes',
   'parsed': {'order': '1017',
    'buyer': 'Griffin Arora',
    'location': 'Columbia, SC',
    'total': '$512.00',
    'items': 'monitor, desk lamp',
    'discount': 'yes'}},
  {'signature_hash': '50eb97a85647221ecc7f65f74d68d156',
   'structure_cluster_id': 1,
   'raw': 'Order 1005: Buyer=Raj Patel, Total=1,200.50, Items: monitor, stand, cable',
   'parsed': {'order': '1005',
    'buyer': 'Raj Patel',
    'total': '1,200.50',
    'items': 'monitor

## LLM Client Setup

In [6]:
# llm connection
# API key is read from the environment; it is None when LLM_API_KEY is unset.
API_KEY = os.getenv("LLM_API_KEY")
# api_base_url = os.getenv("LsLM_BASE_URL")
# Base URL handed to AsyncLLMClient in the next cell.
api_base_url = "https://openrouter.ai/api/v1"
# model = "openai/google/gemma-3-27b-it"
# Model identifier handed to AsyncLLMClient in the next cell.
MODEL = "openai/gpt-oss-120b:exacto"

In [7]:
# Shared async LLM client used by every `client.chat(...)` call in this notebook.
# NOTE(review): API_KEY is not passed here; the LangGraph nodes supply it per
# call via `api_key_override` — confirm how the earlier generation cell authenticates.
client = AsyncLLMClient(
    MODEL,
    api_base_url,
)

## LLM Prompts

In [8]:
# prompts
# System prompt for the single-shot schema generation cell: instructs the model
# to infer a Pydantic BaseModel from raw samples and to fence its code with
# "```python" ... "```".
PYDANTIC_SYSTEM_PROMPT = """You are a Data Architect. Your goal is to perform unsupervised schema inference on a sample of unstructured data.

Generate a Pydantic `BaseModel` class that represents the "Gold Standard" foundation for this data pattern. 

Instructions:
- Normalization: Suggest clean, snake_case keys for the identified fields.
- If you see a string value for a field that follows a consistent structure (e.g., "<city>, <state>") then make sure that structure is accurately typed in the BaseModel.
- Determine what fields should be required vs optional based on overall semantic meaning of the entity you are creating a BaseModel class for.

Constraints:
- Include a detailed description for each field using the `Field` class to explain what the field is and if there are any expected structural patterns (e.g., `state` should be two letters).
- Create supplemental BaseModel classes where necessary to preserve semantic clarity.
- Do NOT include any regex.
- You MUST wrap your code in a python block with the following start marking "```python" and end marking "```".
- If a field appears in some rows but not others, mark it as `Optional`.
- You are only allowed to use the following imports: "from typing import List, Dict, Optional; from pydantic import BaseModel, Field".
- Return ONLY the Pydantic class definitions (you are allowed to generate multiple as long as they are logically linked).
"""

# User prompt template; `{samples}` is filled via str.format with the raw
# sample strings of one cluster.
PYDANTIC_USER_PROMPT = """Please analyze the following representative samples of a new data pattern and generate the Pydantic 'Foundation' model.

### Data Samples:
{samples}
"""

## Generate Pydantic BaseModel Class

In [9]:
# Structured-output contract for the LLM reply: the generated source code plus
# the name of the class inside it that should be treated as the entrypoint.
# Passed to `client.chat` as `response_format` and used to validate the JSON
# reply. Documented with comments rather than a class docstring so the JSON
# schema derived from this model is unchanged.
class ModelResponseStructure(BaseModel):
    code_string: str = Field(..., description="generated python code")
    entrypoint_class_name: str = Field(
        ..., description="name of entrypoint base model class in the code generated"
    )


def extract_python_code(text):
    """
    Pull the contents of the first triple-backtick fenced block out of `text`.

    The opening fence may optionally be tagged `python`; whitespace directly
    inside the fences is trimmed.

    Returns:
        str: The code inside the first fenced block, or an empty string when
        `text` contains no fenced block.
    """
    fenced = re.search(r"```(?:python)?\s*(.*?)\s*```", text, flags=re.DOTALL)
    if fenced is None:
        return ""
    return fenced.group(1).strip()


for c_id, records in cluster_dict.items():
    string_li = [r["raw"] for r in records]
    messages = [
        {
            "role": "system",
            "content": PYDANTIC_SYSTEM_PROMPT},
        {
            "role": "user",
            "content": PYDANTIC_USER_PROMPT.format(
                samples=string_li,
            ),
        },
    ]

    buffer = []
    response = await client.chat(
        messages=messages,
        stream=True,
        temperature=0.0,
        response_format=ModelResponseStructure,
    )
    async for chunk in response:
        if chunk.choices and chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            buffer.append(content)

    resp = "".join(buffer)
    resp: ModelResponseStructure = ModelResponseStructure(**json.loads(resp))

    if not resp.code_string.startswith("```python"):
        resp.code_string = f"```python\n{resp.code_string}\n```"

    code = extract_python_code(resp.code_string)

    namespace = {}
    exec(code, globals(), namespace)

    # access the function from the namespace dictionary
    cls = namespace.get(resp.entrypoint_class_name)
    cls.model_rebuild(_types_namespace=namespace)
    schema = cls.model_json_schema()

    fname = f"{resp.entrypoint_class_name.lower()}_base_model.py"
    code = code.encode("ascii", errors="ignore").decode("ascii")
    with open(fname, "w", encoding="utf-8") as f:
        f.write(code)
        print(f"\nSuccessfully wrote code to {fname}")

    print()
    break

{"code_string":"```python\nfrom typing import List, Optional\nfrom pydantic import BaseModel, Field\n\nclass Location(BaseModel):\n    city: str = Field(..., description=\"City name extracted from the location field.\")\n    state: str = Field(..., description=\"Two-letter US state abbreviation, e.g., 'WA'.\")\n\nclass Order(BaseModel):\n    order_id: int = Field(..., description=\"Numeric identifier of the order.\")\n    buyer_name: str = Field(..., description=\"Full name of the buyer as it appears in the record.\")\n    location: Location = Field(..., description=\"Geographic location of the buyer.\")\n    total_amount: float = Field(..., description=\"Total monetary amount of the order in US dollars.\")\n    items: List[str] = Field(..., description=\"List of item names included in the order. May be empty.\")\n    discount_applied: Optional[bool] = Field(None, description=\"Indicates whether a discount was applied to the order.\")\n```","entrypoint_class_name":"Order"}
Successfully

## LangGraph Robustness and StateGraph

In [10]:
# Prompt for the architect node; `{samples}` is filled via str.format with a
# JSON dump of parsed sample dictionaries.
ARCHITECT_PROMPT = """You are a Lead Data Architect.
Define a simple Pydantic v2 `BaseModel` that represents the "Gold Standard" foundation for the data pattern found in the input samples.

INPUT SAMPLES (Multiple variations):
{samples}

REQUIREMENTS:
1. Normalization: Suggest clean, snake_case keys for the identified fields.
2. Optionality: If a field is missing in ANY of the samples, you MUST wrap it in Optional[...].
3. Determine what fields should be required vs optional based on overall semantic meaning of the entity you are creating a BaseModel class for.
4. Include a detailed description for each field using the `Field` class to explain what the field is and if there are any expected structural patterns (e.g., `state` should be two letters).
5. Do NOT include any regex.
6. You MUST wrap your code in a python block with the following start marking "```python" and end marking "```".
7. Create supplemental BaseModel classes where necessary to preserve semantic clarity.
8. You are ONLY allowed to use the following imports: "from typing import List, Dict, Optional; from pydantic import BaseModel, Field".
9. Keep primary keys as type string.
10. Infer best data type from string value (e.g., money should be a float, "true/false" or "yes/no" fields should be a boolean, and fields that represent multiple entities should use a representative aggregate data structure type)
11. NEVER set potentially boolean fields as optional. Instead, when not explicitly declared, infer as to what the default value ought based on the semantic meaning of the field and how it appears in the samples that do provide it.
12. Perform semantic merging: Identify fields across structural variants that share the same intent and conjoin them under a single, definitive schema key to avoid redundancy (e.g., "location" vs "city", "state", "zip code")

Return ONLY the Python code for the class. Include necessary imports (from pydantic import BaseModel, Field, etc.).
"""

# Prompt for the coder node; `{schema}` receives the generated model source and
# `{samples}` a JSON dump of parsed sample dictionaries.
CODER_PROMPT = """You are a Senior Data Engineer.
Your task is to write a concise but effective transformation function `transform_to_models(parsed_dict: list[dict]) -> list[dict]` that maps roughly parsed dictionaries into the provided pydantic v2 target schema base model definition.

TARGET SCHEMA (Python Pydantic v2 BaseModel):
{schema}

SOURCE SAMPLES:
{samples}

Logic Requirements:
1. Use a 'coalesce' approach: for each target field, check all possible source keys from the input dictionary samples.
2. Use parsed_dict.get() for optional fields.
3. Infer best data type from string (e.g., "$120.00" should be a float, and "true" should be a boolean). 
4. ALL python code must be encapsulated by the `transform_to_models()` function -- if it's not in that function it will not be run.

Return ONLY the Python code for the function `transform_to_models`. Do not include the Pydantic class in your response.
"""

# Source template executed by the code tester node: a fixed import header
# followed by the generated schema (`{schema}`) and parser (`{parser_code}`).
CODE_EXECUTION = """
from pydantic import BaseModel, Field, ValidationError
from typing import *
import json, re

{schema}

{parser_code}
"""

In [11]:
import json
import operator
from typing import List, TypedDict, Optional, Literal, Dict, Any
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from dq_swirl.utils.log_utils import get_custom_logger 
from typing import Annotated

from __future__ import annotations

from typing import List, Optional
from pydantic import BaseModel, Field
import traceback


logger = get_custom_logger()

# Structured-output contract for LLM replies: generated source code plus the
# name of its entrypoint class. NOTE(review): this re-declares the class of the
# same name defined in an earlier cell and shadows it; consider keeping one.
# Documented with comments rather than a class docstring so the JSON schema
# derived from this model is unchanged.
class ModelResponseStructure(BaseModel):
    code_string: str = Field(..., description="generated python code")
    entrypoint_class_name: str = Field(
        ..., 
        description="name of entrypoint base model class in the code generated",
    )

class MultiAgentState(TypedDict):
    # Shared state threaded through the graph nodes.
    semantic_id: str  # semantic cluster id, as a string
    structure_cluster_id: str  # structural cluster id, as a string (used in the export filename)
    data_pairs_all: List[Dict[str, Any]]  # records the architect samples from (reads each record's 'parsed')
    data_pairs_structure: List[Dict[str, Any]]  # records the coder and code tester work on
    # Reducers: for the fields below a node's returned value replaces the stored
    # one ('new' wins over 'old'); only 'attempts' is accumulated.
    gold_schema: Annotated[Optional[ModelResponseStructure], lambda old, new: new]
    parser_code: Annotated[Optional[str], lambda old, new: new]
    feedback: Annotated[Optional[str], lambda old, new: new]
    error_type: Annotated[Optional[Literal["SCHEMA_ISSUE", "CODE_ISSUE"]], lambda old, new: new]
    attempts: Annotated[int, operator.add]  # summed: each node returns an increment added to the running total

async def architect_node(state: MultiAgentState):
    """Ask the LLM to define the gold-standard Pydantic schema for the samples.

    An existing ``gold_schema`` is reused as-is unless the last recorded failure
    was a ``SCHEMA_ISSUE``. On such a retry the previous schema and the recorded
    error are appended to the prompt, because resending the identical prompt at
    temperature 0 gives the model nothing new to act on.

    Args:
        state: Graph state; reads ``gold_schema``, ``error_type``, ``feedback``,
            ``attempts`` and up to the first 100 ``data_pairs_all`` records.

    Returns:
        dict: ``{"attempts": 0}`` when the existing schema is reused; otherwise
        the new ``gold_schema`` with ``attempts`` incremented by one and
        ``feedback`` / ``error_type`` cleared.

    Raises:
        Exception: whatever ``json.loads`` or ``ModelResponseStructure`` raise
            when the reply is not the expected JSON document.
    """
    retrying_schema = state.get("error_type") == "SCHEMA_ISSUE"
    if state.get("gold_schema") and not retrying_schema:
        return {"attempts": 0}

    logger.info(f"[Architect] Defining Semantic Goal: {state['attempts']}")
    # Send a variety of samples so the architect sees all potential fields
    samples = json.dumps([p['parsed'] for p in state['data_pairs_all'][:100]], indent=2)

    prompt = ARCHITECT_PROMPT.format(
        samples=samples,
    )

    previous_schema = state.get("gold_schema")
    failure = state.get("feedback")
    if retrying_schema and previous_schema is not None and failure:
        prompt += (
            "\n\nPREVIOUS ATTEMPT (failed validation):\n"
            f"{previous_schema.code_string}\n\n"
            "ERROR:\n"
            f"{failure}\n\n"
            "Return a corrected definition that avoids this error."
        )

    buffer = []
    response = await client.chat(
        messages=[
            {
                "role": "user",
                "content": prompt,
            }
        ],
        api_key_override=API_KEY,
        stream=True,
        temperature=0.0,
        response_format=ModelResponseStructure,
    )
    async for chunk in response:
        if chunk.choices and chunk.choices[0].delta.content:
            buffer.append(chunk.choices[0].delta.content)

    resp = ModelResponseStructure(**json.loads("".join(buffer)))
    # The structured reply may carry the code without a markdown fence; fall
    # back to the raw string instead of discarding it as "".
    fenced_code = extract_python_code(resp.code_string)
    resp.code_string = fenced_code or resp.code_string.strip()
    print(resp.code_string)

    return {
        "gold_schema": resp,
        "attempts": 1,
        "feedback": None,
        "error_type": None,  # clear the error type for the next tester run
    }

async def schema_tester_node(state: MultiAgentState):
    """Execute the generated schema code and check that it yields a usable model.

    The code is executed in a single namespace. With separate globals/locals
    dictionaries ``exec`` runs code as if it were inside a class body, so a
    generated class could not see a sibling class defined earlier in the same
    snippet. The namespace starts as a copy of this module's globals, so every
    name that was visible before still is, without modifying the module.

    NOTE(security): this runs LLM-generated code inside the current process.

    Args:
        state: Graph state; reads ``gold_schema`` (code and entrypoint class
            name) and ``attempts``.

    Returns:
        dict: ``{"feedback": "SUCCESS"}`` when the entrypoint class is defined,
        rebuilds and produces a JSON schema; otherwise the formatted traceback
        as ``feedback`` with ``error_type`` set to ``"SCHEMA_ISSUE"``.
    """
    logger.info(f"[Schema Tester] Validating Functional BaseModel: {state['attempts']}")
    python_base_model_str = state["gold_schema"].code_string

    try:
        cls_name = state["gold_schema"].entrypoint_class_name
        env = dict(globals())
        # A same-named object already present in the module must not mask
        # generated code that fails to define the entrypoint class.
        env.pop(cls_name, None)
        exec(python_base_model_str, env)
        model = env[cls_name]

        model.model_rebuild(_types_namespace=env)
        # called for validation only: raises if the schema cannot be generated
        model.model_json_schema()

        return {"feedback": "SUCCESS"}
    except Exception as e:
        err_msg = traceback.format_exc()
        logger.exception(e)
        return {"feedback": err_msg, "error_type": "SCHEMA_ISSUE"}

async def coder_node(state: MultiAgentState):
    """Ask the LLM to write ``transform_to_models`` for the current gold schema.

    When the previous attempt failed with a ``CODE_ISSUE``, the earlier parser
    code and the recorded traceback are appended to the prompt, because
    resending the identical prompt at temperature 0 gives the model nothing new
    to act on.

    Args:
        state: Graph state; reads ``gold_schema``, ``data_pairs_structure``,
            ``attempts`` and, on retries, ``parser_code``, ``feedback`` and
            ``error_type``.

    Returns:
        dict: The new ``parser_code`` with ``attempts`` incremented by one and
        ``feedback`` / ``error_type`` cleared.
    """
    logger.info(f"[Coder] Parser for Gold Schema: {state['attempts']}")
    samples = json.dumps([rec["parsed"] for rec in state["data_pairs_structure"]], indent=2)

    prompt = CODER_PROMPT.format(
        schema=state["gold_schema"].code_string,
        samples=samples,
    )

    previous_code = state.get("parser_code")
    failure = state.get("feedback")
    if state.get("error_type") == "CODE_ISSUE" and previous_code and failure:
        prompt += (
            "\n\nPREVIOUS ATTEMPT (failed testing):\n"
            f"{previous_code}\n\n"
            "ERROR:\n"
            f"{failure}\n\n"
            "Return a corrected `transform_to_models` that avoids this error."
        )

    buffer = []
    response = await client.chat(
        messages=[
            {
                "role": "user",
                "content": prompt,
            }
        ],
        api_key_override=API_KEY,
        stream=True,
        temperature=0.0,
    )
    async for chunk in response:
        if chunk.choices and chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            buffer.append(content)

    reply = "".join(buffer)
    # Fall back to the raw reply when the model omitted the markdown fence,
    # rather than handing an empty parser to the tester.
    code = extract_python_code(reply) or reply.strip()

    return {
        "parser_code": code,
        "attempts": 1,
        "feedback": None,
        "error_type": None,
    }

async def code_tester_node(state: MultiAgentState):
    """Run the generated parser on the structure samples and validate its output.

    The schema and parser are rendered into ``CODE_EXECUTION`` and executed in a
    single namespace. With separate globals/locals dictionaries ``exec`` runs
    code as if it were inside a class body, so ``transform_to_models`` could not
    see the template's imports, the model classes, or any helper defined next
    to it. The namespace starts as a copy of this module's globals, so every
    name that was visible before still is, without modifying the module.

    NOTE(security): this runs LLM-generated code inside the current process.

    Args:
        state: Graph state; reads ``gold_schema``, ``parser_code``,
            ``data_pairs_structure`` and ``attempts``.

    Returns:
        dict: ``{"feedback": "SUCCESS"}`` when every mapped record validates
        against the model; otherwise the formatted traceback as ``feedback``
        with ``error_type`` set to ``"CODE_ISSUE"``.
    """
    logger.info(f"[Code Tester] Stress-testing parser: {state['attempts']}")
    full_code = CODE_EXECUTION.format(
        schema=state["gold_schema"].code_string,
        parser_code=state["parser_code"],
    )

    # Sentinel marking "no record reached validation yet", so the failure
    # report below only prints when there is a record to show.
    not_reached = object()
    mapped_dict = not_reached
    try:
        cls_name = state["gold_schema"].entrypoint_class_name
        env = dict(globals())
        # Same-named objects already present in the module must not mask
        # generated code that fails to define these names.
        env.pop(cls_name, None)
        env.pop("transform_to_models", None)
        exec(full_code, env)
        func = env["transform_to_models"]
        model = env[cls_name]
        model.model_rebuild(_types_namespace=env)

        input_data = [pair['parsed'] for pair in state['data_pairs_structure']]
        mapped_batch = func(input_data)
        for mapped_dict in mapped_batch:
            model.model_validate(mapped_dict)
            print(f"Input: {mapped_dict} -- PASSED")
        return {"feedback": "SUCCESS"}
    except Exception:
        err_msg = traceback.format_exc()
        if mapped_dict is not not_reached:
            print(f"Input: {mapped_dict} -- FAILED")
        return {"feedback": err_msg, "error_type": "CODE_ISSUE"}

async def exporter_node(state: MultiAgentState):
    """Write the validated schema and parser to a standalone Python module.

    The module is rendered from ``CODE_EXECUTION`` — the same template the code
    tester executes — so the exported file carries the import header the parser
    was tested with. The file is written as UTF-8 because generated docstrings
    may contain non-ASCII characters.

    Args:
        state: Graph state; reads ``gold_schema``, ``parser_code`` and
            ``structure_cluster_id``.

    Returns:
        dict: ``{"feedback": "DONE"}`` once the file is written to the current
        working directory.
    """
    base_model_name = state["gold_schema"].entrypoint_class_name
    filename = f"parser_{base_model_name}_struct_{state['structure_cluster_id']}.py"
    module_source = CODE_EXECUTION.format(
        schema=state["gold_schema"].code_string,
        parser_code=state["parser_code"],
    )
    with open(filename, "w", encoding="utf-8") as f:
        f.write(module_source)
    print(f"--> Exported: {filename}")
    return {"feedback": "DONE"}

def schema_router(state: MultiAgentState):
    """Pick the node that follows the schema tester.

    Returns "coder" when the tester reported SUCCESS, "end" once the attempt
    counter has reached 3, and "architect" otherwise (retry the schema).
    """
    attempts = state.get("attempts", 0)

    if state.get("feedback") == "SUCCESS":
        return "coder"

    # retry budget exhausted: stop the run instead of looping forever
    out_of_budget = attempts >= 3
    if out_of_budget:
        logger.error(f"Schema failed after {attempts} attempts. Aborting.")

    return "end" if out_of_budget else "architect"

def code_router(state: MultiAgentState):
    """Pick the node that follows the code tester.

    Returns "exporter" when the tester reported SUCCESS, "end" once the attempt
    counter has reached 6, "architect" when the failure is a SCHEMA_ISSUE, and
    "coder" for a CODE_ISSUE or any other failure.
    """
    if state.get("feedback") == "SUCCESS":
        return "exporter"

    if state.get("attempts", 0) >= 6:
        return "end"

    # route the retry to whichever stage produced the failure; unknown
    # failures default to the coder
    return "architect" if state.get("error_type") == "SCHEMA_ISSUE" else "coder"



## Define Graph
# Build the graph over the shared MultiAgentState and register one node per stage.
workflow = StateGraph(MultiAgentState)
workflow.add_node("architect", architect_node) 
workflow.add_node("schema_tester", schema_tester_node) 
workflow.add_node("coder", coder_node)
workflow.add_node("code_tester", code_tester_node)
workflow.add_node("exporter", exporter_node)

# Every run starts at the architect, whose output always goes to the schema tester.
workflow.add_edge(START, "architect")
workflow.add_edge("architect", "schema_tester")
# schema_router decides: proceed to the coder, retry the architect, or stop.
workflow.add_conditional_edges(
    "schema_tester", 
    schema_router, 
    {
        "architect": "architect", 
        "coder": "coder", 
        "end": END
    }
)

# The coder's output always goes to the code tester.
workflow.add_edge("coder", "code_tester")
# code_router decides: export, retry the coder or the architect, or stop.
workflow.add_conditional_edges(
    "code_tester", 
    code_router, 
    {
        "architect": "architect", 
        "coder": "coder", 
        "exporter": "exporter",
        "end": END
    }
)

workflow.add_edge("exporter", END)

# Compile with an in-memory checkpointer; invocations must pass a thread_id in
# their config (see the `configurable` dict built by the caller).
app = workflow.compile(checkpointer=MemorySaver())

  from pydantic.v1.fields import FieldInfo as FieldInfoV1


In [12]:
async def run_data_matrix(all_data: Dict[int, List[Dict]]):
    """Run the schema/parser graph once per structural group of each semantic cluster.

    Records of a semantic cluster are grouped by ``structure_cluster_id``. The
    gold schema returned by one group's run is passed as the starting schema of
    the next group in the same semantic cluster.

    Each call uses a fresh suffix in the checkpointer ``thread_id``. With a
    checkpointer, invoking the graph again under an existing thread id resumes
    that thread's stored state, so the additive ``attempts`` counter (and stale
    feedback) from an earlier call would carry over into the new run.

    Args:
        all_data: Mapping of semantic cluster id -> list of record dicts; each
            record must carry a ``structure_cluster_id`` key.

    Returns:
        None. Results are produced by the graph nodes as side effects.
    """
    import uuid  # local import: only needed to build per-call thread ids

    run_id = uuid.uuid4().hex[:8]

    for sem_id, records in all_data.items():
        struct_groups = {}
        for record in records:
            struct_groups.setdefault(record['structure_cluster_id'], []).append(record)

        shared_gold_schema = None

        for struct_id, pairs in struct_groups.items():
            config = {
                "configurable": {
                    "thread_id": f"sem_{sem_id}_str_{struct_id}_{run_id}",
                }
            }

            initial_state = {
                "semantic_id": str(sem_id),
                "structure_cluster_id": str(struct_id),
                "data_pairs_all": records,
                "data_pairs_structure": pairs,
                "gold_schema": shared_gold_schema,
                "attempts": 0,
            }

            final_output = await app.ainvoke(initial_state, config)
            shared_gold_schema = final_output.get('gold_schema')

            logger.info(f"--- Finished Cluster {struct_id} ---")

await run_data_matrix(cluster_dict)


[38;5;245m2026-02-02 00:14:46[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:42[0m [Architect] Defining Semantic Goal: 0
[38;5;245m2026-02-02 00:14:50[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:80[0m [Scehma Tester] Validating Functional BaseModel: 1
[38;5;245m2026-02-02 00:14:50[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:99[0m [Coder] Parser for Gold Schema: 1


from typing import List, Optional
from pydantic import BaseModel, Field

class Order(BaseModel):
    """Gold Standard representation of an e‑commerce order."""
    order: str = Field(..., description="Unique identifier for the order, kept as a string.")
    buyer: str = Field(..., description="Name of the customer who placed the order.")
    location: Optional[str] = Field(None, description="City and state of the shipping address in the format 'City, ST'.")
    total: float = Field(..., description="Monetary total of the order expressed in US dollars.")
    items: List[str] = Field(..., description="List of product names purchased in the order. Items are split on commas.")
    discount: bool = Field(False, description="Flag indicating whether a discount was applied to the order.")
```python
def transform_to_models(parsed_dict: list[dict]) -> list[dict]:
    """
    Convert a list of loosely‑structured order dictionaries into a list of dictionaries
    that conform to the target Pydanti

[38;5;245m2026-02-02 00:14:52[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:135[0m [Code Tester] Stress-testing parser: 2


Input: {'order': '1014', 'buyer': 'Rachel Kim', 'location': 'Seattle, WA', 'total': 89.5, 'items': ['coffee maker'], 'discount': False} -- PASSED
Input: {'order': '1017', 'buyer': 'Griffin Arora', 'location': 'Columbia, SC', 'total': 512.0, 'items': ['monitor', 'desk lamp'], 'discount': True} -- PASSED
Input: {'order': '1005', 'buyer': 'Raj Patel', 'location': None, 'total': 1200.5, 'items': ['monitor', 'stand', 'cable'], 'discount': False} -- PASSED
Input: {'order': '1016', 'buyer': 'Jake Myers', 'location': None, 'total': 1512.0, 'items': ['monitor'], 'discount': False} -- PASSED
Input: {'order': '1001', 'buyer': 'John Davis', 'location': 'Columbus, OH', 'total': 742.1, 'items': ['laptop', 'hdmi cable'], 'discount': False} -- PASSED
Input: {'order': '1004', 'buyer': 'AMANDA SMITH', 'location': 'Seattle, WA', 'total': 50.0, 'items': ['desk lamp'], 'discount': False} -- PASSED
Input: {'order': '1006', 'buyer': 'Elena Rossi', 'location': 'Miami, FL', 'total': 89.99, 'items': ['keyboard'

[38;5;245m2026-02-02 00:14:52[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3536762348.py:26[0m --- Finished Cluster 1 ---
[38;5;245m2026-02-02 00:14:52[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:42[0m [Architect] Defining Semantic Goal: 0
[38;5;245m2026-02-02 00:14:56[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:80[0m [Scehma Tester] Validating Functional BaseModel: 1
[38;5;245m2026-02-02 00:14:56[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:99[0m [Coder] Parser for Gold Schema: 1


from typing import Optional
from pydantic import BaseModel, Field

class UserBase(BaseModel):
    """Gold standard user entity."""
    id: str = Field(..., description="Unique identifier for the user, stored as a string.")
    name: Optional[str] = Field(None, description="Full name of the user.")
    email: Optional[str] = Field(None, description="User's email address.")
    is_active: bool = Field(False, description="Indicates whether the user account is active.")
    created_at: Optional[str] = Field(None, description="Timestamp when the user was created, ISO 8601 format if available.")
    role: Optional[str] = Field(None, description="Role assigned to the user, e.g., admin, editor, viewer.")
    last_login_ip: Optional[str] = Field(None, description="IP address from which the user last logged in.")
```python
def transform_to_models(parsed_dict: list[dict]) -> list[dict]:
    """
    Convert loosely‑parsed user dicts into the strict shape required by the
    ``UserBase`` pydantic m

[38;5;245m2026-02-02 00:14:58[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:135[0m [Code Tester] Stress-testing parser: 2
[38;5;245m2026-02-02 00:14:58[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3536762348.py:26[0m --- Finished Cluster 0 ---
[38;5;245m2026-02-02 00:14:58[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:42[0m [Architect] Defining Semantic Goal: 0


Input: {'id': '4', 'name': 'Chen Wei', 'email': 'chen.wei@example.com', 'is_active': True, 'created_at': None, 'role': None, 'last_login_ip': None} -- PASSED
Input: {'id': 'usr_002', 'name': 'Maria Lopez', 'email': 'maria.lopez@example.com', 'is_active': False, 'created_at': '2025-12-18T16:47:10Z', 'role': 'editor', 'last_login_ip': '192.168.1.42'} -- PASSED
Input: {'id': 'usr_001', 'name': 'Alex Johnson', 'email': None, 'is_active': True, 'created_at': '2025-11-02T09:14:23Z', 'role': 'admin', 'last_login_ip': None} -- PASSED
Input: {'id': 'usr_005', 'name': 'Broken Record', 'email': 'broken@example.com', 'is_active': False, 'created_at': None, 'role': None, 'last_login_ip': None} -- PASSED
Input: {'id': 'usr_003', 'name': None, 'email': 'samir.patel@example.com', 'is_active': False, 'created_at': '08/05/2024', 'role': 'viewer', 'last_login_ip': None} -- PASSED
--> Exported: parser_UserBase_struct_0.py


[38;5;245m2026-02-02 00:15:01[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:80[0m [Scehma Tester] Validating Functional BaseModel: 1
[38;5;245m2026-02-02 00:15:01[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:99[0m [Coder] Parser for Gold Schema: 1


from typing import Optional
from pydantic import BaseModel, Field

class LogEntry(BaseModel):
    """Gold Standard model representing a normalized log or user event."""

    user_id: str = Field(
        ..., 
        description="Unique identifier for the user. Merged from 'user_id' and 'id' fields in source data."
    )
    user_name: Optional[str] = Field(
        None,
        description="Human‑readable name of the user when available."
    )
    level: Optional[str] = Field(
        None,
        description="Log level or severity indicator (e.g., INFO, WARN, ERROR)."
    )
    unparsed: Optional[str] = Field(
        None,
        description="Original unparsed log line when provided."
    )
```python
def transform_to_models(parsed_dict: list[dict]) -> list[dict]:
    """
    Transform a list of loosely‑parsed dictionaries into a list of dictionaries
    that match the ``LogEntry`` schema.

    The function:
    * Coalesces possible source keys for each target field.
    * Uses 

[38;5;245m2026-02-02 00:15:03[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:135[0m [Code Tester] Stress-testing parser: 2


Input: {'user_id': 123, 'user_name': None, 'level': None, 'unparsed': '2026-01-30 14:22:01 INFO User login successful'} -- FAILED


[38;5;245m2026-02-02 00:15:03[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:99[0m [Coder] Parser for Gold Schema: 2


```python
def transform_to_models(parsed_dict: list[dict]) -> list[dict]:
    """
    Transform a list of loosely‑parsed dictionaries into a list of dictionaries that
    conform to the ``LogEntry`` schema.

    The function:
    * Coalesces possible source keys for each target field.
    * Infers a more appropriate Python type from string values (int, float, bool).
    * Returns plain ``dict`` objects – suitable for feeding into the Pydantic model.
    """

    def _coalesce(source: dict, candidates: list[str]) -> object | None:
        """Return the first non‑null value found for the given candidate keys."""
        for key in candidates:
            if key in source and source[key] is not None:
                return source[key]
        return None

    def _infer_type(value: object) -> object:
        """Best‑effort conversion of a string to int, float, or bool."""
        if not isinstance(value, str):
            return value
        v = value.strip()
        # Boolean detection


[38;5;245m2026-02-02 00:15:05[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:135[0m [Code Tester] Stress-testing parser: 3
[38;5;245m2026-02-02 00:15:05[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:99[0m [Coder] Parser for Gold Schema: 3


Input: {'user_id': 123, 'user_name': None, 'level': None, 'unparsed': '2026-01-30 14:22:01 INFO User login successful'} -- FAILED
```python
def transform_to_models(parsed_dict: list[dict]) -> list[dict]:
    """
    Transform a list of loosely‑parsed dictionaries into a list of dictionaries that
    conform to the ``LogEntry`` schema.

    The function follows a *coalesce* strategy:
        • ``user_id``  ← first non‑null value from ["user_id", "id"]
        • ``user_name``← first non‑null value from ["user_name", "name", "username"]
        • ``level``    ← first non‑null value from ["level", "log_level", "severity"]
        • ``unparsed`` ← first non‑null value from ["_unparsed", "raw", "original"]

    It also attempts to infer the most appropriate Python type for string values
    (int, float, bool) while preserving strings that cannot be converted.
    """

    def _infer_type(val):
        """Best‑effort conversion of a string to int, float, bool or leave as‑is."""
        if not

[38;5;245m2026-02-02 00:15:07[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:135[0m [Code Tester] Stress-testing parser: 4


Input: {'user_id': '123', 'user_name': None, 'level': None, 'unparsed': '2026-01-30 14:22:01 INFO User login successful'} -- PASSED
--> Exported: parser_LogEntry_struct_3.py


[38;5;245m2026-02-02 00:15:07[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3536762348.py:26[0m --- Finished Cluster 3 ---
[38;5;245m2026-02-02 00:15:07[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:80[0m [Scehma Tester] Validating Functional BaseModel: 0
[38;5;245m2026-02-02 00:15:07[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:99[0m [Coder] Parser for Gold Schema: 0


```python
def transform_to_models(parsed_dict: list[dict]) -> list[dict]:
    """
    Transform a list of loosely‑parsed dictionaries into a list of dictionaries
    that match the ``LogEntry`` target schema.

    The function:
    * Coalesces possible source keys for each target field.
    * Attempts to infer a more specific Python type from string values
      (int, float, bool) while leaving other values untouched.
    * Returns a list of plain ``dict`` objects ready to be fed to the
      ``LogEntry`` Pydantic model.
    """

    def _coerce(value):
        """Best‑effort conversion of a string to int, float or bool."""
        if not isinstance(value, str):
            return value
        v = value.strip()
        # Boolean detection
        low = v.lower()
        if low in {"true", "false"}:
            return low == "true"
        # Integer detection
        if v.isdigit() or (v.startswith("-") and v[1:].isdigit()):
            try:
                return int(v)
            ex

[38;5;245m2026-02-02 00:15:09[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:135[0m [Code Tester] Stress-testing parser: 1
[38;5;245m2026-02-02 00:15:09[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:99[0m [Coder] Parser for Gold Schema: 1


Input: {'user_id': 1, 'user_name': 'Sam', 'level': 'INFO', 'unparsed': None} -- FAILED
```python
def transform_to_models(parsed_dict: list[dict]) -> list[dict]:
    """
    Transform a list of loosely‑parsed dictionaries into a list of dictionaries that
    conform to the ``LogEntry`` target schema.

    The function follows a *coalesce* strategy:
      * ``user_id``  ← first non‑null of ``'user_id'`` then ``'id'`` (required)
      * ``user_name``← first non‑null of ``'user_name'`` then ``'user'``
      * ``level``    ← ``'level'`` (optional)
      * ``unparsed`` ← first non‑null of ``'unparsed'`` then ``'raw'``

    It also attempts to infer the most appropriate Python type for string values:
      * integer strings → ``int``
      * floating‑point strings → ``float``
      * case‑insensitive ``'true'`` / ``'false'`` → ``bool``
      * otherwise the original string is kept.

    Parameters
    ----------
    parsed_dict: list[dict]
        List of dictionaries produced by a prior, per

[38;5;245m2026-02-02 00:15:12[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:135[0m [Code Tester] Stress-testing parser: 2


Input: {'user_id': 1, 'user_name': 'Sam', 'level': 'INFO', 'unparsed': None} -- FAILED


[38;5;245m2026-02-02 00:15:12[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:99[0m [Coder] Parser for Gold Schema: 2


```python
def transform_to_models(parsed_dict: list[dict]) -> list[dict]:
    """
    Transform a list of loosely‑parsed dictionaries into a list of dictionaries
    that match the ``LogEntry`` schema.

    The function:
    * Coalesces possible source keys for each target field.
    * Attempts to infer a more specific Python type from string values
      (int, float, bool) while leaving other values untouched.
    * Returns plain ``dict`` objects – suitable for feeding into the
      ``LogEntry`` Pydantic model elsewhere.
    """

    def _coerce(value):
        """Best‑effort conversion of a string to int, float or bool."""
        if not isinstance(value, str):
            return value
        v = value.strip()
        # Boolean detection
        low = v.lower()
        if low in {"true", "false"}:
            return low == "true"
        # Integer detection
        if v.isdigit() or (v.startswith("-") and v[1:].isdigit()):
            try:
                return int(v)
            

[38;5;245m2026-02-02 00:15:15[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:135[0m [Code Tester] Stress-testing parser: 3


Input: {'user_id': 1, 'user_name': 'Sam', 'level': 'INFO', 'unparsed': None} -- FAILED


[38;5;245m2026-02-02 00:15:15[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:99[0m [Coder] Parser for Gold Schema: 3


```python
def transform_to_models(parsed_dict: list[dict]) -> list[dict]:
    """
    Transform a list of loosely‑parsed dictionaries into a list of dictionaries that
    conform to the ``LogEntry`` schema.

    The function:
    * Coalesces possible source keys for each target field.
    * Uses ``dict.get`` for optional fields.
    * Infers a more appropriate Python type from string values
      (int → float → bool → original string).
    """

    def _coalesce(keys: list[str], src: dict):
        """Return the first non‑None value for any of the supplied keys."""
        for k in keys:
            if (v := src.get(k)) is not None:
                return v
        return None

    def _infer_type(val):
        """Best‑effort conversion of a string to int, float, or bool."""
        if isinstance(val, str):
            low = val.strip().lower()
            # Boolean
            if low in {"true", "false"}:
                return low == "true"
            # Integer
            try:
    

[38;5;245m2026-02-02 00:15:17[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:135[0m [Code Tester] Stress-testing parser: 4


Input: {'user_id': 1, 'user_name': 'Sam', 'level': 'INFO'} -- FAILED


[38;5;245m2026-02-02 00:15:17[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:99[0m [Coder] Parser for Gold Schema: 4


```python
def transform_to_models(parsed_dict: list[dict]) -> list[dict]:
    """
    Transform a list of loosely‑parsed dictionaries into a list of dictionaries
    that match the ``LogEntry`` target schema.

    The function:
    * Coalesces possible source keys for each target field.
    * Attempts to infer a more specific Python type from string values
      (int, float, bool) while leaving other values untouched.
    * Returns a list of plain ``dict`` objects ready for Pydantic validation.
    """

    def _coerce(value):
        """Best‑effort conversion of a string to int, float or bool."""
        if not isinstance(value, str):
            return value
        v = value.strip()
        # Boolean detection
        low = v.lower()
        if low in {"true", "false"}:
            return low == "true"
        # Integer detection
        if v.isdigit() or (v.startswith("-") and v[1:].isdigit()):
            try:
                return int(v)
            except ValueError:
          

[38;5;245m2026-02-02 00:15:19[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:135[0m [Code Tester] Stress-testing parser: 5


Input: {'user_id': 1, 'user_name': 'Sam', 'level': 'INFO', 'unparsed': None} -- FAILED


[38;5;245m2026-02-02 00:15:19[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:99[0m [Coder] Parser for Gold Schema: 5


```python
def transform_to_models(parsed_dict: list[dict]) -> list[dict]:
    """
    Transform a list of loosely‑parsed dictionaries into a list of dictionaries that
    match the ``LogEntry`` target schema.

    The function:
    * Coalesces possible source keys for each target field.
    * Uses ``dict.get`` for optional look‑ups.
    * Infers a more appropriate Python type for string values
      (bool, int, float) while leaving non‑string values untouched.
    * Returns a list of plain ``dict`` objects ready to be fed to the
      ``LogEntry`` Pydantic model.
    """

    def _infer_type(value):
        """Best‑effort conversion of a string to bool, int or float."""
        if not isinstance(value, str):
            return value

        low = value.strip().lower()
        # Boolean detection
        if low == "true":
            return True
        if low == "false":
            return False

        # Integer detection (handles optional leading sign)
        if low.lstrip("-+").i

[38;5;245m2026-02-02 00:15:22[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:135[0m [Code Tester] Stress-testing parser: 6
[38;5;245m2026-02-02 00:15:22[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3536762348.py:26[0m --- Finished Cluster 2 ---
[38;5;245m2026-02-02 00:15:22[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:42[0m [Architect] Defining Semantic Goal: 0


Input: {'user_id': '1', 'user_name': 'Sam', 'level': 'INFO', 'unparsed': None} -- PASSED
--> Exported: parser_LogEntry_struct_2.py


[38;5;245m2026-02-02 00:15:30[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:80[0m [Scehma Tester] Validating Functional BaseModel: 1
[38;5;245m2026-02-02 00:15:30[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:99[0m [Coder] Parser for Gold Schema: 1


from typing import List, Dict, Optional
from pydantic import BaseModel, Field

class LogRecord(BaseModel):
    timestamp: Optional[str] = Field(
        default=None,
        description="ISO 8601 timestamp of the event, e.g., '2026-01-30T14:22:01Z'."
    )
    level: Optional[str] = Field(
        default=None,
        description="Log severity level such as 'INFO', 'DEBUG', etc."
    )
    user: Optional[str] = Field(
        default=None,
        description="Identifier of the user associated with the event."
    )
    action: Optional[str] = Field(
        default=None,
        description="Action performed by the user, e.g., 'login'."
    )
    success: bool = Field(
        default=False,
        description="Indicates whether the action was successful."
    )
    cpu_usage: Optional[float] = Field(
        default=None,
        description="CPU usage percentage, represented as a float. Commas are ignored."
    )
    memory: Optional[str] = Field(
        default=None,
        de

[38;5;245m2026-02-02 00:15:32[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:135[0m [Code Tester] Stress-testing parser: 2
[38;5;245m2026-02-02 00:15:32[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3536762348.py:26[0m --- Finished Cluster 2 ---
[38;5;245m2026-02-02 00:15:32[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:80[0m [Scehma Tester] Validating Functional BaseModel: 0
[38;5;245m2026-02-02 00:15:32[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:99[0m [Coder] Parser for Gold Schema: 0


Input: {'timestamp': '2026-01-30T14:22:01Z', 'level': 'INFO', 'user': 'alice', 'action': 'login', 'memory': None, 'service': None, 'status': None, 'unparsed': None, 'success': True, 'cpu_usage': None, 'order_id': None} -- PASSED
Input: {'timestamp': None, 'level': 'INFO', 'user': None, 'action': None, 'memory': '512MB', 'service': None, 'status': None, 'unparsed': None, 'success': False, 'cpu_usage': 1234.56, 'order_id': None} -- PASSED
--> Exported: parser_LogRecord_struct_2.py
```python
import re
from typing import List, Dict, Any, Optional

def transform_to_models(parsed_dict: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Transform a list of loosely‑parsed dictionaries into a list of dictionaries that
    conform to the LogRecord schema.

    The function follows a *coalesce* strategy: for each target field it checks a
    collection of possible source keys, then falls back to regex extraction from the
    raw `_unparsed` string when available.
    """

    # ----------

[38;5;245m2026-02-02 00:15:38[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3328042808.py:135[0m [Code Tester] Stress-testing parser: 1


Input: {'timestamp': '2026-01-30 14:22:01', 'level': 'INFO', 'user': 'login', 'action': 'login', 'success': True, 'cpu_usage': None, 'memory': None, 'service': None, 'order_id': None, 'status': None, 'unparsed': '2026-01-30 14:22:01 INFO User login successful'} -- PASSED
Input: {'timestamp': '2026-01-31 17:11:22', 'level': 'INFO', 'user': None, 'action': None, 'success': False, 'cpu_usage': None, 'memory': None, 'service': None, 'order_id': None, 'status': None, 'unparsed': '[2026-01-31 17:11:22 +0000] [7] [INFO] 127.0.0.1:56718 - - [31/Jan/2026:17:11:22 +0000] "GET /health 1.1" 200 16 "-" "curl/8.14.1"'} -- PASSED
Input: {'timestamp': '2026-01-31 17:11:00', 'level': 'DEBUG', 'user': None, 'action': 'cron job', 'success': False, 'cpu_usage': None, 'memory': None, 'service': None, 'order_id': None, 'status': None, 'unparsed': '2026-01-31 17:11:00 swirl [DEBUG] saq_worker.py:28 Running cron job health check'} -- PASSED
Input: {'timestamp': None, 'level': 'INFO', 'user': None, 'action': N

[38;5;245m2026-02-02 00:15:38[0m [36;20mswirl[0m [[32;20mINFO[0m] [38;5;245m3536762348.py:26[0m --- Finished Cluster 3 ---
