Skip to content

Commit

Permalink
Add refresh druid datasources endpoint. (#1065)
Browse files Browse the repository at this point in the history
  • Loading branch information
bkyryliuk committed Sep 8, 2016
1 parent 9ae231a commit d454fb4
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 2 deletions.
73 changes: 73 additions & 0 deletions caravel/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1276,6 +1276,79 @@ def generate_metrics(self):
for col in self.columns:
col.generate_metrics()

@classmethod
def sync_to_db_from_config(cls, druid_config, user, cluster):
"""Merges the ds config from druid_config into one stored in the db."""
session = db.session()
datasource = (
session.query(DruidDatasource)
.filter_by(
datasource_name=druid_config['name'])
).first()
# Create a new datasource.
if not datasource:
datasource = DruidDatasource(
datasource_name=druid_config['name'],
cluster=cluster,
owner=user,
changed_by_fk=user.id,
created_by_fk=user.id,
)
session.add(datasource)

dimensions = druid_config['dimensions']
for dim in dimensions:
col_obj = (
session.query(DruidColumn)
.filter_by(
datasource_name=druid_config['name'],
column_name=dim)
).first()
if not col_obj:
col_obj = DruidColumn(
datasource_name=druid_config['name'],
column_name=dim,
groupby=True,
filterable=True,
# TODO: fetch type from Hive.
type="STRING",
datasource=datasource
)
session.add(col_obj)
# Import Druid metrics
for metric_spec in druid_config["metrics_spec"]:
metric_name = metric_spec["name"]
metric_type = metric_spec["type"]
metric_json = json.dumps(metric_spec)

if metric_type == "count":
metric_type = "longSum"
metric_json = json.dumps({
"type": "longSum",
"name": metric_name,
"fieldName": metric_name,
})

metric_obj = (
session.query(DruidMetric)
.filter_by(
datasource_name=druid_config['name'],
metric_name=metric_name)
).first()
if not metric_obj:
metric_obj = DruidMetric(
metric_name=metric_name,
metric_type=metric_type,
verbose_name="%s(%s)" % (metric_type, metric_name),
datasource=datasource,
json=metric_json,
description=(
"Imported from the airolap config dir for %s" %
druid_config['name']),
)
session.add(metric_obj)
session.commit()

@classmethod
def sync_to_db(cls, name, cluster):
"""Fetches metadata for that datasource and merges the Caravel db"""
Expand Down
52 changes: 50 additions & 2 deletions caravel/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,55 @@ def dashboard(**kwargs): # noqa
dash_save_perm=dash_save_perm,
dash_edit_perm=dash_edit_perm)

@has_access
@expose("/sync_druid/", methods=['POST'])
@log_this
def sync_druid_source(self):
"""Syncs the druid datasource in main db with the provided config.
The endpoint takes 3 arguments:
user - user name to perform the operation as
cluster - name of the druid cluster
config - configuration stored in json that contains:
name: druid datasource name
dimensions: list of the dimensions, they become druid columns
with the type STRING
metrics_spec: list of metrics (dictionary). Metric consists of
2 attributes: type and name. Type can be count,
etc. `count` type is stored internally as longSum
other fields will be ignored.
Example: {
"name": "test_click",
"metrics_spec": [{"type": "count", "name": "count"}],
"dimensions": ["affiliate_id", "campaign", "first_seen"]
}
"""
druid_config = json.loads(request.form.get('config'))
user_name = request.form.get('user')
cluster_name = request.form.get('cluster')

user = sm.find_user(username=user_name)
if not user:
err_msg = __("Can't find User '%(name)s', please ask your admin "
"to create one.", name=user_name)
logging.error(err_msg)
return json_error_response(err_msg)
cluster = db.session.query(models.DruidCluster).filter_by(
cluster_name=cluster_name).first()
if not cluster:
err_msg = __("Can't find DruidCluster with cluster_name = "
"'%(name)s'", name=cluster_name)
logging.error(err_msg)
return json_error_response(err_msg)
try:
models.DruidDatasource.sync_to_db_from_config(
druid_config, user, cluster)
except Exception as e:
logging.exception(utils.error_msg_from_exception(e))
return json_error_response(utils.error_msg_from_exception(e))
return Response(status=201)

@has_access
@expose("/sqllab_viz/")
@log_this
Expand Down Expand Up @@ -1472,8 +1521,7 @@ def sql_json(self):

if not mydb:
json_error_response(
'Database with id {} is missing.'.format(database_id),
QueryStatus.FAILED)
'Database with id {} is missing.'.format(database_id))

if not self.database_access(mydb):
json_error_response(
Expand Down
68 changes: 68 additions & 0 deletions tests/core_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,74 @@ def test_add_slice_redirect_to_druid(self, username='admin'):
db.session.delete(datasource)
db.session.commit()

def test_druid_sync_from_config(self):
cluster = models.DruidCluster(cluster_name="new_druid")
db.session.add(cluster)
db.session.commit()

cfg = {
"name": "test_click",
"dimensions": ["affiliate_id", "campaign", "first_seen"],
"metrics_spec": [{"type": "count", "name": "count"},
{"type": "sum", "name": "sum"}],
"batch_ingestion": {
"sql": "SELECT * FROM clicks WHERE d='{{ ds }}'",
"ts_column": "d",
"sources": [{
"table": "clicks",
"partition": "d='{{ ds }}'"
}]
}
}
resp = self.client.post(
'/caravel/sync_druid/', data=dict(
config=json.dumps(cfg), user="admin", cluster="new_druid"))

druid_ds = db.session.query(DruidDatasource).filter_by(
datasource_name="test_click").first()
assert set([c.column_name for c in druid_ds.columns]) == set(
["affiliate_id", "campaign", "first_seen"])
assert set([m.metric_name for m in druid_ds.metrics]) == set(
["count", "sum"])
assert resp.status_code == 201

# Datasource exists, not changes required
resp = self.client.post(
'/caravel/sync_druid/', data=dict(
config=json.dumps(cfg), user="admin", cluster="new_druid"))
druid_ds = db.session.query(DruidDatasource).filter_by(
datasource_name="test_click").first()
assert set([c.column_name for c in druid_ds.columns]) == set(
["affiliate_id", "campaign", "first_seen"])
assert set([m.metric_name for m in druid_ds.metrics]) == set(
["count", "sum"])
assert resp.status_code == 201

# datasource exists, not changes required
cfg = {
"name": "test_click",
"dimensions": ["affiliate_id", "second_seen"],
"metrics_spec": [
{"type": "bla", "name": "sum"},
{"type": "unique", "name": "unique"}
],
}
resp = self.client.post(
'/caravel/sync_druid/', data=dict(
config=json.dumps(cfg), user="admin", cluster="new_druid"))
druid_ds = db.session.query(DruidDatasource).filter_by(
datasource_name="test_click").first()
# columns and metrics are not deleted if config is changed as
# user could define his own dimensions / metrics and want to keep them
assert set([c.column_name for c in druid_ds.columns]) == set(
["affiliate_id", "campaign", "first_seen", "second_seen"])
assert set([m.metric_name for m in druid_ds.metrics]) == set(
["count", "sum", "unique"])
# metric type will not be overridden, sum stays instead of bla
assert set([m.metric_type for m in druid_ds.metrics]) == set(
["longSum", "sum", "unique"])
assert resp.status_code == 201

def test_gamma(self):
self.login(username='gamma')
resp = self.client.get('/slicemodelview/list/')
Expand Down

0 comments on commit d454fb4

Please sign in to comment.