From 394c9a19fd078bf5c7afbfe3024559028c187b8b Mon Sep 17 00:00:00 2001 From: Phillip Kelley-Dotson Date: Fri, 25 Mar 2022 13:25:44 -0700 Subject: [PATCH] chore: remove old alerts and configs keys (#19261) * remove templates * remove models and more templates * remove view * remove tasks * remove views * remove schedule models * remove views, init files config code * remove supersetapp init code * remove tests and clean up pylint errors * remove unused import * pylint * run black * remove deprecate notice --- superset/cli/test.py | 22 - superset/config.py | 12 - superset/initialization/__init__.py | 57 +- superset/models/__init__.py | 10 +- superset/models/alerts.py | 176 ----- superset/models/schedules.py | 104 --- superset/tasks/alerts/__init__.py | 17 - superset/tasks/alerts/observer.py | 96 --- superset/tasks/alerts/validator.py | 111 --- superset/tasks/celery_app.py | 2 +- superset/tasks/schedules.py | 855 ---------------------- superset/views/__init__.py | 1 - superset/views/alerts.py | 204 +----- superset/views/schedules.py | 349 --------- tests/integration_tests/alerts_tests.py | 414 ----------- tests/integration_tests/schedules_test.py | 596 --------------- 16 files changed, 6 insertions(+), 3020 deletions(-) delete mode 100644 superset/models/alerts.py delete mode 100644 superset/models/schedules.py delete mode 100644 superset/tasks/alerts/__init__.py delete mode 100644 superset/tasks/alerts/observer.py delete mode 100644 superset/tasks/alerts/validator.py delete mode 100644 superset/tasks/schedules.py delete mode 100644 superset/views/schedules.py delete mode 100644 tests/integration_tests/alerts_tests.py delete mode 100644 tests/integration_tests/schedules_test.py diff --git a/superset/cli/test.py b/superset/cli/test.py index df0142b65409..db065287b008 100755 --- a/superset/cli/test.py +++ b/superset/cli/test.py @@ -15,7 +15,6 @@ # specific language governing permissions and limitations # under the License. import logging -from datetime import datetime, timedelta import click from colorama import Fore @@ -23,7 +22,6 @@ import superset.utils.database as database_utils from superset import app, security_manager -from superset.utils.celery import session_scope logger = logging.getLogger(__name__) @@ -88,23 +86,3 @@ def load_test_users_run() -> None: password="general", ) sm.get_session.commit() - - -@click.command() -@with_appcontext -def alert() -> None: - """Run the alert scheduler loop""" - # this command is just for testing purposes - # pylint: disable=import-outside-toplevel - from superset.models.schedules import ScheduleType - from superset.tasks.schedules import schedule_window - - click.secho("Processing one alert loop", fg="green") - with session_scope(nullpool=True) as session: - schedule_window( - report_type=ScheduleType.alert, - start_at=datetime.now() - timedelta(1000), - stop_at=datetime.now(), - resolution=6000, - session=session, - ) diff --git a/superset/config.py b/superset/config.py index b578c99354a5..80224a68c361 100644 --- a/superset/config.py +++ b/superset/config.py @@ -1051,18 +1051,6 @@ def SQL_QUERY_MUTATOR( # pylint: disable=invalid-name,unused-argument return sql -# Enable / disable scheduled email reports -# -# Warning: This config key is deprecated and will be removed in version 2.0.0" -ENABLE_SCHEDULED_EMAIL_REPORTS = False - -# Enable / disable Alerts, where users can define custom SQL that -# will send emails with screenshots of charts or dashboards periodically -# if it meets the criteria -# -# Warning: This config key is deprecated and will be removed in version 2.0.0" -ENABLE_ALERTS = False - # --------------------------------------------------- # Alerts & Reports # --------------------------------------------------- diff --git a/superset/initialization/__init__.py b/superset/initialization/__init__.py index f6ffd3ec3a09..2b970b718fad 100644 --- a/superset/initialization/__init__.py +++ b/superset/initialization/__init__.py @@ -150,13 +150,7 @@ def init_views(self) -> None: from superset.reports.logs.api import ReportExecutionLogRestApi from superset.security.api import SecurityRestApi from superset.views.access_requests import AccessRequestsModelView - from superset.views.alerts import ( - AlertLogModelView, - AlertModelView, - AlertObservationModelView, - AlertView, - ReportView, - ) + from superset.views.alerts import AlertView from superset.views.annotations import ( AnnotationLayerModelView, AnnotationModelView, @@ -185,10 +179,6 @@ def init_views(self) -> None: from superset.views.log.api import LogRestApi from superset.views.log.views import LogModelView from superset.views.redirects import R - from superset.views.schedules import ( - DashboardEmailScheduleView, - SliceEmailScheduleView, - ) from superset.views.sql_lab import ( SavedQueryView, SavedQueryViewApi, @@ -393,50 +383,6 @@ def init_views(self) -> None: # # Conditionally setup email views # - if self.config["ENABLE_SCHEDULED_EMAIL_REPORTS"]: - logging.warning( - "ENABLE_SCHEDULED_EMAIL_REPORTS " - "is deprecated and will be removed in version 2.0.0" - ) - - appbuilder.add_separator( - "Manage", cond=lambda: self.config["ENABLE_SCHEDULED_EMAIL_REPORTS"] - ) - appbuilder.add_view( - DashboardEmailScheduleView, - "Dashboard Email Schedules", - label=__("Dashboard Emails"), - category="Manage", - category_label=__("Manage"), - icon="fa-search", - menu_cond=lambda: self.config["ENABLE_SCHEDULED_EMAIL_REPORTS"], - ) - appbuilder.add_view( - SliceEmailScheduleView, - "Chart Emails", - label=__("Chart Email Schedules"), - category="Manage", - category_label=__("Manage"), - icon="fa-search", - menu_cond=lambda: self.config["ENABLE_SCHEDULED_EMAIL_REPORTS"], - ) - - if self.config["ENABLE_ALERTS"]: - logging.warning( - "ENABLE_ALERTS is deprecated and will be removed in version 2.0.0" - ) - - appbuilder.add_view( - AlertModelView, - "Alerts", - label=__("Alerts"), - category="Manage", - category_label=__("Manage"), - icon="fa-exclamation-triangle", - menu_cond=lambda: bool(self.config["ENABLE_ALERTS"]), - ) - appbuilder.add_view_no_menu(AlertLogModelView) - appbuilder.add_view_no_menu(AlertObservationModelView) appbuilder.add_view( AlertView, @@ -447,7 +393,6 @@ def init_views(self) -> None: icon="fa-exclamation-triangle", menu_cond=lambda: feature_flag_manager.is_feature_enabled("ALERT_REPORTS"), ) - appbuilder.add_view_no_menu(ReportView) appbuilder.add_view( AccessRequestsModelView, diff --git a/superset/models/__init__.py b/superset/models/__init__.py index 573d22d1094f..a102a0fff59a 100644 --- a/superset/models/__init__.py +++ b/superset/models/__init__.py @@ -14,12 +14,4 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from . import ( - alerts, - core, - datasource_access_request, - dynamic_plugins, - schedules, - sql_lab, - user_attributes, -) +from . import core, datasource_access_request, dynamic_plugins, sql_lab, user_attributes diff --git a/superset/models/alerts.py b/superset/models/alerts.py deleted file mode 100644 index 163dcf027de0..000000000000 --- a/superset/models/alerts.py +++ /dev/null @@ -1,176 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""Models for scheduled execution of jobs""" -import json -import textwrap -from datetime import datetime -from typing import Any, Optional - -from flask_appbuilder import Model -from sqlalchemy import ( - Boolean, - Column, - DateTime, - Float, - ForeignKey, - Integer, - String, - Table, - Text, -) -from sqlalchemy.ext.declarative import declared_attr -from sqlalchemy.orm import backref, relationship, RelationshipProperty - -from superset import db, security_manager -from superset.models.helpers import AuditMixinNullable - -metadata = Model.metadata # pylint: disable=no-member - - -alert_owner = Table( - "alert_owner", - metadata, - Column("id", Integer, primary_key=True), - Column("user_id", Integer, ForeignKey("ab_user.id")), - Column("alert_id", Integer, ForeignKey("alerts.id")), -) - - -class Alert(Model, AuditMixinNullable): - - """Schedules for emailing slices / dashboards""" - - __tablename__ = "alerts" - - id = Column(Integer, primary_key=True) - label = Column(String(150), nullable=False) - active = Column(Boolean, default=True, index=True) - # TODO(bkyryliuk): enforce minimal supported frequency - crontab = Column(String(50), nullable=False) - - alert_type = Column(String(50)) - owners = relationship(security_manager.user_model, secondary=alert_owner) - recipients = Column(Text) - slack_channel = Column(Text) - - # TODO(bkyryliuk): implement log_retention - log_retention = Column(Integer, default=90) - grace_period = Column(Integer, default=60 * 60 * 24) - - slice_id = Column(Integer, ForeignKey("slices.id")) - slice = relationship("Slice", backref="alerts", foreign_keys=[slice_id]) - - dashboard_id = Column(Integer, ForeignKey("dashboards.id")) - dashboard = relationship("Dashboard", backref="alert", foreign_keys=[dashboard_id]) - - last_eval_dttm = Column(DateTime, default=datetime.utcnow) - last_state = Column(String(10)) - - # Observation related columns - sql = Column(Text, nullable=False) - - # Validation related columns - validator_type = Column(String(100), nullable=False) - validator_config = Column( - Text, - default=textwrap.dedent( - """ - { - - } - """ - ), - ) - - @declared_attr - def database_id(self) -> int: - return Column(Integer, ForeignKey("dbs.id"), nullable=False) - - @declared_attr - def database(self) -> RelationshipProperty: - return relationship( - "Database", - foreign_keys=[self.database_id], - backref=backref("sql_observers", cascade="all, delete-orphan"), - ) - - def get_last_observation(self) -> Optional[Any]: - observations = list( - db.session.query(SQLObservation) - .filter_by(alert_id=self.id) - .order_by(SQLObservation.dttm.desc()) - .limit(1) - ) - - if observations: - return observations[0] - - return None - - def __str__(self) -> str: - return f"<{self.id}:{self.label}>" - - @property - def pretty_config(self) -> str: - """String representing the comparison that will trigger a validator""" - config = json.loads(self.validator_config) - - if self.validator_type.lower() == "operator": - return f"{config['op']} {config['threshold']}" - - if self.validator_type.lower() == "not null": - return "!= Null or 0" - - return "" - - -class AlertLog(Model): - """Keeps track of alert-related operations""" - - __tablename__ = "alert_logs" - - id = Column(Integer, primary_key=True) - scheduled_dttm = Column(DateTime) - dttm_start = Column(DateTime, default=datetime.utcnow) - dttm_end = Column(DateTime, default=datetime.utcnow) - alert_id = Column(Integer, ForeignKey("alerts.id")) - alert = relationship("Alert", backref="logs", foreign_keys=[alert_id]) - state = Column(String(10)) - - @property - def duration(self) -> int: - return (self.dttm_end - self.dttm_start).total_seconds() - - -# TODO: Currently SQLObservation table will constantly grow with no limit, -# add some retention restriction or more to a more scalable db e.g. -# https://github.com/apache/superset/blob/master/superset/utils/log.py#L32 -class SQLObservation(Model): # pylint: disable=too-few-public-methods - """Keeps track of the collected observations for alerts.""" - - __tablename__ = "sql_observations" - - id = Column(Integer, primary_key=True) - dttm = Column(DateTime, default=datetime.utcnow, index=True) - alert_id = Column(Integer, ForeignKey("alerts.id")) - alert = relationship( - "Alert", - foreign_keys=[alert_id], - backref=backref("observations", cascade="all, delete-orphan"), - ) - value = Column(Float) - error_msg = Column(String(500)) diff --git a/superset/models/schedules.py b/superset/models/schedules.py deleted file mode 100644 index f60890bfc3b5..000000000000 --- a/superset/models/schedules.py +++ /dev/null @@ -1,104 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""Models for scheduled execution of jobs""" -import enum -from typing import Optional, Type - -from flask_appbuilder import Model -from sqlalchemy import Boolean, Column, Enum, ForeignKey, Integer, String, Text -from sqlalchemy.ext.declarative import declared_attr -from sqlalchemy.orm import relationship, RelationshipProperty - -from superset import security_manager -from superset.models.alerts import Alert -from superset.models.helpers import AuditMixinNullable, ImportExportMixin - -metadata = Model.metadata # pylint: disable=no-member - - -class ScheduleType(str, enum.Enum): - # pylint: disable=invalid-name - slice = "slice" - dashboard = "dashboard" - alert = "alert" - - -class EmailDeliveryType(str, enum.Enum): - # pylint: disable=invalid-name - attachment = "Attachment" - inline = "Inline" - - -class SliceEmailReportFormat(str, enum.Enum): - # pylint: disable=invalid-name - visualization = "Visualization" - data = "Raw data" - - -class EmailSchedule: - - """Schedules for emailing slices / dashboards""" - - __tablename__ = "email_schedules" - - id = Column(Integer, primary_key=True) - active = Column(Boolean, default=True, index=True) - crontab = Column(String(50)) - - @declared_attr - def user_id(self) -> int: - return Column(Integer, ForeignKey("ab_user.id")) - - @declared_attr - def user(self) -> RelationshipProperty: - return relationship( - security_manager.user_model, - backref=self.__tablename__, - foreign_keys=[self.user_id], - ) - - recipients = Column(Text) - slack_channel = Column(Text) - deliver_as_group = Column(Boolean, default=False) - delivery_type = Column(Enum(EmailDeliveryType)) - - -class DashboardEmailSchedule( - Model, AuditMixinNullable, ImportExportMixin, EmailSchedule -): - __tablename__ = "dashboard_email_schedules" - dashboard_id = Column(Integer, ForeignKey("dashboards.id")) - dashboard = relationship( - "Dashboard", backref="email_schedules", foreign_keys=[dashboard_id] - ) - - -class SliceEmailSchedule(Model, AuditMixinNullable, ImportExportMixin, EmailSchedule): - __tablename__ = "slice_email_schedules" - slice_id = Column(Integer, ForeignKey("slices.id")) - slice = relationship("Slice", backref="email_schedules", foreign_keys=[slice_id]) - email_format = Column(Enum(SliceEmailReportFormat)) - - -def get_scheduler_model(report_type: str) -> Optional[Type[EmailSchedule]]: - if report_type == ScheduleType.dashboard: - return DashboardEmailSchedule - if report_type == ScheduleType.slice: - return SliceEmailSchedule - if report_type == ScheduleType.alert: - return Alert - return None diff --git a/superset/tasks/alerts/__init__.py b/superset/tasks/alerts/__init__.py deleted file mode 100644 index fd9417fe5c1e..000000000000 --- a/superset/tasks/alerts/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# -*- coding: utf-8 -*- -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/superset/tasks/alerts/observer.py b/superset/tasks/alerts/observer.py deleted file mode 100644 index cbe73d886ae9..000000000000 --- a/superset/tasks/alerts/observer.py +++ /dev/null @@ -1,96 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import logging -from datetime import datetime -from typing import Optional - -import pandas as pd -from sqlalchemy.orm import Session - -from superset import jinja_context -from superset.models.alerts import Alert, SQLObservation - -logger = logging.getLogger("tasks.email_reports") - - -# Session needs to be passed along in the celery workers and db.session cannot be used. -# For more info see: https://github.com/apache/superset/issues/10530 -def observe(alert_id: int, session: Session) -> Optional[str]: - """Collect observations for the alert. - Returns an error message if the observer value was not valid - """ - - alert = session.query(Alert).filter_by(id=alert_id).one() - - value = None - - tp = jinja_context.get_template_processor(database=alert.database) - rendered_sql = tp.process_template(alert.sql) - df = alert.database.get_df(rendered_sql) - - error_msg = validate_observer_result(df, alert.id, alert.label) - - if not error_msg and not df.empty and df.to_records()[0][1] is not None: - value = float(df.to_records()[0][1]) - - observation = SQLObservation( - alert_id=alert_id, dttm=datetime.utcnow(), value=value, error_msg=error_msg, - ) - - session.add(observation) - session.commit() - - return error_msg - - -def validate_observer_result( - sql_result: pd.DataFrame, alert_id: int, alert_label: str -) -> Optional[str]: - """ - Verifies if a DataFrame SQL query result to see if - it contains a valid value for a SQLObservation. - Returns an error message if the result is invalid. - """ - try: - if sql_result.empty: - # empty results are used for the not null validator - return None - - rows = sql_result.to_records() - - assert ( - len(rows) == 1 - ), f"Observer for alert <{alert_id}:{alert_label}> returned more than 1 row" - - assert ( - len(rows[0]) == 2 - ), f"Observer for alert <{alert_id}:{alert_label}> returned more than 1 column" - - if rows[0][1] is None: - return None - - float(rows[0][1]) - - except AssertionError as error: - return str(error) - except (TypeError, ValueError): - return ( - f"Observer for alert <{alert_id}:{alert_label}> returned a non-number value" - ) - - return None diff --git a/superset/tasks/alerts/validator.py b/superset/tasks/alerts/validator.py deleted file mode 100644 index 38b579134159..000000000000 --- a/superset/tasks/alerts/validator.py +++ /dev/null @@ -1,111 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -import enum -import json -from operator import eq, ge, gt, le, lt, ne -from typing import Callable, Optional - -import numpy as np - -from superset.exceptions import SupersetException -from superset.models.alerts import Alert - -OPERATOR_FUNCTIONS = {">=": ge, ">": gt, "<=": le, "<": lt, "==": eq, "!=": ne} - - -class AlertValidatorType(str, enum.Enum): - NOT_NULL = "not null" - OPERATOR = "operator" - - @classmethod - def valid_type(cls, validator_type: str) -> bool: - return any(val_type.value == validator_type for val_type in cls) - - -def check_validator(validator_type: str, config: str) -> None: - if not AlertValidatorType.valid_type(validator_type): - raise SupersetException( - f"Error: {validator_type} is not a valid validator type." - ) - - config_dict = json.loads(config) - - if validator_type == AlertValidatorType.OPERATOR.value: - - if not (config_dict.get("op") and config_dict.get("threshold") is not None): - raise SupersetException( - "Error: Operator Validator needs specified operator and threshold " - 'values. Add "op" and "threshold" to config.' - ) - - if not config_dict["op"] in OPERATOR_FUNCTIONS.keys(): - raise SupersetException( - f'Error: {config_dict["op"]} is an invalid operator type. Change ' - f'the "op" value in the config to one of ' - f'["<", "<=", ">", ">=", "==", "!="]' - ) - - if not isinstance(config_dict["threshold"], (int, float)): - raise SupersetException( - f'Error: {config_dict["threshold"]} is an invalid threshold value.' - f' Change the "threshold" value in the config.' - ) - - -def not_null_validator( - alert: Alert, validator_config: str # pylint: disable=unused-argument -) -> bool: - """Returns True if a recent observation is not NULL""" - - observation = alert.get_last_observation() - # TODO: Validate malformed observations/observations with errors separately - if ( - not observation - or observation.error_msg - or observation.value in (0, None, np.nan) - ): - return False - return True - - -def operator_validator(alert: Alert, validator_config: str) -> bool: - """ - Returns True if a recent observation is greater than or equal to - the value given in the validator config - """ - observation = alert.get_last_observation() - if not observation or observation.value in (None, np.nan): - return False - - operator = json.loads(validator_config)["op"] - threshold = json.loads(validator_config)["threshold"] - return OPERATOR_FUNCTIONS[operator](observation.value, threshold) - - -def get_validator_function( - validator_type: str, -) -> Optional[Callable[[Alert, str], bool]]: - """Returns a validation function based on validator_type""" - - alert_validators = { - AlertValidatorType.NOT_NULL.value: not_null_validator, - AlertValidatorType.OPERATOR.value: operator_validator, - } - if alert_validators.get(validator_type.lower()): - return alert_validators[validator_type.lower()] - - return None diff --git a/superset/tasks/celery_app.py b/superset/tasks/celery_app.py index f8b9bef0d732..850709bfb486 100644 --- a/superset/tasks/celery_app.py +++ b/superset/tasks/celery_app.py @@ -32,7 +32,7 @@ # Need to import late, as the celery_app will have been setup by "create_app()" # pylint: disable=wrong-import-position, unused-import -from . import cache, schedules, scheduler # isort:skip +from . import cache, scheduler # isort:skip # Export the celery app globally for Celery (as run on the cmd line) to find app = celery_app diff --git a/superset/tasks/schedules.py b/superset/tasks/schedules.py deleted file mode 100644 index 05506d077a97..000000000000 --- a/superset/tasks/schedules.py +++ /dev/null @@ -1,855 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -""" -DEPRECATION NOTICE: this module is deprecated as of v1.0.0. -It will be removed in future versions of Superset. Please -migrate to the new scheduler: `superset.tasks.scheduler`. -""" - -import logging -import time -import urllib.request -from collections import namedtuple -from datetime import datetime, timedelta -from email.utils import make_msgid, parseaddr -from enum import Enum -from typing import ( - Any, - Callable, - Dict, - Iterator, - NamedTuple, - Optional, - Tuple, - TYPE_CHECKING, - Union, -) -from urllib.error import URLError - -import croniter -import simplejson as json -from celery.app.task import Task -from dateutil.tz import tzlocal -from flask import current_app, render_template, url_for -from flask_babel import gettext as __ -from selenium.common.exceptions import WebDriverException -from selenium.webdriver import chrome, firefox -from selenium.webdriver.remote.webdriver import WebDriver -from sqlalchemy import func -from sqlalchemy.exc import NoSuchColumnError, ResourceClosedError -from sqlalchemy.orm import Session - -from superset import app, security_manager, thumbnail_cache -from superset.extensions import celery_app, machine_auth_provider_factory -from superset.models.alerts import Alert, AlertLog -from superset.models.dashboard import Dashboard -from superset.models.schedules import ( - EmailDeliveryType, - get_scheduler_model, - ScheduleType, - SliceEmailReportFormat, -) -from superset.models.slice import Slice -from superset.tasks.alerts.observer import observe -from superset.tasks.alerts.validator import get_validator_function -from superset.tasks.slack_util import deliver_slack_msg -from superset.utils.celery import session_scope -from superset.utils.core import get_email_address_list, send_email_smtp -from superset.utils.retries import retry_call -from superset.utils.screenshots import ChartScreenshot, WebDriverProxy -from superset.utils.urls import get_url_path - -if TYPE_CHECKING: - from flask_appbuilder.security.sqla.models import User - from werkzeug.datastructures import TypeConversionDict - -# Globals -config = app.config -logger = logging.getLogger("tasks.email_reports") -logger.setLevel(logging.INFO) - -stats_logger = current_app.config["STATS_LOGGER"] -EMAIL_PAGE_RENDER_WAIT = config["EMAIL_PAGE_RENDER_WAIT"] -WEBDRIVER_BASEURL = config["WEBDRIVER_BASEURL"] -WEBDRIVER_BASEURL_USER_FRIENDLY = config["WEBDRIVER_BASEURL_USER_FRIENDLY"] - -ReportContent = namedtuple( - "ReportContent", - [ - "body", # email body - "data", # attachments - "images", # embedded images for the email - "slack_message", # html not supported, only markdown - # attachments for the slack message, embedding not supported - "slack_attachment", - ], -) - - -class ScreenshotData(NamedTuple): - url: str # url to chat/dashboard for this screenshot - image: Optional[bytes] # bytes for the screenshot - - -class AlertContent(NamedTuple): - label: str # alert name - sql: str # sql statement for alert - observation_value: str # value from observation that triggered the alert - validation_error_message: str # a string of the comparison that triggered an alert - alert_url: str # url to alert details - image_data: Optional[ScreenshotData] # data for the alert screenshot - - -def _get_email_to_and_bcc( - recipients: str, deliver_as_group: bool -) -> Iterator[Tuple[str, str]]: - bcc = config["EMAIL_REPORT_BCC_ADDRESS"] - - if deliver_as_group: - to = recipients - yield (to, bcc) - else: - for to in get_email_address_list(recipients): - yield (to, bcc) - - -# TODO(bkyryliuk): move email functionality into a separate module. -def _deliver_email( # pylint: disable=too-many-arguments - recipients: str, - deliver_as_group: bool, - subject: str, - body: str, - data: Optional[Dict[str, Any]], - images: Optional[Dict[str, bytes]], -) -> None: - for (to, bcc) in _get_email_to_and_bcc(recipients, deliver_as_group): - send_email_smtp( - to, - subject, - body, - config, - data=data, - images=images, - bcc=bcc, - mime_subtype="related", - dryrun=config["SCHEDULED_EMAIL_DEBUG_MODE"], - ) - - -def _generate_report_content( - delivery_type: EmailDeliveryType, screenshot: bytes, name: str, url: str -) -> ReportContent: - data: Optional[Dict[str, Any]] - - # how to: https://api.slack.com/reference/surfaces/formatting - slack_message = __( - """ - *%(name)s*\n - <%(url)s|Explore in Superset> - """, - name=name, - url=url, - ) - - if delivery_type == EmailDeliveryType.attachment: - images = None - data = {"screenshot": screenshot} - body = __( - 'Explore in Superset

', - name=name, - url=url, - ) - elif delivery_type == EmailDeliveryType.inline: - # Get the domain from the 'From' address .. - # and make a message id without the < > in the ends - domain = parseaddr(config["SMTP_MAIL_FROM"])[1].split("@")[1] - msgid = make_msgid(domain)[1:-1] - - images = {msgid: screenshot} - data = None - body = __( - """ - Explore in Superset

- - """, - name=name, - url=url, - msgid=msgid, - ) - - return ReportContent(body, data, images, slack_message, screenshot) - - -def _get_url_path(view: str, user_friendly: bool = False, **kwargs: Any) -> str: - with app.test_request_context(): - base_url = ( - WEBDRIVER_BASEURL_USER_FRIENDLY if user_friendly else WEBDRIVER_BASEURL - ) - return urllib.parse.urljoin(str(base_url), url_for(view, **kwargs)) - - -def create_webdriver(session: Session) -> WebDriver: - return WebDriverProxy(driver_type=config["WEBDRIVER_TYPE"]).auth( - get_reports_user(session) - ) - - -def get_reports_user(session: Session) -> "User": - return ( - session.query(security_manager.user_model) - .filter( - func.lower(security_manager.user_model.username) - == func.lower(config["EMAIL_REPORTS_USER"]) - ) - .one() - ) - - -def destroy_webdriver( - driver: Union[chrome.webdriver.WebDriver, firefox.webdriver.WebDriver] -) -> None: - """ - Destroy a driver - """ - - # This is some very flaky code in selenium. Hence the retries - # and catch-all exceptions - try: - retry_call(driver.close, max_tries=2) - except Exception: # pylint: disable=broad-except - pass - try: - driver.quit() - except Exception: # pylint: disable=broad-except - pass - - -def deliver_dashboard( # pylint: disable=too-many-locals - dashboard_id: int, - recipients: Optional[str], - slack_channel: Optional[str], - delivery_type: EmailDeliveryType, - deliver_as_group: bool, -) -> None: - - """ - Given a schedule, delivery the dashboard as an email report - """ - with session_scope(nullpool=True) as session: - dashboard = session.query(Dashboard).filter_by(id=dashboard_id).one() - - dashboard_url = _get_url_path( - "Superset.dashboard", dashboard_id_or_slug=dashboard.id - ) - dashboard_url_user_friendly = _get_url_path( - "Superset.dashboard", user_friendly=True, dashboard_id_or_slug=dashboard.id - ) - - # Create a driver, fetch the page, wait for the page to render - driver = create_webdriver(session) - window = config["WEBDRIVER_WINDOW"]["dashboard"] - driver.set_window_size(*window) - driver.get(dashboard_url) - time.sleep(EMAIL_PAGE_RENDER_WAIT) - - # Set up a function to retry once for the element. - # This is buggy in certain selenium versions with firefox driver - get_element = getattr(driver, "find_element_by_class_name") - element = retry_call( - get_element, - fargs=["grid-container"], - max_tries=2, - interval=EMAIL_PAGE_RENDER_WAIT, - ) - - try: - screenshot = element.screenshot_as_png - except WebDriverException: - # Some webdrivers do not support screenshots for elements. - # In such cases, take a screenshot of the entire page. - screenshot = driver.screenshot() - finally: - destroy_webdriver(driver) - - # Generate the email body and attachments - report_content = _generate_report_content( - delivery_type, - screenshot, - dashboard.dashboard_title, - dashboard_url_user_friendly, - ) - - subject = __( - "%(prefix)s %(title)s", - prefix=config["EMAIL_REPORTS_SUBJECT_PREFIX"], - title=dashboard.dashboard_title, - ) - - if recipients: - _deliver_email( - recipients, - deliver_as_group, - subject, - report_content.body, - report_content.data, - report_content.images, - ) - if slack_channel: - deliver_slack_msg( - slack_channel, - subject, - report_content.slack_message, - report_content.slack_attachment, - ) - - -def _get_slice_data( - slc: Slice, delivery_type: EmailDeliveryType, session: Session -) -> ReportContent: - slice_url = _get_url_path( - "Superset.explore_json", csv="true", form_data=json.dumps({"slice_id": slc.id}) - ) - - # URL to include in the email - slice_url_user_friendly = _get_url_path( - "Superset.slice", slice_id=slc.id, user_friendly=True - ) - - # Login on behalf of the "reports" user in order to get cookies to deal with auth - auth_cookies = machine_auth_provider_factory.instance.get_auth_cookies( - get_reports_user(session) - ) - # Build something like "session=cool_sess.val;other-cookie=awesome_other_cookie" - cookie_str = ";".join([f"{key}={val}" for key, val in auth_cookies.items()]) - - opener = urllib.request.build_opener() - opener.addheaders.append(("Cookie", cookie_str)) - response = opener.open(slice_url) - if response.getcode() != 200: - raise URLError(response.getcode()) - - # TODO: Move to the csv module - content = response.read() - rows = [r.split(b",") for r in content.splitlines()] - - if delivery_type == EmailDeliveryType.inline: - data = None - - # Parse the csv file and generate HTML - columns = rows.pop(0) - with app.app_context(): - body = render_template( - "superset/reports/slice_data.html", - columns=columns, - rows=rows, - name=slc.slice_name, - link=slice_url_user_friendly, - ) - - elif delivery_type == EmailDeliveryType.attachment: - data = {__("%(name)s.csv", name=slc.slice_name): content} - body = __( - 'Explore in Superset

', - name=slc.slice_name, - url=slice_url_user_friendly, - ) - - # how to: https://api.slack.com/reference/surfaces/formatting - slack_message = __( - """ - *%(slice_name)s*\n - <%(slice_url_user_friendly)s|Explore in Superset> - """, - slice_name=slc.slice_name, - slice_url_user_friendly=slice_url_user_friendly, - ) - - return ReportContent(body, data, None, slack_message, content) - - -def _get_slice_screenshot(slice_id: int, session: Session) -> ScreenshotData: - slice_obj = session.query(Slice).get(slice_id) - - chart_url = get_url_path("Superset.slice", slice_id=slice_obj.id, standalone="true") - screenshot = ChartScreenshot(chart_url, slice_obj.digest) - image_url = _get_url_path( - "Superset.slice", user_friendly=True, slice_id=slice_obj.id, - ) - - user = security_manager.get_user_by_username( - current_app.config["THUMBNAIL_SELENIUM_USER"], session=session - ) - image_data = screenshot.compute_and_cache( - user=user, cache=thumbnail_cache, force=True, - ) - - session.commit() - return ScreenshotData(image_url, image_data) - - -def _get_slice_visualization( - slc: Slice, delivery_type: EmailDeliveryType, session: Session -) -> ReportContent: - # Create a driver, fetch the page, wait for the page to render - driver = create_webdriver(session) - window = config["WEBDRIVER_WINDOW"]["slice"] - driver.set_window_size(*window) - - slice_url = _get_url_path("Superset.slice", slice_id=slc.id) - slice_url_user_friendly = _get_url_path( - "Superset.slice", slice_id=slc.id, user_friendly=True - ) - - driver.get(slice_url) - time.sleep(EMAIL_PAGE_RENDER_WAIT) - - # Set up a function to retry once for the element. - # This is buggy in certain selenium versions with firefox driver - element = retry_call( - driver.find_element_by_class_name, - fargs=["chart-container"], - max_tries=2, - interval=EMAIL_PAGE_RENDER_WAIT, - ) - - try: - screenshot = element.screenshot_as_png - except WebDriverException: - # Some webdrivers do not support screenshots for elements. - # In such cases, take a screenshot of the entire page. - screenshot = driver.screenshot() - finally: - destroy_webdriver(driver) - - # Generate the email body and attachments - return _generate_report_content( - delivery_type, screenshot, slc.slice_name, slice_url_user_friendly - ) - - -def deliver_slice( # pylint: disable=too-many-arguments - slice_id: int, - recipients: Optional[str], - slack_channel: Optional[str], - delivery_type: EmailDeliveryType, - email_format: SliceEmailReportFormat, - deliver_as_group: bool, - session: Session, -) -> None: - """ - Given a schedule, delivery the slice as an email report - """ - slc = session.query(Slice).filter_by(id=slice_id).one() - - if email_format == SliceEmailReportFormat.data: - report_content = _get_slice_data(slc, delivery_type, session) - elif email_format == SliceEmailReportFormat.visualization: - report_content = _get_slice_visualization(slc, delivery_type, session) - else: - raise RuntimeError("Unknown email report format") - - subject = __( - "%(prefix)s %(title)s", - prefix=config["EMAIL_REPORTS_SUBJECT_PREFIX"], - title=slc.slice_name, - ) - - if recipients: - _deliver_email( - recipients, - deliver_as_group, - subject, - report_content.body, - report_content.data, - report_content.images, - ) - if slack_channel: - deliver_slack_msg( - slack_channel, - subject, - report_content.slack_message, - report_content.slack_attachment, - ) - - -@celery_app.task( - name="email_reports.send", - bind=True, - soft_time_limit=config["EMAIL_ASYNC_TIME_LIMIT_SEC"], -) -def schedule_email_report( - _task: Task, - report_type: ScheduleType, - schedule_id: int, - recipients: Optional[str] = None, - slack_channel: Optional[str] = None, -) -> None: - model_cls = get_scheduler_model(report_type) - with session_scope(nullpool=True) as session: - schedule = session.query(model_cls).get(schedule_id) - - # The user may have disabled the schedule. If so, ignore this - if not schedule or not schedule.active: - logger.info("Ignoring deactivated schedule") - return - - recipients = recipients or schedule.recipients - slack_channel = slack_channel or schedule.slack_channel - logger.info( - "Starting report for slack: %s and recipients: %s.", - slack_channel, - recipients, - ) - - if report_type == ScheduleType.dashboard: - deliver_dashboard( - schedule.dashboard_id, - recipients, - slack_channel, - schedule.delivery_type, - schedule.deliver_as_group, - ) - elif report_type == ScheduleType.slice: - deliver_slice( - schedule.slice_id, - recipients, - slack_channel, - schedule.delivery_type, - schedule.email_format, - schedule.deliver_as_group, - session, - ) - else: - raise RuntimeError("Unknown report type") - - -@celery_app.task( - name="alerts.run_query", - bind=True, - # TODO: find cause of https://github.com/apache/superset/issues/10530 - # and remove retry - autoretry_for=(NoSuchColumnError, ResourceClosedError,), - retry_kwargs={"max_retries": 1}, - retry_backoff=True, -) -def schedule_alert_query( - _task: Task, - report_type: ScheduleType, - schedule_id: int, - recipients: Optional[str] = None, - slack_channel: Optional[str] = None, -) -> None: - model_cls = get_scheduler_model(report_type) - with session_scope(nullpool=True) as session: - schedule = session.query(model_cls).get(schedule_id) - - # The user may have disabled the schedule. If so, ignore this - if not schedule or not schedule.active: - logger.info("Ignoring deactivated alert") - return - - if report_type == ScheduleType.alert: - evaluate_alert( - schedule.id, schedule.label, session, recipients, slack_channel - ) - else: - raise RuntimeError("Unknown report type") - - -class AlertState(str, Enum): - ERROR = "error" - TRIGGER = "trigger" - PASS = "pass" - - -def deliver_alert( - alert_id: int, - session: Session, - recipients: Optional[str] = None, - slack_channel: Optional[str] = None, -) -> None: - """ - Gathers alert information and sends out the alert - to its respective email and slack recipients - """ - - alert = session.query(Alert).get(alert_id) - - logging.info("Triggering alert: %s", alert) - - # Set all the values for the alert report - # Alternate values are used in the case of a test alert - # where an alert might not have a validator - recipients = recipients or alert.recipients - slack_channel = slack_channel or alert.slack_channel - validation_error_message = ( - str(alert.observations[-1].value) + " " + alert.pretty_config - ) - - if alert.slice: - alert_content = AlertContent( - alert.label, - alert.sql, - str(alert.observations[-1].value), - validation_error_message, - _get_url_path("AlertModelView.show", user_friendly=True, pk=alert_id), - _get_slice_screenshot(alert.slice.id, session), - ) - else: - # TODO: dashboard delivery! - alert_content = AlertContent( - alert.label, - alert.sql, - str(alert.observations[-1].value), - validation_error_message, - _get_url_path("AlertModelView.show", user_friendly=True, pk=alert_id), - None, - ) - - if recipients: - deliver_email_alert(alert_content, recipients) - if slack_channel: - deliver_slack_alert(alert_content, slack_channel) - - -def deliver_email_alert(alert_content: AlertContent, recipients: str) -> None: - """Delivers an email alert to the given email recipients""" - subject = f"[Superset] Triggered alert: {alert_content.label}" - deliver_as_group = False - data = None - images = {} - # TODO(JasonD28): add support for emails with no screenshot - image_url = None - if alert_content.image_data: - image_url = alert_content.image_data.url - if alert_content.image_data.image: - images = {"screenshot": alert_content.image_data.image} - - body = render_template( - "email/alert.txt", - alert_url=alert_content.alert_url, - label=alert_content.label, - sql=alert_content.sql, - observation_value=alert_content.observation_value, - validation_error_message=alert_content.validation_error_message, - image_url=image_url, - ) - - _deliver_email(recipients, deliver_as_group, subject, body, data, images) - - -def deliver_slack_alert(alert_content: AlertContent, slack_channel: str) -> None: - """Delivers a slack alert to the given slack channel""" - - subject = __("[Alert] %(label)s", label=alert_content.label) - - image = None - if alert_content.image_data: - slack_message = render_template( - "slack/alert.txt", - label=alert_content.label, - sql=alert_content.sql, - observation_value=alert_content.observation_value, - validation_error_message=alert_content.validation_error_message, - url=alert_content.image_data.url, - alert_url=alert_content.alert_url, - ) - image = alert_content.image_data.image - else: - slack_message = render_template( - "slack/alert_no_screenshot.txt", - label=alert_content.label, - sql=alert_content.sql, - observation_value=alert_content.observation_value, - validation_error_message=alert_content.validation_error_message, - alert_url=alert_content.alert_url, - ) - - deliver_slack_msg( - slack_channel, subject, slack_message, image, - ) - - -def evaluate_alert( - alert_id: int, - label: str, - session: Session, - recipients: Optional[str] = None, - slack_channel: Optional[str] = None, -) -> None: - """Processes an alert to see if it should be triggered""" - - logger.info("Processing alert ID: %i", alert_id) - - state = None - dttm_start = datetime.utcnow() - - try: - logger.info("Querying observers for alert <%s:%s>", alert_id, label) - error_msg = observe(alert_id, session) - if error_msg: - state = AlertState.ERROR - logging.error(error_msg) - except Exception as exc: # pylint: disable=broad-except - state = AlertState.ERROR - logging.exception(exc) - logging.error("Failed at query observers for alert: %s (%s)", label, alert_id) - - dttm_end = datetime.utcnow() - - if state != AlertState.ERROR: - # Don't validate alert on test runs since it may not be triggered - if recipients or slack_channel: - deliver_alert(alert_id, session, recipients, slack_channel) - state = AlertState.TRIGGER - # Validate during regular workflow and deliver only if triggered - elif validate_observations(alert_id, label, session): - deliver_alert(alert_id, session, recipients, slack_channel) - state = AlertState.TRIGGER - else: - state = AlertState.PASS - - session.commit() - alert = session.query(Alert).get(alert_id) - if state != AlertState.ERROR: - alert.last_eval_dttm = dttm_end - alert.last_state = state - alert.logs.append( - AlertLog( - scheduled_dttm=dttm_start, - dttm_start=dttm_start, - dttm_end=dttm_end, - state=state, - ) - ) - session.commit() - - -def validate_observations(alert_id: int, label: str, session: Session) -> bool: - """ - Runs an alert's validators to check if it should be triggered or not - If so, return the name of the validator that returned true - """ - - logger.info("Validating observations for alert <%s:%s>", alert_id, label) - alert = session.query(Alert).get(alert_id) - validate = get_validator_function(alert.validator_type) - return bool(validate and validate(alert, alert.validator_config)) - - -def next_schedules( - crontab: str, start_at: datetime, stop_at: datetime, resolution: int = 0 -) -> Iterator[datetime]: - crons = croniter.croniter(crontab, start_at - timedelta(seconds=1)) - previous = start_at - timedelta(days=1) - - for eta in crons.all_next(datetime): - # Do not cross the time boundary - if eta >= stop_at: - break - - if eta < start_at: - continue - - # Do not allow very frequent tasks - if eta - previous < timedelta(seconds=resolution): - continue - - yield eta - previous = eta - - -def schedule_window( - report_type: str, - start_at: datetime, - stop_at: datetime, - resolution: int, - session: Session, -) -> None: - """ - Find all active schedules and schedule celery tasks for - each of them with a specific ETA (determined by parsing - the cron schedule for the schedule) - """ - model_cls = get_scheduler_model(report_type) - - if not model_cls: - return None - - schedules = session.query(model_cls).filter(model_cls.active.is_(True)) - - for schedule in schedules: - logging.info("Processing schedule %s", schedule) - args = (report_type, schedule.id) - schedule_start_at = start_at - - if ( - hasattr(schedule, "last_eval_dttm") - and schedule.last_eval_dttm - and schedule.last_eval_dttm > start_at - ): - schedule_start_at = schedule.last_eval_dttm + timedelta(seconds=1) - - # Schedule the job for the specified time window - for eta in next_schedules( - schedule.crontab, schedule_start_at, stop_at, resolution=resolution - ): - logging.info("Scheduled eta %s", eta) - get_scheduler_action(report_type).apply_async(args, eta=eta) # type: ignore - - return None - - -def get_scheduler_action(report_type: str) -> Optional[Callable[..., Any]]: - if report_type == ScheduleType.dashboard: - return schedule_email_report - if report_type == ScheduleType.slice: - return schedule_email_report - if report_type == ScheduleType.alert: - return schedule_alert_query - return None - - -@celery_app.task(name="email_reports.schedule_hourly") -def schedule_hourly() -> None: - """Celery beat job meant to be invoked hourly""" - if not config["ENABLE_SCHEDULED_EMAIL_REPORTS"]: - logger.info("Scheduled email reports not enabled in config") - return - - resolution = config["EMAIL_REPORTS_CRON_RESOLUTION"] * 60 - - # Get the top of the hour - start_at = datetime.now(tzlocal()).replace(microsecond=0, second=0, minute=0) - stop_at = start_at + timedelta(seconds=3600) - - with session_scope(nullpool=True) as session: - schedule_window(ScheduleType.dashboard, start_at, stop_at, resolution, session) - schedule_window(ScheduleType.slice, start_at, stop_at, resolution, session) - - -@celery_app.task(name="alerts.schedule_check") -def schedule_alerts() -> None: - """Celery beat job meant to be invoked every minute to check alerts""" - resolution = 0 - now = datetime.utcnow() - start_at = now - timedelta( - seconds=300 - ) # process any missed tasks in the past few minutes - stop_at = now + timedelta(seconds=1) - with session_scope(nullpool=True) as session: - schedule_window(ScheduleType.alert, start_at, stop_at, resolution, session) diff --git a/superset/views/__init__.py b/superset/views/__init__.py index c3a349ce495c..c33601f7278d 100644 --- a/superset/views/__init__.py +++ b/superset/views/__init__.py @@ -27,7 +27,6 @@ dynamic_plugins, health, redirects, - schedules, sql_lab, tags, ) diff --git a/superset/views/alerts.py b/superset/views/alerts.py index 04640fa223fe..b97587ec7185 100644 --- a/superset/views/alerts.py +++ b/superset/views/alerts.py @@ -14,76 +14,19 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -""" -DEPRECATION NOTICE: this module is deprecated and will be removed on 2.0. -""" -from croniter import croniter -from flask import abort, current_app as app, flash, Markup -from flask_appbuilder import CompactCRUDMixin, permission_name +from flask import abort +from flask_appbuilder import permission_name from flask_appbuilder.api import expose -from flask_appbuilder.hooks import before_request -from flask_appbuilder.models.sqla.interface import SQLAInterface from flask_appbuilder.security.decorators import has_access -from flask_babel import lazy_gettext as _ -from werkzeug.exceptions import NotFound from superset import is_feature_enabled -from superset.constants import RouteMethod -from superset.models.alerts import Alert, AlertLog, SQLObservation from superset.superset_typing import FlaskResponse -from superset.tasks.alerts.validator import check_validator -from superset.utils import core as utils -from superset.utils.core import get_email_address_str, markdown -from ..exceptions import SupersetException -from .base import BaseSupersetView, SupersetModelView +from .base import BaseSupersetView # TODO: access control rules for this module -class EnsureEnabledMixin: - @staticmethod - def is_enabled() -> bool: - return bool(app.config["ENABLE_ALERTS"]) - - @before_request - def ensure_enabled(self) -> None: - if not self.is_enabled(): - raise NotFound() - - -class AlertLogModelView( - CompactCRUDMixin, EnsureEnabledMixin, SupersetModelView -): # pylint: disable=too-many-ancestors - datamodel = SQLAInterface(AlertLog) - include_route_methods = {RouteMethod.LIST} | {"show"} - base_order = ("dttm_start", "desc") - list_columns = ( - "scheduled_dttm", - "dttm_start", - "duration", - "state", - ) - - -class AlertObservationModelView( - CompactCRUDMixin, EnsureEnabledMixin, SupersetModelView -): # pylint: disable=too-many-ancestors - datamodel = SQLAInterface(SQLObservation) - include_route_methods = {RouteMethod.LIST} | {"show"} - base_order = ("dttm", "desc") - list_title = _("List Observations") - show_title = _("Show Observation") - list_columns = ( - "dttm", - "value", - "error_msg", - ) - label_columns = { - "error_msg": _("Error Message"), - } - - class BaseAlertReportView(BaseSupersetView): route_base = "/report" class_permission_name = "ReportSchedule" @@ -109,144 +52,3 @@ def log(self, pk: int) -> FlaskResponse: # pylint: disable=unused-argument class AlertView(BaseAlertReportView): route_base = "/alert" class_permission_name = "ReportSchedule" - - -class ReportView(BaseAlertReportView): - route_base = "/report" - class_permission_name = "ReportSchedule" - - -class AlertModelView(EnsureEnabledMixin, SupersetModelView): - datamodel = SQLAInterface(Alert) - route_base = "/alerts" - include_route_methods = RouteMethod.CRUD_SET | {"log"} - - list_columns = ( - "label", - "owners", - "database", - "sql", - "pretty_config", - "crontab", - "last_eval_dttm", - "last_state", - "active", - "owners", - ) - show_columns = ( - "label", - "database", - "sql", - "validator_type", - "validator_config", - "active", - "crontab", - "owners", - "slice", - "recipients", - "slack_channel", - "log_retention", - "grace_period", - "last_eval_dttm", - "last_state", - ) - order_columns = ["label", "last_eval_dttm", "last_state", "active"] - add_columns = ( - "label", - "database", - "sql", - "validator_type", - "validator_config", - "active", - "crontab", - # TODO: implement different types of alerts - # "alert_type", - "owners", - "recipients", - "slack_channel", - "slice", - # TODO: implement dashboard screenshots with alerts - # "dashboard", - "log_retention", - "grace_period", - ) - label_columns = { - "log_retention": _("Log Retentions (days)"), - } - description_columns = { - "crontab": markdown( - "A CRON-like expression. " - "[Crontab Guru](https://crontab.guru/) is " - "a helpful resource that can help you craft a CRON expression.", - True, - ), - "recipients": _("A semicolon ';' delimited list of email addresses"), - "log_retention": _("How long to keep the logs around for this alert"), - "grace_period": _( - "Once an alert is triggered, how long, in seconds, before " - "Superset nags you again." - ), - "sql": _( - "A SQL statement that defines whether the alert should get triggered or " - "not. The query is expected to return either NULL or a number value." - ), - "validator_type": utils.markdown( - "Determines when to trigger alert based off value from alert query. " - "Alerts will be triggered with these validator types:" - "", - True, - ), - "validator_config": utils.markdown( - "JSON string containing values the validator will compare against. " - "Each validator needs the following values:" - "", - True, - ), - } - - edit_columns = add_columns - related_views = [ - AlertObservationModelView, - AlertLogModelView, - ] - - @expose("/list/") - @has_access - def list(self) -> FlaskResponse: - flash( - Markup( - _( - "This feature is deprecated and will be removed on 2.0. " - "Take a look at the replacement feature " - "" - "Alerts & Reports documentation" - ) - ), - "warning", - ) - return super().list() - - def pre_add(self, item: "AlertModelView") -> None: - item.recipients = get_email_address_str(item.recipients) - - if not croniter.is_valid(item.crontab): - raise SupersetException("Invalid crontab format") - - item.validator_type = item.validator_type.lower() - check_validator(item.validator_type, item.validator_config) - - def pre_update(self, item: "AlertModelView") -> None: - item.validator_type = item.validator_type.lower() - check_validator(item.validator_type, item.validator_config) - - def post_update(self, item: "AlertModelView") -> None: - self.post_add(item) diff --git a/superset/views/schedules.py b/superset/views/schedules.py deleted file mode 100644 index 39d4af9b8b25..000000000000 --- a/superset/views/schedules.py +++ /dev/null @@ -1,349 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -""" -DEPRECATION NOTICE: this module is deprecated and will be removed on 2.0. -""" - -import enum -from typing import Type, Union - -import simplejson as json -from croniter import croniter -from flask import current_app as app, flash, g, Markup -from flask_appbuilder import expose -from flask_appbuilder.hooks import before_request -from flask_appbuilder.models.sqla.interface import SQLAInterface -from flask_appbuilder.security.decorators import has_access -from flask_babel import lazy_gettext as _ -from werkzeug.exceptions import NotFound -from wtforms import BooleanField, Form, StringField - -from superset import db, security_manager -from superset.constants import RouteMethod -from superset.exceptions import SupersetException -from superset.models.dashboard import Dashboard -from superset.models.schedules import ( - DashboardEmailSchedule, - ScheduleType, - SliceEmailSchedule, -) -from superset.models.slice import Slice -from superset.superset_typing import FlaskResponse -from superset.tasks.schedules import schedule_email_report -from superset.utils.core import get_email_address_list, json_iso_dttm_ser -from superset.views.core import json_success - -from .base import DeleteMixin, SupersetModelView - - -class EmailScheduleView(SupersetModelView, DeleteMixin): - include_route_methods = RouteMethod.CRUD_SET - _extra_data = {"test_email": False, "test_email_recipients": None} - - @staticmethod - def is_enabled() -> bool: - return app.config["ENABLE_SCHEDULED_EMAIL_REPORTS"] - - @before_request - def ensure_enabled(self) -> None: - if not self.is_enabled(): - raise NotFound() - - @property - def schedule_type(self) -> str: - raise NotImplementedError() - - @property - def schedule_type_model(self) -> Type[Union[Dashboard, Slice]]: - raise NotImplementedError() - - page_size = 20 - - add_exclude_columns = [ - "user", - "created_on", - "changed_on", - "created_by", - "changed_by", - ] - - edit_exclude_columns = add_exclude_columns - - description_columns = { - "deliver_as_group": "If enabled, send a single email to all " - "recipients (in email/To: field)", - "crontab": "Unix style crontab schedule to deliver emails. " - "Changes to schedules reflect in one hour.", - "delivery_type": "Indicates how the rendered content is delivered", - } - - add_form_extra_fields = { - "test_email": BooleanField( - "Send Test Email", - default=False, - description="If enabled, we send a test mail on create / update", - ), - "test_email_recipients": StringField( - "Test Email Recipients", - default=None, - description="List of recipients to send test email to. " - "If empty, we send it to the original recipients", - ), - "test_slack_channel": StringField( - "Test Slack Channel", - default=None, - description="A slack channel to send a test message to.", - ), - } - - edit_form_extra_fields = add_form_extra_fields - - def process_form(self, form: Form, is_created: bool) -> None: - if form.test_email_recipients.data: - test_email_recipients = form.test_email_recipients.data.strip() - else: - test_email_recipients = None - - test_slack_channel = ( - form.test_slack_channel.data.strip() - if form.test_slack_channel.data - else None - ) - - self._extra_data["test_email"] = form.test_email.data - self._extra_data["test_email_recipients"] = test_email_recipients - self._extra_data["test_slack_channel"] = test_slack_channel - - def pre_add(self, item: "EmailScheduleView") -> None: - try: - recipients = get_email_address_list(item.recipients) - item.recipients = ", ".join(recipients) - except Exception as ex: - raise SupersetException("Invalid email list") from ex - - item.user = item.user or g.user - if not croniter.is_valid(item.crontab): - raise SupersetException("Invalid crontab format") - - def pre_update(self, item: "EmailScheduleView") -> None: - self.pre_add(item) - - def post_add(self, item: "EmailScheduleView") -> None: - # Schedule a test mail if the user requested for it. - if self._extra_data["test_email"]: - recipients = self._extra_data["test_email_recipients"] or item.recipients - slack_channel = self._extra_data["test_slack_channel"] or item.slack_channel - args = (self.schedule_type, item.id) - kwargs = dict(recipients=recipients, slack_channel=slack_channel) - schedule_email_report.apply_async(args=args, kwargs=kwargs) - - # Notify the user that schedule changes will be activate only in the - # next hour - if item.active: - flash("Schedule changes will get applied in one hour", "warning") - - def post_update(self, item: "EmailScheduleView") -> None: - self.post_add(item) - - @has_access - @expose("/fetch//", methods=["GET"]) - def fetch_schedules(self, item_id: int) -> FlaskResponse: - - query = db.session.query(self.datamodel.obj) - query = query.join(self.schedule_type_model).filter( - self.schedule_type_model.id == item_id - ) - - schedules = [] - for schedule in query.all(): - info = {"schedule": schedule.id} - - for col in self.list_columns + self.add_exclude_columns: - info[col] = getattr(schedule, col) - - if isinstance(info[col], enum.Enum): - info[col] = info[col].name - elif isinstance(info[col], security_manager.user_model): - info[col] = info[col].username - - info["user"] = schedule.user.username - info[self.schedule_type] = getattr(schedule, self.schedule_type).id - schedules.append(info) - - return json_success(json.dumps(schedules, default=json_iso_dttm_ser)) - - -class DashboardEmailScheduleView( - EmailScheduleView -): # pylint: disable=too-many-ancestors - schedule_type = ScheduleType.dashboard - schedule_type_model = Dashboard - - add_title = _("Schedule Email Reports for Dashboards") - edit_title = add_title - list_title = _("Manage Email Reports for Dashboards") - - datamodel = SQLAInterface(DashboardEmailSchedule) - order_columns = ["user", "dashboard", "created_on"] - - list_columns = [ - "dashboard", - "active", - "crontab", - "user", - "deliver_as_group", - "delivery_type", - ] - - add_columns = [ - "dashboard", - "active", - "crontab", - "recipients", - "slack_channel", - "deliver_as_group", - "delivery_type", - "test_email", - "test_email_recipients", - "test_slack_channel", - ] - - edit_columns = add_columns - - search_columns = [ - "dashboard", - "active", - "user", - "deliver_as_group", - "delivery_type", - ] - - label_columns = { - "dashboard": _("Dashboard"), - "created_on": _("Created On"), - "changed_on": _("Changed On"), - "user": _("User"), - "active": _("Active"), - "crontab": _("Crontab"), - "recipients": _("Recipients"), - "slack_channel": _("Slack Channel"), - "deliver_as_group": _("Deliver As Group"), - "delivery_type": _("Delivery Type"), - } - - @expose("/list/") - @has_access - def list(self) -> FlaskResponse: - flash( - Markup( - _( - "This feature is deprecated and will be removed on 2.0. " - "Take a look at the replacement feature " - "" - "Alerts & Reports documentation" - ) - ), - "warning", - ) - return super().list() - - def pre_add(self, item: "DashboardEmailScheduleView") -> None: - if item.dashboard is None: - raise SupersetException("Dashboard is mandatory") - super().pre_add(item) - - -class SliceEmailScheduleView(EmailScheduleView): # pylint: disable=too-many-ancestors - schedule_type = ScheduleType.slice - schedule_type_model = Slice - add_title = _("Schedule Email Reports for Charts") - edit_title = add_title - list_title = _("Manage Email Reports for Charts") - - datamodel = SQLAInterface(SliceEmailSchedule) - order_columns = ["user", "slice", "created_on"] - list_columns = [ - "slice", - "active", - "crontab", - "user", - "deliver_as_group", - "delivery_type", - "email_format", - ] - - add_columns = [ - "slice", - "active", - "crontab", - "recipients", - "slack_channel", - "deliver_as_group", - "delivery_type", - "email_format", - "test_email", - "test_email_recipients", - "test_slack_channel", - ] - - edit_columns = add_columns - - search_columns = [ - "slice", - "active", - "user", - "deliver_as_group", - "delivery_type", - "email_format", - ] - - label_columns = { - "slice": _("Chart"), - "created_on": _("Created On"), - "changed_on": _("Changed On"), - "user": _("User"), - "active": _("Active"), - "crontab": _("Crontab"), - "recipients": _("Recipients"), - "slack_channel": _("Slack Channel"), - "deliver_as_group": _("Deliver As Group"), - "delivery_type": _("Delivery Type"), - "email_format": _("Email Format"), - } - - @expose("/list/") - @has_access - def list(self) -> FlaskResponse: - flash( - Markup( - _( - "This feature is deprecated and will be removed on 2.0. " - "Take a look at the replacement feature " - "" - "Alerts & Reports documentation" - ) - ), - "warning", - ) - return super().list() - - def pre_add(self, item: "SliceEmailScheduleView") -> None: - if item.slice is None: - raise SupersetException("Slice is mandatory") - super().pre_add(item) diff --git a/tests/integration_tests/alerts_tests.py b/tests/integration_tests/alerts_tests.py deleted file mode 100644 index 50558741d85a..000000000000 --- a/tests/integration_tests/alerts_tests.py +++ /dev/null @@ -1,414 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""Unit tests for alerting in Superset""" -import json -import logging -from unittest.mock import patch - -import pytest -from sqlalchemy.orm import Session - -import superset.utils.database -from superset import db -from superset.exceptions import SupersetException -from superset.models.alerts import Alert, AlertLog, SQLObservation -from superset.models.slice import Slice -from superset.tasks.alerts.observer import observe -from superset.tasks.alerts.validator import ( - AlertValidatorType, - check_validator, - not_null_validator, - operator_validator, -) -from superset.tasks.schedules import ( - AlertState, - deliver_alert, - evaluate_alert, - validate_observations, -) -from superset.utils import core as utils -from superset.views.alerts import ( - AlertLogModelView, - AlertModelView, - AlertObservationModelView, -) -from tests.integration_tests.base_tests import SupersetTestCase -from tests.integration_tests.test_app import app -from tests.integration_tests.utils import read_fixture - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -@pytest.yield_fixture(scope="module") -def setup_database(): - with app.app_context(): - example_database = superset.utils.database.get_example_database() - example_database.get_sqla_engine().execute( - "CREATE TABLE test_table AS SELECT 1 as first, 2 as second" - ) - example_database.get_sqla_engine().execute( - "INSERT INTO test_table (first, second) VALUES (3, 4)" - ) - - yield db.session - - db.session.query(SQLObservation).delete() - db.session.query(AlertLog).delete() - db.session.query(Alert).delete() - db.session.commit() - example_database.get_sqla_engine().execute("DROP TABLE test_table") - - -def create_alert( - db_session: Session, - sql: str, - validator_type: AlertValidatorType = AlertValidatorType.OPERATOR, - validator_config: str = "", -) -> Alert: - db_session.commit() - alert = Alert( - label="test_alert", - active=True, - crontab="* * * * *", - slice_id=db_session.query(Slice).all()[0].id, - recipients="recipient1@superset.com", - slack_channel="#test_channel", - sql=sql, - database_id=superset.utils.database.get_example_database().id, - validator_type=validator_type, - validator_config=validator_config, - ) - db_session.add(alert) - db_session.commit() - return alert - - -@pytest.mark.parametrize( - "description, query, value", - [ - ("Test int SQL return", "SELECT 55", 55.0), - ("Test double SQL return", "SELECT 30.0 as wage", 30.0), - ("Test NULL result", "SELECT null as null_result", None), - ( - "Test empty SQL return", - "SELECT first FROM test_table WHERE first = -1", - None, - ), - ( - "Test multi line query", - """ - -- comment - SELECT - 1 -- comment - FROM test_table - WHERE first = 1 - """, - 1.0, - ), - ("Test jinja", "SELECT {{ 2 }}", 2.0), - ], -) -def test_alert_observer_no_error_msg(setup_database, description, query, value): - logger.info(description) - db_session = setup_database - alert = create_alert(db_session, query) - observe(alert.id, db_session) - if value is None: - assert alert.observations[-1].value is None - else: - assert alert.observations[-1].value == value - assert alert.observations[-1].error_msg is None - - -@pytest.mark.parametrize( - "description, query", - [ - ("Test str result", "SELECT 'test_string' as string_value"), - ("Test two row result", "SELECT first FROM test_table"), - ( - "Test two column result", - "SELECT first, second FROM test_table WHERE first = 1", - ), - ], -) -def test_alert_observer_error_msg(setup_database, description, query): - logger.info(description) - db_session = setup_database - alert = create_alert(db_session, query) - observe(alert.id, db_session) - assert alert.observations[-1].value is None - assert alert.observations[-1].error_msg is not None - - -@patch("superset.tasks.schedules.deliver_alert") -def test_evaluate_alert(mock_deliver_alert, setup_database): - db_session = setup_database - - # Test error with Observer SQL statement - alert1 = create_alert(db_session, "$%^&") - evaluate_alert(alert1.id, alert1.label, db_session) - assert alert1.logs[-1].state == AlertState.ERROR - - # Test pass on alert lacking validator config - alert2 = create_alert(db_session, "SELECT 55") - # evaluation fails if config is malformed - with pytest.raises(json.decoder.JSONDecodeError): - evaluate_alert(alert2.id, alert2.label, db_session) - assert not alert2.logs - - # Test triggering successful alert - alert3 = create_alert(db_session, "SELECT 55", "not null", "{}") - evaluate_alert(alert3.id, alert3.label, db_session) - assert mock_deliver_alert.call_count == 1 - assert alert3.logs[-1].state == AlertState.TRIGGER - - -@pytest.mark.parametrize( - "description, validator_type, config", - [ - ("Test with invalid operator type", "greater than", "{}"), - ("Test with empty config", "operator", "{}"), - ("Test with invalid operator", "operator", '{"op": "is", "threshold":50.0}'), - ( - "Test with invalid threshold", - "operator", - '{"op": "is", "threshold":"hello"}', - ), - ], -) -def test_check_validator_error(description, validator_type, config): - logger.info(description) - with pytest.raises(SupersetException): - check_validator(validator_type, config) - - -@pytest.mark.parametrize( - "description, validator_type, config", - [ - ( - "Test with float threshold and no errors", - "operator", - '{"op": ">=", "threshold": 50.0}', - ), - ( - "Test with int threshold and no errors", - "operator", - '{"op": ">=", "threshold": 50}', - ), - ], -) -def test_check_validator_no_error(description, validator_type, config): - logger.info(description) - assert check_validator(validator_type, config) is None - - -@pytest.mark.parametrize( - "description, query, value", - [ - ("Test passing with 'null' SQL result", "SELECT 0", False), - ( - "Test passing with empty SQL result", - "SELECT first FROM test_table WHERE first = -1", - False, - ), - ("Test triggering alert with non-null SQL result", "SELECT 55", True), - ], -) -def test_not_null_validator(setup_database, description, query, value): - logger.info(description) - db_session = setup_database - alert = create_alert(db_session, query) - observe(alert.id, db_session) - assert not_null_validator(alert, "{}") is value - - -def test_operator_validator(setup_database): - dbsession = setup_database - - # Test passing with empty SQL result - alert1 = create_alert(dbsession, "SELECT first FROM test_table WHERE first = -1") - observe(alert1.id, dbsession) - assert operator_validator(alert1, '{"op": ">=", "threshold": 60}') is False - # ensure that 0 threshold works - assert operator_validator(alert1, '{"op": ">=", "threshold": 0}') is False - - # Test passing with result that doesn't pass a greater than threshold - alert2 = create_alert(dbsession, "SELECT 55") - observe(alert2.id, dbsession) - assert operator_validator(alert2, '{"op": ">=", "threshold": 60}') is False - - # Test passing with result that passes a greater than threshold - assert operator_validator(alert2, '{"op": ">=", "threshold": 40}') is True - - # Test passing with result that doesn't pass a less than threshold - assert operator_validator(alert2, '{"op": "<=", "threshold": 40}') is False - - # Test passing with result that passes threshold - assert operator_validator(alert2, '{"op": "<=", "threshold": 60}') is True - - # Test passing with result that doesn't equal threshold - assert operator_validator(alert2, '{"op": "==", "threshold": 60}') is False - - # Test passing with result that equals threshold - assert operator_validator(alert2, '{"op": "==", "threshold": 55}') is True - - # Test passing with result that equals decimal threshold - assert operator_validator(alert2, '{"op": ">", "threshold": 54.999}') is True - - -@pytest.mark.parametrize( - "description, query, validator_type, config", - [ - ("Test False on alert with no validator", "SELECT 55", "operator", ""), - ("Test False on alert with no observations", "SELECT 0", "not null", "{}"), - ], -) -def test_validate_observations_no_observe( - setup_database, description, query, validator_type, config -): - db_session = setup_database - logger.info(description) - - alert = create_alert(db_session, query, validator_type, config) - assert validate_observations(alert.id, alert.label, db_session) is False - - -@pytest.mark.parametrize( - "description, query, validator_type, config, expected", - [ - ( - "Test False on alert that should not be triggered", - "SELECT 0", - "not null", - "{}", - False, - ), - ( - "Test True on alert that should be triggered", - "SELECT 55", - "operator", - '{"op": "<=", "threshold": 60}', - True, - ), - ], -) -def test_validate_observations_with_observe( - setup_database, description, query, validator_type, config, expected -): - db_session = setup_database - logger.info(description) - - alert = create_alert(db_session, query, validator_type, config) - observe(alert.id, db_session) - assert validate_observations(alert.id, alert.label, db_session) is expected - - -def test_validate_observations(setup_database): - db_session = setup_database - - # Test False on alert that shouldnt be triggered - alert3 = create_alert(db_session, "SELECT 0", "not null", "{}") - observe(alert3.id, db_session) - assert validate_observations(alert3.id, alert3.label, db_session) is False - - # Test True on alert that should be triggered - alert4 = create_alert( - db_session, "SELECT 55", "operator", '{"op": "<=", "threshold": 60}' - ) - observe(alert4.id, db_session) - assert validate_observations(alert4.id, alert4.label, db_session) is True - - -@patch("superset.tasks.slack_util.WebClient.files_upload") -@patch("superset.tasks.schedules.send_email_smtp") -@patch("superset.tasks.schedules._get_url_path") -@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache") -def test_deliver_alert_screenshot( - screenshot_mock, url_mock, email_mock, file_upload_mock, setup_database -): - dbsession = setup_database - alert = create_alert(dbsession, "SELECT 55", "not null", "{}") - observe(alert.id, dbsession) - - screenshot = read_fixture("sample.png") - screenshot_mock.return_value = screenshot - - # TODO: fix AlertModelView.show url call from test - url_mock.side_effect = [ - f"http://0.0.0.0:8080/alerts/show/{alert.id}", - f"http://0.0.0.0:8080/superset/slice/{alert.slice_id}/", - ] - - deliver_alert(alert.id, dbsession) - assert email_mock.call_args[1]["images"]["screenshot"] == screenshot - assert file_upload_mock.call_args[1] == { - "channels": alert.slack_channel, - "file": screenshot, - "initial_comment": f"\n*Triggered Alert: {alert.label} :redalert:*\n" - f"*Query*:```{alert.sql}```\n" - f"*Result*: {alert.observations[-1].value}\n" - f"*Reason*: {alert.observations[-1].value} {alert.pretty_config}\n" - f"\n", - "title": f"[Alert] {alert.label}", - } - - -class TestAlertsEndpoints(SupersetTestCase): - def test_log_model_view_disabled(self): - with patch.object(AlertLogModelView, "is_enabled", return_value=False): - self.login("admin") - uri = "/alertlogmodelview/list/" - rv = self.client.get(uri) - self.assertEqual(rv.status_code, 404) - - def test_log_model_view_enabled(self): - with patch.object(AlertLogModelView, "is_enabled", return_value=True): - self.login("admin") - uri = "/alertlogmodelview/list/" - rv = self.client.get(uri) - self.assertLess(rv.status_code, 400) - - def test_model_view_disabled(self): - with patch.object(AlertModelView, "is_enabled", return_value=False): - self.login("admin") - uri = "/alerts/list/" - rv = self.client.get(uri) - self.assertEqual(rv.status_code, 404) - - def test_model_view_enabled(self): - with patch.object(AlertModelView, "is_enabled", return_value=True): - self.login("admin") - uri = "/alerts/list/" - rv = self.client.get(uri) - self.assertNotEqual(rv.status_code, 404) - - def test_observation_view_disabled(self): - with patch.object(AlertObservationModelView, "is_enabled", return_value=False): - self.login("admin") - uri = "/alertobservationmodelview/list/" - rv = self.client.get(uri) - self.assertEqual(rv.status_code, 404) - - def test_observation_view_enabled(self): - with patch.object(AlertObservationModelView, "is_enabled", return_value=True): - self.login("admin") - uri = "/alertobservationmodelview/list/" - rv = self.client.get(uri) - self.assertLess(rv.status_code, 400) diff --git a/tests/integration_tests/schedules_test.py b/tests/integration_tests/schedules_test.py deleted file mode 100644 index b5cfe716051a..000000000000 --- a/tests/integration_tests/schedules_test.py +++ /dev/null @@ -1,596 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# isort:skip_file -from datetime import datetime, timedelta -from superset.views.schedules import DashboardEmailScheduleView, SliceEmailScheduleView -from unittest.mock import Mock, patch, PropertyMock - -from flask_babel import gettext as __ -import pytest -from selenium.common.exceptions import WebDriverException -from slack import errors, WebClient - -from tests.integration_tests.fixtures.world_bank_dashboard import ( - load_world_bank_dashboard_with_slices, - load_world_bank_data, -) -from tests.integration_tests.test_app import app -from superset import db -from superset.models.dashboard import Dashboard -from superset.models.schedules import ( - DashboardEmailSchedule, - EmailDeliveryType, - SliceEmailReportFormat, - SliceEmailSchedule, -) -from superset.tasks.schedules import ( - create_webdriver, - deliver_dashboard, - deliver_slice, - next_schedules, -) -from superset.models.slice import Slice -from tests.integration_tests.base_tests import SupersetTestCase -from tests.integration_tests.utils import read_fixture - - -class TestSchedules(SupersetTestCase): - - RECIPIENTS = "recipient1@superset.com, recipient2@superset.com" - BCC = "bcc@superset.com" - CSV = read_fixture("trends.csv") - - @pytest.fixture() - def add_schedule_slice_and_dashboard(self): - with app.app_context(): - self.common_data = dict( - active=True, - crontab="* * * * *", - recipients=self.RECIPIENTS, - deliver_as_group=True, - delivery_type=EmailDeliveryType.inline, - ) - # Pick up a sample slice and dashboard - slice = db.session.query(Slice).filter_by(slice_name="Region Filter").one() - dashboard = ( - db.session.query(Dashboard) - .filter_by(dashboard_title="World Bank's Data") - .one() - ) - - dashboard_schedule = DashboardEmailSchedule(**self.common_data) - dashboard_schedule.dashboard_id = dashboard.id - dashboard_schedule.user_id = 1 - db.session.add(dashboard_schedule) - - slice_schedule = SliceEmailSchedule(**self.common_data) - slice_schedule.slice_id = slice.id - slice_schedule.user_id = 1 - slice_schedule.email_format = SliceEmailReportFormat.data - slice_schedule.slack_channel = "#test_channel" - - db.session.add(slice_schedule) - db.session.commit() - - self.slice_schedule = slice_schedule.id - self.dashboard_schedule = dashboard_schedule.id - - yield - - with app.app_context(): - db.session.query(SliceEmailSchedule).filter_by( - id=self.slice_schedule - ).delete() - db.session.query(DashboardEmailSchedule).filter_by( - id=self.dashboard_schedule - ).delete() - db.session.commit() - - def test_crontab_scheduler(self): - crontab = "* * * * *" - - start_at = datetime.now().replace(microsecond=0, second=0, minute=0) - stop_at = start_at + timedelta(seconds=3600) - - # Fire off the task every minute - schedules = list(next_schedules(crontab, start_at, stop_at, resolution=0)) - - self.assertEqual(schedules[0], start_at) - self.assertEqual(schedules[-1], stop_at - timedelta(seconds=60)) - self.assertEqual(len(schedules), 60) - - # Fire off the task every 10 minutes, controlled via resolution - schedules = list(next_schedules(crontab, start_at, stop_at, resolution=10 * 60)) - - self.assertEqual(schedules[0], start_at) - self.assertEqual(schedules[-1], stop_at - timedelta(seconds=10 * 60)) - self.assertEqual(len(schedules), 6) - - # Fire off the task every 12 minutes, controlled via resolution - schedules = list(next_schedules(crontab, start_at, stop_at, resolution=12 * 60)) - - self.assertEqual(schedules[0], start_at) - self.assertEqual(schedules[-1], stop_at - timedelta(seconds=12 * 60)) - self.assertEqual(len(schedules), 5) - - def test_wider_schedules(self): - crontab = "*/15 2,10 * * *" - - for hour in range(0, 24): - start_at = datetime.now().replace( - microsecond=0, second=0, minute=0, hour=hour - ) - stop_at = start_at + timedelta(seconds=3600) - schedules = list(next_schedules(crontab, start_at, stop_at, resolution=0)) - - if hour in (2, 10): - self.assertEqual(len(schedules), 4) - else: - self.assertEqual(len(schedules), 0) - - def test_complex_schedule(self): - # Run the job on every Friday of March and May - # On these days, run the job at - # 5:10 pm - # 5:11 pm - # 5:12 pm - # 5:13 pm - # 5:14 pm - # 5:15 pm - # 5:25 pm - # 5:28 pm - # 5:31 pm - # 5:34 pm - # 5:37 pm - # 5:40 pm - crontab = "10-15,25-40/3 17 * 3,5 5" - start_at = datetime.strptime("2018/01/01", "%Y/%m/%d") - stop_at = datetime.strptime("2018/12/31", "%Y/%m/%d") - - schedules = list(next_schedules(crontab, start_at, stop_at, resolution=60)) - self.assertEqual(len(schedules), 108) - fmt = "%Y-%m-%d %H:%M:%S" - self.assertEqual(schedules[0], datetime.strptime("2018-03-02 17:10:00", fmt)) - self.assertEqual(schedules[-1], datetime.strptime("2018-05-25 17:40:00", fmt)) - self.assertEqual(schedules[59], datetime.strptime("2018-03-30 17:40:00", fmt)) - self.assertEqual(schedules[60], datetime.strptime("2018-05-04 17:10:00", fmt)) - - @patch("superset.tasks.schedules.firefox.webdriver.WebDriver") - def test_create_driver(self, mock_driver_class): - mock_driver = Mock() - mock_driver_class.return_value = mock_driver - mock_driver.find_elements_by_id.side_effect = [True, False] - - create_webdriver(db.session) - mock_driver.add_cookie.assert_called_once() - - @pytest.mark.usefixtures( - "load_world_bank_dashboard_with_slices", "add_schedule_slice_and_dashboard" - ) - @patch("superset.tasks.schedules.firefox.webdriver.WebDriver") - @patch("superset.tasks.schedules.send_email_smtp") - @patch("superset.tasks.schedules.time") - def test_deliver_dashboard_inline(self, mtime, send_email_smtp, driver_class): - element = Mock() - driver = Mock() - mtime.sleep.return_value = None - - driver_class.return_value = driver - - # Ensure that we are able to login with the driver - driver.find_elements_by_id.side_effect = [True, False] - driver.find_element_by_class_name.return_value = element - element.screenshot_as_png = read_fixture("sample.png") - - schedule = ( - db.session.query(DashboardEmailSchedule) - .filter_by(id=self.dashboard_schedule) - .one() - ) - - deliver_dashboard( - schedule.dashboard_id, - schedule.recipients, - schedule.slack_channel, - schedule.delivery_type, - schedule.deliver_as_group, - ) - - mtime.sleep.assert_called_once() - driver.screenshot.assert_not_called() - send_email_smtp.assert_called_once() - - @pytest.mark.usefixtures( - "load_world_bank_dashboard_with_slices", "add_schedule_slice_and_dashboard" - ) - @patch("superset.tasks.schedules.firefox.webdriver.WebDriver") - @patch("superset.tasks.schedules.send_email_smtp") - @patch("superset.tasks.schedules.time") - def test_deliver_dashboard_as_attachment( - self, mtime, send_email_smtp, driver_class - ): - element = Mock() - driver = Mock() - mtime.sleep.return_value = None - - driver_class.return_value = driver - - # Ensure that we are able to login with the driver - driver.find_elements_by_id.side_effect = [True, False] - driver.find_element_by_id.return_value = element - driver.find_element_by_class_name.return_value = element - element.screenshot_as_png = read_fixture("sample.png") - - schedule = ( - db.session.query(DashboardEmailSchedule) - .filter_by(id=self.dashboard_schedule) - .one() - ) - - schedule.delivery_type = EmailDeliveryType.attachment - - deliver_dashboard( - schedule.dashboard_id, - schedule.recipients, - schedule.slack_channel, - schedule.delivery_type, - schedule.deliver_as_group, - ) - - mtime.sleep.assert_called_once() - driver.screenshot.assert_not_called() - send_email_smtp.assert_called_once() - self.assertIsNone(send_email_smtp.call_args[1]["images"]) - self.assertEqual( - send_email_smtp.call_args[1]["data"]["screenshot"], - element.screenshot_as_png, - ) - - @pytest.mark.usefixtures( - "load_world_bank_dashboard_with_slices", "add_schedule_slice_and_dashboard" - ) - @patch("superset.tasks.schedules.firefox.webdriver.WebDriver") - @patch("superset.tasks.schedules.send_email_smtp") - @patch("superset.tasks.schedules.time") - def test_dashboard_chrome_like(self, mtime, send_email_smtp, driver_class): - # Test functionality for chrome driver which does not support - # element snapshots - element = Mock() - driver = Mock() - mtime.sleep.return_value = None - type(element).screenshot_as_png = PropertyMock(side_effect=WebDriverException) - - driver_class.return_value = driver - - # Ensure that we are able to login with the driver - driver.find_elements_by_id.side_effect = [True, False] - driver.find_element_by_id.return_value = element - driver.find_element_by_class_name.return_value = element - driver.screenshot.return_value = read_fixture("sample.png") - - schedule = ( - db.session.query(DashboardEmailSchedule) - .filter_by(id=self.dashboard_schedule) - .one() - ) - - deliver_dashboard( - schedule.dashboard_id, - schedule.recipients, - schedule.slack_channel, - schedule.delivery_type, - schedule.deliver_as_group, - ) - - mtime.sleep.assert_called_once() - driver.screenshot.assert_called_once() - send_email_smtp.assert_called_once() - - self.assertEqual(send_email_smtp.call_args[0][0], self.RECIPIENTS) - self.assertEqual( - list(send_email_smtp.call_args[1]["images"].values())[0], - driver.screenshot.return_value, - ) - - @pytest.mark.usefixtures( - "load_world_bank_dashboard_with_slices", "add_schedule_slice_and_dashboard" - ) - @patch("superset.tasks.schedules.firefox.webdriver.WebDriver") - @patch("superset.tasks.schedules.send_email_smtp") - @patch("superset.tasks.schedules.time") - def test_deliver_email_options(self, mtime, send_email_smtp, driver_class): - element = Mock() - driver = Mock() - mtime.sleep.return_value = None - - driver_class.return_value = driver - - # Ensure that we are able to login with the driver - driver.find_elements_by_id.side_effect = [True, False] - driver.find_element_by_class_name.return_value = element - element.screenshot_as_png = read_fixture("sample.png") - - schedule = ( - db.session.query(DashboardEmailSchedule) - .filter_by(id=self.dashboard_schedule) - .one() - ) - - # Send individual mails to the group - schedule.deliver_as_group = False - - # Set a bcc email address - app.config["EMAIL_REPORT_BCC_ADDRESS"] = self.BCC - - deliver_dashboard( - schedule.dashboard_id, - schedule.recipients, - schedule.slack_channel, - schedule.delivery_type, - schedule.deliver_as_group, - ) - - mtime.sleep.assert_called_once() - driver.screenshot.assert_not_called() - - self.assertEqual(send_email_smtp.call_count, 2) - self.assertEqual(send_email_smtp.call_args[1]["bcc"], self.BCC) - - @pytest.mark.usefixtures( - "load_world_bank_dashboard_with_slices", "add_schedule_slice_and_dashboard" - ) - @patch("superset.tasks.slack_util.WebClient.files_upload") - @patch("superset.tasks.schedules.firefox.webdriver.WebDriver") - @patch("superset.tasks.schedules.send_email_smtp") - @patch("superset.tasks.schedules.time") - def test_deliver_slice_inline_image( - self, mtime, send_email_smtp, driver_class, files_upload - ): - element = Mock() - driver = Mock() - mtime.sleep.return_value = None - - driver_class.return_value = driver - - # Ensure that we are able to login with the driver - driver.find_elements_by_id.side_effect = [True, False] - driver.find_element_by_class_name.return_value = element - element.screenshot_as_png = read_fixture("sample.png") - - schedule = ( - db.session.query(SliceEmailSchedule).filter_by(id=self.slice_schedule).one() - ) - - schedule.email_format = SliceEmailReportFormat.visualization - schedule.delivery_format = EmailDeliveryType.inline - - deliver_slice( - schedule.slice_id, - schedule.recipients, - schedule.slack_channel, - schedule.delivery_type, - schedule.email_format, - schedule.deliver_as_group, - db.session, - ) - mtime.sleep.assert_called_once() - driver.screenshot.assert_not_called() - send_email_smtp.assert_called_once() - - self.assertEqual( - list(send_email_smtp.call_args[1]["images"].values())[0], - element.screenshot_as_png, - ) - - self.assertEqual( - files_upload.call_args[1], - { - "channels": "#test_channel", - "file": element.screenshot_as_png, - "initial_comment": f"\n *Region Filter*\n\n \n ", - "title": "[Report] Region Filter", - }, - ) - - @pytest.mark.usefixtures( - "load_world_bank_dashboard_with_slices", "add_schedule_slice_and_dashboard" - ) - @patch("superset.tasks.slack_util.WebClient.files_upload") - @patch("superset.tasks.schedules.firefox.webdriver.WebDriver") - @patch("superset.tasks.schedules.send_email_smtp") - @patch("superset.tasks.schedules.time") - def test_deliver_slice_attachment( - self, mtime, send_email_smtp, driver_class, files_upload - ): - element = Mock() - driver = Mock() - mtime.sleep.return_value = None - - driver_class.return_value = driver - - # Ensure that we are able to login with the driver - driver.find_elements_by_id.side_effect = [True, False] - driver.find_element_by_class_name.return_value = element - element.screenshot_as_png = read_fixture("sample.png") - - schedule = ( - db.session.query(SliceEmailSchedule).filter_by(id=self.slice_schedule).one() - ) - - schedule.email_format = SliceEmailReportFormat.visualization - schedule.delivery_type = EmailDeliveryType.attachment - - deliver_slice( - schedule.slice_id, - schedule.recipients, - schedule.slack_channel, - schedule.delivery_type, - schedule.email_format, - schedule.deliver_as_group, - db.session, - ) - - mtime.sleep.assert_called_once() - driver.screenshot.assert_not_called() - send_email_smtp.assert_called_once() - - self.assertEqual( - send_email_smtp.call_args[1]["data"]["screenshot"], - element.screenshot_as_png, - ) - - self.assertEqual( - files_upload.call_args[1], - { - "channels": "#test_channel", - "file": element.screenshot_as_png, - "initial_comment": f"\n *Region Filter*\n\n \n ", - "title": "[Report] Region Filter", - }, - ) - - @pytest.mark.usefixtures( - "load_world_bank_dashboard_with_slices", "add_schedule_slice_and_dashboard" - ) - @patch("superset.tasks.slack_util.WebClient.files_upload") - @patch("superset.tasks.schedules.urllib.request.OpenerDirector.open") - @patch("superset.tasks.schedules.urllib.request.urlopen") - @patch("superset.tasks.schedules.send_email_smtp") - def test_deliver_slice_csv_attachment( - self, send_email_smtp, mock_open, mock_urlopen, files_upload - ): - response = Mock() - mock_open.return_value = response - mock_urlopen.return_value = response - mock_urlopen.return_value.getcode.return_value = 200 - response.read.return_value = self.CSV - - schedule = ( - db.session.query(SliceEmailSchedule).filter_by(id=self.slice_schedule).one() - ) - - schedule.email_format = SliceEmailReportFormat.data - schedule.delivery_type = EmailDeliveryType.attachment - - deliver_slice( - schedule.slice_id, - schedule.recipients, - schedule.slack_channel, - schedule.delivery_type, - schedule.email_format, - schedule.deliver_as_group, - db.session, - ) - - send_email_smtp.assert_called_once() - - file_name = __("%(name)s.csv", name=schedule.slice.slice_name) - - self.assertEqual(send_email_smtp.call_args[1]["data"][file_name], self.CSV) - - self.assertEqual( - files_upload.call_args[1], - { - "channels": "#test_channel", - "file": self.CSV, - "initial_comment": f"\n *Region Filter*\n\n \n ", - "title": "[Report] Region Filter", - }, - ) - - @pytest.mark.usefixtures( - "load_world_bank_dashboard_with_slices", "add_schedule_slice_and_dashboard" - ) - @patch("superset.tasks.slack_util.WebClient.files_upload") - @patch("superset.tasks.schedules.urllib.request.urlopen") - @patch("superset.tasks.schedules.urllib.request.OpenerDirector.open") - @patch("superset.tasks.schedules.send_email_smtp") - def test_deliver_slice_csv_inline( - self, send_email_smtp, mock_open, mock_urlopen, files_upload - ): - response = Mock() - mock_open.return_value = response - mock_urlopen.return_value = response - mock_urlopen.return_value.getcode.return_value = 200 - response.read.return_value = self.CSV - schedule = ( - db.session.query(SliceEmailSchedule).filter_by(id=self.slice_schedule).one() - ) - - schedule.email_format = SliceEmailReportFormat.data - schedule.delivery_type = EmailDeliveryType.inline - - deliver_slice( - schedule.slice_id, - schedule.recipients, - schedule.slack_channel, - schedule.delivery_type, - schedule.email_format, - schedule.deliver_as_group, - db.session, - ) - - send_email_smtp.assert_called_once() - - self.assertIsNone(send_email_smtp.call_args[1]["data"]) - self.assertTrue("\n ", - "title": "[Report] Region Filter", - }, - ) - - def test_dashboard_disabled(self): - with patch.object(DashboardEmailScheduleView, "is_enabled", return_value=False): - self.login("admin") - uri = "/dashboardemailscheduleview/list/" - rv = self.client.get(uri) - self.assertEqual(rv.status_code, 404) - - def test_dashboard_enabled(self): - with patch.object(DashboardEmailScheduleView, "is_enabled", return_value=True): - self.login("admin") - uri = "/dashboardemailscheduleview/list/" - rv = self.client.get(uri) - self.assertLess(rv.status_code, 400) - - def test_slice_disabled(self): - with patch.object(SliceEmailScheduleView, "is_enabled", return_value=False): - self.login("admin") - uri = "/sliceemailscheduleview/list/" - rv = self.client.get(uri) - self.assertEqual(rv.status_code, 404) - - def test_slice_enabled(self): - with patch.object(SliceEmailScheduleView, "is_enabled", return_value=True): - self.login("admin") - uri = "/sliceemailscheduleview/list/" - rv = self.client.get(uri) - self.assertLess(rv.status_code, 400) - - -def test_slack_client_compatibility(): - c2 = WebClient() - # slackclient >2.5.0 raises TypeError: a bytes-like object is required, not 'str - # and requires to path a filepath instead of the bytes directly - with pytest.raises(errors.SlackApiError): - c2.files_upload(channels="#bogdan-test2", file=b"blabla", title="Test upload")