From 86e354fa7020cb6ab8aa11e5d1d19f818658ac6f Mon Sep 17 00:00:00 2001 From: Maxime Beauchemin Date: Wed, 10 Feb 2016 16:01:22 -0800 Subject: [PATCH 1/7] Adding support for Druid post aggregations --- .../versions/b4a0abe21630_post_aggs.py | 32 +++++++++++++++++++ panoramix/models.py | 13 ++++++++ panoramix/static/widgets/viz_helloworld.js | 19 +++++++++++ panoramix/views.py | 18 ++++++++++- 4 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 panoramix/migrations/versions/b4a0abe21630_post_aggs.py create mode 100644 panoramix/static/widgets/viz_helloworld.js diff --git a/panoramix/migrations/versions/b4a0abe21630_post_aggs.py b/panoramix/migrations/versions/b4a0abe21630_post_aggs.py new file mode 100644 index 000000000000..b913323b2ac9 --- /dev/null +++ b/panoramix/migrations/versions/b4a0abe21630_post_aggs.py @@ -0,0 +1,32 @@ +"""post_aggs + +Revision ID: b4a0abe21630 +Revises: 430039611635 +Create Date: 2016-02-10 15:16:58.953042 + +""" + +# revision identifiers, used by Alembic. +revision = 'b4a0abe21630' +down_revision = '430039611635' + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import mysql + + +def upgrade(): + op.create_table('post_aggregators', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('name', sa.String(length=512), nullable=True), + sa.Column('verbose_name', sa.String(length=1024), nullable=True), + sa.Column('datasource_name', sa.String(length=250), nullable=True), + sa.Column('json', sa.Text(), nullable=True), + sa.Column('description', sa.Text(), nullable=True), + sa.ForeignKeyConstraint(['datasource_name'], ['datasources.datasource_name'], ), + sa.PrimaryKeyConstraint('id') + ) + + +def downgrade(): + op.drop_table('post_aggregators') diff --git a/panoramix/models.py b/panoramix/models.py index 93657f9c786a..c341d7987119 100644 --- a/panoramix/models.py +++ b/panoramix/models.py @@ -1170,3 +1170,16 @@ def generate_metrics(self): if not m: session.add(metric) session.commit() + + +class DruidPostAggregator(Model): + __tablename__ = 'post_aggregators' + id = Column(Integer, primary_key=True) + name = Column(String(512)) + verbose_name = Column(String(1024)) + datasource_name = Column( + String(250), + ForeignKey('datasources.datasource_name')) + datasource = relationship('DruidDatasource', backref='post_aggregators') + json = Column(Text) + description = Column(Text) diff --git a/panoramix/static/widgets/viz_helloworld.js b/panoramix/static/widgets/viz_helloworld.js new file mode 100644 index 000000000000..142054927e4e --- /dev/null +++ b/panoramix/static/widgets/viz_helloworld.js @@ -0,0 +1,19 @@ +px.registerViz('helloworld', function(slice) { + + function refresh() { + $('#code').attr('rows', '15') + $.getJSON(slice.jsonEndpoint(), function(payload) { + slice.container.html( + '

HELLOW '+ payload.form_data.username +' !!!

'); + console.log(payload); + slice.done(); + }) + .fail(function(xhr) { + slice.error(xhr.responseText); + }); + }; + return { + render: refresh, + resize: refresh, + }; +}); diff --git a/panoramix/views.py b/panoramix/views.py index 20b9e92a50ae..a921c926b6f8 100644 --- a/panoramix/views.py +++ b/panoramix/views.py @@ -114,6 +114,19 @@ class DruidMetricInlineView(CompactCRUDMixin, PanoramixModelView): appbuilder.add_view_no_menu(DruidMetricInlineView) +class DruidPostAggregatorInlineView(CompactCRUDMixin, PanoramixModelView): + datamodel = SQLAInterface(models.DruidPostAggregator) + list_columns = ['name', 'verbose_name'] + edit_columns = [ + 'name', 'description', 'verbose_name', 'datasource', 'json'] + add_columns = edit_columns + page_size = 500 + validators_columns = { + 'json': [validate_json], + } +appbuilder.add_view_no_menu(DruidPostAggregatorInlineView) + + class DatabaseView(PanoramixModelView, DeleteMixin): datamodel = SQLAInterface(models.Database) list_columns = ['database_name', 'sql_link', 'created_by', 'changed_on_'] @@ -284,10 +297,13 @@ class DruidDatasourceModelView(PanoramixModelView, DeleteMixin): 'created_by', 'created_on', 'changed_by_', 'changed_on', 'offset'] - related_views = [DruidColumnInlineView, DruidMetricInlineView] + related_views = [ + DruidColumnInlineView, DruidMetricInlineView, + DruidPostAggregatorInlineView] edit_columns = [ 'datasource_name', 'cluster', 'description', 'owner', 'is_featured', 'is_hidden', 'default_endpoint', 'offset'] + add_columns = edit_columns page_size = 500 base_order = ('datasource_name', 'asc') description_columns = { From 2c33668949b83914c30e46885e2251e01e7519bd Mon Sep 17 00:00:00 2001 From: x4base Date: Thu, 28 Apr 2016 18:07:41 +0800 Subject: [PATCH 2/7] Merge alembic branches in the database --- caravel/migrations/versions/6113409d871f_.py | 22 +++++++++++++++++++ .../versions/b4a0abe21630_post_aggs.py | 0 2 files changed, 22 insertions(+) create mode 100644 caravel/migrations/versions/6113409d871f_.py rename caravel/{panoramix => }/migrations/versions/b4a0abe21630_post_aggs.py (100%) diff --git a/caravel/migrations/versions/6113409d871f_.py b/caravel/migrations/versions/6113409d871f_.py new file mode 100644 index 000000000000..dfe4ddab4815 --- /dev/null +++ b/caravel/migrations/versions/6113409d871f_.py @@ -0,0 +1,22 @@ +"""empty message + +Revision ID: 6113409d871f +Revises: ('c3a8f8611885', 'b4a0abe21630') +Create Date: 2016-04-28 17:32:51.688497 + +""" + +# revision identifiers, used by Alembic. +revision = '6113409d871f' +down_revision = ('c3a8f8611885', 'b4a0abe21630') + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + pass + + +def downgrade(): + pass diff --git a/caravel/panoramix/migrations/versions/b4a0abe21630_post_aggs.py b/caravel/migrations/versions/b4a0abe21630_post_aggs.py similarity index 100% rename from caravel/panoramix/migrations/versions/b4a0abe21630_post_aggs.py rename to caravel/migrations/versions/b4a0abe21630_post_aggs.py From 8ef0f15eb1e73fca45dc404fd44822908b4c22fd Mon Sep 17 00:00:00 2001 From: x4base Date: Wed, 27 Apr 2016 15:12:58 +0800 Subject: [PATCH 3/7] Add post aggregator list in TableViz --- caravel/forms.py | 4 ++++ caravel/models.py | 6 ++++++ caravel/viz.py | 1 + 3 files changed, 11 insertions(+) diff --git a/caravel/forms.py b/caravel/forms.py index c6c96665298e..2a2189c2bbb6 100644 --- a/caravel/forms.py +++ b/caravel/forms.py @@ -130,6 +130,10 @@ def __init__(self, viz): 'Metrics', choices=datasource.metrics_combo, default=[default_metric], description="One or many metrics to display"), + 'post_aggregators': SelectMultipleSortableField( + 'Post Aggregators', choices=datasource.post_aggregators_combo, + # default=[default_metric], + description=""), 'metric': SelectField( 'Metric', choices=datasource.metrics_combo, default=default_metric, diff --git a/caravel/models.py b/caravel/models.py index 2ff6730bf88d..13515292ccb8 100644 --- a/caravel/models.py +++ b/caravel/models.py @@ -911,6 +911,12 @@ def metrics_combo(self): [(m.metric_name, m.verbose_name) for m in self.metrics], key=lambda x: x[1]) + @property + def post_aggregators_combo(self): + return sorted( + [(m.name, m.verbose_name) for m in self.post_aggregators], + key=lambda x: x[1]) + @property def name(self): return self.datasource_name diff --git a/caravel/viz.py b/caravel/viz.py index 70504f6f3042..f4a89e7190e0 100644 --- a/caravel/viz.py +++ b/caravel/viz.py @@ -331,6 +331,7 @@ class TableViz(BaseViz): 'fields': ( 'groupby', 'metrics', + 'post_aggregators', ) }, { 'label': "NOT GROUPED BY", From b9941d4201b289aa97a283b548063b23dcd08e80 Mon Sep 17 00:00:00 2001 From: x4base Date: Thu, 28 Apr 2016 17:52:19 +0800 Subject: [PATCH 4/7] Include post aggregation in the druid requests Conflicts: caravel/models.py --- caravel/models.py | 28 ++++++++++++++++++++++++++++ caravel/viz.py | 1 + 2 files changed, 29 insertions(+) diff --git a/caravel/models.py b/caravel/models.py index 13515292ccb8..f35c340e62f8 100644 --- a/caravel/models.py +++ b/caravel/models.py @@ -24,6 +24,9 @@ from pydruid.client import PyDruid from flask.ext.appbuilder.models.decorators import renders from pydruid.utils.filters import Dimension, Filter +from pydruid.utils.postaggregator import Postaggregator +from pydruid.utils.postaggregator import Const as ConstPostaggregator +from pydruid.utils.postaggregator import Field as FieldPostaggregator from six import string_types from sqlalchemy import ( Column, Integer, String, ForeignKey, Text, Boolean, DateTime, Date, @@ -1018,6 +1021,26 @@ def sync_to_db(cls, name, cluster): col_obj.datasource = datasource col_obj.generate_metrics() + @classmethod + def get_post_aggregator(cls, params_json): + try: + params = json.loads(params_json) + except Exception: + # TODO error messages + raise + + obj = None + _type = params.get('type') + name = params.get('name') + if _type == 'arithmetic': + obj = Postaggregator(params.get('fn'), params.get('fields'), name) + elif _type == 'constant': + obj = ConstPostaggregator(params.get('value'), name) + elif _type == 'fieldAccess': + obj = FieldPostaggregator(params.get('fieldName')) + + return obj + def query( # druid self, groupby, metrics, granularity, @@ -1048,6 +1071,10 @@ def query( # druid m.metric_name: m.json_obj for m in self.metrics if m.metric_name in metrics } + post_aggregators = { + m.name: self.get_post_aggregator(m.json) + for m in self.post_aggregators if m.name in extras.get('post_aggregators') + } granularity = granularity or "all" if granularity != "all": granularity = utils.parse_human_timedelta( @@ -1063,6 +1090,7 @@ def query( # druid datasource=self.datasource_name, dimensions=groupby, aggregations=aggregations, + post_aggregations=post_aggregators, granularity=granularity, intervals=from_dttm.isoformat() + '/' + to_dttm.isoformat(), ) diff --git a/caravel/viz.py b/caravel/viz.py index f4a89e7190e0..2d0b87addc61 100644 --- a/caravel/viz.py +++ b/caravel/viz.py @@ -210,6 +210,7 @@ def query_obj(self): 'having': form_data.get("having", ''), 'time_grain_sqla': form_data.get("time_grain_sqla", ''), 'druid_time_origin': form_data.get("druid_time_origin", ''), + 'post_aggregators': form_data.get("post_aggregators", ''), } d = { 'granularity': granularity, From f4e8b0630efdf2d7f3dcafa5c790aa0749b778d7 Mon Sep 17 00:00:00 2001 From: x4base Date: Thu, 28 Apr 2016 19:25:31 +0800 Subject: [PATCH 5/7] Fix the bug that breaks the tests --- caravel/migrations/versions/6113409d871f_.py | 3 --- caravel/migrations/versions/b4a0abe21630_post_aggs.py | 1 - caravel/models.py | 6 ++++++ 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/caravel/migrations/versions/6113409d871f_.py b/caravel/migrations/versions/6113409d871f_.py index dfe4ddab4815..71ca91cf80d1 100644 --- a/caravel/migrations/versions/6113409d871f_.py +++ b/caravel/migrations/versions/6113409d871f_.py @@ -10,9 +10,6 @@ revision = '6113409d871f' down_revision = ('c3a8f8611885', 'b4a0abe21630') -from alembic import op -import sqlalchemy as sa - def upgrade(): pass diff --git a/caravel/migrations/versions/b4a0abe21630_post_aggs.py b/caravel/migrations/versions/b4a0abe21630_post_aggs.py index b913323b2ac9..fd8d48ff3921 100644 --- a/caravel/migrations/versions/b4a0abe21630_post_aggs.py +++ b/caravel/migrations/versions/b4a0abe21630_post_aggs.py @@ -12,7 +12,6 @@ from alembic import op import sqlalchemy as sa -from sqlalchemy.dialects import mysql def upgrade(): diff --git a/caravel/models.py b/caravel/models.py index f35c340e62f8..c3a7d73352b2 100644 --- a/caravel/models.py +++ b/caravel/models.py @@ -535,6 +535,11 @@ def metrics_combo(self): for m in self.metrics], key=lambda x: x[1]) + @property + # TODO: Sqlable shouldn't have this for post aggregators are only for Druid + def post_aggregators_combo(self): + return [] + @property def sql_url(self): return self.database.sql_url + "?table_name=" + str(self.table_name) @@ -1387,6 +1392,7 @@ class DruidPostAggregator(Model): json = Column(Text) description = Column(Text) + class FavStar(Model): __tablename__ = 'favstar' From 0a56051b2dd8ab3468fc86e837317b07aa7dcb5c Mon Sep 17 00:00:00 2001 From: x4base Date: Fri, 29 Apr 2016 19:05:09 +0800 Subject: [PATCH 6/7] Calculate the dependencies of the post aggregators and hide them in the output --- caravel/models.py | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/caravel/models.py b/caravel/models.py index c3a7d73352b2..33952cc76400 100644 --- a/caravel/models.py +++ b/caravel/models.py @@ -5,6 +5,7 @@ from __future__ import unicode_literals import functools +import itertools import json import logging import textwrap @@ -1026,6 +1027,20 @@ def sync_to_db(cls, name, cluster): col_obj.datasource = datasource col_obj.generate_metrics() + @classmethod + def get_metrics_dependencies(cls, post_aggregation): + post_agg_dict = post_aggregation.post_aggregator if isinstance(post_aggregation, + Postaggregator) else post_aggregation + if post_agg_dict.get('type') == 'fieldAccess': + return [post_agg_dict['fieldName']] + + if post_agg_dict.get('type') == 'arithmetic': + fields = post_agg_dict.get('fields') + if fields: + return list(set(itertools.chain(*[cls.get_metrics_dependencies(field) for field in fields]))) + + return [] + @classmethod def get_post_aggregator(cls, params_json): try: @@ -1072,14 +1087,21 @@ def query( # druid to_dttm = to_dttm.replace(tzinfo=config.get("DRUID_TZ")) query_str = "" - aggregations = { - m.metric_name: m.json_obj - for m in self.metrics if m.metric_name in metrics - } + post_aggregators = { m.name: self.get_post_aggregator(m.json) for m in self.post_aggregators if m.name in extras.get('post_aggregators') - } + } + + metrics_dependencies = list( + set(itertools.chain(*[self.get_metrics_dependencies(agg) + for agg in post_aggregators.values()])) - set(metrics) + ) + aggregations = { + m.metric_name: m.json_obj + for m in self.metrics if m.metric_name in metrics + metrics_dependencies + } + granularity = granularity or "all" if granularity != "all": granularity = utils.parse_human_timedelta( @@ -1202,6 +1224,7 @@ def query( # druid cols += [col for col in groupby if col in df.columns] cols += [col for col in metrics if col in df.columns] cols += [col for col in df.columns if col not in cols] + cols = [col for col in cols if col not in metrics_dependencies] df = df[cols] return QueryResult( df=df, From 152b47c1bdd686b1f00f77689dccd4259bbb97b4 Mon Sep 17 00:00:00 2001 From: x4base Date: Fri, 29 Apr 2016 19:16:20 +0800 Subject: [PATCH 7/7] Treat post aggregators as one kind of metrics --- caravel/forms.py | 4 ---- caravel/models.py | 12 +++++------- caravel/viz.py | 2 -- 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/caravel/forms.py b/caravel/forms.py index 2a2189c2bbb6..c6c96665298e 100644 --- a/caravel/forms.py +++ b/caravel/forms.py @@ -130,10 +130,6 @@ def __init__(self, viz): 'Metrics', choices=datasource.metrics_combo, default=[default_metric], description="One or many metrics to display"), - 'post_aggregators': SelectMultipleSortableField( - 'Post Aggregators', choices=datasource.post_aggregators_combo, - # default=[default_metric], - description=""), 'metric': SelectField( 'Metric', choices=datasource.metrics_combo, default=default_metric, diff --git a/caravel/models.py b/caravel/models.py index 33952cc76400..8e7a92fb4101 100644 --- a/caravel/models.py +++ b/caravel/models.py @@ -536,11 +536,6 @@ def metrics_combo(self): for m in self.metrics], key=lambda x: x[1]) - @property - # TODO: Sqlable shouldn't have this for post aggregators are only for Druid - def post_aggregators_combo(self): - return [] - @property def sql_url(self): return self.database.sql_url + "?table_name=" + str(self.table_name) @@ -916,9 +911,11 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable): @property def metrics_combo(self): + # Treat post aggregators as one kind of metrics + # TODO: Deal with name conflict between metrics and post_aggregators return sorted( [(m.metric_name, m.verbose_name) for m in self.metrics], - key=lambda x: x[1]) + key=lambda x: x[1]) + self.post_aggregators_combo @property def post_aggregators_combo(self): @@ -1088,9 +1085,10 @@ def query( # druid query_str = "" + # TODO: Deal with name conflict between metrics and post_aggregators post_aggregators = { m.name: self.get_post_aggregator(m.json) - for m in self.post_aggregators if m.name in extras.get('post_aggregators') + for m in self.post_aggregators if m.name in metrics } metrics_dependencies = list( diff --git a/caravel/viz.py b/caravel/viz.py index 2d0b87addc61..70504f6f3042 100644 --- a/caravel/viz.py +++ b/caravel/viz.py @@ -210,7 +210,6 @@ def query_obj(self): 'having': form_data.get("having", ''), 'time_grain_sqla': form_data.get("time_grain_sqla", ''), 'druid_time_origin': form_data.get("druid_time_origin", ''), - 'post_aggregators': form_data.get("post_aggregators", ''), } d = { 'granularity': granularity, @@ -332,7 +331,6 @@ class TableViz(BaseViz): 'fields': ( 'groupby', 'metrics', - 'post_aggregators', ) }, { 'label': "NOT GROUPED BY",