Skip to content

Commit

Permalink
Implement GPU support in pyarrow
Browse files Browse the repository at this point in the history
Change-Id: Id79eb87983b3e1eb449fd8ecf05def823d655ef2
  • Loading branch information
pearu authored and wesm committed Sep 10, 2018
1 parent a42d4bf commit d237b34
Show file tree
Hide file tree
Showing 10 changed files with 1,686 additions and 7 deletions.
50 changes: 50 additions & 0 deletions cpp/cmake_modules/FindArrow.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ find_library(ARROW_PYTHON_LIB_PATH NAMES arrow_python
NO_DEFAULT_PATH)
get_filename_component(ARROW_PYTHON_LIBS ${ARROW_PYTHON_LIB_PATH} DIRECTORY)

find_library(ARROW_GPU_LIB_PATH NAMES arrow_gpu
PATHS
${ARROW_SEARCH_LIB_PATH}
NO_DEFAULT_PATH)
get_filename_component(ARROW_GPU_LIBS ${ARROW_GPU_LIB_PATH} DIRECTORY)

if (MSVC)
SET(CMAKE_FIND_LIBRARY_SUFFIXES ".lib" ".dll")

Expand All @@ -95,6 +101,13 @@ if (MSVC)
PATH_SUFFIXES "bin" )
get_filename_component(ARROW_SHARED_LIBS ${ARROW_SHARED_LIBRARIES} PATH )
get_filename_component(ARROW_PYTHON_SHARED_LIBS ${ARROW_PYTHON_SHARED_LIBRARIES} PATH )

if (PYARROW_BUILD_ARROW_GPU)
find_library(ARROW_GPU_SHARED_LIBRARIES NAMES arrow_gpu
PATHS ${ARROW_HOME} NO_DEFAULT_PATH
PATH_SUFFIXES "bin" )
get_filename_component(ARROW_GPU_SHARED_LIBS ${ARROW_GPU_SHARED_LIBRARIES} PATH )
endif()
endif ()

if (ARROW_INCLUDE_DIR AND ARROW_LIBS)
Expand All @@ -117,10 +130,41 @@ if (ARROW_INCLUDE_DIR AND ARROW_LIBS)
endif()
endif()

if (PYARROW_BUILD_ARROW_GPU AND ARROW_GPU_LIBS)
set(ARROW_GPU_FOUND TRUE)
set(ARROW_GPU_LIB_NAME arrow_gpu)
if (MSVC)
set(ARROW_GPU_STATIC_LIB ${ARROW_GPU_LIBS}/${ARROW_GPU_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX})
set(ARROW_GPU_SHARED_LIB ${ARROW_GPU_SHARED_LIBS}/${ARROW_GPU_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
set(ARROW_GPU_SHARED_IMP_LIB ${ARROW_GPU_LIBS}/${ARROW_GPU_LIB_NAME}.lib)
else()
set(ARROW_GPU_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_GPU_LIB_NAME}.a)
set(ARROW_GPU_SHARED_LIB ${ARROW_LIBS}/lib${ARROW_GPU_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
endif()
endif()


if (ARROW_FOUND)
if (NOT Arrow_FIND_QUIETLY)
message(STATUS "Found the Arrow core library: ${ARROW_LIB_PATH}")
message(STATUS "Found the Arrow Python library: ${ARROW_PYTHON_LIB_PATH}")
if (PYARROW_BUILD_ARROW_GPU)
if (ARROW_GPU_FOUND)
message(STATUS "Found the Arrow GPU library: ${ARROW_GPU_LIB_PATH}")
else()
set(ARROW_ERR_MSG "Could not find the Arrow GPU library. Looked for libs")
set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_LIB_PATH}")
if (Arrow_FIND_REQUIRED)
message(FATAL_ERROR "${ARROW_ERR_MSG}")
else (Arrow_FIND_REQUIRED)
message(STATUS "${ARROW_ERR_MSG}")
endif()
set(ARROW_GPU_FOUND FALSE)
endif()
else()
message(STATUS "Found but not using the Arrow GPU library: ${ARROW_GPU_LIB_PATH}")
set(ARROW_GPU_FOUND FALSE)
endif()
endif ()
else ()
if (NOT Arrow_FIND_QUIETLY)
Expand All @@ -134,6 +178,7 @@ else ()
endif (Arrow_FIND_REQUIRED)
endif ()
set(ARROW_FOUND FALSE)
set(ARROW_GPU_FOUND FALSE)
endif ()

if (MSVC)
Expand All @@ -145,6 +190,9 @@ if (MSVC)
ARROW_PYTHON_STATIC_LIB
ARROW_PYTHON_SHARED_LIB
ARROW_PYTHON_SHARED_IMP_LIB
ARROW_GPU_STATIC_LIB
ARROW_GPU_SHARED_LIB
ARROW_GPU_SHARED_IMP_LIB
)
else()
mark_as_advanced(
Expand All @@ -153,5 +201,7 @@ else()
ARROW_SHARED_LIB
ARROW_PYTHON_STATIC_LIB
ARROW_PYTHON_SHARED_LIB
ARROW_GPU_STATIC_LIB
ARROW_GPU_SHARED_LIB
)
endif()
30 changes: 28 additions & 2 deletions python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ endif()

# Top level cmake dir
if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
option(PYARROW_BUILD_ARROW_GPU
"Build the PyArrow GPU support"
OFF)
option(PYARROW_BUILD_PARQUET
"Build the PyArrow Parquet integration"
OFF)
Expand Down Expand Up @@ -275,6 +278,12 @@ if (PYARROW_BUNDLE_ARROW_CPP)
ABI_VERSION ${ARROW_ABI_VERSION}
SO_VERSION ${ARROW_SO_VERSION})

if (ARROW_GPU_FOUND)
bundle_arrow_lib(ARROW_GPU_SHARED_LIB
ABI_VERSION ${ARROW_ABI_VERSION}
SO_VERSION ${ARROW_SO_VERSION})
endif()

# boost
if (PYARROW_BOOST_USE_SHARED AND PYARROW_BUNDLE_BOOST)
set(Boost_USE_STATIC_LIBS OFF)
Expand Down Expand Up @@ -305,6 +314,9 @@ if (PYARROW_BUNDLE_ARROW_CPP)
if (MSVC)
bundle_arrow_implib(ARROW_SHARED_IMP_LIB)
bundle_arrow_implib(ARROW_PYTHON_SHARED_IMP_LIB)
if (ARROW_GPU_FOUND)
bundle_arrow_implib(ARROW_GPU_SHARED_IMP_LIB)
endif()
endif()
endif()

Expand All @@ -313,11 +325,19 @@ if (MSVC)
SHARED_LIB ${ARROW_SHARED_IMP_LIB})
ADD_THIRDPARTY_LIB(arrow_python
SHARED_LIB ${ARROW_PYTHON_SHARED_IMP_LIB})
if (ARROW_GPU_FOUND)
ADD_THIRDPARTY_LIB(arrow_gpu
SHARED_LIB ${ARROW_GPU_SHARED_IMP_LIB})
endif()
else()
ADD_THIRDPARTY_LIB(arrow
SHARED_LIB ${ARROW_SHARED_LIB})
ADD_THIRDPARTY_LIB(arrow_python
SHARED_LIB ${ARROW_PYTHON_SHARED_LIB})
if (ARROW_GPU_FOUND)
ADD_THIRDPARTY_LIB(arrow_gpu
SHARED_LIB ${ARROW_GPU_SHARED_LIB})
endif()
endif()

############################################################
Expand All @@ -330,12 +350,18 @@ endif()

set(CYTHON_EXTENSIONS
lib
)
)

set(LINK_LIBS
arrow_shared
arrow_python_shared
)
)


if (ARROW_GPU_FOUND)
set(LINK_LIBS ${LINK_LIBS} arrow_gpu_shared)
set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} lib_gpu)
endif()

if (PYARROW_BUILD_PARQUET)
## Parquet
Expand Down
25 changes: 24 additions & 1 deletion python/pyarrow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,29 @@ def parse_git(root, **kwargs):

import pyarrow.types as types

# GPU support
#

try:
import pyarrow.lib_gpu as _lib_gpu
except ImportError as _msg:
if str(_msg) == "'No module named 'pyarrow.lib_gpu":
has_gpu_support = False
else: # do not silence ImportError in case of any bugs
raise
else:
has_gpu_support = True

if has_gpu_support:
from pyarrow.lib_gpu \
import (CudaDeviceManager, CudaContext, CudaIpcMemHandle,
CudaBuffer, CudaHostBuffer, CudaBufferReader, CudaBufferWriter,
allocate_cuda_host_buffer, cuda_serialize_record_batch,
cuda_read_message, cuda_read_record_batch,
)



# Entry point for starting the plasma store

def _plasma_store_entry_point():
Expand Down Expand Up @@ -205,7 +228,7 @@ def get_library_dirs():
# are not shipped inside the pyarrow package (see also ARROW-2976).
from subprocess import call, PIPE, Popen
pkg_config_executable = _os.environ.get('PKG_CONFIG', None) or 'pkg-config'
for package in ["arrow", "plasma", "arrow_python"]:
for package in ["arrow", "arrow_gpu", "plasma", "arrow_python"]:
cmd = '{0} --exists {1}'.format(pkg_config_executable, package).split()
try:
if call(cmd) == 0:
Expand Down
105 changes: 105 additions & 0 deletions python/pyarrow/includes/libarrow_gpu.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# distutils: language = c++

from pyarrow.includes.libarrow cimport *

cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::gpu" nogil:

cdef cppclass CCudaDeviceManager" arrow::gpu::CudaDeviceManager":
@staticmethod
CStatus GetInstance(CCudaDeviceManager ** manager)
CStatus GetContext(int gpu_number, shared_ptr[CCudaContext] * ctx)
CStatus CreateNewContext(int gpu_number,
shared_ptr[CCudaContext] * ctx)
CStatus AllocateHost(int64_t nbytes,
shared_ptr[CCudaHostBuffer] * buffer)
CStatus FreeHost(void * data, int64_t nbytes)
int num_devices() const

cdef cppclass CCudaContext" arrow::gpu::CudaContext":
shared_ptr[CCudaContext] shared_from_this()
CStatus Close()
CStatus Allocate(int64_t nbytes, shared_ptr[CCudaBuffer] * out)
CStatus OpenIpcBuffer(const CCudaIpcMemHandle & ipc_handle,
shared_ptr[CCudaBuffer] * buffer)
int64_t bytes_allocated() const

cdef cppclass CCudaIpcMemHandle" arrow::gpu::CudaIpcMemHandle":
@staticmethod
CStatus FromBuffer(const void * opaque_handle,
shared_ptr[CCudaIpcMemHandle] * handle)
CStatus Serialize(CMemoryPool * pool, shared_ptr[CBuffer] * out) const

cdef cppclass CCudaBuffer" arrow::gpu::CudaBuffer"(CBuffer):
CCudaBuffer(uint8_t * data, int64_t size,
const shared_ptr[CCudaContext] & context,
c_bool own_data=false, c_bool is_ipc=false)
CCudaBuffer(const shared_ptr[CCudaBuffer] & parent,
const int64_t offset, const int64_t size)

@staticmethod
CStatus FromBuffer(shared_ptr[CBuffer] buffer,
shared_ptr[CCudaBuffer] * out)

CStatus CopyToHost(const int64_t position, const int64_t nbytes,
void * out) const
CStatus CopyFromHost(const int64_t position, const void * data,
int64_t nbytes)
CStatus ExportForIpc(shared_ptr[CCudaIpcMemHandle] * handle)
shared_ptr[CCudaContext] context() const

cdef cppclass CCudaHostBuffer" arrow::gpu::CudaHostBuffer"(CMutableBuffer):
pass

cdef cppclass \
CCudaBufferReader" arrow::gpu::CudaBufferReader"(CBufferReader):
CCudaBufferReader(const shared_ptr[CBuffer] & buffer)
CStatus Read(int64_t nbytes, int64_t * bytes_read, void * buffer)
CStatus Read(int64_t nbytes, shared_ptr[CBuffer] * out)

cdef cppclass \
CCudaBufferWriter" arrow::gpu::CudaBufferWriter"(WriteableFile):
CCudaBufferWriter(const shared_ptr[CCudaBuffer] & buffer)
CStatus Close()
CStatus Flush()
# CStatus Seek(int64_t position)
CStatus Write(const void * data, int64_t nbytes)
CStatus WriteAt(int64_t position, const void * data, int64_t nbytes)
# CStatus Tell(int64_t* position) const
CStatus SetBufferSize(const int64_t buffer_size)
int64_t buffer_size()
int64_t num_bytes_buffered() const

CStatus AllocateCudaHostBuffer(const int64_t size,
shared_ptr[CCudaHostBuffer] * out)

# Cuda prefix is added to avoid picking up arrow::gpu functions
# from arrow namespace.
CStatus CudaSerializeRecordBatch" arrow::gpu::SerializeRecordBatch"\
(const CRecordBatch & batch,
CCudaContext * ctx,
shared_ptr[CCudaBuffer] * out)
CStatus CudaReadMessage" arrow::gpu::ReadMessage"\
(CCudaBufferReader * reader,
CMemoryPool * pool,
unique_ptr[CMessage] * message)
CStatus CudaReadRecordBatch" arrow::gpu::ReadRecordBatch"\
(const shared_ptr[CSchema] & schema,
const shared_ptr[CCudaBuffer] & buffer,
CMemoryPool * pool, shared_ptr[CRecordBatch] * out)
3 changes: 0 additions & 3 deletions python/pyarrow/ipc.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ cdef class Message:
"""
Container for an Arrow IPC message with metadata and optional body
"""
cdef:
unique_ptr[CMessage] message

def __cinit__(self):
pass

Expand Down
4 changes: 4 additions & 0 deletions python/pyarrow/lib.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ cdef extern from "Python.h":

cdef int check_status(const CStatus& status) nogil except -1

cdef class Message:
cdef:
unique_ptr[CMessage] message


cdef class MemoryPool:
cdef:
Expand Down

0 comments on commit d237b34

Please sign in to comment.