Skip to content

Commit

Permalink
tree: NodeSet is not hashable, do not use it as dict key!
Browse files Browse the repository at this point in the history
NodeSet is not hashable, and was used in numerous places as a dict key.
This is wrong and even fails in Python 3. This patch fixes this.

In PropagationTreeRouter, self.table is now a list of (dest, gateways)
pairs instead of a dict.

For other cases, we use the string representation of the NodeSet as the
dict key.

Change-Id: Ie48e3993551475dcc424dff1d6fee4c67591cc1f
  • Loading branch information
thiell committed Aug 8, 2017
1 parent 0d83413 commit ca3454c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 32 deletions.
14 changes: 6 additions & 8 deletions lib/ClusterShell/Propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,21 @@ def table_generate(self, root, topology):
destination nodes and the values are the next hop gateways to
use to reach these nodes.
"""
self.table = {}
try:
root_group = topology.find_nodegroup(root)
except TopologyError:
msgfmt = "Invalid root or gateway node: %s"
raise RouteResolvingError(msgfmt % root)

self.table = []
for group in root_group.children():
self.table[group.nodeset] = NodeSet()
dest = NodeSet()
stack = [group]
while len(stack) > 0:
curr = stack.pop()
self.table[group.nodeset].add(curr.children_ns())
dest.update(curr.children_ns())
stack += curr.children()

# reverse table (it was crafted backward)
self.table = dict((v, k) for k, v in self.table.items())
self.table.append((dest, group.nodeset))

def dispatch(self, dst):
"""dispatch nodes from a target nodeset to the directly
Expand All @@ -101,7 +99,7 @@ def dispatch(self, dst):
# yield nexthop, nexthop

# Check for remote targets, that require a gateway to be reached
for network in self.table:
for network, _ in self.table:
dst_inter = network & dst
dst.difference_update(dst_inter)
for host in dst_inter.nsiter():
Expand Down Expand Up @@ -134,7 +132,7 @@ def next_hop(self, dst):
# node[10-19] | gateway[1-2]
# ...
# ---------
for network, nexthops in self.table.items():
for network, nexthops in self.table:
# destination contained in current network
if dst in network:
res = self._best_next_hop(nexthops)
Expand Down
20 changes: 12 additions & 8 deletions lib/ClusterShell/Task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1305,10 +1305,12 @@ def _pchannel(self, gateway, metaworker):
"""Get propagation channel for gateway (create one if needed).
Use self.gateways dictionary that allows lookup like:
gateway => (worker channel, set of metaworkers)
gateway (string) => (worker channel, set of metaworkers)
"""
gwstr = str(gateway)

# create gateway channel if needed
if gateway not in self.gateways:
if gwstr not in self.gateways:
chan = PropagationChannel(self, gateway)
logger = logging.getLogger(__name__)
logger.info("pchannel: creating new channel %s", chan)
Expand All @@ -1328,10 +1330,10 @@ def _pchannel(self, gateway, metaworker):
chanworker.SNAME_STDERR = chan.SNAME_ERROR
self.schedule(chanworker)
# update gateways dict
self.gateways[gateway] = (chanworker, set([metaworker]))
self.gateways[gwstr] = (chanworker, set([metaworker]))
else:
# TODO: assert chanworker is running (need Worker.running())
chanworker, metaworkers = self.gateways[gateway]
chanworker, metaworkers = self.gateways[gwstr]
metaworkers.add(metaworker)
return chanworker.eh

Expand All @@ -1344,19 +1346,21 @@ def _pchannel_release(self, gateway, metaworker):
logger = logging.getLogger(__name__)
logger.debug("pchannel_release %s %s", gateway, metaworker)

if gateway not in self.gateways:
gwstr = str(gateway)

if gwstr not in self.gateways:
logger.error("pchannel_release: no pchannel found for gateway %s",
gateway)
gwstr)
else:
# TODO: delay gateway closing when other gateways are running
chanworker, metaworkers = self.gateways[gateway]
chanworker, metaworkers = self.gateways[gwstr]
metaworkers.remove(metaworker)
if len(metaworkers) == 0:
logger.info("pchannel_release: destroying channel %s",
chanworker.eh)
chanworker.abort()
# delete gateway reference
del self.gateways[gateway]
del self.gateways[gwstr]


def task_self(defaults=None):
Expand Down
30 changes: 14 additions & 16 deletions lib/ClusterShell/Worker/Tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def __init__(self, nodes, handler, timeout, **kwargs):

self.metahandler = MetaWorkerEventHandler(self)

# gateway -> active targets selection
# gateway (string) -> active targets selection
self.gwtargets = {}

def _set_task(self, task):
Expand Down Expand Up @@ -260,9 +260,9 @@ def _launch(self, nodes):

# And launch stuffs
next_hops = self._distribute(self.task.info("fanout"), nodes.copy())
self.logger.debug("next_hops=%s"
% [(str(n), str(v)) for n, v in next_hops.items()])
for gw, targets in next_hops.items():
self.logger.debug("next_hops=%s" % [(str(n), str(v))
for n, v in next_hops])
for gw, targets in next_hops:
if gw == targets:
self.logger.debug('task.shell cmd=%s source=%s nodes=%s '
'timeout=%s remote=%s', self.command,
Expand Down Expand Up @@ -332,15 +332,13 @@ def _launch(self, nodes):

def _distribute(self, fanout, dst_nodeset):
"""distribute target nodes between next hop gateways"""
distribution = {}
self.router.fanout = fanout

distribution = {}
for gw, dstset in self.router.dispatch(dst_nodeset):
if gw in distribution:
distribution[gw].add(dstset)
else:
distribution[gw] = dstset
return distribution
distribution.setdefault(str(gw), NodeSet()).add(dstset)

return tuple((NodeSet(k), v) for k, v in distribution.items())

def _copy_remote(self, source, dest, targets, gateway, timeout, reverse):
"""run a remote copy in tree mode (using gateway)"""
Expand All @@ -349,7 +347,7 @@ def _copy_remote(self, source, dest, targets, gateway, timeout, reverse):

self._target_count += len(targets)

self.gwtargets[gateway] = targets.copy()
self.gwtargets[str(gateway)] = targets.copy()

# tar commands are built here and launched on targets
if reverse:
Expand All @@ -375,7 +373,7 @@ def _execute_remote(self, cmd, targets, gateway, timeout):

self._target_count += len(targets)

self.gwtargets[gateway] = targets.copy()
self.gwtargets[str(gateway)] = targets.copy()

pchan = self.task._pchannel(gateway, self)
pchan.shell(nodes=targets, command=cmd, worker=self, timeout=timeout,
Expand Down Expand Up @@ -438,7 +436,7 @@ def _on_remote_node_close(self, node, rc, gateway):
self._rcopy_bufs = {}
self._rcopy_tars = {}

self.gwtargets[gateway].remove(node)
self.gwtargets[str(gateway)].remove(node)
self._close_count += 1
self._check_fini(gateway)

Expand All @@ -448,7 +446,7 @@ def _on_remote_node_timeout(self, node, gateway):
self.logger.debug("_on_remote_node_timeout %s via gw %s", node, gateway)
self._close_count += 1
self._has_timeout = True
self.gwtargets[gateway].remove(node)
self.gwtargets[str(gateway)].remove(node)
self._check_fini(gateway)

def _on_node_close(self, node, rc):
Expand Down Expand Up @@ -480,13 +478,13 @@ def _check_fini(self, gateway=None):

# check completion of targets per gateway
if gateway:
targets = self.gwtargets[gateway]
targets = self.gwtargets[str(gateway)]
if not targets:
# no more active targets for this gateway
self.logger.debug("WorkerTree._check_fini %s call pchannel_"
"release for gw %s", self, gateway)
self.task._pchannel_release(gateway, self)
del self.gwtargets[gateway]
del self.gwtargets[str(gateway)]

def _write_remote(self, buf):
"""Write buf to remote clients only."""
Expand Down

0 comments on commit ca3454c

Please sign in to comment.