/
rle-encoding.h
529 lines (472 loc) · 17.7 KB
/
rle-encoding.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_RLE_ENCODING_H
#define IMPALA_RLE_ENCODING_H
#include <glog/logging.h>
#include "kudu/gutil/port.h"
#include "kudu/util/bit-stream-utils.inline.h"
#include "kudu/util/bit-util.h"
namespace kudu {
// Utility classes to do run length encoding (RLE) for fixed bit width values. If runs
// are sufficiently long, RLE is used, otherwise, the values are just bit-packed
// (literal encoding).
// For both types of runs, there is a byte-aligned indicator which encodes the length
// of the run and the type of the run.
// This encoding has the benefit that when there aren't any long enough runs, values
// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
// the run length are byte aligned. This allows for very efficient decoding
// implementations.
// The encoding is:
// encoded-block := run*
// run := literal-run | repeated-run
// literal-run := literal-indicator < literal bytes >
// repeated-run := repeated-indicator < repeated value. padded to byte boundary >
// literal-indicator := varint_encode( number_of_groups << 1 | 1)
// repeated-indicator := varint_encode( number_of_repetitions << 1 )
//
// Each run is preceded by a varint. The varint's least significant bit is
// used to indicate whether the run is a literal run or a repeated run. The rest
// of the varint is used to determine the length of the run (eg how many times the
// value repeats).
//
// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode
// in groups of 8), so that no matter the bit-width of the value, the sequence will end
// on a byte boundary without padding.
// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
// the actual number of encoded ints. (This means that the total number of encoded values
// can not be determined from the encoded data, since the number of values in the last
// group may not be a multiple of 8).
// There is a break-even point when it is more storage efficient to do run length
// encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes
// for both the repeated encoding or the literal encoding. This value can always
// be computed based on the bit-width.
// TODO: think about how to use this for strings. The bit packing isn't quite the same.
//
// Examples with bit-width 1 (eg encoding booleans):
// ----------------------------------------
// 100 1s followed by 100 0s:
// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte>
// - (total 4 bytes)
//
// alternating 1s and 0s (200 total):
// 200 ints = 25 groups of 8
// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
// (total 26 bytes, 1 byte overhead)
//
// Decoder class for RLE encoded data.
//
// NOTE: the encoded format does not have any length prefix or any other way of
// indicating that the encoded sequence ends at a certain point, so the Decoder
// methods may return some extra bits at the end before the read methods start
// to return 0/false.
template<typename T>
class RleDecoder {
public:
// Create a decoder object. buffer/buffer_len is the decoded data.
// bit_width is the width of each value (before encoding).
RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
: bit_reader_(buffer, buffer_len),
bit_width_(bit_width),
current_value_(0),
repeat_count_(0),
literal_count_(0),
rewind_state_(CANT_REWIND) {
DCHECK_GE(bit_width_, 1);
DCHECK_LE(bit_width_, 64);
}
RleDecoder() {}
// Skip n values, and returns the number of non-zero entries skipped.
size_t Skip(size_t to_skip);
// Gets the next value. Returns false if there are no more.
bool Get(T* val);
// Seek to the previous value.
void RewindOne();
// Gets the next run of the same 'val'. Returns 0 if there is no
// more data to be decoded. Will return a run of at most 'max_run'
// values. If there are more values than this, the next call to
// GetNextRun will return more from the same run.
size_t GetNextRun(T* val, size_t max_run);
private:
bool ReadHeader();
enum RewindState {
REWIND_LITERAL,
REWIND_RUN,
CANT_REWIND
};
BitReader bit_reader_;
int bit_width_;
uint64_t current_value_;
uint32_t repeat_count_;
uint32_t literal_count_;
RewindState rewind_state_;
};
// Class to incrementally build the rle data.
// The encoding has two modes: encoding repeated runs and literal runs.
// If the run is sufficiently short, it is more efficient to encode as a literal run.
// This class does so by buffering 8 values at a time. If they are not all the same
// they are added to the literal run. If they are the same, they are added to the
// repeated run. When we switch modes, the previous run is flushed out.
template<typename T>
class RleEncoder {
public:
// buffer: buffer to write bits to.
// bit_width: max number of bits for value.
// TODO: consider adding a min_repeated_run_length so the caller can control
// when values should be encoded as repeated runs. Currently this is derived
// based on the bit_width, which can determine a storage optimal choice.
explicit RleEncoder(faststring *buffer, int bit_width)
: bit_width_(bit_width),
bit_writer_(buffer) {
DCHECK_GE(bit_width_, 1);
DCHECK_LE(bit_width_, 64);
Clear();
}
// Reserve 'num_bytes' bytes for a plain encoded header, set each
// byte with 'val': this is used for the RLE-encoded data blocks in
// order to be able to able to store the initial ordinal position
// and number of elements. This is a part of RleEncoder in order to
// maintain the correct offset in 'buffer'.
void Reserve(int num_bytes, uint8_t val);
// Encode value. This value must be representable with bit_width_ bits.
void Put(T value, size_t run_length = 1);
// Flushes any pending values to the underlying buffer.
// Returns the total number of bytes written
int Flush();
// Resets all the state in the encoder.
void Clear();
int32_t len() const { return bit_writer_.bytes_written(); }
private:
// Flushes any buffered values. If this is part of a repeated run, this is largely
// a no-op.
// If it is part of a literal run, this will call FlushLiteralRun, which writes
// out the buffered literal values.
// If 'done' is true, the current run would be written even if it would normally
// have been buffered more. This should only be called at the end, when the
// encoder has received all values even if it would normally continue to be
// buffered.
void FlushBufferedValues(bool done);
// Flushes literal values to the underlying buffer. If update_indicator_byte,
// then the current literal run is complete and the indicator byte is updated.
void FlushLiteralRun(bool update_indicator_byte);
// Flushes a repeated run to the underlying buffer.
void FlushRepeatedRun();
// Number of bits needed to encode the value.
const int bit_width_;
// Underlying buffer.
BitWriter bit_writer_;
// We need to buffer at most 8 values for literals. This happens when the
// bit_width is 1 (so 8 values fit in one byte).
// TODO: generalize this to other bit widths
uint64_t buffered_values_[8];
// Number of values in buffered_values_
int num_buffered_values_;
// The current (also last) value that was written and the count of how
// many times in a row that value has been seen. This is maintained even
// if we are in a literal run. If the repeat_count_ get high enough, we switch
// to encoding repeated runs.
uint64_t current_value_;
int repeat_count_;
// Number of literals in the current run. This does not include the literals
// that might be in buffered_values_. Only after we've got a group big enough
// can we decide if they should part of the literal_count_ or repeat_count_
int literal_count_;
// Index of a byte in the underlying buffer that stores the indicator byte.
// This is reserved as soon as we need a literal run but the value is written
// when the literal run is complete. We maintain an index rather than a pointer
// into the underlying buffer because the pointer value may become invalid if
// the underlying buffer is resized.
int literal_indicator_byte_idx_;
};
template<typename T>
inline bool RleDecoder<T>::ReadHeader() {
DCHECK(bit_reader_.is_initialized());
if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
// Read the next run's indicator int, it could be a literal or repeated run
// The int is encoded as a vlq-encoded value.
int32_t indicator_value = 0;
bool result = bit_reader_.GetVlqInt(&indicator_value);
if (PREDICT_FALSE(!result)) {
return false;
}
// lsb indicates if it is a literal run or repeated run
bool is_literal = indicator_value & 1;
if (is_literal) {
literal_count_ = (indicator_value >> 1) * 8;
DCHECK_GT(literal_count_, 0);
} else {
repeat_count_ = indicator_value >> 1;
DCHECK_GT(repeat_count_, 0);
bool result = bit_reader_.GetAligned<T>(
BitUtil::Ceil(bit_width_, 8), reinterpret_cast<T*>(¤t_value_));
DCHECK(result);
}
}
return true;
}
template<typename T>
inline bool RleDecoder<T>::Get(T* val) {
DCHECK(bit_reader_.is_initialized());
if (PREDICT_FALSE(!ReadHeader())) {
return false;
}
if (PREDICT_TRUE(repeat_count_ > 0)) {
*val = current_value_;
--repeat_count_;
rewind_state_ = REWIND_RUN;
} else {
DCHECK(literal_count_ > 0);
bool result = bit_reader_.GetValue(bit_width_, val);
DCHECK(result);
--literal_count_;
rewind_state_ = REWIND_LITERAL;
}
return true;
}
template<typename T>
inline void RleDecoder<T>::RewindOne() {
DCHECK(bit_reader_.is_initialized());
switch (rewind_state_) {
case CANT_REWIND:
LOG(FATAL) << "Can't rewind more than once after each read!";
break;
case REWIND_RUN:
++repeat_count_;
break;
case REWIND_LITERAL:
{
bit_reader_.Rewind(bit_width_);
++literal_count_;
break;
}
}
rewind_state_ = CANT_REWIND;
}
template<typename T>
inline size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
DCHECK(bit_reader_.is_initialized());
DCHECK_GT(max_run, 0);
size_t ret = 0;
size_t rem = max_run;
while (ReadHeader()) {
if (PREDICT_TRUE(repeat_count_ > 0)) {
if (PREDICT_FALSE(ret > 0 && *val != current_value_)) {
return ret;
}
*val = current_value_;
if (repeat_count_ >= rem) {
// The next run is longer than the amount of remaining data
// that the caller wants to read. Only consume it partially.
repeat_count_ -= rem;
ret += rem;
return ret;
}
ret += repeat_count_;
rem -= repeat_count_;
repeat_count_ = 0;
} else {
DCHECK(literal_count_ > 0);
if (ret == 0) {
bool result = bit_reader_.GetValue(bit_width_, val);
if (PREDICT_FALSE(!result)) {
return ret;
}
literal_count_--;
ret++;
rem--;
}
while (literal_count_ > 0) {
bool result = bit_reader_.GetValue(bit_width_, ¤t_value_);
if (PREDICT_FALSE(!result)) {
return ret;
}
if (current_value_ != *val || rem == 0) {
bit_reader_.Rewind(bit_width_);
return ret;
}
ret++;
rem--;
literal_count_--;
}
}
}
return ret;
}
template<typename T>
inline size_t RleDecoder<T>::Skip(size_t to_skip) {
DCHECK(bit_reader_.is_initialized());
size_t set_count = 0;
while (to_skip > 0) {
bool result = ReadHeader();
DCHECK(result);
if (PREDICT_TRUE(repeat_count_ > 0)) {
size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip;
repeat_count_ -= nskip;
to_skip -= nskip;
if (current_value_ != 0) {
set_count += nskip;
}
} else {
DCHECK(literal_count_ > 0);
size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip;
literal_count_ -= nskip;
to_skip -= nskip;
for (; nskip > 0; nskip--) {
T value = 0;
bool result = bit_reader_.GetValue(bit_width_, &value);
DCHECK(result);
if (value != 0) {
set_count++;
}
}
}
}
return set_count;
}
// This function buffers input values 8 at a time. After seeing all 8 values,
// it decides whether they should be encoded as a literal or repeated run.
template<typename T>
inline void RleEncoder<T>::Put(T value, size_t run_length) {
DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
// TODO(perf): remove the loop and use the repeat_count_
for (; run_length > 0; run_length--) {
if (PREDICT_TRUE(current_value_ == value)) {
++repeat_count_;
if (repeat_count_ > 8) {
// This is just a continuation of the current run, no need to buffer the
// values.
// Note that this is the fast path for long repeated runs.
continue;
}
} else {
if (repeat_count_ >= 8) {
// We had a run that was long enough but it has ended. Flush the
// current repeated run.
DCHECK_EQ(literal_count_, 0);
FlushRepeatedRun();
}
repeat_count_ = 1;
current_value_ = value;
}
buffered_values_[num_buffered_values_] = value;
if (++num_buffered_values_ == 8) {
DCHECK_EQ(literal_count_ % 8, 0);
FlushBufferedValues(false);
}
}
}
template<typename T>
inline void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) {
if (literal_indicator_byte_idx_ < 0) {
// The literal indicator byte has not been reserved yet, get one now.
literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1);
DCHECK_GE(literal_indicator_byte_idx_, 0);
}
// Write all the buffered values as bit packed literals
for (int i = 0; i < num_buffered_values_; ++i) {
bit_writer_.PutValue(buffered_values_[i], bit_width_);
}
num_buffered_values_ = 0;
if (update_indicator_byte) {
// At this point we need to write the indicator byte for the literal run.
// We only reserve one byte, to allow for streaming writes of literal values.
// The logic makes sure we flush literal runs often enough to not overrun
// the 1 byte.
int num_groups = BitUtil::Ceil(literal_count_, 8);
int32_t indicator_value = (num_groups << 1) | 1;
DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value;
literal_indicator_byte_idx_ = -1;
literal_count_ = 0;
}
}
template<typename T>
inline void RleEncoder<T>::FlushRepeatedRun() {
DCHECK_GT(repeat_count_, 0);
// The lsb of 0 indicates this is a repeated run
int32_t indicator_value = repeat_count_ << 1 | 0;
bit_writer_.PutVlqInt(indicator_value);
bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8));
num_buffered_values_ = 0;
repeat_count_ = 0;
}
// Flush the values that have been buffered. At this point we decide whether
// we need to switch between the run types or continue the current one.
template<typename T>
inline void RleEncoder<T>::FlushBufferedValues(bool done) {
if (repeat_count_ >= 8) {
// Clear the buffered values. They are part of the repeated run now and we
// don't want to flush them out as literals.
num_buffered_values_ = 0;
if (literal_count_ != 0) {
// There was a current literal run. All the values in it have been flushed
// but we still need to update the indicator byte.
DCHECK_EQ(literal_count_ % 8, 0);
DCHECK_EQ(repeat_count_, 8);
FlushLiteralRun(true);
}
DCHECK_EQ(literal_count_, 0);
return;
}
literal_count_ += num_buffered_values_;
int num_groups = BitUtil::Ceil(literal_count_, 8);
if (num_groups + 1 >= (1 << 6)) {
// We need to start a new literal run because the indicator byte we've reserved
// cannot store more values.
DCHECK_GE(literal_indicator_byte_idx_, 0);
FlushLiteralRun(true);
} else {
FlushLiteralRun(done);
}
repeat_count_ = 0;
}
template<typename T>
inline void RleEncoder<T>::Reserve(int num_bytes, uint8_t val) {
for (int i = 0; i < num_bytes; ++i) {
bit_writer_.PutValue(val, 8);
}
}
template<typename T>
inline int RleEncoder<T>::Flush() {
if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
bool all_repeat = literal_count_ == 0 &&
(repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0);
// There is something pending, figure out if it's a repeated or literal run
if (repeat_count_ > 0 && all_repeat) {
FlushRepeatedRun();
} else {
literal_count_ += num_buffered_values_;
FlushLiteralRun(true);
repeat_count_ = 0;
}
}
bit_writer_.Flush();
DCHECK_EQ(num_buffered_values_, 0);
DCHECK_EQ(literal_count_, 0);
DCHECK_EQ(repeat_count_, 0);
return bit_writer_.bytes_written();
}
template<typename T>
inline void RleEncoder<T>::Clear() {
current_value_ = 0;
repeat_count_ = 0;
num_buffered_values_ = 0;
literal_count_ = 0;
literal_indicator_byte_idx_ = -1;
bit_writer_.Clear();
}
} // namespace kudu
#endif