Merge pull request #2354 from graphite-project/remote-buffer-size
move deserialize into remote finder
DanCech committed Oct 22, 2018
2 parents 108498f + 3999873 commit 183cfca
Showing 4 changed files with 77 additions and 88 deletions.
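
In short, the response-decoding logic that RemoteReader previously carried is moved onto RemoteFinder, and the reader now delegates to its finder. A minimal sketch of that call pattern (simplified stand-in classes, not graphite-web's actual signatures):

# Simplified illustration of the delegation this commit introduces; the real
# classes live in webapp/graphite/finders/remote.py and
# webapp/graphite/readers/remote.py and take more arguments.
class RemoteFinder(object):
    def deserialize(self, result):
        # single shared implementation: msgpack or pickle, streamed or inline
        raise NotImplementedError

class RemoteReader(object):
    def __init__(self, finder):
        self.finder = finder

    def fetch_multi(self, result):
        # before this commit: data = self.deserialize(result)
        data = self.finder.deserialize(result)
        return data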
80 changes: 65 additions & 15 deletions webapp/graphite/finders/remote.py
@@ -106,21 +106,7 @@ def find_nodes(self, query, timer=None):
             headers=query.headers,
             timeout=settings.FIND_TIMEOUT)
 
-        try:
-            if result.getheader('content-type') == 'application/x-msgpack':
-                results = msgpack.load(BufferedHTTPReader(
-                    result, buffer_size=settings.REMOTE_BUFFER_SIZE), encoding='utf-8')
-            else:
-                results = unpickle.load(BufferedHTTPReader(
-                    result, buffer_size=settings.REMOTE_BUFFER_SIZE))
-        except Exception as err:
-            self.fail()
-            log.exception(
-                "RemoteFinder[%s] Error decoding find response from %s: %s" %
-                (self.host, result.url_full, err))
-            raise Exception("Error decoding find response from %s: %s" % (result.url_full, err))
-        finally:
-            result.release_conn()
+        results = self.deserialize(result)
 
         cache.set(cacheKey, results, settings.FIND_CACHE_DURATION)
 
@@ -286,3 +272,67 @@ def request(self, path, fields=None, headers=None, timeout=None):
 
         log.debug("RemoteFinder[%s] Fetched %s" % (self.host, url_full))
         return result
+
+    def deserialize(self, result):
+        """
+        Based on configuration, either stream-deserialize a response in settings.REMOTE_BUFFER_SIZE chunks,
+        or read the entire payload and use inline deserialization.
+        :param result: an http response object
+        :return: deserialized response payload from cluster server
+        """
+        start = time.time()
+        try:
+            should_buffer = settings.REMOTE_BUFFER_SIZE > 0
+            measured_reader = MeasuredReader(BufferedHTTPReader(result, settings.REMOTE_BUFFER_SIZE))
+
+            if should_buffer:
+                log.debug("Using streaming deserializer.")
+                reader = BufferedHTTPReader(measured_reader, settings.REMOTE_BUFFER_SIZE)
+                return self._deserialize_stream(reader, result.getheader('content-type'))
+
+            log.debug("Using inline deserializer for small payload")
+            return self._deserialize_buffer(measured_reader.read(), result.getheader('content-type'))
+        except Exception as err:
+            self.fail()
+            log.exception(
+                "RemoteFinder[%s] Error decoding response from %s: %s" %
+                (self.host, result.url_full, err))
+            raise Exception("Error decoding response from %s: %s" % (result.url_full, err))
+        finally:
+            log.debug("Processed %d bytes in %f seconds." % (measured_reader.bytes_read, time.time() - start))
+            result.release_conn()
+
+    @staticmethod
+    def _deserialize_buffer(byte_buffer, content_type):
+        if content_type == 'application/x-msgpack':
+            data = msgpack.unpackb(byte_buffer, encoding='utf-8')
+        else:
+            data = unpickle.loads(byte_buffer)
+
+        return data
+
+    @staticmethod
+    def _deserialize_stream(stream, content_type):
+        if content_type == 'application/x-msgpack':
+            data = msgpack.load(stream, encoding='utf-8')
+        else:
+            data = unpickle.load(stream)
+
+        return data
+
+
+class MeasuredReader(object):
+    def __init__(self, reader):
+        self.reader = reader
+        self.bytes_read = 0
+
+    def read(self, amt=None):
+        b = b''
+        try:
+            if amt:
+                b = self.reader.read(amt)
+            else:
+                b = self.reader.read()
+            return b
+        finally:
+            self.bytes_read += len(b)
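
As a rough, standalone illustration of what the new deserialize method above does (json stands in for msgpack/pickle here, and the helper names are hypothetical, not graphite-web APIs): with REMOTE_BUFFER_SIZE > 0 the response is handed to the deserializer as a file-like object so it can be consumed as a stream, with 0 the whole body is read first and decoded inline, and a MeasuredReader-style wrapper only counts the bytes that pass through for the debug log. The real implementation additionally wraps the response in BufferedHTTPReader so the streaming path reads in REMOTE_BUFFER_SIZE chunks.

import io
import json

class MeasuredReader(object):
    # same idea as the MeasuredReader above: count bytes as they are read
    def __init__(self, reader):
        self.reader = reader
        self.bytes_read = 0

    def read(self, amt=None):
        b = self.reader.read(amt) if amt else self.reader.read()
        self.bytes_read += len(b)
        return b

def deserialize(body, buffer_size, load_stream, load_buffer):
    # buffer_size > 0: give the deserializer a file-like object to pull from
    # buffer_size == 0: read the whole payload first, then decode it in one call
    reader = MeasuredReader(body)
    data = load_stream(reader) if buffer_size > 0 else load_buffer(reader.read())
    print("processed %d bytes" % reader.bytes_read)
    return data

payload = json.dumps([{"path": "a.b.c", "is_leaf": True}]).encode("utf-8")
print(deserialize(io.BytesIO(payload), 1024, json.load, json.loads))  # streaming path
print(deserialize(io.BytesIO(payload), 0, json.load, json.loads))     # inline path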
71 changes: 1 addition & 70 deletions webapp/graphite/readers/remote.py
@@ -4,26 +4,6 @@
 
 from graphite.logger import log
 from graphite.readers.utils import BaseReader
-from graphite.util import unpickle, msgpack, BufferedHTTPReader
-
-import time
-
-
-class MeasuredReader(object):
-    def __init__(self, reader):
-        self.reader = reader
-        self.bytes_read = 0
-
-    def read(self, amt=None):
-        b = b''
-        try:
-            if amt:
-                b = self.reader.read(amt)
-            else:
-                b = self.reader.read()
-            return b
-        finally:
-            self.bytes_read += len(b)
 
 
 class RemoteReader(BaseReader):
@@ -93,7 +73,7 @@ def fetch_multi(self, startTime, endTime, now=None, requestContext=None):
                     (retries, settings.MAX_FETCH_RETRIES, format_exc()))
                 retries += 1
 
-        data = self.deserialize(result)
+        data = self.finder.deserialize(result)
 
         try:
             return [
@@ -111,52 +91,3 @@ def fetch_multi(self, startTime, endTime, now=None, requestContext=None):
                 "RemoteReader[%s] Invalid render response from %s: %s" %
                 (self.finder.host, result.url_full, repr(err)))
             raise Exception("Invalid render response from %s: %s" % (result.url_full, repr(err)))
-
-    def deserialize(self, result):
-        """
-        Based on configuration, either stream-deserialize a response in settings.REMOTE_BUFFER_SIZE chunks,
-        or read the entire payload and use inline deserialization.
-        :param result: an http response object
-        :return: deserialized response payload from cluster server
-        """
-        start = time.time()
-        try:
-            should_buffer = settings.REMOTE_BUFFER_SIZE > 0
-            measured_reader = MeasuredReader(BufferedHTTPReader(result, settings.REMOTE_BUFFER_SIZE))
-
-            if should_buffer:
-                log.debug("Using streaming deserializer.")
-                reader = BufferedHTTPReader(measured_reader, settings.REMOTE_BUFFER_SIZE)
-                deserialized = self._deserialize_stream(reader, result.getheader('content-type'))
-                return deserialized
-            else:
-                log.debug("Using inline deserializer for small payload")
-                deserialized = self._deserialize_buffer(measured_reader.read(), result.getheader('content-type'))
-                return deserialized
-        except Exception as err:
-            self.finder.fail()
-            log.exception(
-                "RemoteReader[%s] Error decoding render response from %s: %s" %
-                (self.finder.host, result.url_full, err))
-            raise Exception("Error decoding render response from %s: %s" % (result.url_full, err))
-        finally:
-            log.debug("Processed %d bytes in %f seconds." % (measured_reader.bytes_read, time.time() - start))
-            result.release_conn()
-
-    @staticmethod
-    def _deserialize_buffer(byte_buffer, content_type):
-        if content_type == 'application/x-msgpack':
-            data = msgpack.unpackb(byte_buffer, encoding='utf-8')
-        else:
-            data = unpickle.loads(byte_buffer)
-
-        return data
-
-    @staticmethod
-    def _deserialize_stream(stream, content_type):
-        if content_type == 'application/x-msgpack':
-            data = msgpack.load(stream, encoding='utf-8')
-        else:
-            data = unpickle.load(stream)
-
-        return data
12 changes: 10 additions & 2 deletions webapp/tests/test_finders_remote.py
@@ -57,11 +57,19 @@ def test_fail(self):
         with patch('graphite.finders.remote.time.time', lambda: 110):
             self.assertFalse(finder.disabled)
 
+    @override_settings(REMOTE_BUFFER_SIZE=1024 * 1024)
+    def test_find_nodes_with_buffering(self):
+        self._test_find_nodes()
+
+    @override_settings(REMOTE_BUFFER_SIZE=0)
+    def test_find_nodes_without_buffering(self):
+        self._test_find_nodes()
+
     @patch('urllib3.PoolManager.request')
     @override_settings(INTRACLUSTER_HTTPS=False)
     @override_settings(REMOTE_STORE_USE_POST=True)
     @override_settings(FIND_TIMEOUT=10)
-    def test_find_nodes(self, http_request):
+    def _test_find_nodes(self, http_request):
         finder = RemoteFinder('127.0.0.1')
 
         startTime = 1496262000
@@ -158,7 +166,7 @@ def test_find_nodes(self, http_request):
         responseObject = HTTPResponse(body=BytesIO(b'error'), status=200, preload_content=False)
         http_request.return_value = responseObject
 
-        with self.assertRaisesRegexp(Exception, 'Error decoding find response from https://[^ ]+: .+'):
+        with self.assertRaisesRegexp(Exception, 'Error decoding response from https://[^ ]+: .+'):
             finder.find_nodes(query)
 
     @patch('graphite.finders.remote.cache.get')
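
The test change above follows a simple pattern: the old test body becomes a private helper, and two thin wrappers run it with buffering enabled and disabled. A minimal sketch of that shape (hypothetical test class, assertions elided):

from django.test import TestCase, override_settings

class RemoteFinderBufferingTest(TestCase):
    @override_settings(REMOTE_BUFFER_SIZE=1024 * 1024)
    def test_find_nodes_with_buffering(self):
        self._test_find_nodes()

    @override_settings(REMOTE_BUFFER_SIZE=0)
    def test_find_nodes_without_buffering(self):
        self._test_find_nodes()

    def _test_find_nodes(self):
        # the real helper builds a RemoteFinder, mocks the urllib3 request and
        # asserts on the nodes returned by find_nodes(); omitted here
        pass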
2 changes: 1 addition & 1 deletion webapp/tests/test_readers_remote.py
Expand Up @@ -148,7 +148,7 @@ def test_RemoteReader_fetch_multi(self, http_request):
responseObject = HTTPResponse(body=BytesIO(b'error'), status=200, preload_content=False)
http_request.return_value = responseObject

with self.assertRaisesRegexp(Exception, 'Error decoding render response from http://[^ ]+: .+'):
with self.assertRaisesRegexp(Exception, 'Error decoding response from http://[^ ]+: .+'):
reader.fetch(startTime, endTime)

# invalid response data
