Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.

Commit 225fba7

Browse files
Aliaksei Sandryhailawesm
authored andcommitted
PARQUET-542: Support custom memory allocators
Added `MemoryAllocator` interface and a default implementation, ensured `MemPool` can use a custom allocator, added `Vector<T>` to replace `std::vector<T>`. Author: Aliaksei Sandryhaila <aliaksei.sandryhaila@hp.com> Closes #72 from asandryh/PARQUET-542 and squashes the following commits: a740edb [Aliaksei Sandryhaila] Incorporated PR feedback. 6422e0d [Aliaksei Sandryhaila] Added MemoryAllocator interface and default implementation, ensured MemPool can use a custom allocator, added Vector<T> to replace std::vector<T>.
1 parent 5c4f645 commit 225fba7

34 files changed

+496
-161
lines changed

src/parquet/api/io.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "parquet/exception.h"
2222
#include "parquet/util/buffer.h"
2323
#include "parquet/util/input.h"
24+
#include "parquet/util/mem-allocator.h"
2425
#include "parquet/util/output.h"
2526

2627
#endif // PARQUET_API_IO_H

src/parquet/column/reader.cc

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@
2929
namespace parquet_cpp {
3030

3131
ColumnReader::ColumnReader(const ColumnDescriptor* descr,
32-
std::unique_ptr<PageReader> pager)
32+
std::unique_ptr<PageReader> pager, MemoryAllocator* allocator)
3333
: descr_(descr),
3434
pager_(std::move(pager)),
3535
num_buffered_values_(0),
36-
num_decoded_values_(0) {}
36+
num_decoded_values_(0),
37+
allocator_(allocator) {}
3738

3839
template <int TYPE>
3940
void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
@@ -59,7 +60,7 @@ void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
5960
// TODO(wesm): investigate whether this all-or-nothing decoding of the
6061
// dictionary makes sense and whether performance can be improved
6162

62-
auto decoder = std::make_shared<DictionaryDecoder<TYPE> >(descr_);
63+
auto decoder = std::make_shared<DictionaryDecoder<TYPE> >(descr_, allocator_);
6364
decoder->SetDict(&dictionary);
6465
decoders_[encoding] = decoder;
6566
} else {
@@ -196,24 +197,26 @@ int64_t ColumnReader::ReadRepetitionLevels(int64_t batch_size, int16_t* levels)
196197

197198
std::shared_ptr<ColumnReader> ColumnReader::Make(
198199
const ColumnDescriptor* descr,
199-
std::unique_ptr<PageReader> pager) {
200+
std::unique_ptr<PageReader> pager,
201+
MemoryAllocator* allocator) {
200202
switch (descr->physical_type()) {
201203
case Type::BOOLEAN:
202-
return std::make_shared<BoolReader>(descr, std::move(pager));
204+
return std::make_shared<BoolReader>(descr, std::move(pager), allocator);
203205
case Type::INT32:
204-
return std::make_shared<Int32Reader>(descr, std::move(pager));
206+
return std::make_shared<Int32Reader>(descr, std::move(pager), allocator);
205207
case Type::INT64:
206-
return std::make_shared<Int64Reader>(descr, std::move(pager));
208+
return std::make_shared<Int64Reader>(descr, std::move(pager), allocator);
207209
case Type::INT96:
208-
return std::make_shared<Int96Reader>(descr, std::move(pager));
210+
return std::make_shared<Int96Reader>(descr, std::move(pager), allocator);
209211
case Type::FLOAT:
210-
return std::make_shared<FloatReader>(descr, std::move(pager));
212+
return std::make_shared<FloatReader>(descr, std::move(pager), allocator);
211213
case Type::DOUBLE:
212-
return std::make_shared<DoubleReader>(descr, std::move(pager));
214+
return std::make_shared<DoubleReader>(descr, std::move(pager), allocator);
213215
case Type::BYTE_ARRAY:
214-
return std::make_shared<ByteArrayReader>(descr, std::move(pager));
216+
return std::make_shared<ByteArrayReader>(descr, std::move(pager), allocator);
215217
case Type::FIXED_LEN_BYTE_ARRAY:
216-
return std::make_shared<FixedLenByteArrayReader>(descr, std::move(pager));
218+
return std::make_shared<FixedLenByteArrayReader>(descr,
219+
std::move(pager), allocator);
217220
default:
218221
ParquetException::NYI("type reader not implemented");
219222
}

src/parquet/column/reader.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,17 @@
3030
#include "parquet/exception.h"
3131
#include "parquet/schema/descriptor.h"
3232
#include "parquet/types.h"
33+
#include "parquet/util/mem-allocator.h"
3334

3435
namespace parquet_cpp {
3536

3637
class ColumnReader {
3738
public:
38-
ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>);
39+
ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>,
40+
MemoryAllocator* allocator = default_allocator());
3941

4042
static std::shared_ptr<ColumnReader> Make(const ColumnDescriptor*,
41-
std::unique_ptr<PageReader>);
43+
std::unique_ptr<PageReader>, MemoryAllocator* allocator = default_allocator());
4244

4345
// Returns true if there are still values in this column.
4446
bool HasNext() {
@@ -95,6 +97,8 @@ class ColumnReader {
9597
// The number of values from the current data page that have been decoded
9698
// into memory
9799
int num_decoded_values_;
100+
101+
MemoryAllocator* allocator_;
98102
};
99103

100104
// API to read values from a single column. This is the main client facing API.
@@ -104,8 +108,8 @@ class TypedColumnReader : public ColumnReader {
104108
typedef typename type_traits<TYPE>::value_type T;
105109

106110
TypedColumnReader(const ColumnDescriptor* schema,
107-
std::unique_ptr<PageReader> pager) :
108-
ColumnReader(schema, std::move(pager)),
111+
std::unique_ptr<PageReader> pager, MemoryAllocator* allocator) :
112+
ColumnReader(schema, std::move(pager), allocator),
109113
current_decoder_(NULL) {
110114
}
111115

src/parquet/column/scanner.cc

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,25 @@
2525
namespace parquet_cpp {
2626

2727
std::shared_ptr<Scanner> Scanner::Make(std::shared_ptr<ColumnReader> col_reader,
28-
int64_t batch_size) {
28+
int64_t batch_size, MemoryAllocator* allocator) {
2929
switch (col_reader->type()) {
3030
case Type::BOOLEAN:
31-
return std::make_shared<BoolScanner>(col_reader, batch_size);
31+
return std::make_shared<BoolScanner>(col_reader, batch_size, allocator);
3232
case Type::INT32:
33-
return std::make_shared<Int32Scanner>(col_reader, batch_size);
33+
return std::make_shared<Int32Scanner>(col_reader, batch_size, allocator);
3434
case Type::INT64:
35-
return std::make_shared<Int64Scanner>(col_reader, batch_size);
35+
return std::make_shared<Int64Scanner>(col_reader, batch_size, allocator);
3636
case Type::INT96:
37-
return std::make_shared<Int96Scanner>(col_reader, batch_size);
37+
return std::make_shared<Int96Scanner>(col_reader, batch_size, allocator);
3838
case Type::FLOAT:
39-
return std::make_shared<FloatScanner>(col_reader, batch_size);
39+
return std::make_shared<FloatScanner>(col_reader, batch_size, allocator);
4040
case Type::DOUBLE:
41-
return std::make_shared<DoubleScanner>(col_reader, batch_size);
41+
return std::make_shared<DoubleScanner>(col_reader, batch_size, allocator);
4242
case Type::BYTE_ARRAY:
43-
return std::make_shared<ByteArrayScanner>(col_reader, batch_size);
43+
return std::make_shared<ByteArrayScanner>(col_reader, batch_size, allocator);
4444
case Type::FIXED_LEN_BYTE_ARRAY:
45-
return std::make_shared<FixedLenByteArrayScanner>(col_reader, batch_size);
45+
return std::make_shared<FixedLenByteArrayScanner>(col_reader,
46+
batch_size, allocator);
4647
default:
4748
ParquetException::NYI("type reader not implemented");
4849
}

src/parquet/column/scanner.h

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "parquet/exception.h"
3030
#include "parquet/schema/descriptor.h"
3131
#include "parquet/types.h"
32+
#include "parquet/util/mem-allocator.h"
3233

3334
namespace parquet_cpp {
3435

@@ -37,10 +38,12 @@ static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128;
3738
class Scanner {
3839
public:
3940
explicit Scanner(std::shared_ptr<ColumnReader> reader,
40-
int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) :
41+
int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
42+
MemoryAllocator* allocator = default_allocator()) :
4143
batch_size_(batch_size),
4244
level_offset_(0),
4345
levels_buffered_(0),
46+
value_buffer_(0, allocator),
4447
value_offset_(0),
4548
values_buffered_(0),
4649
reader_(reader) {
@@ -52,7 +55,8 @@ class Scanner {
5255
virtual ~Scanner() {}
5356

5457
static std::shared_ptr<Scanner> Make(std::shared_ptr<ColumnReader> col_reader,
55-
int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE);
58+
int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
59+
MemoryAllocator* allocator = default_allocator());
5660

5761
virtual void PrintNext(std::ostream& out, int width) = 0;
5862

@@ -78,7 +82,7 @@ class Scanner {
7882
int level_offset_;
7983
int levels_buffered_;
8084

81-
std::vector<uint8_t> value_buffer_;
85+
OwnedMutableBuffer value_buffer_;
8286
int value_offset_;
8387
int64_t values_buffered_;
8488

@@ -93,11 +97,12 @@ class TypedScanner : public Scanner {
9397
typedef typename type_traits<TYPE>::value_type T;
9498

9599
explicit TypedScanner(std::shared_ptr<ColumnReader> reader,
96-
int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) :
97-
Scanner(reader, batch_size) {
100+
int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
101+
MemoryAllocator* allocator = default_allocator()) :
102+
Scanner(reader, batch_size, allocator) {
98103
typed_reader_ = static_cast<TypedColumnReader<TYPE>*>(reader.get());
99104
int value_byte_size = type_traits<TYPE>::value_byte_size;
100-
value_buffer_.resize(batch_size_ * value_byte_size);
105+
value_buffer_.Resize(batch_size_ * value_byte_size);
101106
values_ = reinterpret_cast<T*>(&value_buffer_[0]);
102107
}
103108

src/parquet/column/test-util.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -252,23 +252,23 @@ class DictionaryPageBuilder {
252252
if (TN == Type::FIXED_LEN_BYTE_ARRAY) {
253253
type_length = d->type_length();
254254
}
255-
encoder_.reset(new DictEncoder<TC>(&pool_, type_length));
255+
encoder_.reset(new DictEncoder<TC>(&pool_, default_allocator(), type_length));
256256
}
257257

258258
~DictionaryPageBuilder() {
259259
pool_.FreeAll();
260260
}
261261

262262
shared_ptr<Buffer> AppendValues(const vector<TC>& values) {
263-
shared_ptr<OwnedMutableBuffer> rle_indices = std::make_shared<OwnedMutableBuffer>();
264263
int num_values = values.size();
265264
// Dictionary encoding
266265
for (int i = 0; i < num_values; ++i) {
267266
encoder_->Put(values[i]);
268267
}
269268
num_dict_values_ = encoder_->num_entries();
270269
have_values_ = true;
271-
rle_indices->Resize(sizeof(int) * encoder_->EstimatedDataEncodedSize());
270+
shared_ptr<OwnedMutableBuffer> rle_indices = std::make_shared<OwnedMutableBuffer>(
271+
sizeof(int) * encoder_->EstimatedDataEncodedSize());
272272
int actual_bytes = encoder_->WriteIndices(rle_indices->mutable_data(),
273273
rle_indices->size());
274274
rle_indices->Resize(actual_bytes);
@@ -277,8 +277,8 @@ class DictionaryPageBuilder {
277277
}
278278

279279
shared_ptr<Buffer> WriteDict() {
280-
shared_ptr<OwnedMutableBuffer> dict_buffer = std::make_shared<OwnedMutableBuffer>();
281-
dict_buffer->Resize(encoder_->dict_encoded_size());
280+
shared_ptr<OwnedMutableBuffer> dict_buffer = std::make_shared<OwnedMutableBuffer>(
281+
encoder_->dict_encoded_size());
282282
encoder_->WriteDict(dict_buffer->mutable_data());
283283
return dict_buffer;
284284
}

src/parquet/encodings/decoder.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include "parquet/exception.h"
2424
#include "parquet/types.h"
25+
#include "parquet/util/mem-allocator.h"
2526

2627
namespace parquet_cpp {
2728

@@ -54,8 +55,7 @@ class Decoder {
5455
const Encoding::type encoding() const { return encoding_; }
5556

5657
protected:
57-
explicit Decoder(const ColumnDescriptor* descr,
58-
const Encoding::type& encoding)
58+
explicit Decoder(const ColumnDescriptor* descr, const Encoding::type& encoding)
5959
: descr_(descr), encoding_(encoding), num_values_(0) {}
6060

6161
// For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY

src/parquet/encodings/delta-bit-pack-encoding.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include "parquet/encodings/decoder.h"
2626
#include "parquet/util/bit-stream-utils.inline.h"
27+
#include "parquet/util/buffer.h"
2728

2829
namespace parquet_cpp {
2930

@@ -32,8 +33,10 @@ class DeltaBitPackDecoder : public Decoder<TYPE> {
3233
public:
3334
typedef typename type_traits<TYPE>::value_type T;
3435

35-
explicit DeltaBitPackDecoder(const ColumnDescriptor* descr)
36-
: Decoder<TYPE>(descr, Encoding::DELTA_BINARY_PACKED) {
36+
explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
37+
MemoryAllocator* allocator = default_allocator())
38+
: Decoder<TYPE>(descr, Encoding::DELTA_BINARY_PACKED),
39+
delta_bit_widths_(0, allocator) {
3740
if (TYPE != Type::INT32 && TYPE != Type::INT64) {
3841
throw ParquetException("Delta bit pack encoding should only be for integer data.");
3942
}
@@ -61,7 +64,7 @@ class DeltaBitPackDecoder : public Decoder<TYPE> {
6164
ParquetException::EofException();
6265
}
6366
if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();
64-
delta_bit_widths_.resize(num_mini_blocks_);
67+
delta_bit_widths_.Resize(num_mini_blocks_);
6568

6669
if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
6770
for (int i = 0; i < num_mini_blocks_; ++i) {
@@ -81,7 +84,7 @@ class DeltaBitPackDecoder : public Decoder<TYPE> {
8184
for (int i = 0; i < max_values; ++i) {
8285
if (UNLIKELY(values_current_mini_block_ == 0)) {
8386
++mini_block_idx_;
84-
if (mini_block_idx_ < delta_bit_widths_.size()) {
87+
if (mini_block_idx_ < static_cast<size_t>(delta_bit_widths_.size())) {
8588
delta_bit_width_ = delta_bit_widths_[mini_block_idx_];
8689
values_current_mini_block_ = values_per_mini_block_;
8790
} else {
@@ -111,7 +114,7 @@ class DeltaBitPackDecoder : public Decoder<TYPE> {
111114

112115
int32_t min_delta_;
113116
size_t mini_block_idx_;
114-
std::vector<uint8_t> delta_bit_widths_;
117+
OwnedMutableBuffer delta_bit_widths_;
115118
int delta_bit_width_;
116119

117120
int32_t last_value_;

src/parquet/encodings/delta-byte-array-encoding.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ namespace parquet_cpp {
2828

2929
class DeltaByteArrayDecoder : public Decoder<Type::BYTE_ARRAY> {
3030
public:
31-
explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr)
31+
explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
32+
MemoryAllocator* allocator = default_allocator())
3233
: Decoder<Type::BYTE_ARRAY>(descr, Encoding::DELTA_BYTE_ARRAY),
33-
prefix_len_decoder_(nullptr),
34-
suffix_decoder_(nullptr) {
34+
prefix_len_decoder_(nullptr, allocator),
35+
suffix_decoder_(nullptr, allocator) {
3536
}
3637

3738
virtual void SetData(int num_values, const uint8_t* data, int len) {

src/parquet/encodings/delta-length-byte-array-encoding.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ namespace parquet_cpp {
2929

3030
class DeltaLengthByteArrayDecoder : public Decoder<Type::BYTE_ARRAY> {
3131
public:
32-
explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr)
33-
: Decoder<Type::BYTE_ARRAY>(descr,
34-
Encoding::DELTA_LENGTH_BYTE_ARRAY),
35-
len_decoder_(nullptr) {
32+
explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr,
33+
MemoryAllocator* allocator = default_allocator()) :
34+
Decoder<Type::BYTE_ARRAY>(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
35+
len_decoder_(nullptr, allocator) {
3636
}
3737

3838
virtual void SetData(int num_values, const uint8_t* data, int len) {

0 commit comments

Comments
 (0)