Skip to content

Commit

Permalink
mds: use monotonic clock in beacon
Browse files Browse the repository at this point in the history
Also update other parts of MDS which interact.

Fixes: http://tracker.ceph.com/issues/26959

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
(cherry picked from commit e77a2f5)

Conflicts:
	src/mds/Beacon.cc
	src/mds/Beacon.h
	src/mds/MDCache.h
	src/mds/Server.cc
  • Loading branch information
batrick committed Sep 27, 2018
1 parent e28cac8 commit 760ce8e
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 121 deletions.
110 changes: 48 additions & 62 deletions src/mds/Beacon.cc
Expand Up @@ -106,41 +106,31 @@ void Beacon::handle_mds_beacon(MMDSBeacon *m)
version_t seq = m->get_seq();

// update lab
if (seq_stamp.count(seq)) {
utime_t now = ceph_clock_now();
if (seq_stamp[seq] > last_acked_stamp) {
last_acked_stamp = seq_stamp[seq];
utime_t rtt = now - last_acked_stamp;

dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
<< " seq " << m->get_seq() << " rtt " << rtt << dendl;

if (was_laggy && rtt < g_conf->mds_beacon_grace) {
dout(0) << "handle_mds_beacon no longer laggy" << dendl;
was_laggy = false;
laggy_until = now;
}
} else {
// Mark myself laggy if system clock goes backwards. Hopping
// later beacons will clear it.
dout(1) << "handle_mds_beacon system clock goes backwards, "
<< "mark myself laggy" << dendl;
last_acked_stamp = now - utime_t(g_conf->mds_beacon_grace + 1, 0);
was_laggy = true;
auto it = seq_stamp.find(seq);
if (it != seq_stamp.end()) {
auto now = clock::now();

last_acked_stamp = it->second;
auto rtt = std::chrono::duration<double>(now - last_acked_stamp).count();

dout(5) << "received beacon reply " << ceph_mds_state_name(m->get_state()) << " seq " << m->get_seq() << " rtt " << rtt << dendl;

if (laggy && rtt < g_conf->mds_beacon_grace) {
dout(0) << " MDS is no longer laggy" << dendl;
laggy = false;
last_laggy = now;
}

// clean up seq_stamp map
while (!seq_stamp.empty() &&
seq_stamp.begin()->first <= seq)
seq_stamp.erase(seq_stamp.begin());
seq_stamp.erase(seq_stamp.begin(), ++it);

// Wake a waiter up if present
if (awaiting_seq == seq) {
waiting_cond.Signal();
}
} else {
dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
<< " seq " << m->get_seq() << " dne" << dendl;
dout(1) << "discarding unexpected beacon reply " << ceph_mds_state_name(m->get_state())
<< " seq " << m->get_seq() << " dne" << dendl;
}
m->put();
}
Expand All @@ -162,8 +152,9 @@ void Beacon::send_and_wait(const double duration)
<< " for up to " << duration << "s" << dendl;

utime_t timeout;
timeout.set_from_double(ceph_clock_now() + duration);
while ((!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq)
timeout.set_from_double(ceph_clock_now()+duration);
while (!seq_stamp.empty()
&& seq_stamp.begin()->first <= awaiting_seq
&& ceph_clock_now() < timeout) {
waiting_cond.WaitUntil(lock, timeout);
}
Expand All @@ -188,19 +179,20 @@ void Beacon::_send()
_send();
}));

auto now = clock::now();
auto since = std::chrono::duration<double>(now-last_acked_stamp).count();

if (!cct->get_heartbeat_map()->is_healthy()) {
/* If anything isn't progressing, let avoid sending a beacon so that
* the MDS will consider us laggy */
dout(1) << __func__ << " skipping beacon, heartbeat map not healthy" << dendl;
dout(0) << "Skipping beacon heartbeat to monitors (last acked " << since << "s ago); MDS internal heartbeat is not healthy!" << dendl;
return;
}

++last_seq;
dout(10) << __func__ << " " << ceph_mds_state_name(want_state)
<< " seq " << last_seq
<< dendl;
dout(5) << "Sending beacon " << ceph_mds_state_name(want_state) << " seq " << last_seq << dendl;

seq_stamp[last_seq] = ceph_clock_now();
seq_stamp[last_seq] = now;

assert(want_state != MDSMap::STATE_NULL);

Expand Down Expand Up @@ -256,17 +248,14 @@ bool Beacon::is_laggy()
{
Mutex::Locker l(lock);

if (last_acked_stamp == utime_t())
return false;

utime_t now = ceph_clock_now();
utime_t since = now - last_acked_stamp;
auto now = clock::now();
auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
if (since > g_conf->mds_beacon_grace) {
dout(5) << "is_laggy " << since << " > " << g_conf->mds_beacon_grace
dout(1) << "is_laggy " << since << " > " << g_conf->mds_beacon_grace
<< " since last acked beacon" << dendl;
was_laggy = true;
if (since > (g_conf->mds_beacon_grace*2) &&
now > last_mon_reconnect + g_conf->mds_beacon_interval) {
laggy = true;
auto last_reconnect = std::chrono::duration<double>(now-last_mon_reconnect).count();
if (since > (g_conf->mds_beacon_grace*2) && last_reconnect > g_conf->mds_beacon_interval) {
// maybe it's not us?
dout(5) << "initiating monitor reconnect; maybe we're not the slow one"
<< dendl;
Expand All @@ -278,14 +267,7 @@ bool Beacon::is_laggy()
return false;
}

utime_t Beacon::get_laggy_until() const
{
Mutex::Locker l(lock);

return laggy_until;
}

void Beacon::set_want_state(MDSMap const *mdsmap, MDSMap::DaemonState const newstate)
void Beacon::set_want_state(const MDSMap* mdsmap, MDSMap::DaemonState const newstate)
{
Mutex::Locker l(lock);

Expand Down Expand Up @@ -393,30 +375,34 @@ void Beacon::notify_health(MDSRank const *mds)
set<Session*> sessions;
mds->sessionmap.get_client_session_set(sessions);

utime_t cutoff = ceph_clock_now();
cutoff -= g_conf->mds_recall_state_timeout;
utime_t last_recall = mds->mdcache->last_recall_state;
auto mds_recall_state_timeout = g_conf->mds_recall_state_timeout;
auto last_recall = mds->mdcache->last_recall_state;
auto last_recall_span = std::chrono::duration<double>(clock::now()-last_recall).count();
bool recall_state_timedout = last_recall_span > mds_recall_state_timeout;

std::list<MDSHealthMetric> late_recall_metrics;
std::list<MDSHealthMetric> large_completed_requests_metrics;
for (set<Session*>::iterator i = sessions.begin(); i != sessions.end(); ++i) {
Session *session = *i;
if (!session->recalled_at.is_zero()) {
for (auto& session : sessions) {
if (session->recalled_at != Session::clock::zero()) {
auto last_recall_sent = session->last_recall_sent;
auto recalled_at = session->recalled_at;
auto recalled_at_span = std::chrono::duration<double>(clock::now()-recalled_at).count();

dout(20) << "Session servicing RECALL " << session->info.inst
<< ": " << session->recalled_at << " " << session->recall_release_count
<< ": " << recalled_at_span << "s ago " << session->recall_release_count
<< "/" << session->recall_count << dendl;
if (last_recall < cutoff || session->last_recall_sent < last_recall) {
if (recall_state_timedout || last_recall_sent < last_recall) {
dout(20) << " no longer recall" << dendl;
session->clear_recalled_at();
} else if (session->recalled_at < cutoff) {
dout(20) << " exceeded timeout " << session->recalled_at << " vs. " << cutoff << dendl;
} else if (recalled_at_span > mds_recall_state_timeout) {
dout(20) << " exceeded timeout " << recalled_at_span << " vs. " << mds_recall_state_timeout << dendl;
std::ostringstream oss;
oss << "Client " << session->get_human_name() << " failing to respond to cache pressure";
MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL, HEALTH_WARN, oss.str());
m.metadata["client_id"] = stringify(session->info.inst.name.num());
late_recall_metrics.push_back(m);
} else {
dout(20) << " within timeout " << session->recalled_at << " vs. " << cutoff << dendl;
dout(20) << " within timeout " << recalled_at_span << " vs. " << mds_recall_state_timeout << dendl;
}
}
if ((session->get_num_trim_requests_warnings() > 0 &&
Expand Down Expand Up @@ -471,7 +457,7 @@ void Beacon::notify_health(MDSRank const *mds)

{
auto complaint_time = g_conf->osd_op_complaint_time;
auto now = ceph::coarse_mono_clock::now();
auto now = clock::now();
auto cutoff = now - ceph::make_timespan(complaint_time);

std::string count;
Expand Down
18 changes: 12 additions & 6 deletions src/mds/Beacon.h
Expand Up @@ -42,6 +42,9 @@ class MDSRank;
class Beacon : public Dispatcher
{
public:
using clock = ceph::coarse_mono_clock;
using time = ceph::coarse_mono_time;

Beacon(CephContext *cct_, MonClient *monc_, boost::string_view name);
~Beacon() override {};

Expand Down Expand Up @@ -74,7 +77,10 @@ class Beacon : public Dispatcher
void send_and_wait(const double duration);

bool is_laggy();
utime_t get_laggy_until() const;
double last_cleared_laggy() const {
Mutex::Locker l(lock);
return std::chrono::duration<double>(clock::now()-last_laggy).count();
}

private:
void _notify_mdsmap(MDSMap const *mdsmap);
Expand All @@ -97,11 +103,11 @@ class Beacon : public Dispatcher

// Internal beacon state
version_t last_seq = 0; // last seq sent to monitor
std::map<version_t,utime_t> seq_stamp; // seq # -> time sent
utime_t last_acked_stamp; // last time we sent a beacon that got acked
utime_t last_mon_reconnect;
bool was_laggy = false;
utime_t laggy_until;
std::map<version_t,time> seq_stamp; // seq # -> time sent
time last_acked_stamp = clock::zero(); // last time we sent a beacon that got acked
time last_mon_reconnect = clock::zero();
bool laggy = false;
time last_laggy = clock::zero();

// Health status to be copied into each beacon message
MDSHealth health;
Expand Down
2 changes: 1 addition & 1 deletion src/mds/MDCache.cc
Expand Up @@ -7497,7 +7497,7 @@ void MDCache::check_memory_usage()
mds->mlogger->set(l_mdm_heap, last.get_heap());

if (cache_toofull()) {
last_recall_state = ceph_clock_now();
last_recall_state = clock::now();
mds->server->recall_client_state();
}

Expand Down
5 changes: 4 additions & 1 deletion src/mds/MDCache.h
Expand Up @@ -118,6 +118,9 @@ static const int PREDIRTY_SHALLOW = 4; // only go to immediate parent (for easie

class MDCache {
public:
using clock = ceph::coarse_mono_clock;
using time = ceph::coarse_mono_time;

// my master
MDSRank *mds;

Expand Down Expand Up @@ -752,7 +755,7 @@ class MDCache {
void trim_client_leases();
void check_memory_usage();

utime_t last_recall_state;
time last_recall_state;

// shutdown
private:
Expand Down
5 changes: 0 additions & 5 deletions src/mds/MDSRank.cc
Expand Up @@ -1016,11 +1016,6 @@ void MDSRank::retry_dispatch(Message *m)
dec_dispatch_depth();
}

utime_t MDSRank::get_laggy_until() const
{
return beacon.get_laggy_until();
}

double MDSRank::get_dispatch_queue_max_age(utime_t now) const
{
return messenger->get_dispatch_queue_max_age(now);
Expand Down
5 changes: 4 additions & 1 deletion src/mds/MDSRank.h
Expand Up @@ -361,7 +361,10 @@ class MDSRank {
*/
void damaged_unlocked();

utime_t get_laggy_until() const;
double last_cleared_laggy() const {
return beacon.last_cleared_laggy();
}

double get_dispatch_queue_max_age(utime_t now) const;

void send_message_mds(Message *m, mds_rank_t mds);
Expand Down
55 changes: 23 additions & 32 deletions src/mds/Server.cc
Expand Up @@ -702,27 +702,25 @@ void Server::terminate_sessions()

void Server::find_idle_sessions()
{
dout(10) << "find_idle_sessions. laggy until " << mds->get_laggy_until() << dendl;
auto now = clock::now();
auto last_cleared_laggy = mds->last_cleared_laggy();

dout(10) << "find_idle_sessions. last cleared laggy state " << last_cleared_laggy << "s ago" << dendl;

// timeout/stale
// (caps go stale, lease die)
utime_t now = ceph_clock_now();
double queue_max_age = mds->get_dispatch_queue_max_age(now);

utime_t cutoff = now;
cutoff -= queue_max_age;
cutoff -= g_conf->mds_session_timeout;
double queue_max_age = mds->get_dispatch_queue_max_age(ceph_clock_now());
double cutoff = queue_max_age + mds->mdsmap->get_session_timeout();
while (1) {
Session *session = mds->sessionmap.get_oldest_session(Session::STATE_OPEN);
if (!session) break;
dout(20) << "laggiest active session is " << session->info.inst << dendl;
if (session->last_cap_renew >= cutoff) {
dout(20) << "laggiest active session is " << session->info.inst << " and sufficiently new ("
<< session->last_cap_renew << ")" << dendl;
auto last_cap_renew_span = std::chrono::duration<double>(now-session->last_cap_renew).count();
if (last_cap_renew_span < cutoff) {
dout(20) << "laggiest active session is " << session->info.inst << " and renewed caps recently (" << last_cap_renew_span << "s ago)" << dendl;
break;
}

dout(10) << "new stale session " << session->info.inst << " last " << session->last_cap_renew << dendl;
dout(10) << "new stale session " << session->info.inst << " last renewed caps " << last_cap_renew_span << "s ago" << dendl;
mds->sessionmap.set_state(session, Session::STATE_STALE);
mds->locker->revoke_stale_caps(session);
mds->locker->remove_stale_leases(session);
Expand All @@ -731,21 +729,17 @@ void Server::find_idle_sessions()
}

// autoclose
cutoff = now;
cutoff -= queue_max_age;
cutoff -= g_conf->mds_session_autoclose;
cutoff = queue_max_age + mds->mdsmap->get_session_autoclose();

// don't kick clients if we've been laggy
if (mds->get_laggy_until() > cutoff) {
dout(10) << " laggy_until " << mds->get_laggy_until() << " > cutoff " << cutoff
<< ", not kicking any clients to be safe" << dendl;
if (last_cleared_laggy < cutoff) {
dout(10) << " last cleared laggy " << last_cleared_laggy << "s ago (< cutoff " << cutoff
<< "), not kicking any clients to be safe" << dendl;
return;
}

if (mds->sessionmap.get_sessions().size() == 1 &&
mds->mdsmap->get_num_in_mds() == 1) {
dout(20) << "not evicting a slow client, because there is only one"
<< dendl;
if (mds->sessionmap.get_sessions().size() == 1 && mds->mdsmap->get_num_in_mds() == 1) {
dout(20) << "skipping client eviction because there is only one" << dendl;
return;
}

Expand All @@ -759,27 +753,24 @@ void Server::find_idle_sessions()
assert(stale_sessions != nullptr);

for (const auto &session: *stale_sessions) {
auto last_cap_renew_span = std::chrono::duration<double>(now-session->last_cap_renew).count();
if (session->is_importing()) {
dout(10) << "stopping at importing session " << session->info.inst << dendl;
break;
}
assert(session->is_stale());
if (session->last_cap_renew >= cutoff) {
dout(20) << "oldest stale session is " << session->info.inst << " and sufficiently new ("
<< session->last_cap_renew << ")" << dendl;
if (last_cap_renew_span < cutoff) {
dout(20) << "oldest stale session is " << session->info.inst << " and recently renewed caps " << last_cap_renew_span << "s ago" << dendl;
break;
}

to_evict.push_back(session);
}

for (const auto &session: to_evict) {
utime_t age = now;
age -= session->last_cap_renew;
mds->clog->warn() << "evicting unresponsive client " << *session
<< ", after " << age << " seconds";
dout(10) << "autoclosing stale session " << session->info.inst << " last "
<< session->last_cap_renew << dendl;
auto last_cap_renew_span = std::chrono::duration<double>(now-session->last_cap_renew).count();
mds->clog->warn() << "evicting unresponsive client " << *session << ", after " << last_cap_renew_span << " seconds";
dout(10) << "autoclosing stale session " << session->info.inst << " last renewed caps " << last_cap_renew_span << "s ago" << dendl;

if (g_conf->mds_session_blacklist_on_timeout) {
std::stringstream ss;
Expand Down Expand Up @@ -948,7 +939,7 @@ void Server::handle_client_reconnect(MClientReconnect *m)

// notify client of success with an OPEN
m->get_connection()->send_message(new MClientSession(CEPH_SESSION_OPEN));
session->last_cap_renew = ceph_clock_now();
session->last_cap_renew = clock::now();
mds->clog->debug() << "reconnect by " << session->info.inst << " after " << delay;

// snaprealms
Expand Down

0 comments on commit 760ce8e

Please sign in to comment.