quincy: rgw: fix multipart upload object leaks due to re-upload #51976

Merged · 6 commits · Jan 12, 2024

Changes from all commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions qa/suites/rgw/verify/tasks/mp_reupload.yaml
@@ -0,0 +1,5 @@
tasks:
- workunit:
    clients:
      client.0:
        - rgw/test_rgw_s3_mp_reupload.sh
121 changes: 121 additions & 0 deletions qa/workunits/rgw/test_rgw_s3_mp_reupload.py
@@ -0,0 +1,121 @@
import boto3
import botocore.exceptions
import sys
import os
import subprocess

#boto3.set_stream_logger(name='botocore')

# handles three optional positional arguments:
# <bucket-name> : default is "bkt314738362229"
# <0 or 1>      : 0 -> upload aborted, 1 -> completed; default is completed
# <0 or 1>      : 0 -> non-versioned bucket, 1 -> versioned; default is non-versioned

if len(sys.argv) >= 2:
    bucket_name = sys.argv[1]
else:
    bucket_name = "bkt314738362229"
print("bucket name is %s" % bucket_name)

complete_mpu = True
if len(sys.argv) >= 3:
    complete_mpu = int(sys.argv[2]) > 0

versioned_bucket = False
if len(sys.argv) >= 4:
    versioned_bucket = int(sys.argv[3]) > 0

rgw_host = os.environ['RGW_HOST']
access_key = os.environ['RGW_ACCESS_KEY']
secret_key = os.environ['RGW_SECRET_KEY']

# probe the common endpoints in order: http/80, https/443, http/8000
try:
    endpoint = 'http://%s:%d' % (rgw_host, 80)
    client = boto3.client('s3',
                          endpoint_url=endpoint,
                          aws_access_key_id=access_key,
                          aws_secret_access_key=secret_key)
    res = client.create_bucket(Bucket=bucket_name)
except botocore.exceptions.EndpointConnectionError:
    try:
        endpoint = 'https://%s:%d' % (rgw_host, 443)
        client = boto3.client('s3',
                              endpoint_url=endpoint,
                              verify=False,
                              aws_access_key_id=access_key,
                              aws_secret_access_key=secret_key)
        res = client.create_bucket(Bucket=bucket_name)
    except botocore.exceptions.EndpointConnectionError:
        endpoint = 'http://%s:%d' % (rgw_host, 8000)
        client = boto3.client('s3',
                              endpoint_url=endpoint,
                              aws_access_key_id=access_key,
                              aws_secret_access_key=secret_key)
        res = client.create_bucket(Bucket=bucket_name)

print("endpoint is %s" % endpoint)

if versioned_bucket:
    res = client.put_bucket_versioning(
        Bucket=bucket_name,
        VersioningConfiguration={
            'MFADelete': 'Disabled',
            'Status': 'Enabled'}
        )

key = "mpu_test4"
nparts = 2
ndups = 11
do_reupload = True

part_path = "/tmp/mp_part_5m"
subprocess.run(["dd", "if=/dev/urandom", "of=" + part_path, "bs=1M", "count=5"], check=True)

f = open(part_path, 'rb')

res = client.create_multipart_upload(Bucket=bucket_name, Key=key)
mpu_id = res["UploadId"]

print("start UploadId=%s" % (mpu_id))

parts = []
parts2 = []

for ix in range(0, nparts):
    part_num = ix + 1
    f.seek(0)
    res = client.upload_part(Body=f, Bucket=bucket_name, Key=key,
                             UploadId=mpu_id, PartNumber=part_num)
    # save
    etag = res['ETag']
    part = {'ETag': etag, 'PartNumber': part_num}
    print("phase 1 uploaded part %s" % part)
    parts.append(part)

if do_reupload:
    # re-upload part 1 several times to create superseded copies
    part_num = 1
    for ix in range(0, ndups):
        f.seek(0)
        res = client.upload_part(Body=f, Bucket=bucket_name, Key=key,
                                 UploadId=mpu_id, PartNumber=part_num)
        etag = res['ETag']
        part = {'ETag': etag, 'PartNumber': part_num}
        print("phase 2 uploaded part %s" % part)

    # save only the last re-uploaded copy's etag
    etag = res['ETag']
    part = {'ETag': etag, 'PartNumber': part_num}
    parts2.append(part)

if complete_mpu:
    print("completing multipart upload, parts=%s" % parts)
    res = client.complete_multipart_upload(
        Bucket=bucket_name, Key=key, UploadId=mpu_id,
        MultipartUpload={'Parts': parts})
else:
    print("aborting multipart upload, parts=%s" % parts)
    res = client.abort_multipart_upload(
        Bucket=bucket_name, Key=key, UploadId=mpu_id)

# clean up
f.close()
subprocess.run(["rm", "-f", part_path], check=True)
110 changes: 110 additions & 0 deletions qa/workunits/rgw/test_rgw_s3_mp_reupload.sh
@@ -0,0 +1,110 @@
#!/usr/bin/env bash

# INITIALIZATION

mydir=$(dirname $0)
data_pool=default.rgw.buckets.data
orphan_list_out=/tmp/orphan_list.$$
radoslist_out=/tmp/radoslist.$$
rados_ls_out=/tmp/rados_ls.$$
diff_out=/tmp/diff.$$

rgw_host="$(hostname --fqdn)"
echo "INFO: fully qualified domain name: $rgw_host"

export RGW_ACCESS_KEY="0555b35654ad1656d804"
export RGW_SECRET_KEY="h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q=="
export RGW_HOST="${RGW_HOST:-$rgw_host}"

# random argument determines if multipart is aborted or completed 50/50
outcome=$((RANDOM % 2))
if [ $outcome -eq 0 ] ;then
    echo "== TESTING *ABORTING* MULTIPART UPLOAD WITH RE-UPLOADS =="
else
    echo "== TESTING *COMPLETING* MULTIPART UPLOAD WITH RE-UPLOADS =="
fi

# random argument determines if bucket is versioned or not 50/50
versioning=$((RANDOM % 2))
if [ $versioning -eq 0 ] ;then
    echo "== TESTING NON-VERSIONED BUCKET =="
else
    echo "== TESTING VERSIONED BUCKET =="
fi

# create a randomized bucket name
bucket="reupload-bkt-$((RANDOM % 899999 + 100000))"


# SET UP PYTHON VIRTUAL ENVIRONMENT

# install boto3
python3 -m venv $mydir
source $mydir/bin/activate
pip install pip --upgrade
pip install boto3


# CREATE RGW USER IF NECESSARY

if radosgw-admin user info --access-key $RGW_ACCESS_KEY 2>/dev/null ;then
    echo INFO: user already exists
else
    echo INFO: creating user
    radosgw-admin user create --uid testid \
        --access-key $RGW_ACCESS_KEY \
        --secret $RGW_SECRET_KEY \
        --display-name 'M. Tester' \
        --email tester@ceph.com 2>/dev/null
fi


# RUN REUPLOAD TEST

$mydir/bin/python3 ${mydir}/test_rgw_s3_mp_reupload.py $bucket $outcome $versioning


# ANALYZE FOR ERRORS
# (NOTE: for now we're choosing not to use the rgw-orphan-list tool)

# force garbage collection to remove extra parts
radosgw-admin gc process --include-all 2>/dev/null

marker=$(radosgw-admin metadata get bucket:$bucket 2>/dev/null | grep bucket_id | sed 's/.*: "\(.*\)".*/\1/')

# determine expected rados objects
radosgw-admin bucket radoslist --bucket=$bucket 2>/dev/null | sort >$radoslist_out
echo "radosgw-admin bucket radoslist:"
cat $radoslist_out

# determine found rados objects
rados ls -p $data_pool 2>/dev/null | grep "^$marker" | sort >$rados_ls_out
echo "rados ls:"
cat $rados_ls_out

# compare expected and found
diff $radoslist_out $rados_ls_out >$diff_out
if [ $(cat $diff_out | wc -l) -ne 0 ] ;then
    error=1
    echo "ERROR: Found differences between expected and actual rados objects for test bucket."
    echo "  note: indicators: '>' found but not expected; '<' expected but not found."
    cat $diff_out
fi


# CLEAN UP

deactivate

rm -f $orphan_list_out $radoslist_out $rados_ls_out $diff_out


# PRODUCE FINAL RESULTS

if [ -n "$error" ] ;then
echo "== FAILED =="
exit 1
fi

echo "== PASSED =="
exit 0
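
The leak check above boils down to set arithmetic between what the bucket index accounts for and what actually sits in the data pool. A rough Python equivalent of the same comparison (the helper name is illustrative; it assumes radosgw-admin and rados are on PATH and that the bucket marker has already been extracted as in the script):

import subprocess

def rados_object_leak_check(bucket, marker, pool="default.rgw.buckets.data"):
    # objects the bucket index says should exist
    expected = set(subprocess.run(
        ["radosgw-admin", "bucket", "radoslist", "--bucket=" + bucket],
        capture_output=True, text=True, check=True).stdout.splitlines())
    # objects actually present in the data pool under this bucket's marker
    found = set(o for o in subprocess.run(
        ["rados", "ls", "-p", pool],
        capture_output=True, text=True, check=True).stdout.splitlines()
        if o.startswith(marker))
    # leaked: found but not expected; missing: expected but not found
    return found - expected, expected - found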
15 changes: 15 additions & 0 deletions src/cls/CMakeLists.txt
@@ -74,6 +74,9 @@ if (WITH_RADOSGW)
  set(cls_otp_srcs otp/cls_otp.cc)
  add_library(cls_otp SHARED ${cls_otp_srcs})
  target_link_libraries(cls_otp OATH::OATH)
  target_include_directories(cls_otp
    PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
    PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
  set_target_properties(cls_otp PROPERTIES
    VERSION "1.0.0"
    SOVERSION "1"
@@ -198,6 +201,9 @@ if (WITH_RADOSGW)
    ${CMAKE_SOURCE_DIR}/src/common/ceph_json.cc)
  add_library(cls_rgw SHARED ${cls_rgw_srcs})
  target_link_libraries(cls_rgw json_spirit)
  target_include_directories(cls_rgw
    PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
    PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
  set_target_properties(cls_rgw PROPERTIES
    VERSION "1.0.0"
    SOVERSION "1"
@@ -210,6 +216,9 @@ if (WITH_RADOSGW)
    rgw/cls_rgw_types.cc
    rgw/cls_rgw_ops.cc)
  add_library(cls_rgw_client STATIC ${cls_rgw_client_srcs})
  target_include_directories(cls_rgw_client
    PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
    PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")

endif (WITH_RADOSGW)

@@ -299,6 +308,9 @@ if (WITH_RADOSGW)
    queue/cls_queue_src.cc
    ${CMAKE_SOURCE_DIR}/src/common/ceph_json.cc)
  add_library(cls_rgw_gc SHARED ${cls_rgw_gc_srcs})
  target_include_directories(cls_rgw_gc
    PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
    PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
  set_target_properties(cls_rgw_gc PROPERTIES
    VERSION "1.0.0"
    SOVERSION "1"
@@ -309,6 +321,9 @@ if (WITH_RADOSGW)
  set(cls_rgw_gc_client_srcs
    rgw_gc/cls_rgw_gc_client.cc)
  add_library(cls_rgw_gc_client STATIC ${cls_rgw_gc_client_srcs})
  target_include_directories(cls_rgw_gc_client
    PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw"
    PUBLIC "${CMAKE_SOURCE_DIR}/src/spawn/include")
endif (WITH_RADOSGW)


49 changes: 47 additions & 2 deletions src/cls/rgw/cls_rgw.cc
@@ -74,7 +74,7 @@ static bool bi_is_plain_entry(const std::string& s) {
  return (s.empty() || (unsigned char)s[0] != BI_PREFIX_CHAR);
}

-int bi_entry_type(const string& s)
+static int bi_entry_type(const string& s)
{
  if (bi_is_plain_entry(s)) {
    return BI_BUCKET_OBJS_INDEX;
@@ -3352,7 +3352,7 @@ static int usage_record_decode(bufferlist& record_bl, rgw_usage_log_entry& e)
  return 0;
}

-int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+static int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
  CLS_LOG(10, "entered %s", __func__);

@@ -4149,6 +4149,47 @@ static int rgw_cls_lc_get_head(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
  return 0;
}

static int rgw_mp_upload_part_info_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
  CLS_LOG(10, "entered %s", __func__);
  cls_rgw_mp_upload_part_info_update_op op;
  auto in_iter = in->cbegin();
  try {
    decode(op, in_iter);
  } catch (ceph::buffer::error& err) {
    CLS_LOG(1, "ERROR: rgw_mp_upload_part_info_update(): failed to decode op");
    return -EINVAL;
  }

  RGWUploadPartInfo stored_info;

  int ret = read_omap_entry(hctx, op.part_key, &stored_info);
  if (ret < 0 && ret != -ENOENT) {
    return ret;
  }

  /* merge all the prior (stored) manifest prefixes to carry forward */
  if (!stored_info.manifest.empty()) {
    op.info.past_prefixes.insert(stored_info.manifest.get_prefix());
  }
  op.info.past_prefixes.merge(stored_info.past_prefixes);

  if (op.info.past_prefixes.count(op.info.manifest.get_prefix())) {
    // The currently chosen prefix collides with one of the previous ones.
    // Fail this part upload so the client can pick a different prefix on
    // the next attempt.
    CLS_LOG(1, "ERROR: current prefix %s is also a past prefix for part %s",
            op.info.manifest.get_prefix().c_str(),
            op.part_key.c_str());
    return -EEXIST;
  }

  bufferlist bl;
  encode(op.info, bl);
  ret = cls_cxx_map_set_val(hctx, op.part_key, &bl);
  CLS_LOG(10, "part info update on key [%s]: %zu past prefixes, ret %d",
          op.part_key.c_str(), op.info.past_prefixes.size(), ret);
  return ret;
}

static int rgw_reshard_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
  CLS_LOG(10, "entered %s", __func__);
@@ -4422,6 +4463,7 @@ CLS_INIT(rgw)
  cls_method_handle_t h_rgw_lc_put_head;
  cls_method_handle_t h_rgw_lc_get_head;
  cls_method_handle_t h_rgw_lc_list_entries;
  cls_method_handle_t h_rgw_mp_upload_part_info_update;
  cls_method_handle_t h_rgw_reshard_add;
  cls_method_handle_t h_rgw_reshard_list;
  cls_method_handle_t h_rgw_reshard_get;
@@ -4485,6 +4527,9 @@ CLS_INIT(rgw)
  cls_register_cxx_method(h_class, RGW_LC_GET_HEAD, CLS_METHOD_RD, rgw_cls_lc_get_head, &h_rgw_lc_get_head);
  cls_register_cxx_method(h_class, RGW_LC_LIST_ENTRIES, CLS_METHOD_RD, rgw_cls_lc_list_entries, &h_rgw_lc_list_entries);

  /* multipart */
  cls_register_cxx_method(h_class, RGW_MP_UPLOAD_PART_INFO_UPDATE, CLS_METHOD_RD | CLS_METHOD_WR, rgw_mp_upload_part_info_update, &h_rgw_mp_upload_part_info_update);

  /* resharding */
  cls_register_cxx_method(h_class, RGW_RESHARD_ADD, CLS_METHOD_RD | CLS_METHOD_WR, rgw_reshard_add, &h_rgw_reshard_add);
  cls_register_cxx_method(h_class, RGW_RESHARD_LIST, CLS_METHOD_RD, rgw_reshard_list, &h_rgw_reshard_list);
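
The heart of the new handler is the past-prefix bookkeeping: each time a part is re-uploaded, the prefix of the superseded attempt is carried forward so its rados objects can later be garbage-collected, and an upload whose fresh prefix collides with a remembered one is rejected with -EEXIST so the caller retries with a different prefix. A rough Python model of that merge rule (PartInfo is an illustrative stand-in for RGWUploadPartInfo, not the real type):

class PartInfo:
    def __init__(self, prefix, past_prefixes=()):
        self.prefix = prefix                     # manifest prefix of this attempt
        self.past_prefixes = set(past_prefixes)  # prefixes of superseded attempts

def update_part_info(stored, incoming):
    """Merge prior prefixes into the incoming record, refusing collisions."""
    if stored is not None:
        if stored.prefix:
            incoming.past_prefixes.add(stored.prefix)
        incoming.past_prefixes |= stored.past_prefixes
    if incoming.prefix in incoming.past_prefixes:
        # mirrors the -EEXIST return above: caller picks a new prefix and retries
        raise FileExistsError("current prefix collides with a past prefix")
    return incoming  # caller persists this as the part's omap value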
14 changes: 14 additions & 0 deletions src/cls/rgw/cls_rgw_client.cc
@@ -1068,6 +1068,20 @@ int cls_rgw_lc_list(IoCtx& io_ctx, const string& oid,
  return r;
}

void cls_rgw_mp_upload_part_info_update(librados::ObjectWriteOperation& op,
                                        const std::string& part_key,
                                        const RGWUploadPartInfo& info)
{
  cls_rgw_mp_upload_part_info_update_op call;
  call.part_key = part_key;
  call.info = info;

  buffer::list in;
  encode(call, in);

  op.exec(RGW_CLASS, RGW_MP_UPLOAD_PART_INFO_UPDATE, in);
}

void cls_rgw_reshard_add(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry)
{
  bufferlist in;
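
End to end, the path served by this wrapper is exercised whenever the same part number of an open multipart upload is written more than once. A minimal boto3 sketch of that trigger (endpoint, bucket, and key are placeholders for a test cluster; the credentials are the test keys from the qa script):

import boto3

s3 = boto3.client("s3",
                  endpoint_url="http://localhost:8000",      # placeholder
                  aws_access_key_id="0555b35654ad1656d804",  # test key
                  aws_secret_access_key="h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q==")

s3.create_bucket(Bucket="reupload-demo")
mpu = s3.create_multipart_upload(Bucket="reupload-demo", Key="obj")
body = b"x" * (5 * 1024 * 1024)  # 5 MiB part

# Upload part 1 twice: each attempt writes fresh rados objects under a new
# prefix, and the fix makes the superseded prefix eligible for GC.
for _ in range(2):
    part = s3.upload_part(Bucket="reupload-demo", Key="obj",
                          UploadId=mpu["UploadId"], PartNumber=1, Body=body)

s3.complete_multipart_upload(
    Bucket="reupload-demo", Key="obj", UploadId=mpu["UploadId"],
    MultipartUpload={"Parts": [{"ETag": part["ETag"], "PartNumber": 1}]})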