Skip to content

Commit

Permalink
fix(panos.base.PanObject.refresh_variable): Refresh works again for r…
Browse files Browse the repository at this point in the history
…egular and attrib style params (#446)

Fixes #444
  • Loading branch information
shinmog committed Apr 27, 2022
1 parent ab4d088 commit 20dd7b7
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 11 deletions.
38 changes: 27 additions & 11 deletions panos/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ def update(self, variable):
% (type(self), self.uid, variable)
)
device.set_config_changed()
path, value, var_path = self._get_param_specific_info(variable)
path, attr, value, var_path = self._get_param_specific_info(variable)
if var_path.vartype == "attrib":
raise NotImplementedError("Cannot update 'attrib' style params")
xpath = "{0}/{1}".format(self.xpath(), path)
Expand Down Expand Up @@ -868,7 +868,17 @@ def _get_param_specific_info(self, variable):
# Not an 'entry' variable
varpath = re.sub(regex, getattr(self, matchedvar.variable), varpath)

return (varpath, value, var)
# For vartype=attrib params, we need the containing XML element.
attr = None
if var.vartype == "attrib":
tokens = varpath.rsplit("/", 1)
attr = tokens[-1]
if len(tokens) == 1:
varpath = None
else:
varpath = tokens[0]

return (varpath, attr, value, var)

def refresh(
self, running_config=False, refresh_children=True, exceptions=True, xml=None
Expand Down Expand Up @@ -937,10 +947,10 @@ def refresh_variable(self, variable, running_config=False, exceptions=False):
msg = '{0}: refresh_variable({1}) called on {2} object "{3}"'
logger.debug(msg.format(device.id, variable, self.__class__.__name__, self.uid))

info = self._get_param_specific_info(variable)
path = info[0]
var_path = info[2]
xpath = "{0}/{1}".format(self.xpath(), path)
path, attr, value, var_path = self._get_param_specific_info(variable)
xpath = self.xpath()
if path is not None:
xpath += "/{0}".format(path)
err_msg = "Object doesn't exist: {0}".format(xpath)
setattr(self, variable, [] if var_path.vartype in ("member", "entry") else None)

Expand All @@ -957,7 +967,7 @@ def refresh_variable(self, variable, running_config=False, exceptions=False):
return

# Determine the first element to look for in the XML
lasttag = path.rsplit("/", 1)[-1]
lasttag = xpath.rsplit("/", 1)[-1]
obj = root.find("result/" + lasttag)
if obj is None:
if exceptions:
Expand All @@ -967,13 +977,13 @@ def refresh_variable(self, variable, running_config=False, exceptions=False):
if hasattr(var_path, "parse_value_from_xml_last_tag"):
# Versioned class
settings = {}
var_path.parse_value_from_xml_last_tag(obj, settings)
var_path.parse_value_from_xml_last_tag(obj, settings, attr)
setattr(self, variable, settings.get(variable))
else:
# Classic class
# Rebuild the elements that are lost by refreshing the
# variable directly
sections = path.split("/")[:-1]
sections = xpath.split("/")[:-1]
root = ET.Element("root")
next_element = root
for section in sections:
Expand Down Expand Up @@ -2677,7 +2687,8 @@ def _get_param_specific_info(self, param):
parameter attached to this PanObject / VersionedPanObject.
Returns:
A three element tuple of the variable's xpath (str), the value of
A four element tuple of the variable's xpath (str), the attribute
name (if this is vartype="attrib"), the value of
the variable, and the full ``VarPath`` or ``ParamPath`` object that
is responsible for handling this variable.
Expand Down Expand Up @@ -2725,7 +2736,12 @@ def _get_param_specific_info(self, param):
p = token.format(**settings)
xpath.append(p)

return ("/".join(xpath), value, var_path)
# Remove the last part of vartype=attrib variable xpath parts.
attr = None
if var_path.vartype == "attrib":
attr = xpath.pop()

return ("/".join(xpath) or None, attr, value, var_path)

def parse_xml(self, xml):
"""Parse the given XML into this object's parameters.
Expand Down
102 changes: 102 additions & 0 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1245,10 +1245,112 @@ def _setup(self):
)
)
params.append(Base.VersionedParamPath("someint", path="someint", vartype="int"))
params.append(
Base.VersionedParamPath("base_uuid", path="uuid", vartype="attrib")
)
params.append(Base.VersionedParamPath("action", path="config/action"))
params.append(
Base.VersionedParamPath(
"action_uuid", path="config/action/uuid", vartype="attrib"
)
)

self._params = tuple(params)


class TestVariableRefreshes(unittest.TestCase):
def obj(self, key, attribs, value):
o = MyVersionedObject("foo")
o.xpath = mock.Mock(return_value="/unit/test/xpath/for/entry[@name='foo']")

av = ""
if attribs:
for k, v in attribs.items():
av += ' {0}="{1}"'.format(k, v)
respString = "<response><result><{0}{1}>{2}</{0}></result></response>".format(
key, av, value, key
)
resp = ET.fromstring(respString)

spec = {
"id": "unittest",
"get_device_version.return_value": o._UNKNOWN_PANOS_VERSION,
"xapi.show.return_value": resp,
"xapi.get.return_value": resp,
}
m = mock.Mock(**spec)

o.nearest_pandevice = mock.Mock(return_value=m)

return m, o

def test_entry_refresh(self):
m, o = self.obj("entries", None, "<entry name='one'/><entry name='two'/>")

ans = o.refresh_variable("entries")

m.xapi.get.assert_called_once_with(
o.xpath() + "/multiple/entries", retry_on_peer=o.HA_SYNC,
)
self.assertEqual(ans, o.entries)
self.assertEqual(ans, ["one", "two"])

def test_member_refresh(self):
m, o = self.obj(
"members", None, "<member>first</member><member>second</member>"
)

ans = o.refresh_variable("members")

m.xapi.get.assert_called_once_with(
o.xpath() + "/multiple/members", retry_on_peer=o.HA_SYNC,
)
self.assertEqual(ans, o.members)
self.assertEqual(ans, ["first", "second"])

def test_int_refresh(self):
m, o = self.obj("someint", None, "42")

ans = o.refresh_variable("someint")

m.xapi.get.assert_called_once_with(
o.xpath() + "/someint", retry_on_peer=o.HA_SYNC,
)
self.assertEqual(ans, o.someint)
self.assertEqual(ans, 42)

def test_base_attrib_refresh(self):
m, o = self.obj("entry", {"name": "foo", "uuid": "1234-56-789"}, "")

ans = o.refresh_variable("base_uuid")

m.xapi.get.assert_called_once_with(o.xpath(), retry_on_peer=o.HA_SYNC)
self.assertEqual(ans, o.base_uuid)
self.assertEqual(ans, "1234-56-789")

def test_string_refresh(self):
m, o = self.obj("action", {"uuid": "1234-56-789"}, "DENY")

ans = o.refresh_variable("action")

m.xapi.get.assert_called_once_with(
o.xpath() + "/config/action", retry_on_peer=o.HA_SYNC,
)
self.assertEqual(ans, o.action)
self.assertEqual(ans, "DENY")

def test_nested_attrib_refresh(self):
m, o = self.obj("action", {"uuid": "1234-56-789"}, "DENY")

ans = o.refresh_variable("action_uuid")

m.xapi.get.assert_called_once_with(
o.xpath() + "/config/action", retry_on_peer=o.HA_SYNC,
)
self.assertEqual(ans, o.action_uuid)
self.assertEqual(ans, "1234-56-789")


class TestEqual(unittest.TestCase):
def test_ordered(self):
o1 = MyVersionedObject("a", ["a", "b"], ["c", "d"], 5)
Expand Down

0 comments on commit 20dd7b7

Please sign in to comment.