Skip to content

Commit

Permalink
rgw: configurable write obj window size
Browse files Browse the repository at this point in the history
Fixes: http://tracker.ceph.com/issues/18623

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
(cherry picked from commit 66a82b4)
  • Loading branch information
yehudasa authored and smithfarm committed Apr 13, 2017
1 parent a4772f7 commit 3fd6429
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 21 deletions.
2 changes: 2 additions & 0 deletions src/common/config_opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -1353,6 +1353,8 @@ OPTION(nss_db_path, OPT_STR, "") // path to nss db


OPTION(rgw_max_chunk_size, OPT_INT, 4 * 1024 * 1024) // also the step by which RGWPutObjProcessor_Aio::throttle_data() grows the put-obj write window
OPTION(rgw_put_obj_min_window_size, OPT_INT, 16 * 1024 * 1024) // initial put-obj write window, loaded by RGWPutObjProcessor_Aio::prepare()
OPTION(rgw_put_obj_max_window_size, OPT_INT, 64 * 1024 * 1024) // ceiling the put-obj write window is clamped to when it grows
OPTION(rgw_max_put_size, OPT_U64, 5ULL*1024*1024*1024)

/**
Expand Down
2 changes: 0 additions & 2 deletions src/rgw/rgw_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ using ceph::crypto::MD5;

#define RGW_BUCKETS_OBJ_SUFFIX ".buckets"

#define RGW_MAX_PENDING_CHUNKS 16

#define RGW_FORMAT_PLAIN 0
#define RGW_FORMAT_XML 1
#define RGW_FORMAT_JSON 2
Expand Down
8 changes: 5 additions & 3 deletions src/rgw/rgw_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -779,8 +779,8 @@ class RGWPutObj_Filter : public RGWPutObjDataProcessor
/* Pass-through: forwards every argument unchanged to `next` and returns its result. */
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) override {
  const int ret = next->handle_data(bl, ofs, phandle, pobj, again);
  return ret;
}
virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) override {
return next->throttle_data(handle, obj, need_to_wait);
virtual int throttle_data(void *handle, const rgw_obj& obj, uint64_t size, bool need_to_wait) override {
return next->throttle_data(handle, obj, size, need_to_wait);
}
}; /* RGWPutObj_Filter */

Expand Down Expand Up @@ -1606,11 +1606,13 @@ static inline int put_data_and_throttle(RGWPutObjDataProcessor *processor,
void *handle;
rgw_obj obj;

uint64_t size = data.length();

int ret = processor->handle_data(data, ofs, &handle, &obj, &again);
if (ret < 0)
return ret;

ret = processor->throttle_data(handle, obj, need_to_wait);
ret = processor->throttle_data(handle, obj, size, need_to_wait);
if (ret < 0)
return ret;

Expand Down
40 changes: 29 additions & 11 deletions src/rgw/rgw_rados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2290,6 +2290,7 @@ struct put_obj_aio_info RGWPutObjProcessor_Aio::pop_pending()
struct put_obj_aio_info info;
info = pending.front();
pending.pop_front();
pending_size -= info.size;
return info;
}

Expand Down Expand Up @@ -2328,17 +2329,19 @@ int RGWPutObjProcessor_Aio::drain_pending()
return ret;
}

int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait)
int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, uint64_t size, bool need_to_wait)
{
bool _wait = need_to_wait;

if (handle) {
struct put_obj_aio_info info;
info.handle = handle;
info.obj = obj;
info.size = size;
pending_size += size;
pending.push_back(info);
}
size_t orig_size = pending.size();
size_t orig_size = pending_size;

/* first drain complete IOs */
while (pending_has_completed()) {
Expand All @@ -2350,12 +2353,16 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, bool
}

/* resize window in case messages are draining too fast */
if (orig_size - pending.size() >= max_chunks) {
max_chunks++;
if (orig_size - pending_size >= window_size) {
window_size += store->ctx()->_conf->rgw_max_chunk_size;
uint64_t max_window_size = store->ctx()->_conf->rgw_put_obj_max_window_size;
if (window_size > max_window_size) {
window_size = max_window_size;
}
}

/* now throttle. Note that need_to_wait should only affect the first IO operation */
if (pending.size() > max_chunks || _wait) {
if (pending_size > window_size || _wait) {
int r = wait_pending_front();
if (r < 0)
return r;
Expand All @@ -2377,6 +2384,15 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan
return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
}

/*
 * Prepare the AIO put-object processor before any data is written.
 *
 * Runs the base-class RGWPutObjProcessor::prepare() and then seeds the
 * throttling window (window_size) from rgw_put_obj_min_window_size.
 * throttle_data() may later grow the window, up to
 * rgw_put_obj_max_window_size.
 *
 * store:    RADOS store; its context supplies the configuration values.
 * oid_rand: forwarded unchanged to RGWPutObjProcessor::prepare().
 *
 * Returns 0 on success, or the negative error code reported by the
 * base-class prepare() (previously that result was silently dropped).
 */
int RGWPutObjProcessor_Aio::prepare(RGWRados *store, string *oid_rand)
{
  int r = RGWPutObjProcessor::prepare(store, oid_rand);
  if (r < 0) {
    /* do not touch the window if the base setup failed */
    return r;
  }

  window_size = store->ctx()->_conf->rgw_put_obj_min_window_size;

  return 0;
}

int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again)
{
*phandle = NULL;
Expand Down Expand Up @@ -2418,7 +2434,7 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **pha

int RGWPutObjProcessor_Atomic::prepare_init(RGWRados *store, string *oid_rand)
{
RGWPutObjProcessor::prepare(store, oid_rand);
RGWPutObjProcessor_Aio::prepare(store, oid_rand);

int r = store->get_max_chunk_size(bucket, &max_chunk_size);
if (r < 0) {
Expand Down Expand Up @@ -2495,13 +2511,14 @@ int RGWPutObjProcessor_Atomic::complete_writing_data()
}
bufferlist bl;
pending_data_bl.splice(0, max_write_size, &bl);
uint64_t write_len = bl.length();
int r = write_data(bl, data_ofs, &handle, &obj, false);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl;
return r;
}
data_ofs += bl.length();
r = throttle_data(handle, obj, false);
data_ofs += write_len;
r = throttle_data(handle, obj, write_len, false);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl;
return r;
Expand Down Expand Up @@ -6719,6 +6736,7 @@ class RGWRadosPutObj : public RGWGetDataCB
do {
void *handle = NULL;
rgw_obj obj;
uint64_t size = bl.length();
int ret = filter->handle_data(bl, ofs, &handle, &obj, &again);
if (ret < 0)
return ret;
Expand All @@ -6730,7 +6748,7 @@ class RGWRadosPutObj : public RGWGetDataCB
ret = opstate->renew_state();
if (ret < 0) {
ldout(cct, 0) << "ERROR: RGWRadosPutObj::handle_data(): failed to renew op state ret=" << ret << dendl;
int r = filter->throttle_data(handle, obj, false);
int r = filter->throttle_data(handle, obj, size, false);
if (r < 0) {
ldout(cct, 0) << "ERROR: RGWRadosPutObj::handle_data(): processor->throttle_data() returned " << r << dendl;
}
Expand All @@ -6741,7 +6759,7 @@ class RGWRadosPutObj : public RGWGetDataCB
need_opstate = false;
}

ret = filter->throttle_data(handle, obj, false);
ret = filter->throttle_data(handle, obj, size, false);
if (ret < 0)
return ret;
} while (again);
Expand Down Expand Up @@ -7654,7 +7672,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
if (ret < 0) {
return ret;
}
ret = processor.throttle_data(handle, obj, false);
ret = processor.throttle_data(handle, obj, end - ofs + 1, false);
if (ret < 0)
return ret;
} while (again);
Expand Down
15 changes: 10 additions & 5 deletions src/rgw/rgw_rados.h
Original file line number Diff line number Diff line change
Expand Up @@ -3328,7 +3328,7 @@ class RGWPutObjDataProcessor
RGWPutObjDataProcessor(){}
virtual ~RGWPutObjDataProcessor(){}
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) = 0;
virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) = 0;
virtual int throttle_data(void *handle, const rgw_obj& obj, uint64_t size, bool need_to_wait) = 0;
}; /* RGWPutObjDataProcessor */


Expand Down Expand Up @@ -3371,12 +3371,16 @@ class RGWPutObjProcessor : public RGWPutObjDataProcessor
/*
 * One entry of RGWPutObjProcessor_Aio's `pending` list: a write that was
 * handed to throttle_data() and has not been popped yet.
 */
struct put_obj_aio_info {
/* opaque handle supplied by the caller of throttle_data() */
void *handle;
/* object supplied together with the handle */
rgw_obj obj;
/* amount added to pending_size on push and subtracted again in pop_pending() */
uint64_t size;
};

#define RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT (16 * 1024 * 1024)

class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
{
list<struct put_obj_aio_info> pending;
size_t max_chunks;
uint64_t window_size{RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT};
uint64_t pending_size{0};

struct put_obj_aio_info pop_pending();
int wait_pending_front();
Expand All @@ -3385,7 +3389,7 @@ class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
rgw_obj last_written_obj;

protected:
uint64_t obj_len;
uint64_t obj_len{0};

set<rgw_obj> written_objs;

Expand All @@ -3397,9 +3401,10 @@ class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);

public:
int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait);
int prepare(RGWRados *store, string *oid_rand);
int throttle_data(void *handle, const rgw_obj& obj, uint64_t size, bool need_to_wait);

RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info) {}
virtual ~RGWPutObjProcessor_Aio();
}; /* RGWPutObjProcessor_Aio */

Expand Down

0 comments on commit 3fd6429

Please sign in to comment.