Skip to content

Commit

Permalink
fix: add retry to SQL-based alerting celery task (#10542)
Browse files Browse the repository at this point in the history
* added retry and minimized sqlalchemy object lives

* pylint

* added try catch

* adjusted naming

* added scoped session

* update tests for dbsession

* added requested changes

* nit todo

Co-authored-by: Jason Davis <@dropbox.com>
  • Loading branch information
JasonD28 committed Aug 10, 2020
1 parent 5e944e5 commit 8b9292e
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 91 deletions.
74 changes: 47 additions & 27 deletions superset/tasks/schedules.py
Expand Up @@ -47,12 +47,13 @@
from retry.api import retry_call
from selenium.common.exceptions import WebDriverException
from selenium.webdriver import chrome, firefox
from sqlalchemy.orm import Session
from sqlalchemy.exc import NoSuchColumnError, ResourceClosedError
from werkzeug.http import parse_cookie

from superset import app, db, security_manager, thumbnail_cache
from superset.extensions import celery_app
from superset.models.alerts import Alert, AlertLog
from superset.models.core import Database
from superset.models.dashboard import Dashboard
from superset.models.schedules import (
EmailDeliveryType,
Expand All @@ -79,6 +80,7 @@
logger = logging.getLogger("tasks.email_reports")
logger.setLevel(logging.INFO)

stats_logger = current_app.config["STATS_LOGGER"]
EMAIL_PAGE_RENDER_WAIT = config["EMAIL_PAGE_RENDER_WAIT"]
WEBDRIVER_BASEURL = config["WEBDRIVER_BASEURL"]
WEBDRIVER_BASEURL_USER_FRIENDLY = config["WEBDRIVER_BASEURL_USER_FRIENDLY"]
Expand Down Expand Up @@ -533,6 +535,11 @@ def schedule_email_report( # pylint: disable=unused-argument
name="alerts.run_query",
bind=True,
soft_time_limit=config["EMAIL_ASYNC_TIME_LIMIT_SEC"],
# TODO: find cause of https://github.com/apache/incubator-superset/issues/10530
# and remove retry
autoretry_for=(NoSuchColumnError, ResourceClosedError,),
retry_kwargs={"max_retries": 5},
retry_backoff=True,
)
def schedule_alert_query( # pylint: disable=unused-argument
task: Task,
Expand All @@ -542,24 +549,33 @@ def schedule_alert_query( # pylint: disable=unused-argument
is_test_alert: Optional[bool] = False,
) -> None:
model_cls = get_scheduler_model(report_type)
dbsession = db.create_scoped_session()
schedule = dbsession.query(model_cls).get(schedule_id)

# The user may have disabled the schedule. If so, ignore this
if not schedule or not schedule.active:
logger.info("Ignoring deactivated alert")
return
try:
schedule = db.session.query(model_cls).get(schedule_id)

if report_type == ScheduleType.alert:
if is_test_alert and recipients:
deliver_alert(schedule.id, recipients)
# The user may have disabled the schedule. If so, ignore this
if not schedule or not schedule.active:
logger.info("Ignoring deactivated alert")
return

if run_alert_query(schedule.id, dbsession):
# deliver_dashboard OR deliver_slice
return
else:
raise RuntimeError("Unknown report type")
if report_type == ScheduleType.alert:
if is_test_alert and recipients:
deliver_alert(schedule.id, recipients)
return

if run_alert_query(
schedule.id, schedule.database_id, schedule.sql, schedule.label
):
# deliver_dashboard OR deliver_slice
return
else:
raise RuntimeError("Unknown report type")
except NoSuchColumnError as column_error:
stats_logger.incr("run_alert_task.error.nosuchcolumnerror")
raise column_error
except ResourceClosedError as resource_error:
stats_logger.incr("run_alert_task.error.resourceclosederror")
raise resource_error


class AlertState:
Expand Down Expand Up @@ -618,51 +634,55 @@ def deliver_alert(alert_id: int, recipients: Optional[str] = None) -> None:
_deliver_email(recipients, deliver_as_group, subject, body, data, images)


def run_alert_query(alert_id: int, dbsession: Session) -> Optional[bool]:
def run_alert_query(
alert_id: int, database_id: int, sql: str, label: str
) -> Optional[bool]:
"""
Execute alert.sql and return value if any rows are returned
"""
alert = db.session.query(Alert).get(alert_id)

logger.info("Processing alert ID: %i", alert.id)
database = alert.database
logger.info("Processing alert ID: %i", alert_id)
database = db.session.query(Database).get(database_id)
if not database:
logger.error("Alert database not preset")
return None

if not alert.sql:
if not sql:
logger.error("Alert SQL not preset")
return None

parsed_query = ParsedQuery(alert.sql)
parsed_query = ParsedQuery(sql)
sql = parsed_query.stripped()

state = None
dttm_start = datetime.utcnow()

df = pd.DataFrame()
try:
logger.info("Evaluating SQL for alert %s", alert)
logger.info("Evaluating SQL for alert <%s:%s>", alert_id, label)
df = database.get_df(sql)
except Exception as exc: # pylint: disable=broad-except
state = AlertState.ERROR
logging.exception(exc)
logging.error("Failed at evaluating alert: %s (%s)", alert.label, alert.id)
logging.error("Failed at evaluating alert: %s (%s)", label, alert_id)

dttm_end = datetime.utcnow()
last_eval_dttm = datetime.utcnow()

if state != AlertState.ERROR:
alert.last_eval_dttm = datetime.utcnow()
if not df.empty:
# Looking for truthy cells
for row in df.to_records():
if any(row):
state = AlertState.TRIGGER
deliver_alert(alert.id)
deliver_alert(alert_id)
break
if not state:
state = AlertState.PASS

db.session.commit()
alert = db.session.query(Alert).get(alert_id)
if state != AlertState.ERROR:
alert.last_eval_dttm = last_eval_dttm
alert.last_state = state
alert.logs.append(
AlertLog(
Expand All @@ -672,7 +692,7 @@ def run_alert_query(alert_id: int, dbsession: Session) -> Optional[bool]:
state=state,
)
)
dbsession.commit()
db.session.commit()

return None

Expand Down
130 changes: 66 additions & 64 deletions tests/alerts_tests.py
Expand Up @@ -38,41 +38,42 @@ def setup_database():
slice_id = db.session.query(Slice).all()[0].id
database_id = utils.get_example_database().id

alert1 = Alert(
id=1,
label="alert_1",
active=True,
crontab="*/1 * * * *",
sql="SELECT 0",
alert_type="email",
slice_id=slice_id,
database_id=database_id,
)
alert2 = Alert(
id=2,
label="alert_2",
active=True,
crontab="*/1 * * * *",
sql="SELECT 55",
alert_type="email",
slice_id=slice_id,
database_id=database_id,
)
alert3 = Alert(
id=3,
label="alert_3",
active=False,
crontab="*/1 * * * *",
sql="UPDATE 55",
alert_type="email",
slice_id=slice_id,
database_id=database_id,
)
alert4 = Alert(id=4, active=False, label="alert_4", database_id=-1)
alert5 = Alert(id=5, active=False, label="alert_5", database_id=database_id)

for num in range(1, 6):
eval(f"db.session.add(alert{num})")
alerts = [
Alert(
id=1,
label="alert_1",
active=True,
crontab="*/1 * * * *",
sql="SELECT 0",
alert_type="email",
slice_id=slice_id,
database_id=database_id,
),
Alert(
id=2,
label="alert_2",
active=True,
crontab="*/1 * * * *",
sql="SELECT 55",
alert_type="email",
slice_id=slice_id,
database_id=database_id,
),
Alert(
id=3,
label="alert_3",
active=False,
crontab="*/1 * * * *",
sql="UPDATE 55",
alert_type="email",
slice_id=slice_id,
database_id=database_id,
),
Alert(id=4, active=False, label="alert_4", database_id=-1),
Alert(id=5, active=False, label="alert_5", database_id=database_id),
]

db.session.bulk_save_objects(alerts)
db.session.commit()
yield db.session

Expand All @@ -82,45 +83,46 @@ def setup_database():

@patch("superset.tasks.schedules.deliver_alert")
@patch("superset.tasks.schedules.logging.Logger.error")
def test_run_alert_query(mock_error, mock_deliver, setup_database):
database = setup_database
run_alert_query(database.query(Alert).filter_by(id=1).one().id, database)
alert1 = database.query(Alert).filter_by(id=1).one()
assert mock_deliver.call_count == 0
assert len(alert1.logs) == 1
assert alert1.logs[0].alert_id == 1
assert alert1.logs[0].state == "pass"

run_alert_query(database.query(Alert).filter_by(id=2).one().id, database)
alert2 = database.query(Alert).filter_by(id=2).one()
assert mock_deliver.call_count == 1
assert len(alert2.logs) == 1
assert alert2.logs[0].alert_id == 2
assert alert2.logs[0].state == "trigger"

run_alert_query(database.query(Alert).filter_by(id=3).one().id, database)
alert3 = database.query(Alert).filter_by(id=3).one()
assert mock_deliver.call_count == 1
def test_run_alert_query(mock_error, mock_deliver_alert, setup_database):
dbsession = setup_database

# Test passing alert with null SQL result
alert1 = dbsession.query(Alert).filter_by(id=1).one()
run_alert_query(alert1.id, alert1.database_id, alert1.sql, alert1.label)
assert mock_deliver_alert.call_count == 0
assert mock_error.call_count == 0

# Test passing alert with True SQL result
alert2 = dbsession.query(Alert).filter_by(id=2).one()
run_alert_query(alert2.id, alert2.database_id, alert2.sql, alert2.label)
assert mock_deliver_alert.call_count == 1
assert mock_error.call_count == 0

# Test passing alert with error in SQL query
alert3 = dbsession.query(Alert).filter_by(id=3).one()
run_alert_query(alert3.id, alert3.database_id, alert3.sql, alert3.label)
assert mock_deliver_alert.call_count == 1
assert mock_error.call_count == 2
assert len(alert3.logs) == 1
assert alert3.logs[0].alert_id == 3
assert alert3.logs[0].state == "error"

run_alert_query(database.query(Alert).filter_by(id=4).one().id, database)
assert mock_deliver.call_count == 1
# Test passing alert with invalid database
alert4 = dbsession.query(Alert).filter_by(id=4).one()
run_alert_query(alert4.id, alert4.database_id, alert4.sql, alert4.label)
assert mock_deliver_alert.call_count == 1
assert mock_error.call_count == 3

run_alert_query(database.query(Alert).filter_by(id=5).one().id, database)
assert mock_deliver.call_count == 1
# Test passing alert with no SQL statement
alert5 = dbsession.query(Alert).filter_by(id=5).one()
run_alert_query(alert5.id, alert5.database_id, alert5.sql, alert5.label)
assert mock_deliver_alert.call_count == 1
assert mock_error.call_count == 4


@patch("superset.tasks.schedules.deliver_alert")
@patch("superset.tasks.schedules.run_alert_query")
def test_schedule_alert_query(mock_run_alert, mock_deliver_alert, setup_database):
database = setup_database
active_alert = database.query(Alert).filter_by(id=1).one()
inactive_alert = database.query(Alert).filter_by(id=3).one()
dbsession = setup_database
active_alert = dbsession.query(Alert).filter_by(id=1).one()
inactive_alert = dbsession.query(Alert).filter_by(id=3).one()

# Test that inactive alerts are no processed
schedule_alert_query(report_type=ScheduleType.alert, schedule_id=inactive_alert.id)
Expand Down

0 comments on commit 8b9292e

Please sign in to comment.