Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-10438: [C++][Dataset] Partitioning::Format on nulls #9323

Closed
wants to merge 33 commits into from

Conversation

westonpace
Copy link
Member

@westonpace westonpace commented Jan 26, 2021

Tested and added support for partitioning with nulls.

I had to make some changes to the hash kernels. You can now specify how you want DictionaryEncode to treat nulls. The MASK option will continue the current behavior (null not in dictionary, null value in indices) and the ENCODE option will put null in the dictionary and there will be no null values in the indices array.

Partitioning on nulls will depend on the partitioning scheme.

For directory partitioning null is allowed on inner fields but it is not allowed on an outer field if an inner field is defined. In other words, if the schema is a(int32), b(int32), c(int32) then the following are allowed

/ (a=null, b=null, c=null)
/32 (a=32, b=null, c=null)
/32/57 (a=32, b=57, c=null)

There is no way to specify a=null, b=57, c=null. This does mean that partition directories can contain a mix of files and nested partition directories (e.g. /32 might contain file.parquet and the directory /57). Alternatively we could just forbid nulls in the directory partitioning scheme.

For the hive scheme we need to be compatible with other tools that read/write hive. Those tools use a fallback value which defaults to __HIVE_DEFAULT_PARTITION__. So by default you would have directories that look like...

/a=__HIVE_DEFAULT_PARTITION__/b=__HIVE_DEFAULT_PARTITION__/c=__HIVE_DEFAULT_PARTITION__

The null fallback value is configurable as a string passed to HivePartitioning::HivePartitioning or HivePartitioning::MakeFactory.

ARROW-11649 has been created for extending this null fallback configuration to R.

@github-actions
Copy link

@bkietz bkietz self-requested a review January 26, 2021 20:51
cpp/src/arrow/compute/api_vector.h Outdated Show resolved Hide resolved
cpp/src/arrow/compute/api_vector.h Outdated Show resolved Hide resolved
cpp/src/arrow/compute/kernels/vector_hash.cc Outdated Show resolved Hide resolved
cpp/src/arrow/dataset/partition.cc Outdated Show resolved Hide resolved
cpp/src/arrow/pretty_print.cc Outdated Show resolved Hide resolved
Copy link
Member

@jorisvandenbossche jorisvandenbossche left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(didn't yet take a detailed look, just a few quick questions)

What currently happens if you use directory partitioning and you have null values?
And should we give the user a null_fallback option there as well?

cpp/src/arrow/dataset/partition.cc Outdated Show resolved Hide resolved
python/pyarrow/tests/test_dataset.py Show resolved Hide resolved
@@ -1587,33 +1587,54 @@ def test_open_dataset_non_existing_file():

@pytest.mark.parquet
@pytest.mark.parametrize('partitioning', ["directory", "hive"])
@pytest.mark.parametrize('null_fallback', ['xyz', None])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does null_fallback=None mean? (based on the docstring above it seems it can only be a string?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case it does not pass in anything when it creates the partitioning and ensures that it defaults to the correct default value.

])
def test_open_dataset_partitioned_dictionary_type(tempdir, partitioning,
partition_keys):
def test_open_dataset_partitioned_dictionary_type(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you added this to a test that is specifically about reading partitioned datasets while inferring the partition fields as dictionary. Which is fine (as this case also needs to be able to hand that), but this should also work (and so be tested) in the default case not inferring dictionary type?
And should we also have a test for the writing part? (this one only tests reading)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a parameter so this test will now cover both inferred dictionary and normal reading. I renamed it to just test_partition_discovery since the test case now covers several methods of discovery. I've also added tests for writing and a few other situations.

Copy link
Member

@bkietz bkietz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking great, thanks for working on this! A few smaller comments follow and a large reversion for which I've included a suggestion PR.

cpp/src/arrow/compute/kernels/vector_hash.cc Outdated Show resolved Hide resolved
@@ -147,6 +152,8 @@ class ValueCountsAction final : ActionBase {
}
}

bool ShouldEncodeNulls() { return true; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bool ShouldEncodeNulls() { return true; }
constexpr bool ShouldEncodeNulls() const { return true; }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cpp/src/arrow/compute/kernels/vector_hash.cc Outdated Show resolved Hide resolved
cpp/src/arrow/compute/kernels/vector_hash_test.cc Outdated Show resolved Hide resolved
cpp/src/arrow/dataset/expression.h Outdated Show resolved Hide resolved
cpp/src/arrow/dataset/partition.h Outdated Show resolved Hide resolved
cpp/src/arrow/dataset/partition.cc Outdated Show resolved Hide resolved
python/pyarrow/_dataset.pyx Outdated Show resolved Hide resolved
cpp/src/arrow/dataset/partition.cc Outdated Show resolved Hide resolved
cpp/src/arrow/dataset/expression.h Outdated Show resolved Hide resolved
Copy link
Member

@bkietz bkietz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rebase to pick up the fix for https://issues.apache.org/jira/browse/ARROW-11724
(should resolve the appveyor build failure)

cpp/src/arrow/dataset/expression.cc Outdated Show resolved Hide resolved
Copy link
Member

@bkietz bkietz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few concerns remain but they can/probably should be addressed in follow up; this is good to go.

Thanks for working on this!

const RecordBatchVector& expected_batches,
const std::vector<Expression>& expected_expressions) {
ASSERT_OK_AND_ASSIGN(auto partition_results, partitioning->Partition(full_batch));
std::shared_ptr<RecordBatch> rest = full_batch;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused:

Suggested change
std::shared_ptr<RecordBatch> rest = full_batch;

if (field->type()->id() == Type::DICTIONARY) {
if (!key.value.has_value()) {
return is_null(field_ref(field->name()));
} else if (field->type()->id() == Type::DICTIONARY) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
} else if (field->type()->id() == Type::DICTIONARY) {
}
if (field->type()->id() == Type::DICTIONARY) {

@@ -625,8 +650,27 @@ class StructDictionary {

private:
Status AddOne(Datum column, std::shared_ptr<Int32Array>* fused_indices) {
if (column.type()->id() == Type::DICTIONARY) {
if (column.null_count() != 0) {
// TODO(ARROW-11732) Optimize this by allowign DictionaryEncode to transfer a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// TODO(ARROW-11732) Optimize this by allowign DictionaryEncode to transfer a
// TODO(ARROW-11732) Optimize this by allowing DictionaryEncode to transfer a

Comment on lines +307 to +312
auto field_index = GetOrInsertField(name);
if (repr.has_value()) {
return InsertRepr(field_index, *repr);
} else {
return Status::OK();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
auto field_index = GetOrInsertField(name);
if (repr.has_value()) {
return InsertRepr(field_index, *repr);
} else {
return Status::OK();
}
if (repr.has_value()) {
auto field_index = GetOrInsertField(name);
return InsertRepr(field_index, *repr);
}
return Status::OK();

Comment on lines +551 to +553
CheckUnique<NullType, std::nullptr_t>(null(), {nullptr, nullptr}, {false, true},
{nullptr}, {false});
CheckUnique<NullType, std::nullptr_t>(null(), {}, {}, {}, {});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
CheckUnique<NullType, std::nullptr_t>(null(), {nullptr, nullptr}, {false, true},
{nullptr}, {false});
CheckUnique<NullType, std::nullptr_t>(null(), {}, {}, {}, {});
CheckUnique(ArrayFromJSON(null(), "[null, null, null]"), ArrayFromJSON(null(), "[null]"));
CheckUnique(ArrayFromJSON(null(), "[]"), ArrayFromJSON(null(), "[]"));

@@ -282,9 +348,16 @@ TEST_F(TestPartitioning, HivePartitioningFormat) {
equal(field_ref("alpha"), literal(0))),
"alpha=0/beta=3.25");
AssertFormat(equal(field_ref("alpha"), literal(0)), "alpha=0");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since a null valued partition key is semantically equivalent to an absent one, we should ensure they format identically. I've created ARROW-11762 for follow up

@bkietz bkietz closed this in 8e8a000 Feb 24, 2021
@westonpace westonpace deleted the feature/arrow-10438 branch April 14, 2021 20:17
@westonpace westonpace restored the feature/arrow-10438 branch April 14, 2021 20:17
@westonpace westonpace deleted the feature/arrow-10438 branch April 14, 2021 20:17
@westonpace westonpace restored the feature/arrow-10438 branch April 14, 2021 20:18
@westonpace westonpace deleted the feature/arrow-10438 branch April 14, 2021 20:18
GeorgeAp pushed a commit to sirensolutions/arrow that referenced this pull request Jun 7, 2021
Tested and added support for partitioning with nulls.

I had to make some changes to the hash kernels.  You can now specify how you want DictionaryEncode to treat nulls.  The MASK option will continue the current behavior (null not in dictionary, null value in indices) and the ENCODE option will put `null` in the dictionary and there will be no null values in the indices array.

Partitioning on nulls will depend on the partitioning scheme.

For directory partitioning null is allowed on inner fields but it is not allowed on an outer field if an inner field is defined.  In other words, if the schema is a(int32), b(int32), c(int32) then the following are allowed

```
/ (a=null, b=null, c=null)
/32 (a=32, b=null, c=null)
/32/57 (a=32, b=57, c=null)
```

There is no way to specify `a=null, b=57, c=null`.  This does mean that partition directories can contain a mix of files and nested partition directories (e.g. /32 might contain file.parquet and the directory /57).  Alternatively we could just forbid nulls in the directory partitioning scheme.

For the hive scheme we need to be compatible with other tools that read/write hive.  Those tools use a fallback value which defaults to `__HIVE_DEFAULT_PARTITION__`.  So by default you would have directories that look like...

```
/a=__HIVE_DEFAULT_PARTITION__/b=__HIVE_DEFAULT_PARTITION__/c=__HIVE_DEFAULT_PARTITION__
```

The null fallback value is configurable as a string passed to HivePartitioning::HivePartitioning or HivePartitioning::MakeFactory.

ARROW-11649 has been created for extending this null fallback configuration to R.

Closes apache#9323 from westonpace/feature/arrow-10438

Lead-authored-by: Weston Pace <weston.pace@gmail.com>
Co-authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
michalursa pushed a commit to michalursa/arrow that referenced this pull request Jun 13, 2021
Tested and added support for partitioning with nulls.

I had to make some changes to the hash kernels.  You can now specify how you want DictionaryEncode to treat nulls.  The MASK option will continue the current behavior (null not in dictionary, null value in indices) and the ENCODE option will put `null` in the dictionary and there will be no null values in the indices array.

Partitioning on nulls will depend on the partitioning scheme.

For directory partitioning null is allowed on inner fields but it is not allowed on an outer field if an inner field is defined.  In other words, if the schema is a(int32), b(int32), c(int32) then the following are allowed

```
/ (a=null, b=null, c=null)
/32 (a=32, b=null, c=null)
/32/57 (a=32, b=57, c=null)
```

There is no way to specify `a=null, b=57, c=null`.  This does mean that partition directories can contain a mix of files and nested partition directories (e.g. /32 might contain file.parquet and the directory /57).  Alternatively we could just forbid nulls in the directory partitioning scheme.

For the hive scheme we need to be compatible with other tools that read/write hive.  Those tools use a fallback value which defaults to `__HIVE_DEFAULT_PARTITION__`.  So by default you would have directories that look like...

```
/a=__HIVE_DEFAULT_PARTITION__/b=__HIVE_DEFAULT_PARTITION__/c=__HIVE_DEFAULT_PARTITION__
```

The null fallback value is configurable as a string passed to HivePartitioning::HivePartitioning or HivePartitioning::MakeFactory.

ARROW-11649 has been created for extending this null fallback configuration to R.

Closes apache#9323 from westonpace/feature/arrow-10438

Lead-authored-by: Weston Pace <weston.pace@gmail.com>
Co-authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants