Skip to content

Commit

Permalink
mds: add drop_cache command for MDS
Browse files Browse the repository at this point in the history
This command trims the cache, optionally asks the clients to release all
caps, waits for the clients' acknowledgement that the caps are released, and,
optionally, flushes the journal.

To wait for the clients' acknowledgement that caps were released, the MDS
waits until it has received a CEPH_SESSION_RECALL_STATEMSG_ACK message from
every client, or until a timeout expires. To adjust this timeout, a new
option is added to the mds section, namely `mds_drop_cache_ack_timeout`.

Signed-off-by: Rishabh Dave <ridave@redhat.com>
  • Loading branch information
rishabh-d-dave committed Apr 20, 2018
1 parent 8b3b7db commit 2e5a4dc
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 4 deletions.
3 changes: 3 additions & 0 deletions src/client/Client.cc
Expand Up @@ -2078,6 +2078,9 @@ void Client::handle_client_session(MClientSession *m)

case CEPH_SESSION_RECALL_STATE:
trim_caps(session, m->get_max_caps());
session->con->send_message(new MClientSession(
CEPH_SESSION_RECALL_STATEMSG_ACK,
m->get_seq()));
break;

case CEPH_SESSION_FLUSHMSG:
Expand Down
1 change: 1 addition & 0 deletions src/common/ceph_strings.cc
Expand Up @@ -286,6 +286,7 @@ const char *ceph_session_op_name(int op)
case CEPH_SESSION_RENEWCAPS: return "renewcaps";
case CEPH_SESSION_STALE: return "stale";
case CEPH_SESSION_RECALL_STATE: return "recall_state";
case CEPH_SESSION_RECALL_STATEMSG_ACK: return "recall_state_ack";
case CEPH_SESSION_FLUSHMSG: return "flushmsg";
case CEPH_SESSION_FLUSHMSG_ACK: return "flushmsg_ack";
case CEPH_SESSION_FORCE_RO: return "force_ro";
Expand Down
6 changes: 6 additions & 0 deletions src/common/options.cc
Expand Up @@ -6938,6 +6938,12 @@ std::vector<Option> get_mds_options() {
Option("mds_hack_allow_loading_invalid_metadata", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
.set_default(0)
.set_description("INTENTIONALLY CAUSE DATA LOSS by bypasing checks for invalid metadata on disk. Allows testing repair tools."),
Option("mds_drop_cache_ack_timeout", Option::TYPE_UINT,
Option::LEVEL_ADVANCED)
.set_default(30)
.set_description("how long should the MDS wait for receiving "
"CEPH_SESSION_RECALL_STATEMSG_ACK from all sessions "
"while dropping cache."),
});
}

Expand Down
1 change: 1 addition & 0 deletions src/include/ceph_fs.h
Expand Up @@ -298,6 +298,7 @@ enum {
CEPH_SESSION_RENEWCAPS,
CEPH_SESSION_STALE,
CEPH_SESSION_RECALL_STATE,
CEPH_SESSION_RECALL_STATEMSG_ACK,
CEPH_SESSION_FLUSHMSG,
CEPH_SESSION_FLUSHMSG_ACK,
CEPH_SESSION_FORCE_RO,
Expand Down
2 changes: 2 additions & 0 deletions src/mds/MDSDaemon.cc
Expand Up @@ -703,6 +703,8 @@ COMMAND("heap " \
"name=heapcmd,type=CephChoices,strings=dump|start_profiler|stop_profiler|release|stats", \
"show heap usage info (available only if compiled with tcmalloc)", \
"mds", "*", "cli,rest")
COMMAND("drop cache", "trim cache and optionally request client to "
"release all caps and flush the journal", "mds", "r", "cli,rest")
};


Expand Down
64 changes: 64 additions & 0 deletions src/mds/MDSRank.cc
Expand Up @@ -2983,11 +2983,75 @@ bool MDSRankDispatcher::handle_command(

damage_table.erase(id);
return true;
} else if (prefix == "drop cache") {
command_drop_cache(ds);
return true;
} else {
return false;
}
}

/**
 * Handle the "drop cache" command.
 *
 * Trims the MDS cache, asks clients to release their caps, waits (bounded by
 * the `mds_drop_cache_ack_timeout` option) for their acknowledgements,
 * flushes the journal and finally writes the cache status to the stream.
 *
 * @param ds  stream that receives the JSON output of the journal flush and
 *            of the cache status, plus an error note if the status query
 *            fails.
 * @return false if the MDS is not active or the cache trim reports failure;
 *         true otherwise (including when the client acks timed out).
 *
 * NOTE(review): the function releases and re-takes mds_lock, so it presumably
 * expects to be entered with mds_lock held -- confirm against the caller.
 */
bool MDSRank::command_drop_cache(stringstream *ds)
{
  if (not is_active()) {
    dout(5) << __func__ << ": MDS not active, no-op" << dendl;
    return false;
  }

  if (not mdcache->trim(UINT64_MAX))
    return false;

  // Ratio 1 with report_acks=true: ask the server to record the clients'
  // CEPH_SESSION_RECALL_STATEMSG_ACK replies on their sessions.
  server->recall_client_state(1, true);

  // The wait below polls and sleeps, so do not hold mds_lock across it.
  mds_lock.Unlock();
  if (not _wait_for_client_recall_state_acks())
    dout(4) << "Timed out; did not receive "
               "CEPH_SESSION_RECALL_STATEMSG_ACK from all sessions." << dendl;
  mds_lock.Lock();

  // Optionally, flush the journal -
  {
    // Stack-allocated formatter: nothing to delete on any exit path.
    JSONFormatter flush_fmtr(true);
    command_flush_journal(&flush_fmtr);
    // TODO: do we want output from `flush journal`?
    flush_fmtr.flush(*ds);
  }

  {
    JSONFormatter status_fmtr(true);
    int r = mdcache->cache_status(&status_fmtr);
    if (r != 0)
      *ds << "Failed to get cache status: " << cpp_strerror(r);
    status_fmtr.flush(*ds);
  }

  return true;
}

bool MDSRank::_wait_for_client_recall_state_acks()
{
const unsigned int TIMEOUT = g_conf->get_val<uint64_t>("mds_drop_cache_ack_timeout");
set<Session*> unacked_sessions;
mono_time time_at_beg = mono_clock::now();
std::chrono::duration<double> time_passed = mono_clock::now() - time_at_beg;

sessionmap.get_client_session_set(unacked_sessions);

while (time_passed.count() < TIMEOUT) {
for (auto &session : unacked_sessions)
if (session->recall_acked)
unacked_sessions.erase(session);

if (unacked_sessions.empty()) {
return true;
}

time_passed = mono_clock::now() - time_at_beg;
if (TIMEOUT - time_passed.count() > 2)
sleep(2);
}

// return false since we timed out.
return false;
}


epoch_t MDSRank::get_osd_epoch() const
{
return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));
Expand Down
4 changes: 3 additions & 1 deletion src/mds/MDSRank.h
Expand Up @@ -425,14 +425,14 @@ class MDSRank {

bool evict_client(int64_t session_id, bool wait, bool blacklist,
std::stringstream& ss, Context *on_killed=nullptr);
void command_flush_journal(Formatter *f);

protected:
void dump_clientreplay_status(Formatter *f) const;
void command_scrub_path(Formatter *f, std::string_view path, vector<string>& scrubop_vec);
void command_tag_path(Formatter *f, std::string_view path,
std::string_view tag);
void command_flush_path(Formatter *f, std::string_view path);
void command_flush_journal(Formatter *f);
void command_get_subtrees(Formatter *f);
void command_export_dir(Formatter *f,
std::string_view path, mds_rank_t dest);
Expand All @@ -453,6 +453,8 @@ class MDSRank {
std::ostream &ss);
void command_openfiles_ls(Formatter *f);
void command_dump_tree(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f);
bool command_drop_cache(stringstream *ds);
bool _wait_for_client_recall_state_acks();

protected:
Messenger *messenger;
Expand Down
17 changes: 15 additions & 2 deletions src/mds/Server.cc
Expand Up @@ -465,6 +465,11 @@ void Server::handle_client_session(MClientSession *m)
mdlog->flush();
break;

case CEPH_SESSION_RECALL_STATEMSG_ACK:
if (this->report_acks)
session->recall_acked = true;
break;

default:
ceph_abort();
}
Expand Down Expand Up @@ -1102,8 +1107,11 @@ void Server::recover_filelocks(CInode *in, bufferlist locks, int64_t client)
* to trim some caps, and consequently unpin some inodes in the MDCache so
* that it can trim too.
*/
void Server::recall_client_state(void)
void Server::recall_client_state(double ratio, bool report_acks)
{
if (report_acks)
this->report_acks = true;

/* try to recall at least 80% of all caps */
uint64_t max_caps_per_client = Capability::count() * g_conf->get_val<double>("mds_max_ratio_caps_per_client");
uint64_t min_caps_per_client = g_conf->get_val<uint64_t>("mds_min_caps_per_client");
Expand All @@ -1117,14 +1125,19 @@ void Server::recall_client_state(void)
/* ratio: determine the amount of caps to recall from each client. Use
* percentage full over the cache reservation. Cap the ratio at 80% of client
* caps. */
double ratio = 1.0-fmin(0.80, mdcache->cache_toofull_ratio());
if (ratio == -1)
ratio = 1.0-fmin(0.80, mdcache->cache_toofull_ratio());

dout(10) << "recall_client_state " << ratio
<< ", caps per client " << min_caps_per_client << "-" << max_caps_per_client
<< dendl;

set<Session*> sessions;
mds->sessionmap.get_client_session_set(sessions);

if (sessions.size() == 0)
this->report_acks = false;

for (auto &session : sessions) {
if (!session->is_open() ||
!session->info.inst.name.is_client())
Expand Down
4 changes: 3 additions & 1 deletion src/mds/Server.h
Expand Up @@ -75,6 +75,8 @@ class Server {
MDCache *mdcache;
MDLog *mdlog;
PerfCounters *logger;
bool report_acks;
// report when CEPH_SESSION_RECALL_STATEMSG_ACK is received

// OSDMap full status, used to generate ENOSPC on some operations
bool is_full;
Expand Down Expand Up @@ -134,7 +136,7 @@ class Server {
void reconnect_tick();
void recover_filelocks(CInode *in, bufferlist locks, int64_t client);

void recall_client_state(void);
void recall_client_state(double ratio = -1, bool report_acks = false);
void force_clients_readonly();

// -- requests --
Expand Down
2 changes: 2 additions & 0 deletions src/mds/SessionMap.h
Expand Up @@ -207,6 +207,8 @@ class Session : public RefCountedObject {
xlist<Capability*> caps; // inodes with caps; front=most recently used
xlist<ClientLease*> leases; // metadata leases to clients
utime_t last_cap_renew;
bool recall_acked = false;
// whether CEPH_SESSION_RECALL_STATEMSG_ACK was received

public:
version_t inc_push_seq() { return ++cap_push_seq; }
Expand Down

0 comments on commit 2e5a4dc

Please sign in to comment.