Skip to content

Commit

Permalink
feat(loader): Automate namespace management
Browse files Browse the repository at this point in the history
The manual namespace management introduced in #243 was flawed: First, it
didn't support accurately versioning the introduced namespaces based on
the model's active viewpoints, and second, it had to be done manually in
several places (which it wasn't).

The new approach instead introduces a new method `update_namespaces`,
which is automatically called while saving to bring all namespace maps
up to date.
  • Loading branch information
Wuestengecko committed Apr 15, 2024
1 parent 763ed2d commit aa654a7
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 54 deletions.
108 changes: 59 additions & 49 deletions capellambse/loader/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,22 +283,53 @@ def idcache_reserve(self, new_id: str) -> None:
"""Reserve the given ID for an element to be inserted later."""
self.__idcache[new_id] = None

def add_namespace(self, name: str, uri: str) -> None:
"""Add the given namespace to this tree's root element."""
fragroot = self.root
olduri = fragroot.nsmap.get(name)
if olduri is not None and olduri != uri:
LOGGER.warning(
"Namespace %r already registered with URI %r", name, olduri
)
return
if uri == olduri:
def update_namespaces(self, viewpoints: cabc.Mapping[str, str]) -> None:
"""Update the current namespace map.
Parameters
----------
viewpoints
A mapping from viewpoint names to the version activated in
the model.
If an element from a versioned Plugin is encountered, but
the Plugin's viewpoint is not activated in the model, an
error is raised and no update is performed.
"""
new_nsmap: dict[str, str] = {
"xmi": _n.NAMESPACES["xmi"],
"xsi": _n.NAMESPACES["xsi"],
}
for elem in self.root.iter():
xtype = helpers.xtype_of(elem)
if xtype is None:
continue
ns, _, _ = xtype.partition(":")
plugin = _n.NAMESPACES_PLUGINS.get(ns)
if plugin is None:
continue

uri = plugin.name.rstrip("/")
if plugin.version is not None:
assert plugin.viewpoint is not None
vp_version = viewpoints.get(plugin.viewpoint)
if not vp_version:
raise CorruptModelError(
f"Viewpoint not activated: {plugin.viewpoint}"
)
uri += f"/{vp_version}"

assert new_nsmap.get(ns) in (None, uri)
new_nsmap[ns] = uri

assert new_nsmap
if self.root.nsmap == new_nsmap:
return

new_root = self.root.makeelement(
self.root.tag,
attrib=self.root.attrib,
nsmap={**self.root.nsmap, name: uri},
nsmap=dict(sorted(new_nsmap.items())),
)
new_root.extend(self.root.getchildren())

Expand Down Expand Up @@ -515,6 +546,8 @@ def save(self, **kw: t.Any) -> None:
" (hint: pass i_have_a_recent_backup=True)"
)

self.update_namespaces()

LOGGER.debug("Saving model %r", self.get_model_info().title)
with self.filehandler.write_transaction(**kw) as unsupported_kws:
if unsupported_kws:
Expand All @@ -532,6 +565,21 @@ def save(self, **kw: t.Any) -> None:
with self.resources[resname].open(fname, "wb") as f:
tree.write_xml(f)

def update_namespaces(self) -> None:
"""Update the namespace definitions on each fragment root.
This method is automatically called while saving to ensure that
all namespaces necessary for the current model elements are
registered on the fragment roots.
"""
vp = dict(self.referenced_viewpoints())
for fname, fragment in self.trees.items():
if fragment.fragment_type != FragmentType.SEMANTIC:
continue

LOGGER.debug("Updating namespaces on fragment %s", fname)
fragment.update_namespaces(vp)

def idcache_index(self, subtree: etree._Element) -> None:
"""Index the IDs of ``subtree``.
Expand Down Expand Up @@ -577,44 +625,6 @@ def idcache_rebuild(self) -> None:
for tree in self.trees.values():
tree.idcache_rebuild()

def add_namespace(
self,
fragment: str | pathlib.PurePosixPath | etree._Element,
name: str,
uri: str | None = None,
/,
) -> None:
"""Add the given namespace to the given tree's root element.
Parameters
----------
fragment
Either the name of a fragment (as
:class:`~pathlib.PurePosixPath` or :class:`str`), or a model
element. In the latter case, the fragment that contains this
element is used.
name
The canonical name of this namespace. This typically uses
reverse DNS notation in Capella.
uri
The namespace URI. If not specified, the canonical name will
be used to look up the URI in the list of known namespaces.
"""
if isinstance(fragment, etree._Element):
fragment = self.find_fragment(fragment)
elif isinstance(fragment, str):
fragment = pathlib.PurePosixPath(fragment)
elif not isinstance(fragment, pathlib.PurePosixPath):
raise TypeError(f"Invalid fragment specifier {fragment!r}")

if not uri:
try:
uri = _n.NAMESPACES[name]
except KeyError:
raise ValueError(f"Unknown namespace {name!r}") from None

self.trees[fragment].add_namespace(name, uri)

def generate_uuid(
self, parent: etree._Element, *, want: str | None = None
) -> str:
Expand Down
3 changes: 0 additions & 3 deletions capellambse/model/common/element.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,6 @@ def __init__(
for key, val in kw.items():
if key == "xtype":
self._element.set(helpers.ATT_XT, val)
self._model._loader.add_namespace(
parent, val.split(":", maxsplit=1)[0]
)
elif not isinstance(
getattr(type(self), key),
(accessors.Accessor, properties.AttributeProperty),
Expand Down
7 changes: 5 additions & 2 deletions tests/test_model_creation_deletion.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def test_create_adds_missing_namespace_to_fragment(
module = model.by_uuid("85a31dd7-7755-486b-b803-1df8915e2cf9")

module.requirements.create(name="TestReq")
model._loader.update_namespaces()

assert "Requirements" in model._element.nsmap

Expand All @@ -100,12 +101,14 @@ def test_adding_a_namespace_preserves_the_capella_version_comment(
assert "Requirements" not in model._element.nsmap, "Precondition failed"
prev_elements = list(model._element.itersiblings(preceding=True))
assert len(prev_elements) == 1, "No version comment to preserve?"
module = model.by_uuid("85a31dd7-7755-486b-b803-1df8915e2cf9")

model._loader.add_namespace(model._element, "Requirements")
module.requirements.create(name="TestReq")
model._loader.update_namespaces()

assert "Requirements" in model._element.nsmap
prev_elements = list(model._element.itersiblings(preceding=True))
assert len(prev_elements) == 1
assert model.info.capella_version != "UNKNOWN"


def test_deleting_an_object_purges_references_from_AttrProxyAccessor(
Expand Down

0 comments on commit aa654a7

Please sign in to comment.