In [1]:
from __future__ import annotations

import os
import json 
from dataclasses import dataclass
from pxr import Usd, UsdGeom, UsdShade, Sdf, Gf, Vt
from typing import Any, Dict, List, Optional, Tuple, Set, Union

from helper.utilities import set_stage_metadata,make_box_mesh_points_indices
from helper.utilities import JsonVettingError,_require,_as_float,_as_str,_validate_asset_path,ensure_parent_dir,load_parts_json

Vetting Proccess of JSON files

In [2]:
@dataclass
class VettedPart:
    raw: Dict[str, Any]
    name: str
    uid: str
    ptype: str
    dims_m: Tuple[float, float, float]
    meters_per_unit: float
    up_axis: str
    translate: Tuple[float, float, float]
    geom_path: str
    material_path: str
    parent: Optional[str]
    children: List[str]

class VettingProc():
    def __init__(self,file: Union[str, List[Dict[str, Any]]]):
        self.parts = load_parts_json(source=file)

        if not isinstance(self.parts, list) or not self.parts:
            raise JsonVettingError("Top-level JSON must be a non-empty list of parts.")
        
        self.by_name = self.validate(self.parts)
        self.reconcile()
        self.biderectional()
        self.graph_val()

    def validate(self,parts: List[Dict[str, Any]]) -> Dict[str, VettedPart]:
        """
        This function performs Phase 1 of the vetting proccess. It validates that required keys exist, and saves them to 
        an extensible object, allowing scalable computations.

        Inputs: 
        parts : Compiled SysML v2 Parts from JSON file.

        Returns:
        by_name : Saved part information by name, to a dictionary Dict[str, VettedPart], where VettedPart is an extensible object containing part info.
        """
        
        by_name: Dict[str, VettedPart] = {} 

        for i, p in enumerate(parts):
            if not isinstance(p, dict):
                raise JsonVettingError(f"Part at index {i} is not an object/dict.")

            ctx = f"part[{i}]" # contexts string for errors
            
            name = _as_str(_require(p, "name", ctx), f"{ctx}.name") # checks name exist in p dictionary, check the value is not an empty string,strips whitespaces
            uid = _as_str(_require(p, "id", ctx), f"{ctx}.id") # same as name
            ptype = str(p.get("type", "Part"))

            if name in by_name:
                raise JsonVettingError(f"Duplicate part name '{name}' (names must be unique).")

            dims_obj = _require(p, "dimensions", ctx)
            if not isinstance(dims_obj, dict):
                raise JsonVettingError(f"{ctx}.dimensions must be an object/dict.")

            dims_list = _require(dims_obj, "dims_m", f"{ctx}.dimensions")
            if not (isinstance(dims_list, list) and len(dims_list) == 3):
                raise JsonVettingError(f"{ctx}.dimensions.dims_m must be a list of length 3.")
            
            dims_m = tuple(_as_float(dims_list[j], f"{ctx}.dimensions.dims_m[{j}]") for j in range(3))  # type: ignore
            if any(d <= 0 for d in dims_m):
                raise JsonVettingError(f"{ctx}.dimensions.dims_m must be > 0, got {dims_m}")

            meters_per_unit = _as_float(dims_obj.get("metersPerUnit", 1.0), f"{ctx}.dimensions.metersPerUnit")
            up_axis = str(dims_obj.get("upAxis", "Z")).upper()
            if up_axis not in ("Z", "Y"):
                raise JsonVettingError(f"{ctx}.dimensions.upAxis must be 'Z' or 'Y', got {up_axis!r}")

            tx = _as_float(dims_obj.get("X", 0.0), f"{ctx}.dimensions.X")
            ty = _as_float(dims_obj.get("Y", 0.0), f"{ctx}.dimensions.Y")
            tz = _as_float(dims_obj.get("Z", 0.0), f"{ctx}.dimensions.Z")
            translate = (tx, ty, tz)

            meta = _require(p, "metadata", ctx)
            if not isinstance(meta, dict):
                raise JsonVettingError(f"{ctx}.metadata must be an object/dict.")
            geom_path = _validate_asset_path(_require(meta, "geometry", f"{ctx}.metadata"), f"{ctx}.metadata.geometry")
            material_path = _validate_asset_path(_require(meta, "material", f"{ctx}.metadata"), f"{ctx}.metadata.material")

            parent = p.get("parent", None)
            parent = _as_str(parent, f"{ctx}.parent") if parent is not None else None

            children = p.get("children", [])
            if children is None:
                children = []
            if not isinstance(children, list) or not all(isinstance(c, str) for c in children):
                raise JsonVettingError(f"{ctx}.children must be a list of strings.")
            children = [c for c in children if c.strip()]

            by_name[name] = VettedPart(
                raw=p,
                name=name,
                uid=uid,
                ptype=ptype,
                dims_m=dims_m,
                meters_per_unit=meters_per_unit,
                up_axis=up_axis,
                translate=translate,
                geom_path=geom_path,
                material_path=material_path,
                parent=parent,
                children=children,
            )
        return by_name
    
    def reconcile(self):
        """
        ....
        
        Second pass: hierarchy reconciliation + existence checks
        Pass 2: Phase A -> Ensure all referenced parents/children exist.
        """
        for name, vp in self.by_name.items(): # this checks that every part the declares parent/childern, that pparent/children actually exists
            if vp.parent and vp.parent not in self.by_name:
                raise JsonVettingError(f"Part '{name}' references missing parent '{vp.parent}'.")

            for c in vp.children:
                if c not in self.by_name:
                    raise JsonVettingError(f"Part '{name}' lists missing child '{c}'.")

    def biderectional(self):
        """
        .....
        
        Pass 2 : Phase B -> Bidirectional reconciliation: make parent and children consistent
        Reconcile parent<->children bidirectionally
        (mutate dataclasses safely by reassigning)
        Add missing child links to parent based on child's parent
        If child says it has a parent, the parent’s children list must include the child
        Why by_name.items() ?
        You are mutating (appending to) children lists while iterating. Iterating a dict view while mutating nested objects 
        is usually okay, but converting to a list makes the iteration snapshot stable and avoids subtle issues.
        """

        for name, vp in list(self.by_name.items()):
            if vp.parent:
                p = self.by_name[vp.parent]
                if name not in p.children:
                    p.children.append(name)

        # Add missing parent links to child based on parent's children
        #If parent lists a child, ensure the child agrees:
        #if child has no parent set → set it to this parent.
        # If child already has a parent:
            #if it matches → OK
            #if it differs → contradiction → error

        for name, vp in list(self.by_name.items()):
            for c in vp.children:
                child = self.by_name[c]
                if child.parent is None:
                    child.parent = name
                elif child.parent != name:
                    raise JsonVettingError(
                        f"Child '{c}' has parent '{child.parent}' but is also listed under '{name}'."
                    )

    def graph_val(self):
        """
        .... 
        
        Pass 2: Phase C -> Global graph validity, Cycle check via DFS
        visiting : nodes currently on the recursion stack ("gray")
        visited : nodes fully processed ("black")
        """
        def dfs(node: str, visiting: Set[str], visited: Set[str]) -> None:

            if node in visiting:
                raise JsonVettingError(f"Cycle detected in hierarchy at '{node}'.")
            
            if node in visited:
                return
            
            visiting.add(node)
            for ch in self.by_name[node].children:
                dfs(ch, visiting, visited)

            visiting.remove(node)
            visited.add(node)

        roots = [n for n, vp in self.by_name.items() if vp.parent is None]
        if not roots:
            raise JsonVettingError("No root found (every part has a parent).")

        visited: Set[str] = set()
        for r in roots:
            dfs(r, set(), visited)
        
        if len(visited) != len(self.by_name):
            unreachable = [n for n in self.by_name if n not in visited]
            raise JsonVettingError(f"Unreachable parts (not under any root): {unreachable}")

In [4]:
class USDBuilder:
    """
    Takes vetted parts (by_name dict) and generates:
      - geometry layers (assets/geoms)
      - material layers (assets/mats)
      - component layers (assets/components)

    Designed so you can re-run it as the model grows:
      - Existing folders are preserved
      - Files are overwritten by default (toggleable)
    """

    def __init__(
        self,
        by_name: Dict[str, VettedPart],
        *,
        assets_dir: str = "assets",
        geoms_subdir: str = "geoms",
        mats_subdir: str = "mats",
        comps_subdir: str = "components",
        overwrite: bool = True,
        use_paths_from_vetted: bool = False,
    ):
        """
        use_paths_from_vetted:
          - False (recommended): generate deterministic paths from part.name
          - True: use vp.geom_path and vp.material_path from your vetted data
        """
        self.by_name = by_name
        self.overwrite = overwrite
        self.use_paths_from_vetted = use_paths_from_vetted

        self.assets_dir = assets_dir
        self.geoms_dir = os.path.join(assets_dir, geoms_subdir)
        self.mats_dir = os.path.join(assets_dir, mats_subdir)
        self.comps_dir = os.path.join(assets_dir, comps_subdir)

        os.makedirs(self.geoms_dir, exist_ok=True)
        os.makedirs(self.mats_dir, exist_ok=True)
        os.makedirs(self.comps_dir, exist_ok=True)

    def _ref_path(self, target_path: str, authored_layer_path: str) -> str:
        """
        Return a path to target_path relative to the directory of authored_layer_path.
        This makes USD references robust regardless of where you open the stage from.
        """
        base_dir = os.path.dirname(os.path.abspath(authored_layer_path))
        return os.path.relpath(os.path.abspath(target_path), start=base_dir)

    def _set_stage_metadata(self, stage: Usd.Stage, *, meters_per_unit: float, up_axis: str) -> None:
        stage.SetMetadata("metersPerUnit", float(meters_per_unit))
        UsdGeom.SetStageUpAxis(stage, UsdGeom.Tokens.z if up_axis.upper() == "Z" else UsdGeom.Tokens.y)

    def _ensure_can_write(self, path: str) -> None:
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        if (not self.overwrite) and os.path.exists(path):
            raise FileExistsError(f"File exists and overwrite=False: {path}")

    def _make_box_mesh(self, dims_m: Tuple[float, float, float]):
        L, W, H = dims_m
        hx, hy, hz = L / 2.0, W / 2.0, H / 2.0

        pts = [
            Gf.Vec3f(-hx, -hy, -hz),
            Gf.Vec3f(+hx, -hy, -hz),
            Gf.Vec3f(+hx, +hy, -hz),
            Gf.Vec3f(-hx, +hy, -hz),
            Gf.Vec3f(-hx, -hy, +hz),
            Gf.Vec3f(+hx, -hy, +hz),
            Gf.Vec3f(+hx, +hy, +hz),
            Gf.Vec3f(-hx, +hy, +hz),
        ]

        counts = [4, 4, 4, 4, 4, 4]
        indices = [
            0, 1, 2, 3,   # bottom
            4, 5, 6, 7,   # top
            0, 1, 5, 4,   # -y
            3, 2, 6, 7,   # +y
            0, 3, 7, 4,   # -x
            1, 2, 6, 5,   # +x
        ]

        extent = [Gf.Vec3f(-hx, -hy, -hz), Gf.Vec3f(+hx, +hy, +hz)]
        return pts, counts, indices, extent

    def _geom_path_for(self, vp: VettedPart) -> str:
        if self.use_paths_from_vetted:
            return vp.geom_path
        return os.path.join(self.geoms_dir, f"{vp.name}_geom.usda")

    def _mat_path_for(self, vp: VettedPart) -> str:
        if self.use_paths_from_vetted:
            return vp.material_path
        return os.path.join(self.mats_dir, f"{vp.name}_mat.usda")

    def _comp_path_for(self, vp: VettedPart) -> str:
        return os.path.join(self.comps_dir, f"{vp.name}.usda")

    def _material_name_for(self, vp: VettedPart) -> str:
        """
        Stable material name inside the material layer.
        Later: map this from vp.raw["attributes"]["material"] or similar.
        """
        return f"{vp.name}_Material"

    def write_geometry_layer(self, vp: VettedPart) -> str:
        geom_path = self._geom_path_for(vp)
        self._ensure_can_write(geom_path)

        stage = Usd.Stage.CreateNew(geom_path)
        self._set_stage_metadata(stage, meters_per_unit=vp.meters_per_unit, up_axis=vp.up_axis)

        points, counts, indices, extent = self._make_box_mesh(vp.dims_m)

        mesh_prim_name = f"{vp.name}_Geom"
        mesh = UsdGeom.Mesh.Define(stage, f"/{mesh_prim_name}")
        mesh.CreatePointsAttr(points)
        mesh.CreateFaceVertexCountsAttr(counts)
        mesh.CreateFaceVertexIndicesAttr(indices)
        mesh.CreateSubdivisionSchemeAttr(UsdGeom.Tokens.none)
        mesh.CreateExtentAttr(extent)

        stage.SetDefaultPrim(mesh.GetPrim())
        stage.GetRootLayer().Save()
        return geom_path

    def write_material_layer(self, vp: VettedPart) -> str:
        mat_path = self._mat_path_for(vp)
        self._ensure_can_write(mat_path)

        stage = Usd.Stage.CreateNew(mat_path)
        self._set_stage_metadata(stage, meters_per_unit=vp.meters_per_unit, up_axis=vp.up_axis)

        material_name = self._material_name_for(vp)
        mat_prim_path = Sdf.Path(f"/Materials/{material_name}")

        material = UsdShade.Material.Define(stage, mat_prim_path)

        shader = UsdShade.Shader.Define(stage, mat_prim_path.AppendChild("PreviewSurface"))
        shader.CreateIdAttr("UsdPreviewSurface")
        shader.CreateInput("diffuseColor", Sdf.ValueTypeNames.Color3f).Set(Gf.Vec3f(0.7, 0.7, 0.7))
        shader.CreateInput("metallic", Sdf.ValueTypeNames.Float).Set(0.9)
        shader.CreateInput("roughness", Sdf.ValueTypeNames.Float).Set(0.35)

        material.CreateSurfaceOutput().ConnectToSource(shader.ConnectableAPI(), "surface")

        stage.SetDefaultPrim(material.GetPrim())
        stage.GetRootLayer().Save()
        return mat_path

    def write_component_layer(self, vp: VettedPart, *, geom_path: str, mat_path: str) -> str:
        comp_path = self._comp_path_for(vp)
        self._ensure_can_write(comp_path)

        stage = Usd.Stage.CreateNew(comp_path)
        self._set_stage_metadata(stage, meters_per_unit=vp.meters_per_unit, up_axis=vp.up_axis)

        root = UsdGeom.Xform.Define(stage, f"/{vp.name}")
        root_prim = root.GetPrim()
        stage.SetDefaultPrim(root_prim)

        root_prim.CreateAttribute("asset:id", Sdf.ValueTypeNames.String).Set(vp.uid)
        root_prim.CreateAttribute("asset:type", Sdf.ValueTypeNames.String).Set(vp.ptype)

        root_prim.CreateAttribute("spatial:dims_m", Sdf.ValueTypeNames.Double3).Set(
            Gf.Vec3d(float(vp.dims_m[0]), float(vp.dims_m[1]), float(vp.dims_m[2]))
        )
        root_prim.CreateAttribute("spatial:metersPerUnit", Sdf.ValueTypeNames.Double).Set(float(vp.meters_per_unit))
        root_prim.CreateAttribute("spatial:upAxis", Sdf.ValueTypeNames.String).Set(vp.up_axis)

        attrs = vp.raw.get("attributes", {})
        for k, v in attrs.items():
            if isinstance(v, (int, float)):
                root_prim.CreateAttribute(f"attr:{k}", Sdf.ValueTypeNames.Double).Set(float(v))
            else:
                root_prim.CreateAttribute(f"attr:{k}", Sdf.ValueTypeNames.String).Set(str(v))

        geom_ref = self._ref_path(geom_path, comp_path)
        mat_ref  = self._ref_path(mat_path,  comp_path)

        # Geometry reference inside component
        geom_mesh = UsdGeom.Mesh.Define(stage, f"/{vp.name}/geom")
        geom_mesh.GetPrim().GetReferences().AddReference(
            geom_ref,
            f"/{vp.name}_Geom"
        )

        materials_scope = UsdGeom.Scope.Define(stage, f"/{vp.name}/materials")
        materials_scope.GetPrim().GetReferences().AddReference(mat_ref)

        material_name = self._material_name_for(vp)
        bound_mat_path = Sdf.Path(f"/{vp.name}/materials/Materials/{material_name}")
        bound_mat = UsdShade.Material.Get(stage, bound_mat_path)

        if bound_mat and bound_mat.GetPrim().IsValid():
            UsdShade.MaterialBindingAPI.Apply(geom_mesh.GetPrim()).Bind(bound_mat)
        else:
            geom_mesh.GetPrim().SetCustomDataByKey(
                "materialHint", f"{mat_ref}:/Materials/{material_name}"
            )

        stage.GetRootLayer().Save()
        return comp_path


    # Public API

    def build_all_parts(self) -> Dict[str, Dict[str, str]]:
        """
        Build (geom, mat, component) for every vetted part.
        Returns a dict:
          outputs[name] = {"geom": ..., "mat": ..., "component": ...}
        """
        outputs: Dict[str, Dict[str, str]] = {}

        for name, vp in self.by_name.items():
            geom_path = self.write_geometry_layer(vp)
            mat_path = self.write_material_layer(vp)
            comp_path = self.write_component_layer(vp, geom_path=geom_path, mat_path=mat_path)

            outputs[name] = {"geom": geom_path, "mat": mat_path, "component": comp_path}

        return outputs

    def write_assembly_scene(
        self,
        *,
        scene_path: str = "scenes/assembly.usda",
        root_name: Optional[str] = None,
        instanceable: bool = False,
        include_root_as_instance: bool = True,
        debug_refs: bool = False,
    ) -> str:
        """
        Builds a single USD scene that instantiates the vetted hierarchy by referencing each
        part's component file at /World/<root>/<child>/<grandchild>/...
        and applying translate from JSON (vp.translate = (X,Y,Z)).

        Assumes you already generated:
            assets/components/{part.name}.usda
        """
        self._ensure_can_write(scene_path)
        stage = Usd.Stage.CreateNew(scene_path)

        roots = [n for n, vp in self.by_name.items() if vp.parent is None]
        if not roots:
            raise ValueError("No root found (every part has a parent).")

        if root_name is None:
            root_name = roots[0]
        if root_name not in self.by_name:
            raise ValueError(f"root_name '{root_name}' not found. Available roots: {roots}")

        root_part = self.by_name[root_name]

        self._set_stage_metadata(
            stage,
            meters_per_unit=float(root_part.meters_per_unit),
            up_axis=str(root_part.up_axis),
        )

        world = UsdGeom.Xform.Define(stage, "/World")
        stage.SetDefaultPrim(world.GetPrim())

        root_path = Sdf.Path(f"/World/{root_name}")
        root_xf = UsdGeom.Xform.Define(stage, root_path)
        root_xf.AddTranslateOp().Set(Gf.Vec3d(*root_part.translate))

        if include_root_as_instance:
            root_comp_abs = self._comp_path_for(root_part)               
            root_comp_ref = self._ref_path(root_comp_abs, scene_path)    

            prim = root_xf.GetPrim()
            if instanceable:
                prim.SetInstanceable(True)

            if debug_refs:
                print(f"[REF] {root_path} -> {root_comp_ref} :/{root_name}")

            prim.GetReferences().AddReference(root_comp_ref, f"/{root_name}")

        def place(node_name: str, parent_path: Sdf.Path) -> None:
            node = self.by_name[node_name]
            prim_path = parent_path.AppendChild(node_name)

            x = UsdGeom.Xform.Define(stage, prim_path)
            x.AddTranslateOp().Set(Gf.Vec3d(*node.translate))

            prim = x.GetPrim()
            if instanceable:
                prim.SetInstanceable(True)

            comp_abs = self._comp_path_for(node)              
            comp_ref = self._ref_path(comp_abs, scene_path)   

            if debug_refs:
                print(f"[REF] {prim_path} -> {comp_ref} :/{node_name}")

            prim.GetReferences().AddReference(comp_ref, f"/{node_name}")

            for ch in node.children:
                place(ch, prim_path)

        for ch in root_part.children:
            place(ch, root_path)

        stage.GetRootLayer().Save()
        return scene_path


In [5]:
model1 = VettingProc(file="habmod.json")
vetted_parts = model1.by_name

builder = USDBuilder(vetted_parts, assets_dir="assets", overwrite=True, use_paths_from_vetted=False)
outputs = builder.build_all_parts()

scene = builder.write_assembly_scene(
    scene_path="scenes/HabitationAssembly.usda",
    root_name="HabitationModule",
    include_root_as_instance=True,
    debug_refs=True,  
)

print("Scene:", scene)

[REF] /World/HabitationModule -> ../assets/components/HabitationModule.usda :/HabitationModule
[REF] /World/HabitationModule/O2Tank1 -> ../assets/components/O2Tank1.usda :/O2Tank1
[REF] /World/HabitationModule/O2Tank2 -> ../assets/components/O2Tank2.usda :/O2Tank2
Scene: scenes/HabitationAssembly.usda
