Skip to content

Commit

Permalink
apacheGH-40384: [Python] Expand the C Device Interface bindings to su…
Browse files Browse the repository at this point in the history
…pport import on CUDA device (apache#40385)

### Rationale for this change

Follow-up on apache#39979 which added `_export_to_c_device`/`_import_from_c_device` methods, but for now only for CPU devices.

### What changes are included in this PR?

* Ensure `pyarrow.cuda` is imported before importing data through the C Interface, to ensure the CUDA device is registered
* Add tests for exporting/importing with the device interface on CUDA

### Are these changes tested?

Yes, added tests for CUDA.

* GitHub Issue: apache#40384

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
  • Loading branch information
jorisvandenbossche committed Jun 19, 2024
1 parent 5252090 commit 89d6354
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 8 deletions.
10 changes: 6 additions & 4 deletions python/pyarrow/array.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -1825,23 +1825,25 @@ cdef class Array(_PandasConvertible):
This is a low-level function intended for expert users.
"""
cdef:
void* c_ptr = _as_c_pointer(in_ptr)
ArrowDeviceArray* c_device_array = <ArrowDeviceArray*>_as_c_pointer(in_ptr)
void* c_type_ptr
shared_ptr[CArray] c_array

if c_device_array.device_type == ARROW_DEVICE_CUDA:
_ensure_cuda_loaded()

c_type = pyarrow_unwrap_data_type(type)
if c_type == nullptr:
# Not a DataType object, perhaps a raw ArrowSchema pointer
c_type_ptr = _as_c_pointer(type)
with nogil:
c_array = GetResultValue(
ImportDeviceArray(<ArrowDeviceArray*> c_ptr,
<ArrowSchema*> c_type_ptr)
ImportDeviceArray(c_device_array, <ArrowSchema*> c_type_ptr)
)
else:
with nogil:
c_array = GetResultValue(
ImportDeviceArray(<ArrowDeviceArray*> c_ptr, c_type)
ImportDeviceArray(c_device_array, c_type)
)
return pyarrow_wrap_array(c_array)

Expand Down
7 changes: 6 additions & 1 deletion python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -2964,8 +2964,13 @@ cdef extern from "arrow/c/abi.h":
cdef struct ArrowArrayStream:
void (*release)(ArrowArrayStream*) noexcept nogil

ctypedef int32_t ArrowDeviceType
cdef ArrowDeviceType ARROW_DEVICE_CUDA

cdef struct ArrowDeviceArray:
pass
ArrowArray array
int64_t device_id
int32_t device_type

cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:
CStatus ExportType(CDataType&, ArrowSchema* out)
Expand Down
19 changes: 19 additions & 0 deletions python/pyarrow/lib.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ UnionMode_DENSE = _UnionMode_DENSE

__pc = None
__pac = None
__cuda_loaded = None


def _pc():
Expand All @@ -143,6 +144,24 @@ def _pac():
return __pac


def _ensure_cuda_loaded():
# Try importing the cuda module to ensure libarrow_cuda gets loaded
# to register the CUDA device for the C Data Interface import
global __cuda_loaded
if __cuda_loaded is None:
try:
import pyarrow.cuda # no-cython-lint
__cuda_loaded = True
except ImportError as exc:
__cuda_loaded = str(exc)

if __cuda_loaded is not True:
raise ImportError(
"Trying to import data on a CUDA device, but PyArrow is not built with "
f"CUDA support.\n(importing 'pyarrow.cuda' resulted in \"{__cuda_loaded}\")."
)


def _gdb_test_session():
GdbTestSession()

Expand Down
9 changes: 6 additions & 3 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -3752,21 +3752,24 @@ cdef class RecordBatch(_Tabular):
This is a low-level function intended for expert users.
"""
cdef:
void* c_ptr = _as_c_pointer(in_ptr)
ArrowDeviceArray* c_device_array = <ArrowDeviceArray*>_as_c_pointer(in_ptr)
void* c_schema_ptr
shared_ptr[CRecordBatch] c_batch

if c_device_array.device_type == ARROW_DEVICE_CUDA:
_ensure_cuda_loaded()

c_schema = pyarrow_unwrap_schema(schema)
if c_schema == nullptr:
# Not a Schema object, perhaps a raw ArrowSchema pointer
c_schema_ptr = _as_c_pointer(schema, allow_null=True)
with nogil:
c_batch = GetResultValue(ImportDeviceRecordBatch(
<ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr))
c_device_array, <ArrowSchema*> c_schema_ptr))
else:
with nogil:
c_batch = GetResultValue(ImportDeviceRecordBatch(
<ArrowDeviceArray*> c_ptr, c_schema))
c_device_array, c_schema))
return pyarrow_wrap_batch(c_batch)


Expand Down
21 changes: 21 additions & 0 deletions python/pyarrow/tests/test_cffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -705,3 +705,24 @@ def test_roundtrip_chunked_array_capsule_requested_schema():
ValueError, match="Could not cast string to requested type int64"
):
chunked.__arrow_c_stream__(requested_capsule)


def test_import_device_no_cuda():
try:
import pyarrow.cuda # noqa
except ImportError:
pass
else:
pytest.skip("pyarrow.cuda is available")

c_array = ffi.new("struct ArrowDeviceArray*")
ptr_array = int(ffi.cast("uintptr_t", c_array))
arr = pa.array([1, 2, 3], type=pa.int64())
arr._export_to_c_device(ptr_array)

# patch the device type of the struct, this results in an invalid ArrowDeviceArray
# but this is just to test we raise am error before actually importing buffers
c_array.device_type = 2 # ARROW_DEVICE_CUDA

with pytest.raises(ImportError, match="Trying to import data on a CUDA device"):
pa.Array._import_from_c_device(ptr_array, arr.type)
152 changes: 152 additions & 0 deletions python/pyarrow/tests/test_cuda.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,3 +792,155 @@ def test_IPC(size):
p.start()
p.join()
assert p.exitcode == 0


def _arr_copy_to_host(carr):
# TODO replace below with copy to device when exposed in python
buffers = []
for cbuf in carr.buffers():
if cbuf is None:
buffers.append(None)
else:
buf = global_context.foreign_buffer(
cbuf.address, cbuf.size, cbuf
).copy_to_host()
buffers.append(buf)

child = pa.Array.from_buffers(carr.type.value_type, 3, buffers[2:])
new = pa.Array.from_buffers(carr.type, 2, buffers[:2], children=[child])
return new


def test_device_interface_array():
cffi = pytest.importorskip("pyarrow.cffi")
ffi = cffi.ffi

c_schema = ffi.new("struct ArrowSchema*")
ptr_schema = int(ffi.cast("uintptr_t", c_schema))
c_array = ffi.new("struct ArrowDeviceArray*")
ptr_array = int(ffi.cast("uintptr_t", c_array))

typ = pa.list_(pa.int32())
arr = pa.array([[1], [2, 42]], type=typ)

# TODO replace below with copy to device when exposed in python
cbuffers = []
for buf in arr.buffers():
if buf is None:
cbuffers.append(None)
else:
cbuf = global_context.new_buffer(buf.size)
cbuf.copy_from_host(buf, position=0, nbytes=buf.size)
cbuffers.append(cbuf)

carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[
pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:])
])

# Type is known up front
carr._export_to_c_device(ptr_array)

# verify exported struct
assert c_array.device_type == 2 # ARROW_DEVICE_CUDA 2
assert c_array.device_id == global_context.device_number
assert c_array.array.length == 2

# Delete recreate C++ object from exported pointer
del carr
carr_new = pa.Array._import_from_c_device(ptr_array, typ)
assert carr_new.type == pa.list_(pa.int32())
arr_new = _arr_copy_to_host(carr_new)
assert arr_new.equals(arr)

del carr_new
# Now released
with pytest.raises(ValueError, match="Cannot import released ArrowArray"):
pa.Array._import_from_c_device(ptr_array, typ)

# Schema is exported and imported at the same time
carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[
pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:])
])
carr._export_to_c_device(ptr_array, ptr_schema)
# Delete and recreate C++ objects from exported pointers
del carr
carr_new = pa.Array._import_from_c_device(ptr_array, ptr_schema)
assert carr_new.type == pa.list_(pa.int32())
arr_new = _arr_copy_to_host(carr_new)
assert arr_new.equals(arr)

del carr_new
# Now released
with pytest.raises(ValueError, match="Cannot import released ArrowSchema"):
pa.Array._import_from_c_device(ptr_array, ptr_schema)


def _batch_copy_to_host(cbatch):
# TODO replace below with copy to device when exposed in python
arrs = []
for col in cbatch.columns:
buffers = [
global_context.foreign_buffer(buf.address, buf.size, buf).copy_to_host()
if buf is not None else None
for buf in col.buffers()
]
new = pa.Array.from_buffers(col.type, len(col), buffers)
arrs.append(new)

return pa.RecordBatch.from_arrays(arrs, schema=cbatch.schema)


def test_device_interface_batch_array():
cffi = pytest.importorskip("pyarrow.cffi")
ffi = cffi.ffi

c_schema = ffi.new("struct ArrowSchema*")
ptr_schema = int(ffi.cast("uintptr_t", c_schema))
c_array = ffi.new("struct ArrowDeviceArray*")
ptr_array = int(ffi.cast("uintptr_t", c_array))

batch = make_recordbatch(10)
schema = batch.schema
cbuf = cuda.serialize_record_batch(batch, global_context)
cbatch = cuda.read_record_batch(cbuf, schema)

# Schema is known up front
cbatch._export_to_c_device(ptr_array)

# verify exported struct
assert c_array.device_type == 2 # ARROW_DEVICE_CUDA 2
assert c_array.device_id == global_context.device_number
assert c_array.array.length == 10

# Delete recreate C++ object from exported pointer
del cbatch
cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema)
assert cbatch_new.schema == schema
batch_new = _batch_copy_to_host(cbatch_new)
assert batch_new.equals(batch)

del cbatch_new
# Now released
with pytest.raises(ValueError, match="Cannot import released ArrowArray"):
pa.RecordBatch._import_from_c_device(ptr_array, schema)

# Schema is exported and imported at the same time
cbatch = cuda.read_record_batch(cbuf, schema)
cbatch._export_to_c_device(ptr_array, ptr_schema)
# Delete and recreate C++ objects from exported pointers
del cbatch
cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
assert cbatch_new.schema == schema
batch_new = _batch_copy_to_host(cbatch_new)
assert batch_new.equals(batch)

del cbatch_new
# Now released
with pytest.raises(ValueError, match="Cannot import released ArrowSchema"):
pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)

# Not a struct type
pa.int32()._export_to_c(ptr_schema)
with pytest.raises(ValueError,
match="ArrowSchema describes non-struct type"):
pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)

0 comments on commit 89d6354

Please sign in to comment.