octopus: mds: add request to batch_op before taking auth pins and locks #37022

Merged (3 commits, Oct 3, 2020)
2 changes: 1 addition & 1 deletion src/mds/BatchOp.h
@@ -25,7 +25,7 @@ class BatchOp {
virtual ~BatchOp() {}

virtual void add_request(const ceph::ref_t<class MDRequestImpl>& mdr) = 0;
virtual void set_request(const ceph::ref_t<class MDRequestImpl>& mdr) = 0;
virtual ceph::ref_t<class MDRequestImpl> find_new_head() = 0;

virtual void print(std::ostream&) = 0;

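Note: the change above replaces set_request() with find_new_head() in the BatchOp interface, so the batch itself picks the next live follower instead of having the caller push a replacement head in. A minimal sketch of that contract follows; Request, killed, and GetattrBatch are simplified stand-ins for illustration, not Ceph's actual MDRequestImpl/ref_t types.

#include <memory>
#include <vector>

struct Request {
  bool killed = false;  // stand-in for MDRequestImpl::killed
};

// Simplified analogue of the BatchOp interface after this change.
class BatchOp {
public:
  virtual ~BatchOp() = default;
  virtual void add_request(const std::shared_ptr<Request>& r) = 0;
  // Return the next live follower to promote to batch head,
  // or nullptr if every queued follower was already killed.
  virtual std::shared_ptr<Request> find_new_head() = 0;
};

class GetattrBatch : public BatchOp {
  std::vector<std::shared_ptr<Request>> followers;
public:
  void add_request(const std::shared_ptr<Request>& r) override {
    followers.push_back(r);
  }
  std::shared_ptr<Request> find_new_head() override {
    while (!followers.empty()) {
      auto r = std::move(followers.back());
      followers.pop_back();
      if (!r->killed)
        return r;      // promote the first live follower
    }
    return nullptr;    // nothing left; the caller drops the batch op
  }
};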
33 changes: 2 additions & 31 deletions src/mds/MDCache.cc
@@ -9629,37 +9629,8 @@ void MDCache::request_forward(MDRequestRef& mdr, mds_rank_t who, int port)
if (mdr->client_request && mdr->client_request->get_source().is_client()) {
dout(7) << "request_forward " << *mdr << " to mds." << who << " req "
<< *mdr->client_request << dendl;
if (mdr->is_batch_head) {
int mask = mdr->client_request->head.args.getattr.mask;

switch (mdr->client_request->get_op()) {
case CEPH_MDS_OP_GETATTR:
{
CInode* in = mdr->in[0];
if (in) {
auto it = in->batch_ops.find(mask);
if (it != in->batch_ops.end()) {
it->second->forward(who);
in->batch_ops.erase(it);
}
}
break;
}
case CEPH_MDS_OP_LOOKUP:
{
if (mdr->dn[0].size()) {
CDentry* dn = mdr->dn[0].back();
auto it = dn->batch_ops.find(mask);
if (it != dn->batch_ops.end()) {
it->second->forward(who);
dn->batch_ops.erase(it);
}
}
break;
}
default:
ceph_abort();
}
if (mdr->is_batch_head()) {
mdr->release_batch_op()->forward(who);
} else {
mds->forward_message_mds(mdr->release_client_request(), who);
}
29 changes: 24 additions & 5 deletions src/mds/Mutation.cc
@@ -418,12 +418,31 @@ bool MDRequestImpl::is_queued_for_replay() const
return client_request ? client_request->is_queued_for_replay() : false;
}

bool MDRequestImpl::is_batch_op()
bool MDRequestImpl::can_batch()
{
return (client_request->get_op() == CEPH_MDS_OP_LOOKUP &&
client_request->get_filepath().depth() == 1) ||
(client_request->get_op() == CEPH_MDS_OP_GETATTR &&
client_request->get_filepath().depth() == 0);
if (num_auth_pins || num_remote_auth_pins || lock_cache || !locks.empty())
return false;

auto op = client_request->get_op();
auto& path = client_request->get_filepath();
if (op == CEPH_MDS_OP_GETATTR) {
if (path.depth() == 0)
return true;
} else if (op == CEPH_MDS_OP_LOOKUP) {
if (path.depth() == 1 && !path.is_last_snap())
return true;
}

return false;
}

std::unique_ptr<BatchOp> MDRequestImpl::release_batch_op()
{
int mask = client_request->head.args.getattr.mask;
auto it = batch_op_map->find(mask);
std::unique_ptr<BatchOp> bop = std::move(it->second);
batch_op_map->erase(it);
return bop;
}

int MDRequestImpl::compare_paths()
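Note: can_batch() now refuses to batch a request once it holds any auth pins, remote auth pins, a lock cache, or locks, which is the point of this backport: batch membership is decided before pins and locks are taken. release_batch_op() then moves the unique_ptr out of the owning batch_ops map (keyed by the getattr mask), handing ownership to the caller. A rough sketch with simplified stand-in types, not the real MDRequestImpl API:

#include <map>
#include <memory>

struct BatchOp { virtual ~BatchOp() = default; };  // simplified stand-in

struct Request {
  int num_auth_pins = 0;
  int num_locks = 0;
  int getattr_mask = 0;
  // Points at the inode's or dentry's batch_ops map while this request
  // is the batch head, nullptr otherwise (cf. is_batch_head()).
  std::map<int, std::unique_ptr<BatchOp>>* batch_op_map = nullptr;

  // Batching is only allowed before any pins or locks were acquired,
  // mirroring the checks added to can_batch().
  bool can_batch() const { return num_auth_pins == 0 && num_locks == 0; }
  bool is_batch_head() const { return batch_op_map != nullptr; }

  // Move the BatchOp for this request's mask out of the shared map,
  // transferring ownership to the caller (cf. release_batch_op()).
  std::unique_ptr<BatchOp> release_batch_op() {
    auto it = batch_op_map->find(getattr_mask);
    std::unique_ptr<BatchOp> bop = std::move(it->second);
    batch_op_map->erase(it);
    return bop;
  }
};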
11 changes: 8 additions & 3 deletions src/mds/Mutation.h
@@ -24,6 +24,7 @@

#include "SimpleLock.h"
#include "Capability.h"
#include "BatchOp.h"

#include "common/TrackedOp.h"
#include "messages/MClientRequest.h"
@@ -381,9 +382,14 @@ struct MDRequestImpl : public MutationImpl {
void set_filepath(const filepath& fp);
void set_filepath2(const filepath& fp);
bool is_queued_for_replay() const;
bool is_batch_op();
int compare_paths();

bool can_batch();
bool is_batch_head() {
return batch_op_map != nullptr;
}
std::unique_ptr<BatchOp> release_batch_op();

void print(ostream &out) const override;
void dump(Formatter *f) const override;

@@ -435,12 +441,11 @@ struct MDRequestImpl : public MutationImpl {
// indicates how many retries of request have been made
int retry = 0;

bool is_batch_head = false;
std::map<int, std::unique_ptr<BatchOp> > *batch_op_map = nullptr;

// indicator for vxattr osdmap update
bool waited_for_osdmap = false;

std::vector<Ref> batch_reqs;
protected:
void _dump(Formatter *f) const override;
void _dump_op_descriptor_unlocked(ostream& stream) const override;
148 changes: 64 additions & 84 deletions src/mds/Server.cc
@@ -82,35 +82,53 @@ class Batch_Getattr_Lookup : public BatchOp {
protected:
Server* server;
ceph::ref_t<MDRequestImpl> mdr;
MDCache* mdcache;
std::vector<ceph::ref_t<MDRequestImpl>> batch_reqs;
int res = 0;
public:
Batch_Getattr_Lookup(Server* s, ceph::ref_t<MDRequestImpl> r, MDCache* mdc) : server(s), mdr(std::move(r)), mdcache(mdc) {}
void add_request(const ceph::ref_t<MDRequestImpl>& m) override {
mdr->batch_reqs.push_back(m);
Batch_Getattr_Lookup(Server* s, const ceph::ref_t<MDRequestImpl>& r)
: server(s), mdr(r) {
if (mdr->client_request->get_op() == CEPH_MDS_OP_LOOKUP)
mdr->batch_op_map = &mdr->dn[0].back()->batch_ops;
else
mdr->batch_op_map = &mdr->in[0]->batch_ops;
}
void add_request(const ceph::ref_t<MDRequestImpl>& r) override {
batch_reqs.push_back(r);
}
void set_request(const ceph::ref_t<MDRequestImpl>& m) override {
mdr = m;
ceph::ref_t<MDRequestImpl> find_new_head() override {
while (!batch_reqs.empty()) {
auto r = std::move(batch_reqs.back());
batch_reqs.pop_back();
if (r->killed)
continue;

r->batch_op_map = mdr->batch_op_map;
mdr->batch_op_map = nullptr;
mdr = r;
return mdr;
}
return nullptr;
}
void _forward(mds_rank_t t) override {
MDCache* mdcache = server->mdcache;
mdcache->mds->forward_message_mds(mdr->release_client_request(), t);
mdr->set_mds_stamp(ceph_clock_now());
for (auto& m : mdr->batch_reqs) {
for (auto& m : batch_reqs) {
if (!m->killed)
mdcache->request_forward(m, t);
}
mdr->batch_reqs.clear();
batch_reqs.clear();
}
void _respond(int r) override {
mdr->set_mds_stamp(ceph_clock_now());
for (auto& m : mdr->batch_reqs) {
for (auto& m : batch_reqs) {
if (!m->killed) {
m->tracei = mdr->tracei;
m->tracedn = mdr->tracedn;
server->respond_to_request(m, r);
}
}
mdr->batch_reqs.clear();
batch_reqs.clear();
server->reply_client_request(mdr, make_message<MClientReply>(*mdr->client_request, r));
}
void print(std::ostream& o) {
@@ -1865,23 +1883,9 @@ void Server::submit_mdlog_entry(LogEvent *le, MDSLogContextBase *fin, MDRequestR
void Server::respond_to_request(MDRequestRef& mdr, int r)
{
if (mdr->client_request) {
if (mdr->is_batch_op() && mdr->is_batch_head) {
int mask = mdr->client_request->head.args.getattr.mask;

std::unique_ptr<BatchOp> bop;
if (mdr->client_request->get_op() == CEPH_MDS_OP_GETATTR) {
dout(20) << __func__ << ": respond other getattr ops. " << *mdr << dendl;
auto it = mdr->in[0]->batch_ops.find(mask);
bop = std::move(it->second);
mdr->in[0]->batch_ops.erase(it);
} else {
dout(20) << __func__ << ": respond other lookup ops. " << *mdr << dendl;
auto it = mdr->dn[0].back()->batch_ops.find(mask);
bop = std::move(it->second);
mdr->dn[0].back()->batch_ops.erase(it);
}

bop->respond(r);
if (mdr->is_batch_head()) {
dout(20) << __func__ << " batch head " << *mdr << dendl;
mdr->release_batch_op()->respond(r);
} else {
reply_client_request(mdr, make_message<MClientReply>(*mdr->client_request, r));
}
@@ -2402,16 +2406,6 @@ void Server::handle_osd_map()
});
}

void Server::clear_batch_ops(const MDRequestRef& mdr)
{
int mask = mdr->client_request->head.args.getattr.mask;
if (mdr->client_request->get_op() == CEPH_MDS_OP_GETATTR && mdr->in[0]) {
mdr->in[0]->batch_ops.erase(mask);
} else if (mdr->client_request->get_op() == CEPH_MDS_OP_LOOKUP && mdr->dn[0].size()) {
mdr->dn[0].back()->batch_ops.erase(mask);
}
}

void Server::dispatch_client_request(MDRequestRef& mdr)
{
// we shouldn't be waiting on anyone.
@@ -2421,39 +2415,15 @@ void Server::dispatch_client_request(MDRequestRef& mdr)
dout(10) << "request " << *mdr << " was killed" << dendl;
//if the mdr is a "batch_op" and it has followers, pick a follower as
//the new "head of the batch ops" and go on processing the new one.
if (mdr->is_batch_op() && mdr->is_batch_head ) {
if (!mdr->batch_reqs.empty()) {
MDRequestRef new_batch_head;
for (auto itr = mdr->batch_reqs.cbegin(); itr != mdr->batch_reqs.cend();) {
auto req = *itr;
itr = mdr->batch_reqs.erase(itr);
if (!req->killed) {
new_batch_head = req;
break;
}
}

if (!new_batch_head) {
clear_batch_ops(mdr);
return;
}

new_batch_head->batch_reqs = std::move(mdr->batch_reqs);

mdr = new_batch_head;
mdr->is_batch_head = true;
int mask = mdr->client_request->head.args.getattr.mask;
if (mdr->client_request->get_op() == CEPH_MDS_OP_GETATTR) {
auto& fin = mdr->in[0]->batch_ops[mask];
fin->set_request(new_batch_head);
} else if (mdr->client_request->get_op() == CEPH_MDS_OP_LOOKUP) {
auto& fin = mdr->dn[0].back()->batch_ops[mask];
fin->set_request(new_batch_head);
}
} else {
clear_batch_ops(mdr);
if (mdr->is_batch_head()) {
int mask = mdr->client_request->head.args.getattr.mask;
auto it = mdr->batch_op_map->find(mask);
auto new_batch_head = it->second->find_new_head();
if (!new_batch_head) {
mdr->batch_op_map->erase(it);
return;
}
mdr = std::move(new_batch_head);
} else {
return;
}
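Note: with the handoff above, a killed batch head no longer walks inode/dentry state itself; it looks its BatchOp up by mask, asks it for a replacement via find_new_head(), and erases the map entry only when no live follower remains. A rough, self-contained analogue of that caller side (illustrative names, not the actual Server/MDCache API):

#include <map>
#include <memory>

struct Request;

struct BatchOp {
  virtual ~BatchOp() = default;
  virtual std::shared_ptr<Request> find_new_head() = 0;
};

struct Request {
  int getattr_mask = 0;
  std::map<int, std::unique_ptr<BatchOp>>* batch_op_map = nullptr;
  bool is_batch_head() const { return batch_op_map != nullptr; }
};

// Returns true if dispatch should continue with a (possibly new) head,
// false if the killed request can simply be dropped.
bool handle_killed_request(std::shared_ptr<Request>& mdr) {
  if (!mdr->is_batch_head())
    return false;                       // plain request: nothing to hand off
  auto it = mdr->batch_op_map->find(mdr->getattr_mask);
  auto new_head = it->second->find_new_head();
  if (!new_head) {
    mdr->batch_op_map->erase(it);       // no live follower; drop the batch op
    return false;
  }
  mdr = std::move(new_head);            // keep dispatching as the new head
  return true;
}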
@@ -3748,37 +3718,47 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
if (mask & CEPH_STAT_RSTAT)
want_auth = true; // set want_auth for CEPH_STAT_RSTAT mask

CInode *ref = rdlock_path_pin_ref(mdr, want_auth, false);
if (!ref)
return;
if (!mdr->is_batch_head() && mdr->can_batch()) {
CF_MDS_MDRContextFactory cf(mdcache, mdr, false);
int r = mdcache->path_traverse(mdr, cf, mdr->get_filepath(),
(want_auth ? MDS_TRAVERSE_WANT_AUTH : 0),
&mdr->dn[0], &mdr->in[0]);
if (r > 0)
return; // delayed

mdr->getattr_caps = mask;

if (mdr->snapid == CEPH_NOSNAP && !mdr->is_batch_head && mdr->is_batch_op()) {
if (!is_lookup) {
auto em = ref->batch_ops.emplace(std::piecewise_construct, std::forward_as_tuple(mask), std::forward_as_tuple());
if (r < 0) {
// fall-thru. let rdlock_path_pin_ref() check again.
} else if (is_lookup) {
CDentry* dn = mdr->dn[0].back();
mdr->pin(dn);
auto em = dn->batch_ops.emplace(std::piecewise_construct, std::forward_as_tuple(mask), std::forward_as_tuple());
if (em.second) {
em.first->second = std::make_unique<Batch_Getattr_Lookup>(this, mdr, mdcache);
em.first->second = std::make_unique<Batch_Getattr_Lookup>(this, mdr);
} else {
dout(20) << __func__ << ": GETATTR op, wait for previous same getattr ops to respond. " << *mdr << dendl;
dout(20) << __func__ << ": LOOKUP op, wait for previous same getattr ops to respond. " << *mdr << dendl;
em.first->second->add_request(mdr);
return;
}
} else {
CDentry* dn = mdr->dn[0].back();
auto em = dn->batch_ops.emplace(std::piecewise_construct, std::forward_as_tuple(mask), std::forward_as_tuple());
CInode *in = mdr->in[0];
mdr->pin(in);
auto em = in->batch_ops.emplace(std::piecewise_construct, std::forward_as_tuple(mask), std::forward_as_tuple());
if (em.second) {
em.first->second = std::make_unique<Batch_Getattr_Lookup>(this, mdr, mdcache);
mdr->pin(dn);
em.first->second = std::make_unique<Batch_Getattr_Lookup>(this, mdr);
} else {
dout(20) << __func__ << ": LOOKUP op, wait for previous same getattr ops to respond. " << *mdr << dendl;
dout(20) << __func__ << ": GETATTR op, wait for previous same getattr ops to respond. " << *mdr << dendl;
em.first->second->add_request(mdr);
return;
}
}
mdr->is_batch_head = true;
}

CInode *ref = rdlock_path_pin_ref(mdr, want_auth, false);
if (!ref)
return;

mdr->getattr_caps = mask;

/*
* if client currently holds the EXCL cap on a field, do not rdlock
* it; client's stat() will result in valid info if _either_ EXCL
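Note: the reordered handle_client_getattr() first does a lock-free path_traverse, then registers the request in the per-inode (getattr) or per-dentry (lookup) batch_ops map keyed by the getattr mask, and only the batch head goes on to rdlock_path_pin_ref(); followers return and wait, so they never take auth pins or locks. A rough sketch of just the registration step, with simplified stand-in types:

#include <map>
#include <memory>

// Simplified stand-ins for illustration; not the MDS API.
struct BatchOp {
  virtual ~BatchOp() = default;
  virtual void add_request(int follower_id) = 0;
};

struct GetattrBatch : BatchOp {
  void add_request(int /*follower_id*/) override { /* queue the follower */ }
};

struct Target {  // stands in for the CInode (getattr) or CDentry (lookup)
  std::map<int, std::unique_ptr<BatchOp>> batch_ops;  // keyed by getattr mask
};

// Returns true if the caller became the batch head and should proceed to
// take locks; false if it joined an existing batch and should just wait.
bool register_in_batch(Target& target, int mask, int request_id) {
  auto em = target.batch_ops.emplace(mask, nullptr);
  if (em.second) {
    em.first->second = std::make_unique<GetattrBatch>();  // new batch head
    return true;
  }
  em.first->second->add_request(request_id);              // join as follower
  return false;
}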
2 changes: 0 additions & 2 deletions src/mds/Server.h
@@ -164,7 +164,6 @@ class Server {
void set_trace_dist(const ref_t<MClientReply> &reply, CInode *in, CDentry *dn,
MDRequestRef& mdr);


void handle_slave_request(const cref_t<MMDSSlaveRequest> &m);
void handle_slave_request_reply(const cref_t<MMDSSlaveRequest> &m);
void dispatch_slave_request(MDRequestRef& mdr);
@@ -320,7 +319,6 @@ class Server {

void reply_client_request(MDRequestRef& mdr, const ref_t<MClientReply> &reply);
void flush_session(Session *session, MDSGatherBuilder *gather);
void clear_batch_ops(const MDRequestRef& mdr);

MDSRank *mds;
MDCache *mdcache;