# Step 1: Parsing from schema.org ttl file

In [1]:
import rdflib
import json
import logging

# Configure basic logging
logging.basicConfig(level=logging.INFO)

# Define the input schema file path (assuming schema.txt is in the same directory)
# In a real application, this path would be configurable.
SCHEMA_FILE = "schema.txt"
SCHEMA_FORMAT = "turtle" # Explicitly state the format of schema.txt

# Define output path for JSON-LD (optional, for inspection)
JSON_LD_OUTPUT_FILE = "schema_output.jsonld"

def parse_schema_to_graph(file_path: str, file_format: str) -> rdflib.Graph | None:
    """
    Load an RDF schema document into a fresh rdflib Graph.

    Args:
        file_path: Location of the schema file to read.
        file_format: rdflib parser name for the file (e.g., 'turtle', 'xml', 'json-ld').

    Returns:
        The populated rdflib.Graph, or None when the file is missing or
        parsing raises any other exception (the failure is logged, not raised).
    """
    parsed_graph = rdflib.Graph()
    outcome = None
    try:
        logging.info(f"Attempting to parse schema file: {file_path} (format: {file_format})")
        parsed_graph.parse(source=file_path, format=file_format)
        logging.info(f"Successfully parsed {len(parsed_graph)} triples.")
        outcome = parsed_graph
    except FileNotFoundError:
        # Reported separately so a wrong path is easy to tell from a syntax problem.
        logging.error(f"Error: Schema file not found at {file_path}")
    except Exception as exc:
        logging.error(f"Error parsing schema file '{file_path}' with format '{file_format}': {exc}")
    return outcome

def serialize_graph_to_jsonld(graph: rdflib.Graph, output_file: str) -> bool:
    """
    Write an rdflib Graph to disk as JSON-LD.

    Args:
        graph: The rdflib.Graph to serialize; None is rejected with an error log.
        output_file: Destination path for the JSON-LD document (written as UTF-8).

    Returns:
        True when the file was written, False when the graph is None or any
        exception occurs during serialization or writing.
    """
    if graph is None:
        logging.error("Cannot serialize: Graph object is None.")
        return False

    # An @vocab entry lets schema.org terms appear without a prefix in the output.
    jsonld_context = {"@vocab": "https://schema.org/"}

    try:
        logging.info(f"Attempting to serialize graph to JSON-LD: {output_file}")
        serialized = graph.serialize(format='json-ld', context=jsonld_context, indent=2)
        with open(output_file, 'w', encoding='utf-8') as out_handle:
            out_handle.write(serialized)
        logging.info(f"Successfully serialized graph to {output_file}")
    except Exception as exc:
        logging.error(f"Error serializing graph to JSON-LD: {exc}")
        return False
    return True


### Execute Step 1:

In [2]:
# Execute Step 1:
# --- Main execution block for this step ---
if __name__ == "__main__":
    # Step 1: Parse the schema
    schema_graph = parse_schema_to_graph(SCHEMA_FILE, SCHEMA_FORMAT)

    # parse_schema_to_graph signals failure by returning None. Test for None
    # explicitly rather than truthiness: a graph that parsed successfully but
    # holds zero triples is falsy and would otherwise be reported as a failure.
    if schema_graph is not None:
        # Serialize to JSON-LD so the parsed graph can be inspected on disk.
        serialize_graph_to_jsonld(schema_graph, JSON_LD_OUTPUT_FILE)

        # Later cells analyze 'schema_graph' to extract class/property info
        # and then generate Pydantic models from it.
        logging.info("Graph parsed. Ready for Step 2: Schema Analysis (in next code chunk).")
    else:
        logging.error("Failed to parse schema graph. Cannot proceed.")

INFO:root:Attempting to parse schema file: schema.txt (format: turtle)
INFO:root:Successfully parsed 8904 triples.
INFO:root:Attempting to serialize graph to JSON-LD: schema_output.jsonld
INFO:root:Successfully serialized graph to schema_output.jsonld
INFO:root:Graph parsed. Ready for Step 2: Schema Analysis (in next code chunk).


# Step 2: Schema analysis (extracting class and property info from the graph)

In [3]:
import rdflib
from rdflib.namespace import RDF, RDFS, OWL, XSD # Common RDF/RDFS/OWL namespaces
from typing import List, Set, Dict, Optional, NamedTuple
import logging

# Assuming the 'parse_schema_to_graph' function from Chunk 1 exists
# and 'schema_graph' is the rdflib.Graph object returned by it.

# Define the schema.org namespace
SCHEMA = rdflib.Namespace("https://schema.org/")

In [4]:
# Define simple structures to hold extracted info
class PropertyInfo(NamedTuple):
    """Immutable record describing one property extracted from the schema graph."""
    uri: rdflib.URIRef  # URI identifying the property
    label: Optional[str]  # rdfs:label text, or None when absent
    comment: Optional[str]  # rdfs:comment text, or None when absent
    domains: Set[rdflib.URIRef] # Classes where this property applies (schema:domainIncludes)
    ranges: Set[rdflib.URIRef] # Expected types for this property's value (schema:rangeIncludes)

class ClassInfo(NamedTuple):
    """Immutable record describing one class extracted from the schema graph."""
    uri: rdflib.URIRef  # URI identifying the class
    label: Optional[str]  # rdfs:label text, or None when absent
    comment: Optional[str]  # rdfs:comment text, or None when absent
    superclasses: Set[rdflib.URIRef] # Direct parent classes (rdfs:subClassOf), not the full ancestry
    properties: Set[rdflib.URIRef] # Properties associated via domainIncludes (inherited ones are not included)


In [5]:
def get_label(graph: rdflib.Graph, subject: rdflib.URIRef) -> Optional[str]:
    """Return the subject's rdfs:label as a plain string, or None if it has none (or it is empty)."""
    found = graph.value(subject=subject, predicate=RDFS.label)
    if not found:
        return None
    return str(found)

def get_comment(graph: rdflib.Graph, subject: rdflib.URIRef) -> Optional[str]:
    """Return the subject's rdfs:comment as a plain string, or None if it has none (or it is empty)."""
    found = graph.value(subject=subject, predicate=RDFS.comment)
    if not found:
        return None
    return str(found)

def find_schema_classes(graph: rdflib.Graph) -> Set[rdflib.URIRef]:
    """
    Collect schema.org subjects explicitly typed as rdfs:Class or owl:Class.

    Subjects outside the schema.org namespace are ignored, and so are subjects
    that are also typed as schema:DataType (those are handled separately).
    Classes that are only implied through rdfs:subClassOf / domainIncludes /
    rangeIncludes usage, without an explicit type declaration, are not detected.
    """
    schema_prefix = str(SCHEMA)
    found: Set[rdflib.URIRef] = set()
    for class_type in (RDFS.Class, OWL.Class):
        for candidate in graph.subjects(predicate=RDF.type, object=class_type):
            if not str(candidate).startswith(schema_prefix):
                continue  # not a schema.org term
            if (candidate, RDF.type, SCHEMA.DataType) in graph:
                continue  # schema.org datatypes are not modelled as classes
            found.add(candidate)
    logging.info(f"Found {len(found)} potential schema.org classes.")
    return found

def find_schema_properties(graph: rdflib.Graph) -> Set[rdflib.URIRef]:
    """Collect every subject typed rdf:Property whose URI lies in the schema.org namespace."""
    schema_prefix = str(SCHEMA)
    found = {
        candidate
        for candidate in graph.subjects(predicate=RDF.type, object=RDF.Property)
        if str(candidate).startswith(schema_prefix)
    }
    logging.info(f"Found {len(found)} potential schema.org properties.")
    return found

def get_property_details(graph: rdflib.Graph, prop_uri: rdflib.URIRef) -> PropertyInfo:
    """Build the PropertyInfo record (label, comment, domains, ranges) for one property URI."""
    return PropertyInfo(
        uri=prop_uri,
        label=get_label(graph, prop_uri),
        comment=get_comment(graph, prop_uri),
        # schema.org expresses domain/range with its own domainIncludes/rangeIncludes terms.
        domains=set(graph.objects(subject=prop_uri, predicate=SCHEMA.domainIncludes)),
        ranges=set(graph.objects(subject=prop_uri, predicate=SCHEMA.rangeIncludes)),
    )

def get_class_details(graph: rdflib.Graph, class_uri: rdflib.URIRef, all_properties: Dict[rdflib.URIRef, PropertyInfo]) -> ClassInfo:
    """
    Build the ClassInfo record for one class URI.

    Only properties that list this class directly in their domains are
    attached; properties inherited through superclasses are not resolved here.
    """
    direct_properties = {
        candidate_uri
        for candidate_uri, candidate_info in all_properties.items()
        if class_uri in candidate_info.domains
    }
    return ClassInfo(
        uri=class_uri,
        label=get_label(graph, class_uri),
        comment=get_comment(graph, class_uri),
        superclasses=set(graph.objects(subject=class_uri, predicate=RDFS.subClassOf)),
        properties=direct_properties,
    )

def analyze_schema_graph(graph: rdflib.Graph) -> Dict[str, Dict]:
    """
    Extract structured class and property information from the RDF graph.

    Returns:
        A dictionary with two entries: 'classes' maps class URI -> ClassInfo and
        'properties' maps property URI -> PropertyInfo. Both are empty when the
        graph is None or contains no triples.
    """
    if not graph:
        logging.error("Analysis failed: Graph is empty or None.")
        return {"classes": {}, "properties": {}}

    class_uris = find_schema_classes(graph)
    property_uris = find_schema_properties(graph)

    # Properties are resolved first because each class record is derived from them.
    properties_info = {uri: get_property_details(graph, uri) for uri in property_uris}

    # RDFS/OWL base terms are dropped in case they slipped through the namespace filter.
    base_terms = (RDFS.Resource, OWL.Thing, RDFS.Class)
    classes_info = {
        uri: get_class_details(graph, uri, properties_info)
        for uri in class_uris
        if uri not in base_terms
    }

    logging.info(f"Analyzed {len(classes_info)} classes and {len(properties_info)} properties.")
    return {"classes": classes_info, "properties": properties_info}


### Execute Step 2:

In [6]:
# --- Main execution block demonstrating analysis ---
if __name__ == "__main__":
    # Re-parse the schema so this cell can run on its own, without relying on
    # the 'schema_graph' produced by the Step 1 cell.
    schema_graph = parse_schema_to_graph(SCHEMA_FILE, SCHEMA_FORMAT)

    # NOTE(review): truthiness is used here, so a graph with zero triples takes
    # the failure branch below — confirm that is intended.
    if schema_graph:
        # Step 2: Analyze the graph into {'classes': ..., 'properties': ...}
        analyzed_schema = analyze_schema_graph(schema_graph)

        # Spot-check one class and one property: schema:Person and schema:address
        person_uri = SCHEMA.Person
        address_prop_uri = SCHEMA.address

        # Print the ClassInfo record only if Person was found among the classes.
        if person_uri in analyzed_schema["classes"]:
            print("\n--- Analysis for schema:Person ---")
            print(analyzed_schema["classes"][person_uri])
            print("-" * 30)

        # Print the PropertyInfo record only if address was found among the properties.
        if address_prop_uri in analyzed_schema["properties"]:
            print("\n--- Analysis for schema:address ---")
            print(analyzed_schema["properties"][address_prop_uri])
            print("-" * 30)

        # 'analyzed_schema' is the structured input for the mapping rules (Step 3)
        # and code generation (Step 4) in the following cells.
        logging.info("Schema analysis complete. Ready for Step 3: Mapping Logic Definition.")
    else:
        logging.error("Failed to parse schema graph for analysis.")


INFO:root:Attempting to parse schema file: schema.txt (format: turtle)
INFO:root:Successfully parsed 8904 triples.
INFO:root:Found 628 potential schema.org classes.
INFO:root:Found 921 potential schema.org properties.
INFO:root:Analyzed 628 classes and 921 properties.
INFO:root:Schema analysis complete. Ready for Step 3: Mapping Logic Definition.



--- Analysis for schema:Person ---
ClassInfo(uri=rdflib.term.URIRef('https://schema.org/Person'), label='Person', comment='A person (alive, dead, undead, or fictional).', superclasses={rdflib.term.URIRef('https://schema.org/Thing')}, properties={rdflib.term.URIRef('https://schema.org/gender'), rdflib.term.URIRef('https://schema.org/email'), rdflib.term.URIRef('https://schema.org/brand'), rdflib.term.URIRef('https://schema.org/deathPlace'), rdflib.term.URIRef('https://schema.org/weight'), rdflib.term.URIRef('https://schema.org/workLocation'), rdflib.term.URIRef('https://schema.org/parent'), rdflib.term.URIRef('https://schema.org/makesOffer'), rdflib.term.URIRef('https://schema.org/hasOfferCatalog'), rdflib.term.URIRef('https://schema.org/knows'), rdflib.term.URIRef('https://schema.org/duns'), rdflib.term.URIRef('https://schema.org/birthPlace'), rdflib.term.URIRef('https://schema.org/birthDate'), rdflib.term.URIRef('https://schema.org/spouse'), rdflib.term.URIRef('https://schema.org/add

# Step 3: Mapping logic definition (schema.org terms to Python names and type hints)

In [11]:
import rdflib
from rdflib.namespace import RDF, RDFS, OWL, XSD
from typing import List, Set, Dict, Optional, NamedTuple, Union, ForwardRef, Any
from pydantic import BaseModel, Field, EmailStr, AnyUrl
from datetime import date, datetime, time
import re
import keyword
import rdflib
from rdflib.namespace import RDF, RDFS, OWL, XSD
from typing import List, Set, Dict, Optional, NamedTuple, Union, ForwardRef, Any, cast
# Added imports for richer types
from pydantic import BaseModel, Field, EmailStr, AnyUrl, constr, conint, condecimal, field_validator, model_validator
from datetime import date, datetime, time, timedelta
import isodate # Library to parse ISO 8601 durations
import decimal
import keyword
import logging
import re
import textwrap

In [12]:
class Quantity(BaseModel):
    """
    Base model for quantitative values based on schema.org/Quantity.

    Declares no fields of its own: the actual value and unit are carried by
    subclasses (Distance and Duration in this file) or by specific properties,
    so this class mainly serves as a common conceptual base.
    """
    # Unknown keys are kept rather than rejected because Quantity is generic;
    # the subclasses in this file override this with 'forbid'.
    model_config = {'extra': 'allow'} # Allow extra fields as Quantity is generic

class Distance(Quantity):
    """
    Represents a distance based on schema.org/Distance.

    Uses the value + unit representation common in QuantitativeValue. All
    fields are optional, and unknown fields are rejected ('extra': 'forbid'),
    overriding the permissive setting of the Quantity base class.
    """
    # Based on properties commonly used with QuantitativeValue for distance
    value: Optional[float] = Field(None, description="The numerical value of the distance.")
    # The example codes are the UN/CEFACT Common Codes (Recommendation 20):
    # kilometre is 'KMT' and foot is 'FOT' (not 'KM' / 'FT').
    unitCode: Optional[str] = Field(None, description="UN/CEFACT Common Code (3 characters) or URL for the unit of measurement. E.g., 'MTR' for meter, 'KMT' for kilometer, 'FOT' for foot, 'INH' for inch.")
    unitText: Optional[str] = Field(None, description="A string indicating the unit of measurement. Useful if unitCode is not applicable or needs clarification. E.g., 'meters', 'miles'.")

    model_config = {'extra': 'forbid'}

    # Add validation if needed, e.g., check unitCode format

class Duration(Quantity):
    """
    Represents a duration based on schema.org/Duration.

    The ISO 8601 text is kept in ``value_iso8601`` (input alias
    ``iso8601Duration``). When that text describes a fixed length of time it
    is also parsed into ``value_timedelta``. Durations containing years or
    months have no fixed length, so for those ``value_timedelta`` stays None
    and only the ISO text is retained.

    A Duration can be built from a dict/keyword arguments or directly from an
    ISO 8601 string. An unparseable string is logged as a warning and stored
    with ``value_timedelta=None`` rather than raising.
    """
    value_iso8601: Optional[str] = Field(None, alias="iso8601Duration", description="Duration in ISO 8601 format (e.g., P1Y2M3DT4H5M6S).")
    value_timedelta: Optional[timedelta] = Field(None, exclude=True, description="Parsed timedelta value (internal).") # Exclude from standard model dump

    model_config = {'extra': 'forbid', 'populate_by_name': True} # Allow using alias on input

    @model_validator(mode='before')
    @classmethod
    def parse_duration(cls, data: Any) -> Any:
        """Derive ``value_timedelta`` from the ISO 8601 text before field validation runs."""

        def to_timedelta(iso_text: str) -> Optional[timedelta]:
            # isodate returns its own Duration type (not a timedelta) when the
            # text contains years or months. That object would fail validation
            # of the timedelta field, so calendar-dependent durations are
            # represented by their ISO text only.
            parsed = isodate.parse_duration(iso_text)
            if isinstance(parsed, timedelta):
                return parsed
            logging.debug(f"ISO 8601 duration '{iso_text}' has years/months; no fixed timedelta stored.")
            return None

        if isinstance(data, str):
            # Allow direct initialization from an ISO string.
            try:
                return {'value_iso8601': data, 'value_timedelta': to_timedelta(data)}
            except (isodate.ISO8601Error, ValueError) as e:
                logging.warning(f"Could not parse ISO 8601 duration string '{data}': {e}")
                # Keep the original string and mark the parse as failed.
                return {'value_iso8601': data, 'value_timedelta': None}

        if not isinstance(data, dict):
            return data  # Leave anything else for Pydantic to accept or reject

        iso_duration_str = data.get("value_iso8601") or data.get("iso8601Duration")
        if not (iso_duration_str and isinstance(iso_duration_str, str)):
            # No ISO text supplied (e.g. only a timedelta was given): pass through untouched.
            return data

        # Work on a copy so the caller's dict is never modified as a side effect.
        data = dict(data)
        try:
            data['value_timedelta'] = to_timedelta(iso_duration_str)
            # Keep the original string too, under the field name.
            data['value_iso8601'] = iso_duration_str
        except (isodate.ISO8601Error, ValueError) as e:
            logging.warning(f"Could not parse ISO 8601 duration '{iso_duration_str}': {e}")
            data['value_timedelta'] = None # Set internal value to None on error
        return data

    def __str__(self) -> str:
        """Return the ISO 8601 text if present, else the timedelta's str(), else 'Invalid Duration'."""
        if self.value_timedelta is not None:
            return self.value_iso8601 or str(self.value_timedelta)
        return self.value_iso8601 or "Invalid Duration"


class DefinedTerm(BaseModel):
    """
    Represents a term from a defined set based on schema.org/DefinedTerm.

    Every field is optional and defaults to None; keys not declared here are
    accepted as well (see model_config).
    """
    # Core properties often associated with DefinedTerm
    termCode: Optional[str] = Field(None, description="A code that identifies this DefinedTerm within a DefinedTermSet.")
    name: Optional[str] = Field(None, description="The name of the item.")
    description: Optional[str] = Field(None, description="A description of the item.")
    # Reference to the containing set, typed as a URL rather than a nested model
    inDefinedTermSet: Optional[AnyUrl] = Field(None, description="A DefinedTermSet Organization or DataCatalog that contains this term.")

    model_config = {'extra': 'allow'} # Allow potential other properties from schema.org or extensions


In [13]:
# --- Mapping Configuration ---
# Map schema.org basic types to Python/Pydantic types
# This needs careful expansion based on VC-Zero availability and desired types
# Lookup from an RDF range URI to the Python type expression (as source text)
# emitted in generated type hints. Range URIs missing from this table are
# resolved by map_range_to_typehint as generated classes, or fall back to Any.
TYPE_MAP = {
    # Schema.org Types
    SCHEMA.Text: "str",
    SCHEMA.URL: "pydantic.AnyUrl",
    SCHEMA.Date: "datetime.date",
    SCHEMA.DateTime: "datetime.datetime",
    SCHEMA.Time: "datetime.time",
    SCHEMA.Number: "float",
    SCHEMA.Float: "float",
    SCHEMA.Integer: "int",
    SCHEMA.Boolean: "bool",
    SCHEMA.Quantity: "Quantity", # Map base Quantity
    SCHEMA.Distance: "Distance", # Map specific Quantity subclass
    SCHEMA.Duration: "Duration", # Map specific Quantity subclass
    SCHEMA.Mass: "Quantity",      # No dedicated model yet; falls back to the Quantity base
    SCHEMA.Energy: "Quantity",     # No dedicated model yet; falls back to the Quantity base
    SCHEMA.CssSelectorType: "str",
    SCHEMA.XPathType: "str",
    SCHEMA.DefinedTerm: "DefinedTerm", # Mapped to the hand-written DefinedTerm model
    SCHEMA.DigitalPlatformEnumeration: "str", # Keep as string for v0.1 enum simplicity
    # XSD Types (the "decimal.Decimal" entry needs 'import decimal' wherever the hint is used)
    XSD.string: "str",
    XSD.date: "datetime.date",
    XSD.dateTime: "datetime.datetime",
    XSD.time: "datetime.time",
    XSD.integer: "int",
    XSD.decimal: "decimal.Decimal",
    XSD.float: "float",
    XSD.double: "float",
    XSD.boolean: "bool",
    XSD.anyURI: "pydantic.AnyUrl",
    XSD.duration: "Duration", # Map XSD duration too
}
# Extra pydantic.Field keyword arguments per property URI (currently only
# alias settings); get_field_metadata copies these into the Field arguments.
PROPERTY_ALIAS_MAP = {
    SCHEMA.birthDate: {'validation_alias': 'dob'},
    # Add other common aliases as needed
}

In [14]:
def safe_python_identifier(name: str) -> str:
    """
    Turn an arbitrary name into something usable as a Python identifier.

    Python keywords get a trailing underscore. Otherwise a leading underscore
    is added when the name is empty or does not start with a letter or
    underscore, and every remaining non-word character becomes an underscore.
    """
    if keyword.iskeyword(name):
        return f"{name}_"

    starts_validly = bool(name) and (name[0].isalpha() or name[0] == '_')
    candidate = name if starts_validly else '_' + name

    # Replace anything that is not a word character (simplistic but sufficient here).
    return re.sub(r'\W|^(?=\d)', '_', candidate)

def map_uri_to_classname(uri: rdflib.URIRef) -> str:
    """
    Convert a schema.org URI to a Python CamelCase class name.

    Args:
        uri: The class URI to convert.

    Returns:
        For schema.org URIs, the local name made identifier-safe, with the
        first letter of each '-', '_' or space separated part upper-cased when
        the name does not already start with an upper-case letter. For any
        other URI, the last path/fragment segment is returned as a best guess.
    """
    uri_text = str(uri)
    schema_prefix = str(SCHEMA)
    if not uri_text.startswith(schema_prefix):
        # Non-schema.org URI: best guess from the last path or fragment segment.
        return uri_text.split('/')[-1].split('#')[-1]

    # Strip only the leading namespace, not every occurrence of it.
    local_name = uri_text[len(schema_prefix):]
    if local_name and local_name[0].isupper():
        return safe_python_identifier(local_name)

    # Upper-case only the first character of each part. str.capitalize() would
    # also lower-case the rest, turning e.g. '3DModel' into '3dmodel' and
    # flattening any camelCase inside the name.
    parts = re.split(r'[-_ ]', local_name)
    return safe_python_identifier("".join(part[:1].upper() + part[1:] for part in parts))


def map_uri_to_fieldname(uri: rdflib.URIRef) -> str:
    """
    Derive a snake_case Python field name from a schema.org property URI.

    URIs outside the schema.org namespace fall back to the lower-cased last
    path/fragment segment. The result is always passed through
    safe_python_identifier.
    """
    uri_text = str(uri)
    schema_prefix = str(SCHEMA)

    if uri_text.startswith(schema_prefix):
        local_name = uri_text.replace(schema_prefix, "")
        # Two passes: first split before capitalised words, then between a
        # lower-case letter/digit and a following capital.
        first_pass = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', local_name)
        candidate = re.sub('([a-z0-9])([A-Z])', r'\1_\2', first_pass).lower()
    else:
        # Best guess for foreign namespaces.
        candidate = uri_text.split('/')[-1].split('#')[-1].lower()

    return safe_python_identifier(candidate)

def map_range_to_typehint(
    ranges: Set[rdflib.URIRef],
    class_registry: Set[str], # Set of known generated class names
    default_optional: bool = True,
    use_list_for_multi: bool = True # Currently not consulted; schema.org carries no cardinality info.
) -> str:
    """
    Translate a set of RDF range URIs into a Python type hint string.

    Each range resolves, in order, to its TYPE_MAP entry, a quoted forward
    reference to a generated class listed in class_registry, or "typing.Any"
    (with a logged warning unless the range is schema:Thing or rdfs:Resource).
    Several distinct results are sorted and wrapped in typing.Union, and the
    whole hint is wrapped in typing.Optional when default_optional is true.
    An empty range set yields "typing.Any".
    """
    if not ranges:
        return "typing.Any" # No range specified

    generic_targets = (str(SCHEMA.Thing), str(RDFS.Resource))

    def resolve(range_uri: rdflib.URIRef) -> str:
        if range_uri in TYPE_MAP:
            return TYPE_MAP[range_uri]
        candidate = map_uri_to_classname(range_uri)
        if candidate in class_registry:
            # Quoted so the hint works as a forward reference.
            return f"'{candidate}'"
        if str(range_uri) not in generic_targets:
            logging.warning(f"Unknown range URI encountered: {range_uri}. Mapping to Any.")
        return "typing.Any"

    # De-duplicate, then sort so the generated hint is stable between runs.
    hints = sorted({resolve(range_uri) for range_uri in ranges})
    if not hints:
        return "typing.Any" # Defensive: cannot occur for a non-empty range set

    if len(hints) == 1:
        core_hint = hints[0]
    else:
        core_hint = f"typing.Union[{', '.join(hints)}]"

    if not default_optional:
        return core_hint

    # Skip the Optional wrapper when None is already expressed by one of the parts.
    none_already_allowed = any(
        hint.startswith("typing.Optional[") or hint == 'None' for hint in hints
    )
    return core_hint if none_already_allowed else f"typing.Optional[{core_hint}]"

def get_field_metadata(prop_info: PropertyInfo) -> Dict[str, Any]:
    """
    Build the keyword arguments for a pydantic.Field call from a PropertyInfo.

    Args:
        prop_info: The property whose comment and URI drive the metadata.

    Returns:
        A dict that always starts with 'default': None (generated fields are
        optional), followed by 'description' holding the whitespace-normalised
        comment when one exists, and any extra arguments registered for the
        property URI in PROPERTY_ALIAS_MAP (e.g. validation_alias).
    """
    # Every generated field is Optional for now, so the default is always None.
    metadata: Dict[str, Any] = {'default': None}

    if prop_info.comment:
        # Collapse newlines and runs of whitespace into single spaces.
        metadata['description'] = ' '.join(prop_info.comment.split())

    # Alias-style extras for well-known properties, copied through unchanged.
    metadata.update(PROPERTY_ALIAS_MAP.get(prop_info.uri, {}))

    return metadata


### Not in use
def get_all_ancestors(graph: rdflib.Graph, class_uri: rdflib.URIRef, known_classes_uris: Set[rdflib.URIRef]) -> Set[rdflib.URIRef]:
    """
    Find all superclass URIs of a class, restricted to a known set.

    Follows rdfs:subClassOf upwards from class_uri. A parent is included, and
    followed further, only when it is in known_classes_uris and is neither
    rdfs:Resource nor owl:Thing.

    Args:
        graph: Graph holding the rdfs:subClassOf triples.
        class_uri: Class whose ancestors are wanted.
        known_classes_uris: Classes being generated/tracked; others are ignored.

    Returns:
        The set of ancestor URIs. class_uri itself is never part of the result,
        even if the data contains a subclass cycle leading back to it.
    """
    excluded = (RDFS.Resource, OWL.Thing)
    ancestors: Set[rdflib.URIRef] = set()

    # Iterative walk with one shared visited set. A per-call set in a recursive
    # version cannot see what outer calls already visited, so cyclic
    # rdfs:subClassOf data would recurse without end.
    pending = [class_uri]
    while pending:
        current = pending.pop()
        for parent_uri in graph.objects(subject=current, predicate=RDFS.subClassOf):
            if parent_uri not in known_classes_uris or parent_uri in excluded:
                continue
            if parent_uri == class_uri or parent_uri in ancestors:
                continue  # already handled, or a cycle back to the start
            ancestors.add(parent_uri)
            pending.append(parent_uri)
    return ancestors
### Not in use


def get_base_classes(class_info: ClassInfo, class_registry: Set[str]) -> List[str]:
    """
    Pick the base class names for a generated Pydantic model.

    Direct superclasses qualify only when their mapped name is in
    class_registry and is not 'Thing'. If none qualify, the model derives
    straight from "pydantic.BaseModel"; otherwise BaseModel is reached
    through the chosen parents. Names are returned sorted.
    """
    candidate_names = (
        map_uri_to_classname(super_uri) for super_uri in (class_info.superclasses or ())
    )
    # Keep only parents that are actually generated, skipping the generic 'Thing'.
    selected = {
        name
        for name in candidate_names
        if name in class_registry and name not in ['Thing']
    }

    if not selected:
        # Top-level class, or all parents fall outside the generated scope.
        return ["pydantic.BaseModel"]

    # Sorted only for stable output; Python still decides the MRO.
    return sorted(selected)


### Execute Step 3:

In [15]:
# --- Example Usage (Conceptual - assumes 'analyzed_schema' from Step 2 exists) ---
if __name__ == "__main__":
    # Illustrative walk-through of the mapping helpers; the generator in
    # Step 4 is what uses them for real.

    # Hand-written stand-in for the registry of generated class names. The real
    # generator would derive it from the analysis result, e.g.:
    # known_generated_classes = {map_uri_to_classname(uri) for uri in analyzed_schema["classes"]}
    known_generated_classes = {"Person", "PostalAddress", "Thing"}

    # --- schema:address: a generated class or plain text ---
    address_prop_uri = SCHEMA.address
    address_ranges = {SCHEMA.PostalAddress, SCHEMA.Text}
    address_type_hint = map_range_to_typehint(address_ranges, known_generated_classes)
    print(f"Mapping for schema:address ranges ({address_ranges}):")
    # Expected: typing.Optional[typing.Union['PostalAddress', str]]
    print(f" -> Type Hint: {address_type_hint}")

    # --- schema:givenName: text only ---
    givenname_prop_uri = SCHEMA.givenName
    givenname_ranges = {SCHEMA.Text}
    givenname_type_hint = map_range_to_typehint(givenname_ranges, known_generated_classes)
    print(f"\nMapping for schema:givenName ranges ({givenname_ranges}):")
    # Expected: typing.Optional[str]
    print(f" -> Type Hint: {givenname_type_hint}")

    # --- schema:Person: base class selection ---
    person_uri = SCHEMA.Person
    person_superclasses_uris = {SCHEMA.Thing}

    # Minimal stand-in for ClassInfo: only 'superclasses' must be supplied.
    class PersonInfoSim(NamedTuple):
        superclasses: Set
        uri: Any = None
        label: Any = None
        comment: Any = None
        properties: Any = None

    person_class_info_sim = PersonInfoSim(superclasses=person_superclasses_uris)
    person_bases = get_base_classes(person_class_info_sim, known_generated_classes)
    print(f"\nMapping for schema:Person superclasses ({person_superclasses_uris}):")
    # Expected: ['pydantic.BaseModel'], because get_base_classes never uses 'Thing' as a base.
    print(f" -> Base Classes: {person_bases}")

    print("\nMapping logic defined. Ready for Step 4: Pydantic Class Code Generation.")


Mapping for schema:address ranges ({rdflib.term.URIRef('https://schema.org/Text'), rdflib.term.URIRef('https://schema.org/PostalAddress')}):
 -> Type Hint: typing.Optional[typing.Union['PostalAddress', str]]

Mapping for schema:givenName ranges ({rdflib.term.URIRef('https://schema.org/Text')}):
 -> Type Hint: typing.Optional[str]

Mapping for schema:Person superclasses ({rdflib.term.URIRef('https://schema.org/Thing')}):
 -> Base Classes: ['pydantic.BaseModel']

Mapping logic defined. Ready for Step 4: Pydantic Class Code Generation.


In [16]:
# Step 4: Pydantic class code generation

In [17]:
import rdflib
from rdflib.namespace import RDF, RDFS, OWL, XSD
from typing import List, Set, Dict, Optional, NamedTuple, Union, ForwardRef, Any, cast, TYPE_CHECKING # Added TYPE_CHECKING
from pydantic import BaseModel, Field, EmailStr, AnyUrl
from datetime import date, datetime, time, timedelta
import isodate
import decimal
import keyword
import logging
import os
import pathlib
from collections import defaultdict
import textwrap

# --- Configuration ---
OUTPUT_DIR = "output_ontology"
MODELS_SUBDIR = "models"
BASE_ONTOLOGY_MODULE = "core_ontology_v0_1" # Used for potential future imports if split



In [19]:
# --- Helper to get import paths (adjust module structure if needed) ---
def get_module_path_for_class(class_name: str) -> str:
    """
    Return the relative module path (leading dot included) for a class name.

    The module name is the snake_case form of the class name, obtained by
    running the class name through the property field-name mapper as if it
    were a schema.org term. Revisit this if file naming ever changes.
    """
    snake_name = map_uri_to_fieldname(SCHEMA[class_name])
    module_name = safe_python_identifier(snake_name)
    return "." + module_name

In [20]:
def generate_pydantic_model_code(
    class_info: ClassInfo,
    properties_info: Dict[rdflib.URIRef, PropertyInfo],
    all_class_names: Set[str] # All class names being generated
) -> str:
    """Generate the Python source of one Pydantic model module.

    The emitted module consists of, in order: ``from __future__ import
    annotations``, the standard-library / typing / pydantic imports required
    by the field type hints, the ``.base_types`` import, imports of the other
    generated models referenced by the fields, imports of the base classes,
    and finally the class definition itself.

    Args:
        class_info: Analysed class (URI, superclasses, properties, comment).
        properties_info: Analysed properties keyed by URI; properties of the
            class that are missing from this mapping are skipped.
        all_class_names: Names of every class known to the generator. Only
            these names are used as base classes or imported for type hints.

    Returns:
        The complete module source, terminated by a newline.

    Notes:
        * Imports are detected by matching whole identifiers in the type hint,
          so ``datetime.datetime`` does not pull in ``date`` and a class name
          such as ``'ItemList'`` does not pull in ``typing.List``.
        * ``Quantity``, ``Distance``, ``Duration`` and ``DefinedTerm`` are
          always imported from ``.base_types`` and never from a per-class
          module, whether they are used as a field type or as a base class.
        * Imports of referenced models are emitted at module level (the
          ``if TYPE_CHECKING:`` guard is intentionally not generated), hence
          ``TYPE_CHECKING`` itself is not imported.
          NOTE(review): unguarded imports between generated modules can be
          circular — confirm this is the intended trade-off.
    """
    # Local import: the import cells of this notebook do not bring `re` into scope.
    import re

    def _mentions(hint: str, name: str) -> bool:
        """Return True when *name* occurs in *hint* as a whole identifier."""
        return re.search(rf"(?<!\w){re.escape(name)}(?!\w)", hint) is not None

    # Qualified name as it appears in a type hint -> name to import.
    typing_names = ("Optional", "List", "Union", "Any")
    datetime_names = {
        "datetime.date": "date",
        "datetime.datetime": "datetime",
        "datetime.time": "time",
        "timedelta": "timedelta",
    }
    pydantic_names = {"pydantic.AnyUrl": "AnyUrl", "pydantic.EmailStr": "EmailStr"}
    # Rich types that are imported from `.base_types`.
    base_types_names = ("Quantity", "Distance", "Duration", "DefinedTerm")

    class_name = map_uri_to_classname(class_info.uri)

    # 'Thing' is excluded from the bases; a class left without any usable base
    # derives directly from BaseModel.
    potential_base_names = {map_uri_to_classname(uri) for uri in class_info.superclasses}
    valid_base_names = sorted(
        name for name in potential_base_names
        if name in all_class_names and name != 'Thing'
    )

    # Sets de-duplicate the import requirements collected below.
    typing_imports_specific: Set[str] = set()    # names imported from typing
    pydantic_imports_specific: Set[str] = set()  # names imported from pydantic
    datetime_imports_specific: Set[str] = set()  # names imported from datetime
    other_imports: Set[str] = set()              # full import statements (e.g. 'import decimal')
    cross_module_imports: Set[str] = set()       # other generated models used in type hints
    rich_type_imports: Set[str] = set()          # names imported from .base_types
    runtime_base_class_imports: Set[str] = set() # full import statements for base classes

    # Determine base class string and imports needed for bases
    if not valid_base_names:
        base_class_str = "BaseModel"
        pydantic_imports_specific.add("BaseModel")
    else:
        base_class_str = ", ".join(valid_base_names)
        for base_name in valid_base_names:
            if base_name in base_types_names:
                # Same source as for field type hints: these live in base_types.
                rich_type_imports.add(base_name)
            else:
                runtime_base_class_imports.add(
                    f"from {get_module_path_for_class(base_name)} import {base_name}"
                )

    # Build the field lines and collect the imports their type hints require.
    field_definitions = []
    for prop_uri in sorted(class_info.properties):
        if prop_uri not in properties_info:
            continue # Skip if property info missing

        prop_info = properties_info[prop_uri]
        field_name = map_uri_to_fieldname(prop_info.uri)
        type_hint_str = map_range_to_typehint(prop_info.ranges, all_class_names)

        typing_imports_specific.update(
            name for name in typing_names if _mentions(type_hint_str, name)
        )
        datetime_imports_specific.update(
            imported for qualified, imported in datetime_names.items()
            if _mentions(type_hint_str, qualified)
        )
        if _mentions(type_hint_str, "decimal.Decimal"):
            other_imports.add("import decimal")
        pydantic_imports_specific.update(
            imported for qualified, imported in pydantic_names.items()
            if _mentions(type_hint_str, qualified)
        )
        rich_type_imports.update(
            name for name in base_types_names if _mentions(type_hint_str, name)
        )

        # Quoted names in the hint are forward references to other models.
        for potential_class in set(re.findall(r"'(\w+)'", type_hint_str)):
            if (potential_class in all_class_names
                    and potential_class != class_name
                    and potential_class not in base_types_names):
                cross_module_imports.add(potential_class)

        # Field(<default>, key=value, ...); the default is always the first positional argument.
        field_args_dict = get_field_metadata(prop_info)
        field_args_parts = [repr(field_args_dict.pop('default', None))]
        field_args_parts.extend(f"{k}={v!r}" for k, v in field_args_dict.items())
        field_call = f"Field({', '.join(field_args_parts)})"

        # Names are imported directly, so strip the module prefixes from the hint.
        # Basic replacement - more robust parsing might be needed for complex nested hints
        final_type_hint = type_hint_str.replace("pydantic.", "")
        final_type_hint = final_type_hint.replace("datetime.", "")
        # Replace typing prefixes only if specific types are imported
        if typing_imports_specific:
            final_type_hint = final_type_hint.replace("typing.", "")

        field_definitions.append(f"    {field_name}: {final_type_hint} = {field_call}")

    # --- Assemble the module source ---
    code_parts = ["from __future__ import annotations"] # Keep this first

    # Standard library imports
    if datetime_imports_specific:
        code_parts.append(f"from datetime import {', '.join(sorted(datetime_imports_specific))}")
    code_parts.extend(sorted(other_imports)) # Like 'import decimal'

    if typing_imports_specific:
        code_parts.append(f"from typing import {', '.join(sorted(typing_imports_specific))}")

    # Pydantic imports
    if field_definitions:
        pydantic_imports_specific.add("Field")
    if pydantic_imports_specific:
        code_parts.append(f"from pydantic import {', '.join(sorted(pydantic_imports_specific))}")
    logging.debug(f"Final pydantic imports needed: {pydantic_imports_specific}")

    if rich_type_imports:
        code_parts.append(f"from .base_types import {', '.join(sorted(rich_type_imports))}")

    # Models referenced only by type hints; base classes get their own import below.
    for class_to_import in sorted(cross_module_imports - set(valid_base_names)):
        code_parts.append(f"from {get_module_path_for_class(class_to_import)} import {class_to_import}")

    # Base classes must be importable at runtime for the class statement.
    code_parts.extend(sorted(runtime_base_class_imports))
    code_parts.append("\n") # Separator

    # Class definition. The summary is embedded in a triple-quoted docstring, so
    # backslashes and triple quotes are escaped to keep the generated code valid.
    summary = textwrap.shorten(class_info.comment or "No description provided.", width=70)
    summary = summary.replace("\\", "\\\\").replace('"""', '\\"\\"\\"')
    class_docstring = f'"""\n    {class_name}: {summary}\n\n    Generated from: {class_info.uri}\n    """'
    code_parts.append(f"class {class_name}({base_class_str}):")
    code_parts.append(f"    {class_docstring}")

    if not field_definitions:
        code_parts.append("    pass")
    else:
        code_parts.extend(field_definitions)

    code_parts.append("\n    model_config = {'extra': 'forbid'}")

    return "\n".join(code_parts) + "\n"


In [21]:
# Source text of the hand-written `base_types.py` module. generate_ontology_models()
# writes this string verbatim into the generated models package; the classes defined
# here are skipped when per-class modules are generated.
BASE_TYPES_CODE = """
from __future__ import annotations # Keep first
from pydantic import (
    BaseModel, Field, AnyUrl, field_validator,
    model_validator, condecimal, constr, EmailStr # Added EmailStr just in case, adjust as needed
)
from typing import Optional, List, Union, Any
from datetime import date, datetime, time, timedelta
import datetime as _dt # Module alias used by Duration's annotations (see the comment there)
import decimal
import isodate # Requires: pip install isodate
import logging

# Configure basic logging if needed within this module too
# logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) # Use logger for warnings

class Quantity(BaseModel):
    \"\"\"
    Base model for quantitative values based on schema.org/Quantity.
    Actual value and unit are often in subclasses or specific properties.
    This primarily serves as a conceptual base.
    \"\"\"
    model_config = {'extra': 'allow'} # Allow extra fields as Quantity is generic

class Distance(Quantity):
    \"\"\"
    Represents a distance based on schema.org/Distance.
    Uses value and unit representation common in QuantitativeValue.
    \"\"\"
    # Based on properties commonly used with QuantitativeValue for distance
    value: Optional[float] = Field(None, description="The numerical value of the distance.")
    unitCode: Optional[str] = Field(None, description="UN/CEFACT Common Code (3 characters) or URL for the unit of measurement. E.g., 'MTR' for meter, 'KM' for kilometer, 'FT' for foot, 'INH' for inch.")
    unitText: Optional[str] = Field(None, description="A string indicating the unit of measurement. Useful if unitCode is not applicable or needs clarification. E.g., 'meters', 'miles'.")

    model_config = {'extra': 'forbid'}

    # Add validation if needed, e.g., check unitCode format

class Duration(Quantity):
    \"\"\"
    Represents a duration based on schema.org/Duration.
    Stores duration as datetime.timedelta, parsed from ISO 8601 duration format.
    \"\"\"
    # Use alias to allow input using schema.org's likely property name if it differs
    value_iso8601: Optional[str] = Field(None, validation_alias='iso8601Duration', serialization_alias='iso8601Duration', description="Duration in ISO 8601 format (e.g., P1Y2M3DT4H5M6S).")
    # Annotated through the `_dt` module alias: the `timedelta` property below rebinds
    # the bare name `timedelta` inside this class namespace, which could otherwise be
    # picked up when the annotation is resolved.
    value_timedelta: Optional[_dt.timedelta] = Field(None, exclude=True, description="Parsed timedelta value (internal).") # Exclude from standard model dump

    model_config = {'extra': 'forbid', 'populate_by_name': True} # Allow using alias on input

    @model_validator(mode='before')
    @classmethod
    def parse_duration(cls, data: Any) -> Any:
        # NOTE(review): isodate.parse_duration may return isodate.Duration (not a
        # timedelta) for durations with years/months - confirm such values validate.
        if isinstance(data, str):
            # Allow direct initialization from ISO string
            try:
                td = isodate.parse_duration(data)
            except (isodate.ISO8601Error, ValueError) as e:
                logger.warning(f"Could not parse ISO 8601 duration string '{data}': {e}")
                td = None
            return {'value_iso8601': data, 'value_timedelta': td}

        if isinstance(data, dict):
            iso_duration_str = data.get("value_iso8601") or data.get("iso8601Duration")
            # Parse only if timedelta isn't already provided and string exists
            if iso_duration_str and isinstance(iso_duration_str, str) and 'value_timedelta' not in data:
                # Work on a copy so the caller's dict is never mutated. The ISO string
                # stays under whichever key (field name or alias) the caller used, so
                # no second key is introduced on this extra='forbid' model.
                data = dict(data)
                try:
                    data['value_timedelta'] = isodate.parse_duration(iso_duration_str)
                except (isodate.ISO8601Error, ValueError) as e:
                    logger.warning(f"Could not parse ISO 8601 duration '{iso_duration_str}': {e}")
                    data['value_timedelta'] = None # Keep the original (invalid) string as given

        return data # Handed on to Pydantic for regular field validation

    # Optional: Add property to access timedelta easily
    @property
    def timedelta(self) -> Optional[_dt.timedelta]:
        return self.value_timedelta

    def __str__(self) -> str:
        \"\"\"Return ISO 8601 string representation if available.\"\"\"
        # Prefer original string if available, otherwise format timedelta (basic)
        if self.value_iso8601:
            return self.value_iso8601
        elif self.value_timedelta is not None:
            try:
                # Attempt basic formatting back (might lose fidelity vs isodate.duration_isoformat)
                return str(self.value_timedelta)
            except Exception:
                return "Invalid Duration Timedelta"
        return "Invalid/Missing Duration"


class DefinedTerm(BaseModel):
    \"\"\"
    Represents a term from a defined set based on schema.org/DefinedTerm.
    \"\"\"
    # Core properties often associated with DefinedTerm
    termCode: Optional[str] = Field(None, description="A code that identifies this DefinedTerm within a DefinedTermSet.")
    name: Optional[str] = Field(None, description="The name of the item.")
    description: Optional[str] = Field(None, description="A description of the item.")
    # Allow referencing the set it belongs to, if known (using AnyUrl for flexibility)
    inDefinedTermSet: Optional[AnyUrl] = Field(None, description="A DefinedTermSet Organization or DataCatalog that contains this term.")

    model_config = {'extra': 'allow'} # Allow potential other properties from schema.org or extensions

class Money(BaseModel):
    \"\"\"
    Represents an amount of money with a currency. Based on schema.org concepts
    often used with PriceSpecification or MonetaryAmount.
    \"\"\"
    # Using 'amount' and 'currency' inspired by common patterns, not a direct schema.org/Money type
    amount: Optional[decimal.Decimal] = Field(None, description="The amount of money.")
    currency: Optional[constr(pattern=r'^[A-Z]{3}$')] = Field(None, description="ISO 4217 Currency Code") # type: ignore

    @field_validator('amount', mode='before')
    @classmethod
    def clean_amount(cls, v: Any) -> Optional[decimal.Decimal]:
        # Allow existing Decimals or None to pass through
        if v is None or isinstance(v, decimal.Decimal):
            return v
        if isinstance(v, (int, float)):
            # Convert via str(): Decimal(0.1) would capture the float's binary
            # expansion (0.1000000000000000055...), Decimal('0.1') does not.
            try:
                return decimal.Decimal(str(v))
            except Exception as e:
                logger.error(f"Error converting {v} to Decimal: {e}")
                raise ValueError(f"Cannot convert {v} to Decimal") from e
        if isinstance(v, str):
            try:
                return decimal.Decimal(v.strip())
            except decimal.InvalidOperation:
                raise ValueError(f"Invalid decimal format for amount: {v}")
        # Raise error for other unexpected types
        raise ValueError(f"Unexpected type for amount: {type(v)}")

    model_config = {'extra': 'forbid'}

# Add other base types/VC-Zeros below if needed

"""

In [22]:
import rdflib
from rdflib.namespace import RDF, RDFS, OWL, XSD
from typing import List, Set, Dict, Optional, NamedTuple, Union, Any, cast # Ensure necessary types for function signature
import logging
import os
import pathlib
import shlex # If used by helper functions, ensure imported
import textwrap # If used by helper functions

logger = logging.getLogger(__name__) # Define logger for use within function

def generate_ontology_models(
    analyzed_schema: Dict[str, Dict],
    output_base_dir: str,
    models_subdir: str = "models"
) -> None:
    """Write the generated Pydantic models package to disk.

    Creates ``<output_base_dir>/<models_subdir>`` and fills it with
    ``base_types.py`` (the BASE_TYPES_CODE template), one module per analysed
    class, and an ``__init__.py`` that imports every generated class, defines
    ``__all__`` and rebuilds the models on import.

    Each module is named with get_module_path_for_class(), i.e. exactly the
    name the generated modules use when importing one another, so file names
    and import statements cannot drift apart.

    Args:
        analyzed_schema: Mapping with the keys ``"classes"`` and
            ``"properties"`` (URI -> ClassInfo / PropertyInfo).
        output_base_dir: Directory that receives the models sub-directory.
        models_subdir: Name of the package directory to create.

    Returns:
        None. Failures are logged: a class that cannot be generated is
        skipped, while a failure to write ``base_types.py`` aborts the run.
    """
    classes_info: Dict[rdflib.URIRef, ClassInfo] = analyzed_schema.get("classes", {})
    properties_info: Dict[rdflib.URIRef, PropertyInfo] = analyzed_schema.get("properties", {})

    if not classes_info: # Check classes_info primarily
        logging.error("No class information found in analyzed schema. Cannot generate models.")
        return
    if not properties_info:
        logging.warning("No property information found in analyzed schema. Models may lack fields.")
        # Proceed cautiously, or return depending on desired strictness

    output_path = pathlib.Path(output_base_dir) / models_subdir
    output_path.mkdir(parents=True, exist_ok=True)
    # Ensure main __init__.py exists before generating submodules
    (output_path / "__init__.py").touch()

    # URIs of the types that are defined ONLY in base_types.py
    base_type_uris = {
        SCHEMA.Quantity, SCHEMA.Distance, SCHEMA.Duration, SCHEMA.DefinedTerm,
        SCHEMA.Money,
    }
    # Names only for the base types actually present in the analysed schema
    base_type_names = {map_uri_to_classname(uri) for uri in base_type_uris if uri in classes_info}

    # --- Generate base_types.py ---
    base_types_path = output_path / "base_types.py"
    try:
        if 'BASE_TYPES_CODE' not in globals():
            raise NameError("BASE_TYPES_CODE string not found.")

        with open(base_types_path, "w", encoding="utf-8") as f:
            f.write(BASE_TYPES_CODE)
        logging.info(f"Generated base types file: {base_types_path}")
    except Exception as e:
        logging.error(f"Failed to write base_types.py: {e}", exc_info=True)
        return # Stop if base types cannot be written

    # Registry of known class names; it already contains the base type names
    # because those are taken from classes_info as well.
    all_class_names = {map_uri_to_classname(uri) for uri in classes_info}

    # Alphabetical order by URI; no topological sort of the hierarchy is done.
    logging.info("Generating models in alphabetical order by URI.")

    # --- Generate individual model files, SKIPPING base types ---
    generated_files = 0
    # Module name -> class defined in that module (successful generations only)
    module_to_class_map: Dict[str, str] = {}

    for class_uri in sorted(classes_info):
        if class_uri in base_type_uris:
            logging.debug(f"Skipping individual file generation for {class_uri} (defined in base_types.py)")
            continue

        class_info = classes_info[class_uri]
        class_name = map_uri_to_classname(class_uri)

        try:
            # Same helper the generated modules use for their relative imports.
            module_name_part = get_module_path_for_class(class_name).removeprefix(".")

            previous_owner = module_to_class_map.get(module_name_part)
            if previous_owner is not None:
                logging.warning(
                    f"Module name '{module_name_part}' for class {class_name} collides with "
                    f"class {previous_owner}; the earlier file is overwritten."
                )

            model_code = generate_pydantic_model_code(class_info, properties_info, all_class_names)
            file_path = output_path / f"{module_name_part}.py"
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(model_code)
        except Exception as e:
            logging.error(f"Failed to generate code for class {class_name} ({class_uri}): {e}", exc_info=True) # Add traceback
            continue

        # Record the module only once its file has been written.
        module_to_class_map[module_name_part] = class_name
        generated_files += 1

    logging.info(f"Generated {generated_files} specific Pydantic model files in {output_path}")

    # --- Generate __init__.py ---
    init_py_path = output_path / "__init__.py"
    try:
        _write_models_init(init_py_path, base_type_names, module_to_class_map)
        logging.info(f"Successfully generated __init__.py at {init_py_path}")
    except Exception as e:
        logging.error(f"Failed to write __init__.py: {e}", exc_info=True)


def _write_models_init(
    init_py_path: pathlib.Path,
    base_type_names: Set[str],
    module_to_class_map: Dict[str, str],
) -> None:
    """Write the package ``__init__.py``: imports, ``__all__`` and ``rebuild_all()``."""
    with open(init_py_path, "w", encoding="utf-8") as f:
        f.write("# flake8: noqa\n")
        f.write("# Auto-generated __init__.py for ontology models\n\n")
        f.write("import logging\n")
        f.write("import importlib\n")
        f.write("import pkgutil\n")
        f.write("from typing import TYPE_CHECKING # TYPE_CHECKING might be needed by rebuild_all\n")
        f.write("from pydantic import BaseModel # Needed for rebuild_all check\n\n")

        f.write("logger: logging.Logger = logging.getLogger(__name__)\n\n") # Define early

        # 1. Import explicitly from base_types
        if base_type_names:
            f.write("# --- Import Base Types ---\n")
            f.write("try:\n")
            f.write(f"    from .base_types import ({', '.join(sorted(base_type_names))})\n") # Explicit names
            f.write("except ImportError:\n")
            f.write("    logger.warning('Could not import base_types')\n\n")

        # 2. Import explicitly from all OTHER generated modules
        if module_to_class_map:
            f.write("# --- Import Generated Models ---\n")
        generated_class_names = set()
        for module_name_part in sorted(module_to_class_map):
            class_name = module_to_class_map[module_name_part]
            # Ensure we don't try to re-import base types if map is somehow wrong
            if class_name in base_type_names:
                continue
            f.write("try:\n")
            f.write(f"    from .{module_name_part} import {class_name}\n")
            generated_class_names.add(class_name)
            f.write("except ImportError:\n")
            f.write(f"    logger.warning(f'Could not import {class_name} from .{module_name_part}')\n")

        # 3. Define __all__ for cleaner namespace
        f.write("\n__all__ = [\n")
        for name in sorted(base_type_names | generated_class_names):
            f.write(f'    "{name}",\n') # Use double quotes for names in list
        f.write("]\n\n")

        # 4. Include rebuild_all function and call
        f.write("# --- Rebuild models to resolve forward references ---\n")
        f.write("def rebuild_all() -> None:\n")
        f.write("    package_name = __name__\n")
        f.write("    package = importlib.import_module(package_name)\n")
        f.write("    rebuilt_models = set()\n")
        f.write('    logger.debug(f"Attempting rebuild in {package_name}")\n\n')
        f.write("    for _, module_name_part, _ in pkgutil.iter_modules(package.__path__, package_name + '.') :\n")
        f.write("        # Skip rebuild attempt on __init__ itself or base_types if desired\n")
        f.write("        if module_name_part.endswith('.__init__') or module_name_part.endswith('.base_types'):\n")
        f.write("            continue\n")
        f.write("        try:\n")
        f.write("            module = importlib.import_module(module_name_part)\n")
        f.write("            for attribute_name in dir(module):\n")
        f.write("                attribute = getattr(module, attribute_name)\n")
        f.write("                if (isinstance(attribute, type) and\n")
        f.write("                        issubclass(attribute, BaseModel) and\n") # Check issubclass safely
        f.write("                        attribute is not BaseModel and\n")
        f.write("                        hasattr(attribute, 'model_rebuild') and\n") # Check if it has the method
        f.write("                        attribute not in rebuilt_models):\n")
        f.write("                    try:\n")
        f.write('                        logger.debug(f"Rebuilding: {attribute.__name__}")\n')
        f.write("                        attribute.model_rebuild(force=True)\n")
        f.write("                        rebuilt_models.add(attribute)\n")
        f.write("                    except Exception as e_rebuild:\n")
        f.write("                        logger.error(f'Error rebuilding model {attribute.__name__} in {module_name_part}: {e_rebuild}', exc_info=False)\n")
        f.write("        except ModuleNotFoundError:\n")
        f.write("            logger.warning(f\"Module not found during rebuild: {module_name_part}\")\n")
        f.write("        except Exception as e_import:\n")
        f.write("             logger.error(f'Error importing module {module_name_part} during rebuild: {e_import}', exc_info=False)\n\n")

        f.write("# Run rebuild automatically on import\n")
        f.write("try:\n")
        f.write("    rebuild_all()\n")
        f.write("    logger.info(f'Pydantic models in {__name__} package rebuilt.')\n")
        f.write("except Exception as e_global:\n")
        f.write("    logger.error(f'Global error during model rebuild: {e_global}', exc_info=True)\n")


### Execution Step 4

In [23]:
# Step 4: --- Main execution block demonstrating generation ---
if __name__ == "__main__":
    # Re-run Step 1 (parse) and Step 2 (analyse), then generate the models.
    schema_graph = parse_schema_to_graph(SCHEMA_FILE, SCHEMA_FORMAT)
    if not schema_graph:
        logging.error("Failed to parse schema graph for generation.")
    else:
        analyzed_schema = analyze_schema_graph(schema_graph)
        has_class_data = bool(analyzed_schema and analyzed_schema.get("classes"))
        if not has_class_data:
            logging.error("Schema analysis did not produce class/property data.")
        else:
            # Step 4 proper: write the Pydantic model package to disk.
            generate_ontology_models(analyzed_schema, OUTPUT_DIR, MODELS_SUBDIR)
            logging.info(f"Pydantic model code generation complete. Check the '{OUTPUT_DIR}/{MODELS_SUBDIR}' directory.")
            logging.info("Ready for Step 5: Post-Processing & Verification.")

INFO:root:Attempting to parse schema file: schema.txt (format: turtle)
INFO:root:Successfully parsed 8904 triples.
INFO:root:Found 628 potential schema.org classes.
INFO:root:Found 921 potential schema.org properties.
INFO:root:Analyzed 628 classes and 921 properties.
INFO:root:Generated base types file: output_ontology/models/base_types.py
INFO:root:Generating models in alphabetical order by URI.
INFO:root:Generated 625 specific Pydantic model files in output_ontology/models
INFO:root:Successfully generated __init__.py at output_ontology/models/__init__.py
INFO:root:Pydantic model code generation complete. Check the 'output_ontology/models' directory.
INFO:root:Ready for Step 5: Post-Processing & Verification.


# Step 5: Postprocessing

In [None]:
# Example: postprocess.py

import subprocess
import sys
import logging
import pathlib
import shlex # Use shlex for safer command construction

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

# Define target directories (adjust paths as needed)
TARGET_DIR = pathlib.Path("output_ontology") # Main output directory
MODELS_DIR = TARGET_DIR / "models" # Specific directory with models
TESTS_DIR = TARGET_DIR / "tests"  # Directory for tests

# Tool Configurations (Ideally read from pyproject.toml or similar)
# Bandit: recursive scan of MODELS_DIR. "-ll" limits the report to medium and higher
# severity; no confidence threshold is passed here.
BANDIT_ARGS = ["-r", str(MODELS_DIR), "-ll", "--exit-zero"] # Exit-zero initially to report, fail later based on output if needed
# Example: MyPy needs strict checks
MYPY_ARGS = ["--strict", str(MODELS_DIR)]
# Example: Ruff - fix specific issues, check others
# NOTE(review): the pipeline stages visible below spell their ruff arguments inline
# instead of using these two constants - confirm whether they are still needed.
RUFF_FIX_ARGS = ["check", str(TARGET_DIR), "--fix", "--select", "F", "--select", "E", "--select", "W", "--select", "I"] # Fix F401, E, W, I (Imports etc)
RUFF_CHECK_ARGS = ["check", str(TARGET_DIR)] # Check everything else

def run_command(command: list[str], cwd: str = ".", check: bool = True, capture: bool = False) -> subprocess.CompletedProcess | None:
    """Runs a command, logs, checks for errors, optionally captures output."""
    command_str = shlex.join(command) # Safer than " ".join
    logging.info(f"Running: {command_str}")
    try:
        result = subprocess.run(
            command,
            capture_output=capture,
            text=True,
            check=check, # Raises CalledProcessError if return code is non-zero
            cwd=cwd,
            encoding='utf-8'
        )
        if capture:
            if result.stdout: logging.debug(f"Stdout:\n{result.stdout}")
            if result.stderr: logging.debug(f"Stderr:\n{result.stderr}") # Debug level for stderr unless error occurred
        logging.info(f"Command succeeded: {command_str}")
        return result
    except FileNotFoundError:
        logging.error(f"Error: Command not found: {command[0]}. Is the tool installed and in PATH?")
        raise # Re-raise to stop the pipeline
    except subprocess.CalledProcessError as e:
        logging.error(f"Command failed with exit code {e.returncode}: {command_str}")
        if capture: # Log captured output on error
             if e.stdout: logging.error(f"Failed command stdout:\n{e.stdout}")
             if e.stderr: logging.error(f"Failed command stderr:\n{e.stderr}")
        raise # Re-raise to stop the pipeline
    except Exception as e:
        logging.error(f"An unexpected error occurred running {command_str}: {e}")
        raise # Re-raise to stop the pipeline

def run_post_processing_pipeline():
    """
    Executes a robust post-processing and verification pipeline
    for the generated ontology models.

    Stages, in order: directory/marker-file preparation, formatting (ruff format),
    lint auto-fix and lint check (ruff check), type checking (mypy), security
    scanning (bandit, report-only), and tests (pytest).

    Stops immediately if any critical step fails: the failure is logged and the
    process exits with status 1 via ``sys.exit``. Returns None on success.
    """
    logging.info("--- Starting Robust Post-Processing & Verification Pipeline ---")
    try:
        # Preparation: Ensure directories and necessary files exist
        MODELS_DIR.mkdir(parents=True, exist_ok=True)
        (MODELS_DIR / "py.typed").touch()
        TESTS_DIR.mkdir(parents=True, exist_ok=True)
        (TESTS_DIR / "__init__.py").touch()
        logging.info("Directories and marker files ensured.")

        # Stage 1: Formatting (Fail on error)
        # Using Ruff format as an example, Black is also fine
        logging.info("Stage 1: Formatting...")
        run_command(["ruff", "format", str(TARGET_DIR)], check=True)

        # Stage 2: Linting & Auto-Fixing (Fail on error)
        # Run fix for specific safe-to-fix categories first. No extra exit-code flag is
        # passed: `ruff check --fix` already exits 0 when every selected violation was
        # fixed and non-zero only when violations remain.
        logging.info("Stage 2a: Linting & Auto-Fixing (Imports, Syntax, Style)...")
        run_command(["ruff"] + RUFF_FIX_ARGS, check=True)
        # Run check for remaining issues (don't auto-fix things like complexity)
        logging.info("Stage 2b: Linting (Remaining Checks)...")
        run_command(["ruff"] + RUFF_CHECK_ARGS, check=True)  # Fail if non-fixable errors remain

        # Stage 3: Static Type Checking (Fail on error)
        logging.info("Stage 3: Type Checking...")
        run_command(["mypy"] + MYPY_ARGS, check=True)

        # Stage 4: Security Scanning (report-only; never fails the build here)
        logging.info("Stage 4: Security Scanning...")
        # check=False: the exit status is inspected below instead of raising.
        bandit_result = run_command(["bandit"] + BANDIT_ARGS, check=False, capture=True)
        if bandit_result:
            # run_command only logs captured output at DEBUG level, so surface the
            # report here; otherwise there is nothing for the reader to review.
            if bandit_result.stdout:
                logging.info(f"Bandit report:\n{bandit_result.stdout}")
            if bandit_result.returncode != 0:
                logging.warning(
                    f"Bandit exited with code {bandit_result.returncode}; not failing the build. Review its output."
                )
                if bandit_result.stderr:
                    logging.warning(f"Bandit stderr:\n{bandit_result.stderr}")
        # To gate the build on findings, parse bandit_result.stdout (JSON output is
        # possible) for high-severity issues and raise here.

        # Stage 5: Testing (Fail on error)
        logging.info("Stage 5: Testing...")
        logging.warning("Ensure tests are present in " + str(TESTS_DIR))
        run_command(["pytest", str(TESTS_DIR)], check=True)

        logging.info("--- Pipeline Completed Successfully ---")

    except Exception as exc:
        # run_command logs its own failures, but errors from the preparation step
        # (mkdir/touch) are not logged anywhere else, so always record the cause.
        logging.error(f"Pipeline aborted by {type(exc).__name__}: {exc}")
        logging.critical("--- Pipeline Failed ---")
        sys.exit(1)  # Exit with non-zero code to signal failure

# Script entry point: run the pipeline only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    run_post_processing_pipeline()