Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for Druid post aggregations (continued) #410

Closed
wants to merge 8 commits into from
19 changes: 19 additions & 0 deletions caravel/migrations/versions/6113409d871f_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Merge migration heads c3a8f8611885 and b4a0abe21630

Revision ID: 6113409d871f
Revises: ('c3a8f8611885', 'b4a0abe21630')
Create Date: 2016-04-28 17:32:51.688497

"""

# revision identifiers, used by Alembic.
revision = '6113409d871f'
down_revision = ('c3a8f8611885', 'b4a0abe21630')


def upgrade():
    """Do nothing: this revision only joins two parent revisions."""
    return None


def downgrade():
    """Do nothing: there are no schema changes to revert in this revision."""
    return None
31 changes: 31 additions & 0 deletions caravel/migrations/versions/b4a0abe21630_post_aggs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""post_aggs

Revision ID: b4a0abe21630
Revises: 430039611635
Create Date: 2016-02-10 15:16:58.953042

"""

# revision identifiers, used by Alembic.
revision = 'b4a0abe21630'
down_revision = '430039611635'

from alembic import op
import sqlalchemy as sa


def upgrade():
    """Create the ``post_aggregators`` table.

    The table has an integer primary key ``id`` and a foreign key from
    ``datasource_name`` to ``datasources.datasource_name``.
    """
    columns = [
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=512), nullable=True),
        sa.Column('verbose_name', sa.String(length=1024), nullable=True),
        sa.Column('datasource_name', sa.String(length=250), nullable=True),
        sa.Column('json', sa.Text(), nullable=True),
        sa.Column('description', sa.Text(), nullable=True),
    ]
    constraints = [
        sa.ForeignKeyConstraint(
            ['datasource_name'], ['datasources.datasource_name']),
        sa.PrimaryKeyConstraint('id'),
    ]
    op.create_table('post_aggregators', *(columns + constraints))


def downgrade():
    """Drop the ``post_aggregators`` table."""
    table_name = 'post_aggregators'
    op.drop_table(table_name)
75 changes: 74 additions & 1 deletion caravel/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from __future__ import unicode_literals

import functools
import itertools
import json
import logging
import textwrap
Expand All @@ -24,6 +25,9 @@
from pydruid.client import PyDruid
from flask.ext.appbuilder.models.decorators import renders
from pydruid.utils.filters import Dimension, Filter
from pydruid.utils.postaggregator import Postaggregator
from pydruid.utils.postaggregator import Const as ConstPostaggregator
from pydruid.utils.postaggregator import Field as FieldPostaggregator
from six import string_types
from sqlalchemy import (
Column, Integer, String, ForeignKey, Text, Boolean, DateTime, Date,
Expand Down Expand Up @@ -907,8 +911,16 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):

@property
def metrics_combo(self):
    """Return ``(name, verbose_name)`` pairs for metrics and post aggregators.

    The pairs built from ``self.metrics`` come first, sorted by verbose
    name; the pairs from ``self.post_aggregators_combo`` are appended after
    them in the order that property returns.

    :returns: list of ``(name, verbose_name)`` tuples
    """
    # Treat post aggregators as one kind of metrics
    # TODO: Deal with name conflict between metrics and post_aggregators
    metric_pairs = sorted(
        ((m.metric_name, m.verbose_name) for m in self.metrics),
        # A missing verbose name sorts as '' so that None is never compared
        # with a string, which raises TypeError on Python 3.
        key=lambda pair: pair[1] or '')
    return metric_pairs + self.post_aggregators_combo

@property
def post_aggregators_combo(self):
    """Return ``(name, verbose_name)`` pairs for ``self.post_aggregators``.

    :returns: list of tuples sorted by verbose name; entries without a
        verbose name sort first
    """
    return sorted(
        ((agg.name, agg.verbose_name) for agg in self.post_aggregators),
        # verbose_name may be unset (None); sort it as '' so that None is
        # never compared with a string, which raises TypeError on Python 3.
        key=lambda pair: pair[1] or '')

@property
Expand Down Expand Up @@ -1012,6 +1024,40 @@ def sync_to_db(cls, name, cluster):
col_obj.datasource = datasource
col_obj.generate_metrics()

@classmethod
def get_metrics_dependencies(cls, post_aggregation):
    """Return the field names a post aggregation reads.

    ``fieldAccess`` specs contribute their ``fieldName``; ``arithmetic``
    specs contribute the dependencies of each entry in ``fields``,
    collected recursively. Any other type contributes nothing.

    :param post_aggregation: a ``Postaggregator`` instance, a raw spec
        dict, or None
    :returns: list of unique field names, in no particular order
    """
    # get_post_aggregator() returns None for spec types it does not build;
    # such entries have no dependencies and must not crash the caller.
    if post_aggregation is None:
        return []

    if isinstance(post_aggregation, Postaggregator):
        spec = post_aggregation.post_aggregator
    else:
        spec = post_aggregation

    spec_type = spec.get('type')
    if spec_type == 'fieldAccess':
        return [spec['fieldName']]

    if spec_type == 'arithmetic':
        dependencies = set()
        # ``fields`` may be missing or empty; both mean "no dependencies".
        for field in spec.get('fields') or ():
            dependencies.update(cls.get_metrics_dependencies(field))
        return list(dependencies)

    return []

@classmethod
def get_post_aggregator(cls, params_json):
    """Build a post aggregator object from its JSON definition.

    :param params_json: JSON text holding the spec; its ``type`` key selects
        which object is built (``arithmetic``, ``constant`` or
        ``fieldAccess``)
    :returns: the matching post aggregator object, or None when ``type`` is
        anything else
    :raises: whatever ``json.loads`` raises for unparsable input
    """
    # TODO error messages
    # Parsing failures propagate to the caller unchanged.
    spec = json.loads(params_json)

    kind = spec.get('type')
    label = spec.get('name')
    if kind == 'arithmetic':
        return Postaggregator(spec.get('fn'), spec.get('fields'), label)
    if kind == 'constant':
        return ConstPostaggregator(spec.get('value'), label)
    if kind == 'fieldAccess':
        return FieldPostaggregator(spec.get('fieldName'))
    return None

def query( # druid
self, groupby, metrics,
granularity,
Expand All @@ -1038,10 +1084,22 @@ def query( # druid
to_dttm = to_dttm.replace(tzinfo=config.get("DRUID_TZ"))

query_str = ""

# TODO: Deal with name conflict between metrics and post_aggregators
post_aggregators = {
m.name: self.get_post_aggregator(m.json)
for m in self.post_aggregators if m.name in metrics
}

metrics_dependencies = list(
set(itertools.chain(*[self.get_metrics_dependencies(agg)
for agg in post_aggregators.values()])) - set(metrics)
)
aggregations = {
m.metric_name: m.json_obj
for m in self.metrics if m.metric_name in metrics
for m in self.metrics if m.metric_name in metrics + metrics_dependencies
}

granularity = granularity or "all"
if granularity != "all":
granularity = utils.parse_human_timedelta(
Expand All @@ -1057,6 +1115,7 @@ def query( # druid
datasource=self.datasource_name,
dimensions=groupby,
aggregations=aggregations,
post_aggregations=post_aggregators,
granularity=granularity,
intervals=from_dttm.isoformat() + '/' + to_dttm.isoformat(),
)
Expand Down Expand Up @@ -1163,6 +1222,7 @@ def query( # druid
cols += [col for col in groupby if col in df.columns]
cols += [col for col in metrics if col in df.columns]
cols += [col for col in df.columns if col not in cols]
cols = [col for col in cols if col not in metrics_dependencies]
df = df[cols]
return QueryResult(
df=df,
Expand Down Expand Up @@ -1341,6 +1401,19 @@ def generate_metrics(self):
session.commit()


class DruidPostAggregator(Model):
    """A post aggregator definition stored in the ``post_aggregators`` table.

    Each row is linked to a Druid datasource through ``datasource_name``.
    """
    __tablename__ = 'post_aggregators'
    id = Column(Integer, primary_key=True)
    # Identifier; DruidDatasource.query matches it against the requested
    # metric names to decide which post aggregators to run.
    name = Column(String(512))
    # Label paired with ``name`` in DruidDatasource.post_aggregators_combo,
    # which also sorts by it.
    verbose_name = Column(String(1024))
    datasource_name = Column(
        String(250),
        ForeignKey('datasources.datasource_name'))
    # The backref exposes these rows as ``post_aggregators`` on the datasource.
    datasource = relationship('DruidDatasource', backref='post_aggregators')
    # JSON text of the spec; parsed by DruidDatasource.get_post_aggregator.
    json = Column(Text)
    description = Column(Text)


class FavStar(Model):
__tablename__ = 'favstar'

Expand Down
19 changes: 19 additions & 0 deletions caravel/panoramix/static/widgets/viz_helloworld.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
px.registerViz('helloworld', function(slice) {

  /**
   * Fetch the slice's JSON payload and render a greeting for the username
   * found in its form data. Used for both the initial render and resizes.
   */
  function refresh() {
    $('#code').attr('rows', '15');
    $.getJSON(slice.jsonEndpoint(), function(payload) {
      // Build the heading with .text() so the username is inserted as plain
      // text and is never parsed as HTML.
      var heading = $('<h1>').text(
        'HELLOW ' + payload.form_data.username + ' !!!');
      slice.container.empty().append(heading);
      console.log(payload);
      slice.done();
    })
    .fail(function(xhr) {
      slice.error(xhr.responseText);
    });
  }
  return {
    render: refresh,
    resize: refresh,
  };
});
17 changes: 16 additions & 1 deletion caravel/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,19 @@ class DruidMetricInlineView(CompactCRUDMixin, CaravelModelView): # noqa
appbuilder.add_view_no_menu(DruidMetricInlineView)


class DruidPostAggregatorInlineView(CompactCRUDMixin, CaravelModelView):
    """CRUD view over ``models.DruidPostAggregator`` records."""
    datamodel = SQLAInterface(models.DruidPostAggregator)
    page_size = 500
    list_columns = [
        'name',
        'verbose_name',
    ]
    edit_columns = [
        'name',
        'description',
        'verbose_name',
        'datasource',
        'json',
    ]
    # The add form reuses the very same column list as the edit form.
    add_columns = edit_columns
    # The ``json`` field is checked with validate_json.
    validators_columns = {'json': [validate_json]}


appbuilder.add_view_no_menu(DruidPostAggregatorInlineView)


class DatabaseView(CaravelModelView, DeleteMixin): # noqa
datamodel = SQLAInterface(models.Database)
list_columns = ['database_name', 'sql_link', 'creator', 'changed_on']
Expand Down Expand Up @@ -394,7 +407,9 @@ class DruidDatasourceModelView(CaravelModelView, DeleteMixin): # noqa
'creator', 'created_on',
'changed_by_', 'changed_on',
'offset']
related_views = [DruidColumnInlineView, DruidMetricInlineView]
related_views = [
DruidColumnInlineView, DruidMetricInlineView,
DruidPostAggregatorInlineView]
edit_columns = [
'datasource_name', 'cluster', 'description', 'owner',
'is_featured', 'is_hidden', 'default_endpoint', 'offset',
Expand Down