crimson/osd: use single-pg peering ops #30372

Merged · 3 commits · Sep 13, 2019
38 changes: 19 additions & 19 deletions src/crimson/osd/osd.cc
@@ -297,8 +297,8 @@ seastar::future<> OSD::_preboot(version_t oldest, version_t newest)
     logger().warn("osdmap NOUP flag is set, waiting for it to clear");
   } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
     logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it");
-  } else if (osdmap->require_osd_release < ceph_release_t::luminous) {
-    logger().error("osdmap require_osd_release < luminous; please upgrade to luminous");
+  } else if (osdmap->require_osd_release < ceph_release_t::octopus) {
+    logger().error("osdmap require_osd_release < octopus; please upgrade to octopus");
   } else if (false) {
     // TODO: update mon if current fullness state is different from osdmap
   } else if (version_t n = local_conf()->osd_map_message_max;
@@ -488,16 +488,19 @@ seastar::future<> OSD::ms_dispatch(ceph::net::Connection* conn, MessageRef m)
   case CEPH_MSG_OSD_OP:
     return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
   case MSG_OSD_PG_CREATE2:
-  case MSG_OSD_PG_NOTIFY:
-  case MSG_OSD_PG_INFO:
-  case MSG_OSD_PG_QUERY:
     shard_services.start_operation<CompoundPeeringRequest>(
       *this,
       conn->get_shared(),
       m);
     return seastar::now();
+  case MSG_OSD_PG_NOTIFY2:
+    [[fallthrough]];
+  case MSG_OSD_PG_INFO2:
+    [[fallthrough]];
+  case MSG_OSD_PG_QUERY2:
+    [[fallthrough]];
   case MSG_OSD_PG_LOG:
-    return handle_pg_log(conn, boost::static_pointer_cast<MOSDPGLog>(m));
+    return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
   case MSG_OSD_REPOP:
     return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
   case MSG_OSD_REPOPREPLY:
@@ -666,7 +669,7 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
       Ref<PG>(),
       startmap);
   }
-  ceph_assert(osdmap->require_osd_release >= ceph_release_t::nautilus);
+  ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
   if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
     // this ensures we do not process old creating messages after the
     // pool's initial pgs have been created (and pg are subsequently
@@ -691,7 +694,7 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
   if (!pg)
     return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
   return store->create_new_collection(coll_t(info->pgid)).then([this, &info, startmap, pg] (auto coll) {
-    PeeringCtx rctx;
+    PeeringCtx rctx{ceph_release_t::octopus};
     const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());

     int up_primary, acting_primary;
@@ -974,29 +977,25 @@ void OSD::update_heartbeat_peers()
   heartbeat->update_peers(whoami);
 }

-seastar::future<> OSD::handle_pg_log(
+seastar::future<> OSD::handle_peering_op(
   ceph::net::Connection* conn,
-  Ref<MOSDPGLog> m)
+  Ref<MOSDPeeringOp> m)
 {
   const int from = m->get_source().num();
-  logger().debug("handle_pg_log on {} from {}", m->get_spg(), from);
+  logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
   shard_services.start_operation<RemotePeeringEvent>(
     *this,
     conn->get_shared(),
     shard_services,
-    pg_shard_t(from, m->from),
-    spg_t(m->info.pgid.pgid, m->to),
+    pg_shard_t{from, m->get_spg().shard},
+    m->get_spg(),
     std::move(*m->get_event()));
   return seastar::now();
 }

 void OSD::check_osdmap_features()
 {
-  if (osdmap->require_osd_release < ceph_release_t::nautilus) {
-    heartbeat->set_require_authorizer(false);
-  } else {
-    heartbeat->set_require_authorizer(true);
-  }
+  heartbeat->set_require_authorizer(true);
 }

 seastar::future<> OSD::consume_map(epoch_t epoch)
@@ -1005,7 +1004,8 @@ seastar::future<> OSD::consume_map(epoch_t epoch)
   auto &pgs = pg_map.get_pgs();
   return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
     return shard_services.start_operation<PGAdvanceMap>(
-      *this, pg.second, pg.second->get_osdmap_epoch(), epoch).second;
+      *this, pg.second, pg.second->get_osdmap_epoch(), epoch,
+      PeeringCtx{ceph_release_t::octopus}, false).second;
   }).then([epoch, this] {
     osdmap_gate.got_map(epoch);
     return seastar::make_ready_future();
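Note: the single handle_peering_op() above can serve notify2/info2/query2/log alike because every octopus-style peering message derives from a common MOSDPeeringOp base that knows its target PG and can produce a PeeringState event. A rough sketch of the subset of that interface the handler relies on (illustrative only; the real declaration lives in src/messages/MOSDPeeringOp.h and differs in detail):

    // Illustrative sketch, not part of this diff: just the members that
    // handle_peering_op() calls. Exact signatures are assumptions.
    class MOSDPeeringOp : public Message {
    public:
      virtual spg_t get_spg() const = 0;        // target PG, shard included
      virtual PGPeeringEvent *get_event() = 0;  // event fed to PeeringState
    };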
4 changes: 2 additions & 2 deletions src/crimson/osd/osd.h
@@ -169,8 +169,8 @@ class OSD final : public ceph::net::Dispatcher,
     Ref<MOSDRepOp> m);
   seastar::future<> handle_rep_op_reply(ceph::net::Connection* conn,
                                         Ref<MOSDRepOpReply> m);
-  seastar::future<> handle_pg_log(ceph::net::Connection* conn,
-                                  Ref<MOSDPGLog> m);
+  seastar::future<> handle_peering_op(ceph::net::Connection* conn,
+                                      Ref<MOSDPeeringOp> m);

   seastar::future<> committed_osd_maps(version_t first,
                                        version_t last,
90 changes: 6 additions & 84 deletions src/crimson/osd/osd_operations/compound_peering_request.cc
@@ -5,8 +5,6 @@

 #include "osd/PeeringState.h"

-#include "messages/MOSDPGInfo.h"
-#include "messages/MOSDPGNotify.h"
 #include "messages/MOSDPGQuery.h"
 #include "messages/MOSDPGCreate2.h"

@@ -28,6 +26,11 @@ using namespace ceph::osd;
 struct compound_state {
   seastar::promise<BufferedRecoveryMessages> promise;
   BufferedRecoveryMessages ctx;
+  compound_state()
+    // assuming crimson-osd won't need to be compatible with pre-octopus
+    // releases
+    : ctx{ceph_release_t::octopus}
+  {}
   ~compound_state() {
     promise.set_value(std::move(ctx));
   }
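The destructor-fulfilled promise is the glue here: each sub-operation spawned for a compound message holds a shared reference to this state, and whichever one drops the last reference flushes the accumulated BufferedRecoveryMessages through the promise. A minimal standalone illustration of the idiom (generic names, not the crimson types):

    // Sketch of the shared-state idiom: the destructor of the last shared
    // owner fulfills the promise, so the future resolves only once every
    // sub-task has finished.
    #include <memory>
    #include <seastar/core/future.hh>

    struct collector {
      seastar::promise<int> promise;
      int accumulated = 0;
      ~collector() { promise.set_value(accumulated); }
    };

    seastar::future<int> fan_out(seastar::future<> a, seastar::future<> b) {
      auto state = std::make_shared<collector>();
      auto done = state->promise.get_future();
      // Both continuations capture the shared_ptr; whichever finishes
      // last drops the final reference and triggers ~collector().
      (void)a.then([state] { state->accumulated += 1; });
      (void)b.then([state] { state->accumulated += 2; });
      return done;
    }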
@@ -107,75 +110,6 @@ std::vector<OperationRef> handle_pg_create(
   return ret;
 }

-std::vector<OperationRef> handle_pg_notify(
-  OSD &osd,
-  ceph::net::ConnectionRef conn,
-  compound_state_ref state,
-  Ref<MOSDPGNotify> m)
-{
-  std::vector<OperationRef> ret;
-  ret.reserve(m->get_pg_list().size());
-  const int from = m->get_source().num();
-  for (auto& pg_notify : m->get_pg_list()) {
-    spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
-    MNotifyRec notify{pgid,
-                      pg_shard_t{from, pg_notify.from},
-                      pg_notify,
-                      0}; // the features is not used
-    logger().debug("handle_pg_notify on {} from {}", pgid.pgid, from);
-    auto create_info = new PGCreateInfo{
-      pgid,
-      pg_notify.query_epoch,
-      pg_notify.info.history,
-      pg_notify.past_intervals,
-      false};
-    auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
-      state,
-      osd,
-      conn,
-      osd.get_shard_services(),
-      pg_shard_t(from, pg_notify.from),
-      pgid,
-      pg_notify.epoch_sent,
-      pg_notify.query_epoch,
-      notify,
-      true, // requires_pg
-      create_info).first;
-    ret.push_back(op);
-  }
-  return ret;
-}
-
-std::vector<OperationRef> handle_pg_info(
-  OSD &osd,
-  ceph::net::ConnectionRef conn,
-  compound_state_ref state,
-  Ref<MOSDPGInfo> m)
-{
-  std::vector<OperationRef> ret;
-  ret.reserve(m->pg_list.size());
-  const int from = m->get_source().num();
-  for (auto& pg_notify : m->pg_list) {
-    spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
-    logger().debug("handle_pg_info on {} from {}", pgid.pgid, from);
-    MInfoRec info{pg_shard_t{from, pg_notify.from},
-                  pg_notify.info,
-                  pg_notify.epoch_sent};
-    auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
-      state,
-      osd,
-      conn,
-      osd.get_shard_services(),
-      pg_shard_t(from, pg_notify.from),
-      pgid,
-      pg_notify.epoch_sent,
-      pg_notify.query_epoch,
-      std::move(info)).first;
-    ret.push_back(op);
-  }
-  return ret;
-}
-
 class QuerySubEvent : public PeeringSubEvent {
 public:
   template <typename... Args>
@@ -185,7 +119,7 @@ class QuerySubEvent : public PeeringSubEvent {
   void on_pg_absent() final {
     logger().debug("handle_pg_query on absent pg {} from {}", pgid, from);
     pg_info_t empty(pgid);
-    ctx.notify_list[from.osd].emplace_back(
+    ctx.send_notify(from.osd,
       pg_notify_t(
         from.shard, pgid.shard,
         evt.get_epoch_sent(),
@@ -275,18 +209,6 @@ seastar::future<> CompoundPeeringRequest::start()
         conn,
         state,
         boost::static_pointer_cast<MOSDPGCreate2>(m));
-    case MSG_OSD_PG_NOTIFY:
-      return handle_pg_notify(
-        osd,
-        conn,
-        state,
-        boost::static_pointer_cast<MOSDPGNotify>(m));
-    case MSG_OSD_PG_INFO:
-      return handle_pg_info(
-        osd,
-        conn,
-        state,
-        boost::static_pointer_cast<MOSDPGInfo>(m));
     case MSG_OSD_PG_QUERY:
       return handle_pg_query(
         osd,
1 change: 1 addition & 0 deletions src/crimson/osd/osd_operations/peering_event.h
@@ -64,6 +64,7 @@ class PeeringEvent : public OperationT<PeeringEvent> {
     ShardServices &shard_services, const pg_shard_t &from, const spg_t &pgid,
     Args&&... args) :
     shard_services(shard_services),
+    ctx{ceph_release_t::octopus},
     from(from),
     pgid(pgid),
     evt(std::forward<Args>(args)...)
4 changes: 0 additions & 4 deletions src/crimson/osd/osd_operations/pg_advance_map.cc
@@ -18,10 +18,6 @@ namespace {

 namespace ceph::osd {

-PGAdvanceMap::PGAdvanceMap(
-  OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to)
-  : osd(osd), pg(pg), from(from), to(to), do_init(false) {}
-
 PGAdvanceMap::PGAdvanceMap(
   OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
   PeeringCtx &&rctx, bool do_init)
2 changes: 0 additions & 2 deletions src/crimson/osd/osd_operations/pg_advance_map.h
@@ -33,8 +33,6 @@ class PGAdvanceMap : public OperationT<PGAdvanceMap> {
   const bool do_init;

 public:
-  PGAdvanceMap(
-    OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to);
   PGAdvanceMap(
     OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
     PeeringCtx &&rctx, bool do_init);
37 changes: 9 additions & 28 deletions src/crimson/osd/shard_services.cc
@@ -77,34 +77,15 @@ seastar::future<> ShardServices::dispatch_context_transaction(
 seastar::future<> ShardServices::dispatch_context_messages(
   BufferedRecoveryMessages &&ctx)
 {
-  auto ret = seastar::when_all_succeed(
-    seastar::parallel_for_each(std::move(ctx.notify_list),
-      [this](auto& osd_notifies) {
-        auto& [peer, notifies] = osd_notifies;
-        auto m = make_message<MOSDPGNotify>(osdmap->get_epoch(),
-                                            std::move(notifies));
-        logger().debug("dispatch_context_messages sending notify to {}", peer);
-        return send_to_osd(peer, m, osdmap->get_epoch());
-      }),
-    seastar::parallel_for_each(std::move(ctx.query_map),
-      [this](auto& osd_queries) {
-        auto& [peer, queries] = osd_queries;
-        auto m = make_message<MOSDPGQuery>(osdmap->get_epoch(),
-                                           std::move(queries));
-        logger().debug("dispatch_context_messages sending query to {}", peer);
-        return send_to_osd(peer, m, osdmap->get_epoch());
-      }),
-    seastar::parallel_for_each(std::move(ctx.info_map),
-      [this](auto& osd_infos) {
-        auto& [peer, infos] = osd_infos;
-        auto m = make_message<MOSDPGInfo>(osdmap->get_epoch(),
-                                          std::move(infos));
-        logger().debug("dispatch_context_messages sending info to {}", peer);
-        return send_to_osd(peer, m, osdmap->get_epoch());
-      }));
-  ctx.notify_list.clear();
-  ctx.query_map.clear();
-  ctx.info_map.clear();
+  auto ret = seastar::parallel_for_each(std::move(ctx.message_map),
+    [this](auto& osd_messages) {
+      auto& [peer, messages] = osd_messages;
+      logger().debug("dispatch_context_messages sending messages to {}", peer);
+      return seastar::parallel_for_each(std::move(messages), [=](auto& m) {
+        return send_to_osd(peer, m, osdmap->get_epoch());
+      });
+    });
+  ctx.message_map.clear();
   return ret;
 }
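This simplification assumes the octopus-era BufferedRecoveryMessages buffers fully-built messages per peer OSD instead of keeping three per-type maps that each needed their own MOSDPGNotify/MOSDPGQuery/MOSDPGInfo batching. Roughly the shape implied by the loop above (a sketch inferred from usage; the actual definition lives in src/osd/PeeringState.h):

    // Sketch only: container shape implied by dispatch_context_messages().
    // Member names beyond message_map are assumptions.
    #include <map>
    #include <vector>
    #include "msg/Message.h"  // MessageRef

    struct BufferedRecoveryMessages {
      // One bucket of ready-to-send messages per peer OSD id; each entry
      // is already a concrete MOSDPGNotify2 / MOSDPGInfo2 / MOSDPGQuery2 /
      // MOSDPGLog, so the sender needs no per-type batching logic.
      std::map<int, std::vector<MessageRef>> message_map;

      void send_osd_message(int target, MessageRef m) {
        message_map[target].push_back(std::move(m));
      }
    };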