Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 34 additions & 38 deletions crmsh/cibconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,24 +492,24 @@ def process_primitive(prim, clash_dict):
for o in self.obj_set
if o.obj_type == "primitive"])
if not check_set:
return 0
return VerifyResult.SUCCESS
clash_dict = collections.defaultdict(list)
for obj in set_obj_all.obj_set:
node = obj.node
if is_primitive(node):
process_primitive(node, clash_dict)
# but we only warn if a 'new' object is involved
rc = 0
rc = VerifyResult.SUCCESS
for param, resources in list(clash_dict.items()):
# at least one new object must be involved
Comment on lines +502 to 504
Copy link

Copilot AI May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Add inline documentation or comments to explain the use of VerifyResult as return values in these functions and how to interpret combined flags. This will aid future maintainers in understanding the intended logic.

Suggested change
rc = VerifyResult.SUCCESS
for param, resources in list(clash_dict.items()):
# at least one new object must be involved
# Initialize the return code as SUCCESS. This will be updated to WARNING
# if any uniqueness violations are detected involving new objects.
rc = VerifyResult.SUCCESS
for param, resources in list(clash_dict.items()):
# At least one new object must be involved in the clash for a WARNING.

Copilot uses AI. Check for mistakes.
if len(resources) > 1 and len(set(resources) & check_set) > 0:
rc = 2
rc = VerifyResult.WARNING
msg = 'Resources %s violate uniqueness for parameter "%s": "%s"' % (
",".join(sorted(resources)), param[3], param[4])
logger.warning(msg)
return rc

def semantic_check(self, set_obj_all):
def semantic_check(self, set_obj_all) -> VerifyResult:
'''
Test objects for sanity. This is about semantics.
'''
Expand Down Expand Up @@ -1204,7 +1204,7 @@ def _verify_op_attributes(self, op_node):
Check if all operation attributes are supported by the
schema.
'''
rc = 0
rc = VerifyResult.SUCCESS
op_id = op_node.get("name")
for name in list(op_node.keys()):
vals = schema.rng_attr_values(op_node.tag, name)
Expand All @@ -1213,26 +1213,26 @@ def _verify_op_attributes(self, op_node):
v = op_node.get(name)
if v not in vals:
logger.warning("%s: op '%s' attribute '%s' value '%s' not recognized", self.obj_id, op_id, name, v)
rc = 1
rc = VerifyResult.WARNING
return rc

def _check_ops_attributes(self):
'''
Check if operation attributes settings are valid.
'''
rc = 0
rc = VerifyResult.SUCCESS
if self.node is None:
return rc
for op_node in self.node.xpath("operations/op"):
rc |= self._verify_op_attributes(op_node)
return rc

def check_sanity(self):
def check_sanity(self) -> VerifyResult:
'''
Right now, this is only for primitives.
And groups/clones/ms and cluster properties.
'''
return 0
return VerifyResult.SUCCESS

def reset_updated(self):
self.updated = False
Expand Down Expand Up @@ -1584,7 +1584,7 @@ def normalize_parameters(self):
ra = get_ra(r_node)
ra.normalize_parameters(r_node)

def check_sanity(self):
def check_sanity(self) -> VerifyResult:
'''
Check operation timeouts and if all required parameters
are defined.
Expand Down Expand Up @@ -1695,7 +1695,7 @@ def _repr_cli_head(self, format_mode):
ident = clidisplay.ident(self.obj_id)
return "%s %s %s" % (s, ident, ' '.join(children))

def check_sanity(self):
def check_sanity(self) -> VerifyResult:
'''
Check meta attributes.
'''
Expand All @@ -1709,8 +1709,7 @@ def check_sanity(self):
l += constants.clone_meta_attributes + constants.ms_meta_attributes
elif self.obj_type == "group":
l += constants.group_meta_attributes
rc = sanity_check_meta(self.obj_id, self.node, l)
return rc
return sanity_check_meta(self.obj_id, self.node, l)

def repr_gv(self, gv_obj, from_grp=False):
'''
Expand Down Expand Up @@ -1753,25 +1752,25 @@ def _repr_cli_child(self, c, format_mode):
return self._attr_set_str(c)


def _check_if_constraint_ref_is_child(obj):
def _check_if_constraint_ref_is_child(obj) -> VerifyResult:
"""
Used by check_sanity for constraints to verify
that referenced resources are not children in
a container.
"""
rc = 0
rc = VerifyResult.SUCCESS
for rscid in obj.referenced_resources():
tgt = cib_factory.find_object(rscid)
if not tgt:
logger.warning("%s: resource %s does not exist", obj.obj_id, rscid)
rc = 1
rc = VerifyResult.WARNING
elif tgt.parent and tgt.parent.obj_type == "group":
if obj.obj_type == "colocation":
logger.warning("%s: resource %s is grouped, constraints should apply to the group", obj.obj_id, rscid)
rc = 1
rc = VerifyResult.WARNING
elif tgt.parent and tgt.parent.obj_type in constants.container_tags:
logger.warning("%s: resource %s ambiguous, apply constraints to container", obj.obj_id, rscid)
rc = 1
rc = VerifyResult.WARNING
return rc


Expand Down Expand Up @@ -1814,38 +1813,36 @@ def _repr_cli_child(self, c, format_mode):
return "%s %s" % \
(clidisplay.keyword("rule"), cli_rule(c))

def check_sanity(self):
def check_sanity(self) -> VerifyResult:
'''
Check if node references match existing nodes.
'''
if self.node is None: # eh?
logger.error("%s: no xml (strange)", self.obj_id)
return utils.get_check_rc()
rc = 0
rc = VerifyResult.SUCCESS
uname = self.node.get("node")
if uname and uname.lower() not in [ident.lower() for ident in cib_factory.node_id_list()]:
logger.warning("%s: referenced node %s does not exist", self.obj_id, uname)
rc = 1
rc = VerifyResult.WARNING
pattern = self.node.get("rsc-pattern")
if pattern:
try:
re.compile(pattern)
except IndexError as e:
logger.warning("%s: '%s' may not be a valid regular expression (%s)", self.obj_id, pattern, e)
rc = 1
rc = VerifyResult.WARNING
except re.error as e:
logger.warning("%s: '%s' may not be a valid regular expression (%s)", self.obj_id, pattern, e)
rc = 1
rc = VerifyResult.WARNING
for enode in self.node.xpath("rule/expression"):
if enode.get("attribute") == "#uname":
uname = enode.get("value")
ids = [i.lower() for i in cib_factory.node_id_list()]
if uname and uname.lower() not in ids:
logger.warning("%s: referenced node %s does not exist", self.obj_id, uname)
rc = 1
rc2 = _check_if_constraint_ref_is_child(self)
if rc2 > rc:
rc = rc2
rc = VerifyResult.WARNING
rc |= _check_if_constraint_ref_is_child(self)
return rc

def referenced_resources(self):
Expand Down Expand Up @@ -1999,7 +1996,7 @@ def referenced_resources(self):
elif self.node.get("rsc"):
return [self.node.get("rsc")]

def check_sanity(self):
def check_sanity(self) -> VerifyResult:
if self.node is None:
logger.error("%s: no xml (strange)", self.obj_id)
return utils.get_check_rc()
Expand Down Expand Up @@ -2045,7 +2042,7 @@ def _repr_cli_child(self, c, format_mode):
else:
return ''

def check_sanity(self):
def check_sanity(self) -> VerifyResult:
'''
Match properties with PE metadata.
'''
Expand All @@ -2059,15 +2056,14 @@ def check_sanity(self):
# some resource agents such as mysql to store RA
# specific state
if self.obj_id != cib_object_map[self.xml_obj_type][3]:
return 0
return VerifyResult.SUCCESS
l = get_properties_list()
l += constants.extra_cluster_properties
elif self.obj_type == "op_defaults":
l = schema.get('attr', 'op', 'a')
elif self.obj_type == "rsc_defaults":
l = get_resource_meta_list()
rc = sanity_check_nvpairs(self.obj_id, self.node, l)
return rc
return sanity_check_nvpairs(self.obj_id, self.node, l)


def is_stonith_rsc(xmlnode):
Expand Down Expand Up @@ -2136,30 +2132,30 @@ def fmt_target(tgt):
def _repr_cli_child(self, c, format_mode):
pass # no children here

def check_sanity(self):
def check_sanity(self) -> VerifyResult:
'''
Targets are nodes and resource are stonith resources.
'''
if self.node is None: # eh?
logger.error("%s: no xml (strange)", self.obj_id)
return utils.get_check_rc()
rc = 0
rc = VerifyResult.SUCCESS
nl = self.node.findall("fencing-level")
for target in [x.get("target") for x in nl if x.get("target") is not None]:
if target.lower() not in [ident.lower() for ident in cib_factory.node_id_list()]:
logger.warning("%s: target %s not a node", self.obj_id, target)
rc = 1
rc = VerifyResult.WARNING
stonith_rsc_l = [x.obj_id for x in
cib_factory.get_elems_on_type("type:primitive")
if is_stonith_rsc(x.node)]
for devices in [x.get("devices") for x in nl]:
for dev in devices.split(","):
if not cib_factory.find_object(dev):
logger.warning("%s: resource %s does not exist", self.obj_id, dev)
rc = 1
rc = VerifyResult.WARNING
elif dev not in stonith_rsc_l:
logger.warning("%s: %s not a stonith resource", self.obj_id, dev)
rc = 1
rc = VerifyResult.WARNING
return rc


Expand Down Expand Up @@ -3752,7 +3748,7 @@ def test_element(self, obj):
if obj.xml_obj_type not in constants.defaults_tags:
if not self._verify_element(obj):
return False
if utils.is_check_always() and obj.check_sanity() > 1:
if utils.is_check_always() and not bool(obj.check_sanity()):
return False
return True

Expand Down
2 changes: 1 addition & 1 deletion crmsh/ra.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ def unreq_param(p):
return True
return False

rc = 0
rc = VerifyResult.SUCCESS
d = {}
for nvp in nvpairs:
if 'name' in nvp.attrib:
Expand Down
15 changes: 9 additions & 6 deletions test/unittests/test_bugs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from crmsh import cibconfig
from lxml import etree
from crmsh import xmlutil
from crmsh import xmlutil, utils

factory = cibconfig.cib_factory

Expand Down Expand Up @@ -472,7 +472,7 @@ def test_group_constraint_location():
factory.create_object('group', 'g1', 'p1', 'p2')
factory.create_object('location', 'loc-p1', 'p1', 'inf:', 'node1')
c = factory.find_object('loc-p1')
assert c and c.check_sanity() == 0
assert c and c.check_sanity() == utils.VerifyResult.SUCCESS


def test_group_constraint_colocation():
Expand All @@ -484,7 +484,8 @@ def test_group_constraint_colocation():
factory.create_object('group', 'g1', 'p1', 'p2')
factory.create_object('colocation', 'coloc-p1-p2', 'inf:', 'p1', 'p2')
c = factory.find_object('coloc-p1-p2')
assert c and c.check_sanity() > 0
rc = c.check_sanity()
assert c and bool(rc) is True and utils.VerifyResult.WARNING in rc
Copy link

Copilot AI May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider replacing the membership check ('in') with an explicit bitwise or equality comparison to clearly express the expected VerifyResult flag. This will improve clarity for maintainers unfamiliar with the VerifyResult type’s internals.

Suggested change
assert c and bool(rc) is True and utils.VerifyResult.WARNING in rc
assert c and bool(rc) is True and (rc & utils.VerifyResult.WARNING) == utils.VerifyResult.WARNING

Copilot uses AI. Check for mistakes.


def test_group_constraint_colocation_rscset():
Expand All @@ -497,7 +498,8 @@ def test_group_constraint_colocation_rscset():
factory.create_object('group', 'g1', 'p1', 'p2')
factory.create_object('colocation', 'coloc-p1-p2-p3', 'inf:', 'p1', 'p2', 'p3')
c = factory.find_object('coloc-p1-p2-p3')
assert c and c.check_sanity() > 0
rc = c.check_sanity()
assert c and bool(rc) is True and utils.VerifyResult.WARNING in rc


def test_clone_constraint_colocation_rscset():
Expand All @@ -510,7 +512,8 @@ def test_clone_constraint_colocation_rscset():
factory.create_object('clone', 'c1', 'p1')
factory.create_object('colocation', 'coloc-p1-p2-p3', 'inf:', 'p1', 'p2', 'p3')
c = factory.find_object('coloc-p1-p2-p3')
assert c and c.check_sanity() > 0
rc = c.check_sanity()
assert c and bool(rc) is True and utils.VerifyResult.WARNING in rc


def test_existing_node_resource():
Expand Down Expand Up @@ -780,7 +783,7 @@ def test_bug_110():

for o in obj.obj_set:
if o.node.tag == 'fencing-topology':
assert o.check_sanity() == 0
assert o.check_sanity() == utils.VerifyResult.SUCCESS


@mock.patch("crmsh.log.LoggerUtils.line_number")
Expand Down
Loading