Skip to content

Commit

Permalink
ARROW-6655: [Python] Filesystem bindings for S3
Browse files Browse the repository at this point in the history
- Add support for S3FileSystem in the python bindings.
- Fixed issue with reading all the content of an S3 object
- Introduce `minio_server` fixture for parametrized testing of all filesystem implementations
- Fixed s3fs parquet test and updated it to use minio_server fixture

Closes #5423 from kszucs/s3 and squashes the following commits:

384c960 <Krisztián Szűcs> Resolve review comments
73e6625 <Krisztián Szűcs> S3Options
98bd91a <Krisztián Szűcs> remove commented tests
db89859 <Krisztián Szűcs> rename to s3fs
4478458 <Krisztián Szűcs> fix read() issue
c1df10b <Krisztián Szűcs> initialization in first use
192ab65 <Krisztián Szűcs> flake8
f70f9fb <Krisztián Szűcs> remove minio-client dependency
d399643 <Krisztián Szűcs> simplify test suite
fee57a9 <Krisztián Szűcs> remove accidentally committed files
751cfd4 <Krisztián Szűcs> resolve a couple of review comments; enum workaround
45436f7 <Krisztián Szűcs> cython flake8
38dcb88 <Krisztián Szűcs> rat
c541b3e <Krisztián Szűcs> comment left
098048a <Krisztián Szűcs> more compat
00340ed <Krisztián Szűcs> fixture error handling
2be25ce <Krisztián Szűcs> auto initialize s3 on import
88e0c9f <Krisztián Szűcs> py2 compat
8585a60 <Krisztián Szűcs> py2 compat
d372287 <Krisztián Szűcs> install minio in the conda-toolchain build
041cad4 <Krisztián Szűcs> executable flag
fb0f281 <Krisztián Szűcs> travis minio install script
8cbe0ee <Krisztián Szűcs> travis osx
72e56a6 <Krisztián Szűcs> enable S3 in travis python builds
7800c75 <Krisztián Szűcs> appveyor flag
7daf566 <Krisztián Szűcs> fix syntax error in travis script
68eb591 <Krisztián Szűcs> enable PYARROW_WITH_S3 on appveyor
2cb19d1 <Krisztián Szűcs> conditional import of test dependencies
efa05d2 <Krisztián Szűcs> use minio for dask.s3fs test too
f25ae5a <Krisztián Szűcs> travis
9042c7e <Krisztián Szűcs> use S3FS_DIR
9ce7180 <Krisztián Szűcs> cmake format; fix orc cimport
45a2a17 <Krisztián Szűcs> docstrings
c0b9162 <Krisztián Szűcs> test requirements; flake8
44aedfd <Krisztián Szűcs> stat test
a343950 <Krisztián Szűcs> testing suite
1551b52 <Krisztián Szűcs> wip
dd41d21 <Krisztián Szűcs> imports
5200af1 <Krisztián Szűcs> s3 filesystem bindings

Authored-by: Krisztián Szűcs <szucs.krisztian@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
kszucs committed Oct 5, 2019
1 parent 953d9e0 commit 8bbb297
Show file tree
Hide file tree
Showing 40 changed files with 1,054 additions and 480 deletions.
7 changes: 7 additions & 0 deletions .travis.yml
Expand Up @@ -87,6 +87,7 @@ matrix:
- if [ $ARROW_CI_CPP_AFFECTED != "1" ] && [ $ARROW_CI_JAVA_AFFECTED != "1" ]; then exit; fi
- $TRAVIS_BUILD_DIR/ci/travis_install_clang_tools.sh
- $TRAVIS_BUILD_DIR/ci/travis_install_linux.sh
- $TRAVIS_BUILD_DIR/ci/travis_install_minio.sh
# If either C++ or Python changed, we must install the C++ libraries
- git submodule update --init
- $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh
Expand All @@ -110,12 +111,14 @@ matrix:
- ARROW_TRAVIS_USE_SYSTEM_JAVA=1
- ARROW_TRAVIS_USE_TOOLCHAIN=1
- ARROW_TRAVIS_VALGRIND=1
- ARROW_TRAVIS_S3=1
# TODO(wesm): Run the benchmarks outside of Travis
# - ARROW_TRAVIS_PYTHON_BENCHMARKS=1
before_script:
- if [ $ARROW_CI_PYTHON_AFFECTED != "1" ] && [ $ARROW_CI_DOCS_AFFECTED != "1" ]; then exit; fi
- $TRAVIS_BUILD_DIR/ci/travis_install_clang_tools.sh
- $TRAVIS_BUILD_DIR/ci/travis_install_linux.sh
- $TRAVIS_BUILD_DIR/ci/travis_install_minio.sh
- $TRAVIS_BUILD_DIR/ci/travis_install_toolchain.sh
script:
- $TRAVIS_BUILD_DIR/ci/travis_script_java.sh || travis_terminate 1
Expand All @@ -136,6 +139,7 @@ matrix:
- ARROW_TRAVIS_PLASMA=1
- ARROW_TRAVIS_FLIGHT=1
- ARROW_TRAVIS_ORC=1
- ARROW_TRAVIS_S3=1
- ARROW_TRAVIS_PARQUET=1
# TODO(ARROW-4763): llvm and llvmdev packages are in conflict:
# https://github.com/conda-forge/llvmdev-feedstock/issues/60
Expand All @@ -149,6 +153,7 @@ matrix:
- if [ $ARROW_CI_CPP_AFFECTED != "1" ] && [ $ARROW_CI_JAVA_AFFECTED != "1" ]; then exit; fi
# If either C++ or Python changed, we must install the C++ libraries
- git submodule update --init
- $TRAVIS_BUILD_DIR/ci/travis_install_minio.sh
- $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh
script:
- $TRAVIS_BUILD_DIR/ci/travis_script_cpp.sh || travis_terminate 1
Expand All @@ -161,6 +166,7 @@ matrix:
cache:
addons:
env:
- ARROW_TRAVIS_S3=1
- ARROW_TRAVIS_PLASMA=1
- ARROW_TRAVIS_USE_TOOLCHAIN=1
- ARROW_BUILD_WARNING_LEVEL=CHECKIN
Expand All @@ -170,6 +176,7 @@ matrix:
before_script:
script:
- if [ $ARROW_CI_PYTHON_AFFECTED != "1" ]; then exit; fi
- $TRAVIS_BUILD_DIR/ci/travis_install_minio.sh
- $TRAVIS_BUILD_DIR/ci/travis_script_python.sh 3.6
- name: "Java OpenJDK8 and OpenJDK11"
language: cpp
Expand Down
1 change: 1 addition & 0 deletions ci/conda_env_python.yml
Expand Up @@ -22,6 +22,7 @@ numpy>=1.14
pandas
pytest
pytest-faulthandler
pytest-lazy-fixture
pytz
setuptools
setuptools_scm=3.2.0
3 changes: 3 additions & 0 deletions ci/cpp-msvc-build-main.bat
Expand Up @@ -98,6 +98,9 @@ pip install -r requirements.txt pickle5

set PYARROW_CXXFLAGS=%ARROW_CXXFLAGS%
set PYARROW_CMAKE_GENERATOR=%GENERATOR%
if "%ARROW_S3%" == "ON" (
set PYARROW_WITH_S3=ON
)
if "%ARROW_BUILD_FLIGHT%" == "ON" (
@rem ARROW-5441: bundling Arrow Flight libraries not implemented
set PYARROW_BUNDLE_ARROW_CPP=OFF
Expand Down
8 changes: 0 additions & 8 deletions ci/travis_install_linux.sh
Expand Up @@ -42,14 +42,6 @@ if [ "$ARROW_TRAVIS_GANDIVA" == "1" ]; then
sudo apt-get install -y -qq llvm-$ARROW_LLVM_MAJOR_VERSION-dev
fi

if [ "$ARROW_TRAVIS_S3" == "1" ]; then
# Download the Minio S3 server into PATH
S3FS_DIR=~/.local/bin/
mkdir -p $S3FS_DIR
wget --directory-prefix $S3FS_DIR https://dl.min.io/server/minio/release/linux-amd64/minio
chmod +x $S3FS_DIR/minio
fi

if [ "$ARROW_TRAVIS_USE_SYSTEM" == "1" ]; then
if [ "$DISTRO_CODENAME" == "xenial" ]; then
# TODO(ARROW-4761): Install libzstd-dev once we support zstd<1
Expand Down
35 changes: 35 additions & 0 deletions ci/travis_install_minio.sh
@@ -0,0 +1,35 @@
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Abort on the first failing command and echo each command for the CI log.
set -e
set -x

# Install the Minio S3 server binary, but only for builds that opted in to
# S3 testing via ARROW_TRAVIS_S3=1.
if [ "$ARROW_TRAVIS_S3" == "1" ]; then
  # Download the Minio S3 server into PATH
  # TRAVIS_OS_NAME is quoted so that an unset or empty value does not turn
  # the test into a syntax error ("unary operator expected").
  if [ "$TRAVIS_OS_NAME" = "osx" ]; then
    MINIO_URL=https://dl.min.io/server/minio/release/darwin-amd64/minio
  else
    MINIO_URL=https://dl.min.io/server/minio/release/linux-amd64/minio
  fi

  # The tilde is expanded at assignment time, so quoting the variable in the
  # commands below is safe.
  S3FS_DIR=~/.local/bin/
  mkdir -p "$S3FS_DIR"
  wget --directory-prefix "$S3FS_DIR" "$MINIO_URL"
  chmod +x "$S3FS_DIR/minio"
fi
2 changes: 1 addition & 1 deletion ci/travis_install_osx.sh
Expand Up @@ -40,4 +40,4 @@ if [ "$ARROW_CI_RUBY_AFFECTED" = "1" ]; then
run_brew bundle --file=$TRAVIS_BUILD_DIR/cpp/Brewfile --verbose
run_brew bundle --file=$TRAVIS_BUILD_DIR/c_glib/Brewfile --verbose
rm ${brew_log_path}
fi
fi
8 changes: 8 additions & 0 deletions ci/travis_script_python.sh
Expand Up @@ -100,6 +100,10 @@ CMAKE_COMMON_FLAGS="-DARROW_EXTRA_ERROR_CONTEXT=ON"

PYTHON_CPP_BUILD_TARGETS="arrow_python-all plasma parquet"

if [ "$ARROW_TRAVIS_S3" == "1" ]; then
CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_S3=ON"
fi

if [ "$ARROW_TRAVIS_FLIGHT" == "1" ]; then
CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_FLIGHT=ON"
fi
Expand Down Expand Up @@ -164,6 +168,9 @@ export PYARROW_BUILD_TYPE=$ARROW_BUILD_TYPE
export PYARROW_WITH_PARQUET=1
export PYARROW_WITH_PLASMA=1
export PYARROW_WITH_ORC=1
if [ "$ARROW_TRAVIS_S3" == "1" ]; then
export PYARROW_WITH_S3=1
fi
if [ "$ARROW_TRAVIS_FLIGHT" == "1" ]; then
export PYARROW_WITH_FLIGHT=1
fi
Expand All @@ -177,6 +184,7 @@ python setup.py develop
python -c "import pyarrow.parquet"
python -c "import pyarrow.plasma"
python -c "import pyarrow.orc"
python -c "import pyarrow.fs"

# Ensure we do not eagerly import pandas (or other expensive imports)
python < scripts/test_imports.py
Expand Down
10 changes: 10 additions & 0 deletions cpp/cmake_modules/ThirdpartyToolchain.cmake
Expand Up @@ -2542,6 +2542,16 @@ if(ARROW_S3)
include_directories(SYSTEM ${AWSSDK_INCLUDE_DIR})
message(STATUS "Found AWS SDK headers: ${AWSSDK_INCLUDE_DIR}")
message(STATUS "Found AWS SDK libraries: ${AWSSDK_LINK_LIBRARIES}")

if(APPLE)
# CoreFoundation's path is hardcoded in the CMake files provided by
# aws-sdk-cpp to use the MacOSX SDK provided by XCode which makes
# XCode a hard dependency. Command Line Tools is often used instead
# of the full XCode suite, so let the linker find it.
set_target_properties(AWS::aws-c-common
PROPERTIES INTERFACE_LINK_LIBRARIES
"-pthread;pthread;-framework CoreFoundation")
endif()
endif()

# Write out the package configurations.
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/filesystem/s3fs.cc
Expand Up @@ -341,6 +341,12 @@ class ObjectInputFile : public io::RandomAccessFile {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(CheckPosition(position, "read"));

nbytes = std::min(nbytes, content_length_ - position);
if (nbytes == 0) {
*bytes_read = 0;
return Status::OK();
}

// Read the desired range of bytes
S3Model::GetObjectResult result;
RETURN_NOT_OK(GetObjectRange(client_, path_, position, nbytes, &result));
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/filesystem/s3fs.h
Expand Up @@ -132,7 +132,7 @@ class ARROW_EXPORT S3FileSystem : public FileSystem {
std::unique_ptr<Impl> impl_;
};

enum class S3LogLevel { Off, Fatal, Error, Warn, Info, Debug, Trace };
enum class S3LogLevel : int8_t { Off, Fatal, Error, Warn, Info, Debug, Trace };

struct ARROW_EXPORT S3GlobalOptions {
S3LogLevel log_level;
Expand Down
11 changes: 10 additions & 1 deletion cpp/src/arrow/filesystem/s3fs_test.cc
Expand Up @@ -687,7 +687,7 @@ TEST_F(TestS3FS, OpenInputStream) {
TEST_F(TestS3FS, OpenInputFile) {
std::shared_ptr<io::RandomAccessFile> file;
std::shared_ptr<Buffer> buf;
int64_t nbytes = -1, pos = -1;
int64_t nbytes = -1, pos = -1, bytes_read = 0;

// Non-existent
ASSERT_RAISES(IOError, fs_->OpenInputFile("non-existent-bucket/somefile", &file));
Expand All @@ -712,6 +712,15 @@ TEST_F(TestS3FS, OpenInputFile) {
AssertBufferEqual(*buf, "data");
ASSERT_OK(file->ReadAt(9, 20, &buf));
AssertBufferEqual(*buf, "");

char result[10];
ASSERT_OK(file->ReadAt(2, 5, &bytes_read, &result));
ASSERT_EQ(bytes_read, 5);
ASSERT_OK(file->ReadAt(5, 20, &bytes_read, &result));
ASSERT_EQ(bytes_read, 4);
ASSERT_OK(file->ReadAt(9, 0, &bytes_read, &result));
ASSERT_EQ(bytes_read, 0);

// Reading past end of file
ASSERT_RAISES(IOError, file->ReadAt(10, 20, &buf));

Expand Down
4 changes: 4 additions & 0 deletions python/CMakeLists.txt
Expand Up @@ -385,6 +385,10 @@ set(CYTHON_EXTENSIONS lib _fs _csv _json)

set(LINK_LIBS arrow_shared arrow_python_shared)

if(PYARROW_BUILD_S3)
set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _s3fs)
endif()

if(PYARROW_BUILD_CUDA)
# Arrow CUDA
find_package(ArrowCuda)
Expand Down
4 changes: 2 additions & 2 deletions python/pyarrow/_csv.pyx
Expand Up @@ -466,7 +466,7 @@ cdef class ConvertOptions:
self.options.include_missing_columns = value


cdef _get_reader(input_file, shared_ptr[InputStream]* out):
cdef _get_reader(input_file, shared_ptr[CInputStream]* out):
use_memory_map = False
get_input_stream(input_file, use_memory_map, out)

Expand Down Expand Up @@ -522,7 +522,7 @@ def read_csv(input_file, read_options=None, parse_options=None,
Contents of the CSV file as an in-memory table.
"""
cdef:
shared_ptr[InputStream] stream
shared_ptr[CInputStream] stream
CCSVReadOptions c_read_options
CCSVParseOptions c_parse_options
CCSVConvertOptions c_convert_options
Expand Down
4 changes: 2 additions & 2 deletions python/pyarrow/_cuda.pyx
Expand Up @@ -729,7 +729,7 @@ cdef class BufferReader(NativeFile):
self.buffer = obj
self.reader = new CCudaBufferReader(self.buffer.buffer)
self.set_random_access_file(
shared_ptr[RandomAccessFile](self.reader))
shared_ptr[CRandomAccessFile](self.reader))
self.is_readable = True

def read_buffer(self, nbytes=None):
Expand Down Expand Up @@ -776,7 +776,7 @@ cdef class BufferWriter(NativeFile):
def __cinit__(self, CudaBuffer buffer):
self.buffer = buffer
self.writer = new CCudaBufferWriter(self.buffer.cuda_buffer)
self.set_output_stream(shared_ptr[OutputStream](self.writer))
self.set_output_stream(shared_ptr[COutputStream](self.writer))
self.is_writable = True

def writeat(self, int64_t position, object data):
Expand Down
68 changes: 68 additions & 0 deletions python/pyarrow/_fs.pxd
@@ -0,0 +1,68 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3

import six

from pyarrow.compat import frombytes, tobytes
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport PyDateTime_from_TimePoint
from pyarrow.lib import _detect_compression
from pyarrow.lib cimport *


# Enum declared with cpdef. Each member takes the value of the
# corresponding CFileType_* constant, cast to int8_t.
cpdef enum FileType:
NonExistent = <int8_t> CFileType_NonExistent
Unknown = <int8_t> CFileType_Unknown
File = <int8_t> CFileType_File
Directory = <int8_t> CFileType_Directory


# Extension type that stores a CFileStats value.
cdef class FileStats:
cdef:
# The CFileStats value held by this object.
CFileStats stats

# C-level static method: takes a CFileStats and returns a FileStats.
# Presumably builds the Python wrapper around `stats`; the implementation
# is not part of this declaration file.
@staticmethod
cdef FileStats wrap(CFileStats stats)


# Extension type that stores a CSelector value.
cdef class Selector:
cdef:
# The CSelector value held by this object.
CSelector selector


# Extension type holding a shared_ptr to a CFileSystem plus a raw
# CFileSystem pointer.
cdef class FileSystem:
cdef:
# Shared pointer to the underlying CFileSystem.
shared_ptr[CFileSystem] wrapped
# Raw CFileSystem pointer.
# NOTE(review): presumably set from `wrapped` in init(); confirm in the
# implementation file.
CFileSystem* fs

# C-level initializer taking the shared_ptr[CFileSystem] to store.
cdef init(self, const shared_ptr[CFileSystem]& wrapped)


# FileSystem subclass that additionally keeps a CLocalFileSystem pointer.
cdef class LocalFileSystem(FileSystem):
cdef:
# Raw CLocalFileSystem pointer.
# NOTE(review): presumably the same object as the base class's `wrapped`,
# viewed as its concrete type; confirm in the implementation file.
CLocalFileSystem* localfs

# C-level initializer; same signature as FileSystem.init.
cdef init(self, const shared_ptr[CFileSystem]& wrapped)


# FileSystem subclass that additionally keeps a CSubTreeFileSystem pointer.
cdef class SubTreeFileSystem(FileSystem):
cdef:
# Raw CSubTreeFileSystem pointer.
# NOTE(review): presumably the same object as the base class's `wrapped`,
# viewed as its concrete type; confirm in the implementation file.
CSubTreeFileSystem* subtreefs

# C-level initializer; same signature as FileSystem.init.
cdef init(self, const shared_ptr[CFileSystem]& wrapped)

0 comments on commit 8bbb297

Please sign in to comment.