Skip to content

Commit

Permalink
Allow the setting of context properties in queries
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel O'Connell committed Mar 10, 2015
1 parent a71e90b commit f2d1a24
Showing 1 changed file with 32 additions and 4 deletions.
36 changes: 32 additions & 4 deletions pydruid/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,18 @@ def export_pandas(self):
# --------- Query implementations ---------

def validate_query(self, valid_parts, args):
"""
Validate the query parts so only allowed objects are sent.
Each query type can have an optional 'context' object attached which is used to set certain
query context settings, e.g. timeout or priority. As each query can have this object, there's
no need for each caller to whitelist it - it might as well be added to the valid parts here.
:param list valid_parts: a list of valid object names
:param dict args: the dict of args to be sent
:raise ValueError: if an invalid object is given
"""
valid_parts = valid_parts[:] + ['context']
for key, val in args.iteritems():
if key not in valid_parts:
raise ValueError(
Expand Down Expand Up @@ -321,6 +333,7 @@ def topn(self, **kwargs):
:param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query
:param post_aggregations: A dict with string key = 'post_aggregator_name', and value pydruid.utils.PostAggregator
:param dict context: A dict of query context options
Example:
Expand All @@ -335,7 +348,8 @@ def topn(self, **kwargs):
dimension='user_name',
metric='count',
filter=Dimension('user_lang') == 'en',
threshold=1
threshold=1,
context={"timeout": 1000}
)
>>> print top
>>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': [{'count': 22.0, 'user': "cool_user"}]}]
Expand Down Expand Up @@ -369,6 +383,7 @@ def timeseries(self, **kwargs):
:param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query
:param post_aggregations: A dict with string key = 'post_aggregator_name', and value pydruid.utils.PostAggregator
:param dict context: A dict of query context options
Example:
Expand All @@ -380,7 +395,8 @@ def timeseries(self, **kwargs):
granularity='hour',
intervals='2013-06-14/pt1h',
aggregations={"count": doublesum("count"), "rows": count("rows")},
post_aggregations={'percent': (Field('count') / Field('rows')) * Const(100)}
post_aggregations={'percent': (Field('count') / Field('rows')) * Const(100)},
context={"timeout": 1000}
)
>>> print counts
>>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': {'count': 9619.0, 'rows': 8007, 'percent': 120.13238416385663}}]
Expand Down Expand Up @@ -415,6 +431,7 @@ def groupby(self, **kwargs):
:param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query
:param pydruid.utils.having.Having having: Indicates which groups in results set of query to keep
:param post_aggregations: A dict with string key = 'post_aggregator_name', and value pydruid.utils.PostAggregator
:param dict context: A dict of query context options
Example:
Expand All @@ -427,7 +444,8 @@ def groupby(self, **kwargs):
intervals='2013-10-04/pt1h',
dimensions=["user_name", "reply_to_name"],
filter=~(Dimension("reply_to_name") == "Not A Reply"),
aggregations={"count": doublesum("count")}
aggregations={"count": doublesum("count")},
context={"timeout": 1000}
)
>>> for k in range(2):
... print group[k]
Expand Down Expand Up @@ -461,6 +479,10 @@ def segment_metadata(self, **kwargs):
:param intervals: ISO-8601 intervals for which to run the query on
:type intervals: str or list
Optional key/value pairs:
:param dict context: A dict of query context options
:return: The query result
:rtype: list[dict]
Expand Down Expand Up @@ -490,6 +512,10 @@ def time_boundary(self, **kwargs):
:param str datasource: Data source to query
Optional key/value pairs:
:param dict context: A dict of query context options
:return: The query result
:rtype: list[dict]
Expand Down Expand Up @@ -525,6 +551,7 @@ def select(self, **kwargs):
:param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query
:param list dimensions: The list of dimensions to select. If left empty, all dimensions are returned
:param list metrics: The list of metrics to select. If left empty, all metrics are returned
:param dict context: A dict of query context options
:return: The query result
:rtype: list[dict]
Expand All @@ -538,7 +565,8 @@ def select(self, **kwargs):
datasource=twitterstream,
granularity='all',
intervals='2013-06-14/pt1h',
paging_spec={'pagingIdentifies': {}, 'threshold': 1}
paging_spec={'pagingIdentifies': {}, 'threshold': 1},
context={"timeout": 1000}
)
>>> print raw_data
>>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': {'pagingIdentifiers': {'twitterstream_2013-06-14T00:00:00.000Z_2013-06-15T00:00:00.000Z_2013-06-15T08:00:00.000Z_v1': 1, 'events': [{'segmentId': 'twitterstream_2013-06-14T00:00:00.000Z_2013-06-15T00:00:00.000Z_2013-06-15T08:00:00.000Z_v1', 'offset': 0, 'event': {'timestamp': '2013-06-14T00:00:00.000Z', 'dim': 'value'}}]}}]
Expand Down

0 comments on commit f2d1a24

Please sign in to comment.