Skip to content

Commit

Permalink
Merge pull request #1774 from iksaif/storage
Browse files Browse the repository at this point in the history
Improved stability/performance for storage
  • Loading branch information
iksaif committed Jan 10, 2017
2 parents 73bc058 + 41cc96d commit 4005a0e
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 10 deletions.
5 changes: 4 additions & 1 deletion webapp/graphite/node.py
Expand Up @@ -23,11 +23,14 @@ class LeafNode(Node):
def __init__(self, path, reader):
    """Leaf (metric-bearing) node of the metric tree, backed by *reader*."""
    Node.__init__(self, path)
    # Reader object that supplies this node's datapoints and time intervals.
    self.reader = reader
    # NOTE(review): this assignment is the line *removed* by this commit (the
    # scraped diff shows deletions without '-' markers); it would conflict with
    # the read-only `intervals` property added below — confirm against the
    # actual repository state before editing.
    self.intervals = reader.get_intervals()
    # Marks this node as a leaf so callers can distinguish it from BranchNodes.
    self.is_leaf = True

def fetch(self, startTime, endTime):
    """Delegate fetching datapoints for [startTime, endTime] to this node's reader."""
    return self.reader.fetch(startTime, endTime)

@property
def intervals(self):
    """Time intervals available for this node.

    Queried from the reader on every access rather than cached at
    construction time (this is the change introduced by this commit).
    """
    return self.reader.get_intervals()

def __repr__(self):
    # id(self) is included so instances with identical paths are distinguishable.
    return '<LeafNode[%x]: %s (%s)>' % (id(self), self.path, self.reader)
8 changes: 7 additions & 1 deletion webapp/graphite/readers.py
Expand Up @@ -54,7 +54,13 @@ def get_intervals(self):

def fetch(self, startTime, endTime):
# Start the fetch on each node
fetches = [ n.fetch(startTime, endTime) for n in self.nodes ]
fetches = []

for n in self.nodes:
try:
fetches.append(n.fetch(startTime, endTime))
except:
log.exception("Failed to initiate subfetch for %s" % str(n))

def merge_results():
results = {}
Expand Down
7 changes: 3 additions & 4 deletions webapp/graphite/remote_storage.py
Expand Up @@ -60,10 +60,6 @@ def send(self):
log.info("FindRequest(host=%s, query=%s) using cached result" % (self.store.host, self.query))
return

connector_class = connector_class_selector(settings.INTRACLUSTER_HTTPS)
self.connection = connector_class(self.store.host)
self.connection.timeout = settings.REMOTE_FIND_TIMEOUT

query_params = [
('local', '1'),
('format', 'pickle'),
Expand All @@ -78,6 +74,9 @@ def send(self):
query_string = urlencode(query_params)

try:
connector_class = connector_class_selector(settings.INTRACLUSTER_HTTPS)
self.connection = connector_class(self.store.host)
self.connection.timeout = settings.REMOTE_FIND_TIMEOUT
self.connection.request('GET', '/metrics/find/?' + query_string)
except:
log.exception("FindRequest.send(host=%s, query=%s) exception during request" % (self.store.host, self.query))
Expand Down
12 changes: 11 additions & 1 deletion webapp/graphite/storage.py
@@ -1,4 +1,5 @@
import time
import random

try:
from importlib import import_module
Expand Down Expand Up @@ -38,6 +39,7 @@ def find(self, pattern, startTime=None, endTime=None, local=False):

# Start remote searches
if not local:
random.shuffle(self.remote_stores)
remote_requests = [ r.find(query) for r in self.remote_stores if r.available ]

matching_nodes = set()
Expand Down Expand Up @@ -66,7 +68,10 @@ def find(self, pattern, startTime=None, endTime=None, local=False):
# Reduce matching nodes for each path to a minimal set
found_branch_nodes = set()

for path, nodes in nodes_by_path.iteritems():
items = list(nodes_by_path.iteritems())
random.shuffle(items)

for path, nodes in items:
leaf_nodes = []

# First we dispense with the BranchNodes
Expand All @@ -80,6 +85,11 @@ def find(self, pattern, startTime=None, endTime=None, local=False):
if not leaf_nodes:
continue

# Fast-path when there is a single node.
if len(leaf_nodes) == 1:
yield leaf_nodes[0]
continue

# Calculate best minimal node set
minimal_node_set = set()
covered_intervals = IntervalSet([])
Expand Down
6 changes: 3 additions & 3 deletions webapp/tests/test_finders.py
Expand Up @@ -278,12 +278,12 @@ def listdir_mock(d):
self._listdir_counter = 0
nodes = finder.find_nodes(FindQuery('foo', None, None))
self.assertEqual(len(list(nodes)), 1)
self.assertEqual(self._listdir_counter, 2)
self.assertEqual(self._listdir_counter, 1)

self._listdir_counter = 0
nodes = finder.find_nodes(FindQuery('foo.bar.baz', None, None))
self.assertEqual(len(list(nodes)), 1)
self.assertEqual(self._listdir_counter, 2)
self.assertEqual(self._listdir_counter, 1)

# No data in the expected time period
self._listdir_counter = 0
Expand All @@ -299,7 +299,7 @@ def listdir_mock(d):
self._listdir_counter = 0
nodes = finder.find_nodes(FindQuery('*.ba?.{baz,foo}', None, None))
self.assertEqual(len(list(nodes)), 2)
self.assertEqual(self._listdir_counter, 10)
self.assertEqual(self._listdir_counter, 8)

# Search for something that isn't valid Ceres content
fh = open(join(test_dir, 'foo', 'blah'), 'wb')
Expand Down

0 comments on commit 4005a0e

Please sign in to comment.