Skip to content

Commit

Permalink
mds: Kill C_SaferCond in evict_sessions()
Browse files Browse the repository at this point in the history
MDSRankDispatcher::evict_sessions waits on a C_SaferCond for
kill_session to complete on each of its victims. Change the
command handling flow to pass command messages all the way down
to MDSRankDispatcher. Extract the MDSDaemon's reply path into a
static function callable from a new context in the MDSRankDispatcher.

See: http://tracker.ceph.com/issues/16288
Signed-off-by: Douglas Fuller <dfuller@redhat.com>
  • Loading branch information
Douglas Fuller committed Jun 30, 2016
1 parent f2c2c29 commit ea8a455
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 47 deletions.
59 changes: 36 additions & 23 deletions src/mds/MDSDaemon.cc
Expand Up @@ -587,6 +587,33 @@ void MDSDaemon::tick()
}
}

void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank,
int r, bufferlist outbl,
const std::string& outs)
{
Session *session = static_cast<Session *>(m->get_connection()->get_priv());
assert(session != NULL);
// If someone is using a closed session for sending commands (e.g.
// the ceph CLI) then we should feel free to clean up this connection
// as soon as we've sent them a response.
const bool live_session = mds_rank &&
mds_rank->sessionmap.get_session(session->info.inst.name) != nullptr
&& session->get_state_seq() > 0;

if (!live_session) {
// This session only existed to issue commands, so terminate it
// as soon as we can.
assert(session->is_closed());
session->connection->mark_disposable();
session->put();
}

MCommandReply *reply = new MCommandReply(r, outs);
reply->set_tid(m->get_tid());
reply->set_data(outbl);
m->get_connection()->send_message(reply);
}

/* This function DOES put the passed message before returning*/
void MDSDaemon::handle_command(MCommand *m)
{
Expand All @@ -599,7 +626,7 @@ void MDSDaemon::handle_command(MCommand *m)
std::string outs;
bufferlist outbl;
Context *run_after = NULL;

bool need_reply = true;

if (!session->auth_caps.allow_all()) {
dout(1) << __func__
Expand All @@ -615,29 +642,13 @@ void MDSDaemon::handle_command(MCommand *m)
r = -EINVAL;
outs = ss.str();
} else {
r = _handle_command(cmdmap, m->get_data(), &outbl, &outs, &run_after);
r = _handle_command(cmdmap, m, &outbl, &outs, &run_after, &need_reply);
}

// If someone is using a closed session for sending commands (e.g.
// the ceph CLI) then we should feel free to clean up this connection
// as soon as we've sent them a response.
const bool live_session = mds_rank &&
mds_rank->sessionmap.get_session(session->info.inst.name) != nullptr
&& session->get_state_seq() > 0;

if (!live_session) {
// This session only existed to issue commands, so terminate it
// as soon as we can.
assert(session->is_closed());
session->connection->mark_disposable();
session->put();
if (need_reply) {
send_command_reply(m, mds_rank, r, outbl, outs);
}

MCommandReply *reply = new MCommandReply(r, outs);
reply->set_tid(m->get_tid());
reply->set_data(outbl);
m->get_connection()->send_message(reply);

if (run_after) {
run_after->complete(0);
}
Expand Down Expand Up @@ -703,10 +714,11 @@ void MDSDaemon::handle_command(MMonCommand *m)

int MDSDaemon::_handle_command(
const cmdmap_t &cmdmap,
bufferlist const &inbl,
MCommand *m,
bufferlist *outbl,
std::string *outs,
Context **run_later)
Context **run_later,
bool *need_reply)
{
assert(outbl != NULL);
assert(outs != NULL);
Expand Down Expand Up @@ -824,7 +836,8 @@ int MDSDaemon::_handle_command(
} else {
// Give MDSRank a shot at the command
if (mds_rank) {
bool handled = mds_rank->handle_command(cmdmap, inbl, &r, &ds, &ss);
bool handled = mds_rank->handle_command(cmdmap, m, &r, &ds, &ss,
need_reply);
if (handled) {
goto out;
}
Expand Down
8 changes: 6 additions & 2 deletions src/mds/MDSDaemon.h
Expand Up @@ -187,13 +187,17 @@ class MDSDaemon : public Dispatcher, public md_config_obs_t {
bool handle_core_message(Message *m);

// special message types
friend class C_MDS_Send_Command_Reply;
static void send_command_reply(MCommand *m, MDSRank* mds_rank, int r,
bufferlist outbl, const std::string& outs);
int _handle_command_legacy(std::vector<std::string> args);
int _handle_command(
const cmdmap_t &cmdmap,
bufferlist const &inbl,
MCommand *m,
bufferlist *outbl,
std::string *outs,
Context **run_later);
Context **run_later,
bool *need_reply);
void handle_command(class MMonCommand *m);
void handle_command(class MCommand *m);
void handle_mds_map(class MMDSMap *m);
Expand Down
49 changes: 33 additions & 16 deletions src/mds/MDSRank.cc
Expand Up @@ -17,7 +17,10 @@

#include "messages/MClientRequestForward.h"
#include "messages/MMDSMap.h"
#include "messages/MCommand.h"
#include "messages/MCommandReply.h"

#include "MDSDaemon.h"
#include "MDSMap.h"
#include "SnapClient.h"
#include "SnapServer.h"
Expand Down Expand Up @@ -1777,15 +1780,32 @@ bool MDSRankDispatcher::handle_asok_command(
return true;
}

class C_MDS_Send_Command_Reply : public MDSInternalContext
{
protected:
MCommand *m;
public:
C_MDS_Send_Command_Reply(MDSRank *_mds, MCommand *_m) :
MDSInternalContext(_mds), m(_m) { m->get(); }
void send (int r) {
bufferlist bl;
MDSDaemon::send_command_reply(m, mds, r, bl, "");
m->put();
}
void finish (int r) {
send(r);
}
};

/**
* This function drops the mds_lock, so don't do anything with
* MDSRank after calling it (we could have gone into shutdown): just
* send your result back to the calling client and finish.
*/
std::vector<entity_name_t> MDSRankDispatcher::evict_sessions(
const SessionFilter &filter)
void MDSRankDispatcher::evict_sessions(const SessionFilter &filter, MCommand *m)
{
std::list<Session*> victims;
C_MDS_Send_Command_Reply *reply = new C_MDS_Send_Command_Reply(this, m);

const auto sessions = sessionmap.get_sessions();
for (const auto p : sessions) {
Expand All @@ -1802,24 +1822,17 @@ std::vector<entity_name_t> MDSRankDispatcher::evict_sessions(

dout(20) << __func__ << " matched " << victims.size() << " sessions" << dendl;

std::vector<entity_name_t> result;

if (victims.empty()) {
return result;
reply->send(0);
delete reply;
return;
}

C_SaferCond on_safe;
C_GatherBuilder gather(g_ceph_context, &on_safe);
C_GatherBuilder gather(g_ceph_context, reply);
for (const auto s : victims) {
server->kill_session(s, gather.new_sub());
result.push_back(s->info.inst.name);
}
gather.activate();
mds_lock.Unlock();
on_safe.wait();
mds_lock.Lock();

return result;
}

void MDSRankDispatcher::dump_sessions(const SessionFilter &filter, Formatter *f) const
Expand Down Expand Up @@ -2545,15 +2558,18 @@ MDSRankDispatcher::MDSRankDispatcher(

bool MDSRankDispatcher::handle_command(
const cmdmap_t &cmdmap,
bufferlist const &inbl,
MCommand *m,
int *r,
std::stringstream *ds,
std::stringstream *ss)
std::stringstream *ss,
bool *need_reply)
{
assert(r != nullptr);
assert(ds != nullptr);
assert(ss != nullptr);

*need_reply = true;

std::string prefix;
cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);

Expand Down Expand Up @@ -2582,8 +2598,9 @@ bool MDSRankDispatcher::handle_command(
return true;
}

evict_sessions(filter);
evict_sessions(filter, m);

*need_reply = false;
return true;
} else if (prefix == "damage ls") {
Formatter *f = new JSONFormatter();
Expand Down
13 changes: 7 additions & 6 deletions src/mds/MDSRank.h
Expand Up @@ -19,6 +19,8 @@
#include "common/LogClient.h"
#include "common/Timer.h"

#include "messages/MCommand.h"

#include "Beacon.h"
#include "DamageTable.h"
#include "MDSMap.h"
Expand Down Expand Up @@ -496,15 +498,14 @@ class MDSRankDispatcher : public MDSRank

bool handle_command(
const cmdmap_t &cmdmap,
bufferlist const &inbl,
MCommand *m,
int *r,
std::stringstream *ds,
std::stringstream *ss);
std::stringstream *ss,
bool *need_reply);

void dump_sessions(
const SessionFilter &filter, Formatter *f) const;
std::vector<entity_name_t> evict_sessions(
const SessionFilter &filter);
void dump_sessions(const SessionFilter &filter, Formatter *f) const;
void evict_sessions(const SessionFilter &filter, MCommand *m);

// Call into me from MDS::ms_dispatch
bool ms_dispatch(Message *m);
Expand Down

0 comments on commit ea8a455

Please sign in to comment.