cpp/src/parquet/arrow/arrow_reader_writer_test.cc (64 additions, 34 deletions)
@@ -26,10 +26,13 @@
#include <arrow/compute/api.h>
#include <cstdint>
#include <functional>
#include <iostream>
#include <sstream>
#include <vector>

#include "arrow/api.h"
#include "arrow/compute/kernels/cast.h"
#include "arrow/pretty_print.h"
#include "arrow/record_batch.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
@@ -355,25 +358,6 @@ void WriteTableToBuffer(const std::shared_ptr<Table>& table, int64_t row_group_s
ASSERT_OK_AND_ASSIGN(*out, sink->Finish());
}

void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual) {
ASSERT_EQ(expected.num_chunks(), actual.num_chunks()) << "# chunks unequal";
if (!actual.Equals(expected)) {
std::stringstream pp_result;
std::stringstream pp_expected;

for (int i = 0; i < actual.num_chunks(); ++i) {
auto c1 = actual.chunk(i);
auto c2 = expected.chunk(i);
if (!c1->Equals(*c2)) {
ARROW_EXPECT_OK(::arrow::PrettyPrint(*c1, 0, &pp_result));
ARROW_EXPECT_OK(::arrow::PrettyPrint(*c2, 0, &pp_expected));
FAIL() << "Chunk " << i << " Got: " << pp_result.str()
<< "\nExpected: " << pp_expected.str();
}
}
}
}

void DoRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group_size,
std::shared_ptr<Table>* out,
const std::shared_ptr<::parquet::WriterProperties>& writer_properties =
@@ -2260,14 +2244,17 @@ TEST(TestArrowReadWrite, DISABLED_CanonicalNestedRoundTrip) {
::arrow::ArrayFromJSON(links->type(),
"[{\"Backward\":[], \"Forward\":[20, 40, 60]}, "
"{\"Backward\":[10, 30], \"Forward\":[80]}]");
auto name_array =
::arrow::ArrayFromJSON(name->type(),
R"([[{"Language": [{"Code": "en_us", "Country":"us"},
{"Code": "en_us", "Country": null}],
"Url": "http://A"},
{"Url": "http://B"},
{"Language": [{"Code": "en-gb", "Country": "gb"}]}],
[{"Url": "http://C"}]])");

// Written without a C++11 raw string literal because many editors don't
// handle raw string literals properly. The adjacent escaped literals below
// are concatenated into a single JSON string at compile time.
auto name_array = ::arrow::ArrayFromJSON(
name->type(),
"([[{\"Language\": [{\"Code\": \"en_us\", \"Country\":\"us\"},"
"{\"Code\": \"en_us\", \"Country\": null}],"
"\"Url\": \"http://A\"},"
"{\"Url\": \"http://B\"},"
"{\"Language\": [{\"Code\": \"en-gb\", \"Country\": \"gb\"}]}],"
"[{\"Url\": \"http://C\"}]]");
auto expected =
::arrow::Table::Make(schema, {doc_id_array, links_id_array, name_array});
CheckSimpleRoundtrip(expected, 2);
@@ -2899,14 +2886,9 @@ class TestArrowReadDictionary : public ::testing::TestWithParam<double> {
} options;

void SetUp() override {
GenerateData(GetParam());

// Write `num_row_groups` row groups; each row group will have a different dictionary
ASSERT_NO_FATAL_FAILURE(
WriteTableToBuffer(expected_dense_, options.num_rows / options.num_row_groups,
default_arrow_writer_properties(), &buffer_));

properties_ = default_arrow_reader_properties();

GenerateData(GetParam());
}

void GenerateData(double null_probability) {
@@ -2920,6 +2902,13 @@ class TestArrowReadDictionary : public ::testing::TestWithParam<double> {

void TearDown() override {}

void WriteSimple() {
// Write `num_row_groups` row groups; each row group will have a different dictionary
ASSERT_NO_FATAL_FAILURE(
WriteTableToBuffer(expected_dense_, options.num_rows / options.num_row_groups,
default_arrow_writer_properties(), &buffer_));
}
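  // Writing is intentionally no longer done in SetUp(): tests adjust
  // `options` first, then either call WriteSimple() or write the buffer
  // themselves (as IncrementalReads below does).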

void CheckReadWholeFile(const Table& expected) {
ASSERT_OK_AND_ASSIGN(auto reader, GetReader());

@@ -2970,6 +2959,8 @@ void AsDictionary32Encoded(const Array& arr, std::shared_ptr<Array>* out) {
TEST_P(TestArrowReadDictionary, ReadWholeFileDict) {
properties_.set_read_dictionary(0, true);

WriteSimple();

auto num_row_groups = options.num_row_groups;
auto chunk_size = options.num_rows / num_row_groups;

@@ -2982,6 +2973,43 @@ TEST_P(TestArrowReadDictionary, ReadWholeFileDict) {
CheckReadWholeFile(*ex_table);
}

TEST_P(TestArrowReadDictionary, IncrementalReads) {
// ARROW-6895
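  // Regression test: FlushBuilder() used to call builder_.ResetFull() after
  // every flush, clearing the dictionary memo table in the middle of a row
  // group, so later NextBatch() calls yielded indices into a dictionary that
  // no longer contained the original values.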
options.num_rows = 100;
options.num_uniques = 10;
SetUp();

properties_.set_read_dictionary(0, true);

// Just write a single row group
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(
expected_dense_, options.num_rows, default_arrow_writer_properties(), &buffer_));

// Read in one shot
ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileReader> reader, GetReader());
std::shared_ptr<Table> expected;
ASSERT_OK_NO_THROW(reader->ReadTable(&expected));

ASSERT_OK_AND_ASSIGN(reader, GetReader());
std::unique_ptr<ColumnReader> col;
ASSERT_OK(reader->GetColumn(0, &col));

int num_reads = 4;
int batch_size = options.num_rows / num_reads;

::arrow::compute::FunctionContext fc;
for (int i = 0; i < num_reads; ++i) {
std::shared_ptr<ChunkedArray> chunk;
ASSERT_OK(col->NextBatch(batch_size, &chunk));

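    // Decode the dictionary chunk back to dense utf8 so it can be compared
    // against the matching slice of the original dense values.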
std::shared_ptr<Array> result_dense;
ASSERT_OK(::arrow::compute::Cast(&fc, *chunk->chunk(0), ::arrow::utf8(),
::arrow::compute::CastOptions::Safe(),
&result_dense));
AssertArraysEqual(*dense_values_->Slice(i * batch_size, batch_size), *result_dense);
}
}

TEST_P(TestArrowReadDictionary, StreamReadWholeFileDict) {
// ARROW-6895 and ARROW-7545 reading a parquet file with a dictionary of
// binary data, e.g. String, will return invalid values when using the
@@ -2992,6 +3020,7 @@ TEST_P(TestArrowReadDictionary, StreamReadWholeFileDict) {
options.num_row_groups = 1;
options.num_rows = 16;
SetUp();
WriteSimple();

// Would trigger an infinite loop when requesting a batch greater than the
// number of available rows in a row group.
@@ -3001,6 +3030,7 @@

TEST_P(TestArrowReadDictionary, ReadWholeFileDense) {
properties_.set_read_dictionary(0, false);
WriteSimple();
CheckReadWholeFile(*expected_dense_);
}

cpp/src/parquet/column_reader.cc (2 additions, 1 deletion)
@@ -1462,7 +1462,7 @@ class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
result_chunks_.emplace_back(std::move(chunk));

// Reset the builder state but keep the dictionary memo table; it is
// only cleared when a new dictionary page arrives (see below)
builder_.ResetFull();
builder_.Reset();
}
}

@@ -1471,6 +1471,7 @@ class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>,
/// If there is a new dictionary, we may need to flush the builder, then
/// insert the new dictionary values
FlushBuilder();
builder_.ResetFull();
auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_);
decoder->InsertDictionary(&builder_);
this->new_dictionary_ = false;
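The gist of the fix: Reset() flushes the builder's accumulated indices but keeps its dictionary memo table, so chunks flushed across successive NextBatch() calls stay mutually consistent, while ResetFull() also drops the memo table and is now invoked only when the decoder is about to insert a genuinely new dictionary. Below is a minimal, hypothetical sketch of that distinction; the Demo function and the use of arrow::StringDictionaryBuilder are illustrative assumptions rather than code from this PR, relying on the fact that a dictionary builder preserves its memo table across Finish() calls.

#include <iostream>
#include <memory>

#include "arrow/api.h"

// Hypothetical demo: Reset() vs ResetFull() on a dictionary builder.
arrow::Status Demo() {
  arrow::StringDictionaryBuilder builder;
  std::shared_ptr<arrow::Array> chunk;

  ARROW_RETURN_NOT_OK(builder.Append("a"));
  ARROW_RETURN_NOT_OK(builder.Append("b"));
  ARROW_RETURN_NOT_OK(builder.Finish(&chunk));
  // First flush: indices [0, 1] over dictionary ["a", "b"].

  // Finish() resets the indices but keeps the memo table, so "b" still
  // maps to index 1 and successive chunks share a consistent dictionary.
  ARROW_RETURN_NOT_OK(builder.Append("b"));
  ARROW_RETURN_NOT_OK(builder.Finish(&chunk));
  // Second flush: indices [1] over dictionary ["a", "b"].

  // ResetFull() also clears the memo table: "b" now maps to index 0 of a
  // brand-new dictionary. Doing this after every flush, as the old
  // FlushBuilder() did, is what produced invalid values mid row group.
  builder.ResetFull();
  ARROW_RETURN_NOT_OK(builder.Append("b"));
  ARROW_RETURN_NOT_OK(builder.Finish(&chunk));
  // Third flush: indices [0] over dictionary ["b"].
  return arrow::Status::OK();
}

int main() {
  arrow::Status st = Demo();
  if (!st.ok()) {
    std::cerr << st.ToString() << std::endl;
    return 1;
  }
  return 0;
}

Deferring ResetFull() to the moment a new dictionary page is inserted mirrors the Parquet file layout: within a row group the dictionary is fixed, so the memo table must survive every intermediate flush.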