engine.py
import copy
import copyreg
import datetime as dt
import multiprocessing as mp
import sys
import time
import types
import numpy as np
import pandas as pd
def _pickle_method(method):
    """
    Tell the multiprocessing module how to pickle bound methods so that
    they can be dispatched to worker processes.
    :param method: method to be pickled
    """
    func_name = method.__func__.__name__
    obj = method.__self__
    cls = obj.__class__
    return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
    """
    Tell the multiprocessing module how to unpickle bound methods
    in worker processes.
    :param func_name: name of the method to restore
    :param obj: instance the method was bound to
    :param cls: class that defines the method
    :return: the method re-bound to obj
    """
    func = None
    for klass in cls.mro():
        try:
            func = klass.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)
copyreg.pickle(types.MethodType, _pickle_method, _unpickle_method)
def map_reduce_jobs(func, molecules, threads=24, batches=1, linear_molecules=True, redux=None,
                    redux_args={}, redux_in_place=False, report_progress=False, **kargs):
    """
    Parallelize jobs and combine their results into a single output
    :param func: function to be parallelized
    :param molecules[0]: name of the argument used to pass the molecule
    :param molecules[1]: list of atoms that will be grouped into molecules
    :param threads: number of threads that will be used in parallel (one processor per thread)
    :param batches: number of parallel batches (jobs per core)
    :param linear_molecules: whether the partition will be linear or double-nested
    :param redux: callback to the function that carries out the reduction
    :param redux_args: dictionary of keyword arguments that must be passed to redux (if any)
    :param redux_in_place: boolean indicating whether the redux operation should happen in place.
    For example, redux=dict.update and redux=list.append require redux_in_place=True,
    since appending to a list and updating a dictionary are both in-place operations.
    :param report_progress: whether progress will be logged or not
    :param kargs: any other argument needed by func
    :return: results combined into a single output
    """
    parts = __create_parts(batches, linear_molecules, molecules, threads)
    jobs = __create_jobs(func, kargs, molecules, parts)
    out = __process_jobs_redux(jobs, redux=redux, redux_args=redux_args, redux_in_place=redux_in_place,
                               threads=threads, report_progress=report_progress)
    return out
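# Usage sketch (the worker name `_compute_stats` and the argument values are
# hypothetical, not part of this module): merge per-molecule dicts into one
# dict, letting the engine apply dict.update in place as each job finishes.
#
#   stats = map_reduce_jobs(_compute_stats, ('molecule', keys), threads=8,
#                           redux=dict.update, redux_in_place=True)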
def map_jobs(func, molecules, threads=24, batches=1, linear_molecules=True, report_progress=False,
             **kargs):
    """
    Parallelize jobs and return a DataFrame or Series
    :param func: function to be parallelized
    :param molecules: pandas object
    :param molecules[0]: name of the argument used to pass the molecule
    :param molecules[1]: list of atoms that will be grouped into molecules
    :param threads: number of threads that will be used in parallel (one processor per thread)
    :param batches: number of parallel batches (jobs per core)
    :param linear_molecules: whether the partition will be linear or double-nested
    :param report_progress: whether progress will be logged or not
    :param kargs: any other argument needed by func
    """
    parts = __create_parts(batches, linear_molecules, molecules, threads)
    jobs = __create_jobs(func, kargs, molecules, parts)
    out = __process_jobs(jobs, threads, report_progress)
    return __create_output(out)
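# Usage sketch for map_jobs (the names `_demean` and `prices` are hypothetical):
# each worker receives a slice of the index through the keyword named in
# molecules[0], extra keyword arguments are forwarded to the worker via kargs,
# and the per-slice Series are re-assembled and index-sorted by __create_output.
#
#   def _demean(molecule, prices):
#       chunk = prices.loc[molecule]          # slice owned by this worker
#       return chunk - chunk.mean()
#
#   out = map_jobs(_demean, ('molecule', prices.index), threads=8, prices=prices)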
def __create_parts(batches, linear_molecules, molecules, threads):
    """
    Create partitions of atoms to be executed on each processor
    :param batches: number of parallel batches (jobs per core)
    :param linear_molecules: whether the partition will be linear or double-nested
    :param molecules: pandas object
    :param threads: number of threads that will be used in parallel (one processor per thread)
    :return: partitions array
    """
    if linear_molecules:
        return __linear_parts(len(molecules[1]), threads * batches)
    else:
        return __nested_parts(len(molecules[1]), threads * batches)
def __create_output(out):
    """
    Concatenate the results into a DataFrame or Series if needed
    :param out: result array
    :return: the results as a DataFrame or Series, or the raw list otherwise
    """
    if isinstance(out[0], (pd.DataFrame, pd.Series)):
        # pd.concat replaces DataFrame.append/Series.append, which were
        # removed in pandas 2.0
        return pd.concat(out).sort_index()
    return out
def __process_jobs(jobs, threads, report_progress):
    """
    Process jobs
    :param jobs: jobs to process
    :param threads: number of threads that will be used in parallel (one processor per thread)
    :param report_progress: whether progress will be logged or not
    :return: result output
    """
    if threads == 1:
        out = __process_jobs_sequentially_for_debugging(jobs)
    else:
        out = __process_jobs_in_parallel(jobs=jobs, threads=threads, report_progress=report_progress)
    return out
def __create_jobs(func, kargs, molecules, parts):
    """
    Create jobs
    :param func: function to be executed
    :param kargs: any other argument needed by the function
    :param molecules: molecules[0] is the argument name, molecules[1] the list of atoms
    :param parts: partitioned list of atoms to be passed to the function
    """
    jobs = []
    for i in range(1, len(parts)):
        job = {molecules[0]: molecules[1][parts[i - 1]: parts[i]], 'func': func}
        job.update(kargs)
        jobs.append(job)
    return jobs
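# For example (illustrative values), with molecules=('molecule', atoms),
# parts=[0, 50, 100] and kargs={'window': 5}, __create_jobs returns:
#   [{'molecule': atoms[0:50],   'func': func, 'window': 5},
#    {'molecule': atoms[50:100], 'func': func, 'window': 5}]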
def __process_jobs_in_parallel(jobs, task=None, threads=24, report_progress=False):
    """
    Process jobs with a multiprocessing Pool
    :param jobs: jobs to be processed (data to be passed to each call)
    :param task: task name used when reporting progress (defaults to func's name)
    :param threads: number of threads to create
    :param report_progress: whether progress will be logged or not
    """
    if task is None:
        task = jobs[0]['func'].__name__
    pool = mp.Pool(processes=threads)
    outputs, out, time0 = pool.imap_unordered(__expand_call, jobs), [], time.time()
    __map_outputs(jobs, out, outputs, task, time0, report_progress)
    pool.close()
    pool.join()
    return out
def __map_outputs(jobs, out, outputs, task, time0, report_progress):
    """
    Collect the asynchronous outputs into a single list
    :param jobs: jobs being processed
    :param out: list that accumulates the results
    :param outputs: iterator over the asynchronous outputs
    :param task: task name used when reporting progress
    :param time0: start time
    :param report_progress: whether progress will be logged or not
    """
    for i, out_ in enumerate(outputs, 1):
        out.append(out_)
        if report_progress:
            print_progress(i, len(jobs), time0, task)
def __process_jobs_redux(jobs, task=None, threads=24, redux=None, redux_args={}, redux_in_place=False,
                         report_progress=False):
    """
    Process jobs and combine their results into a single output (redux)
    :param jobs: jobs to run in parallel
    :param task: task name used when reporting progress (defaults to func's name)
    :param threads: number of threads
    :param redux: callback to the function that carries out the reduction
    :param redux_args: dictionary of keyword arguments that must be passed to redux (if any)
    :param redux_in_place: boolean indicating whether the redux operation should happen in place.
    For example, redux=dict.update and redux=list.append require redux_in_place=True,
    since appending to a list and updating a dictionary are both in-place operations.
    :param report_progress: whether progress will be logged or not
    :return: combined job results
    """
    if task is None:
        task = jobs[0]['func'].__name__
    pool = mp.Pool(processes=threads)
    imap = pool.imap_unordered(__expand_call, jobs)
    out = None
    if redux is None:
        # default reduction: collect every output into a list; starting from
        # an empty list (rather than None) lets list.append handle the first
        # output instead of deep-copying it
        redux = list.append
        redux_in_place = True
        out = []
    time0 = time.time()
    out = __map_reduce_outputs(imap, jobs, out, redux, redux_args, redux_in_place, task, time0, report_progress)
    pool.close()
    pool.join()
    if isinstance(out, (pd.Series, pd.DataFrame)):
        out = out.sort_index()
    return out
def __map_reduce_outputs(imap, jobs, out, redux, redux_args, redux_in_place, task, time0, report_progress):
    """
    Reduce the asynchronous outputs as they arrive
    :param imap: job output iterator
    :param jobs: jobs running in parallel
    :param out: accumulated output
    :param redux: callback to the function that carries out the reduction
    :param redux_args: dictionary of keyword arguments that must be passed to redux (if any)
    :param redux_in_place: boolean indicating whether the redux operation should happen in place
    :param task: task name used when reporting progress
    :param time0: start time
    :param report_progress: whether progress will be logged or not
    :return: the reduced output
    """
    for i, out_ in enumerate(imap, 1):
        out = __reduce_output(out, out_, redux, redux_args, redux_in_place)
        if report_progress:
            print_progress(i, len(jobs), time0, task)
    return out
def __reduce_output(out, out_, redux, redux_args, redux_in_place):
    """
    Fold the current output into the accumulated output with the redux function
    :param out: accumulated output (None on the first call when redux was supplied)
    :param out_: current output
    :param redux: callback to the function that carries out the reduction
    :param redux_args: dictionary of keyword arguments that must be passed to redux (if any)
    :param redux_in_place: boolean indicating whether the redux operation should happen in place.
    For example, redux=dict.update and redux=list.append require redux_in_place=True,
    since appending to a list and updating a dictionary are both in-place operations.
    :return: the accumulated output
    """
    if out is None:
        if redux is None:
            out = [out_]
        else:
            out = copy.deepcopy(out_)
    else:
        if redux_in_place:
            redux(out, out_, **redux_args)
        else:
            out = redux(out, out_, **redux_args)
    return out
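# For example (illustrative): with redux=dict.update and redux_in_place=True,
# the first per-job dict is deep-copied and every later dict is merged into it
# in place; with redux=pd.DataFrame.add and redux_in_place=False, each call
# rebinds out to the new running sum.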
def print_progress(job_number, job_len, time0, task):
    """
    Report job progress to stderr
    :param job_number: job index
    :param job_len: number of jobs
    :param time0: multiprocessing start timestamp
    :param task: task being processed
    """
    percentage = float(job_number) / job_len
    minutes = (time.time() - time0) / 60.
    minutes_remaining = minutes * (1 / percentage - 1)
    timestamp = str(dt.datetime.fromtimestamp(time.time()))
    msg = timestamp + ' ' + str(round(percentage * 100, 2)) + '% ' + task + ' done after ' + \
          str(round(minutes, 2)) + ' minutes. Remaining ' + str(round(minutes_remaining, 2)) + ' minutes.'
    if job_number < job_len:
        sys.stderr.write(msg + '\r')
    else:
        sys.stderr.write(msg + '\n')
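# A progress line looks like the following (illustrative values and task name):
#   2024-01-01 12:00:00.000000 25.0% my_task done after 0.5 minutes. Remaining 1.5 minutes.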
def __process_jobs_sequentially_for_debugging(jobs):
    """
    Process jobs sequentially, which is useful for debugging
    :param jobs: jobs to process
    :return: result array of jobs
    """
    out = []
    for job in jobs:
        out_ = __expand_call(job)
        out.append(out_)
    return out
def __expand_call(kargs):
    """
    Expand the arguments of the callback function kargs['func']
    and pass the job (molecule) to it
    :param kargs: arguments needed by the callback func
    """
    # remove 'func' from the dict so only the function's own arguments remain
    func = kargs.pop('func')
    out = func(**kargs)
    return out
def __linear_parts(number_of_atoms, number_of_threads):
    """
    Partition a list of atoms into subsets of equal size, bounded by the
    number of processors and the number of atoms.
    :param number_of_atoms: number of atoms (individual tasks to execute and group into molecules)
    :param number_of_threads: number of threads to create
    :return: partition edges delimiting the molecules
    """
    # pd.np was removed in pandas 1.0+; use numpy directly
    parts = np.linspace(0, number_of_atoms, min(number_of_threads, number_of_atoms) + 1)
    parts = np.ceil(parts).astype(int)
    return parts
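# For example, __linear_parts(20, 4) returns array([ 0,  5, 10, 15, 20]);
# consecutive pairs of edges delimit the atom slices handed to each job.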
def __nested_parts(number_of_atoms, number_of_threads, upper_triangle=False):
    """
    Partition atoms for a double-nested (triangular) inner loop, so that every
    molecule carries a similar amount of work
    :param number_of_atoms: number of atoms (individual tasks to execute and group into molecules)
    :param number_of_threads: number of threads to create
    :param upper_triangle: whether to reverse the partition so the heaviest molecules come first
    :return: partition edges delimiting the molecules
    """
    parts = [0]
    number_of_threads_ = min(number_of_threads, number_of_atoms)
    for num in range(number_of_threads_):
        # solve the quadratic that gives each molecule an equal share of atom pairs
        part = 1 + 4 * (parts[-1] ** 2 + parts[-1] + number_of_atoms * (number_of_atoms + 1.) / number_of_threads_)
        part = (-1 + part ** .5) / 2.
        parts.append(part)
    parts = np.round(parts).astype(int)
    if upper_triangle:
        # the first rows are the heaviest, so reverse the partition
        parts = np.cumsum(np.diff(parts)[::-1])
        parts = np.append(np.array([0]), parts)
    return parts
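# Minimal end-to-end sketch (the worker below and the argument values are
# illustrative, not part of the original engine): square the integers 0..99
# across four worker processes and flatten the per-molecule lists with an
# in-place list.extend reduction. The worker must live at module level so it
# can be pickled under the spawn start method.
def _square_molecule(molecule):
    """Demo worker: square each atom in the molecule."""
    return [x ** 2 for x in molecule]


if __name__ == '__main__':
    result = map_reduce_jobs(_square_molecule, ('molecule', list(range(100))),
                             threads=4, batches=2,
                             redux=list.extend, redux_in_place=True)
    print(sorted(result))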