Skip to content

Commit

Permalink
use BufferedHTTPReader when loading data from http results
Browse files Browse the repository at this point in the history
  • Loading branch information
DanCech committed Nov 30, 2017
1 parent 203e959 commit 5a58e4f
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 6 deletions.
8 changes: 5 additions & 3 deletions webapp/graphite/finders/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from graphite.logger import log
from graphite.node import LeafNode, BranchNode
from graphite.render.hashing import compactHash
from graphite.util import unpickle, logtime, is_local_interface, json, msgpack
from graphite.util import unpickle, logtime, is_local_interface, json, msgpack, BufferedHTTPReader

from graphite.finders.utils import BaseFinder
from graphite.readers.remote import RemoteReader
Expand Down Expand Up @@ -108,9 +108,11 @@ def find_nodes(self, query, timer=None):

try:
if result.getheader('content-type') == 'application/x-msgpack':
results = msgpack.load(result)
results = msgpack.load(BufferedHTTPReader(
result, buffer_size=settings.REMOTE_BUFFER_SIZE))
else:
results = unpickle.load(result)
results = unpickle.load(BufferedHTTPReader(
result, buffer_size=settings.REMOTE_BUFFER_SIZE))
except Exception as err:
self.fail()
log.exception(
Expand Down
3 changes: 3 additions & 0 deletions webapp/graphite/local_settings.py.example
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ DEFAULT_XFILES_FACTOR = 0

#REMOTE_STORE_USE_POST = False # Use POST instead of GET for remote requests

# Size of the buffer used for streaming remote cluster responses
#REMOTE_BUFFER_SIZE = 1024 * 1024

# During a rebalance of a consistent hash cluster, after a partition event on a replication > 1 cluster,
# or in other cases we might receive multiple TimeSeries data for a metric key. Merge them together rather
# that choosing the "most complete" one (pre-0.9.14 behaviour).
Expand Down
8 changes: 5 additions & 3 deletions webapp/graphite/readers/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from graphite.logger import log
from graphite.readers.utils import BaseReader
from graphite.util import unpickle, msgpack
from graphite.util import unpickle, msgpack, BufferedHTTPReader


class RemoteReader(BaseReader):
Expand Down Expand Up @@ -76,9 +76,11 @@ def fetch_multi(self, startTime, endTime, now=None, requestContext=None):

try:
if result.getheader('content-type') == 'application/x-msgpack':
data = msgpack.load(result)
data = msgpack.load(BufferedHTTPReader(
result, buffer_size=settings.REMOTE_BUFFER_SIZE))
else:
data = unpickle.load(result)
data = unpickle.load(BufferedHTTPReader(
result, buffer_size=settings.REMOTE_BUFFER_SIZE))
except Exception as err:
self.finder.fail()
log.exception(
Expand Down
1 change: 1 addition & 0 deletions webapp/graphite/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
REMOTE_STORE_MERGE_RESULTS = True
REMOTE_STORE_FORWARD_HEADERS = []
REMOTE_STORE_USE_POST = False
REMOTE_BUFFER_SIZE = 1024 * 1024
CARBON_METRIC_PREFIX='carbon'
CARBONLINK_HOSTS = ["127.0.0.1:7002"]
CARBONLINK_TIMEOUT = 1.0
Expand Down
23 changes: 23 additions & 0 deletions webapp/graphite/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
limitations under the License."""

import imp
import io
import socket
import time
import sys
Expand Down Expand Up @@ -257,3 +258,25 @@ def wrapped_f(*args, **kwargs):
timer.stop()

return wrapped_f


class BufferedHTTPReader(io.IOBase):
def __init__(self, response, buffer_size=1048576):
self.response = response
self.buffer_size = buffer_size
self.buffer = ''
self.pos = 0

def read(self, amt=None):
if amt is None:
return self.response.read()
if len(self.buffer) - self.pos < amt:
self.buffer = self.buffer[self.pos:]
self.pos = 0
self.buffer += self.response.read(self.buffer_size)
data = self.buffer[self.pos:self.pos + amt]
self.pos += amt
if self.pos >= len(self.buffer):
self.pos = 0
self.buffer = ''
return data

0 comments on commit 5a58e4f

Please sign in to comment.