Skip to content

Commit

Permalink
[AIRFLOW-1004][AIRFLOW-276] Fix airflow webserver -D to run in back…
Browse files Browse the repository at this point in the history
…ground

AIRFLOW-276 introduced a monitor process for gunicorn to find new files in the
dag folder, but it also changed `airflow webserver -D`'s behavior to run in the
foreground. This PR fixes that by running the monitor as a daemon process.

Closes #2208 from sekikn/AIRFLOW-1004
  • Loading branch information
sekikn authored and bolkedebruin committed Apr 4, 2017
1 parent e4494f8 commit a9b20a0
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 11 deletions.
64 changes: 53 additions & 11 deletions airflow/bin/cli.py
Expand Up @@ -753,7 +753,12 @@ def webserver(args):
app.run(debug=True, port=args.port, host=args.hostname,
ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None)
else:
pid, stdout, stderr, log_file = setup_locations("webserver", pid=args.pid)
pid, stdout, stderr, log_file = setup_locations("webserver", args.pid, args.stdout, args.stderr, args.log_file)
if args.daemon:
handle = setup_logging(log_file)
stdout = open(stdout, 'w+')
stderr = open(stderr, 'w+')

print(
textwrap.dedent('''\
Running the Gunicorn Server with:
Expand All @@ -771,7 +776,6 @@ def webserver(args):
'-t', str(worker_timeout),
'-b', args.hostname + ':' + str(args.port),
'-n', 'airflow-webserver',
'-p', str(pid),
'-c', 'airflow.www.gunicorn_config'
]

Expand All @@ -782,28 +786,66 @@ def webserver(args):
run_args += ['--error-logfile', str(args.error_logfile)]

if args.daemon:
run_args += ["-D"]
run_args += ['-D', '-p', str(pid)]

if ssl_cert:
run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]

run_args += ["airflow.www.app:cached_app()"]

gunicorn_master_proc = subprocess.Popen(run_args)
gunicorn_master_proc = None

def kill_proc(dummy_signum, dummy_frame):
    """Signal handler: stop the gunicorn master, wait for it, then exit.

    Both arguments are the ones passed to a signal handler; neither is used.
    The process exits with status 0 once the gunicorn master has been waited on.
    """
    master = gunicorn_master_proc
    master.terminate()
    # Wait for the master to finish so it is not left behind on exit.
    master.wait()
    sys.exit(0)

signal.signal(signal.SIGINT, kill_proc)
signal.signal(signal.SIGTERM, kill_proc)
def monitor_gunicorn(gunicorn_master_proc):
    """Keep the calling process busy for as long as gunicorn is running.

    When ``[webserver] worker_refresh_interval`` is positive the work is
    handed to ``restart_workers``; otherwise the function idles in
    one-second sleeps.

    :param gunicorn_master_proc: handle on the gunicorn master process,
        forwarded to ``restart_workers``.
    """
    refresh_interval = conf.getint('webserver', 'worker_refresh_interval')

    if refresh_interval <= 0:
        # Worker refreshing is disabled: idle until a signal
        # (SIGINT, SIGTERM, SIGKILL, ...) ends the process.
        while True:
            time.sleep(1)

    # Presumably also runs until a signal is sent, per the original comment.
    restart_workers(gunicorn_master_proc, num_workers)

# These run forever until SIG{INT, TERM, KILL, ...} signal is sent
if conf.getint('webserver', 'worker_refresh_interval') > 0:
restart_workers(gunicorn_master_proc, num_workers)
if args.daemon:
base, ext = os.path.splitext(pid)
ctx = daemon.DaemonContext(
pidfile=TimeoutPIDLockFile(base + "-monitor" + ext, -1),
files_preserve=[handle],
stdout=stdout,
stderr=stderr,
signal_map={
signal.SIGINT: kill_proc,
signal.SIGTERM: kill_proc
},
)
with ctx:
subprocess.Popen(run_args)

# Reading pid file directly, since Popen#pid doesn't
# seem to return the right value with DaemonContext.
while True:
try:
with open(pid) as f:
gunicorn_master_proc_pid = int(f.read())
break
except IOError:
logging.debug("Waiting for gunicorn's pid file to be created.")
time.sleep(0.1)

gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
monitor_gunicorn(gunicorn_master_proc)

stdout.close()
stderr.close()
else:
while True:
time.sleep(1)
gunicorn_master_proc = subprocess.Popen(run_args)

signal.signal(signal.SIGINT, kill_proc)
signal.signal(signal.SIGTERM, kill_proc)

monitor_gunicorn(gunicorn_master_proc)


def scheduler(args):
Expand Down
56 changes: 56 additions & 0 deletions tests/core.py
Expand Up @@ -1427,6 +1427,62 @@ def test_variables(self):
os.remove('variables1.json')
os.remove('variables2.json')

def test_cli_webserver_foreground(self):
    """Launch ``airflow webserver`` in the foreground, terminate it, and
    verify that no airflow or gunicorn process is left behind."""
    import subprocess

    def assert_nothing_running():
        # pgrep returns exit status 1 if no process matched.
        for process_name in ("airflow", "gunicorn"):
            pgrep = subprocess.Popen(["pgrep", "-c", process_name])
            self.assertEqual(1, pgrep.wait())

    # Confirm that webserver hasn't been launched.
    assert_nothing_running()

    # Run webserver in foreground and terminate it.
    webserver = subprocess.Popen(["airflow", "webserver"])
    webserver.terminate()
    webserver.wait()

    # Assert that no process remains.
    assert_nothing_running()

@unittest.skipIf("TRAVIS" in os.environ and bool(os.environ["TRAVIS"]),
                 "Skipping test due to lack of required file permission")
def test_cli_webserver_background(self):
    """Launch ``airflow webserver -D``, check that gunicorn and its monitor
    are running, terminate the monitor, and verify that nothing remains.

    Skipped when the ``TRAVIS`` environment variable is set to a non-empty
    value.
    """
    import subprocess
    import time
    import psutil

    def wait_pidfile(pidfile, timeout=60):
        """Return the pid stored in ``pidfile``, polling once per second.

        Fails the test if no pid can be read within ``timeout`` seconds,
        so a webserver that never starts cannot hang the test run.
        """
        deadline = time.time() + timeout
        while True:
            try:
                with open(pidfile) as f:
                    return int(f.read())
            except (IOError, ValueError):
                # IOError: the file does not exist yet.
                # ValueError: the file exists but the pid is not written yet.
                if time.time() >= deadline:
                    self.fail("Timed out after %s seconds waiting for pid file %s"
                              % (timeout, pidfile))
                time.sleep(1)

    def assert_pgrep_status(expected_status):
        # pgrep exits with 0 when a process matched and 1 when none did.
        for process_name in ("airflow", "gunicorn"):
            pgrep = subprocess.Popen(["pgrep", "-c", process_name])
            self.assertEqual(expected_status, pgrep.wait())

    # Confirm that webserver hasn't been launched.
    assert_pgrep_status(1)

    # Run webserver in background.
    subprocess.Popen(["airflow", "webserver", "-D"])
    pidfile = cli.setup_locations("webserver")[0]
    wait_pidfile(pidfile)

    # Assert that gunicorn and its monitor are launched.
    assert_pgrep_status(0)

    # Terminate monitor process.
    pidfile = cli.setup_locations("webserver-monitor")[0]
    pid = wait_pidfile(pidfile)
    p = psutil.Process(pid)
    p.terminate()
    p.wait()

    # Assert that no process remains.
    assert_pgrep_status(1)


class SecurityTests(unittest.TestCase):
def setUp(self):
configuration.load_test_config()
Expand Down

0 comments on commit a9b20a0

Please sign in to comment.