From f2d1a24e24a81286973975356c14f7f40a0d667b Mon Sep 17 00:00:00 2001 From: Daniel O'Connell Date: Tue, 10 Mar 2015 10:52:17 +0100 Subject: [PATCH] Allow the setting of context properties in queries --- pydruid/client.py | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/pydruid/client.py b/pydruid/client.py index 46296d1c..02ad08e2 100755 --- a/pydruid/client.py +++ b/pydruid/client.py @@ -268,6 +268,18 @@ def export_pandas(self): # --------- Query implementations --------- def validate_query(self, valid_parts, args): + """ + Validate the query parts so only allowed objects are sent. + + Each query type can have an optional 'context' object attached which is used to set certain + query context settings, etc. timeout or priority. As each query can have this object, there's + no need for it to be sent - it might as well be added here. + + :param list valid_parts: a list of valid object names + :param dict args: the dict of args to be sent + :raise ValueError: if an invalid object is given + """ + valid_parts = valid_parts[:] + ['context'] for key, val in args.iteritems(): if key not in valid_parts: raise ValueError( @@ -321,6 +333,7 @@ def topn(self, **kwargs): :param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query :param post_aggregations: A dict with string key = 'post_aggregator_name', and value pydruid.utils.PostAggregator + :param dict context: A dict of query context options Example: @@ -335,7 +348,8 @@ def topn(self, **kwargs): dimension='user_name', metric='count', filter=Dimension('user_lang') == 'en', - threshold=1 + threshold=1, + context={"timeout": 1000} ) >>> print top >>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': [{'count': 22.0, 'user': "cool_user"}}]}] @@ -369,6 +383,7 @@ def timeseries(self, **kwargs): :param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query :param post_aggregations: A dict with string key = 'post_aggregator_name', and value pydruid.utils.PostAggregator + :param dict context: A dict of query context options Example: @@ -380,7 +395,8 @@ def timeseries(self, **kwargs): granularity='hour', intervals='2013-06-14/pt1h', aggregations={"count": doublesum("count"), "rows": count("rows")}, - post_aggregations={'percent': (Field('count') / Field('rows')) * Const(100))} + post_aggregations={'percent': (Field('count') / Field('rows')) * Const(100))}, + context={"timeout": 1000} ) >>> print counts >>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': {'count': 9619.0, 'rows': 8007, 'percent': 120.13238416385663}}] @@ -415,6 +431,7 @@ def groupby(self, **kwargs): :param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query :param pydruid.utils.having.Having having: Indicates which groups in results set of query to keep :param post_aggregations: A dict with string key = 'post_aggregator_name', and value pydruid.utils.PostAggregator + :param dict context: A dict of query context options Example: @@ -427,7 +444,8 @@ def groupby(self, **kwargs): intervals='2013-10-04/pt1h', dimensions=["user_name", "reply_to_name"], filter=~(Dimension("reply_to_name") == "Not A Reply"), - aggregations={"count": doublesum("count")} + aggregations={"count": doublesum("count")}, + context={"timeout": 1000} ) >>> for k in range(2): ... print group[k] @@ -461,6 +479,10 @@ def segment_metadata(self, **kwargs): :param intervals: ISO-8601 intervals for which to run the query on :type intervals: str or list + Optional key/value pairs: + + :param dict context: A dict of query context options + :return: The query result :rtype: list[dict] @@ -490,6 +512,10 @@ def time_boundary(self, **kwargs): :param str datasource: Data source to query + Optional key/value pairs: + + :param dict context: A dict of query context options + :return: The query result :rtype: list[dict] @@ -525,6 +551,7 @@ def select(self, **kwargs): :param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query :param list dimensions: The list of dimensions to select. If left empty, all dimensions are returned :param list metrics: The list of metrics to select. If left empty, all metrics are returned + :param dict context: A dict of query context options :return: The query result :rtype: list[dict] @@ -538,7 +565,8 @@ def select(self, **kwargs): datasource=twitterstream, granularity='all', intervals='2013-06-14/pt1h', - paging_spec={'pagingIdentifies': {}, 'threshold': 1} + paging_spec={'pagingIdentifies': {}, 'threshold': 1}, + context={"timeout": 1000} ) >>> print raw_data >>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': {'pagingIdentifiers': {'twitterstream_2013-06-14T00:00:00.000Z_2013-06-15T00:00:00.000Z_2013-06-15T08:00:00.000Z_v1': 1, 'events': [{'segmentId': 'twitterstream_2013-06-14T00:00:00.000Z_2013-06-15T00:00:00.000Z_2013-06-15T08:00:00.000Z_v1', 'offset': 0, 'event': {'timestamp': '2013-06-14T00:00:00.000Z', 'dim': 'value'}}]}}]