/
cluster.py
188 lines (154 loc) · 6.54 KB
/
cluster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""
Cluster definition.
Part of the context; a Cluster is used to save connection information
(the remotes and the roles assigned to each of them).
"""
from teuthology.orchestra import run
class Cluster(object):
    """
    Manage SSH connections to a cluster of machines.

    ``self.remotes`` maps each remote to the list of roles assigned to it.
    """

    def __init__(self, remotes=None):
        """
        :param remotes: A sequence of 2-tuples of this format:
            (Remote, [role_1, role_2 ...])
        """
        self.remotes = {}
        if remotes is not None:
            for remote, roles in remotes:
                self.add(remote, roles)

    def _sorted_remotes(self):
        """
        Return this cluster's remotes ordered by their ``name`` attribute.
        """
        return sorted(self.remotes, key=lambda rem: rem.name)

    def __repr__(self):
        remotes = '[' + ', '.join(
            '[{remote!r}, {roles!r}]'.format(
                remote=rem, roles=self.remotes[rem])
            for rem in self._sorted_remotes()
        ) + ']'
        return '{classname}(remotes={remotes})'.format(
            classname=self.__class__.__name__,
            remotes=remotes,
        )

    def __str__(self):
        return ' '.join(
            '{k}[{v}]'.format(k=rem, v=','.join(self.remotes[rem]))
            for rem in self._sorted_remotes()
        )

    def add(self, remote, roles):
        """
        Add a remote with the given roles to this cluster.

        :param remote: the remote to add; must not already be present.
        :param roles: iterable of roles; stored as a new list.
        :raises RuntimeError: if ``remote`` is already in this cluster.
        """
        if remote in self.remotes:
            raise RuntimeError(
                'Remote {new!r} already found in remotes: {old!r}'.format(
                    new=remote,
                    old=self.remotes[remote],
                ),
            )
        self.remotes[remote] = list(roles)

    def run(self, wait=True, parallel=False, **kwargs):
        """
        Run a command on all the nodes in this cluster.

        Nodes are visited in alphabetical order of their names.

        * ``parallel=False, wait=True`` (the default): run on each node
          sequentially, each command finishing before the next starts.
        * ``parallel=True, wait=True``: start the command on every node,
          then wait for all of them to finish.
        * ``wait=False``: start the command on every node and return
          immediately. ``parallel`` is ignored (treated as True), because
          a sequential run cannot proceed without waiting for each command.

        ``wait`` and ``parallel`` are interpreted by truthiness, so e.g.
        ``parallel=None`` behaves like ``parallel=False``.

        :param kwargs: passed through to each ``remote.run`` call.
        :returns: a list of `RemoteProcess`, one per node, in node order.
        """
        # -+-------+----------+----------+------------+---------------
        #  | wait  | parallel | run.wait | remote.run | comments
        # -+-------+----------+----------+------------+---------------
        # 1|*True  |*False    | no       | wait=True  | sequentially
        # 2| True  | True     | yes      | wait=False | parallel
        # 3| False | True     | no       | wait=False | parallel
        # 4| False | False    | no       | wait=False | same as above
        # We always run in parallel if wait=False,
        # that is why (4) is equivalent to (3).
        # We wait inside remote.run only when running sequentially.
        # Truthiness (not ``== True``/``== False``) so that a falsy non-bool
        # such as parallel=None still runs sequentially and waits, instead of
        # falling through both branches and never waiting at all.
        sequential = bool(wait) and not parallel
        procs = [remote.run(wait=sequential, **kwargs)
                 for remote in self._sorted_remotes()]
        # Only the parallel case needs an explicit wait: a sequential run has
        # already waited for every process inside remote.run.
        if parallel and wait:
            run.wait(procs)
        return procs

    def sh(self, script, **kwargs):
        """
        Run a command on all the nodes in this cluster.

        Nodes are visited in alphabetical order of their names.

        :param script: passed to each ``remote.sh``.
        :param kwargs: passed through to each ``remote.sh``.
        :returns: a list of the command outputs, in node order.
        """
        return [remote.sh(script, **kwargs)
                for remote in self._sorted_remotes()]

    def write_file(self, file_name, content, sudo=False, perms=None,
                   owner=None):
        """
        Write text to a file on each node.

        :param file_name: file name
        :param content: file content
        :param sudo: use sudo
        :param perms: file permissions (passed as ``mode``); only allowed
            when sudo is True
        :param owner: file owner; only allowed when sudo is True
        :raises ValueError: if perms or owner is given without sudo=True.
        """
        # Validate once, before touching any node, so invalid arguments are
        # rejected even when the cluster has no remotes.
        if not sudo and (perms is not None or owner is not None):
            raise ValueError("To specify perms or owner, sudo must be True")
        for remote in self._sorted_remotes():
            if sudo:
                remote.write_file(file_name, content,
                                  sudo=True, mode=perms, owner=owner)
            else:
                remote.write_file(file_name, content)

    def only(self, *roles):
        """
        Return a cluster with only the remotes that have all of given roles.

        For roles given as strings, they are matched against the roles
        on a remote, and the remote passes the check only if all the
        roles listed are present.

        An argument can be a callable, and will act as a match on roles of
        the remote. The matcher is evaluated one role at a time, and a
        match on any role is good enough. Note that this is subtly
        different from the behavior of string roles, but is logical if you
        consider a callable to be similar to passing a non-string object
        with an `__eq__` method.

        For example::

            web = mycluster.only(lambda role: role.startswith('web-'))
        """
        c = self.__class__()
        want = frozenset(r for r in roles if not callable(r))
        matchers = [r for r in roles if callable(r)]
        for remote, has_roles in self.remotes.items():
            # Every non-callable role given must be present on the remote.
            if not want.issubset(has_roles):
                continue
            # Every matcher given must match at least one role.
            if not all(
                any(matcher(role) for role in has_roles)
                for matcher in matchers
            ):
                continue
            c.add(remote, has_roles)
        return c

    def exclude(self, *roles):
        """
        Return a cluster *without* the remotes that have all of given roles.

        This is the opposite of `only`; arguments are interpreted the same way.
        """
        matches = self.only(*roles)
        c = self.__class__()
        for remote, has_roles in self.remotes.items():
            if remote not in matches.remotes:
                c.add(remote, has_roles)
        return c

    def filter(self, func):
        """
        Return a cluster of the remotes for which ``func(remote)`` is truthy.

        Example::

            cluster = ctx.cluster.filter(lambda r: r.is_online)
        """
        result = self.__class__()
        for rem, roles in self.remotes.items():
            if func(rem):
                result.add(rem, roles)
        return result