Skip to content

Commit

Permalink
ARROW-7741: [C++] Adds parquet write support for nested types
Browse files Browse the repository at this point in the history
1.  Keeps the old version around (without unit tests) under V1 and
    make this version (V2) a default
2.  Reworks PathInternal slightly to preserve column at a time writing
    with chunked columns.  Address @bkietz comment that the vendored
    visitor implementation for variants supports a return type.
3.  While debugging I found we don't preserve all null counts when
    slicing, so included a small patch and unit test for that.

I'll plan on adding bindings in python for the versioning in a follow-up CR that can also be toggled with a environment variable in case regressions are encountered "in the wild"

Closes #6586 from emkornfield/add_parquet_write

Lead-authored-by: Micah Kornfield <emkornfield@gmail.com>
Co-authored-by: emkornfield <emkornfield@gmail.com>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
  • Loading branch information
emkornfield authored and wesm committed Mar 27, 2020
1 parent 2ca1706 commit 0f512af
Show file tree
Hide file tree
Showing 9 changed files with 405 additions and 59 deletions.
6 changes: 5 additions & 1 deletion cpp/src/arrow/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ ArrayData ArrayData::Slice(int64_t off, int64_t len) const {
auto copy = *this;
copy.length = len;
copy.offset = off;
copy.null_count = null_count != 0 ? kUnknownNullCount : 0;
if (null_count == length) {
copy.null_count = len;
} else {
copy.null_count = null_count != 0 ? kUnknownNullCount : 0;
}
return copy;
}

Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/array_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ TEST_F(TestArray, TestNullCount) {
ASSERT_EQ(kUnknownNullCount, arr_default_null_count->data()->null_count);
}

TEST_F(TestArray, TestSlicePreservesAllNullCount) {
// These are placeholders
auto data = std::make_shared<Buffer>(nullptr, 0);
auto null_bitmap = std::make_shared<Buffer>(nullptr, 0);

Int32Array arr(/*length=*/100, data, null_bitmap,
/*null_count*/ 100);
EXPECT_EQ(arr.Slice(1, 99)->data()->null_count, arr.Slice(1, 99)->length());
}

TEST_F(TestArray, TestLength) {
// Placeholder buffer
auto data = std::make_shared<Buffer>(nullptr, 0);
Expand Down
20 changes: 18 additions & 2 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,8 @@ void CheckSimpleRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group
const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
default_arrow_writer_properties()) {
std::shared_ptr<Table> result;
DoSimpleRoundtrip(table, false /* use_threads */, row_group_size, {}, &result,
arrow_properties);
ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
table, false /* use_threads */, row_group_size, {}, &result, arrow_properties));
::arrow::AssertSchemaEqual(*table->schema(), *result->schema(),
/*check_metadata=*/false);
::arrow::AssertTablesEqual(*table, *result, false);
Expand Down Expand Up @@ -2235,6 +2235,22 @@ TEST(TestArrowReadWrite, TableWithDuplicateColumns) {
ASSERT_NO_FATAL_FAILURE(CheckSimpleRoundtrip(table, table->num_rows()));
}

TEST(ArrowReadWrite, SimpleStructRoundTrip) {
auto links = field(
"Links", ::arrow::struct_({field("Backward", ::arrow::int64(), /*nullable=*/true),
field("Forward", ::arrow::int64(), /*nullable=*/true)}));

auto links_id_array = ::arrow::ArrayFromJSON(links->type(),
"[{\"Backward\": null, \"Forward\": 20}, "
"{\"Backward\": 10, \"Forward\": 40}]");

CheckSimpleRoundtrip(
::arrow::Table::Make(std::make_shared<::arrow::Schema>(
std::vector<std::shared_ptr<::arrow::Field>>{links}),
{links_id_array}),
2);
}

// Disabled until implementation can be finished.
TEST(TestArrowReadWrite, DISABLED_CanonicalNestedRoundTrip) {
auto doc_id = field("DocId", ::arrow::int64(), /*nullable=*/false);
Expand Down
114 changes: 83 additions & 31 deletions cpp/src/parquet/arrow/path_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
#include "arrow/util/bit_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/variant.h"
#include "arrow/visitor_inline.h"

Expand Down Expand Up @@ -136,6 +137,17 @@ enum IterationResult {
} \
} while (false)

int64_t LazyNullCount(const Array& array) { return array.data()->null_count.load(); }

bool LazyNoNulls(const Array& array) {
int64_t null_count = LazyNullCount(array);
return null_count == 0 ||
// kUnkownNullCount comparison is needed to account
// for null arrays.
(null_count == ::arrow::kUnknownNullCount &&
array.null_bitmap_data() == nullptr);
}

struct PathWriteContext {
PathWriteContext(::arrow::MemoryPool* pool,
std::shared_ptr<::arrow::ResizableBuffer> def_levels_buffer)
Expand Down Expand Up @@ -564,35 +576,39 @@ Status WritePath(ElementRange root_range, PathInfo* path_info,
// |root_range| are processed.
while (stack_position >= stack_base) {
PathInfo::Node& node = path_info->path[stack_position - stack_base];
IterationResult result = kError;
struct {
void operator()(NullableNode& node) { // NOLINT google-runtime-references
*result = node.Run(stack_position, stack_position + 1, context);
IterationResult operator()(
NullableNode& node) { // NOLINT google-runtime-references
return node.Run(stack_position, stack_position + 1, context);
}
void operator()(ListNode& node) { // NOLINT google-runtime-references
*result = node.Run(stack_position, stack_position + 1, context);
IterationResult operator()(ListNode& node) { // NOLINT google-runtime-references
return node.Run(stack_position, stack_position + 1, context);
}
void operator()(NullableTerminalNode& node) { // NOLINT google-runtime-references
*result = node.Run(*stack_position, context);
IterationResult operator()(
NullableTerminalNode& node) { // NOLINT google-runtime-references
return node.Run(*stack_position, context);
}
void operator()(FixedSizeListNode& node) { // NOLINT google-runtime-references
*result = node.Run(stack_position, stack_position + 1, context);
IterationResult operator()(
FixedSizeListNode& node) { // NOLINT google-runtime-references
return node.Run(stack_position, stack_position + 1, context);
}
void operator()(AllPresentTerminalNode& node) { // NOLINT google-runtime-references
*result = node.Run(*stack_position, context);
IterationResult operator()(
AllPresentTerminalNode& node) { // NOLINT google-runtime-references
return node.Run(*stack_position, context);
}
void operator()(AllNullsTerminalNode& node) { // NOLINT google-runtime-references
*result = node.Run(*stack_position, context);
IterationResult operator()(
AllNullsTerminalNode& node) { // NOLINT google-runtime-references
return node.Run(*stack_position, context);
}
void operator()(LargeListNode& node) { // NOLINT google-runtime-references
*result = node.Run(stack_position, stack_position + 1, context);
IterationResult operator()(
LargeListNode& node) { // NOLINT google-runtime-references
return node.Run(stack_position, stack_position + 1, context);
}
ElementRange* stack_position;
PathWriteContext* context;
IterationResult* result;
} visitor = {stack_position, &context, &result};
} visitor = {stack_position, &context};

::arrow::util::visit(visitor, node);
IterationResult result = ::arrow::util::visit(visitor, node);

if (ARROW_PREDICT_FALSE(result == kError)) {
DCHECK(!context.last_status.ok());
Expand Down Expand Up @@ -701,10 +717,9 @@ class PathBuilder {
// and the array does in fact contain nulls, we will end up
// traversing the null bitmap twice (once here and once when calculating
// rep/def levels).
int64_t null_count = array.data()->null_count.load();
if (null_count == 0) {
if (LazyNoNulls(array)) {
info_.path.push_back(AllPresentTerminalNode{info_.max_def_level});
} else if (null_count == array.length()) {
} else if (LazyNullCount(array) == array.length()) {
info_.path.push_back(AllNullsTerminalNode(info_.max_def_level - 1));
} else {
info_.path.push_back(NullableTerminalNode(array.null_bitmap_data(), array.offset(),
Expand All @@ -730,9 +745,9 @@ class PathBuilder {
// Increment necessary due to empty lists.
info_.max_def_level++;
info_.max_rep_level++;
// raw_value_offsets() accounts for any slice offset.
ListPathNode<VarRangeSelector<typename T::offset_type>> node(
VarRangeSelector<typename T::offset_type>{array.raw_value_offsets() +
array.data()->offset},
VarRangeSelector<typename T::offset_type>{array.raw_value_offsets()},
info_.max_rep_level, info_.max_def_level - 1);
info_.path.push_back(node);
nullable_in_parent_ = array.list_type()->value_field()->nullable();
Expand Down Expand Up @@ -767,14 +782,13 @@ class PathBuilder {
// rep/def levels). Because this isn't terminal this might not be
// the right decision for structs that share the same nullable
// parents.
int64_t null_count = array.data()->null_count.load();
if (null_count == 0) {
if (LazyNoNulls(array)) {
// Don't add anything because there won't be any point checking
// null values for the array. There will always be at least
// one more array to handle nullability.
return;
}
if (null_count == array.length()) {
if (LazyNullCount(array) == array.length()) {
info_.path.push_back(AllNullsTerminalNode(info_.max_def_level - 1));
return;
}
Expand Down Expand Up @@ -841,14 +855,52 @@ Status PathBuilder::VisitInline(const Array& array) {
#undef RETURN_IF_ERROR
} // namespace

Status MultipathLevelBuilder::Write(const Array& array, bool array_nullable,
class MultipathLevelBuilderImpl : public MultipathLevelBuilder {
public:
MultipathLevelBuilderImpl(std::shared_ptr<::arrow::ArrayData> data,
std::unique_ptr<PathBuilder> path_builder)
: root_range_{0, data->length},
data_(std::move(data)),
path_builder_(std::move(path_builder)) {}

int GetLeafCount() const override {
return static_cast<int>(path_builder_->paths().size());
}

::arrow::Status Write(int leaf_index, ArrowWriteContext* context,
CallbackFunction write_leaf_callback) override {
DCHECK_GE(leaf_index, 0);
DCHECK_LT(leaf_index, GetLeafCount());
return WritePath(root_range_, &path_builder_->paths()[leaf_index], context,
std::move(write_leaf_callback));
}

private:
ElementRange root_range_;
// Reference holder to ensure the data stays valid.
std::shared_ptr<::arrow::ArrayData> data_;
std::unique_ptr<PathBuilder> path_builder_;
};

// static
::arrow::Result<std::unique_ptr<MultipathLevelBuilder>> MultipathLevelBuilder::Make(
const ::arrow::Array& array, bool array_field_nullable) {
auto constructor = ::arrow::internal::make_unique<PathBuilder>(array_field_nullable);
RETURN_NOT_OK(VisitArrayInline(array, constructor.get()));
return ::arrow::internal::make_unique<MultipathLevelBuilderImpl>(
array.data(), std::move(constructor));
}

// static
Status MultipathLevelBuilder::Write(const Array& array, bool array_field_nullable,
ArrowWriteContext* context,
MultipathLevelBuilder::CallbackFunction callback) {
PathBuilder constructor(array_nullable);
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<MultipathLevelBuilder> builder,
MultipathLevelBuilder::Make(array, array_field_nullable));
PathBuilder constructor(array_field_nullable);
RETURN_NOT_OK(VisitArrayInline(array, &constructor));
ElementRange root_range{0, array.length()};
for (auto& write_path_info : constructor.paths()) {
RETURN_NOT_OK(WritePath(root_range, &write_path_info, context, callback));
for (int leaf_idx = 0; leaf_idx < builder->GetLeafCount(); leaf_idx++) {
RETURN_NOT_OK(builder->Write(leaf_idx, context, callback));
}
return Status::OK();
}
Expand Down
39 changes: 30 additions & 9 deletions cpp/src/parquet/arrow/path_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <memory>
#include <vector>

#include "arrow/result.h"
#include "arrow/status.h"

#include "parquet/platform.h"
Expand Down Expand Up @@ -111,20 +112,40 @@ class PARQUET_EXPORT MultipathLevelBuilder {
/// first traversal-order.
///
/// \param[in] array The array to process.
/// \param[in] array_nullable Whether the algorithm should consider
/// the elements in the top level array nullable.
/// \param[in] array_field_nullable Whether the algorithm should consider
/// the the array column as nullable (as determined by its type's parent
/// field).
/// \param[in, out] context for use when allocating memory, etc.
/// \param[out] write_leaf_callback Callback to receive results.
/// There will be one call to the write_leaf_callback for
static ::arrow::Status Write(const ::arrow::Array& array, bool array_nullable,
/// There will be one call to the write_leaf_callback for each leaf node.
static ::arrow::Status Write(const ::arrow::Array& array, bool array_field_nullable,
ArrowWriteContext* context,
CallbackFunction write_leaf_callback);

private:
MultipathLevelBuilder();
// Not copyable.
MultipathLevelBuilder(const MultipathLevelBuilder&) = delete;
MultipathLevelBuilder& operator=(const MultipathLevelBuilder&) = delete;
/// \brief Construct a new instance of the builder.
///
/// \param[in] array The array to process.
/// \param[in] array_field_nullable Whether the algorithm should consider
/// the the array column as nullable (as determined by its type's parent
/// field).
static ::arrow::Result<std::unique_ptr<MultipathLevelBuilder>> Make(
const ::arrow::Array& array, bool array_field_nullable);

virtual ~MultipathLevelBuilder() = default;

/// \brief Returns the number of leaf columns that need to be written
/// to Parquet.
virtual int GetLeafCount() const = 0;

/// \brief Calls write_leaf_callback with the MultipathLevelBuilderResult corresponding
/// to |leaf_index|.
///
/// \param[in] leaf_index The index of the leaf column to write. Must be in the range
/// [0, GetLeafCount()].
/// \param[in, out] context for use when allocating memory, etc.
/// \param[out] write_leaf_callback Callback to receive the result.
virtual ::arrow::Status Write(int leaf_index, ArrowWriteContext* context,
CallbackFunction write_leaf_callback) = 0;
};

} // namespace arrow
Expand Down
35 changes: 35 additions & 0 deletions cpp/src/parquet/arrow/path_internal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ class CapturedResult {
EXPECT_THAT(rep_levels_, ElementsAreArray(expected_rep));
}

friend std::ostream& operator<<(std::ostream& os, const CapturedResult& result) {
// This print method is to silence valgrind issues. Whats printed
// is not important because all asserts happen directly on
// members.
os << "CapturedResult (null def, null_rep):" << result.null_def_levels << " "
<< result.null_rep_levels;
return os;
}

private:
std::vector<int16_t> def_levels_;
std::vector<int16_t> rep_levels_;
Expand Down Expand Up @@ -161,6 +170,32 @@ TEST_F(MultipathLevelBuilderTest, NullableSingleListWithAllNullsLists) {
/*rep_levels=*/std::vector<int16_t>(4, 0));
}

TEST_F(MultipathLevelBuilderTest, NullableSingleListWithAllEmptyLists) {
auto entries = field("Entries", ::arrow::int64(), /*nullable=*/false);
auto list_type = list(entries);
// Translates to parquet schema:
// optional group bag {
// repeated group [unseen] (List) {
// required int64 Entries;
// }
// }
// So:
// def level 0: a null list
// def level 1: an empty list
// def level 2: a null entry
// def level 3: a non-null entry

auto array = ::arrow::ArrayFromJSON(list_type, R"([[], [], [], []])");

ASSERT_OK(
MultipathLevelBuilder::Write(*array, /*nullable=*/true, &context_, callback_));

ASSERT_THAT(results_, SizeIs(1));
const CapturedResult& result = results_[0];
result.CheckLevels(/*def_levels=*/std::vector<int16_t>(/*count=*/4, 1),
/*rep_levels=*/std::vector<int16_t>(4, 0));
}

// This Parquet schema used for the next several tests
//
// optional group bag {
Expand Down
Loading

0 comments on commit 0f512af

Please sign in to comment.