Skip to content

Commit

Permalink
Merge pull request #12974 from ukernel/wip-multimds-misc
Browse files Browse the repository at this point in the history
mds: miscellaneous fixes

Reviewed-by: John Spray <john.spray@redhat.com>
  • Loading branch information
John Spray committed Feb 1, 2017
2 parents 6995d2c + c9ecea1 commit 13a52e9
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 67 deletions.
55 changes: 43 additions & 12 deletions src/mds/MDCache.cc
Expand Up @@ -2517,6 +2517,8 @@ ESubtreeMap *MDCache::create_subtree_map()
mydir = myin->get_dirfrag(frag_t());
}

list<CDir*> maybe;

// include all auth subtrees, and their bounds.
// and a spanning tree to tie it to the root.
for (map<CDir*, set<CDir*> >::iterator p = subtrees.begin();
Expand All @@ -2543,6 +2545,15 @@ ESubtreeMap *MDCache::create_subtree_map()
}

le->subtrees[dir->dirfrag()].clear();

if (dir->get_dir_auth().second != CDIR_AUTH_UNKNOWN &&
le->ambiguous_subtrees.count(dir->dirfrag()) == 0 &&
p->second.empty()) {
dout(10) << " maybe journal " << *dir << dendl;
maybe.push_back(dir);
continue;
}

le->metablob.add_dir_context(dir, EMetaBlob::TO_ROOT);
le->metablob.add_dir(dir, false);

Expand Down Expand Up @@ -2654,6 +2665,18 @@ ESubtreeMap *MDCache::create_subtree_map()
}
}
}

for (list<CDir*>::iterator p = maybe.begin(); p != maybe.end(); ++p) {
CDir *dir = *p;
if (le->subtrees.count(dir->dirfrag())) {
// not swallowed by above code
le->metablob.add_dir_context(dir, EMetaBlob::TO_ROOT);
le->metablob.add_dir(dir, false);
} else {
dout(10) << "simplify: not journal " << *dir << dendl;
}
}

dout(15) << " subtrees " << le->subtrees << dendl;
dout(15) << " ambiguous_subtrees " << le->ambiguous_subtrees << dendl;

Expand Down Expand Up @@ -2861,9 +2884,6 @@ void MDCache::handle_mds_failure(mds_rank_t who)
{
dout(7) << "handle_mds_failure mds." << who << dendl;

// make note of recovery set
mds->mdsmap->get_recovery_mds_set(recovery_set);
recovery_set.erase(mds->get_nodeid());
dout(1) << "handle_mds_failure mds." << who << " : recovery peers are " << recovery_set << dendl;

resolve_gather.insert(who);
Expand Down Expand Up @@ -3221,14 +3241,14 @@ void MDCache::handle_resolve(MMDSResolve *m)
claimed_by_sender = true;
}

my_ambiguous_imports.erase(p); // no longer ambiguous.
if (claimed_by_sender) {
dout(7) << "ambiguous import failed on " << *dir << dendl;
migrator->import_reverse(dir);
} else {
dout(7) << "ambiguous import succeeded on " << *dir << dendl;
migrator->import_finish(dir, true);
}
my_ambiguous_imports.erase(p); // no longer ambiguous.
}
p = next;
}
Expand Down Expand Up @@ -3521,21 +3541,21 @@ void MDCache::disambiguate_imports()
map<dirfrag_t, vector<dirfrag_t> >::iterator q = my_ambiguous_imports.begin();

CDir *dir = get_dirfrag(q->first);
if (!dir) continue;
assert(dir);

if (dir->authority() != me_ambig) {
dout(10) << "ambiguous import auth known, must not be me " << *dir << dendl;
cancel_ambiguous_import(dir);

mds->mdlog->start_submit_entry(new EImportFinish(dir, false));

// subtree may have been swallowed by another node claiming dir
// as their own.
CDir *root = get_subtree_root(dir);
if (root != dir)
dout(10) << " subtree root is " << *root << dendl;
assert(root->dir_auth.first != mds->get_nodeid()); // no us!
try_trim_non_auth_subtree(root);

mds->mdlog->start_submit_entry(new EImportFinish(dir, false));
} else {
dout(10) << "ambiguous import auth unclaimed, must be me " << *dir << dendl;
finish_ambiguous_import(q->first);
Expand Down Expand Up @@ -4278,9 +4298,13 @@ void MDCache::handle_cache_rejoin_weak(MMDSCacheRejoin *weak)
dout(10) << " claiming cap import " << p->first << " client." << q->first << " on " << *in << dendl;
Capability *cap = rejoin_import_cap(in, q->first, q->second, from);
Capability::Import& im = imported_caps[p->first][q->first];
im.cap_id = cap->get_cap_id();
im.issue_seq = cap->get_last_seq();
im.mseq = cap->get_mseq();
if (cap) {
im.cap_id = cap->get_cap_id();
im.issue_seq = cap->get_last_seq();
im.mseq = cap->get_mseq();
} else {
// all are zero
}
}
mds->locker->eval(in, CEPH_CAP_LOCKS, true);
}
Expand Down Expand Up @@ -5057,7 +5081,8 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *ack)
MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0,
cap_exports[p->first][q->first].capinfo.cap_id, 0,
mds->get_osd_epoch_barrier());
m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq, from, 0);
m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq,
(q->second.cap_id > 0 ? from : -1), 0);
mds->send_message_client_counted(m, session);

cap_exports[p->first].erase(q->first);
Expand Down Expand Up @@ -5563,7 +5588,10 @@ Capability* MDCache::rejoin_import_cap(CInode *in, client_t client, const cap_re
dout(10) << "rejoin_import_cap for client." << client << " from mds." << frommds
<< " on " << *in << dendl;
Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(client.v));
assert(session);
if (!session) {
dout(10) << " no session for client." << client << dendl;
return NULL;
}

Capability *cap = in->reconnect_cap(client, icr, session);

Expand Down Expand Up @@ -9536,6 +9564,7 @@ void MDCache::discover_dir_frag(CInode *base,

if (!base->is_waiting_for_dir(approx_fg) || !onfinish) {
discover_info_t& d = _create_discover(from);
d.pin_base(base);
d.ino = base->ino();
d.frag = approx_fg;
d.want_base_dir = true;
Expand Down Expand Up @@ -9590,6 +9619,7 @@ void MDCache::discover_path(CInode *base,
!base->is_waiting_for_dir(fg) || !onfinish) {
discover_info_t& d = _create_discover(from);
d.ino = base->ino();
d.pin_base(base);
d.frag = fg;
d.snap = snap;
d.want_path = want_path;
Expand Down Expand Up @@ -9643,6 +9673,7 @@ void MDCache::discover_path(CDir *base,
!base->is_waiting_for_dentry(want_path[0].c_str(), snap) || !onfinish) {
discover_info_t& d = _create_discover(from);
d.ino = base->ino();
d.pin_base(base);
d.frag = base->get_frag();
d.snap = snap;
d.want_path = want_path;
Expand Down
16 changes: 15 additions & 1 deletion src/mds/MDCache.h
Expand Up @@ -217,10 +217,21 @@ class MDCache {
frag_t frag;
snapid_t snap;
filepath want_path;
MDSCacheObject *base;
bool want_base_dir;
bool want_xlocked;

// Default-construct an empty discover request: no tid or target mds
// assigned yet, snapshot defaults to CEPH_NOSNAP, and no base object
// has been pinned (base is NULL until pin_base() is called).
// The span previously contained two default constructors with the same
// signature (a redefinition, which cannot compile); only the definition
// that also zero-initializes 'base' is kept.
discover_info_t() :
  tid(0), mds(-1), snap(CEPH_NOSNAP), base(NULL),
  want_base_dir(false), want_xlocked(false) {}
// Release the PIN_DISCOVERBASE reference taken by pin_base(), if one
// was taken; 'base' stays NULL for requests that never pinned a base.
~discover_info_t() {
if (base)
base->put(MDSCacheObject::PIN_DISCOVERBASE);
}
// Pin 'b' as the base object of this discover so it cannot be trimmed
// from the cache while the request is in flight; the matching put()
// happens in the destructor.
// NOTE(review): assumes pin_base() is called at most once per
// discover_info_t — a second call would overwrite 'base' and leak the
// first pin; confirm against callers.
void pin_base(MDSCacheObject *b) {
base = b;
base->get(MDSCacheObject::PIN_DISCOVERBASE);
}
};

map<ceph_tid_t, discover_info_t> discovers;
Expand Down Expand Up @@ -370,6 +381,9 @@ class MDCache {
// Queue context 'c' to be finished once the uncommitted master update
// identified by 'reqid' is resolved (committed or rolled back).
void wait_for_uncommitted_master(metareqid_t reqid, MDSInternalContextBase *c) {
  auto& master = uncommitted_masters[reqid];
  master.waiters.push_back(c);
}
// Whether a master update for 'reqid' is still awaiting commit.
bool have_uncommitted_master(metareqid_t reqid) {
  return uncommitted_masters.find(reqid) != uncommitted_masters.end();
}
void log_master_commit(metareqid_t reqid);
void logged_master_update(metareqid_t reqid);
void _logged_master_commit(metareqid_t reqid);
Expand Down
2 changes: 2 additions & 0 deletions src/mds/MDSCacheObject.h
Expand Up @@ -66,6 +66,7 @@ class MDSCacheObject {
static const int PIN_PTRWAITER = -1007;
const static int PIN_TEMPEXPORTING = 1008; // temp pin between encode_ and finish_export
static const int PIN_CLIENTLEASE = 1009;
static const int PIN_DISCOVERBASE = 1010;

const char *generic_pin_name(int p) const {
switch (p) {
Expand All @@ -79,6 +80,7 @@ class MDSCacheObject {
case PIN_PTRWAITER: return "ptrwaiter";
case PIN_TEMPEXPORTING: return "tempexporting";
case PIN_CLIENTLEASE: return "clientlease";
case PIN_DISCOVERBASE: return "discoverbase";
default: ceph_abort(); return 0;
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/mds/MDSMap.h
Expand Up @@ -414,11 +414,11 @@ class MDSMap {
}
// Collect into 's' every rank that participates in recovery: all
// failed ranks, all damaged ranks, and every rank whose state lies in
// [STATE_REPLAY, STATE_STOPPING] (i.e. currently recovering/stopping).
// The span previously contained both the pre-change iterator loop and
// the post-change range-for loop over mds_info (a diff-merge artifact),
// traversing the map twice for identical inserts; only one traversal,
// in the newer range-for style, is kept.
void get_recovery_mds_set(std::set<mds_rank_t>& s) const {
  s = failed;
  // damaged ranks also block recovery until dealt with
  s.insert(damaged.begin(), damaged.end());
  for (const auto& p : mds_info)
    if (p.second.state >= STATE_REPLAY && p.second.state <= STATE_STOPPING)
      s.insert(p.second.rank);
}

void
Expand Down Expand Up @@ -556,7 +556,7 @@ class MDSMap {
return
get_num_mds(STATE_RESOLVE) > 0 &&
get_num_mds(STATE_REPLAY) == 0 &&
failed.empty();
failed.empty() && damaged.empty();
}
bool is_rejoining() const {
// nodes are rejoining cache state
Expand Down
58 changes: 29 additions & 29 deletions src/mds/Migrator.cc
Expand Up @@ -495,15 +495,10 @@ void Migrator::handle_mds_failure_or_stop(mds_rank_t who)
cache->adjust_subtree_auth(dir, q->second.peer);
cache->try_subtree_merge(dir);

// bystanders?
if (q->second.bystanders.empty()) {
import_reverse_unfreeze(dir);
} else {
// notify them; wait in aborting state
import_notify_abort(dir, bounds);
import_state[df].state = IMPORT_ABORTING;
assert(g_conf->mds_kill_import_at != 10);
}
// notify bystanders ; wait in aborting state
import_state[df].state = IMPORT_ABORTING;
import_notify_abort(dir, bounds);
assert(g_conf->mds_kill_import_at != 10);
}
break;

Expand Down Expand Up @@ -2379,6 +2374,7 @@ void Migrator::import_reverse(CDir *dir)
dout(7) << "import_reverse " << *dir << dendl;

import_state_t& stat = import_state[dir->dirfrag()];
stat.state = IMPORT_ABORTING;

set<CDir*> bounds;
cache->get_subtree_bounds(dir, bounds);
Expand Down Expand Up @@ -2496,17 +2492,8 @@ void Migrator::import_reverse(CDir *dir)

cache->trim(-1, num_dentries); // try trimming dentries

// bystanders?
if (stat.bystanders.empty()) {
dout(7) << "no bystanders, finishing reverse now" << dendl;
import_reverse_unfreeze(dir);
} else {
// notify them; wait in aborting state
dout(7) << "notifying bystanders of abort" << dendl;
import_notify_abort(dir, bounds);
stat.state = IMPORT_ABORTING;
assert (g_conf->mds_kill_import_at != 10);
}
// notify bystanders; wait in aborting state
import_notify_abort(dir, bounds);
}

void Migrator::import_notify_finish(CDir *dir, set<CDir*>& bounds)
Expand All @@ -2533,15 +2520,26 @@ void Migrator::import_notify_abort(CDir *dir, set<CDir*>& bounds)

import_state_t& stat = import_state[dir->dirfrag()];
for (set<mds_rank_t>::iterator p = stat.bystanders.begin();
p != stat.bystanders.end();
++p) {
p != stat.bystanders.end(); ) {
if (!mds->mdsmap->is_clientreplay_or_active_or_stopping(*p)) {
// this can happen if both exporter and bystander fail in the same mdsmap epoch
stat.bystanders.erase(p++);
continue;
}
MExportDirNotify *notify =
new MExportDirNotify(dir->dirfrag(), stat.tid, true,
mds_authority_t(stat.peer, mds->get_nodeid()),
mds_authority_t(stat.peer, CDIR_AUTH_UNKNOWN));
for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
notify->get_bounds().push_back((*i)->dirfrag());
mds->send_message_mds(notify, *p);
++p;
}
if (stat.bystanders.empty()) {
dout(7) << "no bystanders, finishing reverse now" << dendl;
import_reverse_unfreeze(dir);
} else {
assert (g_conf->mds_kill_import_at != 10);
}
}

Expand Down Expand Up @@ -2749,8 +2747,8 @@ void Migrator::import_finish(CDir *dir, bool notify, bool last)
}


void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp, mds_rank_t oldauth,
LogSegment *ls, uint64_t log_offset,
void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp,
mds_rank_t oldauth, LogSegment *ls,
map<CInode*, map<client_t,Capability::Export> >& peer_exports,
list<ScatterLock*>& updated_scatterlocks)
{
Expand All @@ -2773,9 +2771,6 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp, mds_r
// state after link -- or not! -sage
in->decode_import(blp, ls); // cap imports are noted for later action

// note that we are journaled at this log offset
in->last_journaled = log_offset;

// caps
decode_import_inode_caps(in, true, blp, peer_exports);

Expand Down Expand Up @@ -2973,8 +2968,8 @@ int Migrator::decode_import_dir(bufferlist::iterator& blp,
else if (icode == 'I') {
// inode
assert(le);
decode_import_inode(dn, blp, oldauth, ls, le->get_metablob()->event_seq,
peer_exports, updated_scatterlocks);
decode_import_inode(dn, blp, oldauth, ls,
peer_exports, updated_scatterlocks);
}

// add dentry to journal entry
Expand All @@ -3000,6 +2995,11 @@ int Migrator::decode_import_dir(bufferlist::iterator& blp,
/* This function DOES put the passed message before returning*/
void Migrator::handle_export_notify(MExportDirNotify *m)
{
if (!(mds->is_clientreplay() || mds->is_active() || mds->is_stopping())) {
m->put();
return;
}

CDir *dir = cache->get_dirfrag(m->get_dirfrag());

mds_rank_t from = mds_rank_t(m->get_source().num());
Expand Down
4 changes: 2 additions & 2 deletions src/mds/Migrator.h
Expand Up @@ -303,8 +303,8 @@ class Migrator {
void handle_export_dir(MExportDir *m);

public:
void decode_import_inode(CDentry *dn, bufferlist::iterator& blp, mds_rank_t oldauth,
LogSegment *ls, uint64_t log_offset,
void decode_import_inode(CDentry *dn, bufferlist::iterator& blp,
mds_rank_t oldauth, LogSegment *ls,
map<CInode*, map<client_t,Capability::Export> >& cap_imports,
list<ScatterLock*>& updated_scatterlocks);
void decode_import_inode_caps(CInode *in, bool auth_cap, bufferlist::iterator &blp,
Expand Down

0 comments on commit 13a52e9

Please sign in to comment.