Skip to content

Commit

Permalink
Implemented compressed encoding according to Java implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
wallin committed Jan 2, 2016
1 parent fbc1531 commit bda6639
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 7 deletions.
71 changes: 64 additions & 7 deletions lib/tdigest/tdigest.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,39 @@ def initialize(delta = 0.01, k = 25, cx = 1.1)

# Serializes the digest in the verbose binary layout.
#
# The payload is a header (format tag, compression, centroid count) followed
# by every centroid mean packed as a double and then every centroid count
# packed as a 32-bit unsigned integer, all in native byte order.
# The compression value follows the definition used by the Java
# implementation.
def as_bytes
  fields = [VERBOSE_ENCODING, compression, size]
  means = @centroids.map { |_, centroid| centroid.mean }
  counts = @centroids.map { |_, centroid| centroid.n }
  fields.concat(means).concat(counts)
  layout = "LdLd#{size}L#{size}"
  fields.pack(layout)
end

# Serializes the digest in the compact ("small") binary layout.
#
# Layout: a 16-byte header (format tag as a native 32-bit unsigned integer,
# compression as a double, centroid count as a native 32-bit unsigned
# integer), then the centroid means as delta-encoded 4-byte floats, then the
# centroid counts as variable-length integers (7 payload bits per byte, low
# bits first, high bit set on every byte except the last).
#
# Raises RuntimeError ('Unreasonable large number') when a count is negative
# or needs more than five encoded bytes (i.e. is >= 2**35). The decoder in
# from_bytes refuses to read a sixth byte, so emitting longer sequences would
# produce a payload that cannot be loaded again.
def as_small_bytes
  header = [SMALL_ENCODING, compression, size]

  # Delta encoding: each mean is stored as the difference to the previous
  # one, which keeps the values small enough to be saved as 4-byte floats.
  previous = 0
  mean_deltas = @centroids.map do |_, c|
    delta = c.mean - previous
    previous = c.mean
    delta
  end

  # Variable length encoding of the counts.
  count_bytes = @centroids.each_with_object([]) do |(_, c), bytes|
    n = c.n
    continuation_bytes = 0
    while n < 0 || n > 0x7f
      continuation_bytes += 1
      # Four continuation bytes plus the final byte is the most the decoder
      # accepts; negative numbers never terminate and end up here as well.
      fail 'Unreasonable large number' if continuation_bytes > 4
      bytes << (0x80 | (0x7f & n))
      n >>= 7
    end
    bytes << n
  end

  (header + mean_deltas + count_bytes)
    .pack("LdLf#{mean_deltas.size}C#{count_bytes.size}")
end

# Returns an array with the as_json representation of every centroid, in the
# iteration order of @centroids. The optional argument is accepted and
# ignored.
def as_json(_ = nil)
  @centroids.map do |_key, centroid|
    centroid.as_json
  end
end
Expand Down Expand Up @@ -171,12 +197,43 @@ def to_a

def self.from_bytes(bytes)
format, compression, size = bytes.unpack('LdL')
fail 'Unknown format' unless format == VERBOSE_ENCODING
delta = 1 / compression
array = bytes[16..-1].unpack("d#{size}L#{size}")
tdigest = new(delta)
if array.size > 0
means, counts = array.each_slice(size).to_a
tdigest = new(1 / compression)

start_idx = 16 # after header
case format
when VERBOSE_ENCODING
array = bytes[start_idx..-1].unpack("d#{size}L#{size}")
means, counts = array.each_slice(size).to_a if array.size > 0
when SMALL_ENCODING
means = bytes[start_idx..(start_idx + 4 * size)].unpack("f#{size}")
# Decode delta encoding of means
x = 0
means.map! do |m|
m += x
x = m
m
end
counts_bytes = bytes[(start_idx + 4 * size)..-1].unpack('C*')
counts = []
# Decode variable length integer bytes
size.times do
v = counts_bytes.shift
z = 0x7f & v
shift = 7
while (v & 0x80) != 0
fail 'Shift too large in decode' if shift > 28
v = counts_bytes.shift || 0
z += (v & 0x7f) << shift
shift += 7
end
counts << z
end
# This shouldn't happen
fail 'Mismatch' unless counts.size == means.size
else
fail 'Unknown compression format'
end
if means && counts
means.zip(counts).each { |val| tdigest.push(val[0], val[1]) }
end
tdigest
Expand Down
20 changes: 20 additions & 0 deletions test/tdigest_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ def test_that_it_has_a_version_number
describe 'byte serialization' do
it 'loads serialized data' do
tdigest.push(60, 100)
10.times { tdigest.push(rand * 100) }
bytes = tdigest.as_bytes
new_tdigest = ::TDigest::TDigest.from_bytes(bytes)
new_tdigest.percentile(0.9).must_equal tdigest.percentile(0.9)
new_tdigest.as_bytes.must_equal bytes
end

it 'handles zero size' do
Expand All @@ -30,6 +32,24 @@ def test_that_it_has_a_version_number
end
end

describe 'small byte serialization' do
it 'loads serialized data' do
tdigest.push(60, 1000)
10.times { tdigest.push(rand * 100) }
bytes = tdigest.as_small_bytes
new_tdigest = ::TDigest::TDigest.from_bytes(bytes)
# Expect some rounding error due to compression
new_tdigest.percentile(0.9).round(5).must_equal(
tdigest.percentile(0.9).round(5))
new_tdigest.as_small_bytes.must_equal bytes
end

it 'handles zero size' do
bytes = tdigest.as_small_bytes
::TDigest::TDigest.from_bytes(bytes).size.must_equal 0
end
end

describe 'JSON serialization' do
it 'loads serialized data' do
tdigest.push(60, 100)
Expand Down

0 comments on commit bda6639

Please sign in to comment.