Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mds: fix issuing redundant reintegrate/migrate_stray requests #53280

Merged
merged 2 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/mds/CDentry.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ class CDentry : public MDSCacheObject, public LRUObject, public Counter<CDentry>
mempool::mds_co::map<client_t,ClientLease*> client_lease_map;
std::map<int, std::unique_ptr<BatchOp>> batch_ops;

ceph_tid_t reintegration_reqid = 0;


protected:
friend class Migrator;
Expand Down
33 changes: 33 additions & 0 deletions src/mds/MDSMetaRequest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2023 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#ifndef CEPH_MDS_META_REQUEST_H
#define CEPH_MDS_META_REQUEST_H

#include "include/types.h"

struct MDSMetaRequest {
private:
int op;
ceph_tid_t tid;
public:
explicit MDSMetaRequest(int o, ceph_tid_t t) :
op(o), tid(t) { }
virtual ~MDSMetaRequest() { }

int get_op() { return op; }
ceph_tid_t get_tid() { return tid; }
};

#endif // !CEPH_MDS_META_REQUEST_H
2 changes: 2 additions & 0 deletions src/mds/MDSRank.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,7 @@ bool MDSRank::is_valid_message(const cref_t<Message> &m) {
type == CEPH_MSG_CLIENT_RECONNECT ||
type == CEPH_MSG_CLIENT_RECLAIM ||
type == CEPH_MSG_CLIENT_REQUEST ||
type == CEPH_MSG_CLIENT_REPLY ||
type == MSG_MDS_PEER_REQUEST ||
type == MSG_MDS_HEARTBEAT ||
type == MSG_MDS_TABLE_REQUEST ||
Expand Down Expand Up @@ -1244,6 +1245,7 @@ void MDSRank::handle_message(const cref_t<Message> &m)
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
// fall-thru
case CEPH_MSG_CLIENT_REQUEST:
case CEPH_MSG_CLIENT_REPLY:
server->dispatch(m);
break;
case MSG_MDS_PEER_REQUEST:
Expand Down
3 changes: 3 additions & 0 deletions src/mds/MDSRank.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "Server.h"
#include "MetricsHandler.h"
#include "osdc/Journaler.h"
#include "MDSMetaRequest.h"

// Full .h import instead of forward declaration for PerfCounter, for the
// benefit of those including this header and using MDSRank::logger
Expand Down Expand Up @@ -423,6 +424,8 @@ class MDSRank {
PerfCounters *logger = nullptr, *mlogger = nullptr;
OpTracker op_tracker;

std::map<ceph_tid_t, std::unique_ptr<MDSMetaRequest>> internal_client_requests;

// The last different state I held before current
MDSMap::DaemonState last_state = MDSMap::STATE_BOOT;
// The state assigned to me by the MDSMap
Expand Down
30 changes: 30 additions & 0 deletions src/mds/Server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "Mutation.h"
#include "MetricsHandler.h"
#include "cephfs_features.h"
#include "MDSContext.h"

#include "msg/Messenger.h"

Expand Down Expand Up @@ -360,6 +361,9 @@ void Server::dispatch(const cref_t<Message> &m)
case CEPH_MSG_CLIENT_REQUEST:
handle_client_request(ref_cast<MClientRequest>(m));
return;
case CEPH_MSG_CLIENT_REPLY:
handle_client_reply(ref_cast<MClientReply>(m));
return;
case CEPH_MSG_CLIENT_RECLAIM:
handle_client_reclaim(ref_cast<MClientReclaim>(m));
return;
Expand Down Expand Up @@ -2319,6 +2323,10 @@ void Server::reply_client_request(MDRequestRef& mdr, const ref_t<MClientReply> &
mds->send_message_client(reply, session);
}

if (client_inst.name.is_mds() && reply->get_op() == CEPH_MDS_OP_RENAME) {
mds->send_message(reply, mdr->client_request->get_connection());
}

if (req->is_queued_for_replay() &&
(mdr->has_completed || reply->get_result() < 0)) {
if (reply->get_result() < 0) {
Expand Down Expand Up @@ -2551,6 +2559,28 @@ void Server::handle_client_request(const cref_t<MClientRequest> &req)
return;
}

void Server::handle_client_reply(const cref_t<MClientReply> &reply)
{
dout(4) << "handle_client_reply " << *reply << dendl;

ceph_assert(reply->is_safe());
ceph_tid_t tid = reply->get_tid();

if (mds->internal_client_requests.count(tid) == 0) {
dout(1) << " no pending request on tid " << tid << dendl;
return;
}

switch (reply->get_op()) {
case CEPH_MDS_OP_RENAME:
break;
default:
dout(5) << " unknown client op " << reply->get_op() << dendl;
}

mds->internal_client_requests.erase(tid);
}

void Server::handle_osd_map()
{
/* Note that we check the OSDMAP_FULL flag directly rather than
Expand Down
1 change: 1 addition & 0 deletions src/mds/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ class Server {

// -- requests --
void handle_client_request(const cref_t<MClientRequest> &m);
void handle_client_reply(const cref_t<MClientReply> &m);

void journal_and_reply(MDRequestRef& mdr, CInode *tracei, CDentry *tracedn,
LogEvent *le, MDSLogContextBase *fin);
Expand Down
30 changes: 26 additions & 4 deletions src/mds/StrayManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -673,24 +673,41 @@ void StrayManager::reintegrate_stray(CDentry *straydn, CDentry *rdn)
{
dout(10) << __func__ << " " << *straydn << " to " << *rdn << dendl;

if (straydn->reintegration_reqid) {
dout(20) << __func__ << ": stray dentry " << *straydn
<< " is already under reintegrating" << dendl;
return;
}

logger->inc(l_mdc_strays_reintegrated);

// rename it to remote linkage .
filepath src(straydn->get_name(), straydn->get_dir()->ino());
filepath dst(rdn->get_name(), rdn->get_dir()->ino());

ceph_tid_t tid = mds->issue_tid();

auto req = make_message<MClientRequest>(CEPH_MDS_OP_RENAME);
req->set_filepath(dst);
req->set_filepath2(src);
req->set_tid(mds->issue_tid());
req->set_tid(tid);

auto ptr = std::make_unique<StrayEvalRequest>(CEPH_MDS_OP_RENAME, tid, straydn);
mds->internal_client_requests.emplace(tid, std::move(ptr));

mds->send_message_mds(req, rdn->authority().first);
}

void StrayManager::migrate_stray(CDentry *dn, mds_rank_t to)
{
dout(10) << __func__ << " " << *dn << " to mds." << to << dendl;

if (dn->reintegration_reqid) {
dout(20) << __func__ << ": stray dentry " << *dn
<< " is already under migrating" << dendl;
return;
}

logger->inc(l_mdc_strays_migrated);

// rename it to another mds.
Expand All @@ -700,10 +717,15 @@ void StrayManager::migrate_stray(CDentry *dn, mds_rank_t to)
filepath src(dn->get_name(), dirino);
filepath dst(dn->get_name(), MDS_INO_STRAY(to, MDS_INO_STRAY_INDEX(dirino)));

ceph_tid_t tid = mds->issue_tid();

auto req = make_message<MClientRequest>(CEPH_MDS_OP_RENAME);
req->set_filepath(dst);
req->set_filepath2(src);
req->set_tid(mds->issue_tid());
req->set_tid(tid);

auto ptr = std::make_unique<StrayEvalRequest>(CEPH_MDS_OP_RENAME, tid, dn);
mds->internal_client_requests.emplace(tid, std::move(ptr));

mds->send_message_mds(req, to);
}
Expand Down
17 changes: 16 additions & 1 deletion src/mds/StrayManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,30 @@
#include <list>
#include "Mutation.h"
#include "PurgeQueue.h"
#include "MDSMetaRequest.h"
#include "CDentry.h"

class MDSRank;
class CInode;
class CDentry;

class StrayManager
{
// My public interface is for consumption by MDCache
public:
struct StrayEvalRequest : public MDSMetaRequest {
CDentry *dentry;
public:
explicit StrayEvalRequest(int o, ceph_tid_t t, CDentry *d) :
MDSMetaRequest(o, t), dentry(d) {
dentry->get(CDentry::PIN_PURGING);
dentry->reintegration_reqid = t;
}
~StrayEvalRequest() {
dentry->reintegration_reqid = 0;
dentry->put(CDentry::PIN_PURGING);
}
};

explicit StrayManager(MDSRank *mds, PurgeQueue &purge_queue_);
void set_logger(PerfCounters *l) {logger = l;}
void activate();
Expand Down