Skip to content

Commit

Permalink
Fix triggerer thread crash in daemon mode (#34931)
Browse files Browse the repository at this point in the history
* Fixes #34816

Change the order of operations so that async child thread is created after forking when entering daemon context.

This makes sure that the thread stays alive in the internal loop.

---------

Co-authored-by: daniel.dylag <danieldylag1990@gmail.com>
  • Loading branch information
Bisk1 and daniel.dylag committed Oct 14, 2023
1 parent b067051 commit 9c1e8c2
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion airflow/cli/commands/triggerer_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def triggerer(args):
settings.MASK_SECRETS_IN_LOGS = True
print(settings.HEADER)
triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
triggerer_job_runner = TriggererJobRunner(job=Job(heartrate=triggerer_heartrate), capacity=args.capacity)

if args.daemon:
pid, stdout, stderr, log_file = setup_locations(
Expand All @@ -77,10 +76,16 @@ def triggerer(args):
umask=int(settings.DAEMON_UMASK, 8),
)
with daemon_context, _serve_logs(args.skip_serve_logs):
triggerer_job_runner = TriggererJobRunner(
job=Job(heartrate=triggerer_heartrate), capacity=args.capacity
)
run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute)
else:
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
signal.signal(signal.SIGQUIT, sigquit_handler)
with _serve_logs(args.skip_serve_logs):
triggerer_job_runner = TriggererJobRunner(
job=Job(heartrate=triggerer_heartrate), capacity=args.capacity
)
run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute)

0 comments on commit 9c1e8c2

Please sign in to comment.