Skip to content

Commit

Permalink
Merge pull request #24319 from smithfarm/wip-35979-mimic
Browse files Browse the repository at this point in the history
mimic: multisite: data sync error repo processing does not back off on empty

Reviewed-by: Casey Bodley <cbodley@redhat.com>
  • Loading branch information
yuriw committed Oct 17, 2018
2 parents 47939ee + b783c6b commit 742f11c
Show file tree
Hide file tree
Showing 15 changed files with 164 additions and 61 deletions.
22 changes: 22 additions & 0 deletions src/common/ceph_time.h
Expand Up @@ -180,6 +180,14 @@ namespace ceph {
return from_timespec(ts);
}

static bool is_zero(const time_point& t) {
return (t == time_point::min());
}

static time_point zero() {
return time_point::min();
}

static time_t to_time_t(const time_point& t) noexcept {
return duration_cast<seconds>(t.time_since_epoch()).count();
}
Expand Down Expand Up @@ -468,6 +476,20 @@ inline timespan to_timespan(signedspan z) {
ceph_assert(z >= signedspan::zero());
return std::chrono::duration_cast<timespan>(z);
}

// detects presence of Clock::to_timespec() and from_timespec()
template <typename Clock, typename = std::void_t<>>
struct converts_to_timespec : std::false_type {};

template <typename Clock>
struct converts_to_timespec<Clock, std::void_t<decltype(
Clock::from_timespec(Clock::to_timespec(
std::declval<typename Clock::time_point>()))
)>> : std::true_type {};

template <typename Clock>
constexpr bool converts_to_timespec_v = converts_to_timespec<Clock>::value;

} // namespace ceph

#endif // COMMON_CEPH_TIME_H
28 changes: 26 additions & 2 deletions src/include/encoding.h
Expand Up @@ -280,7 +280,8 @@ inline void decode_nohead(int len, bufferlist& s, bufferlist::iterator& p)

// Time, since the templates are defined in std::chrono

template<typename Clock, typename Duration>
template<typename Clock, typename Duration,
typename std::enable_if_t<converts_to_timespec_v<Clock>>* = nullptr>
void encode(const std::chrono::time_point<Clock, Duration>& t,
ceph::bufferlist &bl) {
auto ts = Clock::to_timespec(t);
Expand All @@ -291,7 +292,8 @@ void encode(const std::chrono::time_point<Clock, Duration>& t,
encode(ns, bl);
}

template<typename Clock, typename Duration>
template<typename Clock, typename Duration,
typename std::enable_if_t<converts_to_timespec_v<Clock>>* = nullptr>
void decode(std::chrono::time_point<Clock, Duration>& t,
bufferlist::iterator& p) {
uint32_t s;
Expand All @@ -305,6 +307,28 @@ void decode(std::chrono::time_point<Clock, Duration>& t,
t = Clock::from_timespec(ts);
}

template<typename Rep, typename Period,
typename std::enable_if_t<std::is_integral_v<Rep>>* = nullptr>
void encode(const std::chrono::duration<Rep, Period>& d,
ceph::bufferlist &bl) {
using namespace std::chrono;
uint32_t s = duration_cast<seconds>(d).count();
uint32_t ns = (duration_cast<nanoseconds>(d) % seconds(1)).count();
encode(s, bl);
encode(ns, bl);
}

template<typename Rep, typename Period,
typename std::enable_if_t<std::is_integral_v<Rep>>* = nullptr>
void decode(std::chrono::duration<Rep, Period>& d,
bufferlist::iterator& p) {
uint32_t s;
uint32_t ns;
decode(s, p);
decode(ns, p);
d = std::chrono::seconds(s) + std::chrono::nanoseconds(ns);
}

// -----------------------------
// STL container types

Expand Down
10 changes: 6 additions & 4 deletions src/include/utime.h
Expand Up @@ -71,10 +71,12 @@ class utime_t {
tv.tv_sec = v.tv_sec;
tv.tv_nsec = v.tv_nsec;
}
explicit utime_t(const ceph::real_time& rt) {
ceph_timespec ts = real_clock::to_ceph_timespec(rt);
decode_timeval(&ts);
}
// conversion from ceph::real_time/coarse_real_time
template <typename Clock, typename std::enable_if_t<
ceph::converts_to_timespec_v<Clock>>* = nullptr>
explicit utime_t(const std::chrono::time_point<Clock>& t)
: utime_t(Clock::to_timespec(t)) {} // forward to timespec ctor

utime_t(const struct timeval &v) {
set_from_timeval(&v);
}
Expand Down
3 changes: 2 additions & 1 deletion src/rgw/rgw_admin.cc
Expand Up @@ -5518,7 +5518,8 @@ int main(int argc, const char **argv)
formatter->open_array_section("log_entries");

do {
uint64_t total_time = entry.total_time.to_msec();
using namespace std::chrono;
uint64_t total_time = duration_cast<milliseconds>(entry.total_time).count();

agg_time += total_time;
agg_bytes_sent += entry.bytes_sent;
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_common.cc
Expand Up @@ -284,7 +284,7 @@ req_state::req_state(CephContext* _cct, RGWEnv* e, RGWUserInfo* u)
enable_usage_log = e->get_enable_usage_log();
defer_to_bucket_acls = e->get_defer_to_bucket_acls();

time = ceph_clock_now();
time = Clock::now();
}

req_state::~req_state() {
Expand Down
6 changes: 5 additions & 1 deletion src/rgw/rgw_common.h
Expand Up @@ -1886,7 +1886,11 @@ struct req_state {
req_info info;
req_init_state init_state;

utime_t time;
using Clock = ceph::coarse_real_clock;
Clock::time_point time;

Clock::duration time_elapsed() const { return Clock::now() - time; }

void *obj_ctx{nullptr};
string dialect;
string req_id;
Expand Down
67 changes: 42 additions & 25 deletions src/rgw/rgw_data_sync.cc
Expand Up @@ -1206,7 +1206,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
string error_marker;
int max_error_entries;

ceph::real_time error_retry_time;
ceph::coarse_real_time error_retry_time;

#define RETRY_BACKOFF_SECS_MIN 60
#define RETRY_BACKOFF_SECS_DEFAULT 60
Expand Down Expand Up @@ -1435,32 +1435,33 @@ class RGWDataSyncShardCR : public RGWCoroutine {
}
}

/* process bucket shards that previously failed */
yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
error_marker, &error_entries,
max_error_entries));
tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
iter = error_entries.begin();
for (; iter != error_entries.end(); ++iter) {
error_marker = *iter;
tn->log(20, SSTR("handle error entry: " << error_marker));
spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
}
if ((int)error_entries.size() != max_error_entries) {
if (error_marker.empty() && error_entries.empty()) {
/* the retry repo is empty, we back off a bit before calling it again */
retry_backoff_secs *= 2;
if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
if (error_retry_time <= ceph::coarse_real_clock::now()) {
/* process bucket shards that previously failed */
yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
error_marker, &error_entries,
max_error_entries));
tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
iter = error_entries.begin();
for (; iter != error_entries.end(); ++iter) {
error_marker = *iter;
tn->log(20, SSTR("handle error entry: " << error_marker));
spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
}
if ((int)error_entries.size() != max_error_entries) {
if (error_marker.empty() && error_entries.empty()) {
/* the retry repo is empty, we back off a bit before calling it again */
retry_backoff_secs *= 2;
if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
}
} else {
retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
}
} else {
retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
error_marker.clear();
}
error_retry_time = ceph::real_clock::now() + make_timespan(retry_backoff_secs);
error_marker.clear();
}


yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to fetch remote data log info: ret=" << retcode));
Expand Down Expand Up @@ -1533,13 +1534,29 @@ class RGWDataSyncShardCR : public RGWCoroutine {
tn->log(20, SSTR("shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker));
if (datalog_marker == sync_marker.marker || remote_trimmed == RemoteTrimmed) {
tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
#define INCREMENTAL_INTERVAL 20
yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
yield wait(get_idle_interval());
}
} while (true);
}
return 0;
}

utime_t get_idle_interval() const {
#define INCREMENTAL_INTERVAL 20
ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL);
if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
auto now = ceph::coarse_real_clock::now();
if (error_retry_time > now) {
auto d = error_retry_time - now;
if (interval > d) {
interval = d;
}
}
}
// convert timespan -> time_point -> utime_t
return utime_t(ceph::coarse_real_clock::zero() + interval);
}

void stop_spawned_services() {
lease_cr->go_down();
if (error_repo) {
Expand Down
3 changes: 1 addition & 2 deletions src/rgw/rgw_file.cc
Expand Up @@ -1511,8 +1511,7 @@ namespace rgw {

done:
dispose_processor(processor);
perfcounter->tinc(l_rgw_put_lat,
(ceph_clock_now() - s->time));
perfcounter->tinc(l_rgw_put_lat, s->time_elapsed());
return op_ret;
} /* exec_finish */

Expand Down
19 changes: 12 additions & 7 deletions src/rgw/rgw_log.cc
Expand Up @@ -237,8 +237,11 @@ void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter)
{
formatter->open_object_section("log_entry");
formatter->dump_string("bucket", entry.bucket);
entry.time.gmtime(formatter->dump_stream("time")); // UTC
entry.time.localtime(formatter->dump_stream("time_local"));
{
auto t = utime_t{entry.time};
t.gmtime(formatter->dump_stream("time")); // UTC
t.localtime(formatter->dump_stream("time_local"));
}
formatter->dump_string("remote_addr", entry.remote_addr);
string obj_owner = entry.object_owner.to_str();
if (obj_owner.length())
Expand All @@ -251,9 +254,11 @@ void rgw_format_ops_log_entry(struct rgw_log_entry& entry, Formatter *formatter)
formatter->dump_int("bytes_sent", entry.bytes_sent);
formatter->dump_int("bytes_received", entry.bytes_received);
formatter->dump_int("object_size", entry.obj_size);
uint64_t total_time = entry.total_time.to_msec();

formatter->dump_int("total_time", total_time);
{
using namespace std::chrono;
uint64_t total_time = duration_cast<milliseconds>(entry.total_time).count();
formatter->dump_int("total_time", total_time);
}
formatter->dump_string("user_agent", entry.user_agent);
formatter->dump_string("referrer", entry.referrer);
if (entry.x_headers.size() > 0) {
Expand Down Expand Up @@ -406,7 +411,7 @@ int rgw_log_op(RGWRados *store, RGWREST* const rest, struct req_state *s,
uint64_t bytes_received = ACCOUNTING_IO(s)->get_bytes_received();

entry.time = s->time;
entry.total_time = ceph_clock_now() - s->time;
entry.total_time = s->time_elapsed();
entry.bytes_sent = bytes_sent;
entry.bytes_received = bytes_received;
if (s->err.http_ret) {
Expand All @@ -423,7 +428,7 @@ int rgw_log_op(RGWRados *store, RGWREST* const rest, struct req_state *s,
encode(entry, bl);

struct tm bdt;
time_t t = entry.time.sec();
time_t t = req_state::Clock::to_time_t(entry.time);
if (s->cct->_conf->rgw_log_object_name_utc)
gmtime_r(&t, &bdt);
else
Expand Down
6 changes: 3 additions & 3 deletions src/rgw/rgw_log.h
Expand Up @@ -5,7 +5,6 @@
#define CEPH_RGW_LOG_H
#include <boost/container/flat_map.hpp>
#include "rgw_common.h"
#include "include/utime.h"
#include "common/Formatter.h"
#include "common/OutputDataSocket.h"

Expand All @@ -14,11 +13,12 @@ class RGWRados;
struct rgw_log_entry {

using headers_map = boost::container::flat_map<std::string, std::string>;
using Clock = req_state::Clock;

rgw_user object_owner;
rgw_user bucket_owner;
string bucket;
utime_t time;
Clock::time_point time;
string remote_addr;
string user;
rgw_obj_key obj;
Expand All @@ -29,7 +29,7 @@ struct rgw_log_entry {
uint64_t bytes_sent;
uint64_t bytes_received;
uint64_t obj_size;
utime_t total_time;
Clock::duration total_time;
string user_agent;
string referrer;
string bucket_id;
Expand Down
7 changes: 2 additions & 5 deletions src/rgw/rgw_op.cc
Expand Up @@ -1719,7 +1719,6 @@ static bool object_is_expired(map<string, bufferlist>& attrs) {

void RGWGetObj::execute()
{
utime_t start_time = s->time;
bufferlist bl;
gc_invalidate_time = ceph_clock_now();
gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
Expand Down Expand Up @@ -1876,8 +1875,7 @@ void RGWGetObj::execute()
if (op_ret >= 0)
op_ret = filter->flush();

perfcounter->tinc(l_rgw_get_lat,
(ceph_clock_now() - start_time));
perfcounter->tinc(l_rgw_get_lat, s->time_elapsed());
if (op_ret < 0) {
goto done_err;
}
Expand Down Expand Up @@ -3812,8 +3810,7 @@ void RGWPutObj::execute()

done:
dispose_processor(processor);
perfcounter->tinc(l_rgw_put_lat,
(ceph_clock_now() - s->time));
perfcounter->tinc(l_rgw_put_lat, s->time_elapsed());
}

int RGWPostObj::verify_permission()
Expand Down
3 changes: 1 addition & 2 deletions src/rgw/rgw_request.cc
Expand Up @@ -22,7 +22,6 @@ void RGWRequest::log_format(struct req_state *s, const char *fmt, ...)
} /* RGWRequest::log_format */

void RGWRequest::log_init() {
ts = ceph_clock_now();
}

void RGWRequest::log(struct req_state *s, const char *msg) {
Expand All @@ -31,7 +30,7 @@ void RGWRequest::log(struct req_state *s, const char *msg) {
req_str.append(" ");
req_str.append(s->info.request_uri);
}
utime_t t = ceph_clock_now() - ts;
auto t = s->time_elapsed();
dout(2) << "req " << id << ":" << t << ":" << s->dialect << ":"
<< req_str << ":" << (op ? op->name() : "") << ":" << msg
<< dendl;
Expand Down
1 change: 0 additions & 1 deletion src/rgw/rgw_request.h
Expand Up @@ -23,7 +23,6 @@ struct RGWRequest
struct req_state *s;
string req_str;
RGWOp *op;
utime_t ts;

explicit RGWRequest(uint64_t id) : id(id), s(NULL), op(NULL) {}

Expand Down

0 comments on commit 742f11c

Please sign in to comment.