/
job_queue.py
196 lines (155 loc) · 5.87 KB
/
job_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
"""
Sliding-window-based job/task queue class (& example of use.)
May use ``multiprocessing.Process`` or ``threading.Thread`` objects as queue
items, though within Fabric itself only ``Process`` objects are used/supported.
"""
from pprint import pprint
from Crypto import Random
import time
from fabric.state import env, io_sleep
class JobQueue(object):
    """
    The goal of this class is to make a queue of processes to run, and go
    through them running X number at any given time.

    So if the bubble is 5 start with 5 running and move the bubble of running
    procs along the queue looking something like this:

        Start
        ...........................
        [~~~~~]....................
        ___[~~~~~].................
        _________[~~~~~]...........
        __________________[~~~~~]..
        ____________________[~~~~~]
        ___________________________
        End
    """
    def __init__(self, max_running):
        """
        Set the queue up with reasonable defaults.

        ``max_running`` is the size of the "bubble": the maximum number of
        jobs allowed to be running at any one time.
        """
        self._queued = []       # jobs waiting to be started
        self._running = []      # jobs currently started and not yet reaped
        self._completed = []    # jobs that have finished
        self._num_of_jobs = 0   # total number of jobs ever appended
        self._max = max_running
        self._finished = False  # True once run() has joined everything
        self._closed = False    # append() becomes a no-op after close()
        self._debug = False     # verbose progress printing when True

    def _all_alive(self):
        """
        Simply states if all procs are alive or not. Needed to determine when
        to stop looping, and pop dead procs off and add live ones.

        Returns False when nothing is running at all.
        """
        if self._running:
            return all(x.is_alive() for x in self._running)
        return False

    def __len__(self):
        """
        Just going to use number of jobs as the JobQueue length.
        """
        return self._num_of_jobs

    def close(self):
        """
        A sanity check, so that the need to care about new jobs being added in
        the last throws of the job_queue's run are negated.
        """
        if self._debug:
            print("job queue closed.")
        self._closed = True

    def append(self, process):
        """
        Add the Process() to the queue, so that later it can be checked up on.
        That is if the JobQueue is still open.

        If the queue is closed, this will just silently do nothing.
        """
        if self._closed:
            return
        self._queued.append(process)
        self._num_of_jobs += 1
        if self._debug:
            print("job queue appended %s." % process.name)

    def run(self):
        """
        This is the workhorse. It will take the intial jobs from the _queue,
        start them, add them to _running, and then go into the main running
        loop.

        This loop will check for done procs, if found, move them out of
        _running into _completed. It also checks for a _running queue with open
        spots, which it will then fill as discovered.

        To end the loop, there have to be no running procs, and no more procs
        to be run in the queue.

        This function returns an iterable of all its children's exit codes
        (``None`` for workers, such as threads, that have no exit code).
        """
        def _advance_the_queue():
            """
            Helper function to do the job of poping a new proc off the queue
            start it, then add it to the running queue. This will eventually
            depleate the _queue, which is a condition of stopping the running
            while loop.

            It also sets the env.host_string from the job.name, so that fabric
            knows that this is the host to be making connections on.
            """
            job = self._queued.pop()
            if self._debug:
                print("Popping '%s' off the queue and starting it" % job.name)
            env.host_string = env.host = job.name
            job.start()
            self._running.append(job)

        if not self._closed:
            raise Exception("Need to close() before starting.")

        if self._debug:
            print("Job queue starting.")

        # Prime the bubble. Guard on self._queued so that having fewer jobs
        # than self._max does not pop from an empty list (IndexError).
        while len(self._running) < self._max and self._queued:
            _advance_the_queue()

        while not self._finished:
            while len(self._running) < self._max and self._queued:
                _advance_the_queue()

            if not self._all_alive():
                # Partition the running list instead of popping by index
                # while enumerating it: the old code mutated the list being
                # iterated, which skipped the job right after each finished
                # one and delayed its collection by a whole poll cycle.
                still_running = []
                for job in self._running:
                    if job.is_alive():
                        still_running.append(job)
                    else:
                        if self._debug:
                            print("Job queue found finished proc: %s." %
                                  job.name)
                        self._completed.append(job)
                self._running = still_running

                if self._debug:
                    print("Job queue has %d running." % len(self._running))

            if not (self._queued or self._running):
                if self._debug:
                    print("Job queue finished.")
                # join() reaps each child so its exitcode is populated.
                for job in self._completed:
                    job.join()
                self._finished = True

            # Throttle the polling loop.
            time.sleep(io_sleep)

        # threading.Thread has no .exitcode attribute; report None for it
        # instead of raising AttributeError (the module docstring advertises
        # Thread support for out-of-Fabric use).
        return [getattr(x, 'exitcode', None) for x in self._completed]
#### Sample usage
def try_using(parallel_type):
    """
    This will run the queue through it's paces, and show a simple way of using
    the job queue.

    ``parallel_type`` selects the worker class: ``"multiprocessing"`` uses
    ``multiprocessing.Process``; ``"threading"`` uses ``threading.Thread``.

    Raises ``ValueError`` for any other value (previously this fell through
    and died later with a confusing NameError on ``Bucket``).
    """
    def print_number(number):
        """
        Simple function to give a simple task to execute.
        """
        print(number)

    if parallel_type == "multiprocessing":
        from multiprocessing import Process as Bucket
    elif parallel_type == "threading":
        from threading import Thread as Bucket
    else:
        raise ValueError("Unknown parallel_type: %r" % parallel_type)

    # Make a job_queue with a bubble of len 5, and have it print verbosely
    jobs = JobQueue(5)
    jobs._debug = True

    # Add 20 procs onto the stack
    for number in range(20):
        jobs.append(Bucket(
            target=print_number,
            args=[number],
            kwargs={},
        ))

    # Close up the queue and then start it's execution
    jobs.close()
    jobs.run()
# Demo entry point: exercise the queue once with each supported back end.
if __name__ == '__main__':
    try_using("multiprocessing")
    try_using("threading")