Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rgw/multisite: maintain endpoints connectable status and retry the requests to them when appropriate #53320

Merged
merged 2 commits into from Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
150 changes: 87 additions & 63 deletions src/rgw/driver/rados/rgw_data_sync.cc
Expand Up @@ -233,6 +233,9 @@ class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
int shard_id;
RGWDataChangesLogInfo *shard_info;

int tries{0};
int op_ret{0};

public:
RGWReadRemoteDataLogShardInfoCR(RGWDataSyncCtx *_sc,
int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sc->cct),
Expand All @@ -243,41 +246,48 @@ class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
shard_info(_shard_info) {
}

~RGWReadRemoteDataLogShardInfoCR() override {
if (http_op) {
http_op->put();
}
}

int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
yield {
char buf[16];
snprintf(buf, sizeof(buf), "%d", shard_id);
rgw_http_param_pair pairs[] = { { "type" , "data" },
{ "id", buf },
{ "info" , NULL },
{ NULL, NULL } };
static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
for (tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
ldpp_dout(dpp, 20) << "read remote datalog shard info. shard_id=" << shard_id << " retries=" << tries << dendl;

string p = "/admin/log/";
yield {
char buf[16];
snprintf(buf, sizeof(buf), "%d", shard_id);
rgw_http_param_pair pairs[] = { { "type" , "data" },
{ "id", buf },
{ "info" , NULL },
{ NULL, NULL } };

http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);
string p = "/admin/log/";

init_new_io(http_op);
http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);

int ret = http_op->aio_read(dpp);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
return set_cr_error(ret);
init_new_io(http_op);

int ret = http_op->aio_read(dpp);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
http_op->put();
return set_cr_error(ret);
}

return io_block(0);
}
yield {
op_ret = http_op->wait(shard_info, null_yield);
http_op->put();
}

return io_block(0);
}
yield {
int ret = http_op->wait(shard_info, null_yield);
if (ret < 0) {
return set_cr_error(ret);
if (op_ret < 0) {
if (op_ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
ldpp_dout(dpp, 20) << "failed to fetch remote datalog shard info. retry. shard_id=" << shard_id << dendl;
continue;
} else {
return set_cr_error(op_ret);
}
}
return set_cr_done();
}
Expand Down Expand Up @@ -315,6 +325,9 @@ class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
read_remote_data_log_response response;
std::optional<TOPNSPC::common::PerfGuard> timer;

int tries{0};
int op_ret{0};

public:
RGWReadRemoteDataLogShardCR(RGWDataSyncCtx *_sc, int _shard_id,
const std::string& marker, string *pnext_marker,
Expand All @@ -324,53 +337,62 @@ class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
entries(_entries), truncated(_truncated) {
}
~RGWReadRemoteDataLogShardCR() override {
if (http_op) {
http_op->put();
}
}

int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
yield {
char buf[16];
snprintf(buf, sizeof(buf), "%d", shard_id);
rgw_http_param_pair pairs[] = { { "type" , "data" },
{ "id", buf },
{ "marker", marker.c_str() },
{ "extra-info", "true" },
{ NULL, NULL } };
static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
for (tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
ldpp_dout(dpp, 20) << "read remote datalog shard. shard_id=" << shard_id << " retries=" << tries << dendl;

string p = "/admin/log/";
yield {
char buf[16];
snprintf(buf, sizeof(buf), "%d", shard_id);
rgw_http_param_pair pairs[] = { { "type" , "data" },
{ "id", buf },
{ "marker", marker.c_str() },
{ "extra-info", "true" },
{ NULL, NULL } };

http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);
string p = "/admin/log/";

init_new_io(http_op);
http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);

init_new_io(http_op);

if (sync_env->counters) {
timer.emplace(sync_env->counters, sync_counters::l_poll);
}
int ret = http_op->aio_read(dpp);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
if (sync_env->counters) {
sync_env->counters->inc(sync_counters::l_poll_err);
timer.emplace(sync_env->counters, sync_counters::l_poll);
}
int ret = http_op->aio_read(dpp);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
if (sync_env->counters) {
sync_env->counters->inc(sync_counters::l_poll_err);
}
http_op->put();
return set_cr_error(ret);
}
return set_cr_error(ret);

return io_block(0);
}
yield {
timer.reset();
op_ret = http_op->wait(&response, null_yield);
http_op->put();
}

return io_block(0);
}
yield {
timer.reset();
int ret = http_op->wait(&response, null_yield);
if (ret < 0) {
if (sync_env->counters && ret != -ENOENT) {
sync_env->counters->inc(sync_counters::l_poll_err);
if (op_ret < 0) {
if (op_ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
ldpp_dout(dpp, 20) << "failed to read remote datalog shard. retry. shard_id=" << shard_id << dendl;
continue;
} else {
if (sync_env->counters && op_ret != -ENOENT) {
sync_env->counters->inc(sync_counters::l_poll_err);
}
return set_cr_error(op_ret);
}
return set_cr_error(ret);
}

entries->clear();
entries->swap(response.entries);
*pnext_marker = response.marker;
Expand Down Expand Up @@ -421,6 +443,8 @@ bool RGWReadRemoteDataLogInfoCR::spawn_next() {
}

class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;

RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
RGWRESTReadResource *http_op;
Expand All @@ -434,7 +458,7 @@ class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
RGWListRemoteDataLogShardCR(RGWDataSyncCtx *sc, int _shard_id,
const string& _marker, uint32_t _max_entries,
rgw_datalog_shard_data *_result)
: RGWSimpleCoroutine(sc->cct), sc(sc), sync_env(sc->env), http_op(NULL),
: RGWSimpleCoroutine(sc->cct, NUM_ENPOINT_IOERROR_RETRIES), sc(sc), sync_env(sc->env), http_op(NULL),
shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

int send_request(const DoutPrefixProvider *dpp) override {
Expand Down Expand Up @@ -474,7 +498,7 @@ class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
int ret = http_op->wait(result, null_yield);
http_op->put();
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
ldpp_dout(sync_env->dpp, 5) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
return ret;
}
return 0;
Expand Down
113 changes: 70 additions & 43 deletions src/rgw/driver/rados/rgw_rados.cc
Expand Up @@ -4028,19 +4028,28 @@ int RGWRados::stat_remote_obj(const DoutPrefixProvider *dpp,
constexpr bool sync_manifest = true;
constexpr bool skip_decrypt = true;
constexpr bool sync_cloudtiered = true;
int ret = conn->get_obj(dpp, user_id, info, src_obj, pmod, unmod_ptr,
dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
prepend_meta, get_op, rgwx_stat,
sync_manifest, skip_decrypt, nullptr, sync_cloudtiered,
true, &cb, &in_stream_req);
if (ret < 0) {
return ret;
}

ret = conn->complete_request(in_stream_req, nullptr, &set_mtime, psize,
nullptr, pheaders, y);
if (ret < 0) {
return ret;
static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
for (int tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
int ret = conn->get_obj(dpp, user_id, info, src_obj, pmod, unmod_ptr,
dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
prepend_meta, get_op, rgwx_stat,
sync_manifest, skip_decrypt, nullptr, sync_cloudtiered,
true, &cb, &in_stream_req);
if (ret < 0) {
return ret;
}

ret = conn->complete_request(in_stream_req, nullptr, &set_mtime, psize,
nullptr, pheaders, y);
if (ret < 0) {
if (ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
ldpp_dout(dpp, 20) << __func__ << "(): failed to fetch object from remote. retries=" << tries << dendl;
continue;
}
return ret;
}
break;
}

bufferlist& extra_data_bl = cb.get_extra_data();
Expand Down Expand Up @@ -4248,20 +4257,29 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
static constexpr bool sync_manifest = true;
static constexpr bool skip_decrypt = true;
static constexpr bool sync_cloudtiered = true;
ret = conn->get_obj(rctx.dpp, user_id, info, src_obj, pmod, unmod_ptr,
dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
prepend_meta, get_op, rgwx_stat,
sync_manifest, skip_decrypt, &dst_zone_trace,
sync_cloudtiered, true,
&cb, &in_stream_req);
if (ret < 0) {
goto set_err_state;
}

ret = conn->complete_request(in_stream_req, &etag, &set_mtime,
&accounted_size, nullptr, nullptr, rctx.y);
if (ret < 0) {
goto set_err_state;
static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
for (int tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
ret = conn->get_obj(rctx.dpp, user_id, info, src_obj, pmod, unmod_ptr,
dest_mtime_weight.zone_short_id, dest_mtime_weight.pg_ver,
prepend_meta, get_op, rgwx_stat,
sync_manifest, skip_decrypt, &dst_zone_trace,
sync_cloudtiered, true,
&cb, &in_stream_req);
if (ret < 0) {
goto set_err_state;
}

ret = conn->complete_request(in_stream_req, &etag, &set_mtime,
&accounted_size, nullptr, nullptr, rctx.y);
if (ret < 0) {
if (ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
ldpp_dout(rctx.dpp, 20) << __func__ << "(): failed to fetch object from remote. retries=" << tries << dendl;
continue;
}
goto set_err_state;
}
break;
}
ret = cb.flush();
if (ret < 0) {
Expand Down Expand Up @@ -4495,28 +4513,37 @@ int RGWRados::copy_obj_to_remote_dest(const DoutPrefixProvider *dpp,

auto rest_master_conn = svc.zone->get_master_conn();

int ret = rest_master_conn->put_obj_async_init(dpp, user_id, dest_obj, src_attrs, &out_stream_req);
if (ret < 0) {
return ret;
}
static constexpr int NUM_ENPOINT_IOERROR_RETRIES = 20;
for (int tries = 0; tries < NUM_ENPOINT_IOERROR_RETRIES; tries++) {
int ret = rest_master_conn->put_obj_async_init(dpp, user_id, dest_obj, src_attrs, &out_stream_req);
if (ret < 0) {
return ret;
}

out_stream_req->set_send_length(astate->size);
out_stream_req->set_send_length(astate->size);

ret = RGWHTTP::send(out_stream_req);
if (ret < 0) {
delete out_stream_req;
return ret;
}
ret = RGWHTTP::send(out_stream_req);
if (ret < 0) {
delete out_stream_req;
return ret;
}

ret = read_op.iterate(dpp, 0, astate->size - 1, out_stream_req->get_out_cb(), y);
if (ret < 0) {
delete out_stream_req;
return ret;
}
ret = read_op.iterate(dpp, 0, astate->size - 1, out_stream_req->get_out_cb(), y);
if (ret < 0) {
delete out_stream_req;
return ret;
}

ret = rest_master_conn->complete_request(out_stream_req, etag, mtime, y);
if (ret < 0)
return ret;
ret = rest_master_conn->complete_request(out_stream_req, etag, mtime, y);
if (ret < 0) {
if (ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
ldpp_dout(dpp, 20) << __func__ << "(): failed to put_obj_async_init. retries=" << tries << dendl;
continue;
}
return ret;
}
break;
}

return 0;
}
Expand Down