forked from microsoft/Qcodes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
slack.py
430 lines (381 loc) · 15.1 KB
/
slack.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
import os
import tempfile
from functools import partial
from time import sleep
import inspect
from slacker import Slacker
import threading
import traceback
from qcodes.plots.base import BasePlot
from qcodes import config as qc_config
from qcodes.instrument.parameter import _BaseParameter
from qcodes import active_loop, active_data_set
def convert_command(text):
    """
    Split a chat message into a command word, positional args and kwargs.

    The message is lowercased and split on whitespace. The first token is the
    command; every following token is either a keyword argument (it contains
    an ``=``) or a positional argument. Argument values are converted to
    ``int`` when possible, otherwise to ``float``, otherwise kept as ``str``.

    Args:
        text (str): Raw message text.

    Returns:
        tuple: ``(command, args, kwargs)`` where ``command`` is a str
        (``''`` for an empty/blank message), ``args`` is a list and
        ``kwargs`` is a dict.
    """
    def try_convert_str(string):
        """Return `string` as int if possible, else float, else unchanged."""
        for cast in (int, float):
            try:
                return cast(string)
            except ValueError:
                continue
        return string

    # Splitting on arbitrary whitespace (instead of a single ' ') avoids
    # spurious empty-string arguments when the message contains repeated,
    # leading or trailing blanks.
    tokens = text.lower().split()
    if not tokens:
        return '', [], {}
    command, *args_str = tokens

    args = []
    kwargs = {}
    for arg in args_str:
        # Only the first '=' separates key from value, so values that
        # themselves contain '=' no longer raise ValueError.
        key, separator, val = arg.partition('=')
        if separator:
            kwargs[key] = try_convert_str(val)
        else:
            args.append(try_convert_str(arg))
    return command, args, kwargs
class Slack(threading.Thread):
    """
    Slack bot used to send information about qcodes via Slack IMs.

    Some default commands are provided, and custom commands/tasks can be
    attached (see below).

    To setup the Slack bot, a bot first has to be registered via Slack
    by clicking 'creating a new bot user' on https://api.slack.com/bot-users.
    Once registered, the bot will have a name and unique token.
    These and other settings have to be saved in a config dict (see init).

    Communication with the Slack bot is performed via instant messaging.
    When an IM is sent to the Slack bot, it will be processed during the next
    `update()` call (provided the username is registered in the config).
    `update()` is called periodically from the thread's event loop (`run()`)
    while the bot is active.

    Standard commands provided to the Slack bot are:
        plot: Upload latest qcodes plot
        msmt/measurement: Print information about latest measurement
        notify finished: Send message once measurement is finished
        help: List the available commands

    Custom commands can be added as (cmd, func) key-value pairs to
    `self.commands`. When `cmd` is sent to the bot, `func` is evaluated.

    Custom tasks can be added as well. These are functions that are performed
    every time an update is called. The function must return a boolean that
    indicates if the task should be removed from the list of tasks.
    A custom task can be added as a (cmd, func) key-value pair to
    `self.task_commands`.
    They can then be called through Slack IM via
        notify/task {cmd} *args: register task with name `cmd` that is
            performed every time `update()` is called.
    """

    def __init__(self, interval=3, config=None, auto_start=True, **commands):
        """
        Initialize the Slack bot and optionally start its polling thread.

        Args:
            interval (int): Seconds to sleep between two iterations of the
                event loop (the original documentation asks for more than
                1s; this is not enforced here).
            config (dict, optional): Config dict
                If not given, uses qc.config['user']['slack']
                The config dict must contain the following keys:
                    'bot_name': Name of the bot
                    'token': Token from bot (obtained from slack website)
                    'names': Usernames to periodically check for IM messages
            auto_start (Bool=True): If True, start the thread immediately.
            **commands: Extra (cmd, func) pairs added to `self.commands`;
                they override default commands with the same name.
        """
        if config is not None:
            self.config = config
        else:
            self.config = qc_config.user.slack

        self.slack = Slacker(self.config['token'])
        self.bot_id = self.slack.users.get_user_id(self.config['bot_name'])
        self.users = self.get_users(self.config['names'])
        self.get_im_ids(self.users)

        self.commands = {'plot': self.upload_latest_plot,
                         'msmt': self.print_measurement_information,
                         'measurement': self.print_measurement_information,
                         'notify': self.add_task,
                         'help': self.help_message,
                         'task': self.add_task,
                         **commands}
        self.task_commands = {'finished': self.check_msmt_finished}

        self.interval = interval
        self.tasks = []

        # Flag that exits loop when set to True (called via self.exit())
        self._exit = False

        # Flag that enables actions to be performed in the event loop
        # Enabled via self.start(), disabled via self.stop()
        self._is_active = False

        # Call Thread init
        super().__init__()
        if auto_start:
            self.start()

    def start(self):
        """
        Activate the bot and start the thread if it is not yet running.

        Can be called again after `stop()` to resume checking for updates.

        Returns:
            None
        """
        self._is_active = True
        try:
            # Start thread, can only be called once
            super().start()
        except RuntimeError:
            # Thread already started, ignoring
            pass

    def run(self):
        """
        Thread event loop that periodically checks for updates.

        Updates are only performed while the bot is active (see `start()`
        and `stop()`). The loop ends once `exit()` has been called.

        Returns:
            None
        """
        while not self._exit:
            # Continue event loop
            if self._is_active:
                # check for updates
                self.update()
            sleep(self.interval)

    def stop(self):
        """
        Stop checking for updates. Can be started again via self.start()

        Returns:
            None
        """
        self._is_active = False

    def exit(self):
        """
        Exit event loop, stop Thread.

        Returns:
            None
        """
        # `run()` polls `self._exit`. The previous code assigned to
        # `self._stop`, which never ended the loop and also reused a private
        # attribute name of threading.Thread (a method in several CPython
        # versions), so it must not be overwritten.
        self._exit = True

    def user_from_id(self, user_id):
        """
        Retrieve the username belonging to a user id.

        Args:
            user_id: Id for which to look up the user

        Returns:
            username (str): key in `self.users` whose entry has this id

        Raises:
            AssertionError: if not exactly one registered user has this id
        """
        users = [user for user in self.users if
                 self.users[user]['id'] == user_id]
        assert len(users) == 1, "Could not find unique user with id {}".format(
            user_id)
        return users[0]

    def get_users(self, usernames):
        """
        Extracts user information for users

        Args:
            usernames: Slack usernames of users

        Returns:
            users (dict): {username: user}

        Raises:
            RuntimeError: if the number of users found differs from the
                number of requested usernames
        """
        users = {}
        response = self.slack.users.list()
        for member in response.body['members']:
            if member['name'] in usernames:
                users[member['name']] = member
        if len(users) != len(usernames):
            remaining_names = [name for name in usernames if name not in users]
            raise RuntimeError(
                'Could not find names {}'.format(remaining_names))
        return users

    def get_im_ids(self, users):
        """
        Adds IM ids of users to users dict.
        Also adds last_ts to the latest IM message

        Note: the latest message is looked up through `self.users`, so
        `self.users` must already contain the given usernames.

        Args:
            users (dict): {username: user}

        Returns:
            None
        """
        response = self.slack.im.list()
        user_ids = {user: users[user]['id'] for user in users}
        im_ids = {im['user']: im['id'] for im in response.body['ims']}
        for username, user_id in user_ids.items():
            if user_id in im_ids:
                users[username]['im_id'] = im_ids[user_id]
                # update last ts; an IM channel without any message yet has
                # no timestamp to record (previously this raised IndexError)
                messages = self.get_im_messages(username=username, count=1)
                if messages:
                    users[username]['last_ts'] = float(messages[0]['ts'])

    def get_im_messages(self, username, **kwargs):
        """
        Retrieves IM messages from username

        Args:
            username: Name of user
            **kwargs: Additional kwargs for retrieving IM messages

        Returns:
            List of IM messages (empty if the user has no known IM channel)
        """
        channel = self.users[username].get('im_id', None)
        if channel is None:
            return []
        else:
            response = self.slack.im.history(channel=channel,
                                             **kwargs)
            return response.body['messages']

    def get_new_im_messages(self):
        """
        Retrieves new IM messages for each user in self.users.
        Updates user['last_ts'] to ts of newest message

        Returns:
            im_messages (Dict): {username: [messages list]} newer than last_ts
        """
        im_messages = {}
        for username, user in self.users.items():
            last_ts = user.get('last_ts', None)
            new_messages = self.get_im_messages(username=username,
                                                oldest=last_ts)
            # Kwarg 'oldest' sometimes also returns message with ts==last_ts
            new_messages = [m for m in new_messages if
                            float(m['ts']) != last_ts]
            im_messages[username] = new_messages
            if new_messages:
                # assumes the history is ordered newest-first, so index 0 is
                # the most recent message -- TODO confirm against Slack API
                self.users[username]['last_ts'] = float(new_messages[0]['ts'])
        return im_messages

    def update(self):
        """
        Performs tasks, and checks for new messages.
        Periodically called from the thread event loop (`run()`).

        Returns:
            None
        """
        # A task returning a truthy value is finished and is dropped
        new_tasks = []
        for task in self.tasks:
            task_finished = task()
            if not task_finished:
                new_tasks.append(task)
        self.tasks = new_tasks

        new_messages = self.get_new_im_messages()
        self.handle_messages(new_messages)

    def help_message(self):
        """ Return simple help message """
        cc = ', '.join(['`' + str(k) + '`' for k in self.commands.keys()])
        return '\nAvailable commands: %s' % cc

    def handle_messages(self, messages):
        """
        Performs commands depending on messages.
        This includes adding tasks to be performed during each update.

        Args:
            messages (dict): {username: [messages list]}, as returned by
                `get_new_im_messages()`

        Returns:
            None
        """
        for user, user_messages in messages.items():
            for message in user_messages:
                if message.get('user', None) != self.users[user]['id']:
                    # Filter out bot messages
                    continue
                channel = self.users[user]['im_id']
                # Extract command (first word) and possible args
                command, args, kwargs = convert_command(message['text'])
                if command in self.commands:
                    msg = 'Executing {}'.format(command)
                    if args:
                        msg += ' {}'.format(args)
                    if kwargs:
                        msg += ' {}'.format(kwargs)
                    self.slack.chat.post_message(text=msg, channel=channel)

                    func = self.commands[command]
                    try:
                        if isinstance(func, _BaseParameter):
                            results = func(*args, **kwargs)
                        else:
                            # Only add channel and Slack if they are explicit
                            # kwargs
                            func_sig = inspect.signature(func)
                            if 'channel' in func_sig.parameters:
                                kwargs['channel'] = channel
                            if 'slack' in func_sig.parameters:
                                kwargs['slack'] = self
                            results = func(*args, **kwargs)

                        if results is not None:
                            self.slack.chat.post_message(
                                text='Results: {}'.format(results),
                                channel=channel)
                    except Exception:
                        # Report command failures to the user instead of
                        # killing the event loop. SystemExit and
                        # KeyboardInterrupt are deliberately not caught.
                        self.slack.chat.post_message(
                            text='Error: {}'.format(traceback.format_exc()),
                            channel=channel)
                else:
                    self.slack.chat.post_message(
                        text='Command {} not understood. Try `help`'.format(
                            command),
                        channel=channel)

    def add_task(self, command, *args, channel, **kwargs):
        """
        Add a task to self.tasks, which will be executed during each update

        Args:
            command: task command, must be a key of `self.task_commands`
            *args: Additional args for command
            channel: Slack channel (can also be IM channel)
            **kwargs: Additional kwargs for the task function

        Returns:
            None
        """
        if command in self.task_commands:
            self.slack.chat.post_message(
                text='Added task "{}"'.format(command),
                channel=channel)
            func = self.task_commands[command]
            self.tasks.append(partial(func, *args, channel=channel, **kwargs))
        else:
            self.slack.chat.post_message(
                text='Task command {} not understood'.format(command),
                channel=channel)

    def upload_latest_plot(self, channel, **kwargs):
        """
        Uploads latest plot (if any) to slack channel.
        The latest plot is read from `BasePlot.latest_plot`.

        Args:
            channel: Slack channel (can also be IM channel)
            **kwargs: Not used

        Returns:
            None
        """
        # Retrieve latest plot
        latest_plot = BasePlot.latest_plot
        if latest_plot is None:
            self.slack.chat.post_message(text='No latest plot',
                                         channel=channel)
            return

        # mkstemp creates the file atomically (mktemp is deprecated because
        # of its race condition). Only the path is needed, so the descriptor
        # is closed right away and the plot overwrites the empty file.
        file_descriptor, temp_filename = tempfile.mkstemp(suffix='.jpg')
        os.close(file_descriptor)
        try:
            # Saves latest plot to filename
            latest_plot.save(filename=temp_filename)
            # Upload plot to slack
            self.slack.files.upload(temp_filename, channels=channel)
        finally:
            # Always clean up, also when saving or uploading fails
            os.remove(temp_filename)

    def print_measurement_information(self, channel, **kwargs):
        """
        Prints information about the current measurement.
        Information printed is percentage complete, and dataset representation.
        Dataset is retrieved via `active_data_set()`.

        Args:
            channel: Slack channel (can also be IM channel)
            **kwargs: Not used

        Returns:
            None
        """
        dataset = active_data_set()
        if dataset is not None:
            self.slack.chat.post_message(
                text='Measurement is {:.0f}% complete'.format(
                    100 * dataset.fraction_complete()),
                channel=channel)
            self.slack.chat.post_message(
                text=repr(dataset), channel=channel)
        else:
            self.slack.chat.post_message(
                text='No latest dataset found',
                channel=channel)

    def check_msmt_finished(self, channel, **kwargs):
        """
        Checks if the latest measurement is completed.
        A measurement counts as completed when `active_loop()` returns None;
        in that case a notification is posted to the channel.

        Args:
            channel: Slack channel (can also be IM channel)
            **kwargs: Not used

        Returns:
            is_finished (Bool): True if measurement is finished, False otherwise
        """
        if active_loop() is None:
            self.slack.chat.post_message(
                text='Measurement complete',
                channel=channel)
            return True
        else:
            return False