Skip to content

Commit

Permalink
ARROW-7080: [C++][Parquet] Read and write "field_id" attribute in Par…
Browse files Browse the repository at this point in the history
…quet files, propagate to Arrow field metadata. Assorted additional changes

The `field_id` is used for schema evolution and other things. It is surfaced in Python in the `Field.metadata` as `b'PARQUET:field_id'`

* `ChunkedArray::Equals` used to fail if a child field had unequal metadata; it no longer checks the metadata
* Improved diffing output in AssertTablesEqual in testing/gtest_util.h (may need some more tests around this)
* Added a generic binary ChunkedArray iterator (see `internal::MultipleChunkIterator`) and a helpful applicator, `internal::ApplyBinaryChunked`. I retrofitted `ChunkedArray::Equals` to use this (I needed it to improve the diffing output in AssertTablesEqual)
* Add `KeyValueMetadata::Merge` method
* Add `Field::WithMergedMetadata` method that calls `KeyValueMetadata::Merge`
* Print metadata in `Field::ToString`
* Add `parquet.ParquetFile.schema_arrow` property to return the effective Arrow schema
* Print field_ids in `parquet::SchemaPrinter`

This also adds a `print_metadata` flag (default `false`) to `Field::ToString` and `Schema::ToString` that controls whether the key-value metadata is printed, per ARROW-7063. I figure it's OK to merge this change and then decide whether we want to keep it like that before releasing the software

Closes #6408 from wesm/ARROW-7080 and squashes the following commits:

e0c7396 <Yosuke Shiro>  Fix test cases
239932c <Wes McKinney> Remove field metadata outputs from GLib unit test
03f2f18 <Wes McKinney> Add print_metadata option to Field::ToString / Schema::ToString and use expect_equivalent in R unit tests
169f274 <Yosuke Shiro> Use check_metadata instead of metadata
7b1f5a9 <Yosuke Shiro> Use true as the default argument
222af57 <Yosuke Shiro> Fix document of garrow_table_equal()
14fde57 <Yosuke Shiro> Add metadata parameter instead of using true
45f0c79 <Yosuke Shiro>  Fix schema equality check
0ce996e <Wes McKinney> export internal::MultipleChunkIterator
2c3f3ac <Wes McKinney> Correct inconsistent comments about null field_id's
6e3bdfd <Wes McKinney> Fix dataset Parquet unit tests
fd099f9 <Wes McKinney> Code review comments
f220767 <Wes McKinney> Start working on properly preserving and deserializing field_id in C++. Some field_id round trips working

Lead-authored-by: Wes McKinney <wesm+git@apache.org>
Co-authored-by: Yosuke Shiro <yosuke.shiro615@gmail.com>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
  • Loading branch information
wesm and shiro615 committed Feb 21, 2020
1 parent 7fb03c0 commit f609298
Show file tree
Hide file tree
Showing 32 changed files with 774 additions and 463 deletions.
21 changes: 21 additions & 0 deletions c_glib/arrow-glib/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,27 @@ garrow_table_equal(GArrowTable *table, GArrowTable *other_table)
return arrow_table->Equals(*arrow_other_table);
}

/**
 * garrow_table_equal_metadata:
 * @table: A #GArrowTable.
 * @other_table: A #GArrowTable to be compared.
 * @check_metadata: Whether to compare metadata. The value is forwarded
 *   unchanged to the Equals() call on the wrapped Arrow table.
 *
 * Returns: %TRUE if Equals() on the wrapped Arrow tables reports them as
 *   equal, %FALSE otherwise.
 *
 * Since: 1.0.0
 */
gboolean
garrow_table_equal_metadata(GArrowTable *table,
                            GArrowTable *other_table,
                            gboolean check_metadata)
{
  /* Unwrap both GLib objects to the underlying Arrow tables. */
  const auto lhs = garrow_table_get_raw(table);
  const auto rhs = garrow_table_get_raw(other_table);
  const auto &rhs_table = *rhs;
  return lhs->Equals(rhs_table, check_metadata);
}

/**
* garrow_table_get_schema:
* @table: A #GArrowTable.
Expand Down
5 changes: 5 additions & 0 deletions c_glib/arrow-glib/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ garrow_table_new_record_batches(GArrowSchema *schema,

gboolean garrow_table_equal (GArrowTable *table,
GArrowTable *other_table);
GARROW_AVAILABLE_IN_1_0
gboolean
garrow_table_equal_metadata(GArrowTable *table,
GArrowTable *other_table,
gboolean check_metadata);

GArrowSchema *garrow_table_get_schema (GArrowTable *table);
GARROW_AVAILABLE_IN_1_0
Expand Down
4 changes: 2 additions & 2 deletions c_glib/test/parquet/test-arrow-file-writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ def test_write
reader.use_threads = true
assert_equal([
enabled_values.length / chunk_size,
table,
true,
],
[
reader.n_row_groups,
reader.read_table,
table.equal_metadata(reader.read_table, false),
])
end
end
26 changes: 12 additions & 14 deletions c_glib/test/test-table.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,32 +99,30 @@ def test_record_batches

sub_test_case("instance methods") do
def setup
fields = [
@fields = [
Arrow::Field.new("visible", Arrow::BooleanDataType.new),
Arrow::Field.new("valid", Arrow::BooleanDataType.new),
]
schema = Arrow::Schema.new(fields)
columns = [
@schema = Arrow::Schema.new(@fields)
@columns = [
build_boolean_array([true]),
build_boolean_array([false]),
]
@table = Arrow::Table.new(schema, columns)
@table = Arrow::Table.new(@schema, @columns)
end

def test_equal
fields = [
Arrow::Field.new("visible", Arrow::BooleanDataType.new),
Arrow::Field.new("valid", Arrow::BooleanDataType.new),
]
schema = Arrow::Schema.new(fields)
columns = [
build_boolean_array([true]),
build_boolean_array([false]),
]
other_table = Arrow::Table.new(schema, columns)
other_table = Arrow::Table.new(@schema, @columns)
assert_equal(@table, other_table)
end

# Builds a second table from the same @schema and @columns that setup used
# for @table, then asserts that equal_metadata returns a truthy value when
# its second argument is true.
def test_equal_metadata
other_table = Arrow::Table.new(@schema, @columns)
assert do
@table.equal_metadata(other_table, true)
end
end

def test_schema
assert_equal(["visible", "valid"],
@table.schema.fields.collect(&:name))
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjected) {
for (auto maybe_batch : rb_it) {
ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch));
row_count += batch->num_rows();
ASSERT_EQ(*batch->schema(), *expected_schema);
AssertSchemaEqual(*batch->schema(), *expected_schema,
/*check_metadata=*/false);
}
}

Expand Down Expand Up @@ -279,7 +280,7 @@ TEST_F(TestParquetFileFormat, Inspect) {
auto format = ParquetFileFormat();

ASSERT_OK_AND_ASSIGN(auto actual, format.Inspect(*source.get()));
EXPECT_EQ(*actual, *schema_);
AssertSchemaEqual(*actual, *schema_, /*check_metadata=*/false);
}

TEST_F(TestParquetFileFormat, IsSupported) {
Expand Down
85 changes: 50 additions & 35 deletions cpp/src/arrow/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,45 +73,22 @@ bool ChunkedArray::Equals(const ChunkedArray& other) const {
return false;
}
if (length_ == 0) {
return type_->Equals(other.type_);
// We cannot toggle check_metadata here yet, so we don't check it
return type_->Equals(*other.type_, /*check_metadata=*/false);
}

// Check contents of the underlying arrays. This checks for equality of
// the underlying data independently of the chunk size.
int this_chunk_idx = 0;
int64_t this_start_idx = 0;
int other_chunk_idx = 0;
int64_t other_start_idx = 0;

int64_t elements_compared = 0;
while (elements_compared < length_) {
const std::shared_ptr<Array> this_array = chunks_[this_chunk_idx];
const std::shared_ptr<Array> other_array = other.chunk(other_chunk_idx);
int64_t common_length = std::min(this_array->length() - this_start_idx,
other_array->length() - other_start_idx);
if (!this_array->RangeEquals(this_start_idx, this_start_idx + common_length,
other_start_idx, other_array)) {
return false;
}

elements_compared += common_length;

// If we have exhausted the current chunk, proceed to the next one individually.
if (this_start_idx + common_length == this_array->length()) {
this_chunk_idx++;
this_start_idx = 0;
} else {
this_start_idx += common_length;
}

if (other_start_idx + common_length == other_array->length()) {
other_chunk_idx++;
other_start_idx = 0;
} else {
other_start_idx += common_length;
}
}
return true;
return internal::ApplyBinaryChunked(
*this, other,
[](const Array& left_piece, const Array& right_piece,
int64_t ARROW_ARG_UNUSED(position)) {
if (!left_piece.Equals(right_piece)) {
return Status::Invalid("Unequal piece");
}
return Status::OK();
})
.ok();
}

bool ChunkedArray::Equals(const std::shared_ptr<ChunkedArray>& other) const {
Expand Down Expand Up @@ -222,6 +199,44 @@ Status ChunkedArray::ValidateFull() const {
return Status::OK();
}

namespace internal {

bool MultipleChunkIterator::Next(std::shared_ptr<Array>* next_left,
std::shared_ptr<Array>* next_right) {
if (pos_ == length_) return false;

// Find non-empty chunk
std::shared_ptr<Array> chunk_left, chunk_right;
while (true) {
chunk_left = left_.chunk(chunk_idx_left_);
chunk_right = right_.chunk(chunk_idx_right_);
if (chunk_pos_left_ == chunk_left->length()) {
chunk_pos_left_ = 0;
++chunk_idx_left_;
continue;
}
if (chunk_pos_right_ == chunk_right->length()) {
chunk_pos_right_ = 0;
++chunk_idx_right_;
continue;
}
break;
}
// Determine how big of a section to return
int64_t iteration_size = std::min(chunk_left->length() - chunk_pos_left_,
chunk_right->length() - chunk_pos_right_);

*next_left = chunk_left->Slice(chunk_pos_left_, iteration_size);
*next_right = chunk_right->Slice(chunk_pos_right_, iteration_size);

pos_ += iteration_size;
chunk_pos_left_ += iteration_size;
chunk_pos_right_ += iteration_size;
return true;
}

} // namespace internal

// ----------------------------------------------------------------------
// Table methods

Expand Down
78 changes: 78 additions & 0 deletions cpp/src/arrow/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,84 @@ class ARROW_EXPORT ChunkedArray {
ARROW_DISALLOW_COPY_AND_ASSIGN(ChunkedArray);
};

namespace internal {

/// \brief EXPERIMENTAL: Utility for incremental iteration over contiguous
/// pieces of potentially differently-chunked ChunkedArray objects
///
/// The iterator stores references to both inputs, so they must outlive it.
/// The total length to iterate is taken from `left` only.
/// NOTE(review): presumably callers pass inputs of equal length -- confirm.
class ARROW_EXPORT MultipleChunkIterator {
 public:
  MultipleChunkIterator(const ChunkedArray& left, const ChunkedArray& right)
      : left_(left), right_(right), length_(left.length()) {}

  /// \brief Fetch the next pair of pieces; returns false when nothing is left
  bool Next(std::shared_ptr<Array>* next_left, std::shared_ptr<Array>* next_right);

  /// \brief Number of elements consumed so far
  int64_t position() const { return pos_; }

 private:
  const ChunkedArray& left_;
  const ChunkedArray& right_;

  // How many elements of the whole ChunkedArray have been consumed
  int64_t pos_ = 0;

  // Total number of elements to iterate over (left.length())
  int64_t length_;

  // Index of the left chunk currently being read
  int chunk_idx_left_ = 0;

  // Index of the right chunk currently being read
  int chunk_idx_right_ = 0;

  // Read offset inside the current left chunk
  int64_t chunk_pos_left_ = 0;

  // Read offset inside the current right chunk
  int64_t chunk_pos_right_ = 0;
};

/// \brief Evaluate binary function on two ChunkedArray objects having possibly
/// different chunk layouts. The passed binary function / functor should have
/// the following signature.
///
///    Status(const Array&, const Array&, int64_t)
///
/// The third argument is the absolute position, relative to the start of each
/// ChunkedArray, at which the passed pair of pieces begins. The function is
/// executed against each contiguous pair of array segments, slicing if
/// necessary. Iteration stops at the first non-OK Status, which is returned
/// to the caller; otherwise Status::OK() is returned.
///
/// For example, if two arrays have chunk sizes
///
///   left: [10, 10, 20]
///   right: [15, 10, 15]
///
/// Then the following invocations take place (pseudocode)
///
///   func(left.chunk[0][0:10], right.chunk[0][0:10], 0)
///   func(left.chunk[1][0:5], right.chunk[0][10:15], 10)
///   func(left.chunk[1][5:10], right.chunk[1][0:5], 15)
///   func(left.chunk[2][0:5], right.chunk[1][5:10], 20)
///   func(left.chunk[2][5:20], right.chunk[2][:], 25)
template <typename Action>
Status ApplyBinaryChunked(const ChunkedArray& left, const ChunkedArray& right,
                          Action&& action) {
  MultipleChunkIterator iterator(left, right);
  std::shared_ptr<Array> left_piece, right_piece;
  // position() reports how much has been consumed, i.e. the offset just past
  // the piece most recently returned by Next(). Record it before each Next()
  // call so the action receives the offset at which its piece starts.
  int64_t piece_start = iterator.position();
  while (iterator.Next(&left_piece, &right_piece)) {
    ARROW_RETURN_NOT_OK(action(*left_piece, *right_piece, piece_start));
    piece_start = iterator.position();
  }
  return Status::OK();
}

} // namespace internal

/// \class Table
/// \brief Logical table as sequence of chunked arrays
class ARROW_EXPORT Table {
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ TEST_F(TestChunkedArray, EqualsDifferingLengths) {
ASSERT_TRUE(one_->Equals(*another_.get()));
}

// Two single-chunk arrays whose list types differ only in the key-value
// metadata attached to the child "item" field must compare equal.
TEST_F(TestChunkedArray, EqualsDifferingMetadata) {
  auto item_metadata = key_value_metadata({"foo"}, {"bar"});

  auto plain_type = list(field("item", int32()));
  auto annotated_type = list(field("item", int32(), true, item_metadata));

  std::vector<std::shared_ptr<Array>> plain_chunks = {
      ArrayFromJSON(plain_type, "[[]]")};
  std::vector<std::shared_ptr<Array>> annotated_chunks = {
      ArrayFromJSON(annotated_type, "[[]]")};

  ChunkedArray plain(plain_chunks);
  ChunkedArray annotated(annotated_chunks);
  ASSERT_TRUE(plain.Equals(annotated));
}

TEST_F(TestChunkedArray, SliceEquals) {
arrays_one_.push_back(MakeRandomArray<Int32Array>(100));
arrays_one_.push_back(MakeRandomArray<Int32Array>(50));
Expand Down
38 changes: 21 additions & 17 deletions cpp/src/arrow/testing/gtest_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,6 @@

namespace arrow {

static void PrintChunkedArray(const ChunkedArray& carr, std::stringstream* ss) {
for (int i = 0; i < carr.num_chunks(); ++i) {
auto c1 = carr.chunk(i);
*ss << "Chunk " << i << std::endl;
::arrow::PrettyPrintOptions options(/*indent=*/2);
ARROW_EXPECT_OK(::arrow::PrettyPrint(*c1, options, ss));
*ss << std::endl;
}
}

template <typename T>
void AssertTsEqual(const T& expected, const T& actual) {
if (!expected.Equals(actual)) {
Expand Down Expand Up @@ -280,15 +270,29 @@ void AssertTablesEqual(const Table& expected, const Table& actual, bool same_chu
}
} else {
std::stringstream ss;
if (!actual.Equals(expected)) {
for (int i = 0; i < expected.num_columns(); ++i) {
ss << "Actual column " << i << std::endl;
PrintChunkedArray(*actual.column(i), &ss);
for (int i = 0; i < actual.num_columns(); ++i) {
auto actual_col = actual.column(i);
auto expected_col = expected.column(i);

PrettyPrintOptions options(/*indent=*/2);
options.window = 50;

ss << "Expected column " << i << std::endl;
PrintChunkedArray(*expected.column(i), &ss);
if (!actual_col->Equals(*expected_col)) {
ASSERT_OK(internal::ApplyBinaryChunked(
*actual_col, *expected_col,
[&](const Array& left_piece, const Array& right_piece, int64_t position) {
std::stringstream diff;
if (!left_piece.Equals(right_piece, EqualOptions().diff_sink(&diff))) {
ss << "Unequal at absolute position " << position << "\n" << diff.str();
ss << "Expected:\n";
ARROW_EXPECT_OK(PrettyPrint(right_piece, options, &ss));
ss << "\nActual:\n";
ARROW_EXPECT_OK(PrettyPrint(left_piece, options, &ss));
}
return Status::OK();
}));
FAIL() << ss.str();
}
FAIL() << ss.str();
}
}
}
Expand Down
Loading

0 comments on commit f609298

Please sign in to comment.