From 2d87e4b0a0400451599c264716b52c4a4fcfa54b Mon Sep 17 00:00:00 2001 From: Adam Hooper Date: Wed, 19 Feb 2020 17:18:50 -0500 Subject: [PATCH 1/3] ARROW-6895: [C++] handle dictionary deltas Previously, ByteArrayDictionaryRecordReader would clear the dictionary after returning a chunk. Now, it persists the dictionary in case a future chunk needs it. --- cpp/src/parquet/column_reader.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 11f47f1bf73..54b368867b7 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1462,7 +1462,7 @@ class ByteArrayDictionaryRecordReader : public TypedRecordReader, result_chunks_.emplace_back(std::move(chunk)); // Also clears the dictionary memo table - builder_.ResetFull(); + builder_.Reset(); } } @@ -1471,6 +1471,7 @@ class ByteArrayDictionaryRecordReader : public TypedRecordReader, /// If there is a new dictionary, we may need to flush the builder, then /// insert the new dictionary values FlushBuilder(); + builder_.ResetFull(); auto decoder = dynamic_cast(this->current_decoder_); decoder->InsertDictionary(&builder_); this->new_dictionary_ = false; From 904ab0945f0d7e5439d5b8d5084d2efcbb3473ca Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 25 Mar 2020 20:21:19 -0500 Subject: [PATCH 2/3] Failing unit test --- .../parquet/arrow/arrow_reader_writer_test.cc | 68 +++++++++++++++---- 1 file changed, 54 insertions(+), 14 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 68db6446931..8aae258e3ba 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -2260,14 +2260,18 @@ TEST(TestArrowReadWrite, DISABLED_CanonicalNestedRoundTrip) { ::arrow::ArrayFromJSON(links->type(), "[{\"Backward\":[], \"Forward\":[20, 40, 60]}, " "{\"Backward\":[10, 30], 
\"Forward\":[80]}]"); + + // Written without C++11 string literal because many editors don't have C++11 + // string literals implemented properly auto name_array = - ::arrow::ArrayFromJSON(name->type(), - R"([[{"Language": [{"Code": "en_us", "Country":"us"}, - {"Code": "en_us", "Country": null}], - "Url": "http://A"}, - {"Url": "http://B"}, - {"Language": [{"Code": "en-gb", "Country": "gb"}]}], - [{"Url": "http://C"}]])"); + ::arrow::ArrayFromJSON( + name->type(), + "([[{\"Language\": [{\"Code\": \"en_us\", \"Country\":\"us\"}," + "{\"Code\": \"en_us\", \"Country\": null}]," + "\"Url\": \"http://A\"}," + "{\"Url\": \"http://B\"}," + "{\"Language\": [{\"Code\": \"en-gb\", \"Country\": \"gb\"}]}]," + "[{\"Url\": \"http://C\"}]]"); auto expected = ::arrow::Table::Make(schema, {doc_id_array, links_id_array, name_array}); CheckSimpleRoundtrip(expected, 2); @@ -2899,14 +2903,9 @@ class TestArrowReadDictionary : public ::testing::TestWithParam { } options; void SetUp() override { - GenerateData(GetParam()); - - // Write `num_row_groups` row groups; each row group will have a different dictionary - ASSERT_NO_FATAL_FAILURE( - WriteTableToBuffer(expected_dense_, options.num_rows / options.num_row_groups, - default_arrow_writer_properties(), &buffer_)); - properties_ = default_arrow_reader_properties(); + + GenerateData(GetParam()); } void GenerateData(double null_probability) { @@ -2920,6 +2919,13 @@ class TestArrowReadDictionary : public ::testing::TestWithParam { void TearDown() override {} + void WriteSimple() { + // Write `num_row_groups` row groups; each row group will have a different dictionary + ASSERT_NO_FATAL_FAILURE( + WriteTableToBuffer(expected_dense_, options.num_rows / options.num_row_groups, + default_arrow_writer_properties(), &buffer_)); + } + void CheckReadWholeFile(const Table& expected) { ASSERT_OK_AND_ASSIGN(auto reader, GetReader()); @@ -2970,6 +2976,8 @@ void AsDictionary32Encoded(const Array& arr, std::shared_ptr* out) { 
TEST_P(TestArrowReadDictionary, ReadWholeFileDict) { properties_.set_read_dictionary(0, true); + WriteSimple(); + auto num_row_groups = options.num_row_groups; auto chunk_size = options.num_rows / num_row_groups; @@ -2982,6 +2990,36 @@ TEST_P(TestArrowReadDictionary, ReadWholeFileDict) { CheckReadWholeFile(*ex_table); } +TEST_P(TestArrowReadDictionary, IncrementalReads) { + // ARROW-6895 + + properties_.set_read_dictionary(0, true); + + // Just write a single row group + ASSERT_NO_FATAL_FAILURE( + WriteTableToBuffer(expected_dense_, options.num_rows, + default_arrow_writer_properties(), &buffer_)); + + // Read in one shot + ASSERT_OK_AND_ASSIGN(std::unique_ptr reader, GetReader()); + std::shared_ptr expected; + ASSERT_OK_NO_THROW(reader->ReadTable(&expected)); + + ASSERT_OK_AND_ASSIGN(reader, GetReader()); + std::unique_ptr col; + ASSERT_OK(reader->GetColumn(0, &col)); + + int num_reads = 4; + int batch_size = options.num_rows / num_reads; + for (int i = 0; i < num_reads; ++i) { + std::shared_ptr chunk; + ASSERT_OK(col->NextBatch(batch_size, &chunk)); + AssertChunkedEqual(*expected->column(0)->Slice(num_reads * batch_size, + batch_size), + *chunk); + } +} + TEST_P(TestArrowReadDictionary, StreamReadWholeFileDict) { // ARROW-6895 and ARROW-7545 reading a parquet file with a dictionary of // binary data, e.g. String, will return invalid values when using the @@ -2992,6 +3030,7 @@ TEST_P(TestArrowReadDictionary, StreamReadWholeFileDict) { options.num_row_groups = 1; options.num_rows = 16; SetUp(); + WriteSimple(); // Would trigger an infinite loop when requesting a batch greater than the // number of available rows in a row group. 
@@ -3001,6 +3040,7 @@ TEST_P(TestArrowReadDictionary, StreamReadWholeFileDict) { TEST_P(TestArrowReadDictionary, ReadWholeFileDense) { properties_.set_read_dictionary(0, false); + WriteSimple(); CheckReadWholeFile(*expected_dense_); } From 869b9a37cba186ac13f650ab23b1060f4af7d592 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 25 Mar 2020 20:49:11 -0500 Subject: [PATCH 3/3] Finish unit test, removed some redundant code --- .../parquet/arrow/arrow_reader_writer_test.cc | 58 ++++++++----------- 1 file changed, 24 insertions(+), 34 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 8aae258e3ba..65a449077a8 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -26,10 +26,13 @@ #include #include #include +#include #include #include #include "arrow/api.h" +#include "arrow/compute/kernels/cast.h" +#include "arrow/pretty_print.h" #include "arrow/record_batch.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" @@ -355,25 +358,6 @@ void WriteTableToBuffer(const std::shared_ptr
& table, int64_t row_group_s ASSERT_OK_AND_ASSIGN(*out, sink->Finish()); } -void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual) { - ASSERT_EQ(expected.num_chunks(), actual.num_chunks()) << "# chunks unequal"; - if (!actual.Equals(expected)) { - std::stringstream pp_result; - std::stringstream pp_expected; - - for (int i = 0; i < actual.num_chunks(); ++i) { - auto c1 = actual.chunk(i); - auto c2 = expected.chunk(i); - if (!c1->Equals(*c2)) { - ARROW_EXPECT_OK(::arrow::PrettyPrint(*c1, 0, &pp_result)); - ARROW_EXPECT_OK(::arrow::PrettyPrint(*c2, 0, &pp_expected)); - FAIL() << "Chunk " << i << " Got: " << pp_result.str() - << "\nExpected: " << pp_expected.str(); - } - } - } -} - void DoRoundtrip(const std::shared_ptr
& table, int64_t row_group_size, std::shared_ptr
* out, const std::shared_ptr<::parquet::WriterProperties>& writer_properties = @@ -2263,15 +2247,14 @@ TEST(TestArrowReadWrite, DISABLED_CanonicalNestedRoundTrip) { // Written without C++11 string literal because many editors don't have C++11 // string literals implemented properly - auto name_array = - ::arrow::ArrayFromJSON( - name->type(), - "([[{\"Language\": [{\"Code\": \"en_us\", \"Country\":\"us\"}," - "{\"Code\": \"en_us\", \"Country\": null}]," - "\"Url\": \"http://A\"}," - "{\"Url\": \"http://B\"}," - "{\"Language\": [{\"Code\": \"en-gb\", \"Country\": \"gb\"}]}]," - "[{\"Url\": \"http://C\"}]]"); + auto name_array = ::arrow::ArrayFromJSON( + name->type(), + "[[{\"Language\": [{\"Code\": \"en_us\", \"Country\":\"us\"}," + "{\"Code\": \"en_us\", \"Country\": null}]," + "\"Url\": \"http://A\"}," + "{\"Url\": \"http://B\"}," + "{\"Language\": [{\"Code\": \"en-gb\", \"Country\": \"gb\"}]}]," + "[{\"Url\": \"http://C\"}]]"); auto expected = ::arrow::Table::Make(schema, {doc_id_array, links_id_array, name_array}); CheckSimpleRoundtrip(expected, 2); @@ -2992,13 +2975,15 @@ TEST_P(TestArrowReadDictionary, ReadWholeFileDict) { TEST_P(TestArrowReadDictionary, IncrementalReads) { // ARROW-6895 + options.num_rows = 100; + options.num_uniques = 10; + SetUp(); properties_.set_read_dictionary(0, true); // Just write a single row group - ASSERT_NO_FATAL_FAILURE( - WriteTableToBuffer(expected_dense_, options.num_rows, - default_arrow_writer_properties(), &buffer_)); + ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer( + expected_dense_, options.num_rows, default_arrow_writer_properties(), &buffer_)); // Read in one shot ASSERT_OK_AND_ASSIGN(std::unique_ptr reader, GetReader()); @@ -3011,12 +2996,17 @@ TEST_P(TestArrowReadDictionary, IncrementalReads) { int num_reads = 4; int batch_size = options.num_rows / num_reads; + + ::arrow::compute::FunctionContext fc; for (int i = 0; i < num_reads; ++i) { std::shared_ptr chunk; ASSERT_OK(col->NextBatch(batch_size, &chunk)); - 
AssertChunkedEqual(*expected->column(0)->Slice(num_reads * batch_size, - batch_size), - *chunk); + + std::shared_ptr result_dense; + ASSERT_OK(::arrow::compute::Cast(&fc, *chunk->chunk(0), ::arrow::utf8(), + ::arrow::compute::CastOptions::Safe(), + &result_dense)); + AssertArraysEqual(*dense_values_->Slice(i * batch_size, batch_size), *result_dense); } }