Dataset filtering from disk broken for duration type #37111

Closed
mattaubury opened this issue Aug 10, 2023 · 10 comments · Fixed by #37793
Comments

@mattaubury

Describe the bug, including details regarding any error messages, version, and platform.

Using pyarrow-12.0.1 on RHEL8 Intel.

dataset filters work when the table is read into memory, but break when the table is a referenced Parquet file. The following code demonstrates the issue:

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

value_type = pa.duration("s")

# Create a file with a single row
table = pa.table([pa.array([1], value_type)], ["t"])
filename = "test.parquet"
pq.write_table(table, filename, version="2.6")

# Filter the row in-memory using dataset, THIS WORKS
table2 = pq.read_table(filename)
dataset = ds.dataset(table2)
x = dataset.to_table(filter=ds.field("t") == pa.scalar(1, type=value_type))

# Filter the row using dataset from disk, THIS BREAKS
dataset = ds.dataset(filename, format="parquet")
y = dataset.to_table(filter=ds.field("t") == pa.scalar(1, type=value_type))

assert x == y

The assert is not reached, instead we get the error:

Traceback (most recent call last):
  File "break_filter.py", line 19, in <module>
    y = dataset.to_table(filter=ds.field("t") == pa.scalar(1, type=value_type))
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/_dataset.pyx", line 546, in pyarrow._dataset.Dataset.to_table
  File "pyarrow/_dataset.pyx", line 3449, in pyarrow._dataset.Scanner.to_table
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: Function 'equal' has no kernel matching input types (duration[us], int8)

Superficially, it appears as though the duration scalar is being prematurely unboxed.

The issue is not present with other value_types (such as int64, timestamp, date32), but a potentially related issue occurs with value_type = pa.time32("s"):

pyarrow.lib.ArrowNotImplementedError: Function 'equal' has no kernel matching input types (time32[ms], time32[s])

Component(s)

Python

@felipecrv
Contributor

Note that it's complaining about a comparison between milliseconds and seconds:

Function 'equal' has no kernel matching input types (time32[ms], time32[s])

Which is IMO a good thing because you don't want to compare different time units. You might want to round the ms to s or convert the s to ms and look for exact millisecond-precise matches. It all depends on your application and meaning of the data you have.

@mattaubury
Author

@felipecrv I agree... except that nowhere in the code I presented is the type time32[ms] mentioned. It has been generated internally by some transformation, which appears to be a bug.

@mapleFU
Member

mapleFU commented Sep 14, 2023

Can you use a cast, or set the schema (like #37071?), as a workaround?

Also, I found something weird:

>>> dataset = ds.dataset(filename, format="parquet")
>>> y = dataset.to_table(filter=ds.field("t") == pa.scalar(1, type=value_type))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow/_dataset.pyx", line 546, in pyarrow._dataset.Dataset.to_table
  File "pyarrow/_dataset.pyx", line 3449, in pyarrow._dataset.Scanner.to_table
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: Function 'equal' has no kernel matching input types (duration[s], int8)
>>> y = dataset.to_table(filter=ds.field("t") == pa.scalar(1, type=value_type))

The first call raises an exception, but the second one works 🤔 Can you retry as a workaround for now? I'll try to find out why.

@mapleFU
Member

mapleFU commented Sep 14, 2023

using TestDurationParquetIO = TestParquetIO<::arrow::DurationType>;

TEST_F(TestDurationParquetIO, Roundtrip) {
  std::vector<bool> is_valid = {true, true, false, true};
  std::vector<int64_t> values = {1, 2, 3, 4};

  std::shared_ptr<Array> int_array, duration_arr;
  ::arrow::ArrayFromVector<::arrow::Int64Type, int64_t>(::arrow::int64(), is_valid,
                                                        values, &int_array);
  ::arrow::ArrayFromVector<::arrow::DurationType, int64_t>(
      ::arrow::duration(TimeUnit::NANO), is_valid, values, &duration_arr);

  // When the original Arrow schema isn't stored, a Duration array comes
  // back as int64 (how it is stored in Parquet)
  this->RoundTripSingleColumn(duration_arr, int_array, default_arrow_writer_properties());

  // When the original Arrow schema is stored, the Duration array type is preserved
  const auto arrow_properties =
      ::parquet::ArrowWriterProperties::Builder().store_schema()->build();
  this->RoundTripSingleColumn(duration_arr, duration_arr, arrow_properties);
}

The Duration type is stored as int64 in Parquet, and when the value is this small the literal is deduced as int8. That's why we get int8. I'll see how I can fix that.

@mapleFU
Member

mapleFU commented Sep 14, 2023

I've found the root cause. It's caused by the implementation of ParquetFileFragment::TestRowGroups and the Parquet schema, but I'm not sure how to actually fix it... (mapleFU@17a4922 is a tiny patch for it. The cause is a schema mismatch between Arrow and Parquet in parquet::arrow::SchemaManifest.)

@mapleFU
Member

mapleFU commented Sep 15, 2023

I've submitted a fix here: #37734. You can give it a try.

@mattaubury
Author

I've submitted a fix here: #37734. You can give it a try.

Awesome! Does this also fix the case where value_type = pa.time32("s")?

@mapleFU
Member

mapleFU commented Sep 15, 2023

Hmm, I'm not sure; let me check it tonight. Currently I'm working toward fixing the duration type bug, so I'm not sure value_type = pa.time32("s") will be fixed. If the cause is different, I may try to fix it in another patch.

You can use a "Cast" as a workaround for now, since the merged patch will be released in 14.0.0 or even 15.0, which could be a long time.

@mapleFU
Member

mapleFU commented Sep 15, 2023

@mattaubury I think they may have different root causes. Let me explain.

  1. The Parquet dataset tries to "extract" statistics from the Parquet statistics. The root cause for "duration" is that, while extracting the Parquet statistics, it expects a "duration" but gets an integer.
  2. When extracting an expression, it tries to test the expression. If value_type = pa.time32("s"), I guess it might still hit the problem; maybe others can advise on this.

@mapleFU
Member

mapleFU commented Sep 20, 2023

@mattaubury

  1. The duration problem will be fixed in GH-37111: [C++][Parquet] Dataset: Fixing Schema Cast #37793
  2. As for timestamp, it's [C++][Parquet] Dataset cannot filter timestamp with time_unit SECOND #37799. I'd like to fix it in another patch and will try my best to fix it before the 14.0 release

cc @bkietz

bkietz pushed a commit that referenced this issue Sep 20, 2023
### Rationale for this change

Parquet and Arrow have two schemas:
1. Parquet has a SchemaElement[1], which is language and implementation independent. Parquet Arrow extracts this schema and deduces the Arrow schema from it.
2. Parquet Arrow stores the schema and a possible `field_id` in `key_value_metadata`[2] when `store_schema` is enabled. When deserializing, it first parses `SchemaElement`[1], and if the self-defined `key_value_metadata` exists, it uses `key_value_metadata` to override [1].

[1] https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L356
[2] https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L1033

The bug arises because, when the dataset parses `SchemaManifest`, it doesn't use `key_value_metadata` from `Metadata`.

For duration, when `store_schema` is enabled, it stores `Int64` as the physical type and adds a `::arrow::Duration` in `key_value_metadata`. There is no `equal(Duration, i64)` kernel, so a not-implemented error is raised.

### What changes are included in this PR?

Set `key_value_metadata` in the implementation.

### Are these changes tested?

Yes

### Are there any user-facing changes?

bugfix

* Closes: #37111

Authored-by: mwish <maplewish117@gmail.com>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
@bkietz bkietz added this to the 14.0.0 milestone Sep 20, 2023
loicalleyne pushed a commit to loicalleyne/arrow that referenced this issue Nov 13, 2023
dgreiss pushed a commit to dgreiss/arrow that referenced this issue Feb 19, 2024