Skip to content

Commit

Permalink
Merge pull request #51976 from trociny/wip-59065-quincy
Browse files Browse the repository at this point in the history
quincy: rgw: fix multipart upload object leaks due to re-upload

Reviewed-by: Casey Bodley <cbodley@redhat.com>
  • Loading branch information
yuriw committed Jan 12, 2024
2 parents f7be69d + 9e783a7 commit 56daea9
Show file tree
Hide file tree
Showing 37 changed files with 2,803 additions and 2,095 deletions.
5 changes: 5 additions & 0 deletions qa/suites/rgw/verify/tasks/mp_reupload.yaml
@@ -0,0 +1,5 @@
# Task list with a single workunit entry: client.0 is given the
# multipart re-upload shell script to run.
# NOTE(review): nesting below is reconstructed from the key order; confirm
# against the suite's other task files.
tasks:
- workunit:
    clients:
      client.0:
        - rgw/test_rgw_s3_mp_reupload.sh
121 changes: 121 additions & 0 deletions qa/workunits/rgw/test_rgw_s3_mp_reupload.py
@@ -0,0 +1,121 @@
import boto3
import botocore.exceptions
import sys
import os
import subprocess

#boto3.set_stream_logger(name='botocore')

# Starts a multipart upload, uploads every part once, then re-uploads
# part 1 several more times before completing or aborting the upload.
#
# handles three optional system arguments:
#   <bucket-name> : default is "bkt314738362229"
#   <0 or 1>      : 0 -> upload aborted, 1 -> completed; default is completed
#   <0 or 1>      : 0 -> non-versioned bucket, 1 -> versioned bucket;
#                   default is non-versioned
#
# requires RGW_HOST, RGW_ACCESS_KEY and RGW_SECRET_KEY in the environment

if len(sys.argv) >= 2:
    bucket_name = sys.argv[1]
else:
    bucket_name = "bkt314738362229"
print("bucket name is %s" % bucket_name)

complete_mpu = True
if len(sys.argv) >= 3:
    complete_mpu = int(sys.argv[2]) > 0

versioned_bucket = False
if len(sys.argv) >= 4:
    versioned_bucket = int(sys.argv[3]) > 0

rgw_host = os.environ['RGW_HOST']
access_key = os.environ['RGW_ACCESS_KEY']
secret_key = os.environ['RGW_SECRET_KEY']


def connect_and_create_bucket(host, bucket):
    """Create `bucket` through the first endpoint on `host` that accepts a connection.

    Endpoints are tried in order: http on port 80, https on port 443
    (without certificate verification), http on port 8000.

    Returns (endpoint_url, client).  An EndpointConnectionError from the
    last candidate is propagated to the caller; any other exception is
    propagated immediately.
    """
    candidates = [
        ('http://%s:%d' % (host, 80), {}),
        ('https://%s:%d' % (host, 443), {'verify': False}),
        ('http://%s:%d' % (host, 8000), {}),
    ]
    last = len(candidates) - 1
    for attempt, (url, extra_args) in enumerate(candidates):
        try:
            s3 = boto3.client('s3',
                              endpoint_url=url,
                              aws_access_key_id=access_key,
                              aws_secret_access_key=secret_key,
                              **extra_args)
            s3.create_bucket(Bucket=bucket)
        except botocore.exceptions.EndpointConnectionError:
            if attempt == last:
                raise
            continue
        return url, s3


endpoint, client = connect_and_create_bucket(rgw_host, bucket_name)

print("endpoint is %s" % endpoint)

if versioned_bucket:
    res = client.put_bucket_versioning(
        Bucket=bucket_name,
        VersioningConfiguration={
            'MFADelete': 'Disabled',
            'Status': 'Enabled'}
    )

key = "mpu_test4"
nparts = 2
ndups = 11
do_reupload = True

part_path = "/tmp/mp_part_5m"
subprocess.run(["dd", "if=/dev/urandom", "of=" + part_path, "bs=1M", "count=5"], check=True)

try:
    res = client.create_multipart_upload(Bucket=bucket_name, Key=key)
    mpu_id = res["UploadId"]

    print("start UploadId=%s" % (mpu_id))

    # parts recorded in phase 1; this is the list handed to complete
    parts = []

    # the same 5 MiB file is used as the body of every part, so it is
    # rewound before each upload; the context manager closes it afterwards
    with open(part_path, 'rb') as f:
        for part_num in range(1, nparts + 1):
            f.seek(0)
            res = client.upload_part(Body=f, Bucket=bucket_name, Key=key,
                                     UploadId=mpu_id, PartNumber=part_num)
            # save
            part = {'ETag': res['ETag'], 'PartNumber': part_num}
            print("phase 1 uploaded part %s" % part)
            parts.append(part)

        if do_reupload:
            # just re-upload part 1
            part_num = 1
            for _ in range(ndups):
                f.seek(0)
                res = client.upload_part(Body=f, Bucket=bucket_name, Key=key,
                                         UploadId=mpu_id, PartNumber=part_num)
                part = {'ETag': res['ETag'], 'PartNumber': part_num}
                print ("phase 2 uploaded part %s" % part)

    if complete_mpu:
        print("completing multipart upload, parts=%s" % parts)
        res = client.complete_multipart_upload(
            Bucket=bucket_name, Key=key, UploadId=mpu_id,
            MultipartUpload={'Parts': parts})
    else:
        print("aborting multipart upload, parts=%s" % parts)
        res = client.abort_multipart_upload(
            Bucket=bucket_name, Key=key, UploadId=mpu_id)
finally:
    # clean up the temporary part file even when an S3 call raised
    subprocess.run(["rm", "-f", part_path], check=True)
110 changes: 110 additions & 0 deletions qa/workunits/rgw/test_rgw_s3_mp_reupload.sh
@@ -0,0 +1,110 @@
#!/usr/bin/env bash
#
# Runs test_rgw_s3_mp_reupload.py against a randomly named bucket, forces
# garbage collection, and then checks that the rados objects listed by
# "radosgw-admin bucket radoslist" for that bucket are exactly the objects
# found by "rados ls" in the data pool under the bucket's id.
# Exits 0 on success and 1 on any detected problem.

# INITIALIZATION

mydir=$(dirname "$0")
data_pool=default.rgw.buckets.data
orphan_list_out=/tmp/orphan_list.$$
radoslist_out=/tmp/radoslist.$$
rados_ls_out=/tmp/rados_ls.$$
diff_out=/tmp/diff.$$

# start with no recorded error, regardless of the caller's environment
error=""

rgw_host="$(hostname --fqdn)"
echo "INFO: fully qualified domain name: $rgw_host"

export RGW_ACCESS_KEY="0555b35654ad1656d804"
export RGW_SECRET_KEY="h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q=="
export RGW_HOST="${RGW_HOST:-$rgw_host}"

# random argument determines if multipart is aborted or completed 50/50
outcome=$((RANDOM % 2))
if [ "$outcome" -eq 0 ] ;then
    echo "== TESTING *ABORTING* MULTIPART UPLOAD WITH RE-UPLOADS =="
else
    echo "== TESTING *COMPLETING* MULTIPART UPLOAD WITH RE-UPLOADS =="
fi

# random argument determines if the bucket is versioned or not 50/50
versioning=$((RANDOM % 2))
if [ "$versioning" -eq 0 ] ;then
    echo "== TESTING NON-VERSIONED BUCKET =="
else
    echo "== TESTING VERSIONED BUCKET =="
fi

# create a randomized bucket name
bucket="reupload-bkt-$((RANDOM % 899999 + 100000))"


# SET UP PYTHON VIRTUAL ENVIRONMENT

# install boto3
python3 -m venv "$mydir"
source "$mydir/bin/activate"
pip install pip --upgrade
pip install boto3


# CREATE RGW USER IF NECESSARY

if radosgw-admin user info --access-key "$RGW_ACCESS_KEY" 2>/dev/null ;then
    echo INFO: user already exists
else
    echo INFO: creating user
    radosgw-admin user create --uid testid \
		  --access-key "$RGW_ACCESS_KEY" \
		  --secret "$RGW_SECRET_KEY" \
		  --display-name 'M. Tester' \
		  --email tester@ceph.com 2>/dev/null
fi


# RUN REUPLOAD TEST

# a failed upload run must fail the test rather than be silently ignored
if ! "$mydir/bin/python3" "${mydir}/test_rgw_s3_mp_reupload.py" "$bucket" "$outcome" "$versioning" ;then
    error=1
    echo "ERROR: multipart re-upload script exited with a non-zero status."
fi


# ANALYZE FOR ERRORS
# (NOTE: for now we're choosing not to use the rgw-orphan-list tool)

# force garbage collection to remove extra parts
radosgw-admin gc process --include-all 2>/dev/null

marker=$(radosgw-admin metadata get "bucket:$bucket" 2>/dev/null | grep bucket_id | sed 's/.*: "\(.*\)".*/\1/')

if [ -z "$marker" ] ;then
    # with an empty marker the "^$marker" filter below would match every
    # object in the pool, so the comparison would be meaningless
    error=1
    echo "ERROR: Could not determine bucket id for test bucket $bucket."
else
    # determine expected rados objects
    radosgw-admin bucket radoslist --bucket="$bucket" 2>/dev/null | sort >"$radoslist_out"
    echo "radosgw-admin bucket radoslist:"
    cat "$radoslist_out"

    # determine found rados objects
    rados ls -p "$data_pool" 2>/dev/null | grep "^$marker" | sort >"$rados_ls_out"
    echo "rados ls:"
    cat "$rados_ls_out"

    # compare expected and found
    diff "$radoslist_out" "$rados_ls_out" >"$diff_out"
    if [ -s "$diff_out" ] ;then
        error=1
        echo "ERROR: Found differences between expected and actual rados objects for test bucket."
        echo "    note: indicators: '>' found but not expected; '<' expected but not found."
        cat "$diff_out"
    fi
fi


# CLEAN UP

deactivate

rm -f "$orphan_list_out" "$radoslist_out" "$rados_ls_out" "$diff_out"


# PRODUCE FINAL RESULTS

if [ -n "$error" ] ;then
    echo "== FAILED =="
    exit 1
fi

echo "== PASSED =="
exit 0
15 changes: 15 additions & 0 deletions src/cls/CMakeLists.txt
Expand Up @@ -74,6 +74,9 @@ if (WITH_RADOSGW)
set(cls_otp_srcs otp/cls_otp.cc)
add_library(cls_otp SHARED ${cls_otp_srcs})
target_link_libraries(cls_otp OATH::OATH)
target_include_directories(cls_otp
PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
set_target_properties(cls_otp PROPERTIES
VERSION "1.0.0"
SOVERSION "1"
Expand Down Expand Up @@ -198,6 +201,9 @@ if (WITH_RADOSGW)
${CMAKE_SOURCE_DIR}/src/common/ceph_json.cc)
add_library(cls_rgw SHARED ${cls_rgw_srcs})
target_link_libraries(cls_rgw json_spirit)
target_include_directories(cls_rgw
PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
set_target_properties(cls_rgw PROPERTIES
VERSION "1.0.0"
SOVERSION "1"
Expand All @@ -210,6 +216,9 @@ if (WITH_RADOSGW)
rgw/cls_rgw_types.cc
rgw/cls_rgw_ops.cc)
add_library(cls_rgw_client STATIC ${cls_rgw_client_srcs})
target_include_directories(cls_rgw_client
PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")

endif (WITH_RADOSGW)

Expand Down Expand Up @@ -299,6 +308,9 @@ if (WITH_RADOSGW)
queue/cls_queue_src.cc
${CMAKE_SOURCE_DIR}/src/common/ceph_json.cc)
add_library(cls_rgw_gc SHARED ${cls_rgw_gc_srcs})
target_include_directories(cls_rgw_gc
PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
set_target_properties(cls_rgw_gc PROPERTIES
VERSION "1.0.0"
SOVERSION "1"
Expand All @@ -309,6 +321,9 @@ if (WITH_RADOSGW)
set(cls_rgw_gc_client_srcs
rgw_gc/cls_rgw_gc_client.cc)
add_library(cls_rgw_gc_client STATIC ${cls_rgw_gc_client_srcs})
target_include_directories(cls_rgw_gc_client
PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
endif (WITH_RADOSGW)


Expand Down
49 changes: 47 additions & 2 deletions src/cls/rgw/cls_rgw.cc
Expand Up @@ -74,7 +74,7 @@ static bool bi_is_plain_entry(const std::string& s) {
return (s.empty() || (unsigned char)s[0] != BI_PREFIX_CHAR);
}

int bi_entry_type(const string& s)
static int bi_entry_type(const string& s)
{
if (bi_is_plain_entry(s)) {
return BI_BUCKET_OBJS_INDEX;
Expand Down Expand Up @@ -3383,7 +3383,7 @@ static int usage_record_decode(bufferlist& record_bl, rgw_usage_log_entry& e)
return 0;
}

int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
static int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
CLS_LOG(10, "entered %s", __func__);

Expand Down Expand Up @@ -4180,6 +4180,47 @@ static int rgw_cls_lc_get_head(cls_method_context_t hctx, bufferlist *in, buffe
return 0;
}

/**
 * Store the info for one multipart-upload part in the object's omap while
 * carrying forward the manifest prefixes used by earlier uploads of that part.
 *
 * The incoming op (part key + new part info) is decoded from `in`; `out` is
 * not written.  The entry currently stored under the part key, if any,
 * contributes its manifest prefix and its own past prefixes to the new
 * info's past-prefix set before the new info is written back.
 *
 * Returns -EINVAL if the op cannot be decoded, any read error other than
 * -ENOENT from read_omap_entry(), -EEXIST if the new manifest prefix is
 * already one of the past prefixes, and otherwise the result of
 * cls_cxx_map_set_val().
 */
static int rgw_mp_upload_part_info_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
  CLS_LOG(10, "entered %s", __func__);

  cls_rgw_mp_upload_part_info_update_op op;
  auto iter = in->cbegin();
  try {
    decode(op, iter);
  } catch (ceph::buffer::error& err) {
    CLS_LOG(1, "ERROR: rgw_cls_mp_upload_part_info_update(): failed to decode op\n");
    return -EINVAL;
  }

  // -ENOENT is tolerated (presumably: nothing stored for this part yet);
  // in that case `previous` stays default-constructed.
  RGWUploadPartInfo previous;
  int r = read_omap_entry(hctx, op.part_key, &previous);
  const bool tolerated = (r >= 0 || r == -ENOENT);
  if (!tolerated) {
    return r;
  }

  /* fold every prefix known from the stored entry into the new entry */
  auto& past = op.info.past_prefixes;
  if (!previous.manifest.empty()) {
    past.insert(previous.manifest.get_prefix());
  }
  past.merge(previous.past_prefixes);

  // A new prefix equal to an earlier one is rejected so the caller can
  // retry this part upload with a different prefix.
  const auto& current_prefix = op.info.manifest.get_prefix();
  if (past.count(current_prefix) != 0) {
    CLS_LOG(1, "ERROR: Current prefix %s is also a past prefix for part %s",
            current_prefix.c_str(),
            op.part_key.c_str());
    return -EEXIST;
  }

  bufferlist encoded;
  encode(op.info, encoded);
  r = cls_cxx_map_set_val(hctx, op.part_key, &encoded);
  CLS_LOG(10, "part info update on key [%s]: %zu past prefixes, ret %d",
          op.part_key.c_str(), past.size(), r);
  return r;
}

static int rgw_reshard_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
CLS_LOG(10, "entered %s", __func__);
Expand Down Expand Up @@ -4453,6 +4494,7 @@ CLS_INIT(rgw)
cls_method_handle_t h_rgw_lc_put_head;
cls_method_handle_t h_rgw_lc_get_head;
cls_method_handle_t h_rgw_lc_list_entries;
cls_method_handle_t h_rgw_mp_upload_part_info_update;
cls_method_handle_t h_rgw_reshard_add;
cls_method_handle_t h_rgw_reshard_list;
cls_method_handle_t h_rgw_reshard_get;
Expand Down Expand Up @@ -4516,6 +4558,9 @@ CLS_INIT(rgw)
cls_register_cxx_method(h_class, RGW_LC_GET_HEAD, CLS_METHOD_RD, rgw_cls_lc_get_head, &h_rgw_lc_get_head);
cls_register_cxx_method(h_class, RGW_LC_LIST_ENTRIES, CLS_METHOD_RD, rgw_cls_lc_list_entries, &h_rgw_lc_list_entries);

/* multipart */
cls_register_cxx_method(h_class, RGW_MP_UPLOAD_PART_INFO_UPDATE, CLS_METHOD_RD | CLS_METHOD_WR, rgw_mp_upload_part_info_update, &h_rgw_mp_upload_part_info_update);

/* resharding */
cls_register_cxx_method(h_class, RGW_RESHARD_ADD, CLS_METHOD_RD | CLS_METHOD_WR, rgw_reshard_add, &h_rgw_reshard_add);
cls_register_cxx_method(h_class, RGW_RESHARD_LIST, CLS_METHOD_RD, rgw_reshard_list, &h_rgw_reshard_list);
Expand Down
14 changes: 14 additions & 0 deletions src/cls/rgw/cls_rgw_client.cc
Expand Up @@ -1068,6 +1068,20 @@ int cls_rgw_lc_list(IoCtx& io_ctx, const string& oid,
return r;
}

/**
 * Append to `op` a call to the RGW_MP_UPLOAD_PART_INFO_UPDATE method of
 * RGW_CLASS, passing `part_key` and `info` encoded as a
 * cls_rgw_mp_upload_part_info_update_op.
 *
 * Nothing is executed here; the call is only queued on the supplied
 * write operation.
 */
void cls_rgw_mp_upload_part_info_update(librados::ObjectWriteOperation& op,
                                        const std::string& part_key,
                                        const RGWUploadPartInfo& info)
{
  // build the request structure
  cls_rgw_mp_upload_part_info_update_op request;
  request.info = info;
  request.part_key = part_key;

  // serialize it and attach the class-method call to the operation
  bufferlist payload;
  encode(request, payload);
  op.exec(RGW_CLASS, RGW_MP_UPLOAD_PART_INFO_UPDATE, payload);
}

void cls_rgw_reshard_add(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry)
{
bufferlist in;
Expand Down

0 comments on commit 56daea9

Please sign in to comment.