Skip to content

Commit

Permalink
fix: Improve the reliability of alerts & reports (apache#25239)
Browse files Browse the repository at this point in the history
  • Loading branch information
jfrag1 committed Sep 19, 2023
1 parent a724850 commit f672d5d
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 67 deletions.
5 changes: 5 additions & 0 deletions superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,10 @@ class D3Format(TypedDict, total=False):
[86400, "24 hours"],
]

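# Workaround that lets the alerts & reports scheduler task recover the time at which
# celery beat triggered it: the task is sent with this expiry, and the scheduler
# subtracts it from the request's expiration timestamp. See
# https://github.com/celery/celery/issues/6974 for details.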
CELERY_BEAT_SCHEDULER_EXPIRES = timedelta(weeks=1)

# Default celery config is to use SQLA as a broker, in a production setting
# you'll want to use a proper broker as specified here:
# https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/index.html
Expand Down Expand Up @@ -942,6 +946,7 @@ class CeleryConfig: # pylint: disable=too-few-public-methods
"reports.scheduler": {
"task": "reports.scheduler",
"schedule": crontab(minute="*", hour="*"),
"options": {"expires": int(CELERY_BEAT_SCHEDULER_EXPIRES.total_seconds())},
},
"reports.prune_log": {
"task": "reports.prune_log",
Expand Down
14 changes: 7 additions & 7 deletions superset/tasks/cron_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import logging
from collections.abc import Iterator
from datetime import datetime, timedelta, timezone as dt_timezone
from datetime import datetime, timedelta

from croniter import croniter
from pytz import timezone as pytz_timezone, UnknownTimeZoneError
Expand All @@ -27,10 +27,10 @@
logger = logging.getLogger(__name__)


def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]:
def cron_schedule_window(
triggered_at: datetime, cron: str, timezone: str
) -> Iterator[datetime]:
window_size = app.config["ALERT_REPORTS_CRON_WINDOW_SIZE"]
# create a time-aware datetime in utc
time_now = datetime.now(tz=dt_timezone.utc)
try:
tz = pytz_timezone(timezone)
except UnknownTimeZoneError:
Expand All @@ -39,9 +39,9 @@ def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]:
logger.warning("Timezone %s was invalid. Falling back to 'UTC'", timezone)
utc = pytz_timezone("UTC")
# convert the trigger time to the user's local time for comparison
time_now = time_now.astimezone(tz)
start_at = time_now - timedelta(seconds=1)
stop_at = time_now + timedelta(seconds=window_size)
time_now = triggered_at.astimezone(tz)
start_at = time_now - timedelta(seconds=window_size / 2)
stop_at = time_now + timedelta(seconds=window_size / 2)
crons = croniter(cron, start_at)
for schedule in crons.all_next(datetime):
if schedule >= stop_at:
Expand Down
9 changes: 8 additions & 1 deletion superset/tasks/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime

from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded
Expand Down Expand Up @@ -47,9 +48,15 @@ def scheduler() -> None:
return
with session_scope(nullpool=True) as session:
active_schedules = ReportScheduleDAO.find_active(session)
triggered_at = (
datetime.fromisoformat(scheduler.request.expires)
- app.config["CELERY_BEAT_SCHEDULER_EXPIRES"]
if scheduler.request.expires
else datetime.utcnow()
)
for active_schedule in active_schedules:
for schedule in cron_schedule_window(
active_schedule.crontab, active_schedule.timezone
triggered_at, active_schedule.crontab, active_schedule.timezone
):
logger.info(
"Scheduling alert %s eta: %s", active_schedule.name, schedule
Expand Down
145 changes: 86 additions & 59 deletions tests/unit_tests/tasks/test_cron_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime

import pytest
import pytz
from dateutil import parser
from freezegun import freeze_time
from freezegun.api import FakeDatetime

from superset.tasks.cron_util import cron_schedule_window
Expand All @@ -27,23 +25,28 @@
@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
("2020-01-01T08:59:01Z", "0 1 * * *", []),
("2020-01-01T08:59:01+00:00", "0 1 * * *", []),
(
"2020-01-01T08:59:02Z",
"2020-01-01T08:59:32+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T08:59:59Z",
"2020-01-01T08:59:59+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T09:00:00Z",
"2020-01-01T09:00:00+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
("2020-01-01T09:00:01Z", "0 1 * * *", []),
(
"2020-01-01T09:00:01+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
("2020-01-01T09:00:30+00:00", "0 1 * * *", []),
],
)
def test_cron_schedule_window_los_angeles(
Expand All @@ -53,34 +56,40 @@ def test_cron_schedule_window_los_angeles(
Reports scheduler: Test cron schedule window for "America/Los_Angeles"
"""

with freeze_time(current_dttm):
datetimes = cron_schedule_window(cron, "America/Los_Angeles")
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
== expected
)
datetimes = cron_schedule_window(
datetime.fromisoformat(current_dttm), cron, "America/Los_Angeles"
)
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
)


@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
("2020-01-01T00:59:01Z", "0 1 * * *", []),
("2020-01-01T00:59:01+00:00", "0 1 * * *", []),
("2020-01-01T00:59:02+00:00", "0 1 * * *", []),
(
"2020-01-01T00:59:59+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T00:59:02Z",
"2020-01-01T01:00:00+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T00:59:59Z",
"2020-01-01T01:00:01+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T01:00:00Z",
"2020-01-01T01:00:29+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
("2020-01-01T01:00:01Z", "0 1 * * *", []),
("2020-01-01T01:00:30+00:00", "0 1 * * *", []),
],
)
def test_cron_schedule_window_invalid_timezone(
Expand All @@ -90,35 +99,41 @@ def test_cron_schedule_window_invalid_timezone(
Reports scheduler: Test cron schedule window for "invalid timezone"
"""

with freeze_time(current_dttm):
datetimes = cron_schedule_window(cron, "invalid timezone")
# it should default to UTC
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
== expected
)
datetimes = cron_schedule_window(
datetime.fromisoformat(current_dttm), cron, "invalid timezone"
)
# it should default to UTC
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
)


@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
("2020-01-01T05:59:01Z", "0 1 * * *", []),
("2020-01-01T05:59:01+00:00", "0 1 * * *", []),
("2020-01-01T05:59:02+00:00", "0 1 * * *", []),
(
"2020-01-01T05:59:59+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T05:59:02Z",
"2020-01-01T06:00:00+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T5:59:59Z",
"2020-01-01T06:00:01+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T6:00:00",
"2020-01-01T06:00:29+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
("2020-01-01T6:00:01Z", "0 1 * * *", []),
("2020-01-01T06:00:30+00:00", "0 1 * * *", []),
],
)
def test_cron_schedule_window_new_york(
Expand All @@ -128,34 +143,40 @@ def test_cron_schedule_window_new_york(
Reports scheduler: Test cron schedule window for "America/New_York"
"""

with freeze_time(current_dttm, tz_offset=0):
datetimes = cron_schedule_window(cron, "America/New_York")
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
== expected
)
datetimes = cron_schedule_window(
datetime.fromisoformat(current_dttm), cron, "America/New_York"
)
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
)


@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
("2020-01-01T06:59:01Z", "0 1 * * *", []),
("2020-01-01T06:59:01+00:00", "0 1 * * *", []),
("2020-01-01T06:59:02+00:00", "0 1 * * *", []),
(
"2020-01-01T06:59:59+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T06:59:02Z",
"2020-01-01T07:00:00+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T06:59:59Z",
"2020-01-01T07:00:01+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-01-01T07:00:00",
"2020-01-01T07:00:29+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
("2020-01-01T07:00:01Z", "0 1 * * *", []),
("2020-01-01T07:00:30+00:00", "0 1 * * *", []),
],
)
def test_cron_schedule_window_chicago(
Expand All @@ -165,34 +186,40 @@ def test_cron_schedule_window_chicago(
Reports scheduler: Test cron schedule window for "America/Chicago"
"""

with freeze_time(current_dttm, tz_offset=0):
datetimes = cron_schedule_window(cron, "America/Chicago")
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
== expected
)
datetimes = cron_schedule_window(
datetime.fromisoformat(current_dttm), cron, "America/Chicago"
)
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
)


@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
("2020-07-01T05:59:01Z", "0 1 * * *", []),
("2020-07-01T05:59:01+00:00", "0 1 * * *", []),
("2020-07-01T05:59:02+00:00", "0 1 * * *", []),
(
"2020-07-01T05:59:59+00:00",
"0 1 * * *",
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-07-01T05:59:02Z",
"2020-07-01T06:00:00+00:00",
"0 1 * * *",
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-07-01T05:59:59Z",
"2020-07-01T06:00:01+00:00",
"0 1 * * *",
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
(
"2020-07-01T06:00:00",
"2020-07-01T06:00:29+00:00",
"0 1 * * *",
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
),
("2020-07-01T06:00:01Z", "0 1 * * *", []),
("2020-07-01T06:00:30+00:00", "0 1 * * *", []),
],
)
def test_cron_schedule_window_chicago_daylight(
Expand All @@ -202,9 +229,9 @@ def test_cron_schedule_window_chicago_daylight(
Reports scheduler: Test cron schedule window for "America/Chicago" during DST
"""

with freeze_time(current_dttm, tz_offset=0):
datetimes = cron_schedule_window(cron, "America/Chicago")
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
== expected
)
datetimes = cron_schedule_window(
datetime.fromisoformat(current_dttm), cron, "America/Chicago"
)
assert (
list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
)

0 comments on commit f672d5d

Please sign in to comment.