From add991f94949f9b4133be487a09bb694d709dde1 Mon Sep 17 00:00:00 2001 From: freshavocado7 Date: Tue, 28 May 2024 14:29:01 +0200 Subject: [PATCH] feat: Implement modelling rules and validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Note that this feature is distinct from the "Validate Model" option in the Capella GUI. Validation in capellambse uses its own custom set of rules. Also note that this is only an initial version, which still has a lot of limitations. In future iterations, the applied ruleset may be extended, modified, or made more configurable from a user's perspective. Co-authored-by: Ernst Würger Co-authored-by: Martin Lehmann Co-authored-by: Viktor Kravchenko --- .pre-commit-config.yaml | 1 + capellambse/extensions/validation/__init__.py | 37 ++ capellambse/extensions/validation/__main__.py | 47 ++ .../extensions/validation/_validate.py | 465 ++++++++++++++++++ .../validation/report-template.html.jinja | 302 ++++++++++++ capellambse/extensions/validation/rules.py | 427 ++++++++++++++++ capellambse/model/common/__init__.py | 21 + capellambse/model/layers/la.py | 2 +- pyproject.toml | 3 + tests/test_validation.py | 165 +++++++ 10 files changed, 1469 insertions(+), 1 deletion(-) create mode 100644 capellambse/extensions/validation/__init__.py create mode 100644 capellambse/extensions/validation/__main__.py create mode 100644 capellambse/extensions/validation/_validate.py create mode 100644 capellambse/extensions/validation/report-template.html.jinja create mode 100644 capellambse/extensions/validation/rules.py create mode 100644 tests/test_validation.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 42cc6d0b..bf276c0d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -108,6 +108,7 @@ repos: - mypy==1.10.0 - click==8.1.7 - diskcache==5.0 + - jinja2==3.1.3 - markupsafe==2.0 - platformdirs==4.2.0 - pytest-cov diff --git a/capellambse/extensions/validation/__init__.py b/capellambse/extensions/validation/__init__.py new file mode 100644 index 00000000..60800915 --- /dev/null +++ b/capellambse/extensions/validation/__init__.py @@ -0,0 +1,37 @@ +# SPDX-FileCopyrightText: Copyright DB InfraGO AG +# SPDX-License-Identifier: Apache-2.0 +"""The module provides management and evaluation of validation rules. + +Validation rules are conditions ensuring that specific modeling +guidelines are followed. These rules apply to particular types of model +elements or to diagrams, and values of metrics. By evaluating each rule, +the module generates validation results, indicating whether the +corresponding guideline has been satisfied or not. This way, the module +helps maintain the quality and consistency of the model. +""" + +from ._validate import * + +from . import rules # isort: skip + + +def init() -> None: + # pylint: disable=redefined-outer-name # false-positive + import capellambse + from capellambse.model import common as c + + c.set_accessor( + capellambse.MelodyModel, + "validation", + c.AlternateAccessor(ModelValidation), + ) + capellambse.MelodyModel.validate = property( # type: ignore[attr-defined] + lambda self: self.validation.validate + ) + + c.set_accessor( + c.GenericElement, "validation", c.AlternateAccessor(ElementValidation) + ) + c.GenericElement.validate = property( # type: ignore[attr-defined] + lambda self: self.validation.validate + ) diff --git a/capellambse/extensions/validation/__main__.py b/capellambse/extensions/validation/__main__.py new file mode 100644 index 00000000..0b3d93ae --- /dev/null +++ b/capellambse/extensions/validation/__main__.py @@ -0,0 +1,47 @@ +# SPDX-FileCopyrightText: Copyright DB InfraGO AG +# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +import logging +import typing as t + +import click +import jinja2 + +import capellambse + + +@click.command() +@click.option("-m", "--model", required=True, type=capellambse.ModelCLI()) +@click.option( + "-o", + "--output", + type=click.File("w", atomic=True), + required=True, + help="Output file to render the template into", +) +@click.option("-t", "--template", help="An optional custom template to render") +def _main( + model: capellambse.MelodyModel, + template: str | None, + output: t.IO[str], +) -> None: + logging.basicConfig() + + loader: jinja2.BaseLoader + if template is None: + loader = jinja2.PackageLoader("capellambse", "extensions/validation") + template = "report-template.html.jinja" + else: + loader = jinja2.FileSystemLoader(".") + env = jinja2.Environment(loader=loader) + + with output: + env.get_template(template).stream( + model=model, + results=model.validation.validate(), + ).dump(output) + + +if __name__ == "__main__": + _main() diff --git a/capellambse/extensions/validation/_validate.py b/capellambse/extensions/validation/_validate.py new file mode 100644 index 00000000..a4d431cf --- /dev/null +++ b/capellambse/extensions/validation/_validate.py @@ -0,0 +1,465 @@ +# SPDX-FileCopyrightText: Copyright DB InfraGO AG +# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +__all__ = [ + "Category", + "ElementValidation", + "ModelValidation", + "RealType", + "Result", + "Results", + "Rule", + "Rules", + "Validation", + "VirtualType", + "rule", + "virtual_type", +] + +import collections.abc as cabc +import dataclasses +import enum +import logging +import typing as t + +from lxml import etree + +import capellambse +import capellambse.model.modeltypes as mt +from capellambse import model +from capellambse.model import common as c + +LOGGER = logging.getLogger(__name__) +_T = t.TypeVar("_T", bound=t.Union[c.GenericElement, "VirtualType"]) + + +@dataclasses.dataclass(frozen=True) +class VirtualType(t.Generic[_T]): + name: str + real_type: type[_T] + filter: cabc.Callable[[_T], bool] + + def search(self, model_: capellambse.MelodyModel) -> c.ElementList: + assert isinstance(self.real_type, (str, type(c.GenericElement))) + return model_.search(self.real_type).filter(self.filter) + + def matches(self, obj: c.GenericElement) -> bool: + return isinstance(obj, self.real_type) and self.filter(obj) + + +@dataclasses.dataclass(frozen=True) +class RealType(t.Generic[_T]): + class_: type[_T] + + @property + def name(self) -> str: + return self.class_.__name__ + + def search(self, model_: capellambse.MelodyModel) -> c.ElementList: + assert isinstance(self.class_, (str, type(c.GenericElement))) + return model_.search(self.class_) + + def matches(self, obj: c.GenericElement) -> bool: + return isinstance(obj, self.class_) + + +class _VirtualTypesRegistry(cabc.Mapping[str, t.Union[VirtualType, RealType]]): + def __init__(self) -> None: + self.__registry: dict[str, VirtualType] = {} + + def __iter__(self) -> cabc.Iterator[str]: + yield from self.__registry + + def __len__(self) -> int: + return len(self.__registry) + + def __contains__(self, key: object) -> bool: + return key in self.__registry + + def __getitem__(self, key: str) -> VirtualType | RealType: + try: + return self.__registry[key] + except KeyError: + pass + + _, class_ = c.resolve_handler(key) + return RealType(class_) + + def register(self, vtype: VirtualType) -> None: + try: + known = self.__registry[vtype.name] + except KeyError: + pass + else: + raise RuntimeError(f"Virtual type already known: {known}") + self.__registry[vtype.name] = vtype + + +_types_registry = _VirtualTypesRegistry() + + +def virtual_type( + real_type: str | type[_T], +) -> cabc.Callable[[cabc.Callable[[_T], bool]], VirtualType[_T]]: + if isinstance(real_type, str): + (cls,) = t.cast(tuple[type[_T], ...], c.find_wrapper(real_type)) + else: + cls = real_type + + def decorate(func: cabc.Callable[[_T], bool]) -> VirtualType[_T]: + vtype = VirtualType(func.__name__, cls, func) + _types_registry.register(vtype) + return vtype + + return decorate + + +@virtual_type(model.oa.OperationalActivity) +def OperationalActivity(obj): + return obj != obj._model.oa.root_activity + + +@virtual_type(model.ctx.SystemFunction) +def SystemFunction(obj): + return obj != obj._model.sa.root_function + + +@virtual_type(model.la.LogicalFunction) +def LogicalFunction(obj): + return obj != obj._model.la.root_function + + +@virtual_type(model.pa.PhysicalFunction) +def PhysicalFunction(obj): + return obj != obj._model.pa.root_function + + +class Category(mt._StringyEnumMixin, enum.Enum): + """A category for a rule.""" + + REQUIRED = enum.auto() + RECOMMENDED = enum.auto() + SUGGESTED = enum.auto() + + +@dataclasses.dataclass +class Result: + """The result of checking a validation rule against a model object.""" + + rule: Rule + object: c.GenericElement + passed: bool + + def __repr__(self) -> str: + """Return the representation of a result.""" + return ( + f"{type(self).__name__}(rule={self.rule!r}," + f" object={self.object._short_repr_()}," + f" value={self.passed!r})" + ) + + +@dataclasses.dataclass(frozen=True) +class Rule(t.Generic[_T]): + """A validation rule.""" + + id: str + name: str + types: frozenset[str] + rationale: str + category: Category + action: str + validate: cabc.Callable[[_T], bool] + + def find_objects( + self, model_: capellambse.MelodyModel + ) -> cabc.Iterator[_T]: + seen: set[str] = set() + for i in self.types: + for obj in _types_registry[i].search(model_): + if obj.uuid in seen: + continue + seen.add(obj.uuid) + yield obj + + def applies_to(self, obj: c.GenericElement) -> bool: + """Check whether this Rule applies to a specific element.""" + return any(_types_registry[i].matches(obj) for i in self.types) + + +class Rules(dict[str, Rule]): + """Stores validation rules indexed by their ID.""" + + def by_category(self, category: Category | str) -> list[Rule]: + """Filter the validation rules by category.""" + if isinstance(category, str): + category = Category[category] + return [i for i in self.values() if i.category == category] + + def by_type(self, type: type[c.GenericElement] | str) -> list[Rule]: + """Filter the validation rules by type.""" + if not isinstance(type, str): + type = type.__name__ + return [i for i in self.values() if type in i.types] + + +_VALIDATION_RULES = Rules() + + +class Results: + """A set of validation results.""" + + def __init__( + self, + results: cabc.Iterable[tuple[tuple[Rule, str], Result]] = (), + ) -> None: + self.__container = dict(results) + + def __len__(self) -> int: + return len(self.__container) + + def __iter__(self) -> cabc.Iterator[Result]: + return iter(self.__container.values()) + + def get_result( + self, rule_: Rule | str, target: str | c.GenericElement, / + ) -> Result | None: + if isinstance(rule_, str): + rule_ = _VALIDATION_RULES[rule_] + if not isinstance(target, str): + target = target.uuid + assert isinstance(target, str) + + try: + return self.__container[rule_, target] + except KeyError: + return None + + def iter_rules(self) -> cabc.Iterator[Rule]: + seen: set[str] = set() + for result in self.__container.values(): + if result.rule.id in seen: + continue + seen.add(result.rule.id) + yield result.rule + + def iter_objects(self) -> cabc.Iterator[c.GenericElement]: + seen: set[str] = set() + for result in self.__container.values(): + if result.object.uuid in seen: + continue + seen.add(result.object.uuid) + yield result.object + + def iter_results(self) -> cabc.Iterator[Result]: + return iter(self) + + def iter_compliant_objects(self) -> cabc.Iterator[c.GenericElement]: + for obj in self.iter_objects(): + if all(i.passed for i in self.by_object(obj).iter_results()): + yield obj + + def by_rule(self, key: Rule | str, /) -> Results: + if not isinstance(key, Rule): + key = _VALIDATION_RULES[key] + return Results( + ((rule_, objid), result) + for (rule_, objid), result in self.__container.items() + if rule_ == key + ) + + def by_object(self, target: str | c.GenericElement, /) -> Results: + """Filter the validation results by the target object.""" + if not isinstance(target, str): + target = target.uuid + return Results( + ((rule_, objid), result) + for (rule_, objid), result in self.__container.items() + if target == objid + ) + + def by_category(self, category: Category | str, /) -> Results: + """Filter the validation results by category.""" + return Results( + ((rule_, objid), result) + for (rule_, objid), result in self.__container.items() + if rule_.category == category + ) + + def by_passed(self, passed: bool, /) -> Results: + """Filter the validation results by whether the rule passed or not.""" + return Results( + ((rule_, objid), result) + for (rule_, objid), result in self.__container.items() + if result.passed == passed + ) + + def by_type(self, /, *types: str) -> Results: + """Filter the validation results by target object type.""" + if not types: + raise TypeError("Results.by_type requires at least one argument") + typeobjs = [_types_registry[i] for i in types] + return Results( + ((rule_, objid), result) + for (rule_, objid), result in self.__container.items() + if any(t.matches(result.object) for t in typeobjs) + ) + + +def rule( + id: str, + category: Category, + *, + name: str, + rationale: str, + action: str, + types: ( + str + | VirtualType[_T] + | type[_T] + | cabc.Iterable[str | VirtualType[_T] | type[_T]] + ), +) -> cabc.Callable[[cabc.Callable[[_T], bool]], Rule]: + """Create a validation rule. + + This decorator registers the validator function as a modelling rule. + The function is called with each model object to be validated. It + must return True if the rule passed, and False if it did not pass. + + Parameters + ---------- + category + The category of severity for this rule. + types + Types of objects that this rule applies to. + + Object types can be either real types (subclasses of + :class:`~capellambse.model.common.GenericElement`) or virtual + types created with the :func:`virtual_type` decorator. These can + be freely mixed in the rule decorator. + id + The unique ID of this rule. + + If another rule with this ID already exists, an error is raised. + name + Human-readable short name for the rule. + rationale + Text describing why the rule is useful. + action + Human-reabale short description of what needs to be changed for + the rule to pass. + """ + if id in _VALIDATION_RULES: + raise ValueError(f"Duplicate rule ID: {id}") + + if not types: + raise TypeError("No 'types' specified") + if isinstance(types, type): + type_names = [types.__name__] + elif isinstance(types, (RealType, VirtualType)): + type_names = [types.name] + elif isinstance(types, str): + type_names = [types] + else: + type_names = [] + for i in types: + if isinstance(i, str): + type_names.append(i) + elif isinstance(i, (RealType, VirtualType)): + type_names.append(i.name) + else: + type_names.append(i.__name__) + + def rule_decorator(validator: cabc.Callable[[_T], bool], /) -> Rule: + rule_ = Rule( + id, + name, + frozenset(type_names), + rationale, + category, + action, + validator, + ) + _VALIDATION_RULES.setdefault(rule_.id, rule_) + assert _VALIDATION_RULES[rule_.id] is rule_ + return rule_ + + return rule_decorator + + +class Validation: + """Basic class for access to validation rules and results.""" + + _model: capellambse.MelodyModel + _element: etree._Element + _constructed: bool + + parent = c.AlternateAccessor(c.GenericElement) + + def __init__(self, **kw: t.Any) -> None: + raise TypeError("Cannot create Validation object this way") + + @classmethod + def from_model( + cls, model_: capellambse.MelodyModel, element: etree._Element + ) -> Validation: + """Create a Validation object for a MelodyModel or an element.""" + self = cls.__new__(cls) + self._model = model_ + self._element = element + self._constructed = True + return self + + +class ModelValidation(Validation): + """Provides access to the model's validation rules and results.""" + + @property + def rules(self) -> Rules: + """Return all registered validation rules.""" + return _VALIDATION_RULES + + def validate(self) -> Results: + """Execute all registered validation rules and store results.""" + all_results = [] + for rule_ in _VALIDATION_RULES.values(): + for obj in rule_.find_objects(self._model): + all_results.append( + ( + (rule_, obj.uuid), + Result(rule_, obj, rule_.validate(obj)), + ) + ) + return Results(all_results) + + def search(self, /, *typenames: str) -> c.ElementList[t.Any]: + found: dict[str, t.Any] = {} + for i in typenames: + objs = _types_registry[i].search(self._model) + found.update((o.uuid, o._element) for o in objs) + return c.MixedElementList(self._model, list(found.values())) + + +class ElementValidation(Validation): + """Provides access to the model's validation rules and results.""" + + @property + def rules(self) -> list[Rule]: + """Return all registered validation rules that apply to this object.""" + obj = self.parent + return [i for i in _VALIDATION_RULES.values() if i.applies_to(obj)] + + def validate(self) -> Results: + """Validate this element against the rules that apply to it.""" + obj = self.parent + all_results = [] + for rule_ in _VALIDATION_RULES.values(): + if rule_.applies_to(obj): + all_results.append( + ( + (rule_, obj.uuid), + Result(rule_, obj, rule_.validate(obj)), + ) + ) + return Results(all_results) diff --git a/capellambse/extensions/validation/report-template.html.jinja b/capellambse/extensions/validation/report-template.html.jinja new file mode 100644 index 00000000..63ebfc7d --- /dev/null +++ b/capellambse/extensions/validation/report-template.html.jinja @@ -0,0 +1,302 @@ +{#- + # SPDX-FileCopyrightText: Copyright DB Netz AG and the capellambse contributors + # SPDX-License-Identifier: Apache-2.0 +-#} + +{% set CATEGORIES = ["REQUIRED", "RECOMMENDED", "SUGGESTED"] -%} +{% set object_types = [ + "Capability", + "SystemActor", + "SystemComponent", + "SystemFunction", + "ComponentExchange", + "FunctionalExchange" +] -%} +{% set results = results.by_type(*object_types) -%} + +{% macro show_compliance_score(results, align=None) -%} + {% if results | count > 0 -%} + {% set total = results | count -%} + {% set passed = results.by_passed(True) | count -%} + {% set score = (passed / total * 100) | round(1) -%} + {% if score < 30 -%} + {% set color = "#bc2604" -%} + {% elif score < 70 -%} + {% set color = "#ae930d" -%} + {% else -%} + {% set color = "#0a6600" -%} + {% endif -%} + + {{score}}% ({{passed}} / {{total}}) + +
+ {% else -%} + not applicable +
+ {% endif -%} +{% endmacro -%} + + + + + + +System Analysis Layer: Modelling Rules Compliance Report + + +

System Analysis Layer: Modeling Rules Compliance Report

+

+ The report provides an assessment of adherence to Modeling Rules and Best Practices by the contents of the System Analysis model layer. +

+ +

Assessment Summary

+We reviewed {{ model.validation.search(*object_types) | list | count }} model objects and found that overall compliance with the modelling rules is:
+{% for category in CATEGORIES -%} + {% set category_results = results.by_category(category) -%} + {% set total = category_results | count -%} + {% set passed = category_results.by_passed(True) | count -%} + {% set score = (passed / total * 100) | round(1) if total else 100 -%} + {% if score < 30 -%} + {% set color = "#bc2604" -%} + {% elif score < 70 -%} + {% set color = "#ae930d" -%} + {% else -%} + {% set color = "#0a6600" -%} + {% endif -%} + {{score}}% ({{passed}} / {{total}}) for {{category}} + {%- if loop.last %}.{% else %}; {% endif -%} +{% endfor %} + + + + + + + + + + {% for category in CATEGORIES -%} + + {% endfor -%} + + + + {% for obj_type in object_types %} + {% set total = model.validation.search(obj_type) | count %} + + + {% for category in CATEGORIES %}{% endfor %} + {% endfor %} + +
Reviewed objectsTotal objectsCompliance to applicable modeling rules
{{ category }}
{{ obj_type }}{{ total }} + {{ show_compliance_score(results.by_category(category).by_type(obj_type)) }} +
+ +

Object Compliance Analysis

+

This section evaluates the selected objects' adherence to modeling rules. If an object needs additional work or has + potential for improvement, we provide actionable suggestions to enhance its compliance.

+ +{% for obj_type in object_types %} +

{{ obj_type }}

+{% if not results.by_type(obj_type) -%} +

No rules apply to {{ obj_type }} objects or no objects of interest found in the selected layer.

+{% else -%} +{% for obj in results.by_type(obj_type).by_passed(False).iter_objects() | sort(attribute="name") -%} + {% if loop.first -%} + + + + + + + + + + {% endif -%} + {% set categories = results.by_object(obj).by_passed(False) | map(attribute="rule.category") | sort(attribute="value") | unique | list -%} + {% for category in categories -%} + {% set cat_results = results.by_object(obj).by_category(category) -%} + + {% if loop.first -%} + + {% endif -%} + + + + {% endfor -%} + {% if loop.last -%} + +
{{ obj_type }}Rule Compliance (%)Improvement Suggestions (Actions)
{{ obj.name }} + {{ category }}: + {{ show_compliance_score(cat_results, align="right") }} + + {% for rule in cat_results.by_passed(False).iter_rules() -%} + {% if loop.first %}{% endif -%} + {% else -%} + No action required + {% endfor -%} +
+ {% endif -%} +{% endfor -%} + +{% set categories_of_interest = [] -%} +{% for category in CATEGORIES -%} + {% set objects_of_interest = [] -%} + {% for obj in objects if results.by_category(category).by_uuid(obj.uuid) and not results.by_category(category).by_value(False).by_uuid(obj.uuid) -%} + {% set _ = objects_of_interest.append(obj) -%} + {% endfor -%} + {% if objects_of_interest | count > 0 -%} + {% set _ = categories_of_interest.append([category, objects_of_interest]) -%} + {% endif -%} +{% endfor -%} +{% for category in CATEGORIES if results.by_type(obj_type).by_category(category) | count > 0 and results.by_type(obj_type).by_category(category).iter_compliant_objects() | list | count > 0 %} + {% if loop.first %} +

These objects fully comply with at least one rule category:

+ {% endif %} + {% set results_of_interest = results.by_type(obj_type).by_category(category) -%} + {% set compliant_objects = results_of_interest.iter_compliant_objects() | sort(attribute="name") -%} +

+ All {{ results_of_interest.iter_rules() | list | count }} + {{category}} rules were met by the following {{ compliant_objects | count }} object(s): + {{ compliant_objects | join("; ", attribute="name")}} +

+{% endfor %} +{% endif -%} +{% endfor -%} + +

Validation results by rule

+ + + + + {% for rule in results.iter_rules() | sort(attribute="id") | sort(attribute="category.value") -%} + {% set rule_results = results.by_rule(rule) -%} + + + + + + + {%- endfor %} + +
Rule ID + Definition + Passed + Failed +
+ {{rule.id}}
+ {{rule.category.name}} +

Applies to: {{ rule.types | list | join(", ") }}

+
+ {{ rule.name }}
+ {{ rule.rationale }} +

Recommended action:
{{ rule.action }}

+
{{ show_compliance_score(rule_results) }} + {% for result in rule_results | sort(attribute="object.name") if not result.passed -%} + {% if loop.first %}{% endif -%} + {% else -%} + All {{ rule_results.iter_objects() | list | count}} object(s) comply to this rule + {%- endfor %} +
+ + + diff --git a/capellambse/extensions/validation/rules.py b/capellambse/extensions/validation/rules.py new file mode 100644 index 00000000..8a04e92a --- /dev/null +++ b/capellambse/extensions/validation/rules.py @@ -0,0 +1,427 @@ +# SPDX-FileCopyrightText: Copyright DB InfraGO AG +# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +import re +import typing as t + +import capellambse +from capellambse.extensions import validation +from capellambse.model import modeltypes +from capellambse.model.crosslayer import capellacommon as cc +from capellambse.model.crosslayer import fa +from capellambse.model.layers import ctx as sa +from capellambse.model.layers import la, oa, pa + +from . import _validate +from ._validate import rule + + +@validation.virtual_type(sa.SystemComponent) +def SystemActor(cmp: sa.SystemComponent) -> bool: + return cmp.is_actor + + +@validation.virtual_type(pa.PhysicalComponent) +def BehaviourPhysicalComponent(cmp: pa.PhysicalComponent) -> bool: + return cmp.nature == modeltypes.PhysicalComponentNature.BEHAVIOR + + +@validation.virtual_type(cc.State) +def OperationalState(state: cc.State) -> bool: + layer = _find_layer(state) + return isinstance(layer, oa.OperationalAnalysis) + + +@validation.virtual_type(cc.State) +def SystemState(state: cc.State) -> bool: + layer = _find_layer(state) + return isinstance(layer, sa.SystemAnalysis) + + +@validation.virtual_type(cc.State) +def LogicalState(state: cc.State) -> bool: + layer = _find_layer(state) + return isinstance(layer, la.LogicalArchitecture) + + +@validation.virtual_type(cc.State) +def PhysicalState(state: cc.State) -> bool: + layer = _find_layer(state) + return isinstance(layer, pa.PhysicalArchitecture) + + +def _find_layer( + obj, +) -> ( + oa.OperationalAnalysis + | sa.SystemAnalysis + | la.LogicalArchitecture + | pa.PhysicalArchitecture +): + parent = obj.parent + while not isinstance( + parent, + ( + oa.OperationalAnalysis, + sa.SystemAnalysis, + la.LogicalArchitecture, + pa.PhysicalArchitecture, + ), + ): + parent = parent.parent + return parent + + +# 00. Common +@rule( + category=_validate.Category.RECOMMENDED, + types=[ + sa.Capability, + sa.SystemFunction, + sa.SystemComponent, + oa.Entity, + oa.OperationalCapability, + oa.OperationalActivity, + la.LogicalFunction, + la.LogicalComponent, + pa.PhysicalFunction, + pa.PhysicalComponent, + cc.State, + ], + id="Rule-001", + name="Object has a description or summary", + rationale=( + "A comprehensive description or summary for an object is essential to" + " ensure a clear understanding of its purpose, function, and role" + " within the system. Providing a concise, yet informative description" + " fosters better collaboration among team members, reduces ambiguity," + " and facilitates efficient decision-making." + ), + action="fill the description and/or summary text fields", +) +def has_non_empty_description_or_summary( + obj: capellambse.model.GenericElement, +) -> bool: + return bool(obj.description) or bool(obj.summary) + + +@rule( + category=_validate.Category.REQUIRED, + types=[ + sa.Capability, + oa.OperationalCapability, + ], + id="Rule-002", + name="Capability involves an Entity / Actor", + rationale=( + "Each Capability serves a need and brings a benefit for at least one" + " of the Actors/Entities. By involving an actor / entity in a" + " Capability we explicitly name stakeholders behind the Capability." + ), + action="Add at least one involved Actor or Entity.", +) +def capability_involves_entity(obj: capellambse.model.GenericElement) -> bool: + if isinstance(obj, oa.OperationalCapability): + return bool(obj.involved_entities) + return bool(obj.involved_components) + + +# FIXME Spacy and the NLP model should load lazily on rule evaluation +def _try_load_language_model() -> ( + tuple[spacy.Language | None, t.Literal[1, 2]] +): + try: + return spacy.load(_NLP_NAME), 2 + except OSError: + pass + + try: + from spacy import cli + + cli.download(_NLP_NAME) + except Exception: + pass + + try: + return spacy.load(_NLP_NAME), 2 + except OSError: + return None, 1 + + +_NLP_NAME = "en_core_web_lg" +try: + import spacy + + _HAS_SPACY = 2 +except ImportError: + _nlp = None + _HAS_SPACY = 0 +else: + _nlp, _HAS_SPACY = _try_load_language_model() + + +@rule( + category=_validate.Category.REQUIRED, + types=[ + sa.Capability, + sa.SystemFunction, + oa.OperationalCapability, + oa.OperationalActivity, + la.LogicalFunction, + pa.PhysicalFunction, + ], + id="Rule-003", + name=( + "Behavior name follows verb-noun pattern" + if _HAS_SPACY == 2 + else "Can't check if behavior name follows verb-noun pattern" + ), + rationale=( + "Using the verb-noun pattern for naming behaviors promotes clarity," + " consistency, and effective communication across the system. Adhering" + " to this convention simplifies understanding and management for all" + " stakeholders. Please revise any non-compliant names to align with" + " this proven practice." + ), + action=( + "Install spacy and download the natural language model.", + "Download the natural language model.", + ( + 'change the object name to follow the pattern of "VERB NOUN",' + ' for example "brew coffee"' + ), + )[_HAS_SPACY], +) +def behavior_name_follows_verb_noun_pattern( + obj: capellambse.model.GenericElement, +) -> bool: + if _nlp is None: + return False + + text = re.sub(r"^\d+: ", "", obj.name) + if len(text) < 1: + return False + doc = _nlp(text) + if len(doc) < 2: + return False + + # Check if the first token is a verb + if doc[0].pos_ != "VERB": + return False + + # Skip any number of adjectives and adverbs following the verb + i = 1 + while i < len(doc) and doc[i].pos_ in ("ADJ", "ADV"): + i += 1 + + # If there's a noun after the adjectives/adverbs, the pattern is valid + if i < len(doc) and doc[i].pos_ in ("NOUN", "PROPN"): + return True + + return False + + +@rule( + category=_validate.Category.RECOMMENDED, + types=[ + sa.Capability, + oa.OperationalCapability, + la.CapabilityRealization, + # The CapabilityRealization in the pa layer is also affected by this + # rule but is not defined as pa.CapabilityRealization since its the + # same class as la.CapabilityRealization + ], + id="Rule-004", + name="Capability has a defined pre-condition", + rationale=( + "Defining a pre-condition for a Capability helps clarify the necessary" + " state of the primary Actor before interacting with the System, which" + " enables a better understanding of the context and ensures" + " prerequisites are met for successful system performance within the" + " scope of the Capability." + ), + action=( + "Define a pre-condition for this Capability that describes the initial" + " state or requirements of the primary Actor before interacting with" + " the System, to provide clarity on the starting context for the" + " Capability." + ), +) +def has_precondition(obj: capellambse.model.GenericElement) -> bool: + return obj.precondition is not None + + +@rule( + category=_validate.Category.RECOMMENDED, + types=[ + sa.Capability, + oa.OperationalCapability, + la.CapabilityRealization, + ], + id="Rule-005", + name="Capability has a defined post-condition", + rationale=( + "Defining a post-condition for a Capability helps establish the" + " expected state of the primary Actor after interacting with the" + " System, ensuring clear understanding of the desired outcome and" + " enabling effective evaluation of system performance within the scope" + " of the Capability." + ), + action=( + "Define a post-condition for this Capability that describes the" + " expected state or outcome for the primary Actor after interacting" + " with the System, to ensure clear understanding of the desired" + " results and enable effective evaluation of system performance." + ), +) +def has_postcondition(obj): + return obj.postcondition is not None + + +@rule( + category=_validate.Category.REQUIRED, + types=[fa.FunctionalExchange], + id="Rule-006", + name=( + "All Functional exchanges shall be allocated to at least one component" + " exchange" + ), + rationale=( + "Each functional exchange should be allocated to any one of the" + " component exchange. Otherwise this implies that the information" + " flow is not considered or apportioned in the architecture." + ), + action=( + "Allocated the functional exchange to the appropriate component" + " exchange" + ), +) +def functional_exchange_allocated_to_component_exchange( + obj: fa.FunctionalExchange, +) -> bool: + if _find_layer(obj).name == "Physical Architecture": + return bool(obj.allocating_component_exchange) + return True + + +# 01. Operational Analysis +@rule( + category=_validate.Category.REQUIRED, + types=[oa.OperationalCapability], + id="Rule-007", + name=( + "Capability involves at least two Activities from different" + " entities/actors" + ), + rationale=( + "An operational capability should have at least two operational" + " activities in a sequence, performed by two different entity/actor to" + " qualify it to be meaningful." + ), + action=( + "involve two operational activities from two different entity/actor" + ), +) +def capability_involves_two_activities_from_different_entities( + obj: oa.OperationalCapability, +) -> bool: + actors = {activity.owner for activity in obj.involved_activities} + return len(actors) > 1 + + +@rule( + category=_validate.Category.REQUIRED, + types=[oa.OperationalActivity], + id="Rule-008", + name="Activity has at least one interaction with another activity", + rationale=( + "To have any effect in the context of the model, an Operational " + " Activity needs to interact (i.e. receive inputs from and/or send " + " outputs to) with the rest of the model, i.e. with other Operational " + " Activities." + ), + action=( + "Define an interaction for the operational activity with another" + " activity" + ), +) +def activity_has_interaction_with_another_activity( + obj: oa.OperationalActivity, +) -> bool: + return bool(obj.related_exchanges) + + +# 02. System Analysis +@rule( + category=_validate.Category.REQUIRED, + types=[sa.Capability], + id="Rule-009", + name=( + "System capability involves at least one Actor Function and one" + " System function" + ), + rationale=( + "A well-defined Capability should involve at least one Actor Function" + " and one system function. This helps understanding the functional" + " contribution of an Actor in the scope of the Capability." + ), + action=( + "involve at least one actor function and system function in the" + " Capability" + ), +) +def capability_involves_actor_and_system_function(obj: sa.Capability) -> bool: + actor_functions = [ + fnc + for fnc in obj.involved_functions + if fnc.owner and fnc.owner.is_actor + ] + system_functions = [ + fnc + for fnc in obj.involved_functions + if fnc.owner and not fnc.owner.is_actor + ] + return len(actor_functions) > 0 and len(system_functions) > 0 + + +@rule( + category=_validate.Category.REQUIRED, + types=[sa.Capability], + id="Rule-010", + name="IS- and SHOULD entity involvements match", + rationale=( + "Capability should involve both Actors and Functions, with Functions" + " allocated to respective entities (Actor or System). A mismatch" + " between these two lists might suggest that a function is linked to" + " an uninvolved actor, or an actor is involved without having any" + " associated functions for that capability. Ensuring proper alignment" + " helps maintain logical consistency and efficiency in the system" + " design." + ), + action=( + "To correct the mismatch, review and update the Capability's involved" + " Actors and Functions, ensuring each Actor contributes at least one" + " Function, and each Function is allocated to an appropriate Actor or" + " System" + ), +) +def is_and_should_entity_involvements_match(obj: sa.Capability) -> bool: + is_involvements = {x.owner.uuid for x in obj.involved_functions if x.owner} + should_involvements = {x.uuid for x in obj.involved_components} + return is_involvements == should_involvements + + +@rule( + category=_validate.Category.RECOMMENDED, + types=[sa.SystemFunction], + id="SF-040", + name="Function shall have at least one input or output", + rationale=( + "A Function without inputs or outputs may not effectively contribute" + " to the overall model, as it would not interact with other functions." + ), + action="consider adding inputs and / or outputs to the Function.", +) +def function_has_inputs_and_outputs(obj: sa.SystemFunction) -> bool: + return len(obj.inputs) > 0 or len(obj.outputs) > 0 diff --git a/capellambse/model/common/__init__.py b/capellambse/model/common/__init__.py index 2562f5e9..be3de048 100644 --- a/capellambse/model/common/__init__.py +++ b/capellambse/model/common/__init__.py @@ -8,6 +8,7 @@ import collections import collections.abc as cabc +import operator import typing as t import capellambse @@ -163,6 +164,26 @@ def register_xtype_handler(cls: type[T]) -> type[T]: return register_xtype_handler +def resolve_handler(xtype: str) -> tuple[str, type[t.Any]]: + matches: list[tuple[str, type[t.Any]]] = [] + + if ":" in xtype: + ismatch: cabc.Callable[[str, str], bool] = operator.eq + searchname = xtype + else: + ismatch = str.endswith + searchname = ":" + xtype + + for i in XTYPE_HANDLERS.values(): + matches.extend(t for t in i.items() if ismatch(t[0], searchname)) + + if len(matches) < 1: + raise ValueError(f"No handlers found for xsi:type: {xtype}") + if len(matches) > 1: + raise RuntimeError(f"Multiple handlers for xsi:type: {xtype}") + return matches[0] + + from .accessors import * from .accessors import _NewObject as new_object from .element import * diff --git a/capellambse/model/layers/la.py b/capellambse/model/layers/la.py index 856a25bd..df44dfcc 100644 --- a/capellambse/model/layers/la.py +++ b/capellambse/model/layers/la.py @@ -76,7 +76,7 @@ class LogicalComponentPkg(c.GenericElement): packages: c.Accessor -@c.xtype_handler(XT_ARCH) +@c.xtype_handler(None) class CapabilityRealization(c.GenericElement): """A capability.""" diff --git a/pyproject.toml b/pyproject.toml index 7c41c301..bd924e72 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,6 +61,7 @@ docs = [ test = [ "click", + "jinja2>=3.1.3", "pytest", "pytest-cov", "requests-mock", @@ -68,6 +69,7 @@ test = [ cli = [ "click>=8.1.7", + "jinja2>=3.1.3", ] decl = [] @@ -97,6 +99,7 @@ zip = "capellambse.filehandler.zip:ZipFileHandler" filtering = "capellambse.extensions.filtering:init" pvmt = "capellambse.extensions.pvmt:init" reqif = "capellambse.extensions.reqif:init" +validation = "capellambse.extensions.validation:init" [tool.black] line-length = 79 diff --git a/tests/test_validation.py b/tests/test_validation.py new file mode 100644 index 00000000..0cc8e237 --- /dev/null +++ b/tests/test_validation.py @@ -0,0 +1,165 @@ +# SPDX-FileCopyrightText: Copyright DB InfraGO AG +# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +import pathlib + +import pytest +from click import testing as clitest + +import capellambse +from capellambse.extensions import validation +from capellambse.extensions.validation import __main__ +from capellambse.model.layers import ctx, la + +TEST_UUID = "da12377b-fb70-4441-8faa-3a5c153c5de2" +TEST_RULE_ID = "Rule-001" +TEST_RULE_PARAMS = { + "name": "Always True", + "rationale": "True", + "action": "Nothing", +} + +# pylint: disable=redefined-outer-name + + +@pytest.fixture(scope="function") +def fake_registry(monkeypatch): + registry = validation.Rules() + monkeypatch.setattr(validation._validate, "_VALIDATION_RULES", registry) + return registry + + +def test_decorated_rules_are_added_to_global_registry(fake_registry): + @validation.rule( + TEST_RULE_ID, + validation.Category.REQUIRED, + types=[la.LogicalFunction], + **TEST_RULE_PARAMS, + ) + def testrule(_): + return True + + assert list(fake_registry.items()) == [(testrule.id, testrule)] + + +def test_rules_can_be_filtered_by_object_type(fake_registry): + @validation.rule( + TEST_RULE_ID, + validation.Category.REQUIRED, + types=[la.LogicalComponent, la.LogicalFunction], + **TEST_RULE_PARAMS, + ) + def testrule(_): + return True + + assert list(fake_registry.items()) == [(testrule.id, testrule)] + assert fake_registry.by_type(la.LogicalComponent) == [testrule] + assert fake_registry.by_type(la.LogicalFunction) == [testrule] + assert fake_registry.by_type(ctx.SystemFunction) == [] + + +def test_model_gives_access_to_the_full_set_of_rules( + model: capellambse.MelodyModel, +) -> None: + assert len(model.validation.rules) > 0 + + +def test_model_object_gives_access_to_rules_that_apply_to_it( + model: capellambse.MelodyModel, +): + obj = model.by_uuid(TEST_UUID) + assert isinstance(obj.validation, validation.ElementValidation) + + rules = obj.validation.rules + + assert len(rules) > 0 + assert all(rule.applies_to(obj) for rule in rules) + + +def test_validation_results_filtering( + # pylint: disable-next=unused-argument + fake_registry: validation.Rules, + model: capellambse.MelodyModel, +) -> None: + @validation.rule( + "TEST-LC", + validation.Category.REQUIRED, + types=[la.LogicalComponent], + **TEST_RULE_PARAMS, + ) + def test_lc(_: la.LogicalComponent) -> bool: + return True + + @validation.rule( + "TEST-LF", + validation.Category.RECOMMENDED, + types=[la.LogicalFunction], + **TEST_RULE_PARAMS, + ) + def test_lf(_: la.LogicalFunction) -> bool: + return True + + assert model.search(la.LogicalComponent), "Empty test model?" + assert model.search(la.LogicalFunction), "Empty test model?" + + results = model.validation.validate() + assert results + + required = results.by_category("REQUIRED") + assert len(required) > 0 + assert all(i.category.name == "REQUIRED" for i in required.iter_rules()) + + component = results.by_type("LogicalComponent") + assert len(component) > 0 + assert sum(1 for _ in component.iter_objects()) == len( + model.search("LogicalComponent") + ) + assert all( + type(i).__name__ == "LogicalComponent" + for i in component.iter_objects() + ) + + function = results.by_type("LogicalFunction") + assert len(function) > 0 + assert ( + sum(1 for _ in function.iter_objects()) + # -1 because the root function is not validated + == len(model.search("LogicalFunction")) - 1 + ) + assert all( + type(i).__name__ == "LogicalFunction" for i in function.iter_objects() + ) + + +def test_MelodyModel_validation(model: capellambse.MelodyModel): + assert isinstance(model.validation, validation.ModelValidation) + assert model.validation.rules + + results = model.validate() + + assert results + + +def test_ModelObject_validation(model: capellambse.MelodyModel): + obj = model.by_uuid(TEST_UUID) + assert isinstance(obj.validation, validation.ElementValidation) + assert obj.validation.rules + + results = obj.validate() + + assert results + + +def test_cli_creates_a_validation_report(tmp_path: pathlib.Path): + output_file = tmp_path / "report.html" + runner = clitest.CliRunner() + + result = runner.invoke( + __main__._main, + ["-mtest-5.0", f"-o{output_file}"], + ) + + assert result.exit_code == 0, f"CLI returned code {result.exit_code}" + assert output_file.exists() + assert output_file.read_bytes() != b""