Skip to content
Browse files

Merge pull request #7 from slackhappy/new_hypertable_api

hypertable: use new file-backed tree index
  • Loading branch information...
2 parents b041530 + 268e15a commit 6b0a2a44d06bc8c37f6db1f80fc7690d597cc045 @slackhappy slackhappy committed Jun 2, 2012
View
42 webapp/graphite/hypertable_client.py
@@ -38,6 +38,26 @@ def releaseConn(self, conn):
with self.lock:
return self.freeClients.append(conn)
+
def doScanAsArrays(self, spec, table, cb):
    """Scan `table` with ScanSpec `spec`, invoking cb(key, family, column,
    val, ts) for every cell, using the array-based thrift fetch API.

    Acquires a pooled connection under the semaphore; the connection is
    always returned to the pool and the scanner always closed, even if a
    thrift call or the callback raises (the original leaked both on error,
    eventually exhausting the pool).
    """
    with self.semaphore:
        start = time.time()
        conn = self.getConn()
        try:
            namespace = conn.namespace_open('monitor')
            scanner = conn.scanner_open(namespace, table, spec)
            try:
                while True:
                    row_data = conn.scanner_get_cells_as_arrays(scanner)
                    if not row_data:  # empty batch signals end of scan
                        break
                    for key, family, column, val, ts in row_data:
                        cb(key, family, column, val, ts)
            finally:
                conn.close_scanner(scanner)
        finally:
            self.releaseConn(conn)
        log.info(spec)
        log.info('scan-arrays-fetch time: %s' % (time.time() - start))
+
+
def doScan(self, spec, table, cb):
with self.semaphore:
start = time.time()
@@ -60,9 +80,27 @@ def doScan(self, spec, table, cb):
conn.close_scanner(scanner)
self.releaseConn(conn)
log.info(spec)
- log.info('fetch time: %s' % (time.time() - start))
+ log.info('scan-fetch time: %s' % (time.time() - start))
+
def doQueryAsArrays(self, query, cb):
    """Execute HQL `query` and stream every result cell through
    cb(key, family, column, val, ts) using the array-based thrift API.

    Mirrors doScanAsArrays: the pooled connection is always released and
    the result scanner always closed via try/finally, so an exception in
    a thrift call or the callback cannot leak a pool slot (the original
    leaked both on error).
    """
    with self.semaphore:
        start = time.time()
        conn = self.getConn()
        try:
            namespace = conn.namespace_open('monitor')
            # hql_exec2(ns, query, noflush=0, unbuffered=1) returns a
            # result object carrying an open scanner handle.
            results = conn.hql_exec2(namespace, query, 0, 1)
            try:
                while True:
                    row_data = conn.scanner_get_cells_as_arrays(results.scanner)
                    if not row_data:  # empty batch signals end of results
                        break
                    for key, family, column, val, ts in row_data:
                        cb(key, family, column, val, ts)
            finally:
                conn.close_scanner(results.scanner)
        finally:
            self.releaseConn(conn)
        log.info(query)
        log.info('query-fetch time: %s' % (time.time() - start))
def doQuery(self, query, cb):
with self.semaphore:
@@ -85,6 +123,6 @@ def doQuery(self, query, cb):
conn.close_scanner(results.scanner)
self.releaseConn(conn)
log.info(query)
- log.info('fetch time: %s' % (time.time() - start))
+ log.info('query-fetch time: %s' % (time.time() - start))
HyperTablePool = ConnectionPool(20)
View
189 webapp/graphite/metrics/hypertable_search.py
@@ -3,15 +3,167 @@
import subprocess
import os.path
from django.conf import settings
+from hyperthrift.gen.ttypes import ScanSpec
from graphite.hypertable_client import removePrefix, addPrefix
+from graphite.storage import _deduplicate, is_pattern
from graphite.logger import log
import re
+import fnmatch
from graphite.hypertable_client import HyperTablePool
EXPANDABLE_PATH_RE = re.compile('.*[\*{}\[\]]+.*')
def regexifyPathExpr(pathExpr):
    """Translate a graphite path expression into a regex fragment.

    '+' and '.' are escaped as literals; each '*' becomes '[^\.]+'
    (one-or-more characters within a single path component).
    Replacement order matters: '.' is escaped before '*' is expanded,
    so the dot inside the expansion is not re-escaped.
    """
    regex = pathExpr
    for literal, replacement in (('+', '\\+'), ('.', '\\.'), ('*', '[^\.]+')):
        regex = regex.replace(literal, replacement)
    return regex
+
+CACHE_CHECK_INTERVAL_SECS = 300
+
class HyperIndex:
    """In-memory search tree over metric names, warmed from a local file
    and incrementally refreshed from the HyperTable 'search' table.

    The tree is a nested pair of dicts: node = (leaves, children), where
    `leaves` maps a final path component to 1 and `children` maps an
    intermediate path component to another (leaves, children) node.
    Times are tracked in nanoseconds to match HyperTable cell timestamps.
    """

    def __init__(self):
        self.index_path = 'index.txt'  # on-disk cache of all known metric keys
        self.last_atime = 0            # last refresh point, nanoseconds
        self.every_metric = ''
        self.tree = ({}, {})           # root node: (leaves, children)
        log.info("[HyperIndex] performing initial index load")
        self._loadFromFile()
        self._loadFromHyperTable()

    def _loadFromFile(self):
        """Warm the tree from the on-disk index file, if present."""
        if os.path.exists(self.index_path):
            s = time.time()
            # Use the file's mtime (converted to ns) as the refresh point,
            # so the follow-up table scan only fetches newer keys.
            self.last_atime = int(os.path.getmtime(self.index_path)) * 10**9
            fh = open(self.index_path)
            try:
                for line in fh:
                    self._add(line.strip())
            finally:
                fh.close()
            log.info("[HyperIndex] initial load took %.6f seconds" % (time.time() - s))

    def _loadFromHyperTable(self):
        """Scan the 'search' table for keys newer than last_atime, fold them
        into the tree, and append previously-unseen keys to the index file."""
        # if the index_path is suddenly deleted, that means start from scratch
        if not os.path.exists(self.index_path):
            self.last_atime = 0
        spec = ScanSpec(keys_only=True, start_time=self.last_atime, versions=1)
        s = time.time()
        self.last_atime = int(s) * 10**9
        fh = open(self.index_path, 'a')
        try:
            def processResult(key, family, column, val, ts):
                if not self._existsInTree(key):
                    fh.write(key + '\n')
                self._add(key)

            # TODO(johng): convert to a doScan once SCR works on large results
            HyperTablePool.doScanAsArrays(spec, "search", processResult)
        finally:
            fh.close()
        log.info("[HyperIndex] index reload took %.6f seconds" % (time.time() - s))

    # like find in tree, for exact matches.
    # for index dup checking
    def _existsInTree(self, key):
        branches = key.split('.')
        cursor = self.tree
        leaf = branches.pop()
        for branch in branches:
            if branch not in cursor[1]:
                return False
            # Descend into the matched child. The original never advanced
            # the cursor, so every key deeper than one level looked absent
            # and was re-appended to the index file on each reload.
            cursor = cursor[1][branch]
        return leaf in cursor[0]

    def _add(self, key):
        """Insert a dotted metric key into the tree, creating branches."""
        branches = key.split('.')
        cursor = self.tree
        leaf = branches.pop()
        for branch in branches:
            if branch not in cursor[1]:
                cursor[1][branch] = ({}, {})  # (leaves, children)
            cursor = cursor[1][branch]
        cursor[0][leaf] = 1  # add leaf

    def _getMatches(self, haystack_dict, needle):
        """Match `needle` against the keys of `haystack_dict`.

        A list needle holds glob-pattern variants (fnmatch each); a string
        needle is an exact-key lookup. Returns a list of matching keys.
        """
        if type(needle) is list:  # patterns, variants
            entries = haystack_dict.keys()
            matches = []
            for variant in needle:
                matches.extend(fnmatch.filter(entries, variant))
            return list(_deduplicate(matches))
        if needle in haystack_dict:
            return [needle]
        return []

    # splits the key by '.', exact parts are strings, patterns are lists
    def _split(self, key):
        parts = key.split('.')
        for i in range(0, len(parts)):
            if is_pattern(parts[i]):
                parts[i] = self._variants(parts[i])
        return parts

    # computes variants in a pathExpr
    def _variants(self, pattern):
        """Expand one {a,b,c} group into a list of concrete variants;
        a pattern without braces is returned as a single-element list."""
        v1, v2 = pattern.find('{'), pattern.find('}')
        if v1 > -1 and v2 > v1:
            variations = pattern[v1 + 1:v2].split(',')
            variants = [pattern[:v1] + v + pattern[v2 + 1:] for v in variations]
            return variants
        return [pattern]

    def _findInTree(self, cursor, keyparts, leaf_matches_leaves=True, leaf_matches_branches=False):
        """Recursive tree search. `keyparts` is reversed (pop() takes the
        next part). Returns a list of (reversed_path_parts, is_leaf) pairs."""
        if not keyparts:
            return []
        part = keyparts.pop()
        if len(keyparts) == 0:  # last part: match leaves and/or branches
            res = []
            if leaf_matches_leaves:
                res.extend([([e], True) for e in self._getMatches(cursor[0], part)])
            if leaf_matches_branches:
                res.extend([([e], False) for e in self._getMatches(cursor[1], part)])
            return res
        else:
            results = []
            for match in self._getMatches(cursor[1], part):
                # copy keyparts so sibling branches each consume their own
                postfixes = self._findInTree(cursor[1][match], keyparts[:], leaf_matches_leaves, leaf_matches_branches)
                for postfix in postfixes:
                    postfix[0].append(match)
                    results.append(postfix)
            return results

    def findInTree(self, pathExpr, leaf_matches_leaves=True, leaf_matches_branches=False):
        """Search the tree for pathExpr, refreshing from HyperTable first
        if the cache is older than CACHE_CHECK_INTERVAL_SECS."""
        if int(time.time()) * 10**9 - self.last_atime > CACHE_CHECK_INTERVAL_SECS * 10**9:
            self._loadFromHyperTable()
        s = time.time()
        parts = self._split(pathExpr)
        parts.reverse()  # keyparts is reversed, because pop is fast
        res = self._findInTree(self.tree, parts, leaf_matches_leaves, leaf_matches_branches)
        nodes = [HyperNode('.'.join(reversed(x[0])), x[1]) for x in res]
        log.info("[HyperIndex] search for %s took %.6f seconds" % (pathExpr, time.time() - s))
        return nodes

    # only returns metrics
    def findMetric(self, pathExpr):
        return [node.metric_path for node in self.findInTree(pathExpr)]

    # returns HyperNodes which could be metrics, or subfolders
    def find(self, pathExpr):
        return self.findInTree(pathExpr, True, True)

    # weird format for seemingly deprecated metrics/search endpoint
    def search(self, query, max_results=None, keep_query_pattern=False):
        """Yield {'path', 'is_leaf'} dicts, stopping at max_results if set."""
        count = 0
        for node in self.find(query):
            if max_results is not None and count >= max_results:
                return
            yield {'path': node.metric_path, 'is_leaf': node.isLeaf()}
            count += 1
class HyperStore:
def find(self, pathExpr):
@@ -46,7 +198,7 @@ def findHelper(self, where):
def processResult(key, family, column, val, ts):
metrics.append(key)
- HyperTablePool.doQuery(query, processResult)
+ HyperTablePool.doQueryAsArrays(query, processResult)
return metrics
class HyperNode:
@@ -59,34 +211,9 @@ def __init__(self, metric_path, isLeaf):
self.__isLeaf = isLeaf
def isLeaf(self):
- return self.__isLeaf
-
-class HyperTableIndexSearcher:
- def search(self, query, max_results=None, keep_query_pattern=False):
- query_parts = query.split('.')
- count = 0
- for result in self.find(query):
- if count >= max_results:
- return
- yield { 'path': result.metric_path, 'is_leaf': result.isLeaf() }
- count += 1
-
-
- def find(self, query):
- query = addPrefix(query)
- query_parts = query.split('.')
-
- pattern = '.'.join(query_parts[0:-1]) + '|'
- query = 'SELECT * FROM tree WHERE row =^ "%s"' % pattern
+ return self.__isLeaf
+ def __repr__(self):
+ return 'HyperNode(%s, %s)' % (self.metric_path, self.isLeaf())
- nodes = []
- def processResult(key, family, column, val, ts):
- log.info("%s = %s" % (key, val))
- if column == 'has_children':
- key = re.sub('^\|','', key)
- nodes.append(HyperNode(key.replace('|', '.'), val == '0'))
-
- HyperTablePool.doQuery(query, processResult)
- return nodes
-hypertable_searcher = HyperTableIndexSearcher()
+hypertable_index = HyperIndex()
View
6 webapp/graphite/metrics/views.py
@@ -20,7 +20,7 @@
from graphite.logger import log
from graphite.storage import STORE, LOCAL_STORE
from graphite.metrics.search import searcher
-from graphite.metrics.hypertable_search import hypertable_searcher
+from graphite.metrics.hypertable_search import hypertable_index
from graphite.render.datalib import CarbonLink
import fnmatch, os
@@ -57,7 +57,7 @@ def search_view(request):
#if not search_request['query'].endswith('*'):
# search_request['query'] += '*'
- search_backend = hypertable_searcher or searcher
+ search_backend = hypertable_index or searcher
results = sorted(search_backend.search(**search_request))
result_data = json.dumps( dict(metrics=results) )
@@ -122,7 +122,7 @@ def find_view(request):
else:
store = STORE
- store = hypertable_searcher or store
+ store = hypertable_index or store
if format == 'completer':
query = query.replace('..', '*.')
View
12 webapp/graphite/render/datalib.py
@@ -23,7 +23,7 @@
from graphite.logger import log
from graphite.storage import STORE, LOCAL_STORE
from graphite.hypertable_client import HyperTablePool, removePrefix, addPrefix
-from graphite.metrics.hypertable_search import HyperStore
+from graphite.metrics.hypertable_search import hypertable_index
from hyperthrift.gen.ttypes import ScanSpec, CellInterval
from graphite.render.hashing import ConsistentHashRing
@@ -220,11 +220,10 @@ def fetchData(requestContext, pathExpr):
def fetchDataFromHyperTable(requestContext, pathExpr):
MIN_INTERVAL_SECS = 10
COL_INTERVAL_SECS = 60 * 60
- if pathExpr.lower().startswith('graphite.'):
- pathExpr = pathExpr[9:]
+ log.info('fetching %s' % pathExpr)
pathExpr = addPrefix(pathExpr)
- metrics = [addPrefix(m) for m in HyperStore().find(pathExpr)]
+ metrics = [addPrefix(m) for m in hypertable_index.findMetric(pathExpr)]
if not metrics:
return []
@@ -238,6 +237,9 @@ def fetchDataFromHyperTable(requestContext, pathExpr):
endColString = endDateTime.strftime('metric:%Y-%m-%d %H')
cellIntervals = [ CellInterval(m, startColString, True, m, endColString, True) for m in metrics ]
+ if cellIntervals == None:
+ return []
+
nanosStart = start * 10**9L
nanosEnd = end * 10**9L
@@ -261,7 +263,7 @@ def processResult(key, family, column, val, ts):
- HyperTablePool.doScan(scan_spec, "metrics", processResult)
+ HyperTablePool.doScanAsArrays(scan_spec, "metrics", processResult)
elapsed = end - start
lowestMetricStep = elapsed

0 comments on commit 6b0a2a4

Please sign in to comment.
Something went wrong with that request. Please try again.