Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-1004] Fix airflow webserver -D to run in background #2208

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
64 changes: 53 additions & 11 deletions airflow/bin/cli.py
Expand Up @@ -753,7 +753,12 @@ def webserver(args):
app.run(debug=True, port=args.port, host=args.hostname,
ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None)
else:
pid, stdout, stderr, log_file = setup_locations("webserver", pid=args.pid)
pid, stdout, stderr, log_file = setup_locations("webserver", args.pid, args.stdout, args.stderr, args.log_file)
if args.daemon:
handle = setup_logging(log_file)
stdout = open(stdout, 'w+')
stderr = open(stderr, 'w+')

print(
textwrap.dedent('''\
Running the Gunicorn Server with:
Expand All @@ -771,7 +776,6 @@ def webserver(args):
'-t', str(worker_timeout),
'-b', args.hostname + ':' + str(args.port),
'-n', 'airflow-webserver',
'-p', str(pid),
'-c', 'airflow.www.gunicorn_config'
]

Expand All @@ -782,28 +786,66 @@ def webserver(args):
run_args += ['--error-logfile', str(args.error_logfile)]

if args.daemon:
run_args += ["-D"]
run_args += ['-D', '-p', str(pid)]

if ssl_cert:
run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]

run_args += ["airflow.www.app:cached_app()"]

gunicorn_master_proc = subprocess.Popen(run_args)
gunicorn_master_proc = None

def kill_proc(dummy_signum, dummy_frame):
gunicorn_master_proc.terminate()
gunicorn_master_proc.wait()
sys.exit(0)

signal.signal(signal.SIGINT, kill_proc)
signal.signal(signal.SIGTERM, kill_proc)
def monitor_gunicorn(gunicorn_master_proc):
# These run forever until SIG{INT, TERM, KILL, ...} signal is sent
if conf.getint('webserver', 'worker_refresh_interval') > 0:
restart_workers(gunicorn_master_proc, num_workers)
else:
while True:
time.sleep(1)

# These run forever until SIG{INT, TERM, KILL, ...} signal is sent
if conf.getint('webserver', 'worker_refresh_interval') > 0:
restart_workers(gunicorn_master_proc, num_workers)
if args.daemon:
base, ext = os.path.splitext(pid)
ctx = daemon.DaemonContext(
pidfile=TimeoutPIDLockFile(base + "-monitor" + ext, -1),
files_preserve=[handle],
stdout=stdout,
stderr=stderr,
signal_map={
signal.SIGINT: kill_proc,
signal.SIGTERM: kill_proc
},
)
with ctx:
subprocess.Popen(run_args)

# Reading pid file directly, since Popen#pid doesn't
# seem to return the right value with DaemonContext.
while True:
try:
with open(pid) as f:
gunicorn_master_proc_pid = int(f.read())
break
except IOError:
logging.debug("Waiting for gunicorn's pid file to be created.")
time.sleep(0.1)

gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
monitor_gunicorn(gunicorn_master_proc)

stdout.close()
stderr.close()
else:
while True:
time.sleep(1)
gunicorn_master_proc = subprocess.Popen(run_args)

signal.signal(signal.SIGINT, kill_proc)
signal.signal(signal.SIGTERM, kill_proc)

monitor_gunicorn(gunicorn_master_proc)


def scheduler(args):
Expand Down
56 changes: 56 additions & 0 deletions tests/core.py
Expand Up @@ -1425,6 +1425,62 @@ def test_variables(self):
os.remove('variables1.json')
os.remove('variables2.json')

def test_cli_webserver_foreground(self):
import subprocess

# Confirm that webserver hasn't been launched.
# pgrep returns exit status 1 if no process matched.
self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())

# Run webserver in foreground and terminate it.
p = subprocess.Popen(["airflow", "webserver"])
p.terminate()
p.wait()

# Assert that no process remains.
self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())

@unittest.skipIf("TRAVIS" in os.environ and bool(os.environ["TRAVIS"]),
"Skipping test due to lack of required file permission")
def test_cli_webserver_background(self):
import subprocess
import psutil

def wait_pidfile(pidfile):
while True:
try:
with open(pidfile) as f:
return int(f.read())
except IOError:
sleep(1)

# Confirm that webserver hasn't been launched.
self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())

# Run webserver in background.
subprocess.Popen(["airflow", "webserver", "-D"])
pidfile = cli.setup_locations("webserver")[0]
wait_pidfile(pidfile)

# Assert that gunicorn and its monitor are launched.
self.assertEqual(0, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
self.assertEqual(0, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())

# Terminate monitor process.
pidfile = cli.setup_locations("webserver-monitor")[0]
pid = wait_pidfile(pidfile)
p = psutil.Process(pid)
p.terminate()
p.wait()

# Assert that no process remains.
self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())


class SecurityTests(unittest.TestCase):
def setUp(self):
configuration.load_test_config()
Expand Down