diff --git a/superset/reports/notifications/email.py b/superset/reports/notifications/email.py index 358294f1e14c..1f042ded83f5 100644 --- a/superset/reports/notifications/email.py +++ b/superset/reports/notifications/email.py @@ -207,6 +207,8 @@ def send(self) -> None: dryrun=False, header_data=content.header_data, ) - logger.info("Report sent to email") + logger.info( + "Report sent to email, notification content is %s", content.header_data + ) except Exception as ex: raise NotificationError(ex) from ex diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index 2a89571a87e4..2db721de7d10 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -16,6 +16,7 @@ # under the License. import logging +from celery import Celery from celery.exceptions import SoftTimeLimitExceeded from dateutil import parser @@ -70,8 +71,8 @@ def scheduler() -> None: ) -@celery_app.task(name="reports.execute") -def execute(report_schedule_id: int, scheduled_dttm: str) -> None: +@celery_app.task(name="reports.execute", bind=True) +def execute(self: Celery.task, report_schedule_id: int, scheduled_dttm: str) -> None: task_id = None try: task_id = execute.request.id @@ -90,10 +91,12 @@ def execute(report_schedule_id: int, scheduled_dttm: str) -> None: logger.exception( "An unexpected occurred while executing the report: %s", task_id ) + self.update_state(state="FAILURE") except CommandException: logger.exception( "A downstream exception occurred while generating" " a report: %s", task_id ) + self.update_state(state="FAILURE") @celery_app.task(name="reports.prune_log") diff --git a/tests/integration_tests/reports/scheduler_tests.py b/tests/integration_tests/reports/scheduler_tests.py index f8c7873fe5ee..9f3d0d55d886 100644 --- a/tests/integration_tests/reports/scheduler_tests.py +++ b/tests/integration_tests/reports/scheduler_tests.py @@ -14,7 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from typing import List +from random import randint from unittest.mock import patch from freezegun import freeze_time @@ -22,7 +22,7 @@ from superset.extensions import db from superset.reports.models import ReportScheduleType -from superset.tasks.scheduler import scheduler +from superset.tasks.scheduler import execute, scheduler from tests.integration_tests.reports.utils import insert_report_schedule from tests.integration_tests.test_app import app @@ -87,7 +87,6 @@ def test_scheduler_celery_timeout_utc(execute_mock): with freeze_time("2020-01-01T09:00:00Z"): scheduler() - print(execute_mock.call_args) assert execute_mock.call_args[1]["soft_time_limit"] == 3601 assert execute_mock.call_args[1]["time_limit"] == 3610 db.session.delete(report_schedule) @@ -136,3 +135,26 @@ def test_scheduler_feature_flag_off(execute_mock, is_feature_enabled): execute_mock.assert_not_called() db.session.delete(report_schedule) db.session.commit() + + +@patch("superset.reports.commands.execute.AsyncExecuteReportScheduleCommand.__init__") +@patch("superset.reports.commands.execute.AsyncExecuteReportScheduleCommand.run") +@patch("superset.tasks.scheduler.execute.update_state") +def test_execute_task(update_state_mock, command_mock, init_mock): + from superset.reports.commands.exceptions import ReportScheduleUnexpectedError + + with app.app_context(): + report_schedule = insert_report_schedule( + type=ReportScheduleType.ALERT, + name=f"report-{randint(0,1000)}", + crontab="0 4 * * *", + timezone="America/New_York", + ) + init_mock.return_value = None + command_mock.side_effect = ReportScheduleUnexpectedError("Unexpected error") + with freeze_time("2020-01-01T09:00:00Z"): + execute(report_schedule.id, "2020-01-01T09:00:00Z") + update_state_mock.assert_called_with(state="FAILURE") + + db.session.delete(report_schedule) + db.session.commit()