Skip to content

Commit

Permalink
rgw/multisite: maintain endpoints connectable status and retry the requests to them when appropriate
Browse files Browse the repository at this point in the history

Signed-off-by: Juan Zhu <jzhu4@dev-10-34-20-139.pw1.bcc.bloomberg.com>
  • Loading branch information
jzhu116-bloomberg authored and Juan Zhu committed Oct 17, 2023
1 parent 28cb4d1 commit cb7af19
Show file tree
Hide file tree
Showing 10 changed files with 548 additions and 256 deletions.
150 changes: 87 additions & 63 deletions src/rgw/driver/rados/rgw_data_sync.cc
Expand Up @@ -233,6 +233,9 @@ class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
int shard_id;
RGWDataChangesLogInfo *shard_info;

int tries{0};
int op_ret{0};

public:
RGWReadRemoteDataLogShardInfoCR(RGWDataSyncCtx *_sc,
int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sc->cct),
Expand All @@ -243,41 +246,48 @@ class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
shard_info(_shard_info) {
}

~RGWReadRemoteDataLogShardInfoCR() override {
if (http_op) {
http_op->put();
}
}

int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
yield {
char buf[16];
snprintf(buf, sizeof(buf), "%d", shard_id);
rgw_http_param_pair pairs[] = { { "type" , "data" },
{ "id", buf },
{ "info" , NULL },
{ NULL, NULL } };
static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
for (tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
ldpp_dout(dpp, 20) << "read remote datalog shard info. shard_id=" << shard_id << " retries=" << tries << dendl;

string p = "/admin/log/";
yield {
char buf[16];
snprintf(buf, sizeof(buf), "%d", shard_id);
rgw_http_param_pair pairs[] = { { "type" , "data" },
{ "id", buf },
{ "info" , NULL },
{ NULL, NULL } };

http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);
string p = "/admin/log/";

init_new_io(http_op);
http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);

int ret = http_op->aio_read(dpp);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
return set_cr_error(ret);
init_new_io(http_op);

int ret = http_op->aio_read(dpp);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
http_op->put();
return set_cr_error(ret);
}

return io_block(0);
}
yield {
op_ret = http_op->wait(shard_info, null_yield);
http_op->put();
}

return io_block(0);
}
yield {
int ret = http_op->wait(shard_info, null_yield);
if (ret < 0) {
return set_cr_error(ret);
if (op_ret < 0) {
if (op_ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
ldpp_dout(dpp, 20) << "failed to fetch remote datalog shard info. retry. shard_id=" << shard_id << dendl;
continue;
} else {
return set_cr_error(op_ret);
}
}
return set_cr_done();
}
Expand Down Expand Up @@ -315,6 +325,9 @@ class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
read_remote_data_log_response response;
std::optional<TOPNSPC::common::PerfGuard> timer;

int tries{0};
int op_ret{0};

public:
RGWReadRemoteDataLogShardCR(RGWDataSyncCtx *_sc, int _shard_id,
const std::string& marker, string *pnext_marker,
Expand All @@ -324,53 +337,62 @@ class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
entries(_entries), truncated(_truncated) {
}
~RGWReadRemoteDataLogShardCR() override {
if (http_op) {
http_op->put();
}
}

int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
yield {
char buf[16];
snprintf(buf, sizeof(buf), "%d", shard_id);
rgw_http_param_pair pairs[] = { { "type" , "data" },
{ "id", buf },
{ "marker", marker.c_str() },
{ "extra-info", "true" },
{ NULL, NULL } };
static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
for (tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
ldpp_dout(dpp, 20) << "read remote datalog shard. shard_id=" << shard_id << " retries=" << tries << dendl;

string p = "/admin/log/";
yield {
char buf[16];
snprintf(buf, sizeof(buf), "%d", shard_id);
rgw_http_param_pair pairs[] = { { "type" , "data" },
{ "id", buf },
{ "marker", marker.c_str() },
{ "extra-info", "true" },
{ NULL, NULL } };

http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);
string p = "/admin/log/";

init_new_io(http_op);
http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);

init_new_io(http_op);

if (sync_env->counters) {
timer.emplace(sync_env->counters, sync_counters::l_poll);
}
int ret = http_op->aio_read(dpp);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
if (sync_env->counters) {
sync_env->counters->inc(sync_counters::l_poll_err);
timer.emplace(sync_env->counters, sync_counters::l_poll);
}
int ret = http_op->aio_read(dpp);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
if (sync_env->counters) {
sync_env->counters->inc(sync_counters::l_poll_err);
}
http_op->put();
return set_cr_error(ret);
}
return set_cr_error(ret);

return io_block(0);
}
yield {
timer.reset();
op_ret = http_op->wait(&response, null_yield);
http_op->put();
}

return io_block(0);
}
yield {
timer.reset();
int ret = http_op->wait(&response, null_yield);
if (ret < 0) {
if (sync_env->counters && ret != -ENOENT) {
sync_env->counters->inc(sync_counters::l_poll_err);
if (op_ret < 0) {
if (op_ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
ldpp_dout(dpp, 20) << "failed to read remote datalog shard. retry. shard_id=" << shard_id << dendl;
continue;
} else {
if (sync_env->counters && op_ret != -ENOENT) {
sync_env->counters->inc(sync_counters::l_poll_err);
}
return set_cr_error(op_ret);
}
return set_cr_error(ret);
}

entries->clear();
entries->swap(response.entries);
*pnext_marker = response.marker;
Expand Down Expand Up @@ -421,6 +443,8 @@ bool RGWReadRemoteDataLogInfoCR::spawn_next() {
}

class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;

RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
RGWRESTReadResource *http_op;
Expand All @@ -434,7 +458,7 @@ class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
RGWListRemoteDataLogShardCR(RGWDataSyncCtx *sc, int _shard_id,
const string& _marker, uint32_t _max_entries,
rgw_datalog_shard_data *_result)
: RGWSimpleCoroutine(sc->cct), sc(sc), sync_env(sc->env), http_op(NULL),
: RGWSimpleCoroutine(sc->cct, NUM_ENPOINT_IOERROR_RETRIES), sc(sc), sync_env(sc->env), http_op(NULL),
shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

int send_request(const DoutPrefixProvider *dpp) override {
Expand Down Expand Up @@ -474,7 +498,7 @@ class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
int ret = http_op->wait(result, null_yield);
http_op->put();
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
ldpp_dout(sync_env->dpp, 5) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
return ret;
}
return 0;
Expand Down
113 changes: 70 additions & 43 deletions src/rgw/driver/rados/rgw_rados.cc
Expand Up @@ -4038,19 +4038,28 @@ int RGWRados::stat_remote_obj(const DoutPrefixProvider *dpp,
constexpr bool sync_manifest = true;
constexpr bool skip_decrypt = true;
constexpr bool sync_cloudtiered = true;
int ret = conn->get_obj(dpp, user_id, info, src_obj, pmod, unmod_ptr,
dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
prepend_meta, get_op, rgwx_stat,
sync_manifest, skip_decrypt, nullptr, sync_cloudtiered,
true, &cb, &in_stream_req);
if (ret < 0) {
return ret;
}

ret = conn->complete_request(in_stream_req, nullptr, &set_mtime, psize,
nullptr, pheaders, y);
if (ret < 0) {
return ret;
static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
for (int tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
int ret = conn->get_obj(dpp, user_id, info, src_obj, pmod, unmod_ptr,
dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
prepend_meta, get_op, rgwx_stat,
sync_manifest, skip_decrypt, nullptr, sync_cloudtiered,
true, &cb, &in_stream_req);
if (ret < 0) {
return ret;
}

ret = conn->complete_request(in_stream_req, nullptr, &set_mtime, psize,
nullptr, pheaders, y);
if (ret < 0) {
if (ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
ldpp_dout(dpp, 20) << __func__ << "(): failed to fetch object from remote. retries=" << tries << dendl;
continue;
}
return ret;
}
break;
}

bufferlist& extra_data_bl = cb.get_extra_data();
Expand Down Expand Up @@ -4258,20 +4267,29 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
static constexpr bool sync_manifest = true;
static constexpr bool skip_decrypt = true;
static constexpr bool sync_cloudtiered = true;
ret = conn->get_obj(rctx.dpp, user_id, info, src_obj, pmod, unmod_ptr,
dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
prepend_meta, get_op, rgwx_stat,
sync_manifest, skip_decrypt, &dst_zone_trace,
sync_cloudtiered, true,
&cb, &in_stream_req);
if (ret < 0) {
goto set_err_state;
}

ret = conn->complete_request(in_stream_req, &etag, &set_mtime,
&expected_size, nullptr, nullptr, rctx.y);
if (ret < 0) {
goto set_err_state;
static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
for (int tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
ret = conn->get_obj(rctx.dpp, user_id, info, src_obj, pmod, unmod_ptr,
dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
prepend_meta, get_op, rgwx_stat,
sync_manifest, skip_decrypt, &dst_zone_trace,
sync_cloudtiered, true,
&cb, &in_stream_req);
if (ret < 0) {
goto set_err_state;
}

ret = conn->complete_request(in_stream_req, &etag, &set_mtime,
&expected_size, nullptr, nullptr, rctx.y);
if (ret < 0) {
if (ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
ldpp_dout(rctx.dpp, 20) << __func__ << "(): failed to fetch object from remote. retries=" << tries << dendl;
continue;
}
goto set_err_state;
}
break;
}
ret = cb.flush();
if (ret < 0) {
Expand Down Expand Up @@ -4490,28 +4508,37 @@ int RGWRados::copy_obj_to_remote_dest(const DoutPrefixProvider *dpp,

auto rest_master_conn = svc.zone->get_master_conn();

int ret = rest_master_conn->put_obj_async_init(dpp, user_id, dest_obj, src_attrs, &out_stream_req);
if (ret < 0) {
return ret;
}
static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
for (int tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
int ret = rest_master_conn->put_obj_async_init(dpp, user_id, dest_obj, src_attrs, &out_stream_req);
if (ret < 0) {
return ret;
}

out_stream_req->set_send_length(astate->size);
out_stream_req->set_send_length(astate->size);

ret = RGWHTTP::send(out_stream_req);
if (ret < 0) {
delete out_stream_req;
return ret;
}
ret = RGWHTTP::send(out_stream_req);
if (ret < 0) {
delete out_stream_req;
return ret;
}

ret = read_op.iterate(dpp, 0, astate->size - 1, out_stream_req->get_out_cb(), y);
if (ret < 0) {
delete out_stream_req;
return ret;
}
ret = read_op.iterate(dpp, 0, astate->size - 1, out_stream_req->get_out_cb(), y);
if (ret < 0) {
delete out_stream_req;
return ret;
}

ret = rest_master_conn->complete_request(out_stream_req, etag, mtime, y);
if (ret < 0)
return ret;
ret = rest_master_conn->complete_request(out_stream_req, etag, mtime, y);
if (ret < 0) {
if (ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
ldpp_dout(dpp, 20) << __func__ << "(): failed to put_obj_async_init. retries=" << tries << dendl;
continue;
}
return ret;
}
break;
}

return 0;
}
Expand Down

0 comments on commit cb7af19

Please sign in to comment.