Add basic .gitignore #6

Closed
wants to merge 5 commits

2 participants

@inactivist

I've added a .gitignore file to the project. Feel free to merge it into your project.

@awestendorf
Owner

There's a lot in this commit and some of it doesn't seem right for a library. The gitignore is needed though if you want to commit this separately.

@awestendorf closed this
@inactivist

Hm... not sure why all those other commits ended up in the pull request -- was only aiming for the .gitignore... Clearly, I need to go back to GitHub school. :D

Commits on May 22, 2012
  1. @inactivist
  2. @inactivist

    Adding basic .gitignore

    inactivist authored
  3. @inactivist
  4. @inactivist

    Adding rudimentary generate/monitor script.

    inactivist authored
    Depending on mode, generates random data or monitors the generated data (a short usage sketch follows this commit list).
    Usage: python gen_mon.py generate | monitor [-i 'minute' | 'hour' | 'daily']
  5. @inactivist

    Adding a bit of help.

    inactivist authored
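As the gen_mon.py commit describes, the script exercises the kairos Timeseries counter API in two modes: a generator that records hits and a monitor that reads them back. A minimal sketch of the underlying insert/get round trip, assuming a local Redis on the default port and the count_only 'minute' interval the script configures below:

    import redis
    from kairos import Timeseries

    # Assumes a local Redis server on localhost:6379, as in gen_mon.py below.
    client = redis.Redis('localhost', 6379)
    counters = Timeseries(client, {
      'minute': {'step': 60, 'steps': 60, 'count_only': True},   # per-minute counts, last hour
    }, key_prefix='timedata:domain')

    counters.insert('example.com', 1)             # record one hit for the domain
    print counters.get('example.com', 'minute')   # OrderedDict: current minute bucket -> count

With count_only set, each bucket stores only an incrementing counter in Redis, which is all the generate/monitor script needs.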
Showing 3 changed files with 188 additions and 53 deletions.
  1. +29 −0 .gitignore
  2. +69 −53 kairos/timeseries.py
  3. +90 −0 script/gen_mon.py
29 .gitignore
@@ -0,0 +1,29 @@
+*.py[co]
+
+# Packages
+*.egg
+*.egg-info
+dist
+build
+eggs
+parts
+bin
+var
+sdist
+develop-eggs
+.installed.cfg
+
+# Installer logs
+pip-log.txt
+
+# Unit test / coverage reports
+.coverage
+.tox
+
+#Translations
+*.mo
+
+#Mr Developer
+.mr.developer.cfg
+/.project
+/.pydevproject
122 kairos/timeseries.py
@@ -12,6 +12,21 @@
else:
from ordereddict import OrderedDict
+class _ConfigStruct:
+ """
+ Helper class to reduce code repetition and enforce consistent
+ default values.
+ """
+ def __init__(self, **entries):
+ self.count_only = False
+ self.compress = False
+ self.read_cast = None
+ self.step = 1
+ self.steps = None
+ self.__dict__.update(entries)
+ # Now set self.resolution default = self.step.
+ self.resolution = self.step
+
class Timeseries(object):
'''
A time series object provides the interface to manage data sets in redis
@@ -85,9 +100,10 @@ def insert(self, name, value, timestamp=None):
pipe = self._client.pipeline()
for interval,config in self._config.iteritems():
- step = config.get('step', 1)
- steps = config.get('steps',None)
- resolution = config.get('resolution',step)
+ _cfg = _ConfigStruct(**config)
+ step = _cfg.step
+ steps = _cfg.steps
+ resolution = _cfg.resolution
interval_bucket = int( timestamp / step )
resolution_bucket = int( timestamp / resolution )
@@ -97,9 +113,9 @@ def insert(self, name, value, timestamp=None):
# If resolution is the same as step, store in the same row.
if resolution==step:
- if config.get('count_only',False):
+ if _cfg.count_only:
pipe.incr(interval_key)
- elif config.get('compress', False):
+ elif _cfg.compress:
pipe.hincrby(interval_key, value, 1)
else:
pipe.rpush(interval_key, value)
@@ -108,9 +124,9 @@ def insert(self, name, value, timestamp=None):
pipe.sadd(interval_key, resolution_bucket)
# Figure out what we're storing.
- if config.get('count_only',False):
+ if _cfg.count_only:
pipe.incr(resolution_key)
- elif config.get('compress', False):
+ elif _cfg.compress:
pipe.hincrby(resolution_key, value, 1)
else:
pipe.rpush(resolution_key, value)
@@ -138,28 +154,28 @@ def get(self, name, interval, timestamp=None, condensed=False):
if not timestamp:
timestamp = time.time()
- config = self._config[interval]
- step = config.get('step', 1)
- resolution = config.get('resolution',step)
+ config = _ConfigStruct(**self._config[interval])
+ step = config.step
+ resolution = config.resolution # config.get('resolution',step)
interval_bucket = int( timestamp / step )
interval_key = '%s%s:%s:%s'%(self._prefix, name, interval, interval_bucket)
rval = OrderedDict()
if resolution==step:
- if config.get('count_only',False):
+ if config.count_only:
data = int( self._client.get(interval_key) or 0 )
- elif config.get('compress', False):
+ elif config.compress:
data = self._client.hgetall(interval_key)
# Turn back into a time series
# TODO: this might be too slow because of the addition
data = reduce(lambda res, (key,val): res + int(val)*[key], data.iteritems(), [] )
- if config.get('read_cast'):
- data = map(config.get('read_cast'), data)
+ if config.read_cast:
+ data = map(config.read_cast, data)
else:
data = self._client.lrange(interval_key, 0, -1)
- if config.get('read_cast'):
- data = map(config.get('read_cast'), data)
+ if config.read_cast:
+ data = map(config.read_cast, data)
rval[ interval_bucket*step ] = data
else:
# First fetch all of the resolution buckets for this set.
@@ -170,25 +186,25 @@ def get(self, name, interval, timestamp=None, condensed=False):
for bucket in resolution_buckets:
resolution_key = '%s:%s'%(interval_key, bucket)
- if config.get('count_only',False):
+ if config.count_only:
pipe.get(resolution_key)
- elif config.get('compress', False):
+ elif config.compress:
pipe.hgetall(resolution_key)
else:
pipe.lrange(resolution_key, 0, -1)
res = pipe.execute()
for idx,data in enumerate(res):
- if config.get('count_only',False):
+ if config.count_only:
data = int(data) if data else 0
- elif config.get('compress', False):
+ elif config.compress:
# Turn back into a time series
# TODO: this might be too slow because of the addition
data = reduce(lambda res, (key,val): res + int(val)*[key], data.iteritems(), [] )
- if config.get('read_cast'):
- data = map(config.get('read_cast'), data)
- elif config.get('read_cast'):
- data = map(config.get('read_cast'), data)
+ if config.read_cast:
+ data = map(config.read_cast, data)
+ elif config.read_cast:
+ data = map(config.read_cast, data)
rval[ resolution_buckets[idx]*resolution ] = data
@@ -207,18 +223,18 @@ def count(self, name, interval, timestamp=None, condensed=False):
if not timestamp:
timestamp = time.time()
- config = self._config[interval]
- step = config.get('step', 1)
- resolution = config.get('resolution',step)
+ config = _ConfigStruct(**self._config[interval])
+ step = config.step
+ resolution = config.resolution
interval_bucket = int( timestamp / step )
interval_key = '%s%s:%s:%s'%(self._prefix, name, interval, interval_bucket)
rval = OrderedDict()
if resolution==step:
- if config.get('count_only',False):
+ if config.count_only:
data = int( self._client.get(interval_key) or 0 )
- elif config.get('compress', False):
+ elif config.compress:
data = sum( map(int, self._client.hvals(interval_key)) )
else:
data = int( self._client.llen(interval_key) )
@@ -232,16 +248,16 @@ def count(self, name, interval, timestamp=None, condensed=False):
for bucket in resolution_buckets:
resolution_key = '%s:%s'%(interval_key, bucket)
- if config.get('count_only',False):
+ if config.count_only:
pipe.get(resolution_key)
- elif config.get('compress', False):
+ elif config.compress:
pipe.hvals(resolution_key)
else:
pipe.llen(resolution_key)
res = pipe.execute()
for idx,data in enumerate(res):
- if config.get('compress', False):
+ if config.compress:
rval[ resolution_buckets[idx]*resolution ] = sum(map(int,data)) if data else 0
else:
rval[ resolution_buckets[idx]*resolution ] = int(data) if data else 0
@@ -261,10 +277,10 @@ def series(self, name, interval, steps=None, condensed=False):
matches the return value in get().
'''
# TODO: support start and end timestamps
- config = self._config[interval]
- step = config.get('step', 1)
- steps = steps if steps else config.get('steps',1)
- resolution = config.get('resolution',step)
+ config = _ConfigStruct(**self._config[interval])
+ step = config.step
+ steps = steps if steps else config.steps if config.steps else 1
+ resolution = config.resolution
end_timestamp = time.time()
end_bucket = int( end_timestamp / step )
@@ -280,9 +296,9 @@ def series(self, name, interval, steps=None, condensed=False):
rval[interval_bucket*step] = OrderedDict()
if step==resolution:
- if config.get('count_only',False):
+ if config.count_only:
pipe.get(interval_key)
- elif config.get('compress',False):
+ elif config.compress:
pipe.hgetall(interval_key)
else:
pipe.lrange(interval_key, 0, -1)
@@ -298,16 +314,16 @@ def series(self, name, interval, steps=None, condensed=False):
interval_key = '%s%s:%s:%s'%(self._prefix, name, interval, interval_bucket)
if step==resolution:
- if config.get('count_only',False):
+ if config.count_only:
data = int(data) if data else 0
- elif config.get('compress',False):
+ elif config.compress:
# Turn back into a time series
# TODO: this might be too slow because of the addition
data = reduce(lambda res, (key,val): res + int(val)*[key], data.iteritems(), [] )
- if config.get('read_cast'):
- data = map(config.get('read_cast'), data)
- elif config.get('read_cast'):
- data = map(config.get('read_cast'), data)
+ if config.read_cast:
+ data = map(config.read_cast, data)
+ elif config.read_cast:
+ data = map(config.read_cast, data)
rval[interval_bucket*step] = data
else:
@@ -315,32 +331,32 @@ def series(self, name, interval, steps=None, condensed=False):
for bucket in resolution_buckets:
resolution_key = '%s:%s'%(interval_key, bucket)
- if config.get('count_only',False):
+ if config.count_only:
pipe.get(resolution_key)
- elif config.get('compress',False):
+ elif config.compress:
pipe.hgetall(resolution_key)
else:
pipe.lrange(resolution_key, 0, -1)
resolution_res = pipe.execute()
for x,data in enumerate(resolution_res):
- if config.get('count_only',False):
+ if config.count_only:
data = int(data) if data else 0
- elif config.get('compress',False):
+ elif config.compress:
# Turn back into a time series
# TODO: this might be too slow because of the addition
data = reduce(lambda res, (key,val): res + int(val)*[key], data.iteritems(), [] )
- if config.get('read_cast'):
- data = map(config.get('read_cast'), data)
- elif config.get('read_cast'):
- data = map(config.get('read_cast'), data)
+ if config.read_cast:
+ data = map(config.read_cast, data)
+ elif config.read_cast:
+ data = map(config.read_cast, data)
rval[interval_bucket*step][ resolution_buckets[x]*resolution ] = data
# If condensed, collapse each interval into a single value
if condensed:
for key in rval.keys():
- if config.get('count_only',False):
+ if config.count_only:
rval[key] = sum(rval[key].values())
else:
rval[key] = reduce(operator.add, rval[key].values(), [])
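The bulk of the timeseries.py change replaces repeated config.get(...) lookups with the _ConfigStruct wrapper: each interval's config dict is expanded once into attributes, and the defaults (step=1, steps=None, count_only=False, compress=False, read_cast=None, resolution=step) live in a single place. One caveat visible in the diff: self.resolution = self.step is assigned unconditionally after the __dict__.update, so an explicit 'resolution' key in the config would be overwritten. A standalone sketch of the same idea, with that assignment guarded (illustrative only, not the library class itself):

    class ConfigStruct(object):
      """Wrap an interval config dict so unspecified keys fall back to shared defaults."""
      def __init__(self, **entries):
        self.count_only = False
        self.compress = False
        self.read_cast = None
        self.step = 1
        self.steps = None
        self.__dict__.update(entries)
        # Default resolution to step, but keep an explicitly configured resolution.
        if 'resolution' not in entries:
          self.resolution = self.step

    cfg = ConfigStruct(step=3600, steps=24, count_only=True)
    print cfg.step, cfg.resolution, cfg.compress   # 3600 3600 False

Call sites then read cfg.step, cfg.resolution, cfg.count_only and so on, instead of repeating the default value in every config.get call.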
90 script/gen_mon.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+
+from kairos import Timeseries
+import redis
+import time
+import datetime
+import random
+
+MAX_WIDTH_COLUMNS = 60
+
+KEY = 'example.com'
+KEY_PREFIX = 'timedata:domain'
+
+client = redis.Redis('localhost', 6379)
+counters = Timeseries(client, {
+ 'minute': {
+ 'step': 60, # 60 seconds
+ 'steps': 60, # last hour
+ 'count_only' : True, # store counts only.
+ },
+ 'hour': {
+ 'step': 3600, # Hourly
+ 'steps': 24, # Last day
+ 'count_only' : True, # Store counts only.
+ },
+ 'daily': {
+ 'step': 86400, # Daily
+ 'steps': 30, # Last 30 days
+ 'count_only': True, # Store counts only.
+ },
+ },
+ key_prefix=KEY_PREFIX)
+
+def hit(domain):
+ print "hit: %s @ %d" % (domain, time.time())
+ counters.insert(domain, 1)
+
+def dump_series(base_time, series):
+ for ts, value in series.iteritems():
+ print "%02d(%02d)" % ((ts-base_time)/60, value),
+ print
+
+def plot_series(base_time, series, max_val):
+ scale = max_val / MAX_WIDTH_COLUMNS
+ for ts, count in series.iteritems():
+ print "%4d minutes (%03d): %s" % ((ts-base_time)/60, count, "*" * (count/scale))
+
+def sum_series(series):
+ # series to list: series.list()
+ return sum(series.values())
+
+def generate():
+ # record a couple of hits.
+ hit(KEY)
+ hit(KEY)
+
+ start = datetime.datetime.now()
+ x = 0
+ while True:
+ time.sleep(1)
+ # Record a hit every once in a while (approx. every 3.5 seconds...)
+ if x % random.randint(2,5) == 0:
+ hit(KEY)
+ x += 1
+
+interval_max_values = { 'minute' : 100, 'hour': 2000, 'daily': 2000*24 }
+
+def monitor(interval_name):
+ while True:
+ # get = counters.get(KEY, interval_name)
+ series = counters.series(KEY, interval_name)
+ # count = counters.count(KEY, interval_name)
+ last_5 = counters.series(KEY, interval_name, steps=5, condensed=False)
+ sum = sum_series(last_5)
+ # This should work but breaks: sum = counters.series(KEY, interval_name, steps=5, condensed=True)
+ #dump_series(time.time(), series)
+ plot_series(time.time(), series, interval_max_values[interval_name])
+ print "%d in last 5 %s (~%2.2f per %s)." % (sum, interval_name, sum/5.0, interval_name)
+ time.sleep(1)
+
+if __name__ == "__main__":
+ import argparse
+ parser = argparse.ArgumentParser(description='Test something.')
+ parser.add_argument('op', metavar='op', help='If op=generate, generates random data. If op=monitor, monitors generated data.', default='generate', action="store", choices=['generate', 'monitor'])
+ parser.add_argument('-i', '--interval', metavar='interval', default='minute', action='store', choices=['minute', 'hour', 'daily'])
+ opts = parser.parse_args()
+ if opts.op == 'generate':
+ generate()
+ else:
+ monitor(opts.interval)
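The commented-out condensed call in monitor() ("This should work but breaks") appears to fail because, for a count_only interval where step equals resolution, series() stores a plain integer per bucket, while the condensed branch calls .values() on each entry as though it were a dict of resolution buckets. The script works around this by fetching the raw series and summing it client-side; a minimal sketch of that workaround, assuming a local Redis and the same 'minute' interval config as gen_mon.py:

    import redis
    from kairos import Timeseries

    # Same setup as gen_mon.py: local Redis, per-minute counters for the last hour.
    client = redis.Redis('localhost', 6379)
    counters = Timeseries(client, {
      'minute': {'step': 60, 'steps': 60, 'count_only': True},
    }, key_prefix='timedata:domain')

    # Workaround used in monitor(): request the raw per-minute counts and sum client-side.
    last_5 = counters.series('example.com', 'minute', steps=5, condensed=False)
    total = sum(last_5.values())   # values are plain ints because count_only=True and step == resolution
    print "%d hits in the last 5 minutes (~%.2f per minute)" % (total, total / 5.0)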