Skip to content

Commit

Permalink
Merge pull request #30372 from tchaikov/wip-crimson-single-pg-peering
Browse files Browse the repository at this point in the history
crimson/osd: use single-pg peering ops

Reviewed-by: Samuel Just <sjust@redhat.com>
Reviewed-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
  • Loading branch information
tchaikov committed Sep 13, 2019
2 parents 65996cb + 406522d commit d42c186
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 139 deletions.
38 changes: 19 additions & 19 deletions src/crimson/osd/osd.cc
Expand Up @@ -297,8 +297,8 @@ seastar::future<> OSD::_preboot(version_t oldest, version_t newest)
logger().warn("osdmap NOUP flag is set, waiting for it to clear");
} else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it");
} else if (osdmap->require_osd_release < ceph_release_t::luminous) {
logger().error("osdmap require_osd_release < luminous; please upgrade to luminous");
} else if (osdmap->require_osd_release < ceph_release_t::octopus) {
logger().error("osdmap require_osd_release < octopus; please upgrade to octopus");
} else if (false) {
// TODO: update mon if current fullness state is different from osdmap
} else if (version_t n = local_conf()->osd_map_message_max;
Expand Down Expand Up @@ -488,16 +488,19 @@ seastar::future<> OSD::ms_dispatch(ceph::net::Connection* conn, MessageRef m)
case CEPH_MSG_OSD_OP:
return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
case MSG_OSD_PG_CREATE2:
case MSG_OSD_PG_NOTIFY:
case MSG_OSD_PG_INFO:
case MSG_OSD_PG_QUERY:
shard_services.start_operation<CompoundPeeringRequest>(
*this,
conn->get_shared(),
m);
return seastar::now();
case MSG_OSD_PG_NOTIFY2:
[[fallthrough]];
case MSG_OSD_PG_INFO2:
[[fallthrough]];
case MSG_OSD_PG_QUERY2:
[[fallthrough]];
case MSG_OSD_PG_LOG:
return handle_pg_log(conn, boost::static_pointer_cast<MOSDPGLog>(m));
return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
case MSG_OSD_REPOP:
return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
case MSG_OSD_REPOPREPLY:
Expand Down Expand Up @@ -666,7 +669,7 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
Ref<PG>(),
startmap);
}
ceph_assert(osdmap->require_osd_release >= ceph_release_t::nautilus);
ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
// this ensures we do not process old creating messages after the
// pool's initial pgs have been created (and pg are subsequently
Expand All @@ -691,7 +694,7 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
if (!pg)
return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
return store->create_new_collection(coll_t(info->pgid)).then([this, &info, startmap, pg] (auto coll) {
PeeringCtx rctx;
PeeringCtx rctx{ceph_release_t::octopus};
const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());

int up_primary, acting_primary;
Expand Down Expand Up @@ -974,29 +977,25 @@ void OSD::update_heartbeat_peers()
heartbeat->update_peers(whoami);
}

seastar::future<> OSD::handle_pg_log(
seastar::future<> OSD::handle_peering_op(
ceph::net::Connection* conn,
Ref<MOSDPGLog> m)
Ref<MOSDPeeringOp> m)
{
const int from = m->get_source().num();
logger().debug("handle_pg_log on {} from {}", m->get_spg(), from);
logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
shard_services.start_operation<RemotePeeringEvent>(
*this,
conn->get_shared(),
shard_services,
pg_shard_t(from, m->from),
spg_t(m->info.pgid.pgid, m->to),
pg_shard_t{from, m->get_spg().shard},
m->get_spg(),
std::move(*m->get_event()));
return seastar::now();
}

void OSD::check_osdmap_features()
{
if (osdmap->require_osd_release < ceph_release_t::nautilus) {
heartbeat->set_require_authorizer(false);
} else {
heartbeat->set_require_authorizer(true);
}
heartbeat->set_require_authorizer(true);
}

seastar::future<> OSD::consume_map(epoch_t epoch)
Expand All @@ -1005,7 +1004,8 @@ seastar::future<> OSD::consume_map(epoch_t epoch)
auto &pgs = pg_map.get_pgs();
return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
return shard_services.start_operation<PGAdvanceMap>(
*this, pg.second, pg.second->get_osdmap_epoch(), epoch).second;
*this, pg.second, pg.second->get_osdmap_epoch(), epoch,
PeeringCtx{ceph_release_t::octopus}, false).second;
}).then([epoch, this] {
osdmap_gate.got_map(epoch);
return seastar::make_ready_future();
Expand Down
4 changes: 2 additions & 2 deletions src/crimson/osd/osd.h
Expand Up @@ -169,8 +169,8 @@ class OSD final : public ceph::net::Dispatcher,
Ref<MOSDRepOp> m);
seastar::future<> handle_rep_op_reply(ceph::net::Connection* conn,
Ref<MOSDRepOpReply> m);
seastar::future<> handle_pg_log(ceph::net::Connection* conn,
Ref<MOSDPGLog> m);
seastar::future<> handle_peering_op(ceph::net::Connection* conn,
Ref<MOSDPeeringOp> m);

seastar::future<> committed_osd_maps(version_t first,
version_t last,
Expand Down
90 changes: 6 additions & 84 deletions src/crimson/osd/osd_operations/compound_peering_request.cc
Expand Up @@ -5,8 +5,6 @@

#include "osd/PeeringState.h"

#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGNotify.h"
#include "messages/MOSDPGQuery.h"
#include "messages/MOSDPGCreate2.h"

Expand All @@ -28,6 +26,11 @@ using namespace ceph::osd;
struct compound_state {
seastar::promise<BufferedRecoveryMessages> promise;
BufferedRecoveryMessages ctx;
compound_state()
// assuming crimson-osd won't need to be compatible with pre-octopus
// releases
: ctx{ceph_release_t::octopus}
{}
~compound_state() {
promise.set_value(std::move(ctx));
}
Expand Down Expand Up @@ -107,75 +110,6 @@ std::vector<OperationRef> handle_pg_create(
return ret;
}

// Handle a (pre-octopus, multi-PG) MOSDPGNotify: fan out one
// PeeringSubEvent operation per entry in the message's pg_list, all
// sharing the same compound_state so their outgoing messages are
// buffered and flushed together.
std::vector<OperationRef> handle_pg_notify(
OSD &osd,
ceph::net::ConnectionRef conn,
compound_state_ref state,
Ref<MOSDPGNotify> m)
{
std::vector<OperationRef> ret;
// one operation per notified PG
ret.reserve(m->get_pg_list().size());
const int from = m->get_source().num();
for (auto& pg_notify : m->get_pg_list()) {
// target pgid: the pg in the notify, addressed to shard pg_notify.to
spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
MNotifyRec notify{pgid,
pg_shard_t{from, pg_notify.from},
pg_notify,
0}; // the features is not used
logger().debug("handle_pg_notify on {} from {}", pgid.pgid, from);
// create-info lets the peering machinery instantiate the PG if it
// does not exist yet; NOTE(review): raw `new` here — presumably
// ownership passes to the operation/PG lookup path, verify no leak
// on the no-create path
auto create_info = new PGCreateInfo{
pgid,
pg_notify.query_epoch,
pg_notify.info.history,
pg_notify.past_intervals,
false};
auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
state,
osd,
conn,
osd.get_shard_services(),
pg_shard_t(from, pg_notify.from),
pgid,
pg_notify.epoch_sent,
pg_notify.query_epoch,
notify,
true, // requires_pg
create_info).first;
ret.push_back(op);
}
// callers wait on these ops collectively (see CompoundPeeringRequest)
return ret;
}

// Handle a (pre-octopus, multi-PG) MOSDPGInfo: start one PeeringSubEvent
// carrying an MInfoRec per entry in the message's pg_list, all sharing
// the same compound_state so their outgoing messages are buffered
// together.
std::vector<OperationRef> handle_pg_info(
OSD &osd,
ceph::net::ConnectionRef conn,
compound_state_ref state,
Ref<MOSDPGInfo> m)
{
std::vector<OperationRef> ret;
// one operation per pg entry in the message
ret.reserve(m->pg_list.size());
const int from = m->get_source().num();
for (auto& pg_notify : m->pg_list) {
// target pgid: the pg in the entry, addressed to shard pg_notify.to
spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
logger().debug("handle_pg_info on {} from {}", pgid.pgid, from);
MInfoRec info{pg_shard_t{from, pg_notify.from},
pg_notify.info,
pg_notify.epoch_sent};
// unlike handle_pg_notify, no PGCreateInfo is passed: an info
// message does not instantiate a missing PG
auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
state,
osd,
conn,
osd.get_shard_services(),
pg_shard_t(from, pg_notify.from),
pgid,
pg_notify.epoch_sent,
pg_notify.query_epoch,
std::move(info)).first;
ret.push_back(op);
}
// callers wait on these ops collectively (see CompoundPeeringRequest)
return ret;
}

class QuerySubEvent : public PeeringSubEvent {
public:
template <typename... Args>
Expand All @@ -185,7 +119,7 @@ class QuerySubEvent : public PeeringSubEvent {
void on_pg_absent() final {
logger().debug("handle_pg_query on absent pg {} from {}", pgid, from);
pg_info_t empty(pgid);
ctx.notify_list[from.osd].emplace_back(
ctx.send_notify(from.osd,
pg_notify_t(
from.shard, pgid.shard,
evt.get_epoch_sent(),
Expand Down Expand Up @@ -275,18 +209,6 @@ seastar::future<> CompoundPeeringRequest::start()
conn,
state,
boost::static_pointer_cast<MOSDPGCreate2>(m));
case MSG_OSD_PG_NOTIFY:
return handle_pg_notify(
osd,
conn,
state,
boost::static_pointer_cast<MOSDPGNotify>(m));
case MSG_OSD_PG_INFO:
return handle_pg_info(
osd,
conn,
state,
boost::static_pointer_cast<MOSDPGInfo>(m));
case MSG_OSD_PG_QUERY:
return handle_pg_query(
osd,
Expand Down
1 change: 1 addition & 0 deletions src/crimson/osd/osd_operations/peering_event.h
Expand Up @@ -64,6 +64,7 @@ class PeeringEvent : public OperationT<PeeringEvent> {
ShardServices &shard_services, const pg_shard_t &from, const spg_t &pgid,
Args&&... args) :
shard_services(shard_services),
ctx{ceph_release_t::octopus},
from(from),
pgid(pgid),
evt(std::forward<Args>(args)...)
Expand Down
4 changes: 0 additions & 4 deletions src/crimson/osd/osd_operations/pg_advance_map.cc
Expand Up @@ -18,10 +18,6 @@ namespace {

namespace ceph::osd {

// Convenience constructor: advance `pg` from epoch `from` to epoch `to`
// with do_init fixed to false (no PeeringCtx supplied by the caller).
PGAdvanceMap::PGAdvanceMap(
OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to)
: osd(osd), pg(pg), from(from), to(to), do_init(false) {}

PGAdvanceMap::PGAdvanceMap(
OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
PeeringCtx &&rctx, bool do_init)
Expand Down
2 changes: 0 additions & 2 deletions src/crimson/osd/osd_operations/pg_advance_map.h
Expand Up @@ -33,8 +33,6 @@ class PGAdvanceMap : public OperationT<PGAdvanceMap> {
const bool do_init;

public:
PGAdvanceMap(
OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to);
PGAdvanceMap(
OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
PeeringCtx &&rctx, bool do_init);
Expand Down
37 changes: 9 additions & 28 deletions src/crimson/osd/shard_services.cc
Expand Up @@ -77,34 +77,15 @@ seastar::future<> ShardServices::dispatch_context_transaction(
seastar::future<> ShardServices::dispatch_context_messages(
BufferedRecoveryMessages &&ctx)
{
auto ret = seastar::when_all_succeed(
seastar::parallel_for_each(std::move(ctx.notify_list),
[this](auto& osd_notifies) {
auto& [peer, notifies] = osd_notifies;
auto m = make_message<MOSDPGNotify>(osdmap->get_epoch(),
std::move(notifies));
logger().debug("dispatch_context_messages sending notify to {}", peer);
return send_to_osd(peer, m, osdmap->get_epoch());
}),
seastar::parallel_for_each(std::move(ctx.query_map),
[this](auto& osd_queries) {
auto& [peer, queries] = osd_queries;
auto m = make_message<MOSDPGQuery>(osdmap->get_epoch(),
std::move(queries));
logger().debug("dispatch_context_messages sending query to {}", peer);
return send_to_osd(peer, m, osdmap->get_epoch());
}),
seastar::parallel_for_each(std::move(ctx.info_map),
[this](auto& osd_infos) {
auto& [peer, infos] = osd_infos;
auto m = make_message<MOSDPGInfo>(osdmap->get_epoch(),
std::move(infos));
logger().debug("dispatch_context_messages sending info to {}", peer);
return send_to_osd(peer, m, osdmap->get_epoch());
}));
ctx.notify_list.clear();
ctx.query_map.clear();
ctx.info_map.clear();
auto ret = seastar::parallel_for_each(std::move(ctx.message_map),
[this](auto& osd_messages) {
auto& [peer, messages] = osd_messages;
logger().debug("dispatch_context_messages sending messages to {}", peer);
return seastar::parallel_for_each(std::move(messages), [=](auto& m) {
return send_to_osd(peer, m, osdmap->get_epoch());
});
});
ctx.message_map.clear();
return ret;
}

Expand Down

0 comments on commit d42c186

Please sign in to comment.