Use ScanSpec and column quals in HT #1

Merged
merged 1 commit into foursquare:new_hypertable_api

1 participant

@slackhappy
Owner

Performance with column qualifiers seems to be worse, but it should be
more reliable as we fix the schema. Right now it seems to add about a
second per matched column qualifier (so however many qualifiers the
prefix resolves to).
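
Since the cost scales with how many qualifiers the prefix expands to, here is a rough standalone sketch of how the diff below derives them: one 'metric:^YYYY-MM-DD' prefix qualifier per day the query window touches (the function name is illustrative):

import datetime

def day_qualifiers(startDateTime, endDateTime):
  # one prefix qualifier per day in the window, mirroring datalib.py below;
  # a window spanning N days matches N qualifiers (so roughly N extra seconds)
  diff = endDateTime - startDateTime
  days = diff.days + (1 if diff.seconds > 0 else 0)  # round a partial day up
  return [(startDateTime + datetime.timedelta(days=d)).strftime('metric:^%Y-%m-%d')
          for d in range(days)]

# a 36-hour window crosses two calendar days -> two qualifiers
print(day_qualifiers(datetime.datetime(2012, 5, 14, 12, 0),
                     datetime.datetime(2012, 5, 16, 0, 0)))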

@slackhappy slackhappy merged commit 5461e16 into foursquare:new_hypertable_api
Commits on May 15, 2012
  1. @slackhappy (5461e16)
webapp/graphite/hypertable_client.py (22 lines changed)
@@ -5,6 +5,7 @@
 import hypertable.thriftclient
 import threading
 import re
+import time

 def removePrefix(path):
   if settings.HYPERTABLE_PREFIX:
@@ -38,6 +39,27 @@ def releaseConn(self, conn):
     with self.lock:
       return self.freeClients.append(conn)

+  def doScan(self, spec, table, cb):
+    with self.semaphore:
+      start = time.time()
+      conn = self.getConn()
+      namespace = conn.namespace_open('monitor')
+      scanner = conn.scanner_open(namespace, table, spec)
+
+      while True:
+        row_data = conn.scanner_get_cells_as_arrays(scanner)
+        if len(row_data) == 0:
+          break
+        for key, family, column, val, ts in row_data:
+          cb(key, family, column, val, ts)
+
+      conn.close_scanner(scanner)
+      self.releaseConn(conn)
+      log.info(spec)
+      log.info('fetch time: %s' % (time.time() - start))
+
+
+
   def doQuery(self, query, cb):
     with self.semaphore:
       conn = self.getConn()
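
A rough usage sketch for the new doScan helper, assuming the imports resolve as named in this PR; 'some.metric' is a placeholder row key, and the callback fires once per cell the scanner returns:

from graphite.hypertable_client import HyperTablePool
from hyperthrift.gen.ttypes import ScanSpec, RowInterval

cells = []
def collect(key, family, column, val, ts):
  # doScan calls back once per (row key, family, qualifier, value, timestamp) cell
  cells.append((key, column, val, ts))

spec = ScanSpec(None, None, None, 1)
spec.row_intervals = [RowInterval('some.metric', True, 'some.metric', True)]
HyperTablePool.doScan(spec, 'metrics', collect)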
webapp/graphite/render/datalib.py (78 lines changed)
@@ -17,11 +17,14 @@
 import socket
 import struct
 import time
+import datetime
+from collections import defaultdict

 from django.conf import settings
 from graphite.logger import log
 from graphite.storage import STORE, LOCAL_STORE
 from graphite.hypertable_client import HyperTablePool, removePrefix, addPrefix
 from graphite.metrics.hypertable_search import HyperStore
+from hyperthrift.gen.ttypes import ScanSpec, RowInterval

 try:
@@ -215,55 +218,78 @@ def fetchData(requestContext, pathExpr):
     return fetchDataFromHyperTable(requestContext, pathExpr)

 def fetchDataFromHyperTable(requestContext, pathExpr):
+  MIN_INTERVAL_SECS = 10
+  COL_INTERVAL_SECS = 60 * 60
   if pathExpr.lower().startswith('graphite.'):
     pathExpr = pathExpr[9:]
   pathExpr = addPrefix(pathExpr)
   metrics = [addPrefix(m) for m in HyperStore().find(pathExpr)]
-  startTime = requestContext['startTime'].strftime('%Y-%m-%d %H:%M:%S')
-  endTime = requestContext['endTime'].strftime('%Y-%m-%d %H:%M:%S')
-  start, end, step = timestamp(requestContext['startTime']), timestamp(requestContext['endTime']), 10
-  buckets = (end - start) / step
+  startDateTime = requestContext['startTime']
+  endDateTime = requestContext['endTime']
-  nanosStart = start * 10**9
-  nanosEnd = end * 10**9
+  start, end = int(timestamp(requestContext['startTime'])), int(timestamp(requestContext['endTime']))
+
+  diff = endDateTime - startDateTime
+  days = diff.days
+  if diff.seconds > 0:
+    days = days + 1
+  cqs = [ (startDateTime + datetime.timedelta(days=dd)).strftime('metric:^%Y-%m-%d') \
+          for dd in range(0, days) ]
+
+  nanosStart = start * 10**9L
+  nanosEnd = end * 10**9L

   scan_spec = ScanSpec(None, None, None, 1)
   scan_spec.start_time = nanosStart
   scan_spec.end_time = nanosEnd
-  intervals = [RowInterval(m, True, m, True) for m in metrics]
-  columns = ['metric']
-  print intervals
-  scan_spec.row_intervals = intervals
-
-  where = ' OR '.join(['ROW = "%s"' % m for m in metrics])
-  query = 'SELECT metric FROM metrics WHERE (%s) AND "%s" < TIMESTAMP < "%s"' % (where, startTime, endTime)
-  log.info(query)
-  log.info(intervals)
+  scan_spec.row_intervals = [RowInterval(m, True, m, True) for m in metrics]
+  scan_spec.columns = cqs
+  scan_spec.versions = COL_INTERVAL_SECS / MIN_INTERVAL_SECS
+
+  log.info(startDateTime)
+  log.info(endDateTime)
   log.info(scan_spec)

   valuesMap = {}
+  metricStep = {}
   for m in metrics:
-    valuesMap[m] = [None for x in xrange(0, buckets)]
+    valuesMap[m] = []

   def processResult(key, family, column, val, ts):
-    its = long(ts) / 1000000000L
-    bucket = int((its - start) / step)
-    if bucket >= 0 or bucket < buckets:
-      if valuesMap[key][bucket]:
-        valuesMap[key][bucket] = float(val)
-      else:
-        valuesMap[key][bucket] = float(val)
+    its = long(ts) / 10**9L  # nanoseconds -> seconds
+    valuesMap[key].append((its, val))

-  client.scanner_open("monitor", "metrics", scan_spec);
+  HyperTablePool.doScan(scan_spec, "metrics", processResult)
-  HyperTablePool.doQuery(query, processResult)
+  # post-fetch processing
+  for m in metrics:
+    # determine step size (it's the minimum step found)
+    minStep = end - start
+    sortedVals = sorted(valuesMap[m], key=lambda x: x[0])
+    for i in range(1, len(sortedVals)):
+      step = sortedVals[i][0] - sortedVals[i-1][0]
+      minStep = min(minStep, step)
+
+    minStep = int(minStep)
+    steps = int(end - start) / minStep
+    metricStep[m] = minStep
+
+    # estimation of confidence: length / steps * 100
+
+    # push final values
+    finalValues = [None] * steps
+    for x in sortedVals:
+      bucket = (x[0] - start) / minStep
+      finalValues[bucket] = float(x[1])
+    valuesMap[m] = finalValues

   seriesList = []
   for m in metrics:
-    series = TimeSeries(removePrefix(m), start, end, step, valuesMap[m])
+    series = TimeSeries(removePrefix(m), start, end, metricStep[m], valuesMap[m])
     series.pathExpression = pathExpr  # hack to pass expressions through to render functions
     seriesList.append(series)
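
The post-fetch pass above infers each metric's step as the minimum gap between consecutive timestamps, then drops values onto a fixed grid. A standalone sketch of the same idea, adding guards (hypothetical, not in the diff) for duplicate timestamps and out-of-window samples:

def bucketize(points, start, end, default_step=10):
  # points: list of (epoch_seconds, value); returns (step, gridded values)
  pts = sorted(points, key=lambda p: p[0])
  step = end - start  # falls back to one bucket for 0- or 1-point series
  for i in range(1, len(pts)):
    step = min(step, pts[i][0] - pts[i - 1][0])
  step = int(step) or default_step  # duplicate timestamps would give a 0 gap
  values = [None] * ((end - start) // step)
  for ts, val in pts:
    bucket = (ts - start) // step
    if 0 <= bucket < len(values):  # skip samples outside [start, end)
      values[bucket] = float(val)
  return step, values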