diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9bdd4f487bdec..bf0bcde14622a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -38,6 +38,7 @@ repos: # files: (/Dockerfile|\.dockerfile)$ files: >- ( + ?^ci/docker/conda-python-emscripten\.dockerfile$| ?^ci/docker/python-wheel-windows-test-vs2019\.dockerfile$| ) types: [] diff --git a/ci/docker/conda-python-emscripten.dockerfile b/ci/docker/conda-python-emscripten.dockerfile new file mode 100644 index 0000000000000..8ad705c920ba8 --- /dev/null +++ b/ci/docker/conda-python-emscripten.dockerfile @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +ARG repo +ARG arch +ARG python="3.12" +FROM ${repo}:${arch}-conda-python-${python} + +ARG selenium_version="4.15.2" +ARG pyodide_version="0.26.0" +ARG chrome_version="latest" +ARG required_python_min="(3,12)" +# fail if python version < 3.12 +RUN echo "check PYTHON>=${required_python_min}" && python -c "import sys;sys.exit(0 if sys.version_info>=${required_python_min} else 1)" + +# install selenium and pyodide-build + +# needs to be a login shell so ~/.profile is read +SHELL ["/bin/bash", "--login", "-c", "-o", "pipefail"] + +RUN python -m pip install --no-cache-dir selenium==${selenium_version} && \ python -m pip install --no-cache-dir --upgrade pyodide-build==${pyodide_version} + +# unpack the pyodide dist directory to /pyodide +RUN pyodide_dist_url="https://github.com/pyodide/pyodide/releases/download/${pyodide_version}/pyodide-${pyodide_version}.tar.bz2" && \ wget -q "${pyodide_dist_url}" -O- | tar -xj -C / + +# install the emscripten version that matches this pyodide +COPY ci/scripts/install_emscripten.sh /arrow/ci/scripts/ +RUN bash /arrow/ci/scripts/install_emscripten.sh ~ /pyodide + +# make sure zlib is cached in the EMSDK folder +RUN source ~/emsdk/emsdk_env.sh && embuilder --pic build zlib + +# install node 20 (needed for async call support), +# pthread-stubs for the build, and unzip (needed for the Chrome install to work) +RUN conda install nodejs=20 unzip pthread-stubs make -c conda-forge + +# install Chrome for Testing for the browser-based test runner +COPY ci/scripts/install_chromedriver.sh /arrow/ci/scripts/ +RUN /arrow/ci/scripts/install_chromedriver.sh "${chrome_version}" + +# make the conda-installed make available everywhere, +# or else pyodide's isolated build fails to find it +RUN ln -s "$(type -P make)" /bin/make + +ENV ARROW_BUILD_TESTS="OFF" \ ARROW_BUILD_TYPE="release" \ ARROW_DEPENDENCY_SOURCE="BUNDLED" \ ARROW_EMSCRIPTEN="ON" diff --git a/ci/scripts/cpp_build.sh b/ci/scripts/cpp_build.sh index 3ee7fbd9d19cd..bc2bba915f73a 100755 --- a/ci/scripts/cpp_build.sh +++ b/ci/scripts/cpp_build.sh @@ -30,7 +30,7 @@ if [ -x "$(command -v git)" ]; then fi # TODO(kszucs):
consider moving these to CMake -if [ ! -z "${CONDA_PREFIX}" ]; then +if [ ! -z "${CONDA_PREFIX}" ] && [ "${ARROW_EMSCRIPTEN:-OFF}" = "OFF" ]; then echo -e "===\n=== Conda environment for build\n===" conda list @@ -99,6 +99,10 @@ if [ "${ARROW_EMSCRIPTEN:-OFF}" = "ON" ]; then fi n_jobs=2 # Emscripten build fails on docker unless this is set really low source ~/emsdk/emsdk_env.sh + export CMAKE_INSTALL_PREFIX=$(em-config CACHE)/sysroot + # conda sets LDFLAGS / CFLAGS etc., which break + # emcmake, so we unset them + unset LDFLAGS CFLAGS CXXFLAGS CPPFLAGS emcmake cmake \ --preset=ninja-${ARROW_BUILD_TYPE:-debug}-emscripten \ -DCMAKE_VERBOSE_MAKEFILE=${CMAKE_VERBOSE_MAKEFILE:-OFF} \ diff --git a/ci/scripts/install_chromedriver.sh b/ci/scripts/install_chromedriver.sh new file mode 100755 index 0000000000000..9097a20bfc5c9 --- /dev/null +++ b/ci/scripts/install_chromedriver.sh @@ -0,0 +1,44 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Install Chrome and Chromedriver for Selenium + +set -e + +chrome_version=$1 + +if [ "${chrome_version}" = "latest" ]; then + latest_release_path=LATEST_RELEASE_STABLE +else + latest_release_path=LATEST_RELEASE_${chrome_version} +fi +CHROME_VERSION_FULL=$(wget --no-verbose -O - "https://googlechromelabs.github.io/chrome-for-testing/${latest_release_path}") +CHROME_DOWNLOAD_URL="https://dl.google.com/linux/chrome/deb/pool/main/g/google-chrome-stable/google-chrome-stable_${CHROME_VERSION_FULL}-1_amd64.deb" +CHROMEDRIVER_DOWNLOAD_URL="https://storage.googleapis.com/chrome-for-testing-public/${CHROME_VERSION_FULL}/linux64/chromedriver-linux64.zip" +wget --no-verbose -O /tmp/google-chrome.deb "${CHROME_DOWNLOAD_URL}" +apt-get update +apt-get install -qqy /tmp/google-chrome.deb +rm -f /tmp/google-chrome.deb +rm -rf /var/lib/apt/lists/* +wget --no-verbose -O /tmp/chromedriver-linux64.zip "${CHROMEDRIVER_DOWNLOAD_URL}" +unzip /tmp/chromedriver-linux64.zip -d /opt/ +rm /tmp/chromedriver-linux64.zip +ln -fs /opt/chromedriver-linux64/chromedriver /usr/local/bin/chromedriver +echo "Using Chrome version: $(google-chrome --version)" +echo "Using Chrome Driver version: $(chromedriver --version)" diff --git a/ci/scripts/install_emscripten.sh b/ci/scripts/install_emscripten.sh new file mode 100755 index 0000000000000..4bad7238a6cdd --- /dev/null +++ b/ci/scripts/install_emscripten.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# install the emscripten SDK version matching the pyodide in $2 to directory $1/emsdk + +set -e + +target_path=$1 +pyodide_path=$2 + +emscripten_version=$(${pyodide_path}/python -c "import sys;print(*sys._emscripten_info.emscripten_version,sep='.')") + +cd "${target_path}" +if [ ! -d emsdk ]; then + git clone https://github.com/emscripten-core/emsdk.git +fi +cd emsdk +./emsdk install ${emscripten_version} +./emsdk activate ${emscripten_version} +echo "Installed emsdk to: ${target_path}" \ No newline at end of file diff --git a/ci/scripts/python_build_emscripten.sh b/ci/scripts/python_build_emscripten.sh new file mode 100755 index 0000000000000..14e9626202079 --- /dev/null +++ b/ci/scripts/python_build_emscripten.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -ex + +arrow_dir=${1} +build_dir=${2} + + +source ~/emsdk/emsdk_env.sh + +source_dir=${arrow_dir}/python +python_build_dir=${build_dir}/python + +rm -rf ${python_build_dir} +cp -aL ${source_dir} ${python_build_dir} + +# conda sets LDFLAGS / CFLAGS etc., which break +# emcmake, so we unset them +unset LDFLAGS CFLAGS CXXFLAGS CPPFLAGS + +pushd ${python_build_dir} +pyodide build +popd diff --git a/ci/scripts/python_test_emscripten.sh b/ci/scripts/python_test_emscripten.sh new file mode 100755 index 0000000000000..4029722568b9b --- /dev/null +++ b/ci/scripts/python_test_emscripten.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# run tests against Chrome and node.js as representative +# WebAssembly platforms (i.e. one browser, one non-browser).
+ +set -ex + +build_dir=${1}/python +pyodide_dist_dir=${2} + +cd ${build_dir} + +# note: this uses the newest wheel in dist +pyodide_wheel=$(ls -t dist/pyarrow*.whl | head -1) + +echo "-------------- Running emscripten tests in Node ----------------------" +python scripts/run_emscripten_tests.py ${pyodide_wheel} --dist-dir=${pyodide_dist_dir} --runtime=node + +echo "-------------- Running emscripten tests in Chrome --------------------" +python scripts/run_emscripten_tests.py ${pyodide_wheel} --dist-dir=${pyodide_dist_dir} --runtime=chrome + diff --git a/cpp/CMakePresets.json b/cpp/CMakePresets.json index cb4cdfc03ac82..8886db0e11017 100644 --- a/cpp/CMakePresets.json +++ b/cpp/CMakePresets.json @@ -53,6 +53,7 @@ "ARROW_ACERO": "ON", "ARROW_BUILD_SHARED": "OFF", "ARROW_BUILD_STATIC": "ON", + "ARROW_CSV": "ON", "ARROW_CUDA": "OFF", "ARROW_DEPENDENCY_SOURCE": "BUNDLED", "ARROW_DEPENDENCY_USE_SHARED": "OFF", @@ -60,6 +61,7 @@ "ARROW_FLIGHT": "OFF", "ARROW_IPC": "ON", "ARROW_JEMALLOC": "OFF", + "ARROW_JSON": "ON", "ARROW_MIMALLOC": "OFF", "ARROW_ORC": "ON", "ARROW_RUNTIME_SIMD_LEVEL": "NONE", diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake index 171c85baa86c3..7dab0a362ff24 100644 --- a/cpp/cmake_modules/ThirdpartyToolchain.cmake +++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake @@ -4551,6 +4551,7 @@ macro(build_orc) BUILD_BYPRODUCTS ${ORC_STATIC_LIB} CMAKE_ARGS ${ORC_CMAKE_ARGS} DEPENDS ${ARROW_PROTOBUF_LIBPROTOBUF} + ${ARROW_PROTOBUF_PROTOC} ${ARROW_ZSTD_LIBZSTD} ${Snappy_TARGET} LZ4::lz4 diff --git a/dev/tasks/tasks.yml b/dev/tasks/tasks.yml index 61df283960ccf..7a86fd3e3e75f 100644 --- a/dev/tasks/tasks.yml +++ b/dev/tasks/tasks.yml @@ -1194,6 +1194,15 @@ tasks: image: conda-python {% endfor %} + test-conda-python-emscripten: + ci: github + template: docker-tests/github.linux.yml + params: + env: + UBUNTU: 22.04 + PYTHON: 3.12 + image: conda-python-emscripten + test-conda-python-3.11-hypothesis: ci: github template: docker-tests/github.linux.yml diff --git a/docker-compose.yml b/docker-compose.yml index a2a2b41c8747f..fa248d59037d3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -121,6 +121,7 @@ x-hierarchy: - conda-python-docs - conda-python-cython2 - conda-python-dask + - conda-python-emscripten - conda-python-hdfs - conda-python-java-integration - conda-python-jpype @@ -875,6 +876,38 @@ services: /arrow/ci/scripts/python_build.sh /arrow /build && /arrow/ci/scripts/python_test.sh /arrow"] + conda-python-emscripten: + # Usage: + # docker-compose build conda-python-emscripten + # docker-compose run --rm conda-python-emscripten + # Parameters: + # ARCH: amd64, arm64v8, ... + # UBUNTU: 22.04 + image: ${REPO}:${ARCH}-conda-python-emscripten + build: + context: . 
+ dockerfile: ci/docker/conda-python-emscripten.dockerfile + cache_from: + - ${REPO}:${ARCH}-conda-python-${PYTHON} + args: + repo: ${REPO} + arch: ${ARCH} + clang_tools: ${CLANG_TOOLS} + llvm: ${LLVM} + pyodide_version: "0.26.0" + chrome_version: "122" + selenium_version: "4.15.2" + required_python_min: "(3,12)" + python: ${PYTHON} + shm_size: *shm-size + volumes: *ubuntu-volumes + environment: + <<: [*common, *ccache, *sccache, *cpp] + command: [" + /arrow/ci/scripts/cpp_build.sh /arrow /build && + /arrow/ci/scripts/python_build_emscripten.sh /arrow /build && + /arrow/ci/scripts/python_test_emscripten.sh /build /pyodide"] + ubuntu-cuda-python: # Usage: # docker-compose build cuda-cpp diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index a8bbed117163d..980a63133c83c 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -143,6 +143,21 @@ if(NOT DEFINED ARROW_RUNTIME_SIMD_LEVEL) endif() include(SetupCxxFlags) +if($ENV{PYODIDE}) + # These variables are needed for building PyArrow on Emscripten. + # If they aren't set, CMake cross compiling fails for Python + # modules (at least under Pyodide it does). + set(Python3_INCLUDE_DIR $ENV{PYTHONINCLUDE}) + set(Python3_LIBRARY $ENV{CPYTHONLIB}) + set(Python3_NumPy_INCLUDE_DIR $ENV{NUMPY_LIB}/core/include) + set(Python3_EXECUTABLE) + set(ENV{_PYTHON_SYSCONFIGDATA_NAME} $ENV{SYSCONFIG_NAME}) + # we set the C and C++ compilers manually to bypass pywasmcross, + # which is pyodide's way of overriding C++ build parameters. + set(CMAKE_C_COMPILER emcc) + set(CMAKE_CXX_COMPILER em++) +endif() + # Add common flags set(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${PYARROW_CXXFLAGS}") @@ -344,6 +359,27 @@ set(PYARROW_CPP_LINK_LIBS "") # # Check all the options from Arrow and PyArrow C++ to be in line +# +# Order is important for "NOT ARROW_BUILD_SHARED". We must use +# depending modules -> depended modules order. For example, +# ArrowSubstrait depends on ArrowDataset. So PYARROW_CPP_LINK_LIBS +# must use +# "ArrowSubstrait::arrow_substrait_static;ArrowDataset::arrow_dataset_static" +# order. + +if(PYARROW_BUILD_SUBSTRAIT) + message(STATUS "Building PyArrow with Substrait") + if(NOT ARROW_SUBSTRAIT) + message(FATAL_ERROR "You must build Arrow C++ with ARROW_SUBSTRAIT=ON") + endif() + find_package(ArrowSubstrait REQUIRED) + if(ARROW_BUILD_SHARED) + list(APPEND PYARROW_CPP_LINK_LIBS ArrowSubstrait::arrow_substrait_shared) + else() + list(APPEND PYARROW_CPP_LINK_LIBS ArrowSubstrait::arrow_substrait_static) + endif() +endif() + if(PYARROW_BUILD_DATASET) message(STATUS "Building PyArrow with Dataset") if(NOT ARROW_DATASET) @@ -414,7 +450,17 @@ if(NOT CMAKE_VERSION VERSION_LESS 3.16) target_precompile_headers(arrow_python PUBLIC "$<$<COMPILE_LANGUAGE:CXX>:arrow/python/pch.h>") endif() -target_link_libraries(arrow_python PUBLIC ${PYARROW_CPP_LINK_LIBS} Python3::NumPy) + +# On static builds we need to be careful not to link PYARROW_CPP_LINK_LIBS +# into everything depending on arrow_python, or else we get duplicate +# libraries. Conversely, on shared builds, we need everything +# to depend on everything, as Python loads modules separately +if(ARROW_BUILD_SHARED) + target_link_libraries(arrow_python PUBLIC ${PYARROW_CPP_LINK_LIBS}) +else() + target_link_libraries(arrow_python PRIVATE ${PYARROW_CPP_LINK_LIBS}) +endif() +target_link_libraries(arrow_python PUBLIC Python3::NumPy) target_compile_definitions(arrow_python PRIVATE ARROW_PYTHON_EXPORTING) install(TARGETS arrow_python ARCHIVE DESTINATION .
@@ -650,27 +696,37 @@ endif() # Acero if(PYARROW_BUILD_ACERO) - if(PYARROW_BUNDLE_ARROW_CPP) - bundle_arrow_lib(${ARROW_ACERO_SHARED_LIB} SO_VERSION ${ARROW_SO_VERSION}) - if(MSVC) - bundle_arrow_import_lib(${ARROW_ACERO_IMPORT_LIB}) + if(ARROW_BUILD_SHARED) + if(PYARROW_BUNDLE_ARROW_CPP) + bundle_arrow_lib(${ARROW_ACERO_SHARED_LIB} SO_VERSION ${ARROW_SO_VERSION}) + if(MSVC) + bundle_arrow_import_lib(${ARROW_ACERO_IMPORT_LIB}) + endif() endif() - endif() - set(ACERO_LINK_LIBS ArrowAcero::arrow_acero_shared) + set(ACERO_LINK_LIBS ArrowAcero::arrow_acero_shared) + else() + # Acero is statically linked into libarrow_python already + set(ACERO_LINK_LIBS) + endif() list(APPEND CYTHON_EXTENSIONS _acero) endif() # Dataset if(PYARROW_BUILD_DATASET) - if(PYARROW_BUNDLE_ARROW_CPP) - bundle_arrow_lib(${ARROW_DATASET_SHARED_LIB} SO_VERSION ${ARROW_SO_VERSION}) - if(MSVC) - bundle_arrow_import_lib(${ARROW_DATASET_IMPORT_LIB}) + if(ARROW_BUILD_SHARED) + if(PYARROW_BUNDLE_ARROW_CPP) + bundle_arrow_lib(${ARROW_DATASET_SHARED_LIB} SO_VERSION ${ARROW_SO_VERSION}) + if(MSVC) + bundle_arrow_import_lib(${ARROW_DATASET_IMPORT_LIB}) + endif() endif() - endif() - set(DATASET_LINK_LIBS ArrowDataset::arrow_dataset_shared) + set(DATASET_LINK_LIBS ArrowDataset::arrow_dataset_shared) + else() + # dataset is statically linked into libarrow_python already + set(DATASET_LINK_LIBS) + endif() list(APPEND CYTHON_EXTENSIONS _dataset) endif() @@ -692,7 +748,9 @@ if(PYARROW_BUILD_PARQUET) endif() set(PARQUET_LINK_LIBS Parquet::parquet_shared) else() - set(PARQUET_LINK_LIBS Parquet::parquet_static) + # don't link the static lib as it is + # already in arrow_python + set(PARQUET_LINK_LIBS) endif() list(APPEND CYTHON_EXTENSIONS _parquet) if(PYARROW_BUILD_PARQUET_ENCRYPTION) @@ -741,18 +799,20 @@ endif() # Substrait if(PYARROW_BUILD_SUBSTRAIT) message(STATUS "Building PyArrow with Substrait") - if(NOT ARROW_SUBSTRAIT) - message(FATAL_ERROR "You must build Arrow C++ with ARROW_SUBSTRAIT=ON") - endif() - find_package(ArrowSubstrait REQUIRED) - if(PYARROW_BUNDLE_ARROW_CPP) - bundle_arrow_lib(${ARROW_SUBSTRAIT_SHARED_LIB} SO_VERSION ${ARROW_SO_VERSION}) - if(MSVC) - bundle_arrow_import_lib(${ARROW_SUBSTRAIT_IMPORT_LIB}) + + if(ARROW_BUILD_SHARED) + if(PYARROW_BUNDLE_ARROW_CPP) + bundle_arrow_lib(${ARROW_SUBSTRAIT_SHARED_LIB} SO_VERSION ${ARROW_SO_VERSION}) + if(MSVC) + bundle_arrow_import_lib(${ARROW_SUBSTRAIT_IMPORT_LIB}) + endif() endif() + set(SUBSTRAIT_LINK_LIBS ArrowSubstrait::arrow_substrait_shared) + else() + # Arrow Substrait is statically linked into libarrow_python already + set(SUBSTRAIT_LINK_LIBS) endif() - set(SUBSTRAIT_LINK_LIBS ArrowSubstrait::arrow_substrait_shared) list(APPEND CYTHON_EXTENSIONS _substrait) endif() diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx index a7afd065b592e..8fe9f30d33af9 100644 --- a/python/pyarrow/_dataset_parquet.pyx +++ b/python/pyarrow/_dataset_parquet.pyx @@ -27,7 +27,7 @@ import warnings import pyarrow as pa from pyarrow.lib cimport * -from pyarrow.lib import frombytes, tobytes +from pyarrow.lib import frombytes, tobytes, is_threading_enabled from pyarrow.includes.libarrow cimport * from pyarrow.includes.libarrow_dataset cimport * from pyarrow.includes.libarrow_dataset_parquet cimport * @@ -739,6 +739,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): new CParquetFragmentScanOptions())) self.use_buffered_stream = use_buffered_stream self.buffer_size = buffer_size + if pre_buffer and not is_threading_enabled(): + pre_buffer = False 
self.pre_buffer = pre_buffer if cache_options is not None: self.cache_options = cache_options @@ -789,6 +791,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): @pre_buffer.setter def pre_buffer(self, bint pre_buffer): + if pre_buffer and not is_threading_enabled(): + return self.arrow_reader_properties().set_pre_buffer(pre_buffer) @property diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 6c5b0af826b4e..41b15b633d3d2 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -42,7 +42,7 @@ from pyarrow.lib cimport (_Weakrefable, Buffer, Schema, from pyarrow.lib import (ArrowException, NativeFile, BufferOutputStream, _stringify_path, - tobytes, frombytes) + tobytes, frombytes, is_threading_enabled) cimport cpython as cp @@ -1453,6 +1453,9 @@ cdef class ParquetReader(_Weakrefable): default_arrow_reader_properties()) FileReaderBuilder builder + if pre_buffer and not is_threading_enabled(): + pre_buffer = False + if metadata is not None: c_metadata = metadata.sp_metadata @@ -1555,7 +1558,10 @@ cdef class ParquetReader(_Weakrefable): ---------- use_threads : bool """ - self.reader.get().set_use_threads(use_threads) + if is_threading_enabled(): + self.reader.get().set_use_threads(use_threads) + else: + self.reader.get().set_use_threads(False) def set_batch_size(self, int64_t batch_size): """ diff --git a/python/pyarrow/conftest.py b/python/pyarrow/conftest.py index 2ac8427de17e7..29c850c142da1 100644 --- a/python/pyarrow/conftest.py +++ b/python/pyarrow/conftest.py @@ -16,9 +16,14 @@ # under the License. import pytest + +import os import pyarrow as pa from pyarrow import Codec from pyarrow import fs +from pyarrow.lib import is_threading_enabled +from pyarrow.tests.util import windows_has_tzdata +import sys import numpy as np @@ -31,6 +36,7 @@ 'dataset', 'hypothesis', 'fastparquet', + 'flight', 'gandiva', 'gcs', 'gdb', @@ -44,12 +50,15 @@ 'pandas', 'parquet', 'parquet_encryption', + 'processes', + 'requires_testing_data', 's3', + 'slow', 'snappy', + 'sockets', 'substrait', - 'flight', - 'slow', - 'requires_testing_data', + 'threading', + 'timezone_data', 'zstd', ] @@ -76,14 +85,31 @@ 'pandas': False, 'parquet': False, 'parquet_encryption': False, + 'processes': True, 'requires_testing_data': True, 's3': False, 'slow': False, 'snappy': Codec.is_available('snappy'), + 'sockets': True, 'substrait': False, + 'threading': is_threading_enabled(), + 'timezone_data': True, 'zstd': Codec.is_available('zstd'), } +if sys.platform == "emscripten": + # Emscripten doesn't support subprocess, + # multiprocessing, gdb or socket based + # networking + defaults['gdb'] = False + defaults['processes'] = False + defaults['sockets'] = False + +if sys.platform == "win32": + defaults['timezone_data'] = windows_has_tzdata() +elif sys.platform == "emscripten": + defaults['timezone_data'] = os.path.exists("/usr/share/zoneinfo") + try: import cython # noqa defaults['cython'] = True @@ -116,7 +142,13 @@ try: import pyarrow.orc # noqa - defaults['orc'] = True + if sys.platform == "win32": + defaults['orc'] = True + else: + # orc tests on non-Windows platforms only work + # if timezone data exists, so skip them if + # not. 
+ defaults['orc'] = defaults['timezone_data'] except ImportError: pass diff --git a/python/pyarrow/error.pxi b/python/pyarrow/error.pxi index 4357cde32c31d..cbe25522e8d7e 100644 --- a/python/pyarrow/error.pxi +++ b/python/pyarrow/error.pxi @@ -26,6 +26,7 @@ import os import signal import threading +from pyarrow.lib import is_threading_enabled from pyarrow.util import _break_traceback_cycle_from_frame @@ -217,7 +218,9 @@ cdef class SignalStopHandler: maybe_source.status().Warn() else: self._stop_token.init(deref(maybe_source).token()) - self._enabled = True + # signals don't work on Emscripten without threads, + # and possibly on other single-threaded environments. + self._enabled = is_threading_enabled() def _init_signals(self): if (signal_handlers_enabled and diff --git a/python/pyarrow/includes/libarrow_python.pxd b/python/pyarrow/includes/libarrow_python.pxd index 136d6bc8b14cd..9fcc97aaf0a9c 100644 --- a/python/pyarrow/includes/libarrow_python.pxd +++ b/python/pyarrow/includes/libarrow_python.pxd @@ -317,3 +317,6 @@ cdef extern from "arrow/python/benchmark.h" namespace "arrow::py::benchmark": cdef extern from "arrow/python/gdb.h" namespace "arrow::gdb" nogil: void GdbTestSession "arrow::gdb::TestSession"() + +cdef extern from "arrow/python/helpers.h" namespace "arrow::py::internal": + c_bool IsThreadingEnabled() diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index 3a0125e957244..1d942e8ccabc6 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -659,6 +659,9 @@ cdef class NativeFile(_Weakrefable): int64_t bytes_read = 0 uint8_t* buf + if not is_threading_enabled(): + return self._download_nothreads(stream_or_path, buffer_size) + handle = self.get_input_stream() buffer_size = buffer_size or DEFAULT_BUFFER_SIZE @@ -738,6 +741,63 @@ if exc_info is not None: raise exc_info[0], exc_info[1], exc_info[2] + def _download_nothreads(self, stream_or_path, buffer_size=None): + """ + Internal method to do a download without separate threads, queues etc. + Called by download above if is_threading_enabled() == False + """ + cdef: + int64_t bytes_read = 0 + uint8_t* buf + + handle = self.get_input_stream() + + buffer_size = buffer_size or DEFAULT_BUFFER_SIZE + + if not hasattr(stream_or_path, 'read'): + stream = open(stream_or_path, 'wb') + + def cleanup(): + stream.close() + else: + stream = stream_or_path + + def cleanup(): + pass + + self.seek(0) + + # This isn't ideal -- PyBytes_FromStringAndSize copies the data from + # the passed buffer, so it's hard for us to avoid doubling the memory + buf = <uint8_t*> malloc(buffer_size) + if buf == NULL: + raise MemoryError("Failed to allocate {0} bytes" + .format(buffer_size)) + + cdef int64_t total_bytes = 0 + cdef int32_t c_buffer_size = buffer_size + + try: + while True: + with nogil: + bytes_read = GetResultValue( + handle.get().Read(c_buffer_size, buf)) + + total_bytes += bytes_read + + # EOF + if bytes_read == 0: + break + + pybuf = cp.PyBytes_FromStringAndSize(<char*> buf, + bytes_read) + + # no background thread - write on main thread + stream.write(pybuf) + finally: + free(buf) + cleanup() + def upload(self, stream, buffer_size=None): """ Write from a source stream to this file. @@ -749,6 +809,9 @@ Parameters ---------- stream : file-like object Source stream to pipe to this file. buffer_size : int, optional The buffer size to use for data transfers.
""" + if not is_threading_enabled(): + return self._upload_nothreads(stream, buffer_size) + write_queue = Queue(50) self._assert_writable() @@ -793,6 +856,24 @@ cdef class NativeFile(_Weakrefable): if exc_info is not None: raise exc_info[0], exc_info[1], exc_info[2] + def _upload_nothreads(self, stream, buffer_size=None): + """ + Internal method to do an upload without separate threads, queues etc. + Called by upload above if is_threading_enabled() == False + """ + self._assert_writable() + + buffer_size = buffer_size or DEFAULT_BUFFER_SIZE + + while True: + buf = stream.read(buffer_size) + if not buf: + break + + # no threading - just write + self.write(buf) + + BufferedIOBase.register(NativeFile) # ---------------------------------------------------------------------- diff --git a/python/pyarrow/lib.pyx b/python/pyarrow/lib.pyx index e08021c62b5ae..c72841c299566 100644 --- a/python/pyarrow/lib.pyx +++ b/python/pyarrow/lib.pyx @@ -79,6 +79,17 @@ def set_cpu_count(int count): check_status(SetCpuThreadPoolCapacity(count)) +def is_threading_enabled() -> bool: + """ + Returns True if threading is enabled in libarrow. + + If it isn't enabled, then python shouldn't create any + threads either, because we're probably on a system where + threading doesn't work (e.g. Emscripten). + """ + return libarrow_python.IsThreadingEnabled() + + Type_NA = _Type_NA Type_BOOL = _Type_BOOL Type_UINT8 = _Type_UINT8 diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py index c23c64d532b66..fcccf564fc619 100644 --- a/python/pyarrow/pandas_compat.py +++ b/python/pyarrow/pandas_compat.py @@ -33,7 +33,7 @@ import numpy as np import pyarrow as pa -from pyarrow.lib import _pandas_api, frombytes # noqa +from pyarrow.lib import _pandas_api, frombytes, is_threading_enabled # noqa _logical_type_map = {} @@ -581,6 +581,9 @@ def dataframe_to_arrays(df, schema, preserve_index, nthreads=1, columns=None, nthreads = pa.cpu_count() else: nthreads = 1 + # if we don't have threading in libarrow, don't use threading here either + if not is_threading_enabled(): + nthreads = 1 def convert_column(col, field): if field is None: diff --git a/python/pyarrow/src/arrow/python/helpers.cc b/python/pyarrow/src/arrow/python/helpers.cc index 2c86c86a919be..18302e6fe0401 100644 --- a/python/pyarrow/src/arrow/python/helpers.cc +++ b/python/pyarrow/src/arrow/python/helpers.cc @@ -29,6 +29,7 @@ #include "arrow/python/decimal.h" #include "arrow/type_fwd.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/config.h" #include "arrow/util/logging.h" namespace arrow { @@ -467,6 +468,14 @@ void DebugPrint(PyObject* obj) { PySys_WriteStderr("%s\n", repr.c_str()); } +bool IsThreadingEnabled() { +#ifdef ARROW_ENABLE_THREADING + return true; +#else + return false; +#endif +} + } // namespace internal } // namespace py } // namespace arrow diff --git a/python/pyarrow/src/arrow/python/helpers.h b/python/pyarrow/src/arrow/python/helpers.h index a8e5f80b60678..e2fd8212ae68d 100644 --- a/python/pyarrow/src/arrow/python/helpers.h +++ b/python/pyarrow/src/arrow/python/helpers.h @@ -154,6 +154,9 @@ Status IntegerScalarToFloat32Safe(PyObject* obj, float* result); // \brief Print Python object __repr__ void DebugPrint(PyObject* obj); +ARROW_PYTHON_EXPORT +bool IsThreadingEnabled(); + } // namespace internal } // namespace py } // namespace arrow diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py index 78d06b26e3622..30d258b9aabd8 100644 --- a/python/pyarrow/tests/test_array.py +++ 
b/python/pyarrow/tests/test_array.py @@ -34,6 +34,7 @@ from pyarrow.vendored.version import Version +@pytest.mark.processes def test_total_bytes_allocated(): code = """if 1: import pyarrow as pa diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py index 70841eeb0619a..e994a09f92ed2 100644 --- a/python/pyarrow/tests/test_cffi.py +++ b/python/pyarrow/tests/test_cffi.py @@ -749,6 +749,7 @@ def test_roundtrip_chunked_array_capsule_requested_schema(): chunked.__arrow_c_stream__(requested_capsule) +@needs_cffi def test_import_device_no_cuda(): try: import pyarrow.cuda # noqa diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py index e7d7b9769740f..13e30ed1da493 100644 --- a/python/pyarrow/tests/test_compute.py +++ b/python/pyarrow/tests/test_compute.py @@ -38,7 +38,6 @@ import pyarrow as pa import pyarrow.compute as pc from pyarrow.lib import ArrowNotImplementedError -from pyarrow.tests import util try: import pyarrow.substrait as pas @@ -137,7 +136,7 @@ def test_exported_option_classes(): @pytest.mark.filterwarnings( "ignore:pyarrow.CumulativeSumOptions is deprecated as of 14.0" ) -def test_option_class_equality(): +def test_option_class_equality(request): options = [ pc.ArraySortOptions(), pc.AssumeTimezoneOptions("UTC"), @@ -193,17 +192,17 @@ def test_option_class_equality(): pc.WeekOptions(week_starts_monday=True, count_from_zero=False, first_week_is_fully_in_year=False), ] - # Timezone database might not be installed on Windows - if sys.platform != "win32" or util.windows_has_tzdata(): + # Timezone database might not be installed on Windows or Emscripten + if request.config.pyarrow.is_enabled["timezone_data"]: options.append(pc.AssumeTimezoneOptions("Europe/Ljubljana")) classes = {type(option) for option in options} for cls in exported_option_classes: - # Timezone database might not be installed on Windows + # Timezone database might not be installed on Windows or Emscripten if ( cls not in classes - and (sys.platform != "win32" or util.windows_has_tzdata()) + and (request.config.pyarrow.is_enabled["timezone_data"]) and cls != pc.AssumeTimezoneOptions ): try: @@ -2085,8 +2084,7 @@ def test_strptime(): @pytest.mark.pandas -@pytest.mark.skipif(sys.platform == "win32" and not util.windows_has_tzdata(), - reason="Timezone database is not installed on Windows") +@pytest.mark.timezone_data def test_strftime(): times = ["2018-03-10 09:00", "2038-01-31 12:23", None] timezones = ["CET", "UTC", "Europe/Ljubljana"] @@ -2245,7 +2243,7 @@ def _check_datetime_components(timestamps, timezone=None): @pytest.mark.pandas -def test_extract_datetime_components(): +def test_extract_datetime_components(request): timestamps = ["1970-01-01T00:00:59.123456789", "2000-02-29T23:23:23.999999999", "2033-05-18T03:33:20.000000000", @@ -2268,7 +2266,7 @@ def test_extract_datetime_components(): _check_datetime_components(timestamps) # Test timezone aware timestamp array - if sys.platform == "win32" and not util.windows_has_tzdata(): + if not request.config.pyarrow.is_enabled["timezone_data"]: pytest.skip('Timezone database is not installed on Windows') else: for timezone in timezones: @@ -2289,8 +2287,7 @@ def test_iso_calendar_longer_array(unit): @pytest.mark.pandas -@pytest.mark.skipif(sys.platform == "win32" and not util.windows_has_tzdata(), - reason="Timezone database is not installed on Windows") +@pytest.mark.timezone_data def test_assume_timezone(): ts_type = pa.timestamp("ns") timestamps = pd.to_datetime(["1970-01-01T00:00:59.123456789", @@ -2485,8 
+2482,7 @@ def _check_temporal_rounding(ts, values, unit): np.testing.assert_array_equal(result, expected) -@pytest.mark.skipif(sys.platform == "win32" and not util.windows_has_tzdata(), - reason="Timezone database is not installed on Windows") +@pytest.mark.timezone_data @pytest.mark.parametrize('unit', ("nanosecond", "microsecond", "millisecond", "second", "minute", "hour", "day")) @pytest.mark.pandas diff --git a/python/pyarrow/tests/test_convert_builtin.py b/python/pyarrow/tests/test_convert_builtin.py index b824b89564374..6140163a8ee8c 100644 --- a/python/pyarrow/tests/test_convert_builtin.py +++ b/python/pyarrow/tests/test_convert_builtin.py @@ -21,7 +21,6 @@ import itertools import math import re -import sys import hypothesis as h import numpy as np @@ -29,7 +28,6 @@ from pyarrow.pandas_compat import _pandas_api # noqa import pyarrow as pa -from pyarrow.tests import util import pyarrow.tests.strategies as past @@ -1161,6 +1159,7 @@ def test_sequence_timestamp_with_timezone_inference(): assert arr.type == expected_type +@pytest.mark.timezone_data def test_sequence_timestamp_with_zoneinfo_timezone_inference(): pytest.importorskip("zoneinfo") import zoneinfo @@ -1354,8 +1353,7 @@ def test_sequence_timestamp_nanoseconds(): @pytest.mark.pandas -@pytest.mark.skipif(sys.platform == "win32" and not util.windows_has_tzdata(), - reason="Timezone database is not installed on Windows") +@pytest.mark.timezone_data def test_sequence_timestamp_from_int_with_unit(): # TODO(wesm): This test might be rewritten to assert the actual behavior # when pandas is not installed diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index 9ddb5197e9120..112129d9602ed 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -1406,6 +1406,7 @@ def test_stress_convert_options_blowup(self): assert table.num_rows == 0 assert table.column_names == col_names + @pytest.mark.threading def test_cancellation(self): if (threading.current_thread().ident != threading.main_thread().ident): @@ -1475,6 +1476,7 @@ def signal_from_thread(): assert isinstance(e, pa.ArrowCancelled) assert e.signum == signum + @pytest.mark.threading def test_cancellation_disabled(self): # ARROW-12622: reader would segfault when the cancelling signal # handler was not enabled (e.g. 
if disabled, or if not on the @@ -1825,6 +1827,7 @@ def use_threads(self): return False +@pytest.mark.threading class TestThreadedStreamingCSVRead(BaseStreamingCSVRead): @property def use_threads(self): diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 0b79218fb0018..3b0284bcb74a6 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -37,6 +37,7 @@ import pyarrow.feather import pyarrow.fs as fs import pyarrow.json +from pyarrow.lib import is_threading_enabled from pyarrow.tests.util import (FSProtocolClass, ProxyHandler, _configure_s3_limited_user, _filesystem_uri, change_cwd) @@ -808,29 +809,34 @@ def test_parquet_scan_options(): assert opts1.use_buffered_stream is False assert opts1.buffer_size == 2**13 - assert opts1.pre_buffer is True + if is_threading_enabled(): # pre buffering requires threads + assert opts1.pre_buffer is True assert opts1.thrift_string_size_limit == 100_000_000 # default in C++ assert opts1.thrift_container_size_limit == 1_000_000 # default in C++ assert opts1.page_checksum_verification is False assert opts2.use_buffered_stream is False assert opts2.buffer_size == 2**12 - assert opts2.pre_buffer is True + if is_threading_enabled(): # pre buffering requires threads + assert opts2.pre_buffer is True assert opts3.use_buffered_stream is True assert opts3.buffer_size == 2**13 - assert opts3.pre_buffer is True + if is_threading_enabled(): # pre buffering requires threads + assert opts3.pre_buffer is True assert opts4.use_buffered_stream is False assert opts4.buffer_size == 2**13 - assert opts4.pre_buffer is False + if is_threading_enabled(): # pre buffering requires threads + assert opts4.pre_buffer is False assert opts5.thrift_string_size_limit == 123456 assert opts5.thrift_container_size_limit == 987654 assert opts6.page_checksum_verification is True - assert opts7.pre_buffer is True + if is_threading_enabled(): # pre buffering requires threads + assert opts7.pre_buffer is True assert opts7.cache_options == cache_opts assert opts7.cache_options != opts1.cache_options @@ -4106,6 +4112,7 @@ def test_write_dataset_with_scanner(tempdir): @pytest.mark.parquet +@pytest.mark.threading def test_write_dataset_with_backpressure(tempdir): consumer_gate = threading.Event() diff --git a/python/pyarrow/tests/test_flight.py b/python/pyarrow/tests/test_flight.py index 9553dc2507225..4853dafc76c72 100644 --- a/python/pyarrow/tests/test_flight.py +++ b/python/pyarrow/tests/test_flight.py @@ -960,6 +960,7 @@ def test_server_exit_reraises_exception(): raise ValueError() +@pytest.mark.threading @pytest.mark.slow def test_client_wait_for_available(): location = ('localhost', find_free_port()) @@ -1603,6 +1604,7 @@ def test_cancel_do_get(): reader.read_chunk() +@pytest.mark.threading @pytest.mark.slow def test_cancel_do_get_threaded(): """Test canceling a DoGet operation from another thread.""" @@ -2067,6 +2069,7 @@ def do_exchange(self, context, descriptor, reader, writer): time.sleep(0.5) +@pytest.mark.threading def test_interrupt(): if threading.current_thread().ident != threading.main_thread().ident: pytest.skip("test only works from main Python thread") diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index 412daa2bd9ea1..f8ce74700dea8 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -1987,6 +1987,8 @@ def test_s3_finalize_region_resolver(): subprocess.check_call([sys.executable, "-c", code]) +@pytest.mark.processes 
+@pytest.mark.threading @pytest.mark.s3 def test_concurrent_s3fs_init(): # GH-39897: lazy concurrent initialization of S3 subsystem should not crash diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py index 2306014c4194a..ef499a3a8d76c 100644 --- a/python/pyarrow/tests/test_io.py +++ b/python/pyarrow/tests/test_io.py @@ -37,7 +37,7 @@ def check_large_seeks(file_factory, expected_error=None): - if sys.platform in ('win32', 'darwin'): + if sys.platform in ('win32', 'darwin', 'emscripten'): pytest.skip("need sparse file support") try: filename = tempfile.mktemp(prefix='test_io') @@ -1143,6 +1143,8 @@ def _try_delete(path): def test_memory_map_writer(tmpdir): + if sys.platform == "emscripten": + pytest.xfail("Multiple memory maps to same file don't work on emscripten") SIZE = 4096 arr = np.random.randint(0, 256, size=SIZE).astype('u1') data = arr.tobytes()[:SIZE] @@ -1334,6 +1336,9 @@ def test_native_file_modes(tmpdir): assert f.seekable() +@pytest.mark.xfail( + sys.platform == "emscripten", reason="umask doesn't work on Emscripten" +) def test_native_file_permissions(tmpdir): # ARROW-10124: permissions of created files should follow umask cur_umask = os.umask(0o002) diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index d8eb6e926e4c0..1e5242efe40f0 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -242,6 +242,7 @@ def test_empty_stream(): @pytest.mark.pandas +@pytest.mark.processes def test_read_year_month_nano_interval(tmpdir): """ARROW-15783: Verify to_pandas works for interval types. @@ -895,6 +896,7 @@ def socket_fixture(): return SocketStreamFixture() +@pytest.mark.sockets def test_socket_simple_roundtrip(socket_fixture): socket_fixture.start_server(do_read_all=False) writer_batches = socket_fixture.write_batches() @@ -906,6 +908,7 @@ def test_socket_simple_roundtrip(socket_fixture): assert reader_batches[i].equals(batch) +@pytest.mark.sockets def test_socket_read_all(socket_fixture): socket_fixture.start_server(do_read_all=True) writer_batches = socket_fixture.write_batches() diff --git a/python/pyarrow/tests/test_jvm.py b/python/pyarrow/tests/test_jvm.py index c5996f9215343..e1bd0d82d9f16 100644 --- a/python/pyarrow/tests/test_jvm.py +++ b/python/pyarrow/tests/test_jvm.py @@ -26,6 +26,8 @@ jpype = pytest.importorskip("jpype") +pytestmark = pytest.mark.processes + @pytest.fixture(scope="session") def root_allocator(): diff --git a/python/pyarrow/tests/test_memory.py b/python/pyarrow/tests/test_memory.py index 4f199952344f2..53c25f3b3ef20 100644 --- a/python/pyarrow/tests/test_memory.py +++ b/python/pyarrow/tests/test_memory.py @@ -26,6 +26,7 @@ import pytest +pytestmark = pytest.mark.processes possible_backends = ["system", "jemalloc", "mimalloc"] diff --git a/python/pyarrow/tests/test_misc.py b/python/pyarrow/tests/test_misc.py index 3d8ab2999e603..c42e4fbdfc2e8 100644 --- a/python/pyarrow/tests/test_misc.py +++ b/python/pyarrow/tests/test_misc.py @@ -56,6 +56,7 @@ def test_io_thread_count(): pa.set_io_thread_count(n) +@pytest.mark.processes def test_env_var_io_thread_count(): # Test that the number of IO threads can be overridden with the # ARROW_IO_THREADS environment variable. 
@@ -117,6 +118,7 @@ def test_runtime_info(): subprocess.check_call([sys.executable, "-c", code], env=env) +@pytest.mark.processes def test_import_at_shutdown(): # GH-38626: importing PyArrow at interpreter shutdown would crash code = """if 1: diff --git a/python/pyarrow/tests/test_pandas.py b/python/pyarrow/tests/test_pandas.py index 7d74a60dcb921..208812c3ac458 100644 --- a/python/pyarrow/tests/test_pandas.py +++ b/python/pyarrow/tests/test_pandas.py @@ -2957,6 +2957,8 @@ def test_empty_arrays(self): def test_non_threaded_conversion(self): _non_threaded_conversion() + @pytest.mark.processes + @pytest.mark.threading def test_threaded_conversion_multiprocess(self): # Parallel conversion should work from child processes too (ARROW-2963) pool = mp.Pool(2) @@ -4824,6 +4826,7 @@ def test_timestamp_as_object_fixed_offset(): assert pa.table(result) == table +@pytest.mark.processes def test_threaded_pandas_import(): invoke_script("pandas_threaded_import.py") @@ -5127,6 +5130,7 @@ def roundtrip(df, schema=None): schema=schema) +@pytest.mark.processes def test_is_data_frame_race_condition(): # See https://github.com/apache/arrow/issues/39313 test_util.invoke_script('arrow_39313.py') diff --git a/python/pyarrow/tests/test_scalars.py b/python/pyarrow/tests/test_scalars.py index 6a814111898b7..bc50697e1be17 100644 --- a/python/pyarrow/tests/test_scalars.py +++ b/python/pyarrow/tests/test_scalars.py @@ -18,14 +18,12 @@ import datetime import decimal import pytest -import sys import weakref import numpy as np import pyarrow as pa import pyarrow.compute as pc -from pyarrow.tests import util @pytest.mark.parametrize(['value', 'ty', 'klass'], [ @@ -157,8 +155,7 @@ def test_hashing_struct_scalar(): assert hash1 == hash2 -@pytest.mark.skipif(sys.platform == "win32" and not util.windows_has_tzdata(), - reason="Timezone database is not installed on Windows") +@pytest.mark.timezone_data def test_timestamp_scalar(): a = repr(pa.scalar("0000-01-01").cast(pa.timestamp("s"))) assert a == "<pyarrow.TimestampScalar: '0000-01-01 00:00:00'>" @@ -325,8 +322,7 @@ def test_cast(): pa.scalar('foo').cast('int32') -@pytest.mark.skipif(sys.platform == "win32" and not util.windows_has_tzdata(), - reason="Timezone database is not installed on Windows") +@pytest.mark.timezone_data def test_cast_timestamp_to_string(): # GH-35370 pytest.importorskip("pytz") diff --git a/python/pyarrow/tests/test_schema.py b/python/pyarrow/tests/test_schema.py index 8793c9e773c1d..1b05c58384cf0 100644 --- a/python/pyarrow/tests/test_schema.py +++ b/python/pyarrow/tests/test_schema.py @@ -83,6 +83,7 @@ def test_type_to_pandas_dtype(): @pytest.mark.pandas +@pytest.mark.processes def test_type_to_pandas_dtype_check_import(): # ARROW-7980 test_util.invoke_script('arrow_7980.py') diff --git a/python/pyarrow/tests/test_tensor.py b/python/pyarrow/tests/test_tensor.py index 3e6a4ca8ed222..29c6de65b1607 100644 --- a/python/pyarrow/tests/test_tensor.py +++ b/python/pyarrow/tests/test_tensor.py @@ -188,6 +188,10 @@ def test_read_tensor(tmpdir): path = os.path.join(str(tmpdir), 'pyarrow-tensor-ipc-read-tensor') write_mmap = pa.create_memory_map(path, data_size) pa.ipc.write_tensor(tensor, write_mmap) + if sys.platform == 'emscripten': + # emscripten doesn't support multiple + # memory maps to the same file + write_mmap.close() # Try to read tensor read_mmap = pa.memory_map(path, mode='r') array = pa.ipc.read_tensor(read_mmap).to_numpy() diff --git a/python/pyarrow/tests/test_types.py b/python/pyarrow/tests/test_types.py index f7b6040f510af..aecf32c5076be 100644 --- a/python/pyarrow/tests/test_types.py +++
b/python/pyarrow/tests/test_types.py @@ -345,6 +345,7 @@ def test_pytz_tzinfo_to_string(): assert [pa.lib.tzinfo_to_string(i) for i in tz] == expected +@pytest.mark.timezone_data def test_dateutil_tzinfo_to_string(): if sys.platform == 'win32': # Skip due to new release of python-dateutil @@ -360,6 +361,7 @@ assert pa.lib.tzinfo_to_string(tz) == 'Europe/Paris' +@pytest.mark.timezone_data def test_zoneinfo_tzinfo_to_string(): zoneinfo = pytest.importorskip('zoneinfo') if sys.platform == 'win32': diff --git a/python/scripts/run_emscripten_tests.py b/python/scripts/run_emscripten_tests.py new file mode 100644 index 0000000000000..1a4b4a4e05614 --- /dev/null +++ b/python/scripts/run_emscripten_tests.py @@ -0,0 +1,343 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import argparse +import contextlib +import http.server +import os +import queue +import shutil +import subprocess +import sys +import time +import threading + +from pathlib import Path +from io import BytesIO + +from selenium import webdriver + + +class TemplateOverrider(http.server.SimpleHTTPRequestHandler): + def log_request(self, code="-", size="-"): + # don't log successful requests + return + + def do_GET(self) -> bytes | None: + if self.path.endswith(PYARROW_WHEEL_PATH.name): + self.send_response(200) + self.send_header("Content-type", "application/x-zip") + self.end_headers() + with PYARROW_WHEEL_PATH.open(mode="rb") as wheel: + self.copyfile(wheel, self.wfile) + elif self.path.endswith("/test.html"): + body = b"""<!DOCTYPE html> + <html> + <head> + <script type="text/javascript"> + window.python_logs = []; + window.pyworker = new Worker("./worker.js"); + window.pyworker.onmessage = (e) => { + if (e.data.print) { + window.python_logs.push(...e.data.print); + } else { + window.python_done_callback({result: e.data.results}); + } + } + </script> + </head> + <body></body> + </html> + """ + self.send_response(200) + self.send_header("Content-type", "text/html") + self.send_header("Content-length", len(body)) + self.end_headers() + self.copyfile(BytesIO(body), self.wfile) + elif self.path.endswith("/worker.js"): + body = b""" + importScripts("./pyodide.js"); + onmessage = async function (e) { + const data = e.data; + if (!self.pyodide) { + self.pyodide = await loadPyodide(); + } + function do_print(arg) { + let databytes = Array.from(arg); + self.postMessage({print:databytes}); + return databytes.length; + } + self.pyodide.setStdout({write:do_print,isatty:data.isatty}); + self.pyodide.setStderr({write:do_print,isatty:data.isatty}); + + await self.pyodide.loadPackagesFromImports(data.python); + let results = await self.pyodide.runPythonAsync(data.python); + self.postMessage({results}); + } + """ + self.send_response(200) + self.send_header("Content-type", "application/javascript") + self.send_header("Content-length", len(body)) + self.end_headers() + self.copyfile(BytesIO(body), self.wfile) + + else: + return super().do_GET() + + def end_headers(self): + # Enable Cross-Origin Resource Sharing (CORS) + self.send_header("Access-Control-Allow-Origin", "*") + super().end_headers() + + +def
run_server_thread(dist_dir, q): + os.chdir(dist_dir) + server = http.server.HTTPServer(("", 0), TemplateOverrider) + q.put(server.server_address) + print(f"Starting server for {dist_dir} at: {server.server_address}") + server.serve_forever() + + +@contextlib.contextmanager +def launch_server(dist_dir): + q = queue.Queue() + p = threading.Thread(target=run_server_thread, args=[dist_dir, q], daemon=True) + p.start() + address = q.get(timeout=50) + time.sleep(0.1) # wait to make sure server is started + yield address + # the server thread is a daemon, so it exits with the process + + +class NodeDriver: + def __init__(self, hostname, port): + self.process = subprocess.Popen( + [shutil.which("script"), "-c", shutil.which("node")], + stdin=subprocess.PIPE, + shell=False, + bufsize=0, + ) + print(self.process) + time.sleep(0.1) # wait for node to start + self.hostname = hostname + self.port = port + self.last_ret_code = None + + def load_pyodide(self, dist_dir): + self.execute_js( + f""" + const {{ loadPyodide }} = require('{dist_dir}/pyodide.js'); + let pyodide = await loadPyodide(); + """ + ) + + def clear_logs(self): + pass # we don't handle logs for node + + def write_stdin(self, buffer): + # because we use unbuffered IO for + # stdout, stdin.write is also unbuffered + # so might under-run on writes + while len(buffer) > 0 and self.process.poll() is None: + written = self.process.stdin.write(buffer) + if written == len(buffer): + break + elif written == 0: + # full buffer - wait + time.sleep(0.01) + else: + buffer = buffer[written:] + + def execute_js(self, code, wait_for_terminate=True): + self.write_stdin((code + "\n").encode("utf-8")) + + def load_arrow(self): + self.execute_js(f"await pyodide.loadPackage('{PYARROW_WHEEL_PATH}');") + + def execute_python(self, code, wait_for_terminate=True): + js_code = f""" + python = `{code}`; + await pyodide.loadPackagesFromImports(python); + python_output = await pyodide.runPythonAsync(python); + """ + self.last_ret_code = self.execute_js(js_code, wait_for_terminate) + return self.last_ret_code + + def wait_for_done(self): + # in node we just let it run above, then tell node to + # exit with the python result and join the process + self.write_stdin(b"process.exit(python_output)\n") + return self.process.wait() + + +class BrowserDriver: + def __init__(self, hostname, port, driver): + self.driver = driver + self.driver.get(f"http://{hostname}:{port}/test.html") + self.driver.set_script_timeout(100) + + def load_pyodide(self, dist_dir): + pass + + def load_arrow(self): + self.execute_python( + f"import pyodide_js as pjs\n" + f"await pjs.loadPackage('{PYARROW_WHEEL_PATH.name}')\n" + ) + + def execute_python(self, code, wait_for_terminate=True): + if wait_for_terminate: + self.driver.execute_async_script( + f""" + let callback = arguments[arguments.length-1]; + python = `{code}`; + window.python_done_callback = callback; + window.pyworker.postMessage( + {{python, isatty: {'true' if sys.stdout.isatty() else 'false'}}}); + """ + ) + else: + self.driver.execute_script( + f""" + let python = `{code}`; + window.python_done_callback = (x) => {{window.python_script_done = x;}}; + window.pyworker.postMessage( + {{python,isatty:{'true' if sys.stdout.isatty() else 'false'}}}); + """ + ) + + def clear_logs(self): + self.driver.execute_script("window.python_logs = [];") + + def wait_for_done(self): + while True: + # poll for console.log messages from our webworker + # which are the output of pytest + lines = self.driver.execute_script( + "let temp = window.python_logs;window.python_logs=[];return temp;" + ) +
if len(lines) > 0: + sys.stdout.buffer.write(bytes(lines)) + done = self.driver.execute_script("return window.python_script_done;") + if done is not None: + value = done["result"] + self.driver.execute_script("delete window.python_script_done;") + return value + time.sleep(0.1) + + +class ChromeDriver(BrowserDriver): + def __init__(self, hostname, port): + from selenium.webdriver.chrome.options import Options + + options = Options() + options.add_argument("--headless") + options.add_argument("--no-sandbox") + super().__init__(hostname, port, webdriver.Chrome(options=options)) + + +class FirefoxDriver(BrowserDriver): + def __init__(self, hostname, port): + from selenium.webdriver.firefox.options import Options + + options = Options() + options.add_argument("--headless") + + super().__init__(hostname, port, webdriver.Firefox(options=options)) + + +def _load_pyarrow_in_runner(driver, wheel_name): + driver.load_arrow() + driver.execute_python( + """import sys +import micropip +if "pyarrow" not in sys.modules: + await micropip.install("hypothesis") + import pyodide_js as pjs + await pjs.loadPackage("numpy") + await pjs.loadPackage("pandas") + import pytest + import pandas # import pandas after the pyarrow package is loaded + # so that pandas/pyarrow functions work +import pyarrow + """, + wait_for_terminate=True, + ) + + +parser = argparse.ArgumentParser() +parser.add_argument( + "-d", + "--dist-dir", + type=str, + help="Pyodide distribution directory", + default="./pyodide", +) +parser.add_argument("wheel", type=str, help="Wheel to run tests from") +parser.add_argument( + "-t", "--test-submodule", help="Submodule that tests live in", default="test" +) +parser.add_argument( + "-r", + "--runtime", + type=str, + choices=["chrome", "node", "firefox"], + help="Runtime to run tests in", + default="chrome", +) +args = parser.parse_args() + +PYARROW_WHEEL_PATH = Path(args.wheel).resolve() + +dist_dir = Path(os.getcwd(), args.dist_dir).resolve() +print(f"dist dir={dist_dir}") +with launch_server(dist_dir) as (hostname, port): + if args.runtime == "chrome": + driver = ChromeDriver(hostname, port) + elif args.runtime == "node": + driver = NodeDriver(hostname, port) + elif args.runtime == "firefox": + driver = FirefoxDriver(hostname, port) + + print("Load pyodide in browser") + driver.load_pyodide(dist_dir) + print("Load pyarrow in browser") + _load_pyarrow_in_runner(driver, Path(args.wheel).name) + driver.clear_logs() + print("Run pytest in browser") + driver.execute_python( + """ +import pyarrow,pathlib +pyarrow_dir = pathlib.Path(pyarrow.__file__).parent +pytest.main([pyarrow_dir, '-v']) +""", + wait_for_terminate=False, + ) + print("Wait for done") + os._exit(driver.wait_for_done()) diff --git a/python/setup.py b/python/setup.py index b738b2f77290e..11cd7028023be 100755 --- a/python/setup.py +++ b/python/setup.py @@ -40,6 +40,14 @@ # Check if we're running 64-bit Python is_64_bit = sys.maxsize > 2**32 +# We can't use sys.platform in a cross-compiling situation, +# as it may be set to the host platform rather than the target +is_emscripten = ( + sysconfig.get_config_var("SOABI") + and sysconfig.get_config_var("SOABI").find("emscripten") != -1 +) + + if Cython.__version__ < '0.29.31': raise Exception( 'Please update your Cython version.
Supported Cython >= 0.29.31') @@ -298,8 +306,14 @@ def append_cmake_component(flag, varname): build_tool_args.append(f'-j{parallel}') # Generate the build files - print("-- Running cmake for PyArrow") - self.spawn(['cmake'] + extra_cmake_args + cmake_options + [source]) + if is_emscripten: + print("-- Running emcmake cmake for PyArrow on Emscripten") + self.spawn(['emcmake', 'cmake'] + extra_cmake_args + + cmake_options + [source]) + else: + print("-- Running cmake for PyArrow") + self.spawn(['cmake'] + extra_cmake_args + cmake_options + [source]) + print("-- Finished cmake for PyArrow") print("-- Running cmake --build for PyArrow")
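For anyone trying the new job locally, the usage comments embedded in the docker-compose service above boil down to the following sequence (a sketch only; it assumes the repository's .env file supplies the REPO and ARCH defaults, and UBUNTU/PYTHON are set to the values pinned in dev/tasks/tasks.yml):

    # build the conda-python-emscripten image on top of the conda-python base
    UBUNTU=22.04 PYTHON=3.12 docker-compose build conda-python-emscripten
    # run the whole pipeline: cpp_build.sh, python_build_emscripten.sh,
    # then python_test_emscripten.sh against both node and Chrome
    UBUNTU=22.04 PYTHON=3.12 docker-compose run --rm conda-python-emscripten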