From c789a61a390c30432179f459178755f700c3f913 Mon Sep 17 00:00:00 2001
From: Ted Kaemming
Date: Tue, 9 Feb 2016 13:41:37 -0800
Subject: [PATCH] Add a Count-Min sketch implementation of frequency tables.

---
 setup.py                             |   2 +-
 src/sentry/event_manager.py          |  13 +
 src/sentry/models/group.py           |  36 +-
 src/sentry/scripts/tsdb/cmsketch.lua | 569 +++++++++++++++++++++++++++
 src/sentry/tsdb/base.py              |  64 ++-
 src/sentry/tsdb/dummy.py             |  26 ++
 src/sentry/tsdb/inmemory.py          |  73 +++-
 src/sentry/tsdb/redis.py             | 182 +++++++--
 src/sentry/web/api.py                |  21 +
 tests/sentry/manager/tests.py        |  18 +
 tests/sentry/tsdb/test_redis.py      | 139 ++++++-
 11 files changed, 1101 insertions(+), 42 deletions(-)
 create mode 100644 src/sentry/scripts/tsdb/cmsketch.lua

diff --git a/setup.py b/setup.py
index 203f7f18984ab5..4b042123002a3e 100755
--- a/setup.py
+++ b/setup.py
@@ -130,7 +130,7 @@
     'ua-parser>=0.6.1,<0.7.0',
     'urllib3>=1.14,<1.15',
     'uwsgi>2.0.0,<2.1.0',
-    'rb>=1.3.0,<2.0.0',
+    'rb>=1.4.0,<2.0.0',
 ]
 
 postgres_requires = [
diff --git a/src/sentry/event_manager.py b/src/sentry/event_manager.py
index 5cd589b7322de3..e0bf33bda118f6 100644
--- a/src/sentry/event_manager.py
+++ b/src/sentry/event_manager.py
@@ -662,6 +662,19 @@ def _save_aggregate(self, event, hashes, release, **kwargs):
             (tsdb.models.project, project.id),
         ], timestamp=event.datetime)
 
+        tsdb.record_frequency_multi([
+            (tsdb.models.frequent_projects_by_organization, {
+                project.organization_id: {
+                    project.id: 1,
+                },
+            }),
+            (tsdb.models.frequent_issues_by_project, {
+                project.id: {
+                    group.id: 1,
+                },
+            }),
+        ], timestamp=event.datetime)
+
         return group, is_new, is_regression, is_sample
 
     def _handle_regression(self, group, event, release):
diff --git a/src/sentry/models/group.py b/src/sentry/models/group.py
index fa2b00f5d37398..18f2dc58f7eff5 100644
--- a/src/sentry/models/group.py
+++ b/src/sentry/models/group.py
@@ -9,27 +9,27 @@
 
 import logging
 import math
-import six
 import time
 import warnings
-
 from base64 import b16decode, b16encode
 from datetime import timedelta
+
+import six
 from django.core.urlresolvers import reverse
 from django.db import models
 from django.utils import timezone
 from django.utils.translation import ugettext_lazy as _
 
-from sentry.app import buffer
+from sentry.app import buffer, tsdb
 from sentry.constants import (
-    DEFAULT_LOGGER_NAME, LOG_LEVELS, MAX_CULPRIT_LENGTH, EVENT_ORDERING_KEY,
+    DEFAULT_LOGGER_NAME, EVENT_ORDERING_KEY, LOG_LEVELS, MAX_CULPRIT_LENGTH
 )
 from sentry.db.models import (
     BaseManager, BoundedIntegerField, BoundedPositiveIntegerField,
-    FlexibleForeignKey, Model, GzippedDictField, sane_repr
+    FlexibleForeignKey, GzippedDictField, Model, sane_repr
 )
 from sentry.utils.http import absolute_uri
-from sentry.utils.strings import truncatechars, strip
+from sentry.utils.strings import strip, truncatechars
 
 # TODO(dcramer): pull in enum library
@@ -58,12 +58,16 @@ def add_tags(self, group, tags):
         project_id = group.project_id
         date = group.last_seen
 
-        for tag_item in tags:
-            if len(tag_item) == 2:
-                (key, value), data = tag_item, None
+        def normalize(item):
+            if len(item) == 2:
+                (key, value), data = item, None
             else:
-                key, value, data = tag_item
+                key, value, data = item
+            return key, value, data
 
+        tags = map(normalize, tags)
+
+        for key, value, data in tags:
             buffer.incr(TagValue, {
                 'times_seen': 1,
             }, {
@@ -86,6 +90,18 @@ def add_tags(self, group, tags):
                 'last_seen': date,
             })
 
+        metrics = {}
+        for key, value, data in tags:
+            metric = metrics.setdefault(u'{}:{}'.format(group.id, key), {})
+            metric[value] = metric.get(value, 0.0) + 1.0
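+
+        # ``metrics`` now maps a u'<group id>:<tag key>' string to the counts
+        # for each value of that tag, e.g. {u'1:browser': {'Chrome': 2.0,
+        # 'Safari': 1.0}} (the example values here are illustrative only).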
+
+        # XXX: No access to the event where the tags were recorded here; this
+        # assumes ``last_seen`` is updated before this function is called!!!
+        tsdb.record_frequency_multi(
+            ((tsdb.models.frequent_values_by_issue_tag, metrics),),
+            timestamp=group.last_seen
+        )
+
 
 class Group(Model):
     """
diff --git a/src/sentry/scripts/tsdb/cmsketch.lua b/src/sentry/scripts/tsdb/cmsketch.lua
new file mode 100644
index 00000000000000..fa03efa22023c9
--- /dev/null
+++ b/src/sentry/scripts/tsdb/cmsketch.lua
@@ -0,0 +1,569 @@
+--[[
+
+Count-Min Sketch
+================
+
+This provides a Redis-based implementation of the Count-Min Sketch, a
+probabilistic data structure that allows counting observations of items from
+a high-cardinality input stream in sublinear space, with the tradeoff of
+potentially overcounting lower-frequency items due to hash collisions.
+
+This implementation extends the conventional Count-Min algorithm, adding an
+index that allows querying for the top N items that have been observed in the
+stream. The index also serves as the primary storage, reducing storage
+requirements and improving accuracy, until its capacity is exceeded, at which
+point the index data is used to initialize the estimation matrix. Once the
+index capacity has been exceeded and the estimation matrix has been
+initialized, the index of most frequent items is maintained using the
+estimates from the matrix.
+
+The public API consists of three main methods:
+
+- INCR: used to record observations of items,
+- ESTIMATE: used to query the number of times a specific item has been seen,
+- RANKED: used to query the top N items that have been recorded in a sketch.
+
+The name of the command to use is the first item passed as ``ARGV``. For
+commands that mutate data (`INCR`), the command is followed by the accuracy
+and storage parameters to use when initializing a new sketch:
+
+- DEPTH: number of rows for the estimation matrix,
+- WIDTH: number of columns for the estimation matrix,
+- CAPACITY: maximum size of the index (to disable indexing entirely, set to 0.)
+
+(Configuration parameters are not required for readonly commands such as
+`ESTIMATE` and `RANKED`.)
+
+The ``KEYS`` provided to each command are the three keys used for sketch
+storage:
+
+- configuration key (bytes, serialized MessagePack data)
+- index key (sorted set)
+- estimation matrix key (hash of frequencies (floats), keyed by struct-packed
+  matrix coordinates)
+
+Multiple sketches can be provided to each command by providing additional
+sets of keys, e.g.
+
+    EVALSHA $SHA 6 1:config 1:index 1:estimates 2:config 2:index 2:estimates [...]
+
+(Whether a command returns a single result that encompasses all sketches, or a
+sequence of results that correspond to each sketch, depends on the command
+being called.)
+
+To add two items to two sketches with depth 5, width 64, and an index capacity
+of 50 ("foo" with a score of 1 and "bar" with a score of 2):
+
+    EVALSHA $SHA 6 1:c 1:i 1:e 2:c 2:i 2:e INCR 5 64 50 1 foo 2 bar
+
+To query the top 5 items from the first sketch:
+
+    EVALSHA $SHA 3 1:c 1:i 1:e RANKED 5
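+
+To estimate how many times "foo" and "bar" have been observed in the first
+sketch (the values simply follow the command name, as handled by the
+``ESTIMATE`` command defined below):
+
+    EVALSHA $SHA 3 1:c 1:i 1:e ESTIMATE foo bar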
+
+]]--
+
+--[[ Helpers ]]--
+
+local function filter(f, t)
+    local result = {}
+    for i, value in ipairs(t) do
+        if f(value) then
+            table.insert(result, value)
+        end
+    end
+    return result
+end
+
+local function map(f, t)
+    local result = {}
+    for i, value in ipairs(t) do
+        result[i] = f(value)
+    end
+    return result
+end
+
+local function head(t)
+    return (
+        function (head, ...)
+            return head, {...}
+        end
+    )(unpack(t))
+end
+
+local function reduce(f, t, initializer)
+    if initializer == nil then
+        initializer, t = head(t)
+    end
+
+    local result = initializer
+    for _, value in pairs(t) do
+        result = f(result, value)
+    end
+    return result
+end
+
+local function sum(series)
+    return reduce(
+        function (x, y)
+            return x + y
+        end,
+        series,
+        0
+    )
+end
+
+local function zip(items)
+    local length = reduce(
+        math.min,
+        map(
+            function (t)
+                return #t
+            end,
+            items
+        )
+    )
+    local results = {}
+    for i = 1, length do
+        local value = {}
+        for j = 1, #items do
+            value[j] = items[j][i]
+        end
+        results[i] = value
+    end
+    return results
+end
+
+
+--[[
+    MurmurHash3
+
+    This implementation of MurmurHash3 is the 32-bit variation, based on the
+    example implementations here: https://en.wikipedia.org/wiki/MurmurHash
+]]--
+local function mmh3(key, seed)
+    local c1 = 0xcc9e2d51
+    local c2 = 0x1b873593
+    local r1 = 15
+    local r2 = 13
+    local m = 5
+    local n = 0xe6546b64
+
+    local function multiply(x, y)
+        -- This is required to emulate uint32 overflow correctly -- otherwise,
+        -- higher order bits are simply truncated and discarded.
+        return (bit.band(x, 0xffff) * y) + bit.lshift(bit.band(bit.rshift(x, 16) * y, 0xffff), 16)
+    end
+
+    local hash = bit.tobit(seed)
+    local remainder = #key % 4
+
+    for i = 1, #key - remainder, 4 do
+        local k = struct.unpack('HH', unpack(coordinates)))) or 0
+end
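+
+-- An item's estimate is exact for as long as the item is still held in the
+-- index (the ZSCORE lookup below); once it only exists in the estimation
+-- matrix, the usual Count-Min rule applies: take the minimum counter across
+-- all of the item's matrix coordinates.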
+function Sketch:estimate(value)
+    if self.configuration:exists() then
+        local score = tonumber(redis.call('ZSCORE', self.index, value))
+        if score ~= nil then
+            return score
+        end
+        return reduce(
+            math.min,
+            map(
+                function (c)
+                    return self:observations(c)
+                end,
+                self:coordinates(value)
+            )
+        )
+    else
+        return 0
+    end
+end
+
+function Sketch:increment(items)
+    assert(not self.configuration.readonly)
+
+    local results = {}
+    local usage = redis.call('ZCARD', self.index)
+    if self.configuration:get('index') > usage then
+        -- Add all of the items to the index. (Note that this can cause the
+        -- index to temporarily grow to the size of the capacity - 1 + number
+        -- of items being updated in the worst case.)
+        local added = 0
+        for i, item in pairs(items) do
+            local value, delta = unpack(item)
+            local score = tonumber(redis.call('ZINCRBY', self.index, delta, value))
+            if score == delta then
+                added = added + 1
+            end
+        end
+
+        -- If the number of items added pushes the index to capacity, we need
+        -- to initialize the sketch matrix with all of the current members of
+        -- the index.
+        if added + usage >= self.configuration:get('index') then
+            -- TODO: Use this data to generate the response value.
+            local members = redis.call('ZRANGE', self.index, 0, -1, 'WITHSCORES')
+            for i = 1, #members, 2 do
+                local value = members[i]
+                local score = members[i + 1]
+                local coordinates = self:coordinates(value)
+                local estimates = map(
+                    function (c)
+                        return self:observations(c)
+                    end,
+                    coordinates
+                )
+                for i, item in pairs(zip({coordinates, estimates})) do
+                    local c, estimate = unpack(item)
+                    local update = math.max(score, estimate)
+                    if estimate ~= update then
+                        redis.call('HSET', self.estimates, struct.pack('>HH', unpack(c)), update)
+                    end
+                end
+            end
+
+            -- Remove extra items from the index.
+            redis.call('ZREMRANGEBYRANK', self.index, 0, -self.configuration:get('index') - 1)
+        end
+    else
+        -- Fetch the estimates for each item and update them.
+        for i, item in pairs(items) do
+            local value, delta = unpack(item)
+            local coordinates = self:coordinates(value)
+            local estimates = map(
+                function (c)
+                    return self:observations(c)
+                end,
+                coordinates
+            )
+
+            -- This uses the score from the index (if it's available) instead
+            -- of the matrix estimate to avoid data rot as much as possible.
+            local score = (tonumber(redis.call('ZSCORE', self.index, value)) or reduce(math.min, estimates)) + delta
+            for i, item in pairs(zip({coordinates, estimates})) do
+                local c, estimate = unpack(item)
+                local update = math.max(score, estimate)
+                if estimate ~= update then
+                    redis.call('HSET', self.estimates, struct.pack('>HH', unpack(c)), update)
+                end
+            end
+            results[i] = score
+        end
+
+        if self.configuration:get('index') > 0 then
+            local added = 0
+            local minimum = tonumber(redis.call('ZRANGE', self.index, 0, 0, 'WITHSCORES')[2])
+            for i, item in pairs(items) do
+                local score = results[i]
+                -- TODO: This should also probably lexicographically sort items for consistent behavior.
+                if score > minimum then
+                    local value = unpack(item)
+                    added = added + redis.call('ZADD', self.index, score, value)
+                end
+            end
+
+            if added > 0 then
+                -- Remove extra items from the index.
+                redis.call('ZREMRANGEBYRANK', self.index, 0, -self.configuration:get('index') - 1)
+            end
+        end
+    end
+    return results
+end
+
+
+--[[ Redis API ]]--
+
+local Command = {}
+
+function Command:new(fn, readonly)
+    if readonly == nil then
+        readonly = false
+    end
+
+    return function (keys, arguments)
+        local defaults = nil
+        if not readonly then
+            defaults, arguments = (
+                function (depth, width, index, ...)
+                    return {
+                        -- TODO: Actually validate these.
+                        depth=tonumber(depth),
+                        width=tonumber(width),
+                        index=tonumber(index)
+                    }, {...}
+                end
+            )(unpack(arguments))
+        end
+
+        local sketches = {}
+        for i = 1, #keys, 3 do
+            table.insert(sketches, Sketch:new(
+                Configuration:new(keys[i], readonly, defaults),
+                keys[i + 1],
+                keys[i + 2]
+            ))
+        end
+        return fn(sketches, arguments)
+    end
+end
+
+
+local Router = {}
+
+function Router:new(commands)
+    return function (keys, arguments)
+        local name, arguments = head(arguments)
+        return commands[name:upper()](keys, arguments)
+    end
+end
+
+
+return Router:new({
+
+    --[[
+    Increment the number of observations for each item in all sketches.
+    ]]--
+    INCR = Command:new(
+        function (sketches, arguments)
+            local items = {}
+            for i = 1, #arguments, 2 do
+                -- The increment value needs to be positive, since we're using the conservative
+                -- update strategy proposed by Estan and Varghese:
+                -- http://www.eecs.harvard.edu/~michaelm/CS223/mice.pdf
+                local delta = tonumber(arguments[i])
+                assert(delta > 0, 'The increment value must be positive and nonzero.')
+
+                local value = arguments[i + 1]
+                table.insert(items, {value, delta})
+            end
+
+            return map(
+                function (sketch)
+                    return sketch:increment(items)
+                end,
+                sketches
+            )
+        end,
+        false
+    ),
+
+    --[[
+    Estimate the number of observations for each item in all sketches,
+    returning a sequence containing scores for items in the order that they
+    were provided for each sketch.
+    ]]--
+    ESTIMATE = Command:new(
+        function (sketches, values)
+            return map(
+                function (sketch)
+                    return map(
+                        function (value)
+                            return string.format(
+                                '%s',
+                                sketch:estimate(value)
+                            )
+                        end,
+                        values
+                    )
+                end,
+                sketches
+            )
+        end,
+        true
+    ),
+
+    --[[
+    Find the most frequently observed items across all sketches, returning a
+    sequence of item, score pairs.
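+
+    When more than one sketch is provided, an item's score is the sum of its
+    estimates across all of the sketches; ties are broken by sorting items
+    lexicographically.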
+    ]]--
+    RANKED = Command:new(
+        function (sketches, arguments)
+            local limit = unpack(arguments)
+
+            -- We only care about sketches that actually exist.
+            sketches = filter(
+                function (sketch)
+                    return sketch.configuration:exists()
+                end,
+                sketches
+            )
+
+            if #sketches == 0 then
+                return {}
+            end
+
+            -- TODO: There are probably a bunch of performance optimizations that could be made here.
+            -- If no limit is provided, use an implicit limit of the smallest index.
+            if limit == nil then
+                limit = reduce(
+                    math.min,
+                    map(
+                        function (sketch)
+                            return sketch.configuration:get('index')
+                        end,
+                        sketches
+                    )
+                )
+            end
+
+            if #sketches == 1 then
+                local results = {}
+                -- Note that the ZREVRANGE bounds are *inclusive*, so the limit
+                -- needs to be reduced by one to act as a typical slice bound.
+                local members = redis.call('ZREVRANGE', sketches[1].index, 0, limit - 1, 'WITHSCORES')
+                for i = 1, #members, 2 do
+                    table.insert(
+                        results,
+                        {
+                            members[i],
+                            string.format('%s', members[i + 1])
+                        }
+                    )
+                end
+                return results
+            else
+                -- As the first pass, we need to find all of the items to look
+                -- up in all sketches.
+                local items = {}
+                for _, sketch in pairs(sketches) do
+                    local members = redis.call('ZRANGE', sketch.index, 0, -1)
+                    for _, member in pairs(members) do
+                        items[member] = true
+                    end
+                end
+
+                local results = {}
+                for value in pairs(items) do
+                    table.insert(
+                        results,
+                        {
+                            value,
+                            sum(
+                                map(
+                                    function (sketch)
+                                        return sketch:estimate(value)
+                                    end,
+                                    sketches
+                                )
+                            ),
+                        }
+                    )
+                end
+
+                local function comparator(x, y)
+                    if x[2] == y[2] then
+                        return x[1] < y[1]  -- lexicographically by key ascending
+                    else
+                        return x[2] > y[2]  -- score descending
+                    end
+                end
+
+                table.sort(results, comparator)
+
+                -- Trim the results to the limit.
+                local trimmed = {}
+                for i = 1, math.min(limit, #results) do
+                    local item, score = unpack(results[i])
+                    trimmed[i] = {
+                        item,
+                        string.format('%s', score)
+                    }
+                end
+                return trimmed
+            end
+        end,
+        true
+    )
+})(KEYS, ARGV)
diff --git a/src/sentry/tsdb/base.py b/src/sentry/tsdb/base.py
index 7aeaf932a2faed..8b9b1f658468a4 100644
--- a/src/sentry/tsdb/base.py
+++ b/src/sentry/tsdb/base.py
@@ -13,7 +13,6 @@
 
 from sentry.utils.dates import to_timestamp
 
-
 ONE_MINUTE = 60
 ONE_HOUR = ONE_MINUTE * 60
 ONE_DAY = ONE_HOUR * 24
@@ -50,10 +49,22 @@ class TSDBModel(Enum):
 
     # distinct count of users that have been affected by an event in a group
     users_affected_by_group = 300
-
     # distinct count of users that have been affected by an event in a project
    users_affected_by_project = 301
 
+    # number of events sent to server for an organization (key is always 0)
+    frequent_organization_received_by_system = 400
+    # number of events rejected by server for an organization (key is always 0)
+    frequent_organization_rejected_by_system = 401
+    # number of events blacklisted by server for an organization (key is always 0)
+    frequent_organization_blacklisted_by_system = 402
+    # number of events seen for a project, by organization
+    frequent_projects_by_organization = 403
+    # number of events seen for an issue, by project
+    frequent_issues_by_project = 404
+    # number of events seen for a tag value, by issue:tag
+    frequent_values_by_issue_tag = 405
+
 
 class BaseTSDB(object):
     models = TSDBModel
@@ -221,3 +232,52 @@ def get_distinct_counts_totals(self, model, keys, start, end=None, rollup=None):
         Count distinct items during a time range.
         """
         raise NotImplementedError
+
+    def record_frequency_multi(self, requests, timestamp=None):
+        """
+        Record items in a frequency table.
+
+        Metrics to increment should be passed as sequence pairs, using this
+        structure: ``(model, {key: {item: score, ...}, ...})``
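+
+        For example, to record one occurrence of item ``'a'`` and two
+        occurrences of item ``'b'`` under key ``1`` (the key and items here
+        are purely illustrative)::
+
+            tsdb.record_frequency_multi([
+                (tsdb.models.frequent_issues_by_project, {1: {'a': 1, 'b': 2}}),
+            ])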
+        """
+        raise NotImplementedError
+
+    def get_most_frequent(self, model, keys, start, end=None, rollup=None, limit=None):
+        """
+        Retrieve the most frequently seen items in a frequency table.
+
+        Results are returned as a mapping, where the key is the key requested
+        and the value is a list of ``(member, score)`` tuples, ordered from
+        the highest (most frequent) to the lowest (least frequent) score. The
+        maximum number of items returned is ``index capacity * rollup
+        intervals`` if no ``limit`` is provided.
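+
+        For example (the key, members, and scores are illustrative)::
+
+            >>> tsdb.get_most_frequent(model, ('key',), start)
+            {'key': [('member', 10.0), ('other', 5.0)]}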
+        """
+        raise NotImplementedError
+
+    def get_frequency_series(self, model, items, start, end=None, rollup=None):
+        """
+        Retrieve the frequency of known items in a table over time.
+
+        The items requested should be passed as a mapping, where the key is
+        the metric key, and the value is a sequence of members to retrieve
+        scores for.
+
+        Results are returned as a mapping, where the key is the key requested
+        and the value is a list of ``(timestamp, {item: score, ...})`` pairs
+        over the series.
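+
+        For example (the timestamps and scores are illustrative)::
+
+            >>> tsdb.get_frequency_series(model, {'key': ('a', 'b')}, start)
+            {'key': [(1368889980, {'a': 1.0, 'b': 0.0}), ...]}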
+        """
+        raise NotImplementedError
+
+    def get_frequency_totals(self, model, items, start, end=None, rollup=None):
+        """
+        Retrieve the total frequency of known items in a table over time.
+
+        The items requested should be passed as a mapping, where the key is
+        the metric key, and the value is a sequence of members to retrieve
+        scores for.
+
+        Results are returned as a mapping, where the key is the key requested
+        and the value is a mapping of ``{item: score, ...}`` containing the
+        total score of items over the interval.
+        """
+        raise NotImplementedError
diff --git a/src/sentry/tsdb/dummy.py b/src/sentry/tsdb/dummy.py
index 3cb746f3bac8d2..eb63a5e85fed6c 100644
--- a/src/sentry/tsdb/dummy.py
+++ b/src/sentry/tsdb/dummy.py
@@ -28,3 +28,29 @@ def get_distinct_counts_series(self, model, keys, start, end=None, rollup=None):
 
     def get_distinct_counts_totals(self, model, keys, start, end=None, rollup=None):
         return {k: 0 for k in keys}
+
+    def record_frequency_multi(self, requests, timestamp=None):
+        pass
+
+    def get_most_frequent(self, model, keys, start, end=None, rollup=None, limit=None):
+        return {key: [] for key in keys}
+
+    def get_frequency_series(self, model, items, start, end=None, rollup=None):
+        rollup, series = self.get_optimal_rollup_series(start, end, rollup)
+
+        results = {}
+        for key, members in items.items():
+            result = results[key] = []
+            for timestamp in series:
+                result.append((
+                    timestamp,
+                    {k: 0.0 for k in members},
+                ))
+
+        return results
+
+    def get_frequency_totals(self, model, items, start, end=None, rollup=None):
+        results = {}
+        for key, members in items.items():
+            results[key] = {member: 0.0 for member in members}
+        return results
diff --git a/src/sentry/tsdb/inmemory.py b/src/sentry/tsdb/inmemory.py
index f66cd3bb878a0b..65c760004e925d 100644
--- a/src/sentry/tsdb/inmemory.py
+++ b/src/sentry/tsdb/inmemory.py
@@ -7,8 +7,9 @@
 """
 from __future__ import absolute_import
 
-from collections import defaultdict
+from collections import Counter, defaultdict
 from datetime import timedelta
+
 from django.utils import timezone
 
 from sentry.tsdb.base import BaseTSDB
@@ -99,4 +100,72 @@ def flush(self):
         self.data = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
 
         # self.sets[model][key][rollup] = set of elements
-        self.sets = defaultdict(lambda: defaultdict(lambda: defaultdict(set)))
+        self.sets = defaultdict(
+            lambda: defaultdict(
+                lambda: defaultdict(
+                    set,
+                ),
+            ),
+        )
+
+        # self.frequencies[model][key][rollup] = Counter()
+        self.frequencies = defaultdict(
+            lambda: defaultdict(
+                lambda: defaultdict(
+                    Counter,
+                )
+            ),
+        )
+
+    def record_frequency_multi(self, requests, timestamp=None):
+        if timestamp is None:
+            timestamp = timezone.now()
+
+        for model, request in requests:
+            for key, items in request.items():
+                items = {k: float(v) for k, v in items.items()}
+                source = self.frequencies[model][key]
+                for rollup, _ in self.rollups:
+                    source[self.normalize_to_rollup(timestamp, rollup)].update(items)
+
+    def get_most_frequent(self, model, keys, start, end=None, rollup=None, limit=None):
+        rollup, series = self.get_optimal_rollup_series(start, end, rollup)
+
+        results = {}
+        for key in keys:
+            result = results[key] = Counter()
+            source = self.frequencies[model][key]
+            for timestamp in series:
+                result.update(source[self.normalize_ts_to_rollup(timestamp, rollup)])
+
+        for key, counter in results.items():
+            results[key] = counter.most_common(limit)
+
+        return results
+
+    def get_frequency_series(self, model, items, start, end=None, rollup=None):
+        rollup, series = self.get_optimal_rollup_series(start, end, rollup)
+
+        results = {}
+        for key, members in items.items():
+            result = results[key] = []
+            source = self.frequencies[model][key]
+            for timestamp in series:
+                scores = source[self.normalize_ts_to_rollup(timestamp, rollup)]
+                result.append((
+                    timestamp,
+                    {k: scores.get(k, 0.0) for k in members},
+                ))
+
+        return results
+
+    def get_frequency_totals(self, model, items, start, end=None, rollup=None):
+        results = {}
+
+        for key, series in self.get_frequency_series(model, items, start, end, rollup).iteritems():
+            result = results[key] = {}
+            for timestamp, scores in series:
+                for member, score in scores.items():
+                    result[member] = result.get(member, 0.0) + score
+
+        return results
diff --git a/src/sentry/tsdb/redis.py b/src/sentry/tsdb/redis.py
index 279c67747018f1..9fddec7fdb0e8c 100644
--- a/src/sentry/tsdb/redis.py
+++ b/src/sentry/tsdb/redis.py
@@ -8,35 +8,44 @@
 from __future__ import absolute_import
 
 import logging
-import six
-
+import operator
 from binascii import crc32
-from collections import defaultdict
+from collections import defaultdict, namedtuple
 from datetime import timedelta
+from hashlib import md5
+
+import six
 from django.conf import settings
 from django.utils import timezone
-from hashlib import md5
+from pkg_resources import resource_string
+from redis.client import Script
 
 from sentry.tsdb.base import BaseTSDB
 from sentry.utils.dates import to_timestamp
-from sentry.utils.redis import (
-    check_cluster_versions,
-    make_rb_cluster,
-)
+from sentry.utils.redis import check_cluster_versions, make_rb_cluster
 from sentry.utils.versioning import Version
 
-
 logger = logging.getLogger(__name__)
 
+SketchParameters = namedtuple('SketchParameters', 'depth width capacity')
+
+
+CountMinScript = Script(
+    None,
+    resource_string('sentry', 'scripts/tsdb/cmsketch.lua'),
+)
+
+
 class RedisTSDB(BaseTSDB):
     """
     A time series storage backend for Redis.
 
-    The time series API supports two data types:
+    The time series API supports three data types:
 
     * simple counters
     * distinct counters (number of unique elements seen)
+    * frequency tables (a set of items ranked by most frequently observed)
 
     The backend also supports virtual nodes (``vnodes``) which controls shard
     distribution. This value should be set to the anticipated maximum number of
@@ -65,7 +74,25 @@ class RedisTSDB(BaseTSDB):
         ...
     }
 
+    Frequency tables are modeled using two data structures:
+
+    * top-N index: a sorted set containing the most frequently observed items,
+    * estimation matrix: a hash table containing counters, used in a Count-Min sketch
+
+    Member scores are 100% accurate until the index is filled (and no memory is
+    used for the estimation matrix until this point), after which the data
+    structure switches to a probabilistic implementation and accuracy begins to
+    degrade for less frequently observed items, but remains accurate for more
+    frequently observed items.
+
+    Frequency tables are especially useful when paired with a (non-distinct)
+    counter of the total number of observations, so that the scores of items in
+    the frequency table can be displayed as percentages of the whole data set.
+    (Additional documentation and the bulk of the logic for implementing the
+    frequency table API can be found in the ``cmsketch.lua`` script.)
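+
+    Each rollup of a frequency table is stored at three Redis keys that share
+    a common prefix (see ``make_frequency_table_keys`` below): the sketch
+    configuration (``<prefix>:c``), the top-N index (``<prefix>:i``), and the
+    estimation matrix (``<prefix>:e``).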
     """
+    DEFAULT_SKETCH_PARAMETERS = SketchParameters(3, 128, 50)
+
     def __init__(self, hosts=None, prefix='ts:', vnodes=64, **kwargs):
         # inherit default options from REDIS_OPTIONS
         defaults = settings.SENTRY_REDIS_OPTIONS
@@ -86,7 +113,22 @@ def validate(self):
             label='TSDB',
         )
 
-    def make_key(self, model, epoch, model_key):
+    def make_key(self, model, rollup, timestamp, key):
+        """
+        Make a key that is used for distinct counter and frequency table
+        values.
+        """
+        return '{prefix}{model}:{epoch}:{key}'.format(
+            prefix=self.prefix,
+            model=model.value,
+            epoch=self.normalize_ts_to_rollup(timestamp, rollup),
+            key=self.get_model_key(key),
+        )
+
+    def make_counter_key(self, model, epoch, model_key):
+        """
+        Make a key that is used for counter values.
+        """
         if isinstance(model_key, six.integer_types):
             vnode = model_key % self.vnodes
         else:
@@ -114,7 +156,7 @@ def incr_multi(self, items, timestamp=None, count=1):
 
         >>> incr_multi([(TimeSeriesModel.project, 1), (TimeSeriesModel.group, 5)])
         """
-        make_key = self.make_key
+        make_key = self.make_counter_key
         normalize_to_rollup = self.normalize_to_rollup
         if timestamp is None:
             timestamp = timezone.now()
@@ -144,7 +186,7 @@ def get_range(self, model, keys, start, end, rollup=None):
         """
         normalize_to_epoch = self.normalize_to_epoch
         normalize_to_rollup = self.normalize_to_rollup
-        make_key = self.make_key
+        make_key = self.make_counter_key
 
         if rollup is None:
             rollup = self.get_optimal_rollup(start, end)
@@ -172,14 +214,6 @@ def get_range(self, model, keys, start, end, rollup=None):
             results_by_key[key] = sorted(points.items())
         return dict(results_by_key)
 
-    def make_distinct_counter_key(self, model, rollup, timestamp, key):
-        return '{prefix}{model}:{epoch}:{key}'.format(
-            prefix=self.prefix,
-            model=model.value,
-            epoch=self.normalize_ts_to_rollup(timestamp, rollup),
-            key=self.get_model_key(key),
-        )
-
     def record(self, model, key, values, timestamp=None):
         self.record_multi(((model, key, values),), timestamp)
 
@@ -196,7 +230,7 @@ def record_multi(self, items, timestamp=None):
         for model, key, values in items:
             c = client.target_key(key)
             for rollup, max_values in self.rollups:
-                k = self.make_distinct_counter_key(
+                k = self.make_key(
                     model,
                     rollup,
                     ts,
@@ -227,7 +261,7 @@ def get_distinct_counts_series(self, model, keys, start, end=None, rollup=None):
                 r.append((
                     timestamp,
                     c.pfcount(
-                        self.make_distinct_counter_key(
+                        self.make_key(
                             model,
                             rollup,
                             timestamp,
@@ -255,8 +289,108 @@ def get_distinct_counts_totals(self, model, keys, start, end=None, rollup=None):
             # directly here instead.
             ks = []
             for timestamp in series:
-                ks.append(self.make_distinct_counter_key(model, rollup, timestamp, key))
+                ks.append(self.make_key(model, rollup, timestamp, key))
 
             responses[key] = client.target_key(key).execute_command('PFCOUNT', *ks)
 
         return {key: value.value for key, value in responses.iteritems()}
+
+    def make_frequency_table_keys(self, model, rollup, timestamp, key):
+        prefix = self.make_key(model, rollup, timestamp, key)
+        return map(
+            operator.methodcaller('format', prefix),
+            ('{}:c', '{}:i', '{}:e'),
+        )
+
+    def record_frequency_multi(self, requests, timestamp=None):
+        if timestamp is None:
+            timestamp = timezone.now()
+
+        ts = int(to_timestamp(timestamp))  # ``timestamp`` is not actually a timestamp :(
+
+        commands = {}
+
+        for model, request in requests:
+            for key, items in request.iteritems():
+                keys = []
+                expirations = {}
+
+                # Figure out all of the keys we need to be incrementing, as
+                # well as their expiration policies.
+                for rollup, max_values in self.rollups:
+                    chunk = self.make_frequency_table_keys(model, rollup, ts, key)
+                    keys.extend(chunk)
+
+                    expiry = self.calculate_expiry(rollup, max_values, timestamp)
+                    for k in chunk:
+                        expirations[k] = expiry
+
+                arguments = ['INCR'] + list(self.DEFAULT_SKETCH_PARAMETERS)
+                for member, score in items.items():
+                    arguments.extend((score, member))
+
+                commands[key] = [(CountMinScript, keys, arguments)] + [
+                    ('EXPIREAT', k, expiration) for k, expiration in expirations.items()
+                ]
+
+        self.cluster.execute_commands(commands)
+
+    def get_most_frequent(self, model, keys, start, end=None, rollup=None, limit=None):
+        rollup, series = self.get_optimal_rollup_series(start, end, rollup)
+
+        commands = {}
+
+        arguments = ['RANKED']
+        if limit is not None:
+            arguments.append(int(limit))
+
+        for key in keys:
+            ks = []
+            for timestamp in series:
+                ks.extend(self.make_frequency_table_keys(model, rollup, timestamp, key))
+            commands[key] = [(CountMinScript, ks, arguments)]
+
+        results = {}
+
+        for key, responses in self.cluster.execute_commands(commands).items():
+            results[key] = [(member, float(score)) for member, score in responses[0].value]
+
+        return results
+
+    def get_frequency_series(self, model, items, start, end=None, rollup=None):
+        rollup, series = self.get_optimal_rollup_series(start, end, rollup)
+
+        # Freeze ordering of the members (we'll need these later.)
+        for key, members in items.items():
+            items[key] = tuple(members)
+
+        commands = {}
+
+        for key, members in items.items():
+            ks = []
+            for timestamp in series:
+                ks.extend(self.make_frequency_table_keys(model, rollup, timestamp, key))
+
+            commands[key] = [(CountMinScript, ks, ('ESTIMATE',) + members)]
+
+        results = {}
+
+        for key, responses in self.cluster.execute_commands(commands).items():
+            members = items[key]
+
+            chunk = results[key] = []
+            for timestamp, scores in zip(series, responses[0].value):
+                chunk.append((timestamp, dict(zip(members, map(float, scores)))))
+
+        return results
+
+    def get_frequency_totals(self, model, items, start, end=None, rollup=None):
+        responses = {}
+
+        for key, series in self.get_frequency_series(model, items, start, end, rollup).iteritems():
+            response = responses[key] = {}
+            for timestamp, results in series:
+                for member, value in results.items():
+                    response[member] = response.get(member, 0.0) + value
+
+        return responses
diff --git a/src/sentry/web/api.py b/src/sentry/web/api.py
index 373c2c96eaf512..6a0c600a0b8f5e 100644
--- a/src/sentry/web/api.py
+++ b/src/sentry/web/api.py
@@ -292,6 +292,14 @@ def process(self, request, project, auth, helper, data, **kwargs):
                 (app.tsdb.models.organization_total_received, project.organization_id),
                 (app.tsdb.models.organization_total_blacklisted, project.organization_id),
             ])
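+            # The ``*_by_system`` frequency tables are system-wide, so the
+            # key is the constant 0 and the members are organization IDs.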
+            app.tsdb.record_frequency_multi([
+                (app.tsdb.models.frequent_organization_received_by_system, {
+                    0: {project.organization_id: 1},
+                }),
+                (app.tsdb.models.frequent_organization_blacklisted_by_system, {
+                    0: {project.organization_id: 1},
+                }),
+            ])
             metrics.incr('events.blacklisted')
             raise APIForbidden('Blacklisted IP address: %s' % (remote_addr,))
@@ -312,6 +320,14 @@ def process(self, request, project, auth, helper, data, **kwargs):
                 (app.tsdb.models.organization_total_received, project.organization_id),
                 (app.tsdb.models.organization_total_rejected, project.organization_id),
             ])
+            app.tsdb.record_frequency_multi([
+                (app.tsdb.models.frequent_organization_received_by_system, {
+                    0: {project.organization_id: 1},
+                }),
+                (app.tsdb.models.frequent_organization_rejected_by_system, {
+                    0: {project.organization_id: 1},
+                }),
+            ])
             metrics.incr('events.dropped')
             if rate_limit is not None:
                 raise APIRateLimited(rate_limit.retry_after)
@@ -320,6 +336,11 @@ def process(self, request, project, auth, helper, data, **kwargs):
             (app.tsdb.models.project_total_received, project.id),
             (app.tsdb.models.organization_total_received, project.organization_id),
         ])
+        app.tsdb.record_frequency_multi([
+            (app.tsdb.models.frequent_organization_received_by_system, {
+                0: {project.organization_id: 1},
+            }),
+        ])
 
         content_encoding = request.META.get('HTTP_CONTENT_ENCODING', '')
diff --git a/tests/sentry/manager/tests.py b/tests/sentry/manager/tests.py
index 10b40bdfe8148c..ee1098a5c95631 100644
--- a/tests/sentry/manager/tests.py
+++ b/tests/sentry/manager/tests.py
@@ -2,6 +2,7 @@
 
 from __future__ import absolute_import
 
+from sentry.app import tsdb
 from sentry.models import Group, GroupTagValue, Team, User
 from sentry.testutils import TestCase
 
@@ -37,6 +38,23 @@ def test_add_tags(self):
         self.assertEquals(res.value, 'boz')
         self.assertEquals(res.times_seen, 1)
 
+        assert tsdb.get_most_frequent(
+            tsdb.models.frequent_values_by_issue_tag,
+            (
+                '{}:foo'.format(group.id),
+                '{}:biz'.format(group.id),
+            ),
+            group.last_seen,
+        ) == {
+            '{}:foo'.format(group.id): [
+                ('bar', 1.0),
+                ('baz', 1.0),
+            ],
+            '{}:biz'.format(group.id): [
+                ('boz', 1.0),
+            ],
+        }
+
 
 class TeamManagerTest(TestCase):
     def test_simple(self):
diff --git a/tests/sentry/tsdb/test_redis.py b/tests/sentry/tsdb/test_redis.py
index f77dee72f3d72d..8605cce0cdec42 100644
--- a/tests/sentry/tsdb/test_redis.py
+++ b/tests/sentry/tsdb/test_redis.py
@@ -28,11 +28,11 @@ def setUp(self):
         with self.db.cluster.all() as client:
             client.flushdb()
 
-    def test_make_key(self):
-        result = self.db.make_key(TSDBModel.project, 1368889980, 1)
+    def test_make_counter_key(self):
+        result = self.db.make_counter_key(TSDBModel.project, 1368889980, 1)
         assert result == 'ts:1:1368889980:1'
 
-        result = self.db.make_key(TSDBModel.project, 1368889980, 'foo')
+        result = self.db.make_counter_key(TSDBModel.project, 1368889980, 'foo')
         assert result == 'ts:1:1368889980:33'
 
     def test_get_model_key(self):
@@ -150,3 +150,136 @@ def timestamp(d):
             1: 3,
             2: 2,
         }
+
+    def test_frequency_tables(self):
+        now = datetime.utcnow().replace(tzinfo=pytz.UTC)
+        model = TSDBModel.frequent_projects_by_organization
+
+        self.db.record_frequency_multi(
+            (
+                (model, {
+                    'organization:1': {
+                        "project:1": 1,
+                        "project:2": 2,
+                        "project:3": 3,
+                    },
+                }),
+            ),
+            now
+        )
+
+        self.db.record_frequency_multi(
+            (
+                (model, {
+                    'organization:1': {
+                        "project:1": 1,
+                        "project:2": 2,
+                        "project:3": 3,
+                        "project:4": 4,
+                    },
+                    "organization:2": {
+                        "project:5": 1.5,
+                    },
+                }),
+            ),
+            now - timedelta(hours=1),
+        )
+
+        assert self.db.get_most_frequent(
+            model,
+            ('organization:1', 'organization:2'),
+            now,
+        ) == {
+            'organization:1': [
+                ('project:3', 3.0),
+                ('project:2', 2.0),
+                ('project:1', 1.0),
+            ],
+            'organization:2': [],
+        }
+
+        assert self.db.get_most_frequent(
+            model,
+            ('organization:1', 'organization:2'),
+            now,
+            limit=1,
+        ) == {
+            'organization:1': [
+                ('project:3', 3.0),
+            ],
+            'organization:2': [],
+        }
+
+        assert self.db.get_most_frequent(
+            model,
+            ('organization:1', 'organization:2'),
+            now - timedelta(hours=1),
+            now,
+        ) == {
+            'organization:1': [
+                ('project:3', 3.0 + 3.0),
+                ('project:2', 2.0 + 2.0),
+                ('project:4', 4.0),
+                ('project:1', 1.0 + 1.0),
+            ],
+            'organization:2': [
+                ('project:5', 1.5),
+            ],
+        }
+
+        rollup = 3600
+        timestamp = int(to_timestamp(now) // rollup) * rollup
+        assert self.db.get_frequency_series(
+            model,
+            {
+                'organization:1': ("project:1", "project:2", "project:3", "project:4"),
+                'organization:2': ("project:5",),
+            },
+            now - timedelta(hours=1),
+            now,
+            rollup=rollup,
+        ) == {
+            'organization:1': [
+                (timestamp - rollup, {
+                    "project:1": 1.0,
+                    "project:2": 2.0,
+                    "project:3": 3.0,
+                    "project:4": 4.0,
+                }),
+                (timestamp, {
+                    "project:1": 1.0,
+                    "project:2": 2.0,
+                    "project:3": 3.0,
+                    "project:4": 0.0,
+                }),
+            ],
+            'organization:2': [
+                (timestamp - rollup, {
+                    "project:5": 1.5,
+                }),
+                (timestamp, {
+                    "project:5": 0.0,
+                }),
+            ],
+        }
+
+        assert self.db.get_frequency_totals(
+            model,
+            {
+                'organization:1': ("project:1", "project:2", "project:3", "project:4", "project:5"),
+                'organization:2': ("project:1",),
+            },
+            now - timedelta(hours=1),
+            now,
+        ) == {
+            'organization:1': {
+                "project:1": 1.0 + 1.0,
+                "project:2": 2.0 + 2.0,
+                "project:3": 3.0 + 3.0,
+                "project:4": 4.0,
+                "project:5": 0.0,
+            },
+            'organization:2': {
+                "project:1": 0.0,
+            },
+        }