Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example/parquet-dump-schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ int main(int argc, char** argv) {
std::string filename = argv[1];

parquet_cpp::ParquetFileReader reader;
parquet_cpp::LocalFile file;
parquet_cpp::LocalFileSource file;

file.Open(filename);
if (!file.is_open()) {
Expand Down
2 changes: 1 addition & 1 deletion example/parquet_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ int main(int argc, char** argv) {
}

parquet_cpp::ParquetFileReader reader;
parquet_cpp::LocalFile file;
parquet_cpp::LocalFileSource file;

file.Open(filename);
if (!file.is_open()) {
Expand Down
55 changes: 14 additions & 41 deletions src/parquet/column/column-reader-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "parquet/column/reader.h"
#include "parquet/column/test-util.h"

#include "parquet/util/output.h"
#include "parquet/util/test-common.h"

using std::string;
Expand Down Expand Up @@ -60,31 +61,15 @@ class TestPrimitiveReader : public ::testing::Test {
vector<shared_ptr<Page> > pages_;
};

// Returns a copy of the half-open range [start, end) of `values`.
//
// `end` is clamped to values.size(), so asking for more elements than exist
// yields the available tail instead of reading past the end of the input.
// An empty vector is returned when the (clamped) range is empty or inverted,
// i.e. when end <= start or start lies at/after the end of `values`.
template <typename T>
static vector<T> slice(const vector<T>& values, size_t start, size_t end) {
  // Clamp the upper bound; indexing beyond values.size() is undefined.
  const size_t stop = end < values.size() ? end : values.size();
  if (stop <= start) {
    return vector<T>();
  }

  // start < stop <= values.size(), so both iterators are valid.
  return vector<T>(values.begin() + start, values.begin() + stop);
}


TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
vector<int32_t> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
size_t num_values = values.size();
parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN;

vector<uint8_t> page1;
test::DataPageBuilder<Type::INT32> page_builder(&page1);
page_builder.AppendValues(values, parquet::Encoding::PLAIN);
pages_.push_back(page_builder.Finish());
std::vector<uint8_t> buffer;
std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, {}, 0,
{}, 0, &buffer);
pages_.push_back(page);

// TODO: simplify this
NodePtr type = schema::Int32("a", Repetition::REQUIRED);
ColumnDescriptor descr(type, 0, 0);
InitReader(&descr);
Expand All @@ -102,21 +87,16 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
ASSERT_TRUE(vector_equal(result, values));
}


TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
vector<int32_t> values = {1, 2, 3, 4, 5};
vector<int16_t> def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1};

size_t num_values = values.size();
parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN;

vector<uint8_t> page1;
test::DataPageBuilder<Type::INT32> page_builder(&page1);

// Definition levels precede the values
page_builder.AppendDefLevels(def_levels, 1, parquet::Encoding::RLE);
page_builder.AppendValues(values, parquet::Encoding::PLAIN);
std::vector<uint8_t> buffer;
std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, def_levels, 1,
{}, 0, &buffer);

pages_.push_back(page_builder.Finish());
pages_.push_back(page);

NodePtr type = schema::Int32("a", Repetition::OPTIONAL);
ColumnDescriptor descr(type, 1, 0);
Expand Down Expand Up @@ -159,18 +139,11 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
vector<int16_t> def_levels = {2, 1, 1, 2, 2, 1, 1, 2, 2, 1};
vector<int16_t> rep_levels = {0, 1, 1, 0, 0, 1, 1, 0, 0, 1};

size_t num_values = values.size();
parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN;

vector<uint8_t> page1;
test::DataPageBuilder<Type::INT32> page_builder(&page1);

// Definition levels precede the values
page_builder.AppendRepLevels(rep_levels, 1, parquet::Encoding::RLE);
page_builder.AppendDefLevels(def_levels, 2, parquet::Encoding::RLE);
page_builder.AppendValues(values, parquet::Encoding::PLAIN);
std::vector<uint8_t> buffer;
std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values,
def_levels, 2, rep_levels, 1, &buffer);

pages_.push_back(page_builder.Finish());
pages_.push_back(page);

NodePtr type = schema::Int32("a", Repetition::REPEATED);
ColumnDescriptor descr(type, 2, 1);
Expand Down
3 changes: 1 addition & 2 deletions src/parquet/column/serialized-page.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

#include "parquet/exception.h"
#include "parquet/thrift/util.h"
#include "parquet/util/input_stream.h"

using parquet::PageType;

Expand Down Expand Up @@ -52,7 +51,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
// Loop here because there may be unhandled page types that we skip until
// finding a page that we do know what to do with
while (true) {
int bytes_read = 0;
int64_t bytes_read = 0;
const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
if (bytes_read == 0) {
return std::shared_ptr<Page>(nullptr);
Expand Down
2 changes: 1 addition & 1 deletion src/parquet/column/serialized-page.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

#include "parquet/column/page.h"
#include "parquet/compression/codec.h"
#include "parquet/util/input_stream.h"
#include "parquet/util/input.h"
#include "parquet/thrift/parquet_types.h"

namespace parquet_cpp {
Expand Down
100 changes: 52 additions & 48 deletions src/parquet/column/test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,34 +52,30 @@ class MockPageReader : public PageReader {
size_t page_index_;
};

// TODO(wesm): this is only used for testing for now

static constexpr int DEFAULT_DATA_PAGE_SIZE = 64 * 1024;
static constexpr int INIT_BUFFER_SIZE = 1024;
// TODO(wesm): this is only used for testing for now. Refactor to form part of
// primary file write path

template <int TYPE>
class DataPageBuilder {
public:
typedef typename type_traits<TYPE>::value_type T;

// The passed vector is the owner of the page's data
explicit DataPageBuilder(std::vector<uint8_t>* out) :
out_(out),
buffer_size_(0),
// This class writes data and metadata to the passed inputs
explicit DataPageBuilder(InMemoryOutputStream* sink, parquet::DataPageHeader* header) :
sink_(sink),
header_(header),
num_values_(0),
have_def_levels_(false),
have_rep_levels_(false),
have_values_(false) {
out_->resize(INIT_BUFFER_SIZE);
buffer_capacity_ = INIT_BUFFER_SIZE;
}

void AppendDefLevels(const std::vector<int16_t>& levels,
int16_t max_level, parquet::Encoding::type encoding) {
AppendLevels(levels, max_level, encoding);

num_values_ = std::max(levels.size(), num_values_);
header_.__set_definition_level_encoding(encoding);
header_->__set_definition_level_encoding(encoding);
have_def_levels_ = true;
}

Expand All @@ -88,7 +84,7 @@ class DataPageBuilder {
AppendLevels(levels, max_level, encoding);

num_values_ = std::max(levels.size(), num_values_);
header_.__set_repetition_level_encoding(encoding);
header_->__set_repetition_level_encoding(encoding);
have_rep_levels_ = true;
}

Expand All @@ -98,53 +94,31 @@ class DataPageBuilder {
ParquetException::NYI("only plain encoding currently implemented");
}
size_t bytes_to_encode = values.size() * sizeof(T);
Reserve(bytes_to_encode);

PlainEncoder<TYPE> encoder(nullptr);
size_t nbytes = encoder.Encode(&values[0], values.size(), Head());
// In case for some reason it's fewer than bytes_to_encode
buffer_size_ += nbytes;
encoder.Encode(&values[0], values.size(), sink_);

num_values_ = std::max(values.size(), num_values_);
header_.__set_encoding(encoding);
header_->__set_encoding(encoding);
have_values_ = true;
}

std::shared_ptr<Page> Finish() {
void Finish() {
if (!have_values_) {
throw ParquetException("A data page must at least contain values");
}
header_.__set_num_values(num_values_);
return std::make_shared<DataPage>(&(*out_)[0], buffer_size_, header_);
header_->__set_num_values(num_values_);
}

private:
std::vector<uint8_t>* out_;

size_t buffer_size_;
size_t buffer_capacity_;

parquet::DataPageHeader header_;
InMemoryOutputStream* sink_;
parquet::DataPageHeader* header_;

size_t num_values_;

bool have_def_levels_;
bool have_rep_levels_;
bool have_values_;

void Reserve(size_t nbytes) {
while ((nbytes + buffer_size_) > buffer_capacity_) {
// TODO(wesm): limit to one reserve when this loop runs more than once
size_t new_capacity = 2 * buffer_capacity_;
out_->resize(new_capacity);
buffer_capacity_ = new_capacity;
}
}

uint8_t* Head() {
return &(*out_)[buffer_size_];
}

// Used internally for both repetition and definition levels
void AppendLevels(const std::vector<int16_t>& levels, int16_t max_level,
parquet::Encoding::type encoding) {
Expand All @@ -153,25 +127,55 @@ class DataPageBuilder {
}

// TODO: compute a more precise maximum size for the encoded levels
std::vector<uint8_t> encode_buffer(DEFAULT_DATA_PAGE_SIZE);

std::vector<uint8_t> encode_buffer(levels.size() * 4);

// We encode into separate memory from the output stream because the
// RLE-encoded bytes have to be preceded in the stream by their absolute
// size.
LevelEncoder encoder;
encoder.Init(encoding, max_level, levels.size(),
encode_buffer.data(), encode_buffer.size());

encoder.Encode(levels.size(), levels.data());

uint32_t rle_bytes = encoder.len();
size_t levels_footprint = sizeof(uint32_t) + rle_bytes;
Reserve(levels_footprint);

*reinterpret_cast<uint32_t*>(Head()) = rle_bytes;
memcpy(Head() + sizeof(uint32_t), encode_buffer.data(), rle_bytes);
buffer_size_ += levels_footprint;
sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(uint32_t));
sink_->Write(encode_buffer.data(), rle_bytes);
}
};

// Builds a single PLAIN-encoded data page for tests.
//
// The page body is written in this order: RLE-encoded repetition levels (only
// if `rep_levels` is non-empty), RLE-encoded definition levels (only if
// `def_levels` is non-empty), then the PLAIN-encoded `values`.
//
// @param values         the values to encode into the page
// @param def_levels     definition levels; pass an empty vector to omit them
// @param max_def_level  maximum definition level, forwarded to the level encoder
// @param rep_levels     repetition levels; pass an empty vector to omit them
// @param max_rep_level  maximum repetition level, forwarded to the level encoder
// @param out_buffer     receives the encoded page bytes
// @returns a DataPage constructed from out_buffer's storage and the header
//          populated by the builder
//
// NOTE(review): the DataPage is handed a raw pointer into *out_buffer, so the
// caller presumably has to keep out_buffer alive (and unmodified) for as long
// as the returned page is used -- confirm against DataPage's constructor.
template <int TYPE, typename T>
static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values,
    const std::vector<int16_t>& def_levels, int16_t max_def_level,
    const std::vector<int16_t>& rep_levels, int16_t max_rep_level,
    std::vector<uint8_t>* out_buffer) {
  InMemoryOutputStream page_stream;
  parquet::DataPageHeader page_header;

  test::DataPageBuilder<TYPE> page_builder(&page_stream, &page_header);

  // Levels are written ahead of the values: repetition first, then definition.
  if (!rep_levels.empty()) {
    page_builder.AppendRepLevels(rep_levels, max_rep_level,
        parquet::Encoding::RLE);
  }

  if (!def_levels.empty()) {
    page_builder.AppendDefLevels(def_levels, max_def_level,
        parquet::Encoding::RLE);
  }

  page_builder.AppendValues(values, parquet::Encoding::PLAIN);
  page_builder.Finish();

  // Hand off the data stream to the passed std::vector
  page_stream.Transfer(out_buffer);

  // data() rather than &(*out_buffer)[0]: it is well-defined even when the
  // buffer is empty.
  return std::make_shared<DataPage>(out_buffer->data(), out_buffer->size(),
      page_header);
}


} // namespace test

} // namespace parquet_cpp
Expand Down
8 changes: 2 additions & 6 deletions src/parquet/encodings/encodings.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "parquet/exception.h"
#include "parquet/types.h"

#include "parquet/util/output.h"
#include "parquet/util/rle-encoding.h"
#include "parquet/util/bit-stream-utils.inline.h"

Expand Down Expand Up @@ -82,14 +83,9 @@ class Encoder {

virtual ~Encoder() {}

// TODO(wesm): use an output stream

// Subclasses should override the ones they support
//
// @returns: the number of bytes written to dst
virtual size_t Encode(const T* src, int num_values, uint8_t* dst) {
virtual void Encode(const T* src, int num_values, OutputStream* dst) {
throw ParquetException("Encoder does not implement this type.");
return 0;
}

const parquet::Encoding::type encoding() const { return encoding_; }
Expand Down
11 changes: 7 additions & 4 deletions src/parquet/encodings/plain-encoding-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,18 @@ TEST(BooleanTest, TestEncodeDecode) {
PlainEncoder<Type::BOOLEAN> encoder(nullptr);
PlainDecoder<Type::BOOLEAN> decoder(nullptr);

std::vector<uint8_t> encode_buffer(nbytes);
InMemoryOutputStream dst;
encoder.Encode(draws, nvalues, &dst);

size_t encoded_bytes = encoder.Encode(draws, nvalues, &encode_buffer[0]);
ASSERT_EQ(nbytes, encoded_bytes);
std::vector<uint8_t> encode_buffer;
dst.Transfer(&encode_buffer);

ASSERT_EQ(nbytes, encode_buffer.size());

std::vector<uint8_t> decode_buffer(nbytes);
const uint8_t* decode_data = &decode_buffer[0];

decoder.SetData(nvalues, &encode_buffer[0], encoded_bytes);
decoder.SetData(nvalues, &encode_buffer[0], encode_buffer.size());
size_t values_decoded = decoder.Decode(&decode_buffer[0], nvalues);
ASSERT_EQ(nvalues, values_decoded);

Expand Down
Loading