Skip to content

Commit

Permalink
deprecate groupby controls in query_obj (#9366)
Browse files Browse the repository at this point in the history
* Deprecate groupby from query_obj

* Fix query_object bug

* Fix histogram

* Remove groupby from legacy druid connector and fix first batch of unit tests

* Deprecate some unnecessary tests + fix a few others

* Address comments

* hide SIP-38 changes behind feature flag

* Break out further SIP-38 related tests

* Resolve test errors

* Add feature flag to QueryContext

* Resolve tests and bad rebase

* Backport recent changes from viz.py and fix broken DeckGL charts

* Fix bad rebase

* backport #9522 and address comments
  • Loading branch information
villebro committed Apr 14, 2020
1 parent c1f8c9e commit 9447381
Show file tree
Hide file tree
Showing 9 changed files with 4,110 additions and 41 deletions.
20 changes: 17 additions & 3 deletions superset/common/query_object.py
Expand Up @@ -16,18 +16,21 @@
# under the License.
# pylint: disable=R
import hashlib
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union

import simplejson as json
from flask_babel import gettext as _
from pandas import DataFrame

from superset import app
from superset import app, is_feature_enabled
from superset.exceptions import QueryObjectValidationError
from superset.utils import core as utils, pandas_postprocessing
from superset.views.utils import get_time_range_endpoints

logger = logging.getLogger(__name__)

# TODO: Type Metrics dictionary with TypedDict when it becomes a vanilla python type
# https://github.com/python/mypy/issues/5288

Expand Down Expand Up @@ -75,6 +78,7 @@ def __init__(
relative_start: str = app.config["DEFAULT_RELATIVE_START_TIME"],
relative_end: str = app.config["DEFAULT_RELATIVE_END_TIME"],
):
is_sip_38 = is_feature_enabled("SIP_38_VIZ_REARCHITECTURE")
self.granularity = granularity
self.from_dttm, self.to_dttm = utils.get_since_until(
relative_start=relative_start,
Expand All @@ -85,8 +89,9 @@ def __init__(
self.is_timeseries = is_timeseries
self.time_range = time_range
self.time_shift = utils.parse_human_timedelta(time_shift)
self.groupby = groupby or []
self.post_processing = post_processing or []
if not is_sip_38:
self.groupby = groupby or []

# Temporary solution for backward compatibility issue due the new format of
# non-ad-hoc metric which needs to adhere to superset-ui per
Expand All @@ -107,6 +112,13 @@ def __init__(
self.extras["time_range_endpoints"] = get_time_range_endpoints(form_data={})

self.columns = columns or []
if is_sip_38 and groupby:
self.columns += groupby
logger.warning(
f"The field groupby is deprecated. Viz plugins should "
f"pass all selectables via the columns field"
)

self.orderby = orderby or []

def to_dict(self) -> Dict[str, Any]:
Expand All @@ -115,7 +127,6 @@ def to_dict(self) -> Dict[str, Any]:
"from_dttm": self.from_dttm,
"to_dttm": self.to_dttm,
"is_timeseries": self.is_timeseries,
"groupby": self.groupby,
"metrics": self.metrics,
"row_limit": self.row_limit,
"filter": self.filter,
Expand All @@ -126,6 +137,9 @@ def to_dict(self) -> Dict[str, Any]:
"columns": self.columns,
"orderby": self.orderby,
}
if not is_feature_enabled("SIP_38_VIZ_REARCHITECTURE"):
query_object_dict["groupby"] = self.groupby

return query_object_dict

def cache_key(self, **extra: Any) -> str:
Expand Down
1 change: 1 addition & 0 deletions superset/config.py
Expand Up @@ -287,6 +287,7 @@ def _try_json_readsha(filepath, length): # pylint: disable=unused-argument
"PRESTO_EXPAND_DATA": False,
"REDUCE_DASHBOARD_BOOTSTRAP_PAYLOAD": False,
"SHARE_QUERIES_VIA_KV_STORE": False,
"SIP_38_VIZ_REARCHITECTURE": False,
"TAGGING_SYSTEM": False,
"SQLLAB_BACKEND_PERSISTENCE": False,
"LIST_VIEWS_NEW_UI": False,
Expand Down
55 changes: 39 additions & 16 deletions superset/connectors/druid/models.py
Expand Up @@ -48,7 +48,7 @@
from sqlalchemy.orm import backref, relationship, Session
from sqlalchemy_utils import EncryptedType

from superset import conf, db, security_manager
from superset import conf, db, is_feature_enabled, security_manager
from superset.connectors.base.models import BaseColumn, BaseDatasource, BaseMetric
from superset.constants import NULL_STRING
from superset.exceptions import SupersetException
Expand Down Expand Up @@ -84,6 +84,7 @@
except ImportError:
pass

IS_SIP_38 = is_feature_enabled("SIP_38_VIZ_REARCHITECTURE")
DRUID_TZ = conf.get("DRUID_TZ")
POST_AGG_TYPE = "postagg"
metadata = Model.metadata # pylint: disable=no-member
Expand Down Expand Up @@ -1082,11 +1083,11 @@ def get_aggregations(
return aggregations

def get_dimensions(
self, groupby: List[str], columns_dict: Dict[str, DruidColumn]
self, columns: List[str], columns_dict: Dict[str, DruidColumn]
) -> List[Union[str, Dict]]:
dimensions = []
groupby = [gb for gb in groupby if gb in columns_dict]
for column_name in groupby:
columns = [col for col in columns if col in columns_dict]
for column_name in columns:
col = columns_dict.get(column_name)
dim_spec = col.dimension_spec if col else None
dimensions.append(dim_spec or column_name)
Expand Down Expand Up @@ -1137,11 +1138,12 @@ def sanitize_metric_object(metric: Dict) -> None:

def run_query( # druid
self,
groupby,
metrics,
granularity,
from_dttm,
to_dttm,
columns=None,
groupby=None,
filter=None,
is_timeseries=True,
timeseries_limit=None,
Expand All @@ -1151,7 +1153,6 @@ def run_query( # druid
inner_to_dttm=None,
orderby=None,
extras=None,
columns=None,
phase=2,
client=None,
order_desc=True,
Expand Down Expand Up @@ -1188,7 +1189,11 @@ def run_query( # druid
)

# the dimensions list with dimensionSpecs expanded
dimensions = self.get_dimensions(groupby, columns_dict)

dimensions = self.get_dimensions(
columns if IS_SIP_38 else groupby, columns_dict
)

extras = extras or {}
qry = dict(
datasource=self.datasource_name,
Expand All @@ -1214,7 +1219,9 @@ def run_query( # druid

order_direction = "descending" if order_desc else "ascending"

if columns:
if (IS_SIP_38 and not metrics and "__time" not in columns) or (
not IS_SIP_38 and columns
):
columns.append("__time")
del qry["post_aggregations"]
del qry["aggregations"]
Expand All @@ -1224,11 +1231,20 @@ def run_query( # druid
qry["granularity"] = "all"
qry["limit"] = row_limit
client.scan(**qry)
elif len(groupby) == 0 and not having_filters:
elif (IS_SIP_38 and columns) or (
not IS_SIP_38 and len(groupby) == 0 and not having_filters
):
logger.info("Running timeseries query for no groupby values")
del qry["dimensions"]
client.timeseries(**qry)
elif not having_filters and len(groupby) == 1 and order_desc:
elif (
not having_filters
and order_desc
and (
(IS_SIP_38 and len(columns) == 1)
or (not IS_SIP_38 and len(groupby) == 1)
)
):
dim = list(qry["dimensions"])[0]
logger.info("Running two-phase topn query for dimension [{}]".format(dim))
pre_qry = deepcopy(qry)
Expand Down Expand Up @@ -1279,7 +1295,10 @@ def run_query( # druid
qry["metric"] = list(qry["aggregations"].keys())[0]
client.topn(**qry)
logger.info("Phase 2 Complete")
elif len(groupby) > 0 or having_filters:
elif (
having_filters
or ((IS_SIP_38 and columns) or (not IS_SIP_38 and len(groupby))) > 0
):
# If grouping on multiple fields or using a having filter
# we have to force a groupby query
logger.info("Running groupby query for dimensions [{}]".format(dimensions))
Expand Down Expand Up @@ -1364,8 +1383,8 @@ def run_query( # druid
return query_str

@staticmethod
def homogenize_types(df: pd.DataFrame, groupby_cols: Iterable[str]) -> pd.DataFrame:
"""Converting all GROUPBY columns to strings
def homogenize_types(df: pd.DataFrame, columns: Iterable[str]) -> pd.DataFrame:
"""Converting all columns to strings
When grouping by a numeric (say FLOAT) column, pydruid returns
strings in the dataframe. This creates issues downstream related
Expand All @@ -1374,7 +1393,7 @@ def homogenize_types(df: pd.DataFrame, groupby_cols: Iterable[str]) -> pd.DataFr
Here we replace None with <NULL> and make the whole series a
str instead of an object.
"""
df[groupby_cols] = df[groupby_cols].fillna(NULL_STRING).astype("unicode")
df[columns] = df[columns].fillna(NULL_STRING).astype("unicode")
return df

def query(self, query_obj: Dict) -> QueryResult:
Expand All @@ -1390,7 +1409,9 @@ def query(self, query_obj: Dict) -> QueryResult:
df=df, query=query_str, duration=datetime.now() - qry_start_dttm
)

df = self.homogenize_types(df, query_obj.get("groupby", []))
df = self.homogenize_types(
df, query_obj.get("columns" if IS_SIP_38 else "groupby", [])
)
df.columns = [
DTTM_ALIAS if c in ("timestamp", "__time") else c for c in df.columns
]
Expand All @@ -1405,7 +1426,9 @@ def query(self, query_obj: Dict) -> QueryResult:
cols: List[str] = []
if DTTM_ALIAS in df.columns:
cols += [DTTM_ALIAS]
cols += query_obj.get("groupby") or []

if not IS_SIP_38:
cols += query_obj.get("groupby") or []
cols += query_obj.get("columns") or []
cols += query_obj.get("metrics") or []

Expand Down
32 changes: 22 additions & 10 deletions superset/connectors/sqla/models.py
Expand Up @@ -49,7 +49,7 @@
from sqlalchemy.sql import column, ColumnElement, literal_column, table, text
from sqlalchemy.sql.expression import Label, Select, TextAsFrom

from superset import app, db, security_manager
from superset import app, db, is_feature_enabled, security_manager
from superset.connectors.base.models import BaseColumn, BaseDatasource, BaseMetric
from superset.constants import NULL_STRING
from superset.db_engine_specs.base import TimestampExpression
Expand Down Expand Up @@ -696,11 +696,12 @@ def _get_sqla_row_level_filters(self, template_processor) -> List[str]:

def get_sqla_query( # sqla
self,
groupby,
metrics,
granularity,
from_dttm,
to_dttm,
columns=None,
groupby=None,
filter=None,
is_timeseries=True,
timeseries_limit=15,
Expand All @@ -710,7 +711,6 @@ def get_sqla_query( # sqla
inner_to_dttm=None,
orderby=None,
extras=None,
columns=None,
order_desc=True,
) -> SqlaQuery:
"""Querying any sqla table from this common interface"""
Expand All @@ -723,6 +723,7 @@ def get_sqla_query( # sqla
"filter": filter,
"columns": {col.column_name: col for col in self.columns},
}
is_sip_38 = is_feature_enabled("SIP_38_VIZ_REARCHITECTURE")
template_kwargs.update(self.template_params_dict)
extra_cache_keys: List[Any] = []
template_kwargs["extra_cache_keys"] = extra_cache_keys
Expand All @@ -749,7 +750,11 @@ def get_sqla_query( # sqla
"and is required by this type of chart"
)
)
if not groupby and not metrics and not columns:
if (
not metrics
and not columns
and (is_sip_38 or (not is_sip_38 and not groupby))
):
raise Exception(_("Empty query?"))
metrics_exprs: List[ColumnElement] = []
for m in metrics:
Expand All @@ -768,9 +773,9 @@ def get_sqla_query( # sqla
select_exprs: List[Column] = []
groupby_exprs_sans_timestamp: OrderedDict = OrderedDict()

if groupby:
if (is_sip_38 and metrics and columns) or (not is_sip_38 and groupby):
# dedup columns while preserving order
groupby = list(dict.fromkeys(groupby))
groupby = list(dict.fromkeys(columns if is_sip_38 else groupby))

select_exprs = []
for s in groupby:
Expand Down Expand Up @@ -829,7 +834,7 @@ def get_sqla_query( # sqla

tbl = self.get_from_clause(template_processor)

if not columns:
if (is_sip_38 and metrics) or (not is_sip_38 and not columns):
qry = qry.group_by(*groupby_exprs_with_timestamp.values())

where_clause_and = []
Expand Down Expand Up @@ -892,7 +897,7 @@ def get_sqla_query( # sqla
qry = qry.where(and_(*where_clause_and))
qry = qry.having(and_(*having_clause_and))

if not orderby and not columns:
if not orderby and ((is_sip_38 and metrics) or (not is_sip_38 and not columns)):
orderby = [(main_metric_expr, not order_desc)]

# To ensure correct handling of the ORDER BY labeling we need to reference the
Expand All @@ -914,7 +919,12 @@ def get_sqla_query( # sqla
if row_limit:
qry = qry.limit(row_limit)

if is_timeseries and timeseries_limit and groupby and not time_groupby_inline:
if (
is_timeseries
and timeseries_limit
and not time_groupby_inline
and ((is_sip_38 and columns) or (not is_sip_38 and groupby))
):
if self.database.db_engine_spec.allows_joins:
# some sql dialects require for order by expressions
# to also be in the select clause -- others, e.g. vertica,
Expand Down Expand Up @@ -972,7 +982,6 @@ def get_sqla_query( # sqla
prequery_obj = {
"is_timeseries": False,
"row_limit": timeseries_limit,
"groupby": groupby,
"metrics": metrics,
"granularity": granularity,
"from_dttm": inner_from_dttm or from_dttm,
Expand All @@ -983,6 +992,9 @@ def get_sqla_query( # sqla
"columns": columns,
"order_desc": True,
}
if not is_sip_38:
prequery_obj["groupby"] = groupby

result = self.query(prequery_obj)
prequeries.append(result.query)
dimensions = [
Expand Down
6 changes: 5 additions & 1 deletion superset/models/slice.py
Expand Up @@ -31,7 +31,11 @@
from superset.models.helpers import AuditMixinNullable, ImportMixin
from superset.models.tags import ChartUpdater
from superset.utils import core as utils
from superset.viz import BaseViz, viz_types

if is_feature_enabled("SIP_38_VIZ_REARCHITECTURE"):
from superset.viz_sip38 import BaseViz, viz_types # type: ignore
else:
from superset.viz import BaseViz, viz_types # type: ignore

if TYPE_CHECKING:
# pylint: disable=unused-import
Expand Down
8 changes: 7 additions & 1 deletion superset/views/utils.py
Expand Up @@ -23,14 +23,20 @@
from flask import request

import superset.models.core as models
from superset import app, db, viz
from superset import app, db, is_feature_enabled
from superset.connectors.connector_registry import ConnectorRegistry
from superset.exceptions import SupersetException
from superset.legacy import update_time_range
from superset.models.dashboard import Dashboard
from superset.models.slice import Slice
from superset.utils.core import QueryStatus, TimeRangeEndpoint

if is_feature_enabled("SIP_38_VIZ_REARCHITECTURE"):
from superset import viz_sip38 as viz # type: ignore
else:
from superset import viz # type: ignore


FORM_DATA_KEY_BLACKLIST: List[str] = []
if not app.config["ENABLE_JAVASCRIPT_CONTROLS"]:
FORM_DATA_KEY_BLACKLIST = ["js_tooltip", "js_onclick_href", "js_data_mutator"]
Expand Down

0 comments on commit 9447381

Please sign in to comment.