This repository has been archived by the owner on Sep 16, 2021. It is now read-only.

Revert "Backporting "@jneely's merge timeseries patch" (graphite-proj…
Browse files Browse the repository at this point in the history
…ect#1352) to master branch"

This reverts commit bc18f97.
Corentin Chary committed Aug 26, 2016
1 parent f4c20d2 commit 6003270
Showing 4 changed files with 65 additions and 122 deletions.
5 changes: 0 additions & 5 deletions webapp/graphite/local_settings.py.example
@@ -259,11 +259,6 @@
# that remote systems have data we don't have locally, which we probably do.
#FIND_TOLERANCE = 2 * FIND_CACHE_DURATION

# During a rebalance of a consistent hash cluster, after a partition event on a replication > 1 cluster,
# or in other cases we might receive multiple TimeSeries data for a metric key. Merge them together rather
# than choosing the "most complete" one (pre-0.9.14 behaviour).
#REMOTE_STORE_MERGE_RESULTS = True

## Remote rendering settings
# Set to True to enable rendering of Graphs on a remote webapp
#REMOTE_RENDERING = True
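The REMOTE_STORE_MERGE_RESULTS comment removed above describes two ways of reconciling duplicate TimeSeries returned for the same metric key. A minimal standalone sketch of the difference, using plain Python lists in place of TimeSeries objects (merge_values and most_complete are illustrative names, not graphite-web functions):

def merge_values(known, candidate):
    # REMOTE_STORE_MERGE_RESULTS = True: fill gaps in 'known' from 'candidate'.
    return [k if k is not None else c for k, c in zip(known, candidate)]

def most_complete(known, candidate):
    # The alternative the comment describes: keep whichever copy has fewer None gaps.
    known_nones = sum(1 for v in known if v is None)
    candidate_nones = sum(1 for v in candidate if v is None)
    return candidate if candidate_nones < known_nones else known

# Two copies of the same metric, e.g. from different members of a replicated cluster:
a = [1, None, 3, None]
b = [None, 2, None, 4]

print(merge_values(a, b))   # [1, 2, 3, 4]  -- gaps filled from the other copy
print(most_complete(a, b))  # [1, None, 3, None]  -- a tie keeps the first copy seen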
77 changes: 15 additions & 62 deletions webapp/graphite/render/datalib.py
@@ -107,7 +107,8 @@ def getInfo(self):

# Data retrieval API
def fetchData(requestContext, pathExpr):
seriesList = {}

seriesList = []
startTime = int( epoch( requestContext['startTime'] ) )
endTime = int( epoch( requestContext['endTime'] ) )

@@ -131,69 +132,21 @@ def _fetchData(pathExpr,startTime, endTime, requestContext, seriesList):

series = TimeSeries(node.path, start, end, step, values)
series.pathExpression = pathExpr #hack to pass expressions through to render functions
seriesList.append(series)

# Used as a cache to avoid recounting series None values below.
series_best_nones = {}

if series.name in seriesList:
# This counts the Nones in each series, and is unfortunately O(n) for each
# series, which may be worth further optimization. The value of doing this
# at all is to avoid the "flipping" effect of loading a graph multiple times
# and having inconsistent data returned if one of the backing stores has
# inconsistent data. This is imperfect as a validity test, but in practice
# nicely keeps us using the "most complete" dataset available. Think of it
# as a very weak CRDT resolver.
candidate_nones = 0
if not settings.REMOTE_STORE_MERGE_RESULTS:
candidate_nones = len(
[val for val in series['values'] if val is None])

known = seriesList[series.name]
# To avoid repeatedly recounting the 'Nones' in series we've already seen,
# cache the best known count so far in a dict.
if known.name in series_best_nones:
known_nones = series_best_nones[known.name]
else:
known_nones = len([val for val in known if val is None])

if known_nones > candidate_nones:
if settings.REMOTE_STORE_MERGE_RESULTS:
# This series has potential data that might be missing from
# earlier series. Attempt to merge in useful data and update
# the cache count.
log.info("Merging multiple TimeSeries for %s" % known.name)
for i, j in enumerate(known):
if j is None and series[i] is not None:
known[i] = series[i]
known_nones -= 1
# Store known_nones in our cache
series_best_nones[known.name] = known_nones
else:
# Not merging data -
# we've found a series better than what we've already seen. Update
# the count cache and replace the given series in the array.
series_best_nones[known.name] = candidate_nones
seriesList[known.name] = series
else:
# If we are merging data: the existing series has no gaps and there is
# nothing to merge in. Save ourselves some work here.
#
# OR, if we are picking the best series:
#
# We already have this series in the seriesList, and the
# candidate is 'worse' than what we already have, so we don't
# need to compare anything else. Save ourselves some work here.
break

# If we looked at this series above, and it matched a 'known'
# series already, then it's already in the series list (or ignored).
# If not, append it here.
else:
seriesList[series.name] = series
# Prune empty series with duplicate metric paths to avoid showing empty graph elements for old whisper data
names = set([ s.name for s in seriesList ])
for name in names:
series_with_duplicate_names = [ s for s in seriesList if s.name == name ]
empty_duplicates = [ s for s in series_with_duplicate_names if not nonempty(s) ]

if series_with_duplicate_names == empty_duplicates and len(empty_duplicates) > 0: # if they're all empty
empty_duplicates.pop() # make sure we leave one in seriesList

for series in empty_duplicates:
seriesList.remove(series)

# Stabilize the order of the results by ordering the resulting series by name.
# This returns the result ordering to the behavior observed pre PR#1010.
return [seriesList[k] for k in sorted(seriesList)]
return seriesList

retries = 1 # start counting at one to make log output and settings more readable
while True:
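Taken together, the datalib.py hunk above swaps the merge-patch resolver (a dict keyed by series name, reduced to a single "most complete" or merged copy per metric) back to the list-based behaviour: append every fetched series, then prune only the duplicates that are entirely empty. A rough, self-contained illustration of that restored pruning step, with bare dicts standing in for TimeSeries and a simplified stand-in for nonempty() (names and structure here are illustrative):

def nonempty(series):
    # Simplified stand-in for graphite's nonempty(): any non-None datapoint?
    return any(v is not None for v in series["values"])

def prune_empty_duplicates(series_list):
    # Restored behaviour: keep every fetched copy, but drop empty duplicates
    # of a metric path so stale whisper files don't add blank graph elements.
    names = set(s["name"] for s in series_list)
    for name in names:
        duplicates = [s for s in series_list if s["name"] == name]
        empties = [s for s in duplicates if not nonempty(s)]
        if duplicates == empties and len(empties) > 0:  # every copy is empty
            empties.pop()                               # leave one in the list
        for s in empties:
            series_list.remove(s)
    return series_list

series = [
    {"name": "foo.bar", "values": [None, None]},  # stale, empty whisper copy
    {"name": "foo.bar", "values": [1, 2]},        # copy with live data
]
print(prune_empty_duplicates(series))
# -> only the copy with data remains; if both were empty, one would be kept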
1 change: 0 additions & 1 deletion webapp/graphite/settings.py
@@ -57,7 +57,6 @@
REMOTE_FETCH_TIMEOUT = 3.0
REMOTE_RETRY_DELAY = 60.0
REMOTE_EXCLUDE_LOCAL = False
REMOTE_STORE_MERGE_RESULTS = True
CARBON_METRIC_PREFIX='carbon'
CARBONLINK_HOSTS = ["127.0.0.1:7002"]
CARBONLINK_TIMEOUT = 1.0
104 changes: 50 additions & 54 deletions webapp/graphite/storage.py
@@ -80,60 +80,56 @@ def find(self, pattern, startTime=None, endTime=None, local=False):
if not leaf_nodes:
continue

if settings.REMOTE_STORE_MERGE_RESULTS:
# we need all nodes for merging results
minimal_node_set = leaf_nodes
else:
# Calculate best minimal node set
minimal_node_set = set()
covered_intervals = IntervalSet([])

# If the query doesn't fall entirely within the FIND_TOLERANCE window
# we disregard the window. This prevents unnecessary remote fetches
# caused when carbon's cache skews node.intervals, giving the appearance
# that remote systems have data we don't have locally, which we probably do.
now = int( time.time() )
tolerance_window = now - settings.FIND_TOLERANCE
disregard_tolerance_window = query.interval.start < tolerance_window
prior_to_window = Interval( float('-inf'), tolerance_window )

def measure_of_added_coverage(node, drop_window=disregard_tolerance_window):
relevant_intervals = node.intervals.intersect_interval(query.interval)
if drop_window:
relevant_intervals = relevant_intervals.intersect_interval(prior_to_window)
return covered_intervals.union(relevant_intervals).size - covered_intervals.size

nodes_remaining = list(leaf_nodes)

# Prefer local nodes first (and do *not* drop the tolerance window)
for node in leaf_nodes:
if node.local and measure_of_added_coverage(node, False) > 0:
nodes_remaining.remove(node)
minimal_node_set.add(node)
covered_intervals = covered_intervals.union(node.intervals)

while nodes_remaining:
node_coverages = [ (measure_of_added_coverage(n), n) for n in nodes_remaining ]
best_coverage, best_node = max(node_coverages)

if best_coverage == 0:
break

nodes_remaining.remove(best_node)
minimal_node_set.add(best_node)
covered_intervals = covered_intervals.union(best_node.intervals)

# Sometimes the requested interval falls within the caching window.
# We include the most likely node if the gap is within tolerance.
if not minimal_node_set:
def distance_to_requested_interval(node):
latest = sorted(node.intervals, key=lambda i: i.end)[-1]
distance = query.interval.start - latest.end
return distance if distance >= 0 else float('inf')

best_candidate = min(leaf_nodes, key=distance_to_requested_interval)
if distance_to_requested_interval(best_candidate) <= settings.FIND_TOLERANCE:
minimal_node_set.add(best_candidate)
# Calculate best minimal node set
minimal_node_set = set()
covered_intervals = IntervalSet([])

# If the query doesn't fall entirely within the FIND_TOLERANCE window
# we disregard the window. This prevents unnecessary remote fetches
# caused when carbon's cache skews node.intervals, giving the appearance
# that remote systems have data we don't have locally, which we probably do.
now = int( time.time() )
tolerance_window = now - settings.FIND_TOLERANCE
disregard_tolerance_window = query.interval.start < tolerance_window
prior_to_window = Interval( float('-inf'), tolerance_window )

def measure_of_added_coverage(node, drop_window=disregard_tolerance_window):
relevant_intervals = node.intervals.intersect_interval(query.interval)
if drop_window:
relevant_intervals = relevant_intervals.intersect_interval(prior_to_window)
return covered_intervals.union(relevant_intervals).size - covered_intervals.size

nodes_remaining = list(leaf_nodes)

# Prefer local nodes first (and do *not* drop the tolerance window)
for node in leaf_nodes:
if node.local and measure_of_added_coverage(node, False) > 0:
nodes_remaining.remove(node)
minimal_node_set.add(node)
covered_intervals = covered_intervals.union(node.intervals)

while nodes_remaining:
node_coverages = [ (measure_of_added_coverage(n), n) for n in nodes_remaining ]
best_coverage, best_node = max(node_coverages)

if best_coverage == 0:
break

nodes_remaining.remove(best_node)
minimal_node_set.add(best_node)
covered_intervals = covered_intervals.union(best_node.intervals)

# Sometimes the requested interval falls within the caching window.
# We include the most likely node if the gap is within tolerance.
if not minimal_node_set:
def distance_to_requested_interval(node):
latest = sorted(node.intervals, key=lambda i: i.end)[-1]
distance = query.interval.start - latest.end
return distance if distance >= 0 else float('inf')

best_candidate = min(leaf_nodes, key=distance_to_requested_interval)
if distance_to_requested_interval(best_candidate) <= settings.FIND_TOLERANCE:
minimal_node_set.add(best_candidate)

if len(minimal_node_set) == 1:
yield minimal_node_set.pop()
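The storage.py hunk above is unchanged in substance; the revert only drops the REMOTE_STORE_MERGE_RESULTS branch ("we need all nodes for merging results") and de-indents the greedy selection of a minimal node set. A simplified sketch of that selection idea, using (start, end) tuples instead of graphite's Interval/IntervalSet classes and omitting the FIND_TOLERANCE window handling (helper names here are illustrative):

def overlap(a, b):
    # Length of the overlap between two (start, end) intervals.
    return max(0.0, min(a[1], b[1]) - max(a[0], b[0]))

def added_coverage(intervals, covered, query):
    # How much of the query interval this node would newly cover.
    # Simplification: overlap between already-covered intervals may be
    # subtracted twice, which only makes the estimate conservative.
    gain = 0.0
    for start, end in intervals:
        lo, hi = max(start, query[0]), min(end, query[1])
        if lo >= hi:
            continue
        gain += (hi - lo) - sum(overlap((lo, hi), c) for c in covered)
    return max(gain, 0.0)

def minimal_node_set(nodes, query):
    # Greedy cover: prefer local nodes, then repeatedly take the remaining
    # node that adds the most new coverage until nothing improves.
    chosen, covered = [], []
    remaining = list(nodes)
    for node in nodes:                      # node = (name, is_local, intervals)
        name, is_local, intervals = node
        if is_local and added_coverage(intervals, covered, query) > 0:
            remaining.remove(node)
            chosen.append(name)
            covered.extend(intervals)
    while remaining:
        gains = [(added_coverage(n[2], covered, query), n) for n in remaining]
        best_gain, best_node = max(gains)
        if best_gain == 0:
            break
        remaining.remove(best_node)
        chosen.append(best_node[0])
        covered.extend(best_node[2])
    return chosen

nodes = [("local-a", True, [(0, 50)]),
         ("remote-b", False, [(40, 100)]),
         ("remote-c", False, [(0, 60)])]   # adds nothing once a and b are chosen
print(minimal_node_set(nodes, (0, 100)))   # ['local-a', 'remote-b']

The greedy loop mirrors the structure of the real code: local nodes are taken first, and remote nodes are added only while they contribute new coverage of the query interval.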
