From 8b9292ed057540aaae3fe1d09952345261e92bc9 Mon Sep 17 00:00:00 2001 From: Jason Davis <32852580+JasonD28@users.noreply.github.com> Date: Mon, 10 Aug 2020 10:20:43 -0700 Subject: [PATCH] fix: add retry to SQL-based alerting celery task (#10542) * added retry and minimized sqlalchemy object lives * pylint * added try catch * adjusted naming * added scoped session * update tests for dbsession * added requested changes * nit todo Co-authored-by: Jason Davis <@dropbox.com> --- superset/tasks/schedules.py | 74 ++++++++++++-------- tests/alerts_tests.py | 130 ++++++++++++++++++------------------ 2 files changed, 113 insertions(+), 91 deletions(-) diff --git a/superset/tasks/schedules.py b/superset/tasks/schedules.py index 4fd55aaf6d9b..e297da2d2a5c 100644 --- a/superset/tasks/schedules.py +++ b/superset/tasks/schedules.py @@ -47,12 +47,13 @@ from retry.api import retry_call from selenium.common.exceptions import WebDriverException from selenium.webdriver import chrome, firefox -from sqlalchemy.orm import Session +from sqlalchemy.exc import NoSuchColumnError, ResourceClosedError from werkzeug.http import parse_cookie from superset import app, db, security_manager, thumbnail_cache from superset.extensions import celery_app from superset.models.alerts import Alert, AlertLog +from superset.models.core import Database from superset.models.dashboard import Dashboard from superset.models.schedules import ( EmailDeliveryType, @@ -79,6 +80,7 @@ logger = logging.getLogger("tasks.email_reports") logger.setLevel(logging.INFO) +stats_logger = current_app.config["STATS_LOGGER"] EMAIL_PAGE_RENDER_WAIT = config["EMAIL_PAGE_RENDER_WAIT"] WEBDRIVER_BASEURL = config["WEBDRIVER_BASEURL"] WEBDRIVER_BASEURL_USER_FRIENDLY = config["WEBDRIVER_BASEURL_USER_FRIENDLY"] @@ -533,6 +535,11 @@ def schedule_email_report( # pylint: disable=unused-argument name="alerts.run_query", bind=True, soft_time_limit=config["EMAIL_ASYNC_TIME_LIMIT_SEC"], + # TODO: find cause of https://github.com/apache/incubator-superset/issues/10530 + # and remove retry + autoretry_for=(NoSuchColumnError, ResourceClosedError,), + retry_kwargs={"max_retries": 5}, + retry_backoff=True, ) def schedule_alert_query( # pylint: disable=unused-argument task: Task, @@ -542,24 +549,33 @@ def schedule_alert_query( # pylint: disable=unused-argument is_test_alert: Optional[bool] = False, ) -> None: model_cls = get_scheduler_model(report_type) - dbsession = db.create_scoped_session() - schedule = dbsession.query(model_cls).get(schedule_id) - # The user may have disabled the schedule. If so, ignore this - if not schedule or not schedule.active: - logger.info("Ignoring deactivated alert") - return + try: + schedule = db.session.query(model_cls).get(schedule_id) - if report_type == ScheduleType.alert: - if is_test_alert and recipients: - deliver_alert(schedule.id, recipients) + # The user may have disabled the schedule. If so, ignore this + if not schedule or not schedule.active: + logger.info("Ignoring deactivated alert") return - if run_alert_query(schedule.id, dbsession): - # deliver_dashboard OR deliver_slice - return - else: - raise RuntimeError("Unknown report type") + if report_type == ScheduleType.alert: + if is_test_alert and recipients: + deliver_alert(schedule.id, recipients) + return + + if run_alert_query( + schedule.id, schedule.database_id, schedule.sql, schedule.label + ): + # deliver_dashboard OR deliver_slice + return + else: + raise RuntimeError("Unknown report type") + except NoSuchColumnError as column_error: + stats_logger.incr("run_alert_task.error.nosuchcolumnerror") + raise column_error + except ResourceClosedError as resource_error: + stats_logger.incr("run_alert_task.error.resourceclosederror") + raise resource_error class AlertState: @@ -618,23 +634,23 @@ def deliver_alert(alert_id: int, recipients: Optional[str] = None) -> None: _deliver_email(recipients, deliver_as_group, subject, body, data, images) -def run_alert_query(alert_id: int, dbsession: Session) -> Optional[bool]: +def run_alert_query( + alert_id: int, database_id: int, sql: str, label: str +) -> Optional[bool]: """ Execute alert.sql and return value if any rows are returned """ - alert = db.session.query(Alert).get(alert_id) - - logger.info("Processing alert ID: %i", alert.id) - database = alert.database + logger.info("Processing alert ID: %i", alert_id) + database = db.session.query(Database).get(database_id) if not database: logger.error("Alert database not preset") return None - if not alert.sql: + if not sql: logger.error("Alert SQL not preset") return None - parsed_query = ParsedQuery(alert.sql) + parsed_query = ParsedQuery(sql) sql = parsed_query.stripped() state = None @@ -642,27 +658,31 @@ def run_alert_query(alert_id: int, dbsession: Session) -> Optional[bool]: df = pd.DataFrame() try: - logger.info("Evaluating SQL for alert %s", alert) + logger.info("Evaluating SQL for alert <%s:%s>", alert_id, label) df = database.get_df(sql) except Exception as exc: # pylint: disable=broad-except state = AlertState.ERROR logging.exception(exc) - logging.error("Failed at evaluating alert: %s (%s)", alert.label, alert.id) + logging.error("Failed at evaluating alert: %s (%s)", label, alert_id) dttm_end = datetime.utcnow() + last_eval_dttm = datetime.utcnow() if state != AlertState.ERROR: - alert.last_eval_dttm = datetime.utcnow() if not df.empty: # Looking for truthy cells for row in df.to_records(): if any(row): state = AlertState.TRIGGER - deliver_alert(alert.id) + deliver_alert(alert_id) break if not state: state = AlertState.PASS + db.session.commit() + alert = db.session.query(Alert).get(alert_id) + if state != AlertState.ERROR: + alert.last_eval_dttm = last_eval_dttm alert.last_state = state alert.logs.append( AlertLog( @@ -672,7 +692,7 @@ def run_alert_query(alert_id: int, dbsession: Session) -> Optional[bool]: state=state, ) ) - dbsession.commit() + db.session.commit() return None diff --git a/tests/alerts_tests.py b/tests/alerts_tests.py index c78847cfa5dd..5749821484e9 100644 --- a/tests/alerts_tests.py +++ b/tests/alerts_tests.py @@ -38,41 +38,42 @@ def setup_database(): slice_id = db.session.query(Slice).all()[0].id database_id = utils.get_example_database().id - alert1 = Alert( - id=1, - label="alert_1", - active=True, - crontab="*/1 * * * *", - sql="SELECT 0", - alert_type="email", - slice_id=slice_id, - database_id=database_id, - ) - alert2 = Alert( - id=2, - label="alert_2", - active=True, - crontab="*/1 * * * *", - sql="SELECT 55", - alert_type="email", - slice_id=slice_id, - database_id=database_id, - ) - alert3 = Alert( - id=3, - label="alert_3", - active=False, - crontab="*/1 * * * *", - sql="UPDATE 55", - alert_type="email", - slice_id=slice_id, - database_id=database_id, - ) - alert4 = Alert(id=4, active=False, label="alert_4", database_id=-1) - alert5 = Alert(id=5, active=False, label="alert_5", database_id=database_id) - - for num in range(1, 6): - eval(f"db.session.add(alert{num})") + alerts = [ + Alert( + id=1, + label="alert_1", + active=True, + crontab="*/1 * * * *", + sql="SELECT 0", + alert_type="email", + slice_id=slice_id, + database_id=database_id, + ), + Alert( + id=2, + label="alert_2", + active=True, + crontab="*/1 * * * *", + sql="SELECT 55", + alert_type="email", + slice_id=slice_id, + database_id=database_id, + ), + Alert( + id=3, + label="alert_3", + active=False, + crontab="*/1 * * * *", + sql="UPDATE 55", + alert_type="email", + slice_id=slice_id, + database_id=database_id, + ), + Alert(id=4, active=False, label="alert_4", database_id=-1), + Alert(id=5, active=False, label="alert_5", database_id=database_id), + ] + + db.session.bulk_save_objects(alerts) db.session.commit() yield db.session @@ -82,45 +83,46 @@ def setup_database(): @patch("superset.tasks.schedules.deliver_alert") @patch("superset.tasks.schedules.logging.Logger.error") -def test_run_alert_query(mock_error, mock_deliver, setup_database): - database = setup_database - run_alert_query(database.query(Alert).filter_by(id=1).one().id, database) - alert1 = database.query(Alert).filter_by(id=1).one() - assert mock_deliver.call_count == 0 - assert len(alert1.logs) == 1 - assert alert1.logs[0].alert_id == 1 - assert alert1.logs[0].state == "pass" - - run_alert_query(database.query(Alert).filter_by(id=2).one().id, database) - alert2 = database.query(Alert).filter_by(id=2).one() - assert mock_deliver.call_count == 1 - assert len(alert2.logs) == 1 - assert alert2.logs[0].alert_id == 2 - assert alert2.logs[0].state == "trigger" - - run_alert_query(database.query(Alert).filter_by(id=3).one().id, database) - alert3 = database.query(Alert).filter_by(id=3).one() - assert mock_deliver.call_count == 1 +def test_run_alert_query(mock_error, mock_deliver_alert, setup_database): + dbsession = setup_database + + # Test passing alert with null SQL result + alert1 = dbsession.query(Alert).filter_by(id=1).one() + run_alert_query(alert1.id, alert1.database_id, alert1.sql, alert1.label) + assert mock_deliver_alert.call_count == 0 + assert mock_error.call_count == 0 + + # Test passing alert with True SQL result + alert2 = dbsession.query(Alert).filter_by(id=2).one() + run_alert_query(alert2.id, alert2.database_id, alert2.sql, alert2.label) + assert mock_deliver_alert.call_count == 1 + assert mock_error.call_count == 0 + + # Test passing alert with error in SQL query + alert3 = dbsession.query(Alert).filter_by(id=3).one() + run_alert_query(alert3.id, alert3.database_id, alert3.sql, alert3.label) + assert mock_deliver_alert.call_count == 1 assert mock_error.call_count == 2 - assert len(alert3.logs) == 1 - assert alert3.logs[0].alert_id == 3 - assert alert3.logs[0].state == "error" - run_alert_query(database.query(Alert).filter_by(id=4).one().id, database) - assert mock_deliver.call_count == 1 + # Test passing alert with invalid database + alert4 = dbsession.query(Alert).filter_by(id=4).one() + run_alert_query(alert4.id, alert4.database_id, alert4.sql, alert4.label) + assert mock_deliver_alert.call_count == 1 assert mock_error.call_count == 3 - run_alert_query(database.query(Alert).filter_by(id=5).one().id, database) - assert mock_deliver.call_count == 1 + # Test passing alert with no SQL statement + alert5 = dbsession.query(Alert).filter_by(id=5).one() + run_alert_query(alert5.id, alert5.database_id, alert5.sql, alert5.label) + assert mock_deliver_alert.call_count == 1 assert mock_error.call_count == 4 @patch("superset.tasks.schedules.deliver_alert") @patch("superset.tasks.schedules.run_alert_query") def test_schedule_alert_query(mock_run_alert, mock_deliver_alert, setup_database): - database = setup_database - active_alert = database.query(Alert).filter_by(id=1).one() - inactive_alert = database.query(Alert).filter_by(id=3).one() + dbsession = setup_database + active_alert = dbsession.query(Alert).filter_by(id=1).one() + inactive_alert = dbsession.query(Alert).filter_by(id=3).one() # Test that inactive alerts are no processed schedule_alert_query(report_type=ScheduleType.alert, schedule_id=inactive_alert.id)