forked from ipython/ipython
-
Notifications
You must be signed in to change notification settings - Fork 0
/
taskfc.py
345 lines (273 loc) · 11.2 KB
/
taskfc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# encoding: utf-8
# -*- test-case-name: IPython.kernel.tests.test_taskxmlrpc -*-
"""A Foolscap interface to a TaskController.
This class lets Foolscap clients talk to a TaskController.
"""
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import cPickle as pickle
import xmlrpclib, copy
from zope.interface import Interface, implements
from twisted.internet import defer
from twisted.python import components, failure
try:
# This is preferred in foolscap v > 0.4.3
from foolscap.api import Referenceable
except ImportError:
# Fallback for older versions
from foolscap import Referenceable
from IPython.kernel.twistedutil import blockingCallFromThread
from IPython.kernel import error, task as taskmodule, taskclient
from IPython.kernel.pickleutil import can, uncan
from IPython.kernel.clientinterfaces import (
IFCClientInterfaceProvider,
IBlockingClientAdaptor
)
from IPython.kernel.mapper import (
TaskMapper,
ITaskMapperFactory,
IMapper
)
from IPython.kernel.parallelfunction import (
ParallelFunction,
ITaskParallelDecorator
)
#-------------------------------------------------------------------------------
# The Controller side of things
#-------------------------------------------------------------------------------
class IFCTaskController(Interface):
"""Foolscap interface to task controller.
See the documentation of `ITaskController` for more information.
"""
def remote_run(binTask):
""""""
def remote_abort(taskid):
""""""
def remote_get_task_result(taskid, block=False):
""""""
def remote_barrier(taskids):
""""""
def remote_spin():
""""""
def remote_queue_status(verbose):
""""""
def remote_clear(taskids=None):
""""""
class FCTaskControllerFromTaskController(Referenceable):
"""
Adapt a `TaskController` to an `IFCTaskController`
This class is used to expose a `TaskController` over the wire using
the Foolscap network protocol.
"""
implements(IFCTaskController, IFCClientInterfaceProvider)
def __init__(self, taskController):
self.taskController = taskController
#---------------------------------------------------------------------------
# Non interface methods
#---------------------------------------------------------------------------
def packageFailure(self, f):
f.cleanFailure()
return self.packageSuccess(f)
def packageSuccess(self, obj):
serial = pickle.dumps(obj, 2)
return serial
#---------------------------------------------------------------------------
# ITaskController related methods
#---------------------------------------------------------------------------
def remote_run(self, ptask):
try:
task = pickle.loads(ptask)
task.uncan_task()
except:
d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
else:
d = self.taskController.run(task)
d.addCallback(self.packageSuccess)
d.addErrback(self.packageFailure)
return d
def remote_abort(self, taskid):
d = self.taskController.abort(taskid)
d.addCallback(self.packageSuccess)
d.addErrback(self.packageFailure)
return d
def remote_get_task_result(self, taskid, block=False):
d = self.taskController.get_task_result(taskid, block)
d.addCallback(self.packageSuccess)
d.addErrback(self.packageFailure)
return d
def remote_barrier(self, taskids):
d = self.taskController.barrier(taskids)
d.addCallback(self.packageSuccess)
d.addErrback(self.packageFailure)
return d
def remote_spin(self):
d = self.taskController.spin()
d.addCallback(self.packageSuccess)
d.addErrback(self.packageFailure)
return d
def remote_queue_status(self, verbose):
d = self.taskController.queue_status(verbose)
d.addCallback(self.packageSuccess)
d.addErrback(self.packageFailure)
return d
def remote_clear(self,taskids=None):
d = self.taskController.clear(taskids)
d.addCallback(self.packageSuccess)
d.addErrback(self.packageFailure)
return d
def remote_get_client_name(self):
return 'IPython.kernel.taskfc.FCTaskClient'
components.registerAdapter(FCTaskControllerFromTaskController,
taskmodule.ITaskController, IFCTaskController)
#-------------------------------------------------------------------------------
# The Client side of things
#-------------------------------------------------------------------------------
class FCTaskClient(object):
"""
Client class for Foolscap exposed `TaskController`.
This class is an adapter that makes a `RemoteReference` to a
`TaskController` look like an actual `ITaskController` on the client side.
This class also implements `IBlockingClientAdaptor` so that clients can
automatically get a blocking version of this class.
"""
implements(
taskmodule.ITaskController,
IBlockingClientAdaptor,
ITaskMapperFactory,
IMapper,
ITaskParallelDecorator
)
def __init__(self, remote_reference):
self.remote_reference = remote_reference
#---------------------------------------------------------------------------
# Non interface methods
#---------------------------------------------------------------------------
def unpackage(self, r):
return pickle.loads(r)
#---------------------------------------------------------------------------
# ITaskController related methods
#---------------------------------------------------------------------------
def run(self, task):
"""Run a task on the `TaskController`.
See the documentation of the `MapTask` and `StringTask` classes for
details on how to build a task of different types.
:Parameters:
task : an `ITask` implementer
:Returns: The int taskid of the submitted task. Pass this to
`get_task_result` to get the `TaskResult` object.
"""
assert isinstance(task, taskmodule.BaseTask), "task must be a Task object!"
task.can_task()
ptask = pickle.dumps(task, 2)
task.uncan_task()
d = self.remote_reference.callRemote('run', ptask)
d.addCallback(self.unpackage)
return d
def get_task_result(self, taskid, block=False):
"""
Get a task result by taskid.
:Parameters:
taskid : int
The taskid of the task to be retrieved.
block : boolean
Should I block until the task is done?
:Returns: A `TaskResult` object that encapsulates the task result.
"""
d = self.remote_reference.callRemote('get_task_result', taskid, block)
d.addCallback(self.unpackage)
return d
def abort(self, taskid):
"""
Abort a task by taskid.
:Parameters:
taskid : int
The taskid of the task to be aborted.
"""
d = self.remote_reference.callRemote('abort', taskid)
d.addCallback(self.unpackage)
return d
def barrier(self, taskids):
"""Block until a set of tasks are completed.
:Parameters:
taskids : list, tuple
A sequence of taskids to block on.
"""
d = self.remote_reference.callRemote('barrier', taskids)
d.addCallback(self.unpackage)
return d
def spin(self):
"""
Touch the scheduler, to resume scheduling without submitting a task.
This method only needs to be called in unusual situations where the
scheduler is idle for some reason.
"""
d = self.remote_reference.callRemote('spin')
d.addCallback(self.unpackage)
return d
def queue_status(self, verbose=False):
"""
Get a dictionary with the current state of the task queue.
:Parameters:
verbose : boolean
If True, return a list of taskids. If False, simply give
the number of tasks with each status.
:Returns:
A dict with the queue status.
"""
d = self.remote_reference.callRemote('queue_status', verbose)
d.addCallback(self.unpackage)
return d
def clear(self,taskids=None):
"""
Clear previously run tasks from the task controller.
:Parameters:
taskids : list, tuple, None
A sequence of taskids whose results we should drop.
if None: clear all results
:Returns:
An int, the number of tasks cleared
This is needed because the task controller keep all task results
in memory. This can be a problem is there are many completed
tasks. Users should call this periodically to clean out these
cached task results.
"""
d = self.remote_reference.callRemote('clear', taskids)
d.addCallback(self.unpackage)
return d
def adapt_to_blocking_client(self):
"""
Wrap self in a blocking version that implements `IBlockingTaskClient.
"""
from IPython.kernel.taskclient import IBlockingTaskClient
return IBlockingTaskClient(self)
def map(self, func, *sequences):
"""
Apply func to *sequences elementwise. Like Python's builtin map.
This version is load balanced.
"""
return self.mapper().map(func, *sequences)
def mapper(self, clear_before=False, clear_after=False, retries=0,
recovery_task=None, depend=None, block=True):
"""
Create an `IMapper` implementer with a given set of arguments.
The `IMapper` created using a task controller is load balanced.
See the documentation for `IPython.kernel.task.BaseTask` for
documentation on the arguments to this method.
"""
return TaskMapper(self, clear_before=clear_before,
clear_after=clear_after, retries=retries,
recovery_task=recovery_task, depend=depend, block=block)
def parallel(self, clear_before=False, clear_after=False, retries=0,
recovery_task=None, depend=None, block=True):
mapper = self.mapper(clear_before, clear_after, retries,
recovery_task, depend, block)
pf = ParallelFunction(mapper)
return pf