Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-1235] In webserver, when gunicorn master dies, bubble up exc #3144

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 27 additions & 24 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,54 +14,49 @@
# limitations under the License.

from __future__ import print_function
import logging

import reprlib

import argparse
import json
import logging
import os
import socket
import re
import reprlib
import signal
import subprocess
import sys
import textwrap
from importlib import import_module

import argparse
import threading
import time
import traceback
from builtins import input
from collections import namedtuple
from airflow.utils.timezone import parse as parsedate
import json
from tabulate import tabulate
from importlib import import_module
from urllib.parse import urlunparse

import daemon
from daemon.pidfile import TimeoutPIDLockFile
import signal
import sys
import threading
import traceback
import time
import psutil
import re
from urllib.parse import urlunparse
from daemon.pidfile import TimeoutPIDLockFile
from sqlalchemy import func
from sqlalchemy.orm import exc
from tabulate import tabulate

import airflow
from airflow import api
from airflow import jobs, settings
from airflow import configuration as conf
from airflow import jobs, settings
from airflow.exceptions import AirflowException
from airflow.executors import GetDefaultExecutor
from airflow.models import (DagModel, DagBag, TaskInstance,
DagPickle, DagRun, Variable, DagStat,
Connection, DAG)

from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
from airflow.utils import db as db_utils
from airflow.utils.net import get_hostname
from airflow.utils.log.logging_mixin import (LoggingMixin, redirect_stderr,
redirect_stdout)
from airflow.utils.net import get_hostname
from airflow.utils.timezone import parse as parsedate
from airflow.www.app import (cached_app, create_app)

from sqlalchemy import func
from sqlalchemy.orm import exc

api.load_auth()
api_module = import_module(conf.get('cli', 'api_client'))
api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
Expand Down Expand Up @@ -646,6 +641,14 @@ def start_refresh(gunicorn_master_proc):
get_num_workers_running(gunicorn_master_proc))

while True:
# gunicorn_master_proc can be a Popen or Process object, so
# use a new Process handle for checking status on either.
proc_handle = psutil.Process(gunicorn_master_proc.pid)
if not proc_handle.is_running() or \
proc_handle.status in ['zombie', 'dead', 'stopped']:
raise AirflowException('gunicorn master process not running.'
' Cannot start workers.')

num_workers_running = get_num_workers_running(gunicorn_master_proc)
num_ready_workers_running = get_num_ready_workers_running(gunicorn_master_proc)

Expand Down
20 changes: 17 additions & 3 deletions tests/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
#

import unittest

from mock import patch, Mock, MagicMock
from time import sleep

import psutil
from mock import patch, Mock, MagicMock

from airflow import settings
from airflow.bin.cli import get_num_ready_workers_running
from airflow.bin.cli import get_num_ready_workers_running, restart_workers
from airflow.exceptions import AirflowException


class TestCLI(unittest.TestCase):
Expand Down Expand Up @@ -69,3 +69,17 @@ def test_cli_webserver_debug(self):
"webserver terminated with return code {} in debug mode".format(return_code))
p.terminate()
p.wait()

def test_restart_workers_gunicorn_failure(self):

def run_restart_workers_test():
with patch('psutil.Process', return_value=self.process):
with self.assertRaises(AirflowException):
restart_workers(self.gunicorn_master_proc, 0)

self.process.is_running.return_value = False

self.process.is_running.return_value = True
for status in ['zombie', 'dead', 'stopped']:
self.process.status = status
run_restart_workers_test()