Skip to content

Commit

Permalink
mds: command to trim mds cache and client caps
Browse files Browse the repository at this point in the history
With this command, the MDS would request clients to release
caps followed by trimming its own cache and a journal flush.
The command accepts a timeout to wait for clients to respond
to session recall and flush messages.

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
Signed-off-by: Rishabh Dave <ridave@redhat.com>
Signed-off-by: Venky Shankar <vshankar@redhat.com>
  • Loading branch information
vshankar committed Sep 28, 2018
1 parent 75f4234 commit da70dde
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/mds/MDCache.cc
Expand Up @@ -7524,7 +7524,7 @@ void MDCache::check_memory_usage()

if (cache_toofull()) {
last_recall_state = clock::now();
mds->server->recall_client_state();
mds->server->recall_client_state(-1.0, false, nullptr);
}

// If the cache size had exceeded its limit, but we're back in bounds
Expand Down
12 changes: 10 additions & 2 deletions src/mds/MDSDaemon.cc
Expand Up @@ -244,6 +244,11 @@ void MDSDaemon::set_up_admin_socket()
asok_hook,
"show cache status");
ceph_assert(r == 0);
r = admin_socket->register_command("cache drop",
"cache drop name=timeout,type=CephInt,range=1",
asok_hook,
"drop cache");
ceph_assert(r == 0);
r = admin_socket->register_command("dump tree",
"dump tree "
"name=root,type=CephString,req=true "
Expand Down Expand Up @@ -696,6 +701,9 @@ COMMAND("heap " \
"name=heapcmd,type=CephChoices,strings=dump|start_profiler|stop_profiler|release|stats", \
"show heap usage info (available only if compiled with tcmalloc)", \
"mds", "*", "cli,rest")
COMMAND("cache drop name=timeout,type=CephInt,range=1", "trim cache and optionally "
"request client to release all caps and flush the journal", "mds",
"r", "cli,rest")
};


Expand Down Expand Up @@ -861,8 +869,8 @@ int MDSDaemon::_handle_command(
else {
bool handled;
try {
handled = mds_rank->handle_command(cmdmap, m, &r, &ds, &ss,
need_reply);
handled = mds_rank->handle_command(cmdmap, m, &r, &ds, &ss,
run_later, need_reply);
if (!handled) {
// MDSDaemon doesn't know this command
ss << "unrecognized command! " << prefix;
Expand Down
258 changes: 253 additions & 5 deletions src/mds/MDSRank.cc
Expand Up @@ -238,6 +238,190 @@ class C_Flush_Journal : public MDSInternalContext {
int incarnation;
};

class C_Drop_Cache : public MDSInternalContext {
public:
C_Drop_Cache(Server *server, MDCache *mdcache, MDLog *mdlog,
MDSRank *mds, uint64_t recall_timeout,
Formatter *f, Context *on_finish)
: MDSInternalContext(mds),
server(server), mdcache(mdcache), mdlog(mdlog),
recall_timeout(recall_timeout), f(f), on_finish(on_finish),
whoami(mds->whoami), incarnation(mds->incarnation) {
}

void send() {
// not really a hard requirement here, but lets ensure this in
// case we change the logic here.
assert(mds->mds_lock.is_locked());

dout(20) << __func__ << dendl;
recall_client_state();
}

private:
// context which completes itself (with -ETIMEDOUT) after a specified
// timeout or when explicitly completed, whichever comes first. Note
// that the context does not detroy itself after completion -- it
// needs to be explicitly freed.
class C_ContextTimeout : public MDSInternalContext {
public:
C_ContextTimeout(MDSRank *mds, uint64_t timeout, Context *on_finish)
: MDSInternalContext(mds),
timeout(timeout),
lock("mds::context::timeout", false, true),
on_finish(on_finish) {
}
~C_ContextTimeout() {
ceph_assert(timer_task == nullptr);
}

void start_timer() {
timer_task = new FunctionContext([this](int _) {
timer_task = nullptr;
complete(-ETIMEDOUT);
});
mds->timer.add_event_after(timeout, timer_task);
}

void finish(int r) override {
Context *ctx = nullptr;
{
Mutex::Locker locker(lock);
std::swap(on_finish, ctx);
}
if (ctx != nullptr) {
ctx->complete(r);
}
}
void complete(int r) override {
if (timer_task != nullptr) {
mds->timer.cancel_event(timer_task);
}

finish(r);
}

uint64_t timeout;
Mutex lock;
Context *on_finish = nullptr;
Context *timer_task = nullptr;
};

void recall_client_state() {
dout(20) << __func__ << dendl;

f->open_object_section("result");

MDSGatherBuilder *gather = new MDSGatherBuilder(g_ceph_context);
server->recall_client_state(1.0, true, gather);
if (!gather->has_subs()) {
handle_recall_client_state(0);
delete gather;
return;
}

C_ContextTimeout *ctx = new C_ContextTimeout(
mds, recall_timeout, new FunctionContext([this](int r) {
handle_recall_client_state(r);
}));

ctx->start_timer();
gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
gather->activate();
}

void handle_recall_client_state(int r) {
dout(20) << __func__ << ": r=" << r << dendl;

// client recall section
f->open_object_section("client_recall");
f->dump_int("return_code", r);
f->dump_string("message", cpp_strerror(r));
f->close_section();

// we can still continue after recall timeout
trim_cache();
}

void trim_cache() {
dout(20) << __func__ << dendl;

if (!mdcache->trim(UINT64_MAX)) {
cmd_err(f, "failed to trim cache");
complete(-EINVAL);
return;
}

flush_journal();
}

void flush_journal() {
dout(20) << __func__ << dendl;

Context *ctx = new FunctionContext([this](int r) {
handle_flush_journal(r);
});

C_Flush_Journal *flush_journal = new C_Flush_Journal(mdcache, mdlog, mds, &ss, ctx);
flush_journal->send();
}

void handle_flush_journal(int r) {
dout(20) << __func__ << ": r=" << r << dendl;

if (r != 0) {
cmd_err(f, ss.str());
complete(r);
return;
}

// journal flush section
f->open_object_section("flush_journal");
f->dump_int("return_code", r);
f->dump_string("message", ss.str());
f->close_section();

cache_status();
}

void cache_status() {
dout(20) << __func__ << dendl;

// cache status section
mdcache->cache_status(f);
f->close_section();

complete(0);
}

void finish(int r) override {
dout(20) << __func__ << ": r=" << r << dendl;

on_finish->complete(r);
}

Server *server;
MDCache *mdcache;
MDLog *mdlog;
uint64_t recall_timeout;
Formatter *f;
Context *on_finish;

int retval = 0;
std::stringstream ss;

// so as to use dout
mds_rank_t whoami;
int incarnation;

void cmd_err(Formatter *f, std::string_view err) {
f->reset();
f->open_object_section("result");
f->dump_string("error", err);
f->close_section();
}
};

MDSRank::MDSRank(
mds_rank_t whoami_,
Mutex &mds_lock_,
Expand Down Expand Up @@ -2289,6 +2473,18 @@ bool MDSRankDispatcher::handle_asok_command(std::string_view command,
} else if (command == "cache status") {
Mutex::Locker l(mds_lock);
mdcache->cache_status(f);
} else if (command == "cache drop") {
int64_t timeout;
if (!cmd_getval(g_ceph_context, cmdmap, "timeout", timeout)) {
return false;
}

C_SaferCond cond;
command_cache_drop((uint64_t)timeout, f, &cond);
int r = cond.wait();
if (r != 0) {
f->flush(ss);
}
} else if (command == "dump tree") {
command_dump_tree(cmdmap, ss, f);
} else if (command == "dump loads") {
Expand Down Expand Up @@ -2335,18 +2531,25 @@ bool MDSRankDispatcher::handle_asok_command(std::string_view command,
return true;
}

class C_MDS_Send_Command_Reply : public MDSInternalContext
{
class C_MDS_Send_Command_Reply : public MDSInternalContext {
protected:
MCommand::const_ref m;
public:
C_MDS_Send_Command_Reply(MDSRank *_mds, const MCommand::const_ref &_m) :
MDSInternalContext(_mds), m(_m) {}
void send (int r, std::string_view out_str) {

void send(int r, std::string_view ss) {
std::stringstream ds;
send(r, ss, ds);
}

void send(int r, std::string_view ss, std::stringstream &ds) {
bufferlist bl;
MDSDaemon::send_command_reply(m, mds, r, bl, out_str);
bl.append(ds);
MDSDaemon::send_command_reply(m, mds, r, bl, ss);
}
void finish (int r) override {

void finish(int r) override {
send(r, "");
}
};
Expand Down Expand Up @@ -3097,6 +3300,7 @@ bool MDSRankDispatcher::handle_command(
int *r,
std::stringstream *ds,
std::stringstream *ss,
Context **run_later,
bool *need_reply)
{
ceph_assert(r != nullptr);
Expand Down Expand Up @@ -3150,12 +3354,56 @@ bool MDSRankDispatcher::handle_command(
}

damage_table.erase(id);
return true;
} else if (prefix == "cache drop") {
int64_t timeout;
if (!cmd_getval(g_ceph_context, cmdmap, "timeout", timeout)) {
return false;
}

JSONFormatter *f = new JSONFormatter(true);
C_MDS_Send_Command_Reply *reply = new C_MDS_Send_Command_Reply(this, m);
Context *on_finish = new FunctionContext([this, f, reply](int r) {
cache_drop_send_reply(f, reply, r);
delete f;
delete reply;
});

*need_reply = false;
*run_later = new C_OnFinisher(
new FunctionContext([this, timeout, f, on_finish](int _) {
command_cache_drop((uint64_t)timeout, f, on_finish);
}), finisher);

return true;
} else {
return false;
}
}

void MDSRank::cache_drop_send_reply(Formatter *f, C_MDS_Send_Command_Reply *reply, int r) {
dout(20) << __func__ << ": r=" << r << dendl;

std::stringstream ds;
std::stringstream ss;
if (r != 0) {
f->flush(ss);
} else {
f->flush(ds);
}

reply->send(r, ss.str(), ds);
}

void MDSRank::command_cache_drop(uint64_t timeout, Formatter *f, Context *on_finish) {
dout(20) << __func__ << dendl;

Mutex::Locker locker(mds_lock);
C_Drop_Cache *request = new C_Drop_Cache(server, mdcache, mdlog, this,
timeout, f, on_finish);
request->send();
}

epoch_t MDSRank::get_osd_epoch() const
{
return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));
Expand Down
6 changes: 6 additions & 0 deletions src/mds/MDSRank.h
Expand Up @@ -119,6 +119,7 @@ class Objecter;
class MonClient;
class Finisher;
class ScrubStack;
class C_MDS_Send_Command_Reply;

/**
* The public part of this class's interface is what's exposed to all
Expand All @@ -136,6 +137,7 @@ class MDSRank {
public:

friend class C_Flush_Journal;
friend class C_Drop_Cache;

mds_rank_t get_nodeid() const { return whoami; }
int64_t get_metadata_pool();
Expand Down Expand Up @@ -480,6 +482,9 @@ class MDSRank {
void command_dump_tree(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f);
void command_dump_inode(Formatter *f, const cmdmap_t &cmdmap, std::ostream &ss);

void cache_drop_send_reply(Formatter *f, C_MDS_Send_Command_Reply *reply, int r);
void command_cache_drop(uint64_t timeout, Formatter *f, Context *on_finish);

protected:
Messenger *messenger;
MonClient *monc;
Expand Down Expand Up @@ -607,6 +612,7 @@ class MDSRankDispatcher : public MDSRank
int *r,
std::stringstream *ds,
std::stringstream *ss,
Context **run_later,
bool *need_reply);

void dump_sessions(const SessionFilter &filter, Formatter *f) const;
Expand Down

0 comments on commit da70dde

Please sign in to comment.