@@ -13,7 +13,7 @@
 from plain.utils import timezone
 from plain.utils.module_loading import import_string
 
-from .models import Job, JobRequest, JobResult, JobResultStatuses
+from .models import JobProcess, JobRequest, JobResult, JobResultStatuses
 from .registry import jobs_registry
 
 logger = logging.getLogger("plain.worker")
@@ -114,16 +114,18 @@ def run(self):
                     job_request.queue,
                 )
 
-                job = job_request.convert_to_job()
+                job = job_request.convert_to_job_process()
 
-            job_uuid = str(job.uuid)  # Make a str copy
+            job_process_uuid = str(job.uuid)  # Make a str copy
 
             # Release these now
             del job_request
             del job
 
-            future = self.executor.submit(process_job, job_uuid)
-            future.add_done_callback(partial(future_finished_callback, job_uuid))
+            future = self.executor.submit(process_job, job_process_uuid)
+            future.add_done_callback(
+                partial(future_finished_callback, job_process_uuid)
+            )
 
             # Do a quick sleep regardless to see if it
             # gives processes a chance to start up
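Note on the hunk above: this is the standard concurrent.futures submit-and-callback pattern. Only a plain string UUID crosses the process boundary (the JobRequest/JobProcess instances are deleted and re-fetched inside the child), and functools.partial pre-binds that UUID so the done-callback still receives the Future as its final argument. A minimal self-contained sketch of the same pattern, with hypothetical handle_task/on_finished names:

from concurrent.futures import Future, ProcessPoolExecutor
from functools import partial


def handle_task(task_id: str) -> str:
    # Runs in the child process; re-load any state from the id here.
    return f"processed {task_id}"


def on_finished(task_id: str, future: Future) -> None:
    # partial() pre-bound task_id; the executor passes the Future last.
    print(task_id, future.result())


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(handle_task, "abc123")
        future.add_done_callback(partial(on_finished, "abc123"))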
@@ -213,7 +215,7 @@ def log_stats(self):
         num_proccesses = 0
 
         jobs_requested = JobRequest.query.filter(queue__in=self.queues).count()
-        jobs_processing = Job.query.filter(queue__in=self.queues).count()
+        jobs_processing = JobProcess.query.filter(queue__in=self.queues).count()
 
         logger.info(
             'Job worker stats worker_processes=%s worker_queues="%s" jobs_requested=%s jobs_processing=%s worker_max_processes=%s worker_max_jobs_per_process=%s',
@@ -228,52 +230,52 @@ def log_stats(self):
     def rescue_job_results(self):
         """Find any lost or failed jobs on this worker's queues and handle them."""
         # TODO return results and log them if there are any?
-        Job.query.filter(queue__in=self.queues).mark_lost_jobs()
+        JobProcess.query.filter(queue__in=self.queues).mark_lost_jobs()
         JobResult.query.filter(queue__in=self.queues).retry_failed_jobs()
 
 
-def future_finished_callback(job_uuid: str, future: Future):
+def future_finished_callback(job_process_uuid: str, future: Future):
     if future.cancelled():
-        logger.warning("Job cancelled job_uuid=%s", job_uuid)
+        logger.warning("Job cancelled job_process_uuid=%s", job_process_uuid)
         try:
-            job = Job.query.get(uuid=job_uuid)
+            job = JobProcess.query.get(uuid=job_process_uuid)
             job.convert_to_result(status=JobResultStatuses.CANCELLED)
-        except Job.DoesNotExist:
+        except JobProcess.DoesNotExist:
             # Job may have already been cleaned up
             pass
     elif exception := future.exception():
         # Process pool may have been killed...
         logger.warning(
-            "Job failed job_uuid=%s",
-            job_uuid,
+            "Job failed job_process_uuid=%s",
+            job_process_uuid,
             exc_info=exception,
         )
        try:
-            job = Job.query.get(uuid=job_uuid)
+            job = JobProcess.query.get(uuid=job_process_uuid)
             job.convert_to_result(status=JobResultStatuses.CANCELLED)
-        except Job.DoesNotExist:
+        except JobProcess.DoesNotExist:
             # Job may have already been cleaned up
             pass
     else:
-        logger.debug("Job finished job_uuid=%s", job_uuid)
+        logger.debug("Job finished job_process_uuid=%s", job_process_uuid)
 
 
-def process_job(job_uuid):
+def process_job(job_process_uuid):
     try:
         worker_pid = os.getpid()
 
         request_started.send(sender=None)
 
-        job = Job.query.get(uuid=job_uuid)
+        job_process = JobProcess.query.get(uuid=job_process_uuid)
 
         logger.info(
             'Executing job worker_pid=%s job_class=%s job_request_uuid=%s job_priority=%s job_source="%s" job_queue="%s"',
             worker_pid,
-            job.job_class,
-            job.job_request_uuid,
-            job.priority,
-            job.source,
-            job.queue,
+            job_process.job_class,
+            job_process.job_request_uuid,
+            job_process.priority,
+            job_process.source,
+            job_process.queue,
         )
 
         def middleware_chain(job):
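Note on future_finished_callback above: it dispatches on the three possible outcomes of a completed Future: cancelled(), a stored exception (the walrus assignment also catches a killed process pool, which surfaces as BrokenProcessPool), or a normal result. A minimal sketch of the same dispatch, detached from the worker models:

from concurrent.futures import Future


def report(future: Future) -> None:
    if future.cancelled():
        # Cancelled before it could complete
        print("cancelled")
    elif exception := future.exception():
        # exception() returns the raised exception or None once the
        # future is done, so this branch also sees BrokenProcessPool
        print(f"failed: {exception!r}")
    else:
        print(f"finished: {future.result()}")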
@@ -284,19 +286,19 @@ def middleware_chain(job):
             middleware_instance = middleware_class(middleware_chain)
             middleware_chain = middleware_instance
 
-        job_result = middleware_chain(job)
+        job_result = middleware_chain(job_process)
 
         # Release it now
-        del job
+        del job_process
 
         duration = job_result.ended_at - job_result.started_at
         duration = duration.total_seconds()
 
         logger.info(
-            'Completed job worker_pid=%s job_class=%s job_uuid=%s job_request_uuid=%s job_result_uuid=%s job_priority=%s job_source="%s" job_queue="%s" job_duration=%s',
+            'Completed job worker_pid=%s job_class=%s job_process_uuid=%s job_request_uuid=%s job_result_uuid=%s job_priority=%s job_source="%s" job_queue="%s" job_duration=%s',
             worker_pid,
             job_result.job_class,
-            job_result.job_uuid,
+            job_result.job_process_uuid,
             job_result.job_request_uuid,
             job_result.uuid,
             job_result.priority,
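Note on the hunk above: process_job builds an onion-style middleware chain; each middleware class wraps the previous callable, so the last one constructed runs first, and calling middleware_chain(job_process) walks the whole stack. A minimal sketch of the pattern with a hypothetical TimingMiddleware, assuming (as the diff shows) that each middleware class takes the next callable in its constructor:

class TimingMiddleware:  # hypothetical example middleware
    def __init__(self, run_job):
        self.run_job = run_job

    def __call__(self, job):
        print("before job")
        result = self.run_job(job)
        print("after job")
        return result


def run_job(job):
    # Innermost callable: actually execute the job
    return f"result for {job}"


chain = run_job
for middleware_class in [TimingMiddleware]:
    chain = middleware_class(chain)  # each class wraps the previous chain

print(chain("job-1"))  # "before job", "after job", then the result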