Skip to content

Commit

Permalink
mds: fix session reference leak
Browse files Browse the repository at this point in the history
"m->get_connection()->get_priv()" increases the session's reference
count by one. but we forget to release the reference at several places

Fixes: http://tracker.ceph.com/issues/22821
Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
(cherry picked from commit 358f8a5)
  • Loading branch information
ukernel authored and Prashant D committed Feb 14, 2018
1 parent 2fc8d6c commit 7066eb1
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 28 deletions.
9 changes: 3 additions & 6 deletions src/mds/Locker.cc
Expand Up @@ -2603,16 +2603,15 @@ bool Locker::should_defer_client_cap_frozen(CInode *in)
*/
void Locker::handle_client_caps(MClientCaps *m)
{
Session *session = static_cast<Session *>(m->get_connection()->get_priv());
client_t client = m->get_source().num();

snapid_t follows = m->get_snap_follows();
dout(7) << "handle_client_caps "
<< ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
<< " on " << m->get_ino()
<< " tid " << m->get_client_tid() << " follows " << follows
<< " op " << ceph_cap_op_name(m->get_op()) << dendl;

Session *session = mds->get_session(m);
if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
if (!session) {
dout(5) << " no session, dropping " << *m << dendl;
Expand Down Expand Up @@ -3316,14 +3315,12 @@ bool Locker::_do_cap_update(CInode *in, Capability *cap,
if (!dirty && !change_max)
return false;

Session *session = static_cast<Session *>(m->get_connection()->get_priv());
Session *session = mds->get_session(m);
if (session->check_access(in, MAY_WRITE,
m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
session->put();
dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
return false;
}
session->put();

// do the update.
EUpdate *le = new EUpdate(mds->mdlog, "cap update");
Expand Down Expand Up @@ -3413,7 +3410,7 @@ void Locker::handle_client_cap_release(MClientCapRelease *m)
mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
}

Session *session = static_cast<Session *>(m->get_connection()->get_priv());
Session *session = mds->get_session(m);

for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
_do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
Expand Down
10 changes: 6 additions & 4 deletions src/mds/MDSDaemon.cc
Expand Up @@ -560,17 +560,18 @@ void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank,
// If someone is using a closed session for sending commands (e.g.
// the ceph CLI) then we should feel free to clean up this connection
// as soon as we've sent them a response.
const bool live_session = mds_rank &&
mds_rank->sessionmap.get_session(session->info.inst.name) != nullptr
&& session->get_state_seq() > 0;
const bool live_session =
session->get_state_seq() > 0 &&
mds_rank &&
mds_rank->sessionmap.get_session(session->info.inst.name);

if (!live_session) {
// This session only existed to issue commands, so terminate it
// as soon as we can.
assert(session->is_closed());
session->connection->mark_disposable();
session->put();
}
session->put();

MCommandReply *reply = new MCommandReply(r, outs);
reply->set_tid(m->get_tid());
Expand Down Expand Up @@ -609,6 +610,7 @@ void MDSDaemon::handle_command(MCommand *m)
} else {
r = _handle_command(cmdmap, m, &outbl, &outs, &run_after, &need_reply);
}
session->put();

if (need_reply) {
send_command_reply(m, mds_rank, r, outbl, outs);
Expand Down
12 changes: 12 additions & 0 deletions src/mds/MDSRank.cc
Expand Up @@ -845,6 +845,18 @@ bool MDSRank::is_stale_message(Message *m) const
return false;
}

Session *MDSRank::get_session(Message *m)
{
Session *session = static_cast<Session *>(m->get_connection()->get_priv());
if (session) {
dout(20) << "get_session have " << session << " " << session->info.inst
<< " state " << session->get_state_name() << dendl;
session->put(); // not carry ref
} else {
dout(20) << "get_session dne for " << m->get_source_inst() << dendl;
}
return session;
}

void MDSRank::send_message(Message *m, Connection *c)
{
Expand Down
1 change: 1 addition & 0 deletions src/mds/MDSRank.h
Expand Up @@ -175,6 +175,7 @@ class MDSRank {
Session *get_session(client_t client) {
return sessionmap.get_session(entity_name_t::CLIENT(client.v));
}
Session *get_session(Message *m);

PerfCounters *logger, *mlogger;
OpTracker op_tracker;
Expand Down
21 changes: 4 additions & 17 deletions src/mds/Server.cc
Expand Up @@ -204,7 +204,7 @@ void Server::dispatch(Message *m)
if (m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
(mds->is_reconnect() || mds->get_want_state() == CEPH_MDS_STATE_RECONNECT)) {
MClientRequest *req = static_cast<MClientRequest*>(m);
Session *session = get_session(req);
Session *session = mds->get_session(req);
if (!session || session->is_closed()) {
dout(5) << "session is closed, dropping " << req->get_reqid() << dendl;
req->put();
Expand Down Expand Up @@ -304,25 +304,12 @@ class C_MDS_session_finish : public ServerLogContext {
}
};

Session *Server::get_session(Message *m)
{
Session *session = static_cast<Session *>(m->get_connection()->get_priv());
if (session) {
dout(20) << "get_session have " << session << " " << session->info.inst
<< " state " << session->get_state_name() << dendl;
session->put(); // not carry ref
} else {
dout(20) << "get_session dne for " << m->get_source_inst() << dendl;
}
return session;
}

/* This function DOES put the passed message before returning*/
void Server::handle_client_session(MClientSession *m)
{
version_t pv;
bool blacklisted = false;
Session *session = get_session(m);
Session *session = mds->get_session(m);

dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl;
assert(m->get_source().is_client()); // should _not_ come from an mds!
Expand Down Expand Up @@ -891,7 +878,7 @@ void Server::handle_client_reconnect(MClientReconnect *m)
{
dout(7) << "handle_client_reconnect " << m->get_source() << dendl;
client_t from = m->get_source().num();
Session *session = get_session(m);
Session *session = mds->get_session(m);
assert(session);

if (!mds->is_reconnect() && mds->get_want_state() == CEPH_MDS_STATE_RECONNECT) {
Expand Down Expand Up @@ -1609,7 +1596,7 @@ void Server::handle_client_request(MClientRequest *req)
// active session?
Session *session = 0;
if (req->get_source().is_client()) {
session = get_session(req);
session = mds->get_session(req);
if (!session) {
dout(5) << "no session for " << req->get_source() << ", dropping" << dendl;
} else if (session->is_closed() ||
Expand Down
1 change: 0 additions & 1 deletion src/mds/Server.h
Expand Up @@ -110,7 +110,6 @@ class Server {
bool waiting_for_reconnect(client_t c) const;
void dump_reconnect_status(Formatter *f) const;

Session *get_session(Message *m);
void handle_client_session(class MClientSession *m);
void _session_logged(Session *session, uint64_t state_seq,
bool open, version_t pv, interval_set<inodeno_t>& inos,version_t piv);
Expand Down

0 comments on commit 7066eb1

Please sign in to comment.