Skip to content

Commit

Permalink
ARROW-13685: [C++] Cannot write dataset to S3FileSystem if bucket already exists
Browse files Browse the repository at this point in the history

I still need to add a regression test.  I've been able to test by configuring my server with minio client.  I think it'd probably be easiest to create a crossbow test for this situation.  Current steps:

```
mc alias set myminio http://localhost:9000 minioadmin minioadmin
mc admin policy add myminio/ no-create-buckets ci/etc/minio-no-create-bucket-policy.json
mc admin user add myminio/ limited limited123
mc admin policy set myminio no-create-buckets user=limited
mc mb myminio/existing-bucket
```

Then, in python:

```
import pyarrow.fs as fs
filesystem = fs.S3FileSystem(access_key='limited', secret_key='limited123', endpoint_override='http://localhost:9000')
filesystem.create_dir('existing-bucket/foo') # This line fails without the change
```

Closes #11136 from westonpace/bugfix/ARROW-13685-cannot-write-to-s3-if-bucket-exists

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
  • Loading branch information
westonpace committed Oct 1, 2021
1 parent c273ea7 commit 700ac1e
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 18 deletions.
2 changes: 2 additions & 0 deletions ci/scripts/install_minio.sh
Expand Up @@ -49,4 +49,6 @@ elif [[ ${version} != "latest" ]]; then
fi

# Download the minio server binary and the matching 'mc' client binary.
wget -nv -P ${prefix}/bin https://dl.min.io/server/minio/release/${platform}-${arch}/minio
# 'mc' is used by the test suite to configure users/policies on the test server.
wget -nv -P ${prefix}/bin https://dl.min.io/client/mc/release/${platform}-${arch}/mc
chmod +x ${prefix}/bin/minio
chmod +x ${prefix}/bin/mc
22 changes: 21 additions & 1 deletion cpp/src/arrow/filesystem/s3fs.cc
Expand Up @@ -1530,6 +1530,23 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}
}

// Checks whether `bucket` exists by issuing a HeadBucket request.
// Returns true/false on a definitive answer; any error other than
// "not found" is propagated as a Status.
Result<bool> BucketExists(const std::string& bucket) {
  S3Model::HeadBucketRequest req;
  req.SetBucket(ToAwsString(bucket));

  auto outcome = client_->HeadBucket(req);
  if (outcome.IsSuccess()) {
    return true;
  }
  const auto& error = outcome.GetError();
  if (IsNotFound(error)) {
    // A missing bucket is a normal negative answer, not an error.
    return false;
  }
  return ErrorToStatus(
      std::forward_as_tuple("When testing for existence of bucket '", bucket, "': "),
      error);
}

// Create a bucket. Successful if bucket already exists.
Status CreateBucket(const std::string& bucket) {
S3Model::CreateBucketConfiguration config;
Expand Down Expand Up @@ -2159,7 +2176,10 @@ Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
// Create object
if (recursive) {
// Ensure bucket exists
RETURN_NOT_OK(impl_->CreateBucket(path.bucket));
ARROW_ASSIGN_OR_RAISE(bool bucket_exists, impl_->BucketExists(path.bucket));
if (!bucket_exists) {
RETURN_NOT_OK(impl_->CreateBucket(path.bucket));
}
// Ensure that all parents exist, then the directory itself
std::string parent_key;
for (const auto& part : path.key_parts) {
Expand Down
6 changes: 5 additions & 1 deletion python/pyarrow/tests/conftest.py
Expand Up @@ -292,7 +292,11 @@ def s3_server(s3_connection):
except OSError:
pytest.skip('`minio` command cannot be located')
else:
yield proc
yield {
'connection': s3_connection,
'process': proc,
'tempdir': tempdir
}
finally:
if proc is not None:
proc.kill()
12 changes: 6 additions & 6 deletions python/pyarrow/tests/parquet/conftest.py
Expand Up @@ -26,11 +26,11 @@ def datadir(base_datadir):


@pytest.fixture
def s3_bucket(request, s3_connection, s3_server):
def s3_bucket(s3_server):
boto3 = pytest.importorskip('boto3')
botocore = pytest.importorskip('botocore')

host, port, access_key, secret_key = s3_connection
host, port, access_key, secret_key = s3_server['connection']
s3 = boto3.resource(
's3',
endpoint_url='http://{}:{}'.format(host, port),
Expand All @@ -49,10 +49,10 @@ def s3_bucket(request, s3_connection, s3_server):


@pytest.fixture
def s3_example_s3fs(s3_connection, s3_server, s3_bucket):
def s3_example_s3fs(s3_server, s3_bucket):
s3fs = pytest.importorskip('s3fs')

host, port, access_key, secret_key = s3_connection
host, port, access_key, secret_key = s3_server['connection']
fs = s3fs.S3FileSystem(
key=access_key,
secret=secret_key,
Expand All @@ -72,10 +72,10 @@ def s3_example_s3fs(s3_connection, s3_server, s3_bucket):


@pytest.fixture
def s3_example_fs(s3_connection, s3_server):
def s3_example_fs(s3_server):
from pyarrow.fs import FileSystem

host, port, access_key, secret_key = s3_connection
host, port, access_key, secret_key = s3_server['connection']
uri = (
"s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}"
.format(access_key, secret_key, host, port)
Expand Down
8 changes: 4 additions & 4 deletions python/pyarrow/tests/test_dataset.py
Expand Up @@ -2242,11 +2242,11 @@ def test_dataset_partitioned_dictionary_type_reconstruct(tempdir):


@pytest.fixture
def s3_example_simple(s3_connection, s3_server):
def s3_example_simple(s3_server):
from pyarrow.fs import FileSystem
import pyarrow.parquet as pq

host, port, access_key, secret_key = s3_connection
host, port, access_key, secret_key = s3_server['connection']
uri = (
"s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}"
.format(access_key, secret_key, host, port)
Expand Down Expand Up @@ -2305,11 +2305,11 @@ def test_open_dataset_from_uri_s3_fsspec(s3_example_simple):

@pytest.mark.parquet
@pytest.mark.s3
def test_open_dataset_from_s3_with_filesystem_uri(s3_connection, s3_server):
def test_open_dataset_from_s3_with_filesystem_uri(s3_server):
from pyarrow.fs import FileSystem
import pyarrow.parquet as pq

host, port, access_key, secret_key = s3_connection
host, port, access_key, secret_key = s3_server['connection']
bucket = 'theirbucket'
path = 'nested/folder/data.parquet'
uri = "s3://{}:{}@{}/{}?scheme=http&endpoint_override={}:{}".format(
Expand Down
127 changes: 121 additions & 6 deletions python/pyarrow/tests/test_fs.py
Expand Up @@ -20,7 +20,9 @@
import os
import pathlib
import pickle
import subprocess
import sys
import time

import pytest
import weakref
Expand Down Expand Up @@ -263,11 +265,11 @@ def subtree_localfs(request, tempdir, localfs):


@pytest.fixture
def s3fs(request, s3_connection, s3_server):
def s3fs(request, s3_server):
request.config.pyarrow.requires('s3')
from pyarrow.fs import S3FileSystem

host, port, access_key, secret_key = s3_connection
host, port, access_key, secret_key = s3_server['connection']
bucket = 'pyarrow-filesystem/'

fs = S3FileSystem(
Expand Down Expand Up @@ -298,6 +300,104 @@ def subtree_s3fs(request, s3fs):
)


# IAM-style policy document for the test user created below: it allows
# common object operations on all buckets but deliberately omits
# s3:CreateBucket, so the user cannot create new buckets.  This mirrors
# locked-down real-world S3 deployments (see ARROW-13685).
_minio_limited_policy = """{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListAllMyBuckets",
"s3:PutObject",
"s3:GetObject",
"s3:ListBucket",
"s3:PutObjectTagging",
"s3:DeleteObject",
"s3:GetObjectVersion"
],
"Resource": [
"arn:aws:s3:::*"
]
}
]
}"""


def _run_mc_command(mcdir, *args):
    """Run the minio ``mc`` client with config dir `mcdir` and the given args.

    Logs the command, its return code, and its output.  Raises
    ChildProcessError if the command exits non-zero.  Raises
    FileNotFoundError if ``mc`` is not installed, and
    subprocess.TimeoutExpired if it runs longer than 10 seconds.
    """
    full_args = ['mc', '-C', mcdir] + list(args)
    # Use subprocess.run rather than Popen + wait(): wait() with PIPEd
    # stdout/stderr can deadlock when the child fills the pipe buffer,
    # since nothing drains the pipes until after wait() returns.  run()
    # drains concurrently and also kills the child on timeout.
    result = subprocess.run(full_args, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, encoding='utf-8',
                            timeout=10)
    cmd_str = ' '.join(full_args)
    print(f'Cmd: {cmd_str}')
    print(f'  Return: {result.returncode}')
    print(f'  Stdout: {result.stdout}')
    print(f'  Stderr: {result.stderr}')
    if result.returncode != 0:
        raise ChildProcessError("Could not run mc")


def _wait_for_minio_startup(mcdir, address, access_key, secret_key):
    """Poll the minio server until the ``mc`` client can register an alias.

    Retries for up to ~10 seconds (sleeping 1s between attempts) and
    raises if the server never becomes reachable.
    """
    deadline = time.time() + 10
    while time.time() < deadline:
        try:
            _run_mc_command(mcdir, 'alias', 'set', 'myminio',
                            f'http://{address}', access_key, secret_key)
            return
        except ChildProcessError:
            # Server not up yet; back off briefly and retry.
            time.sleep(1)
    raise Exception("mc command could not connect to local minio")


def _configure_limited_user(tmpdir, address, access_key, secret_key):
    """Configure the minio server with a bucket-creation-restricted user.

    Uses the ``mc`` admin client to register a user limited:limited123
    whose policy omits bucket creation, mirroring real-life S3 setups
    where users are given strict permissions.  Arrow S3 operations
    should still work under such a configuration (e.g. see ARROW-13685).
    Returns False when the ``mc`` command is unavailable so callers can
    skip the dependent tests.
    """
    mc_config_dir = os.path.join(tmpdir, 'mc')
    policy_path = os.path.join(tmpdir, 'limited-buckets-policy.json')
    try:
        os.mkdir(mc_config_dir)
        with open(policy_path, mode='w') as policy_file:
            policy_file.write(_minio_limited_policy)
        # The s3_server fixture starts the minio process but it takes a
        # few moments for the server to become reachable.
        _wait_for_minio_startup(mc_config_dir, address, access_key, secret_key)
        # Register the restrictive policy, create the limited user, bind
        # the policy to that user, and pre-create a bucket for it to
        # write into.
        _run_mc_command(mc_config_dir, 'admin', 'policy', 'add',
                        'myminio/', 'no-create-buckets', policy_path)
        _run_mc_command(mc_config_dir, 'admin', 'user', 'add',
                        'myminio/', 'limited', 'limited123')
        _run_mc_command(mc_config_dir, 'admin', 'policy', 'set',
                        'myminio', 'no-create-buckets', 'user=limited')
        _run_mc_command(mc_config_dir, 'mb', 'myminio/existing-bucket')
        return True
    except FileNotFoundError:
        # mc is not installed; signal the caller to skip these tests.
        return False


@pytest.fixture(scope='session')
def limited_s3_user(request, s3_server):
    # Session-scoped: configures the shared minio server once with a
    # limited:limited123 user that cannot create buckets.
    if sys.platform == 'win32':
        # Can't rely on FileNotFound check because
        # there is sometimes an mc command on Windows
        # which is unrelated to the minio mc
        pytest.skip('The mc command is not installed on Windows')
    request.config.pyarrow.requires('s3')
    host, port, access_key, secret_key = s3_server['connection']
    configured = _configure_limited_user(s3_server['tempdir'],
                                         f'{host}:{port}',
                                         access_key, secret_key)
    if not configured:
        pytest.skip('Could not locate mc command to configure limited user')


@pytest.fixture
def hdfs(request, hdfs_connection):
request.config.pyarrow.requires('hdfs')
Expand Down Expand Up @@ -345,13 +445,13 @@ def py_fsspec_memoryfs(request, tempdir):


@pytest.fixture
def py_fsspec_s3fs(request, s3_connection, s3_server):
def py_fsspec_s3fs(request, s3_server):
s3fs = pytest.importorskip("s3fs")
if (sys.version_info < (3, 7) and
Version(s3fs.__version__) >= Version("0.5")):
pytest.skip("s3fs>=0.5 version is async and requires Python >= 3.7")

host, port, access_key, secret_key = s3_connection
host, port, access_key, secret_key = s3_server['connection']
bucket = 'pyarrow-filesystem/'

fs = s3fs.S3FileSystem(
Expand Down Expand Up @@ -473,6 +573,21 @@ def skip_fsspec_s3fs(fs):
pytest.xfail(reason="Not working with fsspec's s3fs")


@pytest.mark.s3
def test_s3fs_limited_permissions_create_bucket(s3_server, limited_s3_user):
    # Regression test for ARROW-13685: create_dir inside an existing
    # bucket must work even for a user lacking bucket-creation rights.
    from pyarrow.fs import S3FileSystem

    host, port, _, _ = s3_server['connection']

    limited_fs = S3FileSystem(access_key='limited',
                              secret_key='limited123',
                              scheme='http',
                              endpoint_override=f'{host}:{port}')
    limited_fs.create_dir('existing-bucket/test')


def test_file_info_constructor():
dt = datetime.fromtimestamp(1568799826, timezone.utc)

Expand Down Expand Up @@ -1319,10 +1434,10 @@ def test_filesystem_from_path_object(path):


@pytest.mark.s3
def test_filesystem_from_uri_s3(s3_connection, s3_server):
def test_filesystem_from_uri_s3(s3_server):
from pyarrow.fs import S3FileSystem

host, port, access_key, secret_key = s3_connection
host, port, access_key, secret_key = s3_server['connection']

uri = "s3://{}:{}@mybucket/foo/bar?scheme=http&endpoint_override={}:{}" \
.format(access_key, secret_key, host, port)
Expand Down

0 comments on commit 700ac1e

Please sign in to comment.