Support for arrays in torcharrow.from_arrow #500

Open
grapefroot opened this issue Sep 30, 2022 · 2 comments

Comments

@grapefroot

Hi guys!
When trying to use ParquetDataFrameLoader, I ran into a problem loading a parquet file that contains an array field. It looks like it comes down to torcharrow.from_arrow not supporting array columns, even though torcharrow itself already supports array columns. Are there any plans to support this when loading from parquet files, or is there something that blocks it from being implemented?

The error basically looks like this:

NotImplementedError                       Traceback (most recent call last)
Input In [25], in <cell line: 1>()
----> 1 next(iter(datapipe))

File /opt/conda/lib/python3.8/site-packages/torch/utils/data/datapipes/_typing.py:514, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
    512         response = gen.send(None)
    513 else:
--> 514     response = gen.send(None)
    516 while True:
    517     request = yield response

File /opt/conda/lib/python3.8/site-packages/torch/utils/data/datapipes/iter/combinatorics.py:127, in ShufflerIterDataPipe.__iter__(self)
    125 self._rng.seed(self._seed)
    126 self._seed = None
--> 127 for x in self.datapipe:
    128     if len(self._buffer) == self.buffer_size:
    129         idx = self._rng.randint(0, len(self._buffer) - 1)

File /opt/conda/lib/python3.8/site-packages/torch/utils/data/datapipes/_typing.py:514, in hook_iterator.<locals>.wrap_generator(*args, **kwargs)
    512         response = gen.send(None)
    513 else:
--> 514     response = gen.send(None)
    516 while True:
    517     request = yield response

File /opt/conda/lib/python3.8/site-packages/torchdata/datapipes/iter/util/dataframemaker.py:138, in ParquetDFLoaderIterDataPipe.__iter__(self)
    135 for i in range(num_row_groups):
    136     # TODO: More fine-grain control over the number of rows or row group per DataFrame
    137     row_group = parquet_file.read_row_group(i, columns=self.columns, use_threads=self.use_threads)
--> 138     yield torcharrow.from_arrow(row_group, dtype=self.dtype)

File /opt/conda/lib/python3.8/site-packages/torcharrow/interop.py:32, in from_arrow(data, dtype, device)
     30     return _from_arrow_array(data, dtype, device=device)
     31 elif isinstance(data, pa.Table):
---> 32     return _from_arrow_table(data, dtype, device=device)
     33 else:
     34     raise ValueError

File /opt/conda/lib/python3.8/site-packages/torcharrow/interop_arrow.py:86, in _from_arrow_table(table, dtype, device)
     83     field = table.schema.field(i)
     85     assert len(table[i].chunks) == 1
---> 86     df_data[field.name] = _from_arrow_array(
     87         table[i].chunk(0),
     88         dtype=(
     89             # pyre-fixme[16]: `DType` has no attribute `get`.
     90             dtype.get(field.name)
     91             if dtype is not None
     92             else _arrowtype_to_dtype(field.type, field.nullable)
     93         ),
     94         device=device,
     95     )
     97 return dataframe(df_data, device=device)

File /opt/conda/lib/python3.8/site-packages/torcharrow/interop_arrow.py:37, in _from_arrow_array(array, dtype, device)
     28 assert isinstance(array, pa.Array)
     30 # Using the most narrow type we can, we (i) don't restrict in any
     31 # way where it can be used (since we can pass a narrower typed
     32 # non-null column to a function expecting a nullable type, but not
   (...)
     35 # increase the amount of places we can use the from_arrow result
     36 # pyre-fixme[16]: `Array` has no attribute `type`.
---> 37 dtype_from_arrowtype = _arrowtype_to_dtype(array.type, array.null_count > 0)
     38 if dtype and (
     39     dt.get_underlying_dtype(dtype) != dt.get_underlying_dtype(dtype_from_arrowtype)
     40 ):
     41     raise NotImplementedError("Type casting is not supported")

File /opt/conda/lib/python3.8/site-packages/torcharrow/_interop.py:205, in _arrowtype_to_dtype(t, nullable)
    199 if pa.types.is_struct(t):
    200     return dt.Struct(
    201         # pyre-fixme[16]: `DataType` has no attribute `__iter__`.
    202         [dt.Field(f.name, _arrowtype_to_dtype(f.type, f.nullable)) for f in t],
    203         nullable,
    204     )
--> 205 raise NotImplementedError(f"Unsupported Arrow type: {str(t)}")

NotImplementedError: Unsupported Arrow type: list<element: float>
This exception is thrown by __iter__ of ParquetDFLoaderIterDataPipe()
@myzha0

myzha0 commented Dec 6, 2022

+1, this would be super useful!

Is this something that's planned soon? If not, do you all have any pointers on how to implement this?

It seems like an easy hack would be to just make a wrapper for ListColumnCpu that wraps _from_pysequence here. I assume this requires a copy of the data into a native list, though (e.g., call array.to_pylist() and pass the result to _from_pysequence)? Would you have other suggestions?

@wenleix
Contributor

wenleix commented Dec 18, 2022

Thanks @myzha0.

It seems like an easy hack would be to just make a wrapper for ListColumnCpu that wraps _from_pysequence here.

Yeah, that requires two copies (Arrow -> Python objects -> Velox memory buffer). To unblock with that approach, you can just do ta.column(array.to_pylist(), _arrowtype_to_dtype(...)), right?
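
Something like this, as a rough sketch (note that _arrowtype_to_dtype is a private helper from torcharrow._interop, so this is a workaround rather than a supported API):

import pyarrow as pa
import torcharrow as ta
from torcharrow._interop import _arrowtype_to_dtype

# One chunk of a list<float> column, e.g. table[i].chunk(0) from a row group.
array = pa.array([[1.0, 2.0], [3.0]], type=pa.list_(pa.float32()))

# Two copies happen here: Arrow buffers -> Python lists -> Velox memory.
dtype = _arrowtype_to_dtype(array.type, array.null_count > 0)
col = ta.column(array.to_pylist(), dtype)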
