Skip to content

Commit

Permalink
feat: Make opstate more discoverable for users (#413)
Browse files Browse the repository at this point in the history
This condenses the opstate namespace into its own class, and adds
an `.about()`.  This means that `.opstate` will always exist
(as opposed to just if there are functions there or not).

Work on #396
  • Loading branch information
shinmog committed Feb 4, 2022
1 parent 6a67320 commit f4f08d0
Show file tree
Hide file tree
Showing 5 changed files with 400 additions and 393 deletions.
58 changes: 58 additions & 0 deletions panos/base.py
Expand Up @@ -71,6 +71,7 @@ class PanObject(object):
HA_SYNC = True
TEMPLATE_NATIVE = False
_UNKNOWN_PANOS_VERSION = (sys.maxsize, 0, 0)
OPSTATES = {}

def __init__(self, *args, **kwargs):
# Set the 'name' variable
Expand Down Expand Up @@ -130,6 +131,9 @@ def _setups(self):
if callable(f):
f()

def _setup_opstate(self):
self.opstate = OpStateContainer(self, self.OPSTATES)

def __str__(self):
return self.uid

Expand Down Expand Up @@ -3513,6 +3517,60 @@ def refreshall(
return instances


class OpStateContainer(object):
"""Container for all opstate namespaces.
The name "opstate" is short for "operational state" and acts as a container
for non-configuration functionality to exist.
"""

def __init__(self, obj, config):
for namespace, cls in config.items():
setattr(self, namespace, cls(obj))

def about(self):
"""Returns information about this object's opstate namespaces.
Returns:
dict: Keys are the opstate's namespace, values are the classes.
"""
return vars(self)


class OpState(object):
"""Parent class for all opstate namespaces."""

def __init__(self, obj, *args, **kwargs):
self.obj = obj
self._setup(*args, **kwargs)

def _setup(self, *args, **kwargs):
"""Called during __init__."""
pass

def _str(self, elm, field):
if elm is not None:
val = elm.find("./{0}".format(field))
if val is not None:
return val.text

def _int(self, elm, field):
val = self._str(elm, field)
if val is not None:
return int(val)

def _datetime(self, elm, field, fmt):
val = self._str(elm, field)
if val is not None:
try:
return datetime.datetime.strptime(val, fmt)
except ValueError:
pass
return val


class PanDevice(PanObject):
"""A Palo Alto Networks device
Expand Down
192 changes: 85 additions & 107 deletions panos/panorama.py
Expand Up @@ -26,13 +26,64 @@
import panos
import panos.errors as err
from panos import base, firewall, getlogger, policies, yesno
from panos.base import ENTRY, MEMBER, PanObject, Root
from panos.base import ENTRY, MEMBER, OpState, PanObject, Root
from panos.base import VarPath as Var
from panos.base import VersionedPanObject, VersionedParamPath

logger = getlogger(__name__)


class DeviceGroupHierarchy(OpState):
"""Operational state handling for device group hierarchy.
Args:
parent (str): This device group's parent.
"""

def _setup(self):
self.parent = None

def refresh(self):
"""Refresh the ``parent`` from the state."""

dev = self.obj.panorama()
state = dev.opstate.dg_hierarchy.fetch()
self.parent = state.get(self.obj.uid)

def update(self):
"""Change this device group's hierarchical parent.
**Modifies the live device**
This operation results in a job being submitted to the backend, which
this function will block until the move is completed. The return value of
this function is what is returned from
:meth:`panos.base.PanDevice.syncjob()`.
Returns:
dict: Job result
"""
dev = self.obj.panorama()
logger.debug(
'{0}: update hierarchical parent for "{1}": {2}'.format(
dev.id, self.obj.uid, self.parent
)
)

e = ET.Element("request")
em = ET.SubElement(e, "move-dg")
eme = ET.SubElement(em, "entry", {"name": self.obj.name})

if self.parent is not None:
ET.SubElement(eme, "new-parent-dg").text = self.parent

cmd = ET.tostring(e, encoding="utf-8")
resp = dev.op(cmd, cmd_xml=False)
return dev.syncjob(resp)


class DeviceGroup(VersionedPanObject):
"""Panorama Device-group
Expand Down Expand Up @@ -70,6 +121,9 @@ class DeviceGroup(VersionedPanObject):
"policies.PreRulebase",
"policies.PostRulebase",
)
OPSTATES = {
"dg_hierarchy": DeviceGroupHierarchy,
}

def _setup(self):
# xpaths
Expand All @@ -82,9 +136,6 @@ def _setup(self):

self._params = tuple(params)

def _setup_opstate(self):
self.opstate = DeviceGroupOpState(self)

@property
def vsys(self):
return self.name
Expand All @@ -95,68 +146,6 @@ def devicegroup(self):
def xpath_vsys(self):
return self.xpath()

def _setup_opstate(self):
self.opstate = DeviceGroupOpState(self)


class DeviceGroupOpState(object):
"""Operational state handling for device group classes."""

def __init__(self, obj):
self.dg_hierarchy = DeviceGroupHierarchy(obj)


class DeviceGroupHierarchy(object):
"""Operational state handling for device group hierarchy.
Args:
parent (str): This device group's parent.
"""

def __init__(self, obj):
self.obj = obj
self.parent = None

def refresh(self):
"""Refresh the ``parent`` from the state."""

dev = self.obj.panorama()
state = dev.opstate.dg_hierarchy.fetch()
self.parent = state.get(self.obj.uid)

def update(self):
"""Change this device group's hierarchical parent.
**Modifies the live device**
This operation results in a job being submitted to the backend, which
this function will block until the move is completed. The return value of
this function is what is returned from
:meth:`panos.base.PanDevice.syncjob()`.
Returns:
dict: Job result
"""
dev = self.obj.panorama()
logger.debug(
'{0}: update hierarchical parent for "{1}": {2}'.format(
dev.id, self.obj.uid, self.parent
)
)

e = ET.Element("request")
em = ET.SubElement(e, "move-dg")
eme = ET.SubElement(em, "entry", {"name": self.obj.name})

if self.parent is not None:
ET.SubElement(eme, "new-parent-dg").text = self.parent

cmd = ET.tostring(e, encoding="utf-8")
resp = dev.op(cmd, cmd_xml=False)
return dev.syncjob(resp)


class Template(VersionedPanObject):
"""A panorama template.
Expand Down Expand Up @@ -375,6 +364,33 @@ def _setup(self):
self._params = tuple(params)


class PanoramaDeviceGroupHierarchy(OpState):
"""Operational state handling for device group hierarchy."""

def fetch(self):
"""Returns a dict of device groups and their parents.
Keys in the dict are the device group's name, while the value is the
name of that device group's parent. Top level device groups will have
a parent of ``None``.
Returns:
dict
"""

resp = self.obj.op("show dg-hierarchy")
data = resp.find("./result/dg-hierarchy")

ans = {}
nodes = [(None, x) for x in data.findall("./dg")]
for parent, elm in iter(nodes):
ans[elm.attrib["name"]] = parent
nodes.extend((elm.attrib["name"], x) for x in elm.findall("./dg"))

return ans


class Panorama(base.PanDevice):
"""Panorama device
Expand Down Expand Up @@ -428,6 +444,9 @@ class Panorama(base.PanDevice):
"plugins.CloudServicesPlugin",
"policies.Rulebase",
)
OPSTATES = {
"dg_hierarchy": PanoramaDeviceGroupHierarchy,
}

def __init__(
self,
Expand Down Expand Up @@ -862,47 +881,6 @@ def get_vm_auth_keys(self):

return ans

def _setup_opstate(self):
self.opstate = PanoramaOpState(self)


class PanoramaOpState(object):
"""Panorama OP state handling."""

def __init__(self, obj):
self.dg_hierarchy = PanoramaDeviceGroupHierarchy(obj)


class PanoramaDeviceGroupHierarchy(object):
"""Operational state handling for device group hierarchy."""

def __init__(self, obj):
self.obj = obj
self.parent = None

def fetch(self):
"""Returns a dict of device groups and their parents.
Keys in the dict are the device group's name, while the value is the
name of that device group's parent. Top level device groups will have
a parent of ``None``.
Returns:
dict
"""

resp = self.obj.op("show dg-hierarchy")
data = resp.find("./result/dg-hierarchy")

ans = {}
nodes = [(None, x) for x in data.findall("./dg")]
for parent, elm in iter(nodes):
ans[elm.attrib["name"]] = parent
nodes.extend((elm.attrib["name"], x) for x in elm.findall("./dg"))

return ans


class PanoramaCommit(object):
"""Normalization of a Panorama commit.
Expand Down

0 comments on commit f4f08d0

Please sign in to comment.