PARQUET-1467: [C++] Remove defunct ChunkedAllocator code
It does not seem that memory allocation on the dictionary encoding path requires something so elaborate right now

Author: Wes McKinney <wesm+git@apache.org>

Closes #3069 from wesm/PARQUET-1467 and squashes the following commits:

f37ed07 <Wes McKinney> Remove defunct memory allocator code
wesm committed Dec 3, 2018
1 parent 1621868 commit 98bdde8
Showing 9 changed files with 8 additions and 615 deletions.
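In practice the change is visible at the DictEncoder call sites: the encoder now takes only the column descriptor and an ::arrow::MemoryPool, and the explicit FreeAll() on the encoder's chunked pool disappears. A minimal usage sketch, using only calls that appear in the hunks below and assuming a descriptor built the way encoding-benchmark.cc's Int64Schema helper builds one:

  // Sketch only; the Int64Schema helper and surrounding setup are assumed.
  std::shared_ptr<ColumnDescriptor> descr = Int64Schema(Repetition::REQUIRED);

  // Before: DictEncoder<Type>(descr, ChunkedAllocator*, ::arrow::MemoryPool*), followed by
  // an explicit encoder.mem_pool()->FreeAll(). After: the arrow pool argument alone.
  DictEncoder<Int64Type> encoder(descr.get(), ::arrow::default_memory_pool());
  encoder.Put(static_cast<int64_t>(42));

  std::shared_ptr<ResizableBuffer> dict =
      AllocateBuffer(::arrow::default_memory_pool(), encoder.dict_encoded_size());
  encoder.WriteDict(dict->mutable_data());
  std::shared_ptr<Buffer> indices = encoder.FlushValues();
  // No ChunkedAllocator to free; buffer memory is released through the arrow pool.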
6 changes: 1 addition & 5 deletions cpp/src/parquet/column_writer.cc
@@ -353,7 +353,6 @@ ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
       encoding_(encoding),
       properties_(properties),
       allocator_(properties->memory_pool()),
-      pool_(properties->memory_pool()),
       num_buffered_values_(0),
       num_buffered_encoded_values_(0),
       rows_written_(0),
@@ -546,8 +545,7 @@ TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
       break;
     case Encoding::PLAIN_DICTIONARY:
     case Encoding::RLE_DICTIONARY:
-      current_encoder_.reset(
-          new DictEncoder<Type>(descr_, &pool_, properties->memory_pool()));
+      current_encoder_.reset(new DictEncoder<Type>(descr_, properties->memory_pool()));
       break;
     default:
       ParquetException::NYI("Selected encoding is not supported");
@@ -582,8 +580,6 @@ void TypedColumnWriter<Type>::WriteDictionaryPage() {
   std::shared_ptr<ResizableBuffer> buffer =
       AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
   dict_encoder->WriteDict(buffer->mutable_data());
-  // TODO Get rid of this deep call
-  dict_encoder->mem_pool()->FreeAll();

   DictionaryPage page(buffer, dict_encoder->num_entries(),
                       properties_->dictionary_index_encoding());
1 change: 0 additions & 1 deletion cpp/src/parquet/column_writer.h
@@ -186,7 +186,6 @@ class PARQUET_EXPORT ColumnWriter {
   LevelEncoder level_encoder_;

   ::arrow::MemoryPool* allocator_;
-  ChunkedAllocator pool_;

   // The total number of values stored in the data page. This is the maximum of
   // the number of encoded definition levels or encoded values. For
3 changes: 1 addition & 2 deletions cpp/src/parquet/encoding-benchmark.cc
@@ -104,11 +104,10 @@ static void DecodeDict(std::vector<typename Type::c_type>& values,
   typedef typename Type::c_type T;
   int num_values = static_cast<int>(values.size());

-  ChunkedAllocator pool;
   MemoryPool* allocator = default_memory_pool();
   std::shared_ptr<ColumnDescriptor> descr = Int64Schema(Repetition::REQUIRED);

-  DictEncoder<Type> encoder(descr.get(), &pool, allocator);
+  DictEncoder<Type> encoder(descr.get(), allocator);
   for (int i = 0; i < num_values; ++i) {
     encoder.Put(values[i]);
   }
9 changes: 1 addition & 8 deletions cpp/src/parquet/encoding-internal.h
@@ -465,12 +465,10 @@ class DictEncoder : public Encoder<DType> {
  public:
   typedef typename DType::c_type T;

-  // XXX pool is unused
-  explicit DictEncoder(const ColumnDescriptor* desc, ChunkedAllocator* pool = nullptr,
+  explicit DictEncoder(const ColumnDescriptor* desc,
                        ::arrow::MemoryPool* allocator = ::arrow::default_memory_pool())
       : Encoder<DType>(desc, Encoding::PLAIN_DICTIONARY, allocator),
         allocator_(allocator),
-        pool_(pool),
         dict_encoded_size_(0),
         type_length_(desc->type_length()),
         memo_table_(INITIAL_HASH_TABLE_SIZE) {}
@@ -538,8 +536,6 @@ class DictEncoder : public Encoder<DType> {
   /// dict_encoded_size() bytes.
   void WriteDict(uint8_t* buffer);

-  ChunkedAllocator* mem_pool() { return pool_; }
-
   /// The number of entries in the dictionary.
   int num_entries() const { return memo_table_.size(); }

@@ -549,9 +545,6 @@

   ::arrow::MemoryPool* allocator_;

-  // For ByteArray / FixedLenByteArray data. Not owned
-  ChunkedAllocator* pool_;
-
   /// Indices that have not yet be written out by WriteIndices().
   std::vector<int> buffered_indices_;

8 changes: 3 additions & 5 deletions cpp/src/parquet/encoding-test.cc
@@ -155,7 +155,7 @@ class TestEncodingBase : public ::testing::Test {
     allocator_ = default_memory_pool();
   }

-  void TearDown() { pool_.FreeAll(); }
+  void TearDown() {}

   void InitData(int nvalues, int repeats) {
     num_values_ = nvalues * repeats;
@@ -181,7 +181,6 @@
   }

  protected:
-  ChunkedAllocator pool_;
   MemoryPool* allocator_;

   int num_values_;
@@ -199,7 +198,6 @@
 // Member variables are not visible to templated subclasses. Possibly figure
 // out an alternative to this class layering at some point
 #define USING_BASE_MEMBERS()                  \
-  using TestEncodingBase<Type>::pool_;        \
   using TestEncodingBase<Type>::allocator_;   \
   using TestEncodingBase<Type>::descr_;       \
   using TestEncodingBase<Type>::num_values_;  \
@@ -253,14 +251,14 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> {

   void CheckRoundtrip() {
     std::vector<uint8_t> valid_bits(BitUtil::BytesForBits(num_values_) + 1, 255);
-    DictEncoder<Type> encoder(descr_.get(), &pool_);
+    DictEncoder<Type> encoder(descr_.get());

     ASSERT_NO_THROW(encoder.Put(draws_, num_values_));
     dict_buffer_ = AllocateBuffer(default_memory_pool(), encoder.dict_encoded_size());
     encoder.WriteDict(dict_buffer_->mutable_data());
     std::shared_ptr<Buffer> indices = encoder.FlushValues();

-    DictEncoder<Type> spaced_encoder(descr_.get(), &pool_);
+    DictEncoder<Type> spaced_encoder(descr_.get());
     // PutSpaced should lead to the same results
     ASSERT_NO_THROW(spaced_encoder.PutSpaced(draws_, num_values_, valid_bits.data(), 0));
     std::shared_ptr<Buffer> indices_from_spaced = spaced_encoder.FlushValues();
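The USING_BASE_MEMBERS macro kept above exists because, in C++, unqualified lookup inside a class template does not search a dependent base class such as TestEncodingBase<Type>; the using-declarations re-export those members into the derived test fixtures. A standalone illustration of the language rule (not parquet code):

  template <typename T>
  struct Base {
    int allocator_ = 0;
  };

  template <typename T>
  struct Derived : Base<T> {
    using Base<T>::allocator_;  // without this (or writing this->allocator_), the name is not found
    int Get() { return allocator_; }
  };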
5 changes: 2 additions & 3 deletions cpp/src/parquet/test-util.h
@@ -247,10 +247,10 @@ class DictionaryPageBuilder {
   // This class writes data and metadata to the passed inputs
   explicit DictionaryPageBuilder(const ColumnDescriptor* d)
       : num_dict_values_(0), have_values_(false) {
-    encoder_.reset(new DictEncoder<TYPE>(d, &pool_));
+    encoder_.reset(new DictEncoder<TYPE>(d));
   }

-  ~DictionaryPageBuilder() { pool_.FreeAll(); }
+  ~DictionaryPageBuilder() {}

   shared_ptr<Buffer> AppendValues(const vector<TC>& values) {
     int num_values = static_cast<int>(values.size());
@@ -271,7 +271,6 @@
   int32_t num_values() const { return num_dict_values_; }

  private:
-  ChunkedAllocator pool_;
   shared_ptr<DictEncoder<TYPE>> encoder_;
   int32_t num_dict_values_;
   bool have_values_;
216 changes: 0 additions & 216 deletions cpp/src/parquet/util/memory-test.cc
@@ -34,222 +34,6 @@ namespace parquet {

class TestBuffer : public ::testing::Test {};

// Utility class to call private functions on MemPool.
class ChunkedAllocatorTest {
public:
static bool CheckIntegrity(ChunkedAllocator* pool, bool current_chunk_empty) {
return pool->CheckIntegrity(current_chunk_empty);
}

static const int INITIAL_CHUNK_SIZE = ChunkedAllocator::INITIAL_CHUNK_SIZE;
static const int MAX_CHUNK_SIZE = ChunkedAllocator::MAX_CHUNK_SIZE;
};

const int ChunkedAllocatorTest::INITIAL_CHUNK_SIZE;
const int ChunkedAllocatorTest::MAX_CHUNK_SIZE;

TEST(ChunkedAllocatorTest, Basic) {
ChunkedAllocator p;
ChunkedAllocator p2;
ChunkedAllocator p3;

for (int iter = 0; iter < 2; ++iter) {
// allocate a total of 24K in 32-byte pieces (for which we only request 25 bytes)
for (int i = 0; i < 768; ++i) {
// pads to 32 bytes
p.Allocate(25);
}
// we handed back 24K
EXPECT_EQ(24 * 1024, p.total_allocated_bytes());
// .. and allocated 28K of chunks (4, 8, 16)
EXPECT_EQ(28 * 1024, p.GetTotalChunkSizes());

// we're passing on the first two chunks, containing 12K of data; we're left with
// one chunk of 16K containing 12K of data
p2.AcquireData(&p, true);
EXPECT_EQ(12 * 1024, p.total_allocated_bytes());
EXPECT_EQ(16 * 1024, p.GetTotalChunkSizes());

// we allocate 8K, for which there isn't enough room in the current chunk,
// so another one is allocated (32K)
p.Allocate(8 * 1024);
EXPECT_EQ((16 + 32) * 1024, p.GetTotalChunkSizes());

// we allocate 65K, which doesn't fit into the current chunk or the default
// size of the next allocated chunk (64K)
p.Allocate(65 * 1024);
EXPECT_EQ((12 + 8 + 65) * 1024, p.total_allocated_bytes());
if (iter == 0) {
EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
} else {
EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
}
EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());

// Clear() resets allocated data, but doesn't remove any chunks
p.Clear();
EXPECT_EQ(0, p.total_allocated_bytes());
if (iter == 0) {
EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
} else {
EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
}
EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());

// next allocation reuses existing chunks
p.Allocate(1024);
EXPECT_EQ(1024, p.total_allocated_bytes());
if (iter == 0) {
EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
} else {
EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
}
EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());

// ... unless it doesn't fit into any available chunk
p.Allocate(120 * 1024);
EXPECT_EQ((1 + 120) * 1024, p.total_allocated_bytes());
if (iter == 0) {
EXPECT_EQ((1 + 120) * 1024, p.peak_allocated_bytes());
} else {
EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
}
EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());

// ... Try another chunk that fits into an existing chunk
p.Allocate(33 * 1024);
EXPECT_EQ((1 + 120 + 33) * 1024, p.total_allocated_bytes());
EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());

// we're releasing 3 chunks, which get added to p2
p2.AcquireData(&p, false);
EXPECT_EQ(0, p.total_allocated_bytes());
EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
EXPECT_EQ(0, p.GetTotalChunkSizes());

p3.AcquireData(&p2, true); // we're keeping the 65k chunk
EXPECT_EQ(33 * 1024, p2.total_allocated_bytes());
EXPECT_EQ(65 * 1024, p2.GetTotalChunkSizes());

p.FreeAll();
p2.FreeAll();
p3.FreeAll();
}
}

// Test that we can keep an allocated chunk and a free chunk.
// This case verifies that when chunks are acquired by another memory pool the
// remaining chunks are consistent if there were more than one used chunk and some
// free chunks.
TEST(ChunkedAllocatorTest, Keep) {
ChunkedAllocator p;
p.Allocate(4 * 1024);
p.Allocate(8 * 1024);
p.Allocate(16 * 1024);
EXPECT_EQ((4 + 8 + 16) * 1024, p.total_allocated_bytes());
EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
p.Clear();
EXPECT_EQ(0, p.total_allocated_bytes());
EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
p.Allocate(1 * 1024);
p.Allocate(4 * 1024);
EXPECT_EQ((1 + 4) * 1024, p.total_allocated_bytes());
EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());

ChunkedAllocator p2;
p2.AcquireData(&p, true);
EXPECT_EQ(4 * 1024, p.total_allocated_bytes());
EXPECT_EQ((8 + 16) * 1024, p.GetTotalChunkSizes());
EXPECT_EQ(1 * 1024, p2.total_allocated_bytes());
EXPECT_EQ(4 * 1024, p2.GetTotalChunkSizes());

p.FreeAll();
p2.FreeAll();
}

// Tests that we can return partial allocations.
TEST(ChunkedAllocatorTest, ReturnPartial) {
ChunkedAllocator p;
uint8_t* ptr = p.Allocate(1024);
EXPECT_EQ(1024, p.total_allocated_bytes());
memset(ptr, 0, 1024);
p.ReturnPartialAllocation(1024);

uint8_t* ptr2 = p.Allocate(1024);
EXPECT_EQ(1024, p.total_allocated_bytes());
EXPECT_TRUE(ptr == ptr2);
p.ReturnPartialAllocation(1016);

ptr2 = p.Allocate(1016);
EXPECT_EQ(1024, p.total_allocated_bytes());
EXPECT_TRUE(ptr2 == ptr + 8);
p.ReturnPartialAllocation(512);
memset(ptr2, 1, 1016 - 512);

uint8_t* ptr3 = p.Allocate(512);
EXPECT_EQ(1024, p.total_allocated_bytes());
EXPECT_TRUE(ptr3 == ptr + 512);
memset(ptr3, 2, 512);

for (int i = 0; i < 8; ++i) {
EXPECT_EQ(0, ptr[i]);
}
for (int i = 8; i < 512; ++i) {
EXPECT_EQ(1, ptr[i]);
}
for (int i = 512; i < 1024; ++i) {
EXPECT_EQ(2, ptr[i]);
}

p.FreeAll();
}

// Test that the ChunkedAllocator overhead is bounded when we make allocations of
// INITIAL_CHUNK_SIZE.
TEST(ChunkedAllocatorTest, MemoryOverhead) {
ChunkedAllocator p;
const int alloc_size = ChunkedAllocatorTest::INITIAL_CHUNK_SIZE;
const int num_allocs = 1000;
int64_t total_allocated = 0;

for (int i = 0; i < num_allocs; ++i) {
uint8_t* mem = p.Allocate(alloc_size);
ASSERT_TRUE(mem != nullptr);
total_allocated += alloc_size;

int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
// The initial chunk fits evenly into MAX_CHUNK_SIZE, so should have at most
// one empty chunk at the end.
EXPECT_LE(wasted_memory, ChunkedAllocatorTest::MAX_CHUNK_SIZE);
// The chunk doubling algorithm should not allocate chunks larger than the total
// amount of memory already allocated.
EXPECT_LE(wasted_memory, total_allocated);
}

p.FreeAll();
}

// Test that the ChunkedAllocator overhead is bounded when we make alternating
// large and small allocations.
TEST(ChunkedAllocatorTest, FragmentationOverhead) {
ChunkedAllocator p;
const int num_allocs = 100;
int64_t total_allocated = 0;

for (int i = 0; i < num_allocs; ++i) {
int alloc_size = i % 2 == 0 ? 1 : ChunkedAllocatorTest::MAX_CHUNK_SIZE;
uint8_t* mem = p.Allocate(alloc_size);
ASSERT_TRUE(mem != nullptr);
total_allocated += alloc_size;

int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
// Fragmentation should not waste more than half of each completed chunk.
EXPECT_LE(wasted_memory, total_allocated + ChunkedAllocatorTest::MAX_CHUNK_SIZE);
}

p.FreeAll();
}

TEST(TestBufferedInputStream, Basics) {
int64_t source_size = 256;
int64_t stream_offset = 10;
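The expectations in the deleted ChunkedAllocator tests follow from a chunk-doubling policy: each new chunk is twice the size of the previous one, so 768 requests of 25 bytes (padded to 32) consume 24 KiB of data spread over 4 KiB + 8 KiB + 16 KiB = 28 KiB of chunks. A rough sketch of that arithmetic; the 4 KiB starting size and the doubling rule are inferred from the removed test's expectations rather than taken from the allocator's header:

  #include <cstdio>

  int main() {
    long chunk = 4 * 1024;          // inferred initial chunk size
    long chunk_bytes = 0, data_bytes = 0, room = 0;
    for (int i = 0; i < 768; ++i) {
      const long padded = 32;       // 25-byte request padded to 32 bytes
      if (room < padded) {          // current chunk exhausted: add the next, doubled chunk
        chunk_bytes += chunk;
        room = chunk;
        chunk *= 2;
      }
      room -= padded;
      data_bytes += padded;
    }
    // Prints data=24576 chunks=28672, matching the 24 KiB / 28 KiB values
    // asserted in the removed ChunkedAllocatorTest Basic test.
    std::printf("data=%ld chunks=%ld\n", data_bytes, chunk_bytes);
    return 0;
  }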