diff --git a/pydruid/db/api.py b/pydruid/db/api.py index 86b64e5e..5c8a24c7 100644 --- a/pydruid/db/api.py +++ b/pydruid/db/api.py @@ -1,5 +1,6 @@ import itertools import json +import sys from collections import namedtuple, OrderedDict from urllib import parse @@ -380,7 +381,7 @@ def _stream_query(self, query): yield Row(*row.values()) -def rows_from_chunks(chunks): +def rows_from_chunks(chunks, decoder=None): """ A generator that yields rows from JSON chunks. @@ -388,39 +389,28 @@ def rows_from_chunks(chunks): JSON objects. This function will parse all complete rows inside each chunk, yielding them as soon as possible. """ - body = "" + buffer = "" + + if decoder is None: + if sys.version_info >= (3, 7): + decoder = json.JSONDecoder() + else: + # Prior to Python 3.7, dict is not guaranteed to be ordered, so parsing + # can be scrambled unless collections.OrderedDict is specifically requested + decoder = json.JSONDecoder(object_pairs_hook=OrderedDict) + for chunk in chunks: - if chunk: - body = "".join((body, chunk)) - - # find last complete row - boundary = 0 - brackets = 0 - in_string = False - for i, char in enumerate(body): - if char == '"': - if not in_string: - in_string = True - elif body[i - 1] != "\\": - in_string = False - - if in_string: - continue - - if char == "{": - brackets += 1 - elif char == "}": - brackets -= 1 - if brackets == 0 and i > boundary: - boundary = i + 1 - - rows = body[:boundary].lstrip("[,") - body = body[boundary:] - - for row in json.loads( - "[{rows}]".format(rows=rows), object_pairs_hook=OrderedDict - ): - yield row + buffer += chunk + min_index = 0 + + try: + while True: + row, min_index = decoder.raw_decode( + buffer, idx=buffer.find("{", min_index) + ) + yield row + except ValueError: + buffer = buffer[min_index:] def apply_parameters(operation, parameters):