Test ReadSpaced
xhochy committed Jan 17, 2017
1 parent 9dc6dc0 commit 798bc83
Showing 5 changed files with 95 additions and 14 deletions.
10 changes: 5 additions & 5 deletions src/parquet/arrow/arrow-reader-writer-test.cc
@@ -205,9 +205,9 @@ class TestParquetIO : public ::testing::Test {

void ReaderFromSink(std::unique_ptr<FileReader>* out) {
std::shared_ptr<Buffer> buffer = sink_->GetBuffer();
- ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
- ::arrow::default_memory_pool(), ::parquet::default_reader_properties(),
- nullptr, out));
+ ASSERT_OK_NO_THROW(
+ OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
+ ::parquet::default_reader_properties(), nullptr, out));
}

void ReadSingleColumnFile(
@@ -567,8 +567,8 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
public:
typedef typename c_type_trait<TestType>::ArrowCType T;

- void MakeTestFile(std::vector<T>& values, int num_chunks,
- std::unique_ptr<FileReader>* reader) {
+ void MakeTestFile(
+ std::vector<T>& values, int num_chunks, std::unique_ptr<FileReader>* reader) {
std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
size_t chunk_size = values.size() / num_chunks;
4 changes: 2 additions & 2 deletions src/parquet/arrow/reader.cc
@@ -204,8 +204,8 @@ Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file,

Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
MemoryPool* allocator, std::unique_ptr<FileReader>* reader) {
- return OpenFile(file, allocator, ::parquet::default_reader_properties(),
- nullptr, reader);
+ return OpenFile(
+ file, allocator, ::parquet::default_reader_properties(), nullptr, reader);
}

Status FileReader::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
87 changes: 82 additions & 5 deletions src/parquet/column/column-reader-test.cc
@@ -43,6 +43,23 @@ using schema::NodePtr;

namespace test {

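// Compares a dense vector of expected non-null values (left) against a spaced
// vector (right): only slots whose definition level equals max_def_levels carry
// valid data; all other slots are skipped.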
template <typename T>
static inline bool vector_equal_with_def_levels(const vector<T>& left,
const vector<int16_t>& def_levels, int16_t max_def_levels, const vector<T>& right) {
size_t i_left = 0;
for (size_t i = 0; i < right.size(); ++i) {
if (def_levels[i] != max_def_levels) { continue; }
if (left[i_left] != right[i]) {
std::cerr << "index " << i << " left was " << left[i_left] << " right was "
<< right[i] << std::endl;
return false;
}
i_left++;
}

return true;
}

class TestPrimitiveReader : public ::testing::Test {
public:
void InitReader(const ColumnDescriptor* d) {
@@ -84,17 +101,69 @@ class TestPrimitiveReader : public ::testing::Test {
ASSERT_EQ(0, values_read);
}

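// Exercises the spaced read path: ReadBatchSpaced produces one result slot per
// level, marks null slots in the valid_bits bitmap, and reports their number
// through null_count; only slots marked valid carry values.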
void CheckResultsSpaced() {
vector<int32_t> vresult(num_levels_, -1);
vector<int16_t> dresult(num_levels_, -1);
vector<int16_t> rresult(num_levels_, -1);
vector<uint8_t> valid_bits(num_levels_, 255);
int total_values_read = 0;
int batch_actual = 0;
int null_count = -1;

Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
int32_t batch_size = 8;
int batch = 0;
// This covers both cases:
// 1) batch_size < page_size (multiple ReadBatchSpaced calls from a single page)
// 2) batch_size > page_size (ReadBatchSpaced is limited to a single page)
do {
batch = reader->ReadBatchSpaced(batch_size, dresult.data() + batch_actual,
rresult.data() + batch_actual, vresult.data() + batch_actual, &null_count,
valid_bits.data() + batch_actual, 0);
total_values_read += batch - null_count;
batch_actual += batch;
batch_size = std::max(batch_size * 2, 4096);
} while (batch > 0);

if (max_def_level_ > 0) {
ASSERT_TRUE(vector_equal(def_levels_, dresult));
ASSERT_TRUE(
vector_equal_with_def_levels(values_, dresult, max_def_level_, vresult));
} else {
ASSERT_TRUE(vector_equal(values_, vresult));
}
if (max_rep_level_ > 0) { ASSERT_TRUE(vector_equal(rep_levels_, rresult)); }
ASSERT_EQ(num_levels_, batch_actual);
ASSERT_EQ(num_values_, total_values_read);
// catch improper writes at EOS
batch_actual = reader->ReadBatchSpaced(
5, nullptr, nullptr, nullptr, &null_count, valid_bits.data(), 0);
ASSERT_EQ(0, batch_actual);
ASSERT_EQ(0, null_count);
}

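// Reset the generated pages, expected data, and reader so a fresh pass can run.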
void Clear() {
values_.clear();
def_levels_.clear();
rep_levels_.clear();
pages_.clear();
reader_.reset();
}

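// Each Execute* helper below makes two passes over freshly generated pages:
// one verified with ReadBatch (CheckResults) and one with ReadBatchSpaced
// (CheckResultsSpaced).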
void ExecutePlain(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
rep_levels_, values_, data_buffer_, pages_, Encoding::PLAIN);
num_levels_ = num_pages * levels_per_page;
InitReader(d);
CheckResults();
- values_.clear();
- def_levels_.clear();
- rep_levels_.clear();
- pages_.clear();
- reader_.reset();
+ Clear();
+
+ num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
+ rep_levels_, values_, data_buffer_, pages_, Encoding::PLAIN);
+ num_levels_ = num_pages * levels_per_page;
+ InitReader(d);
+ CheckResultsSpaced();
+ Clear();
}

void ExecuteDict(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
@@ -103,6 +172,14 @@ class TestPrimitiveReader : public ::testing::Test {
num_levels_ = num_pages * levels_per_page;
InitReader(d);
CheckResults();
Clear();

num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
rep_levels_, values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY);
num_levels_ = num_pages * levels_per_page;
InitReader(d);
CheckResultsSpaced();
Clear();
}

protected:
5 changes: 4 additions & 1 deletion src/parquet/column/reader.h
@@ -272,7 +272,10 @@ inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int batch_size,
int16_t* def_levels, int16_t* rep_levels, T* values, int* null_count_out,
uint8_t* valid_bits, int64_t valid_bits_offset) {
// HasNext invokes ReadNewPage
- if (!HasNext()) { return 0; }
+ if (!HasNext()) {
+ *null_count_out = 0;
+ return 0;
+ }

int64_t total_values;
// TODO(wesm): keep reading data pages until batch_size is reached, or the
3 changes: 2 additions & 1 deletion src/parquet/encodings/encoding-test.cc
@@ -274,7 +274,8 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> {
// Also test spaced decoding
decoder.SetData(num_values_, indices->data(), indices->size());
std::vector<uint8_t> valid_bits(BitUtil::RoundUpNumBytes(num_values_), 255);
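// All valid_bits bytes are 0xFF (no nulls), so DecodeSpaced is expected to
// decode all num_values_ entries.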
- values_decoded = decoder.DecodeSpaced(decode_buf_, num_values_, 0, valid_bits.data(), 0);
+ values_decoded =
+ decoder.DecodeSpaced(decode_buf_, num_values_, 0, valid_bits.data(), 0);
ASSERT_EQ(num_values_, values_decoded);
VerifyResults<T>(decode_buf_, draws_, num_values_);
}