Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic .gitignore #6

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .gitignore
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,29 @@
*.py[co]

# Packages
*.egg
*.egg-info
dist
build
eggs
parts
bin
var
sdist
develop-eggs
.installed.cfg

# Installer logs
pip-log.txt

# Unit test / coverage reports
.coverage
.tox

# Translations
*.mo

# Mr Developer
.mr.developer.cfg
/.project
/.pydevproject
122 changes: 69 additions & 53 deletions kairos/timeseries.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -12,6 +12,21 @@
else: else:
from ordereddict import OrderedDict from ordereddict import OrderedDict


class _ConfigStruct:
"""
Helper class to reduce code repetition and enforce consistent
default values.
"""
def __init__(self, **entries):
self.count_only = False
self.compress = False
self.read_cast = None
self.step = 1
self.steps = None
self.__dict__.update(entries)
# Now set self.resolution default = self.step.
self.resolution = self.step

class Timeseries(object): class Timeseries(object):
''' '''
A time series object provides the interface to manage data sets in redis A time series object provides the interface to manage data sets in redis
Expand Down Expand Up @@ -85,9 +100,10 @@ def insert(self, name, value, timestamp=None):
pipe = self._client.pipeline() pipe = self._client.pipeline()


for interval,config in self._config.iteritems(): for interval,config in self._config.iteritems():
step = config.get('step', 1) _cfg = _ConfigStruct(**config)
steps = config.get('steps',None) step = _cfg.step
resolution = config.get('resolution',step) steps = _cfg.steps
resolution = _cfg.resolution


interval_bucket = int( timestamp / step ) interval_bucket = int( timestamp / step )
resolution_bucket = int( timestamp / resolution ) resolution_bucket = int( timestamp / resolution )
Expand All @@ -97,9 +113,9 @@ def insert(self, name, value, timestamp=None):


# If resolution is the same as step, store in the same row. # If resolution is the same as step, store in the same row.
if resolution==step: if resolution==step:
if config.get('count_only',False): if _cfg.count_only:
pipe.incr(interval_key) pipe.incr(interval_key)
elif config.get('compress', False): elif _cfg.compress:
pipe.hincrby(interval_key, value, 1) pipe.hincrby(interval_key, value, 1)
else: else:
pipe.rpush(interval_key, value) pipe.rpush(interval_key, value)
Expand All @@ -108,9 +124,9 @@ def insert(self, name, value, timestamp=None):
pipe.sadd(interval_key, resolution_bucket) pipe.sadd(interval_key, resolution_bucket)


# Figure out what we're storing. # Figure out what we're storing.
if config.get('count_only',False): if _cfg.count_only:
pipe.incr(resolution_key) pipe.incr(resolution_key)
elif config.get('compress', False): elif _cfg.compress:
pipe.hincrby(resolution_key, value, 1) pipe.hincrby(resolution_key, value, 1)
else: else:
pipe.rpush(resolution_key, value) pipe.rpush(resolution_key, value)
Expand Down Expand Up @@ -138,28 +154,28 @@ def get(self, name, interval, timestamp=None, condensed=False):
if not timestamp: if not timestamp:
timestamp = time.time() timestamp = time.time()


config = self._config[interval] config = _ConfigStruct(**self._config[interval])
step = config.get('step', 1) step = config.step
resolution = config.get('resolution',step) resolution = config.resolution # config.get('resolution',step)


interval_bucket = int( timestamp / step ) interval_bucket = int( timestamp / step )
interval_key = '%s%s:%s:%s'%(self._prefix, name, interval, interval_bucket) interval_key = '%s%s:%s:%s'%(self._prefix, name, interval, interval_bucket)


rval = OrderedDict() rval = OrderedDict()
if resolution==step: if resolution==step:
if config.get('count_only',False): if config.count_only:
data = int( self._client.get(interval_key) or 0 ) data = int( self._client.get(interval_key) or 0 )
elif config.get('compress', False): elif config.compress:
data = self._client.hgetall(interval_key) data = self._client.hgetall(interval_key)
# Turn back into a time series # Turn back into a time series
# TODO: this might be too slow because of the addition # TODO: this might be too slow because of the addition
data = reduce(lambda res, (key,val): res + int(val)*[key], data.iteritems(), [] ) data = reduce(lambda res, (key,val): res + int(val)*[key], data.iteritems(), [] )
if config.get('read_cast'): if config.read_cast:
data = map(config.get('read_cast'), data) data = map(config.read_cast, data)
else: else:
data = self._client.lrange(interval_key, 0, -1) data = self._client.lrange(interval_key, 0, -1)
if config.get('read_cast'): if config.read_cast:
data = map(config.get('read_cast'), data) data = map(config.read_cast, data)
rval[ interval_bucket*step ] = data rval[ interval_bucket*step ] = data
else: else:
# First fetch all of the resolution buckets for this set. # First fetch all of the resolution buckets for this set.
Expand All @@ -170,25 +186,25 @@ def get(self, name, interval, timestamp=None, condensed=False):
for bucket in resolution_buckets: for bucket in resolution_buckets:
resolution_key = '%s:%s'%(interval_key, bucket) resolution_key = '%s:%s'%(interval_key, bucket)


if config.get('count_only',False): if config.count_only:
pipe.get(resolution_key) pipe.get(resolution_key)
elif config.get('compress', False): elif config.compress:
pipe.hgetall(resolution_key) pipe.hgetall(resolution_key)
else: else:
pipe.lrange(resolution_key, 0, -1) pipe.lrange(resolution_key, 0, -1)


res = pipe.execute() res = pipe.execute()
for idx,data in enumerate(res): for idx,data in enumerate(res):
if config.get('count_only',False): if config.count_only:
data = int(data) if data else 0 data = int(data) if data else 0
elif config.get('compress', False): elif config.compress:
# Turn back into a time series # Turn back into a time series
# TODO: this might be too slow because of the addition # TODO: this might be too slow because of the addition
data = reduce(lambda res, (key,val): res + int(val)*[key], data.iteritems(), [] ) data = reduce(lambda res, (key,val): res + int(val)*[key], data.iteritems(), [] )
if config.get('read_cast'): if config.read_cast:
data = map(config.get('read_cast'), data) data = map(config.read_cast, data)
elif config.get('read_cast'): elif config.read_cast:
data = map(config.get('read_cast'), data) data = map(config.read_cast, data)


rval[ resolution_buckets[idx]*resolution ] = data rval[ resolution_buckets[idx]*resolution ] = data


Expand All @@ -207,18 +223,18 @@ def count(self, name, interval, timestamp=None, condensed=False):
if not timestamp: if not timestamp:
timestamp = time.time() timestamp = time.time()


config = self._config[interval] config = _ConfigStruct(**self._config[interval])
step = config.get('step', 1) step = config.step
resolution = config.get('resolution',step) resolution = config.resolution


interval_bucket = int( timestamp / step ) interval_bucket = int( timestamp / step )
interval_key = '%s%s:%s:%s'%(self._prefix, name, interval, interval_bucket) interval_key = '%s%s:%s:%s'%(self._prefix, name, interval, interval_bucket)


rval = OrderedDict() rval = OrderedDict()
if resolution==step: if resolution==step:
if config.get('count_only',False): if config.count_only:
data = int( self._client.get(interval_key) or 0 ) data = int( self._client.get(interval_key) or 0 )
elif config.get('compress', False): elif config.compress:
data = sum( map(int, self._client.hvals(interval_key)) ) data = sum( map(int, self._client.hvals(interval_key)) )
else: else:
data = int( self._client.llen(interval_key) ) data = int( self._client.llen(interval_key) )
Expand All @@ -232,16 +248,16 @@ def count(self, name, interval, timestamp=None, condensed=False):
for bucket in resolution_buckets: for bucket in resolution_buckets:
resolution_key = '%s:%s'%(interval_key, bucket) resolution_key = '%s:%s'%(interval_key, bucket)


if config.get('count_only',False): if config.count_only:
pipe.get(resolution_key) pipe.get(resolution_key)
elif config.get('compress', False): elif config.compress:
pipe.hvals(resolution_key) pipe.hvals(resolution_key)
else: else:
pipe.llen(resolution_key) pipe.llen(resolution_key)


res = pipe.execute() res = pipe.execute()
for idx,data in enumerate(res): for idx,data in enumerate(res):
if config.get('compress', False): if config.compress:
rval[ resolution_buckets[idx]*resolution ] = sum(map(int,data)) if data else 0 rval[ resolution_buckets[idx]*resolution ] = sum(map(int,data)) if data else 0
else: else:
rval[ resolution_buckets[idx]*resolution ] = int(data) if data else 0 rval[ resolution_buckets[idx]*resolution ] = int(data) if data else 0
Expand All @@ -261,10 +277,10 @@ def series(self, name, interval, steps=None, condensed=False):
matches the return value in get(). matches the return value in get().
''' '''
# TODO: support start and end timestamps # TODO: support start and end timestamps
config = self._config[interval] config = _ConfigStruct(**self._config[interval])
step = config.get('step', 1) step = config.step
steps = steps if steps else config.get('steps',1) steps = steps if steps else config.steps if config.steps else 1
resolution = config.get('resolution',step) resolution = config.resolution


end_timestamp = time.time() end_timestamp = time.time()
end_bucket = int( end_timestamp / step ) end_bucket = int( end_timestamp / step )
Expand All @@ -280,9 +296,9 @@ def series(self, name, interval, steps=None, condensed=False):
rval[interval_bucket*step] = OrderedDict() rval[interval_bucket*step] = OrderedDict()


if step==resolution: if step==resolution:
if config.get('count_only',False): if config.count_only:
pipe.get(interval_key) pipe.get(interval_key)
elif config.get('compress',False): elif config.compress:
pipe.hgetall(interval_key) pipe.hgetall(interval_key)
else: else:
pipe.lrange(interval_key, 0, -1) pipe.lrange(interval_key, 0, -1)
Expand All @@ -298,49 +314,49 @@ def series(self, name, interval, steps=None, condensed=False):
interval_key = '%s%s:%s:%s'%(self._prefix, name, interval, interval_bucket) interval_key = '%s%s:%s:%s'%(self._prefix, name, interval, interval_bucket)


if step==resolution: if step==resolution:
if config.get('count_only',False): if config.count_only:
data = int(data) if data else 0 data = int(data) if data else 0
elif config.get('compress',False): elif config.compress:
# Turn back into a time series # Turn back into a time series
# TODO: this might be too slow because of the addition # TODO: this might be too slow because of the addition
data = reduce(lambda res, (key,val): res + int(val)*[key], data.iteritems(), [] ) data = reduce(lambda res, (key,val): res + int(val)*[key], data.iteritems(), [] )
if config.get('read_cast'): if config.read_cast:
data = map(config.get('read_cast'), data) data = map(config.read_cast, data)
elif config.get('read_cast'): elif config.read_cast:
data = map(config.get('read_cast'), data) data = map(config.read_cast, data)
rval[interval_bucket*step] = data rval[interval_bucket*step] = data


else: else:
resolution_buckets = sorted(map(int,data)) resolution_buckets = sorted(map(int,data))
for bucket in resolution_buckets: for bucket in resolution_buckets:
resolution_key = '%s:%s'%(interval_key, bucket) resolution_key = '%s:%s'%(interval_key, bucket)


if config.get('count_only',False): if config.count_only:
pipe.get(resolution_key) pipe.get(resolution_key)
elif config.get('compress',False): elif config.compress:
pipe.hgetall(resolution_key) pipe.hgetall(resolution_key)
else: else:
pipe.lrange(resolution_key, 0, -1) pipe.lrange(resolution_key, 0, -1)


resolution_res = pipe.execute() resolution_res = pipe.execute()
for x,data in enumerate(resolution_res): for x,data in enumerate(resolution_res):
if config.get('count_only',False): if config.count_only:
data = int(data) if data else 0 data = int(data) if data else 0
elif config.get('compress',False): elif config.compress:
# Turn back into a time series # Turn back into a time series
# TODO: this might be too slow because of the addition # TODO: this might be too slow because of the addition
data = reduce(lambda res, (key,val): res + int(val)*[key], data.iteritems(), [] ) data = reduce(lambda res, (key,val): res + int(val)*[key], data.iteritems(), [] )
if config.get('read_cast'): if config.read_cast:
data = map(config.get('read_cast'), data) data = map(config.read_cast, data)
elif config.get('read_cast'): elif config.read_cast:
data = map(config.get('read_cast'), data) data = map(config.read_cast, data)


rval[interval_bucket*step][ resolution_buckets[x]*resolution ] = data rval[interval_bucket*step][ resolution_buckets[x]*resolution ] = data


# If condensed, collapse each interval into a single value # If condensed, collapse each interval into a single value
if condensed: if condensed:
for key in rval.keys(): for key in rval.keys():
if config.get('count_only',False): if config.count_only:
rval[key] = sum(rval[key].values()) rval[key] = sum(rval[key].values())
else: else:
rval[key] = reduce(operator.add, rval[key].values(), []) rval[key] = reduce(operator.add, rval[key].values(), [])
Expand Down
90 changes: 90 additions & 0 deletions script/gen_mon.py
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,90 @@
#!/usr/bin/env python

from kairos import Timeseries
import redis
import time
import datetime
import random

# Maximum width, in characters, of the ASCII bar chart drawn by plot_series().
MAX_WIDTH_COLUMNS = 60

# Domain whose hits we record, and the redis key namespace for the time series data.
KEY = 'example.com'
KEY_PREFIX = 'timedata:domain'

# Redis connection and the Timeseries that stores per-domain hit counts at
# three rollup granularities.  NOTE(review): assumes a redis server on
# localhost:6379 -- confirm before running.
client = redis.Redis('localhost', 6379)
counters = Timeseries(client, {
  'minute': {
    'step': 60,            # 60 seconds
    'steps': 60,           # last hour
    'count_only' : True,   # store counts only.
  },
  'hour': {
    'step': 3600,          # Hourly
    'steps': 24,           # Last day
    'count_only' : True,   # Store counts only.
  },
  'daily': {
    'step': 86400,         # Daily
    'steps': 30,           # Last 30 days
    'count_only': True,    # Store counts only.
  },
},
key_prefix=KEY_PREFIX)

def hit(domain):
    """Log and record a single hit against *domain* at the current time."""
    now = time.time()
    print("hit: %s @ %d" % (domain, now))
    counters.insert(domain, 1)

def dump_series(base_time, series):
for ts, value in series.iteritems():
print "%02d(%02d)" % ((ts-base_time)/60, value),
print

def plot_series(base_time, series, max_val):
scale = max_val / MAX_WIDTH_COLUMNS
for ts, count in series.iteritems():
print "%4d minutes (%03d): %s" % ((ts-base_time)/60, count, "*" * (count/scale))

def sum_series(series):
    """Return the total hit count across every bucket in *series*."""
    total = 0
    for count in series.values():
        total += count
    return total

def generate():
    """Record hits forever: two immediately, then roughly one every 3.5
    seconds (a hit whenever the tick count is divisible by a fresh random
    draw in [2, 5]).  Never returns; interrupt to stop.
    """
    # Record a couple of hits up front so the series is never empty.
    hit(KEY)
    hit(KEY)

    ticks = 0
    while True:
        time.sleep(1)
        # Record a hit every once in a while (approx. every 3.5 seconds).
        if ticks % random.randint(2, 5) == 0:
            hit(KEY)
        ticks += 1

interval_max_values = { 'minute' : 100, 'hour': 2000, 'daily': 2000*24 }

def monitor(interval_name):
    """Continuously plot the named interval's series and print a rolling
    5-bucket hit rate, refreshing once per second.  Never returns.

    :param interval_name: one of the configured intervals
        ('minute', 'hour', 'daily').
    """
    while True:
        series = counters.series(KEY, interval_name)
        last_5 = counters.series(KEY, interval_name, steps=5, condensed=False)
        # 'total' rather than 'sum' so we don't shadow the builtin.
        # NOTE(review): counters.series(..., condensed=True) should give this
        # directly but breaks -- summing client-side as a workaround.
        total = sum_series(last_5)
        plot_series(time.time(), series, interval_max_values[interval_name])
        print("%d in last 5 %s (~%2.2f per %s)." % (total, interval_name, total / 5.0, interval_name))
        time.sleep(1)

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='Test something.')
    # 'op' selects the role: 'generate' writes random hit data into redis,
    # 'monitor' reads it back and plots it.
    parser.add_argument('op', metavar='op', help='If op=generate, generates random data. If op=monitor, monitors generated data.', default='generate', action="store", choices=['generate', 'monitor'])
    # Interval granularity to monitor; ignored by the generator.
    parser.add_argument('-i', '--interval', metavar='interval', default='minute', action='store', choices=['minute', 'hour', 'daily'])
    opts = parser.parse_args()
    if opts.op == 'generate':
        generate()
    else:
        monitor(opts.interval)