Permalink
Browse files

Cleaning up the implementation of both counters

Factoring out jagged_transpose in the time series counter, adding
comments to both implementations.
  • Loading branch information...
1 parent d86c5ff commit 1b9f41a07b50dfbfa9f666363fed6489b838ae3e @aaw committed Nov 20, 2012
Showing with 50 additions and 13 deletions.
  1. +8 −6 lib/algorithm.rb
  2. +5 −0 lib/counter.rb
  3. +37 −7 lib/time_series_counter.rb
View
@@ -20,11 +20,6 @@ def initialize(redis, b=10)
end
end
- def hash_info(value)
- hash = MurmurHash3::V32.murmur3_32_str_hash(value)
- [hash, hash % @m, rho(hash / @m)]
- end
-
# Estimate the cardinality of the intersection of several sets. We do this by
# using the principle of inclusion and exclusion to represent the size of the
# intersection as the alternating sum of an exponential number of
@@ -37,7 +32,14 @@ def intersection(counter_names, time=0)
end.inject(0, :+)
[icount, 0].max
end
-
+
+ private
+
+ def hash_info(value)
+ hash = MurmurHash3::V32.murmur3_32_str_hash(value)
+ [hash, hash % @m, rho(hash / @m)]
+ end
+
def union_helper(counter_names, time=0)
all_estimates = raw_union(counter_names, time).select{ |i| i > 0 }
estimate_sum = all_estimates.reduce(0.0){ |a, score| a + 2.0 ** -score }
View
@@ -2,6 +2,9 @@ module HyperLogLog
class Counter
include Algorithm
+ # This is the implementation of the standard HyperLogLog algorithm, storing
+ # counts in each byte of a string of length 2 ** b.
+
def add(counter_name, value)
hash, function_name, new_value = hash_info(value)
existing_value = @redis.getrange(counter_name, function_name, function_name).unpack('C').first.to_i
@@ -23,6 +26,8 @@ def union(counter_names)
# Computes the union of the named counters via raw_union and writes it to
# +destination+ as a string holding one character (Integer#chr) per register
# value, in the order raw_union returns them. Returns whatever @redis.set
# returns.
def union_store(destination, counter_names)
  serialized = ''
  raw_union(counter_names).each { |register| serialized << register.chr }
  @redis.set(destination, serialized)
end
+
+ private
def raw_union(counter_names, time=nil)
counters = @redis.mget(*counter_names).compact
@@ -2,6 +2,30 @@ module HyperLogLog
class TimeSeriesCounter
include Algorithm
+ # This is an implementation of HyperLogLog that allows for querying counts
+ # within time ranges of the form (t, current_time] with second-level
+ # granularity. The standard implementation of HyperLogLog stores the max
+ # number of leading zeros seen in the image of each of 2 ** b hash
+ # functions. These counts can naturally be stored in a string of length
+ # 2 ** b by allocating one byte per leading zero count.
+ #
+ # To provide counts within a time range, we alter the standard
+ # implementation to store a mapping of pairs of the form (hash function,
+ # leading zero count) -> timestamp, where the mapping (h,z) -> t represents
+ # the fact that we observed z leading zeros in the image of hash function h
+ # most recently at time t. This mapping is stored in a string by packing
+ # 4-byte words (timestamps, represented in seconds since the epoch) into
+ # a matrix indexed by hash function and zero count stored in row-major
+ # order. Since the max zero count for a counter with parameter b is (32-b),
+ # this representation takes up at most 4 * (32-b) * (2 ** b) bytes (and
+ # usually much less, since we don't allocate space for rows corresponding
+ # to higher leading zero counts until they're actually observed).
+ #
+ # To convert this representation to a HyperLogLog counter for the time
+ # range (t, current_time], we simply filter out all timestamps less than or
+ # equal to t in the matrix and then find, for each hash function, the
+ # maximum z for which that hash function has a non-zero timestamp.
+
def add(counter_name, value, time=nil)
hash, function_name, new_value = hash_info(value)
index = 4 * (function_name + (new_value.to_i * @m))
@@ -28,22 +52,28 @@ def union(counter_names, time=0)
# a HyperLogLog counter later.
def union_store(destination, counter_names, time=0)
raw_counters = @redis.mget(*counter_names).compact.map{ |c| c.unpack('N*').map{ |x| x > time ? x : 0 } }
- max_length = raw_counters.map{ |c| c.length }.max
- combined_counters = raw_counters.map{ |c| c.fill(0, c.length, max_length - c.length) }.transpose.map{ |e| e.max.to_i }
+ combined_counters = jagged_transpose(raw_counters).map{ |x| x.max.to_i }
@redis.set(destination, combined_counters.pack('N*'))
end
+
+ private
def raw_union(counter_names, time=0)
raw_counters = @redis.mget(*counter_names).compact
return [] if raw_counters.none?
hyperloglog_counters = raw_counters.map do |counter|
- slices = counter.unpack('N*').each_slice(@m).to_a
- slices.last.fill(0, slices.last.length, slices.first.length - slices.last.length)
- slices.transpose.map{ |x| x.rindex{ |c| c > time } || 0 }
+ jagged_transpose(counter.unpack('N*').each_slice(@m).to_a).map{ |x| x.rindex{ |c| c > time } || 0 }
end
return hyperloglog_counters.first if hyperloglog_counters.one?
- max_length = hyperloglog_counters.map{ |c| c.length }.max
- hyperloglog_counters.map{ |c| c.fill(0, c.length, max_length - c.length) }.transpose.map{ |e| e.max.to_i }
+ jagged_transpose(hyperloglog_counters).map{ |x| x.max.to_i }
+ end
+
+ # Given an array of non-uniform length arrays, right-pad all arrays with
+ # zeros so they're the same size, then transpose the array. This is a
+ # destructive operation: the zero-padding modifies the inner arrays in place.
+ def jagged_transpose(arrays)
+ max_length = arrays.map{ |a| a.length }.max
+ arrays.map{ |a| a.fill(0, a.length, max_length - a.length) }.transpose
end
end

0 comments on commit 1b9f41a

Please sign in to comment.