Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize rows_from_chunks #262

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 23 additions & 33 deletions pydruid/db/api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import itertools
import json
import sys
from collections import namedtuple, OrderedDict
from urllib import parse

Expand Down Expand Up @@ -380,47 +381,36 @@ def _stream_query(self, query):
yield Row(*row.values())


def rows_from_chunks(chunks, decoder=None):
    """
    A generator that yields rows from JSON chunks.

    Druid will return the data in chunks, but they are not aligned with the
    JSON objects. This function will parse all complete rows inside each chunk,
    yielding them as soon as possible.

    Text that cannot (yet) be decoded as a complete JSON object is kept in an
    internal buffer and retried once the next chunk has been appended. Any
    text still undecodable when ``chunks`` is exhausted is dropped without
    raising.

    :param chunks: iterable of ``str`` fragments that together form a JSON
        array of objects. Falsy fragments (``""``, ``None``) are skipped.
    :param decoder: optional object providing ``raw_decode(s, idx=...)`` with
        the semantics of ``json.JSONDecoder.raw_decode``. When omitted, a
        ``json.JSONDecoder`` is created that preserves key order.
    :return: a generator yielding one decoded object per complete row.
    """
    if decoder is None:
        if sys.version_info >= (3, 7):
            decoder = json.JSONDecoder()
        else:
            # Prior to Python 3.7, dict is not guaranteed to be ordered, so
            # parsing can be scrambled unless collections.OrderedDict is
            # specifically requested
            decoder = json.JSONDecoder(object_pairs_hook=OrderedDict)

    buffer = ""

    for chunk in chunks:
        if not chunk:
            continue

        buffer += chunk
        # Offset just past the last row decoded from the current buffer.
        consumed = 0

        while True:
            start = buffer.find("{", consumed)
            if start == -1:
                # No further row begins in the buffer. Stop here instead of
                # handing -1 to the decoder, whose treatment of a negative
                # offset should not be relied upon.
                break

            # Only the decode call is guarded: keeping ``yield`` outside the
            # ``try`` ensures a ValueError thrown into this generator by the
            # consumer is not mistaken for an incomplete row and swallowed.
            try:
                row, consumed = decoder.raw_decode(buffer, idx=start)
            except ValueError:
                # The row is incomplete; wait for the next chunk.
                break

            yield row

        # Keep only the undecoded tail for the next round.
        buffer = buffer[consumed:]


def apply_parameters(operation, parameters):
Expand Down