diff --git a/superset/migrations/versions/cbe71abde154_fix_report_schedule_and_log.py b/superset/migrations/versions/cbe71abde154_fix_report_schedule_and_log.py new file mode 100644 index 000000000000..5c88606511d9 --- /dev/null +++ b/superset/migrations/versions/cbe71abde154_fix_report_schedule_and_log.py @@ -0,0 +1,84 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""fix report schedule and execution log + +Revision ID: cbe71abde154 +Revises: a9422eeaae74 +Create Date: 2022-05-03 19:39:32.074608 + +""" + +# revision identifiers, used by Alembic. +revision = "cbe71abde154" +down_revision = "a9422eeaae74" + +from alembic import op +from sqlalchemy import Column, Float, Integer, String, Text +from sqlalchemy.ext.declarative import declarative_base + +from superset import db +from superset.models.reports import ReportState + +Base = declarative_base() + + +class ReportExecutionLog(Base): + __tablename__ = "report_execution_log" + + id = Column(Integer, primary_key=True) + state = Column(String(50), nullable=False) + value = Column(Float) + value_row_json = Column(Text) + + +class ReportSchedule(Base): + __tablename__ = "report_schedule" + + id = Column(Integer, primary_key=True) + last_state = Column(String(50)) + last_value = Column(Float) + last_value_row_json = Column(Text) + + +def upgrade(): + bind = op.get_bind() + session = db.Session(bind=bind) + + for schedule in ( + session.query(ReportSchedule) + .filter(ReportSchedule.last_state == ReportState.WORKING) + .all() + ): + schedule.last_value = None + schedule.last_value_row_json = None + + session.commit() + + for execution_log in ( + session.query(ReportExecutionLog) + .filter(ReportExecutionLog.state == ReportState.WORKING) + .all() + ): + execution_log.value = None + execution_log.value_row_json = None + + session.commit() + session.close() + + +def downgrade(): + pass diff --git a/superset/reports/commands/execute.py b/superset/reports/commands/execute.py index 9dc3cc195564..ad1b1edcd2a7 100644 --- a/superset/reports/commands/execute.py +++ b/superset/reports/commands/execute.py @@ -94,35 +94,39 @@ def __init__( self._start_dttm = datetime.utcnow() self._execution_id = execution_id - def set_state_and_log( + def update_report_schedule_and_log( self, state: ReportState, error_message: Optional[str] = None, ) -> None: """ - Updates current ReportSchedule state and TS. If on final state writes the log - for this execution + Update the report schedule state et al. and reflect the change in the execution + log. """ - now_dttm = datetime.utcnow() - self.set_state(state, now_dttm) - self.create_log( - state, - error_message=error_message, - ) - def set_state(self, state: ReportState, dttm: datetime) -> None: + self.update_report_schedule(state) + self.create_log(error_message) + + def update_report_schedule(self, state: ReportState) -> None: """ - Set the current report schedule state, on this case we want to - commit immediately + Update the report schedule state et al. + + When the report state is WORKING we must ensure that the values from the last + execution run are cleared to ensure that they are not propagated to the + execution log. """ + + if state == ReportState.WORKING: + self._report_schedule.last_value = None + self._report_schedule.last_value_row_json = None + self._report_schedule.last_state = state - self._report_schedule.last_eval_dttm = dttm + self._report_schedule.last_eval_dttm = datetime.utcnow() + self._session.merge(self._report_schedule) self._session.commit() - def create_log( - self, state: ReportState, error_message: Optional[str] = None - ) -> None: + def create_log(self, error_message: Optional[str] = None) -> None: """ Creates a Report execution log, uses the current computed last_value for Alerts """ @@ -132,7 +136,7 @@ def create_log( end_dttm=datetime.utcnow(), value=self._report_schedule.last_value, value_row_json=self._report_schedule.last_value_row_json, - state=state, + state=self._report_schedule.last_state, error_message=error_message, report_schedule=self._report_schedule, uuid=self._execution_id, @@ -489,17 +493,19 @@ class ReportNotTriggeredErrorState(BaseReportState): initial = True def next(self) -> None: - self.set_state_and_log(ReportState.WORKING) + self.update_report_schedule_and_log(ReportState.WORKING) try: # If it's an alert check if the alert is triggered if self._report_schedule.type == ReportScheduleType.ALERT: if not AlertCommand(self._report_schedule).run(): - self.set_state_and_log(ReportState.NOOP) + self.update_report_schedule_and_log(ReportState.NOOP) return self.send() - self.set_state_and_log(ReportState.SUCCESS) + self.update_report_schedule_and_log(ReportState.SUCCESS) except CommandException as first_ex: - self.set_state_and_log(ReportState.ERROR, error_message=str(first_ex)) + self.update_report_schedule_and_log( + ReportState.ERROR, error_message=str(first_ex) + ) # TODO (dpgaspar) convert this logic to a new state eg: ERROR_ON_GRACE if not self.is_in_error_grace_period(): try: @@ -508,12 +514,12 @@ def next(self) -> None: f" {self._report_schedule.name}", str(first_ex), ) - self.set_state_and_log( + self.update_report_schedule_and_log( ReportState.ERROR, error_message=REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER, ) except CommandException as second_ex: - self.set_state_and_log( + self.update_report_schedule_and_log( ReportState.ERROR, error_message=str(second_ex) ) raise first_ex @@ -532,13 +538,13 @@ class ReportWorkingState(BaseReportState): def next(self) -> None: if self.is_on_working_timeout(): exception_timeout = ReportScheduleWorkingTimeoutError() - self.set_state_and_log( + self.update_report_schedule_and_log( ReportState.ERROR, error_message=str(exception_timeout), ) raise exception_timeout exception_working = ReportSchedulePreviousWorkingError() - self.set_state_and_log( + self.update_report_schedule_and_log( ReportState.WORKING, error_message=str(exception_working), ) @@ -559,15 +565,15 @@ class ReportSuccessState(BaseReportState): def next(self) -> None: if self._report_schedule.type == ReportScheduleType.ALERT: if self.is_in_grace_period(): - self.set_state_and_log( + self.update_report_schedule_and_log( ReportState.GRACE, error_message=str(ReportScheduleAlertGracePeriodError()), ) return - self.set_state_and_log(ReportState.WORKING) + self.update_report_schedule_and_log(ReportState.WORKING) try: if not AlertCommand(self._report_schedule).run(): - self.set_state_and_log(ReportState.NOOP) + self.update_report_schedule_and_log(ReportState.NOOP) return except CommandException as ex: self.send_error( @@ -575,7 +581,7 @@ def next(self) -> None: f" {self._report_schedule.name}", str(ex), ) - self.set_state_and_log( + self.update_report_schedule_and_log( ReportState.ERROR, error_message=REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER, ) @@ -583,9 +589,11 @@ def next(self) -> None: try: self.send() - self.set_state_and_log(ReportState.SUCCESS) + self.update_report_schedule_and_log(ReportState.SUCCESS) except CommandException as ex: - self.set_state_and_log(ReportState.ERROR, error_message=str(ex)) + self.update_report_schedule_and_log( + ReportState.ERROR, error_message=str(ex) + ) class ReportScheduleStateMachine: # pylint: disable=too-few-public-methods diff --git a/tests/integration_tests/reports/commands_tests.py b/tests/integration_tests/reports/commands_tests.py index a334854d58de..7629bdd5b458 100644 --- a/tests/integration_tests/reports/commands_tests.py +++ b/tests/integration_tests/reports/commands_tests.py @@ -122,6 +122,11 @@ def assert_log(state: str, error_message: Optional[str] = None): assert state in log_states assert error_message in [log.error_message for log in logs] + for log in logs: + if log.state == ReportState.WORKING: + assert log.value is None + assert log.value_row_json is None + def create_report_notification( email_target: Optional[str] = None, @@ -370,11 +375,15 @@ def create_report_slack_chart_working(): ) report_schedule.last_state = ReportState.WORKING report_schedule.last_eval_dttm = datetime(2020, 1, 1, 0, 0) + report_schedule.last_value = None + report_schedule.last_value_row_json = None db.session.commit() log = ReportExecutionLog( scheduled_dttm=report_schedule.last_eval_dttm, start_dttm=report_schedule.last_eval_dttm, end_dttm=report_schedule.last_eval_dttm, + value=report_schedule.last_value, + value_row_json=report_schedule.last_value_row_json, state=ReportState.WORKING, report_schedule=report_schedule, uuid=uuid4(),