Skip to content

Commit

Permalink
Fix triggerer thread crash in daemon mode (#34931)
Browse files Browse the repository at this point in the history
* Fixes #34816

Change the order of operations so that async child thread is created after forking when entering daemon context.

This makes sure that the thread stays alive in the internal loop.

---------

Co-authored-by: daniel.dylag <danieldylag1990@gmail.com>
(cherry picked from commit 9c1e8c2)
  • Loading branch information
Bisk1 authored and ephraimbuddy committed Oct 29, 2023
1 parent 9262b4d commit cf5715d
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion airflow/cli/commands/triggerer_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def triggerer(args):
settings.MASK_SECRETS_IN_LOGS = True
print(settings.HEADER)
triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
triggerer_job_runner = TriggererJobRunner(job=Job(heartrate=triggerer_heartrate), capacity=args.capacity)

if args.daemon:
pid, stdout, stderr, log_file = setup_locations(
Expand All @@ -77,10 +76,16 @@ def triggerer(args):
umask=int(settings.DAEMON_UMASK, 8),
)
with daemon_context, _serve_logs(args.skip_serve_logs):
triggerer_job_runner = TriggererJobRunner(
job=Job(heartrate=triggerer_heartrate), capacity=args.capacity
)
run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute)
else:
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
signal.signal(signal.SIGQUIT, sigquit_handler)
with _serve_logs(args.skip_serve_logs):
triggerer_job_runner = TriggererJobRunner(
job=Job(heartrate=triggerer_heartrate), capacity=args.capacity
)
run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute)

0 comments on commit cf5715d

Please sign in to comment.