
Reading a StructArray column with an ExtensionType causes segfault #20385

Closed
asfimport opened this issue Aug 26, 2022 · 2 comments · Fixed by #33634

@asfimport

We can make nested columns in a Parquet file by putting a pa.StructArray in a pa.Table and writing that Table to Parquet. We can selectively read back that nested column by specifying it with dot syntax:

pq.ParquetFile("f.parquet").read_row_groups([0], ["table_column.struct_field"])

But if the Arrow types are ExtensionTypes, then the same read causes a segfault. The segfault depends on both the nested struct-field selection and the ExtensionTypes; either one alone is fine.

Here is a minimal reproducing example of reading a nested struct field without extension types, which does not segfault. (I'm building the pa.StructArray manually with from_buffers because I'll have to add the ExtensionTypes in the next example.)

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

one = pa.Array.from_buffers(
    pa.int64(),
    3,
    [None, pa.py_buffer(np.array([10, 20, 30], dtype=np.int64))],
)
two = pa.Array.from_buffers(
    pa.float64(),
    3,
    [None, pa.py_buffer(np.array([1.1, 2.2, 3.3], dtype=np.float64))],
)
record = pa.Array.from_buffers(
    pa.struct([
        pa.field("one", one.type, False),
        pa.field("two", two.type, False),
    ]),
    3,
    [None],
    children=[one, two],
)
assert record.to_pylist() == [
    {"one": 10, "two": 1.1},
    {"one": 20, "two": 2.2},
    {"one": 30, "two": 3.3},
]

table = pa.Table.from_arrays([record], names=["column"])
pq.write_table(table, "record.parquet")
table2 = pq.ParquetFile("record.parquet").read_row_groups([0], ["column.one"])
assert table2.to_pylist() == [
    {"column": {"one": 10}},
    {"column": {"one": 20}},
    {"column": {"one": 30}},
]

So far, so good; no segfault. Next, we define and register an ExtensionType,

import json

class AnnotatedType(pa.ExtensionType):
    def __init__(self, storage_type, annotation):
        self.annotation = annotation
        super().__init__(storage_type, "my:app")
    def __arrow_ext_serialize__(self):
        return json.dumps(self.annotation).encode()
    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        annotation = json.loads(serialized.decode())
        print(storage_type, annotation)
        return cls(storage_type, annotation)
    @property
    def num_buffers(self):
        return self.storage_type.num_buffers
    @property
    def num_fields(self):
        return self.storage_type.num_fields

pa.register_extension_type(AnnotatedType(pa.null(), None))
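As a quick sanity check that the annotation payload itself survives serialization (independent of Arrow), the JSON encode/decode that __arrow_ext_serialize__ and __arrow_ext_deserialize__ perform above can be exercised on its own:

```python
import json

# Mirror __arrow_ext_serialize__: encode the annotation dict as JSON bytes.
annotation = {"annotated": "one"}
serialized = json.dumps(annotation).encode()

# Mirror __arrow_ext_deserialize__: decode the bytes back into the annotation.
restored = json.loads(serialized.decode())
assert restored == annotation
```

So the metadata round trip itself is sound; the crash below is in the Parquet reader, not in the serialization hooks.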

build the pa.StructArray again,

one = pa.Array.from_buffers(
    AnnotatedType(pa.int64(), {"annotated": "one"}),
    3,
    [None, pa.py_buffer(np.array([10, 20, 30], dtype=np.int64))],
)
two = pa.Array.from_buffers(
    AnnotatedType(pa.float64(), {"annotated": "two"}),
    3,
    [None, pa.py_buffer(np.array([1.1, 2.2, 3.3], dtype=np.float64))],
)
record = pa.Array.from_buffers(
    AnnotatedType(
        pa.struct([
            pa.field("one", one.type, False),
            pa.field("two", two.type, False),
        ]),
        {"annotated": "record"},
    ),
    3,
    [None],
    children=[one, two],
)
assert record.to_pylist() == [
    {"one": 10, "two": 1.1},
    {"one": 20, "two": 2.2},
    {"one": 30, "two": 3.3},
]

Now when we write and read this back, there's a segfault:

table = pa.Table.from_arrays([record], names=["column"])
pq.write_table(table, "record_annotated.parquet")

print("before segfault")

table2 = pq.ParquetFile("record_annotated.parquet").read_row_groups([0], ["column.one"])

print("after segfault")

The output, which prints each annotation as the ExtensionType is deserialized, is

before segfault
int64 {'annotated': 'one'}
double {'annotated': 'two'}
int64 {'annotated': 'one'}
double {'annotated': 'two'}
struct<one: extension<my:app<AnnotatedType>> not null, two: extension<my:app<AnnotatedType>> not null> {'annotated': 'record'}
Segmentation fault (core dumped)

Note that if we read back that file, record_annotated.parquet, without registering the ExtensionType, everything is fine:

Python 3.9.13 | packaged by conda-forge | (main, May 27 2022, 16:56:21) 
[GCC 10.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> import pyarrow.parquet as pq
>>> table2 = pq.ParquetFile("record_annotated.parquet").read_row_groups([0], ["column.one"])
>>> assert table2.to_pylist() == [
...     {"column": {"one": 10}},
...     {"column": {"one": 20}},
...     {"column": {"one": 30}},
... ]

and if we register the ExtensionType but don't select a column, everything is fine:

Python 3.9.13 | packaged by conda-forge | (main, May 27 2022, 16:56:21) 
[GCC 10.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> import pyarrow.parquet as pq
>>> import json
>>> 
>>> class AnnotatedType(pa.ExtensionType):
...     def __init__(self, storage_type, annotation):
...         self.annotation = annotation
...         super().__init__(storage_type, "my:app")
...     def __arrow_ext_serialize__(self):
...         return json.dumps(self.annotation).encode()
...     @classmethod
...     def __arrow_ext_deserialize__(cls, storage_type, serialized):
...         annotation = json.loads(serialized.decode())
...         print(storage_type, annotation)
...         return cls(storage_type, annotation)
...     @property
...     def num_buffers(self):
...         return self.storage_type.num_buffers
...     @property
...     def num_fields(self):
...         return self.storage_type.num_fields
... 
>>> pa.register_extension_type(AnnotatedType(pa.null(), None))
>>> 
>>> table2 = pq.ParquetFile("record_annotated.parquet").read_row_groups([0])
int64 {'annotated': 'one'}
double {'annotated': 'two'}
int64 {'annotated': 'one'}
double {'annotated': 'two'}
struct<one: extension<my:app<AnnotatedType>> not null, two: extension<my:app<AnnotatedType>> not null> {'annotated': 'record'}
>>> assert table2.to_pylist() == [
...     {"column": {"one": 10, "two": 1.1}},
...     {"column": {"one": 20, "two": 2.2}},
...     {"column": {"one": 30, "two": 3.3}},
... ]
int64 {'annotated': 'one'}
double {'annotated': 'two'}
struct<one: extension<my:app<AnnotatedType>> not null, two: extension<my:app<AnnotatedType>> not null> {'annotated': 'record'}
int64 {'annotated': 'one'}
double {'annotated': 'two'}
struct<one: extension<my:app<AnnotatedType>> not null, two: extension<my:app<AnnotatedType>> not null> {'annotated': 'record'}
int64 {'annotated': 'one'}
double {'annotated': 'two'}
struct<one: extension<my:app<AnnotatedType>> not null, two: extension<my:app<AnnotatedType>> not null> {'annotated': 'record'}
int64 {'annotated': 'one'}
double {'annotated': 'two'}
struct<one: extension<my:app<AnnotatedType>> not null, two: extension<my:app<AnnotatedType>> not null> {'annotated': 'record'}
int64 {'annotated': 'one'}
double {'annotated': 'two'}
struct<one: extension<my:app<AnnotatedType>> not null, two: extension<my:app<AnnotatedType>> not null> {'annotated': 'record'}
int64 {'annotated': 'one'}
double {'annotated': 'two'}
int64 {'annotated': 'one'}
double {'annotated': 'two'}
struct<one: extension<my:app<AnnotatedType>> not null, two: extension<my:app<AnnotatedType>> not null> {'annotated': 'record'}
int64 {'annotated': 'one'}
double {'annotated': 'two'}
int64 {'annotated': 'one'}
double {'annotated': 'two'}
struct<one: extension<my:app<AnnotatedType>> not null, two: extension<my:app<AnnotatedType>> not null> {'annotated': 'record'}
int64 {'annotated': 'one'}
double {'annotated': 'two'}

It's just the case of doing both that causes the segfault.

Reporter: Jim Pivarski / @jpivarski
Watchers: Rok Mihevc / @rok

Note: This issue was originally created as ARROW-17539. Please see the migration documentation for further details.

@jpivarski

Update on this issue: we encountered it "in the wild" in dask-contrib/dask-awkward#140. (As a work-around, the user has turned off ExtensionArray/ExtensionType, but that's not a long-term solution because it drops metadata in the Awkward ←→ Arrow conversion.)

@westonpace

So I've found the problem but haven't yet worked out the solution. The segmentation fault occurs in GetReader in reader.cc. The handling for extension types is:

  if (type_id == ::arrow::Type::EXTENSION) {
    auto storage_field = arrow_field->WithType(
        checked_cast<const ExtensionType&>(*arrow_field->type()).storage_type());
    RETURN_NOT_OK(GetReader(field, storage_field, ctx, out));
    *out = std::make_unique<ExtensionReader>(arrow_field, std::move(*out));
    return Status::OK();
  }

However, if the nested field is not loaded, then the recursive GetReader call sets *out to nullptr, and this code wraps that null storage reader in an ExtensionReader, which later crashes.

The fix is, unfortunately, not as simple as returning null. The problem is that the Parquet reader is trying to maintain the nested structure. As you see in your example that works, column.one yields a partial struct:

assert table2.to_pylist() == [
    {"column": {"one": 10}},
    {"column": {"one": 20}},
    {"column": {"one": 30}},
]

However, it is not clear that a partial "extension type" is a valid thing. For example, imagine your extension type was a 2DPoint with "x" and "y". What should be returned if the user loads points.x? We can't maintain structure in that case.

I'm a little new to parquet and nested references so I don't know if there is a syntax we can use to ask for the nested columns without structure. In this case you would get:

assert table2.to_pylist() == [
  { "one": 10 },
  { "one": 20 },
  { "one": 30 }
]
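For illustration only (plain Python dicts, no Arrow involved), the two candidate projection semantics discussed above can be sketched like this:

```python
rows = [
    {"one": 10, "two": 1.1},
    {"one": 20, "two": 2.2},
    {"one": 30, "two": 3.3},
]

# Structure-preserving projection: selecting "column.one" keeps the struct
# wrapper but drops the unselected field (what the Parquet reader does for
# plain struct columns today).
nested = [{"column": {"one": r["one"]}} for r in rows]

# Flat projection: drop the wrapper entirely, as in the hypothetical
# structureless syntax sketched above.
flat = [{"one": r["one"]} for r in rows]

assert nested == [
    {"column": {"one": 10}},
    {"column": {"one": 20}},
    {"column": {"one": 30}},
]
assert flat == [{"one": 10}, {"one": 20}, {"one": 30}]
```

The open question is which of these (if either) is well-defined when "column" is an extension type rather than a plain struct.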

I will put together a PR that at least returns an invalid status in this case instead of a segmentation fault.

pitrou added a commit that referenced this issue May 17, 2023
…3634)

When the parquet reader is performing a partial read it will try and maintain field structure.  So, for example, given a schema of `points: struct<x: int32, y:int32>` and a load of `points.x` it will return `points: struct<x: int32>`.  However, if there is an extension type `points: Point` where `Point` has a storage type `struct<x: int32, y: int32>` then suddenly the reference `points.x` no longer makes sense.
* Closes: #20385

Lead-authored-by: Weston Pace <weston.pace@gmail.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
@pitrou pitrou added this to the 13.0.0 milestone May 17, 2023