feat: add event and interval annotation support to chart data endpoint
villebro committed Nov 23, 2020
1 parent db73869 commit d48f5d4
Showing 4 changed files with 132 additions and 19 deletions.
5 changes: 4 additions & 1 deletion superset-frontend/src/chart/chartAction.js
@@ -410,7 +410,10 @@ export function exploreJSON(
});
});

-const annotationLayers = formData.annotation_layers || [];
+// only retrieve annotations when calling the legacy API
+const annotationLayers = shouldUseLegacyApi(formData)
+  ? formData.annotation_layers || []
+  : [];
const isDashboardRequest = dashboardId > 0;

return Promise.all([
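For context on the gate above: after this commit the chart data endpoint returns annotation data inline with the query results (see the `query_context.py` changes below), so separate annotation requests are only needed when falling back to the legacy API. A rough sketch of the enriched response shape; the layer name and all values are invented:

```python
# Hypothetical v1 chart data response after this commit; only the keys
# relevant to annotations are shown, and all values are made up.
response = {
    "query": "SELECT ...",
    "data": [{"__timestamp": 1606089600000, "count": 42}],
    # keyed by annotation layer name; see get_native_annotation_data below
    "annotation_data": {"Releases": {"columns": [...], "records": [...]}},
}
```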
99 changes: 90 additions & 9 deletions superset/common/query_context.py
@@ -25,14 +25,17 @@
from flask_babel import gettext as _

from superset import app, db, is_feature_enabled
from superset.annotation_layers.dao import AnnotationLayerDAO
from superset.common.query_object import QueryObject
from superset.connectors.base.models import BaseDatasource
from superset.connectors.connector_registry import ConnectorRegistry
-from superset.exceptions import QueryObjectValidationError
+from superset.exceptions import QueryObjectValidationError, SupersetException
from superset.extensions import cache_manager, security_manager
from superset.models.slice import Slice
from superset.stats_logger import BaseStatsLogger
from superset.utils import core as utils
from superset.utils.core import DTTM_ALIAS
from superset.views.utils import get_viz
from superset.viz import set_and_log_cache

config = app.config
@@ -157,8 +160,7 @@ def get_single_payload(self, query_obj: QueryObject) -> Dict[str, Any]:
query_obj.row_offset = 0
query_obj.columns = [o.column_name for o in self.datasource.columns]
payload = self.get_df_payload(query_obj)
-# TODO: implement
-payload["annotation_data"] = []

df = payload["df"]
status = payload["status"]
if status != utils.QueryStatus.FAILED:
@@ -220,6 +222,79 @@ def cache_key(self, query_obj: QueryObject, **kwargs: Any) -> Optional[str]:
)
return cache_key

@staticmethod
def get_native_annotation_data(query_obj: QueryObject) -> Dict[str, Any]:
    annotation_data = {}
    annotation_layers = [
        layer
        for layer in query_obj.annotation_layers
        if layer["sourceType"] == "NATIVE"
    ]
    layer_ids = [layer["value"] for layer in annotation_layers]
    layer_objects = {
        layer_object.id: layer_object
        for layer_object in AnnotationLayerDAO.find_by_ids(layer_ids)
    }

    # annotations
    for layer in annotation_layers:
        layer_id = layer["value"]
        layer_name = layer["name"]
        columns = [
            "start_dttm",
            "end_dttm",
            "short_descr",
            "long_descr",
            "json_metadata",
        ]
        layer_object = layer_objects[layer_id]
        records = [
            {column: getattr(annotation, column) for column in columns}
            for annotation in layer_object.annotation
        ]
        result = {"columns": columns, "records": records}
        annotation_data[layer_name] = result
    return annotation_data
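For orientation, the shape `get_native_annotation_data` produces for a single native layer, assuming one stored annotation (layer name and all values are invented):

```python
# Return value of get_native_annotation_data for a hypothetical native layer
# "Releases" containing one annotation; all values are made up.
{
    "Releases": {
        "columns": ["start_dttm", "end_dttm", "short_descr", "long_descr", "json_metadata"],
        "records": [
            {
                "start_dttm": "2020-11-01 00:00:00",
                "end_dttm": "2020-11-07 00:00:00",
                "short_descr": "v1.0 rollout",
                "long_descr": "",
                "json_metadata": None,
            }
        ],
    }
}
```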

@staticmethod
def get_viz_annotation_data(
    annotation_layer: Dict[str, Any], force: bool
) -> Dict[str, Any]:
    slice_id = annotation_layer["value"]
    slc = db.session.query(Slice).filter_by(id=slice_id).one_or_none()
    if not slc:
        raise QueryObjectValidationError("The slice does not exist")
    # only copy form_data once the slice is known to exist
    form_data = slc.form_data.copy()
    try:
        viz_obj = get_viz(
            datasource_type=slc.datasource.type,
            datasource_id=slc.datasource.id,
            form_data=form_data,
            force=force,
        )
        payload = viz_obj.get_payload()
        return payload["data"]
    except SupersetException as ex:
        raise QueryObjectValidationError(utils.error_msg_from_exception(ex))

def get_annotation_data(self, query_obj: QueryObject) -> Dict[str, Any]:
    """
    Return annotation data for each annotation layer in the query object,
    keyed by annotation layer name.
    """
    annotation_data: Dict[str, Any] = self.get_native_annotation_data(query_obj)
    for annotation_layer in [
        layer
        for layer in query_obj.annotation_layers
        if layer["sourceType"] in ("line", "table")
    ]:
        name = annotation_layer["name"]
        annotation_data[name] = self.get_viz_annotation_data(
            annotation_layer, self.force
        )
    return annotation_data
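A sketch of the layer dicts this dispatch consumes, limited to the keys actually read in this file; the concrete values are invented:

```python
# Hypothetical entries from query_obj.annotation_layers. "NATIVE" layers are
# resolved through AnnotationLayerDAO; "line" and "table" layers are produced
# by rendering another slice via get_viz.
native_layer = {
    "name": "Releases",      # becomes the key in the annotation_data dict
    "sourceType": "NATIVE",
    "value": 1,              # id of the native annotation layer
}
slice_layer = {
    "name": "Error spikes",
    "sourceType": "table",
    "value": 42,             # id of the slice used as annotation source
}
```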

def get_df_payload( # pylint: disable=too-many-statements
self, query_obj: QueryObject, **kwargs: Any
) -> Dict[str, Any]:
@@ -233,6 +308,7 @@ def get_df_payload( # pylint: disable=too-many-statements
cache_value = None
status = None
query = ""
annotation_data = {}
error_message = None
if cache_key and cache_manager.data_cache and not self.force:
cache_value = cache_manager.data_cache.get(cache_key)
@@ -241,6 +317,7 @@
try:
df = cache_value["df"]
query = cache_value["query"]
annotation_data = cache_value.get("annotation_data", {})
status = utils.QueryStatus.SUCCESS
is_loaded = True
stats_logger.incr("loaded_from_cache")
@@ -272,6 +349,8 @@ def get_df_payload( # pylint: disable=too-many-statements
query = query_result["query"]
error_message = query_result["error_message"]
df = query_result["df"]
annotation_data = self.get_annotation_data(query_obj)

if status != utils.QueryStatus.FAILED:
stats_logger.incr("loaded_from_source")
if not self.force:
@@ -289,18 +368,20 @@ def get_df_payload( # pylint: disable=too-many-statements

if is_loaded and cache_key and status != utils.QueryStatus.FAILED:
    set_and_log_cache(
-       cache_key,
-       df,
-       query,
-       cached_dttm,
-       self.cache_timeout,
-       self.datasource.uid,
+       cache_key=cache_key,
+       df=df,
+       query=query,
+       annotation_data=annotation_data,
+       cached_dttm=cached_dttm,
+       cache_timeout=self.cache_timeout,
+       datasource_uid=self.datasource.uid,
    )
return {
    "cache_key": cache_key,
    "cached_dttm": cache_value["dttm"] if cache_value is not None else None,
    "cache_timeout": self.cache_timeout,
    "df": df,
    "annotation_data": annotation_data,
    "error": error_message,
    "is_cached": cache_value is not None,
    "query": query,
30 changes: 28 additions & 2 deletions superset/common/query_object.py
@@ -106,7 +106,12 @@ def __init__(
metrics = metrics or []
extras = extras or {}
is_sip_38 = is_feature_enabled("SIP_38_VIZ_REARCHITECTURE")
-self.annotation_layers = annotation_layers
+self.annotation_layers = [
+    layer
+    for layer in annotation_layers
+    # formula annotations don't affect the payload, hence can be dropped
+    if layer["annotationType"] != "FORMULA"
+]
self.applied_time_extras = applied_time_extras or {}
self.granularity = granularity
self.from_dttm, self.to_dttm = utils.get_since_until(
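As the comment in `__init__` above notes, formula annotations are rendered client-side from their expression and never alter the query payload, so they are dropped before the layers can influence queries or the cache key. A minimal sketch of the filter, with invented layers:

```python
# Sketch of the filter applied in __init__ above; both layer dicts and the
# "INTERVAL" annotation type are made up for illustration.
annotation_layers = [
    {"name": "y = x", "annotationType": "FORMULA"},
    {"name": "Releases", "annotationType": "INTERVAL", "sourceType": "NATIVE", "value": 1},
]
kept = [layer for layer in annotation_layers if layer["annotationType"] != "FORMULA"]
assert [layer["name"] for layer in kept] == ["Releases"]
```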
@@ -236,10 +241,31 @@ def cache_key(self, **extra: Any) -> str:
cache_dict["time_range"] = self.time_range
if self.post_processing:
    cache_dict["post_processing"] = self.post_processing

annotation_fields = [
    "annotationType",
    "descriptionColumns",
    "intervalEndColumn",
    "name",
    "overrides",
    "sourceType",
    "timeColumn",
    "titleColumn",
    "value",
]
annotation_layers = [
    {field: layer[field] for field in annotation_fields if field in layer}
    for layer in self.annotation_layers
]
# only add to key if there are annotations present that affect the payload
if annotation_layers:
    cache_dict["annotation_layers"] = annotation_layers

json_data = self.json_dumps(cache_dict, sort_keys=True)
return hashlib.md5(json_data.encode("utf-8")).hexdigest()
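To make the effect on caching concrete: the selected annotation fields are folded into the hashed dictionary, so the same query issued with different annotation layers now produces different cache entries. A simplified sketch (the real code serializes with `json_dumps`, defined just below, using a Superset-aware JSON encoder):

```python
import hashlib
import json

def simplified_cache_key(cache_dict: dict) -> str:
    # simplified stand-in for QueryObject.cache_key
    json_data = json.dumps(cache_dict, sort_keys=True)
    return hashlib.md5(json_data.encode("utf-8")).hexdigest()

base = {"datasource": "1__table", "time_range": "Last week"}
annotated = dict(
    base, annotation_layers=[{"name": "Releases", "sourceType": "NATIVE", "value": 1}]
)
assert simplified_cache_key(base) != simplified_cache_key(annotated)
```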

-def json_dumps(self, obj: Any, sort_keys: bool = False) -> str:
+@staticmethod
+def json_dumps(obj: Any, sort_keys: bool = False) -> str:
    return json.dumps(
        obj, default=utils.json_int_dttm_ser, ignore_nan=True, sort_keys=sort_keys
    )
17 changes: 10 additions & 7 deletions superset/viz.py
@@ -104,9 +104,12 @@ def set_and_log_cache(
    cached_dttm: str,
    cache_timeout: int,
    datasource_uid: Optional[str],
    annotation_data: Optional[Dict[str, Any]] = None,
) -> None:
    try:
-       cache_value = dict(dttm=cached_dttm, df=df, query=query)
+       cache_value = dict(
+           dttm=cached_dttm, df=df, query=query, annotation_data=annotation_data or {}
+       )
        stats_logger.incr("set_cache_key")
        cache_manager.data_cache.set(cache_key, cache_value, timeout=cache_timeout)
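The `or {}` default keeps the cache entry shape uniform: callers that don't pass `annotation_data` (such as the `viz.py` call below) store an empty dict, and entries written before this commit are read back with `cache_value.get("annotation_data", {})` in `query_context.py` above. A tiny illustration of the read-side fallback:

```python
# Hypothetical cache entry written before this commit: no "annotation_data"
# key exists, so the read side must fall back to an empty dict.
old_cache_value = {"dttm": "2020-11-22T00:00:00", "df": None, "query": "SELECT 1"}
assert old_cache_value.get("annotation_data", {}) == {}
```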

@@ -587,12 +590,12 @@ def get_df_payload(

if is_loaded and cache_key and self.status != utils.QueryStatus.FAILED:
    set_and_log_cache(
-       cache_key,
-       df,
-       self.query,
-       cached_dttm,
-       self.cache_timeout,
-       self.datasource.uid,
+       cache_key=cache_key,
+       df=df,
+       query=self.query,
+       cached_dttm=cached_dttm,
+       cache_timeout=self.cache_timeout,
+       datasource_uid=self.datasource.uid,
    )
return {
    "cache_key": self._any_cache_key,
