# worker.py
import errno
import fcntl
import hashlib
import json
import os
import random
import select
import signal
import socket
import sys
import threading
import time
import traceback
import uuid
from collections import OrderedDict
from contextlib import ExitStack
from typing import (
    TYPE_CHECKING,
    Any,
    Collection,
    ContextManager,
    Dict,
    List,
    Literal,
    Optional,
    Set,
    Tuple,
    Union,
)

from redis.client import PubSub
from redis.exceptions import LockError
from redis.lock import Lock
from structlog.stdlib import BoundLogger

from ._internal import (
    ACTIVE,
    ERROR,
    QUEUED,
    SCHEDULED,
    dotted_parts,
    g,
    g_fork_lock,
    gen_unique_id,
    import_attribute,
    queue_matches,
    serialize_func_name,
    serialize_retry_method,
)
from .exceptions import (
    RetryException,
    StopRetry,
    TaskImportError,
    TaskNotFound,
)
from .redis_semaphore import Semaphore
from .runner import get_runner_class
from .stats import StatsThread
from .task import Task
from .timeouts import JobTimeoutException
from .utils import redis_glob_escape

if TYPE_CHECKING:
    from . import TaskTiger

LOCK_REDIS_KEY = "qslock"

__all__ = ["Worker"]


def sigchld_handler(*args: Any) -> None:
    # Nothing to do here. This is just a dummy handler that we set up to catch
    # the child process exiting.
    pass
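
# Why a no-op handler (inference, not stated in this file): CPython only
# writes to the wake-up file descriptor installed via signal.set_wakeup_fd
# for signals that have a Python-level handler, so installing this dummy
# handler is presumably what makes SIGCHLD interrupt the parent's select()
# sleep in Worker._execute when the forked child exits.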


class WorkerContextManagerStack(ExitStack):
    def __init__(self, context_managers: List[ContextManager]) -> None:
        super(WorkerContextManagerStack, self).__init__()
        for mgr in context_managers:
            self.enter_context(mgr)


class Worker:
    def __init__(
        self,
        tiger: "TaskTiger",
        queues: Optional[List[str]] = None,
        exclude_queues: Optional[List[str]] = None,
        single_worker_queues: Optional[List[str]] = None,
        max_workers_per_queue: Optional[int] = None,
        store_tracebacks: Optional[bool] = None,
    ) -> None:
        """
        Internal method to initialize a worker.
        """
        self.tiger = tiger

        bound_logger = tiger.log.bind(pid=os.getpid())
        assert isinstance(bound_logger, BoundLogger)
        self.log = bound_logger

        self.connection = tiger.connection
        self.scripts = tiger.scripts
        self.config = tiger.config
        self._key = tiger._key
        self._did_work = True
        self._last_task_check = 0.0
        self.stats_thread: Optional[StatsThread] = None
        self.id = str(uuid.uuid4())

        if queues:
            self.only_queues = set(queues)
        elif self.config["ONLY_QUEUES"]:
            self.only_queues = set(self.config["ONLY_QUEUES"])
        else:
            self.only_queues = set()

        if exclude_queues:
            self.exclude_queues = set(exclude_queues)
        elif self.config["EXCLUDE_QUEUES"]:
            self.exclude_queues = set(self.config["EXCLUDE_QUEUES"])
        else:
            self.exclude_queues = set()

        if single_worker_queues:
            self.single_worker_queues = set(single_worker_queues)
        elif self.config["SINGLE_WORKER_QUEUES"]:
            self.single_worker_queues = set(
                self.config["SINGLE_WORKER_QUEUES"]
            )
        else:
            self.single_worker_queues = set()

        if max_workers_per_queue:
            self.max_workers_per_queue: Optional[int] = max_workers_per_queue
        else:
            self.max_workers_per_queue = None
        assert (
            self.max_workers_per_queue is None
            or self.max_workers_per_queue >= 1
        )

        if store_tracebacks is None:
            self.store_tracebacks = bool(
                self.config.get("STORE_TRACEBACKS", True)
            )
        else:
            self.store_tracebacks = bool(store_tracebacks)

        self._stop_requested = False

        # A worker group is a group of workers that process the same set of
        # queues. This allows us to use worker group-specific locks to reduce
        # Redis load.
        self.worker_group_name = hashlib.sha256(
            json.dumps(
                [sorted(self.only_queues), sorted(self.exclude_queues)]
            ).encode("utf8")
        ).hexdigest()
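
    # Illustration (informal): because the queue sets are sorted before
    # hashing, workers constructed with the same filters in any order land in
    # the same group -- e.g. Worker(tiger, queues=["a", "b"]) and
    # Worker(tiger, queues=["b", "a"]) share a worker_group_name and hence
    # group-level locks, while Worker(tiger, queues=["a"]) does not.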

    def _install_signal_handlers(self) -> None:
        """
        Sets up signal handlers for safely stopping the worker.
        """

        def request_stop(signum: int, frame: Any) -> None:
            self._stop_requested = True
            self.log.info("stop requested, waiting for task to finish")

        signal.signal(signal.SIGINT, request_stop)
        signal.signal(signal.SIGTERM, request_stop)

    def _uninstall_signal_handlers(self) -> None:
        """
        Restores default signal handlers.
        """
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)

    def _filter_queues(self, queues: Collection[str]) -> List[str]:
        """
        Applies the queue filter to the given list of queues and returns the
        queues that match. Note that a queue name matches any subqueues
        starting with the name, followed by a dot. For example, "foo" will
        match both "foo" and "foo.bar".
        """
        return [
            q
            for q in queues
            if queue_matches(
                q,
                only_queues=self.only_queues,
                exclude_queues=self.exclude_queues,
            )
        ]
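
    # Example (informal, assuming queue_matches compares whole dotted
    # segments): with only_queues={"foo"} and exclude_queues={"foo.bar"},
    # _filter_queues(["foo", "foo.bar", "foo.baz", "foobar"]) keeps "foo" and
    # "foo.baz" -- "foo.bar" is excluded, and "foobar" never matched because
    # matching is not a plain string-prefix test.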
def _worker_queue_scheduled_tasks(self) -> None:
"""
Helper method that takes due tasks from the SCHEDULED queue and puts
them in the QUEUED queue for execution. This should be called
periodically.
"""
timeout = self.config["QUEUE_SCHEDULED_TASKS_TIME"]
if timeout > 0:
lock_name = self._key(
"lockv2", "queue_scheduled_tasks", self.worker_group_name
)
lock = self.connection.lock(lock_name, timeout=timeout)
# See if any worker has recently queued scheduled tasks.
if not lock.acquire(blocking=False):
return
queues = set(
self._filter_queues(self._retrieve_queues(self._key(SCHEDULED)))
)
now = time.time()
for queue in queues:
# Move due items from the SCHEDULED queue to the QUEUED queue. If
# items were moved, remove the queue from the scheduled set if it
# is empty, and add it to the queued set so the task gets picked
# up. If any unique tasks are already queued, don't update their
# queue time (because the new queue time would be later).
result = self.scripts.zpoppush(
self._key(SCHEDULED, queue),
self._key(QUEUED, queue),
self.config["SCHEDULED_TASK_BATCH_SIZE"],
now,
now,
if_exists=("noupdate",),
on_success=(
"update_sets",
queue,
self._key(SCHEDULED),
self._key(QUEUED),
),
)
self.log.debug("scheduled tasks", queue=queue, qty=len(result))
# XXX: ideally this would be in the same pipeline, but we only want
# to announce if there was a result.
if result:
if self.config["PUBLISH_QUEUED_TASKS"]:
self.connection.publish(self._key("activity"), queue)
self._did_work = True
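
    # A plausible reading of the zpoppush contract (inferred from the call
    # sites and comments here, not verified against the Lua script): it
    # atomically pops up to SCHEDULED_TASK_BATCH_SIZE members with score
    # <= now from the source ZSET and pushes them into the destination ZSET
    # with score=now; if_exists=("noupdate",) leaves the score of members
    # already present in the destination untouched.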

    def _poll_for_queues(self) -> None:
        """
        Refresh list of queues.

        Wait if we did not do any work.

        This is only used when using polling to get queues with queued tasks.
        """
        if not self._did_work:
            time.sleep(self.config["POLL_TASK_QUEUES_INTERVAL"])
        self._refresh_queue_set()

    def _pubsub_for_queues(
        self, timeout: float = 0, batch_timeout: float = 0
    ) -> None:
        """
        Check the activity channel for new queues and wait as necessary.

        This is only used when using pubsub to get queues with queued tasks.

        This method is also used to slow down the main processing loop to
        reduce the effects of rapidly sending Redis commands. This method
        will exit for any of these conditions:
        1. _did_work is True, which suggests there could be more work pending
        2. A new queue was found and the batch timeout expired. Note the
           batch timeout can be zero, in which case this exits immediately.
        3. Timeout seconds have passed; this is the maximum time to stay in
           this method.
        """
        new_queue_found = False
        start_time = batch_exit = time.time()

        assert self._pubsub is not None
        while True:
            # Check to see if batch_exit has been updated
            if batch_exit > start_time:
                pubsub_sleep = batch_exit - time.time()
            else:
                pubsub_sleep = start_time + timeout - time.time()
            message = self._pubsub.get_message(
                timeout=0
                if pubsub_sleep < 0 or self._did_work
                else pubsub_sleep
            )

            # Pull remaining messages off of channel
            while message:
                if message["type"] == "message":
                    new_queue_found, batch_exit = self._process_queue_message(
                        message["data"],
                        new_queue_found,
                        batch_exit,
                        start_time,
                        timeout,
                        batch_timeout,
                    )
                message = self._pubsub.get_message()

            if self._did_work:
                # Exit immediately if we did work during the last execution
                # loop because there might be more work to do
                break
            elif time.time() >= batch_exit and new_queue_found:
                # After finding a new queue we can wait until the batch
                # timeout expires
                break
            elif time.time() - start_time > timeout:
                # Always exit after our maximum wait time
                break
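
    # Timing sketch (hypothetical numbers): with timeout=60 and
    # batch_timeout=5, a worker that sees its first new queue 10s in sets
    # batch_exit to t=15 and exits then (condition 2); a worker that did work
    # exits immediately (condition 1); an idle worker exits at t=60 at the
    # latest (condition 3).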

    def _worker_queue_expired_tasks(self) -> None:
        """
        Helper method that takes expired tasks (i.e. tasks where we didn't
        receive a heartbeat within the timeout) and puts them back into the
        QUEUED queue for re-execution if they're idempotent, i.e. retriable
        on JobTimeoutException. Otherwise, tasks are moved to the ERROR queue
        and an exception is logged.
        """
        # Note that we use the lock both to prevent multiple workers from
        # unnecessarily requeuing expired tasks, as well as to space out
        # this work (i.e. we don't release the lock unless we've exhausted
        # our batch size, which will hopefully never happen).
        lock = self.connection.lock(
            self._key("lockv2", "queue_expired_tasks"),
            timeout=self.config["REQUEUE_EXPIRED_TASKS_INTERVAL"],
        )
        if not lock.acquire(blocking=False):
            return

        now = time.time()

        # Get a batch of expired tasks.
        task_data = self.scripts.get_expired_tasks(
            self.config["REDIS_PREFIX"],
            now - self.config["ACTIVE_TASK_UPDATE_TIMEOUT"],
            self.config["REQUEUE_EXPIRED_TASKS_BATCH_SIZE"],
        )

        for queue, task_id in task_data:
            self.log.debug("expiring task", queue=queue, task_id=task_id)
            self._did_work = True
            try:
                task = Task.from_id(self.tiger, queue, ACTIVE, task_id)
                if task.should_retry_on(JobTimeoutException, logger=self.log):
                    self.log.info(
                        "queueing expired task", queue=queue, task_id=task_id
                    )
                    # Task is idempotent and can be requeued. If the task
                    # already exists in the QUEUED queue, don't change its
                    # time.
                    task._move(
                        from_state=ACTIVE, to_state=QUEUED, when=now, mode="nx"
                    )
                else:
                    self.log.error(
                        "failing expired task", queue=queue, task_id=task_id
                    )
                    # Assume the task can't be retried and move it to the
                    # error queue.
                    task._move(from_state=ACTIVE, to_state=ERROR, when=now)
            except TaskNotFound:
                # Either the task was requeued by another worker, or we
                # have a task without a task object.

                # XXX: Ideally, the following block should be atomic.
                if not self.connection.get(self._key("task", task_id)):
                    self.log.error("not found", queue=queue, task_id=task_id)
                    task = Task(
                        self.tiger,
                        queue=queue,
                        _data={"id": task_id},
                        _state=ACTIVE,
                    )
                    task._move()

        # Release the lock immediately if we processed a full batch. This
        # way, another process will be able to pick up another batch
        # immediately without waiting for the lock to time out.
        if len(task_data) == self.config["REQUEUE_EXPIRED_TASKS_BATCH_SIZE"]:
            try:
                lock.release()
            except LockError:
                # Not really a problem if releasing the lock fails. It will
                # expire soon anyway.
                self.log.warning(
                    "failed to release lock queue_expired_tasks on full batch"
                )

    def _get_hard_timeouts(self, func: Any, tasks: List[Task]) -> List[float]:
        is_batch_func = getattr(func, "_task_batch", False)
        if is_batch_func:
            task_timeouts = [
                task.hard_timeout
                for task in tasks
                if task.hard_timeout is not None
            ]
            hard_timeout = (
                (max(task_timeouts) if task_timeouts else None)
                or getattr(func, "_task_hard_timeout", None)
                or self.config["DEFAULT_HARD_TIMEOUT"]
            )
            return [hard_timeout]
        else:
            return [
                task.hard_timeout
                or getattr(func, "_task_hard_timeout", None)
                or self.config["DEFAULT_HARD_TIMEOUT"]
                for task in tasks
            ]
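
    # Precedence, as implemented above: a per-task hard_timeout wins over the
    # function's _task_hard_timeout attribute, which wins over
    # DEFAULT_HARD_TIMEOUT. A batch function gets a single timeout for the
    # whole batch (the max across tasks that set one). E.g., with per-task
    # values (None, 30, 60) and a function-level timeout of 120, this returns
    # [60] for a batch function and [120, 30, 60] otherwise.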

    def _execute_forked(self, tasks: List[Task], log: BoundLogger) -> bool:
        """
        Executes the tasks in the forked process. Multiple tasks can be
        passed for batch processing. However, they must all use the same
        function and will share the execution entry.
        """
        success = False

        execution: Dict[str, Any] = {}

        assert len(tasks)
        task_func = tasks[0].serialized_func
        assert all([task_func == task.serialized_func for task in tasks[1:]])

        execution["time_started"] = time.time()

        try:
            func = tasks[0].func

            runner_class = get_runner_class(log, tasks)
            runner = runner_class(self.tiger)

            is_batch_func = getattr(func, "_task_batch", False)
            g["tiger"] = self.tiger
            g["current_task_is_batch"] = is_batch_func

            hard_timeouts = self._get_hard_timeouts(func, tasks)

            with WorkerContextManagerStack(
                self.config["CHILD_CONTEXT_MANAGERS"]
            ):
                if is_batch_func:
                    # Batch process if the task supports it.
                    g["current_tasks"] = tasks
                    runner.run_batch_tasks(tasks, hard_timeouts[0])
                else:
                    # Process sequentially.
                    for task, hard_timeout in zip(tasks, hard_timeouts):
                        g["current_tasks"] = [task]
                        runner.run_single_task(task, hard_timeout)
        except RetryException as exc:
            execution["retry"] = True
            if exc.method:
                execution["retry_method"] = serialize_retry_method(exc.method)
            execution["log_error"] = exc.log_error
            execution["exception_name"] = serialize_func_name(exc.__class__)
            exc_info = exc.exc_info or sys.exc_info()
        except (JobTimeoutException, Exception) as exc:
            execution["exception_name"] = serialize_func_name(exc.__class__)
            exc_info = sys.exc_info()
        else:
            success = True

        if not success:
            execution["time_failed"] = time.time()
            if self.store_tracebacks:
                # Currently we only log failed task executions to Redis.
                execution["traceback"] = "".join(
                    traceback.format_exception(*exc_info)
                )

        execution["success"] = success
        execution["host"] = socket.gethostname()

        self._store_task_execution(tasks, execution)

        return success

    def _get_queue_batch_size(self, queue: str) -> int:
        """Get queue batch size."""

        # Fetch one item unless this is a batch queue.
        # XXX: It would be more efficient to loop in reverse order and break.
        batch_queues = self.config["BATCH_QUEUES"]
        batch_size = 1
        for part in dotted_parts(queue):
            if part in batch_queues:
                batch_size = batch_queues[part]
        return batch_size
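
    # Example (informal, assuming dotted_parts yields prefixes shortest
    # first, as the XXX comment above implies): with
    # BATCH_QUEUES = {"email": 10, "email.bulk": 50}, the queue
    # "email.bulk.newsletter" yields a batch size of 50 -- the loop visits
    # "email", "email.bulk", "email.bulk.newsletter" in order, so the most
    # specific configured prefix wins.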

    def _get_queue_lock(
        self, queue: str, log: BoundLogger
    ) -> Union[
        Tuple[None, Literal[True]], Tuple[Optional[Semaphore], Literal[False]]
    ]:
        """Get queue lock for max worker queues.

        For max worker queues it returns a semaphore if acquired and whether
        it failed to acquire the semaphore.
        """
        max_workers = self.max_workers_per_queue

        # Check if this is a single worker queue
        for part in dotted_parts(queue):
            if part in self.single_worker_queues:
                log.debug("single worker queue")
                max_workers = 1
                break

        # Max worker queues require us to get a queue lock before
        # moving tasks
        if max_workers:
            queue_lock = Semaphore(
                self.connection,
                self._key(LOCK_REDIS_KEY, queue),
                self.id,
                max_locks=max_workers,
                timeout=self.config["ACTIVE_TASK_UPDATE_TIMEOUT"],
            )
            acquired, locks = queue_lock.acquire()
            if not acquired:
                return None, True
            log.debug("acquired queue lock", locks=locks)
        else:
            queue_lock = None

        return queue_lock, False
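
    # Assumption about redis_semaphore.Semaphore's contract, inferred from
    # its use here and in _execute: acquire() returns
    # (acquired, current_lock_count); holding one of max_locks slots caps the
    # number of workers concurrently processing the queue, and the slot must
    # be renew()ed periodically or it expires after the timeout.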

    def _heartbeat(self, queue: str, task_ids: Collection[str]) -> None:
        """
        Updates the heartbeat for the given task IDs to prevent them from
        timing out and being requeued.
        """
        now = time.time()
        mapping = {task_id: now for task_id in task_ids}
        self.connection.zadd(self._key(ACTIVE, queue), mapping)  # type: ignore[arg-type]
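
    # The heartbeat is effectively the task's score in the ACTIVE ZSET: ZADD
    # refreshes it to the current time, and _worker_queue_expired_tasks
    # treats any ACTIVE task whose score is older than
    # ACTIVE_TASK_UPDATE_TIMEOUT as expired.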

    def _execute(
        self,
        queue: str,
        tasks: List[Task],
        log: BoundLogger,
        locks: Collection[Lock],
        queue_lock: Optional[Semaphore],
        all_task_ids: Set[str],
    ) -> bool:
        """
        Executes the given tasks. Returns a boolean indicating whether
        the tasks were executed successfully.
        """
        # The tasks must use the same function.
        assert len(tasks)
        serialized_task_func = tasks[0].serialized_func
        task_func = tasks[0].func
        assert all(
            [
                serialized_task_func == task.serialized_func
                for task in tasks[1:]
            ]
        )

        # Before executing periodic tasks, queue them for the next period.
        if serialized_task_func in self.tiger.periodic_task_funcs:
            tasks[0]._queue_for_next_period()

        with g_fork_lock:
            child_pid = os.fork()

        if child_pid == 0:
            # Child process
            log = log.bind(child_pid=os.getpid())
            assert isinstance(log, BoundLogger)

            # Disconnect the Redis connection inherited from the main
            # process. Note that this doesn't disconnect the socket in the
            # main process.
            self.connection.connection_pool.disconnect()

            random.seed()

            # Ignore Ctrl+C in the child so we don't abort the job -- the
            # main process already takes care of a graceful shutdown.
            signal.signal(signal.SIGINT, signal.SIG_IGN)

            # Run the tasks.
            success = self._execute_forked(tasks, log)

            # Wait for any threads that might be running in the child, just
            # like sys.exit() would. Note we don't call sys.exit() directly
            # because it would perform additional cleanup (e.g. calling
            # atexit handlers twice). See also:
            # https://bugs.python.org/issue18966
            threading._shutdown()  # type: ignore[attr-defined]

            os._exit(int(not success))
        else:
            # Main process
            log = log.bind(child_pid=child_pid)
            assert isinstance(log, BoundLogger)
            for task in tasks:
                log.info(
                    "processing",
                    func=serialized_task_func,
                    task_id=task.id,
                    params={"args": task.args, "kwargs": task.kwargs},
                )

            # Attach a signal handler to SIGCHLD (sent when the child process
            # exits) so we can capture it.
            signal.signal(signal.SIGCHLD, sigchld_handler)

            # Since newer Python versions retry interrupted system calls we
            # can't rely on the fact that select() is interrupted with EINTR.
            # Instead, we'll set up a wake-up file descriptor below.

            # Create a new pipe and apply the non-blocking flag (required for
            # set_wakeup_fd).
            pipe_r, pipe_w = os.pipe()
            opened_fd = os.fdopen(pipe_r)
            flags = fcntl.fcntl(pipe_r, fcntl.F_GETFL, 0)
            flags = flags | os.O_NONBLOCK
            fcntl.fcntl(pipe_r, fcntl.F_SETFL, flags)

            flags = fcntl.fcntl(pipe_w, fcntl.F_GETFL, 0)
            flags = flags | os.O_NONBLOCK
            fcntl.fcntl(pipe_w, fcntl.F_SETFL, flags)

            # A byte will be written to pipe_w if a signal occurs (and can be
            # read from pipe_r).
            old_wakeup_fd = signal.set_wakeup_fd(pipe_w)

            def check_child_exit() -> Optional[int]:
                """
                Do a non-blocking check to see if the child process exited.
                Returns None if the process is still running, or the exit
                code value of the child process.
                """
                try:
                    pid, return_code = os.waitpid(child_pid, os.WNOHANG)
                    if pid != 0:  # The child process is done.
                        return return_code
                except OSError as e:
                    # Of course EINTR can happen if the child process exits
                    # while we're checking whether it exited. In this case it
                    # should be safe to retry.
                    if e.errno == errno.EINTR:
                        return check_child_exit()
                    else:
                        raise
                return None

            hard_timeouts = self._get_hard_timeouts(task_func, tasks)
            time_started = time.time()

            # Upper bound for when we expect the child processes to finish.
            # Since the hard timeout doesn't cover any processing overhead,
            # we're adding an extra buffer of ACTIVE_TASK_UPDATE_TIMEOUT
            # (which is the same time we use to determine if a task has
            # expired).
            timeout_at = (
                time_started
                + sum(hard_timeouts)
                + self.config["ACTIVE_TASK_UPDATE_TIMEOUT"]
            )

            # Wait for the child to exit and perform a periodic heartbeat.
            # We check for the child twice in this loop so that we avoid
            # unnecessary waiting if the child exited just before entering
            # the while loop or while renewing heartbeat/locks.
            while True:
                return_code = check_child_exit()
                if return_code is not None:
                    break

                # Wait until the timeout or a signal / child exit occurs.
                try:
                    # The following behavior has been observed in the pipe
                    # when the parent process receives a SIGTERM while a task
                    # is running in a child process:
                    # Linux:
                    #   - 0 when the parent receives SIGTERM
                    #   - select() exits with EINTR when the child exit
                    #     triggers the signal, so the signal in the pipe is
                    #     never seen since check_child_exit() will see that
                    #     the child is gone
                    #
                    # macOS:
                    #   - 15 (SIGTERM) when the parent receives SIGTERM
                    #   - 20 (SIGCHLD) when the child exits
                    results = select.select(
                        [pipe_r],
                        [],
                        [],
                        self.config["ACTIVE_TASK_UPDATE_TIMER"],
                    )

                    if results[0]:
                        # Purge the pipe so select will pause on the next
                        # call.
                        try:
                            # Behavior of a would-be blocking read():
                            # Linux:
                            #   Python 2.7 raises IOError
                            #   Python 3.x returns an empty string
                            #
                            # macOS:
                            #   Returns an empty string
                            opened_fd.read(1)
                        except IOError:
                            pass
                except select.error as e:
                    if e.args[0] != errno.EINTR:
                        raise

                return_code = check_child_exit()
                if return_code is not None:
                    break

                now = time.time()
                if now > timeout_at:
                    log.error("hard timeout elapsed in parent process")
                    os.kill(child_pid, signal.SIGKILL)
                    pid, return_code = os.waitpid(child_pid, 0)
                    log.error("child killed", return_code=return_code)
                    execution = {
                        "time_started": time_started,
                        "time_failed": now,
                        "exception_name": serialize_func_name(
                            JobTimeoutException
                        ),
                        "success": False,
                        "host": socket.gethostname(),
                    }
                    self._store_task_execution(tasks, execution)
                    break

                try:
                    self._heartbeat(queue, all_task_ids)
                    for lock in locks:
                        try:
                            lock.reacquire()
                        except LockError:
                            log.warning(
                                "could not reacquire lock", lock=lock.name
                            )
                    if queue_lock:
                        acquired, current_locks = queue_lock.renew()
                        if not acquired:
                            log.debug("queue lock renew failure")
                except OSError as e:
                    # EINTR happens if the task completed. Since we're just
                    # renewing locks/heartbeat it's okay if we get
                    # interrupted.
                    if e.errno != errno.EINTR:
                        raise

            # Restore signals / clean up
            signal.signal(signal.SIGCHLD, signal.SIG_DFL)
            signal.set_wakeup_fd(old_wakeup_fd)
            opened_fd.close()
            os.close(pipe_w)

            success = return_code == 0
            return success
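
    # The supervision loop above is the standard set_wakeup_fd idiom: on
    # SIGCHLD/SIGTERM the interpreter writes a byte to pipe_w, which makes
    # select() on pipe_r return promptly instead of sleeping out the full
    # ACTIVE_TASK_UPDATE_TIMER interval -- so the parent notices a finished
    # child almost immediately while still heartbeating long-running tasks.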

    def _process_queue_message(
        self,
        message_queue: str,
        new_queue_found: bool,
        batch_exit: float,
        start_time: float,
        timeout: float,
        batch_timeout: float,
    ) -> Tuple[bool, float]:
        """Process a queue message from activity channel."""
        for queue in self._filter_queues([message_queue]):
            if queue not in self._queue_set:
                if not new_queue_found:
                    new_queue_found = True
                    batch_exit = time.time() + batch_timeout
                    # Limit batch_exit to max timeout
                    if batch_exit > start_time + timeout:
                        batch_exit = start_time + timeout
                self._queue_set.add(queue)
                self.log.debug("new queue", queue=queue)

        return new_queue_found, batch_exit

    def _process_queue_tasks(
        self,
        queue: str,
        queue_lock: Optional[Semaphore],
        task_ids: Set[str],
        now: float,
        log: BoundLogger,
    ) -> int:
        """Process tasks in queue."""
        processed_count = 0

        # Get all tasks
        serialized_tasks = self.connection.mget(
            [self._key("task", task_id) for task_id in task_ids]
        )

        # Parse tasks
        tasks = []
        for task_id, serialized_task in zip(task_ids, serialized_tasks):
            if serialized_task:
                task_data = json.loads(serialized_task)
            else:
                # In the rare case where we don't find the task which is
                # queued (see ReliabilityTestCase.test_task_disappears),
                # we log an error and remove the task below. We need to
                # at least initialize the Task object with an ID so we can
                # remove it.
                task_data = {"id": task_id}

            task = Task(
                self.tiger,
                queue=queue,
                _data=task_data,
                _state=ACTIVE,
                _ts=now,
            )

            if not serialized_task:
                # Remove task as per comment above
                log.error("not found", task_id=task_id)
                task._move()
            elif task.id != task_id:
                log.error("task ID mismatch", task_id=task_id)
                # Remove task
                task._move()
            else:
                tasks.append(task)

        # List of task IDs that exist and we will update the heartbeat on.
        valid_task_ids = {task.id for task in tasks}

        # Group by task func
        tasks_by_func: Dict[str, List[Task]] = OrderedDict()
        for task in tasks:
            func = task.serialized_func
            if func in tasks_by_func:
                tasks_by_func[func].append(task)
            else:
                tasks_by_func[func] = [task]

        # Execute tasks for each task func
        for tasks in tasks_by_func.values():
            success, processed_tasks = self._execute_task_group(
                queue, tasks, valid_task_ids, queue_lock
            )
            processed_count = processed_count + len(processed_tasks)
            log.debug(
                "processed", attempted=len(tasks), processed=processed_count
            )
            for task in processed_tasks:
                self._finish_task_processing(queue, task, success, now)

        return processed_count

    def _process_from_queue(self, queue: str) -> Tuple[List[str], int]:
        """
        Internal method to process a task batch from the given queue.

        Args:
            queue: Queue name to be processed

        Returns:
            Task IDs: List of tasks that were processed (even if there was an
                error, so that client code can assume the queue is empty if
                nothing was returned)
            Count: The number of tasks that were attempted to be executed, or
                -1 if the queue lock couldn't be acquired.
        """
        now = time.time()

        log: BoundLogger = self.log.bind(queue=queue)
        assert isinstance(log, BoundLogger)

        batch_size = self._get_queue_batch_size(queue)

        queue_lock, failed_to_acquire = self._get_queue_lock(queue, log)
        if failed_to_acquire:
            return [], -1

        # Move an item to the active queue, if available.
        # We need to be careful when moving unique tasks: We currently don't
        # support concurrent processing of multiple unique tasks. If the task
        # is already in the ACTIVE queue, we need to execute the queued task
        # later, i.e. move it to the SCHEDULED queue (prefer the earliest
        # time if it's already scheduled). We want to make sure that the last
        # queued instance of the task always gets executed no earlier than it
        # was queued.
        later = time.time() + self.config["LOCK_RETRY"]

        task_ids = self.scripts.zpoppush(
            self._key(QUEUED, queue),
            self._key(ACTIVE, queue),
            batch_size,
            None,
            now,
            if_exists=("add", self._key(SCHEDULED, queue), later, "min"),
            on_success=(
                "update_sets",
                queue,
                self._key(QUEUED),
                self._key(ACTIVE),
                self._key(SCHEDULED),
            ),
        )
        log.debug(
            "moved tasks",
            src_queue=QUEUED,
            dest_queue=ACTIVE,
            qty=len(task_ids),
        )

        processed_count = 0
        if task_ids:
            processed_count = self._process_queue_tasks(
                queue, queue_lock, task_ids, now, log
            )

        if queue_lock:
            queue_lock.release()
            log.debug("released swq lock")

        return task_ids, processed_count
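
    # Reading of the if_exists clause above (an assumption about the Lua
    # script's contract, consistent with the comment preceding the call): if
    # a popped unique task is already a member of the ACTIVE ZSET, it is
    # instead added to SCHEDULED with score `later`, using "min" so an
    # earlier existing schedule time is preserved -- this is what defers a
    # duplicate unique task rather than running it concurrently.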

    def _execute_task_group(
        self,
        queue: str,
        tasks: List[Task],
        all_task_ids: Set[str],
        queue_lock: Optional[Semaphore],
    ) -> Tuple[bool, List[Task]]:
        """
        Executes the given tasks in the queue. Updates the heartbeat for task
        IDs passed in all_task_ids. This internal method is only meant to be
        called from within _process_from_queue.
        """
        log: BoundLogger = self.log.bind(queue=queue)
        assert isinstance(log, BoundLogger)

        locks = []
        # Keep track of the acquired locks: If two tasks in the list require
        # the same lock we only acquire it once.
        lock_ids = set()

        ready_tasks = []
        for task in tasks:
            if task.lock:
                if task.lock_key:
                    kwargs = task.kwargs
                    lock_id = gen_unique_id(
                        task.serialized_func,
                        None,
                        {key: kwargs.get(key) for key in task.lock_key},
                    )
                else:
                    lock_id = gen_unique_id(
                        task.serialized_func, task.args, task.kwargs
                    )

                if lock_id not in lock_ids:
                    lock = self.connection.lock(
                        self._key("lockv2", lock_id),
                        timeout=self.config["ACTIVE_TASK_UPDATE_TIMEOUT"],
                    )
                    if not lock.acquire(blocking=False):
                        log.info("could not acquire lock", task_id=task.id)

                        # Reschedule the task (but if the task is already
                        # scheduled in case of a unique task, don't prolong
                        # the schedule date).
                        when = time.time() + self.config["LOCK_RETRY"]
                        task._move(
                            from_state=ACTIVE,
                            to_state=SCHEDULED,
                            when=when,
                            mode="min",
                        )
                        # Make sure to remove it from this list so we don't
                        # re-add to the ACTIVE queue by updating the
                        # heartbeat.
                        all_task_ids.remove(task.id)