Skip to content

Commit

Permalink
osd/scrub: tag replica scrub messages to identify stale events
Browse files Browse the repository at this point in the history
A lot can happen while a replica is waiting for the backend
to collect the map data. The Primary might, for example, abort
the scrub and start a new one (following no-scrub commands).

The 'token' introduced here tags 'replica scrub resched'
messages with an index value that is modified on each 'release
scrub resources' request from the Primary.

Fixes: https://tracker.ceph.com/issues/52012
Signed-off-by: Ronen Friedman <rfriedma@redhat.com>
  • Loading branch information
ronen-fr committed Aug 16, 2021
1 parent 8e064be commit e4e211b
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 41 deletions.
22 changes: 14 additions & 8 deletions src/osd/OSD.cc
Expand Up @@ -1747,19 +1747,22 @@ void OSDService::queue_for_snap_trim(PG *pg)
template <class MSG_TYPE>
void OSDService::queue_scrub_event_msg(PG* pg,
Scrub::scrub_prio_t with_priority,
unsigned int qu_priority)
unsigned int qu_priority,
Scrub::act_token_t act_token)
{
const auto epoch = pg->get_osdmap_epoch();
auto msg = new MSG_TYPE(pg->get_pgid(), epoch);
dout(15) << "queue a scrub event (" << *msg << ") for " << *pg << ". Epoch: " << epoch << dendl;
auto msg = new MSG_TYPE(pg->get_pgid(), epoch, act_token);
dout(15) << "queue a scrub event (" << *msg << ") for " << *pg
<< ". Epoch: " << epoch << " token: " << act_token << dendl;

enqueue_back(OpSchedulerItem(
unique_ptr<OpSchedulerItem::OpQueueable>(msg), cct->_conf->osd_scrub_cost,
pg->scrub_requeue_priority(with_priority, qu_priority), ceph_clock_now(), 0, epoch));
}

template <class MSG_TYPE>
void OSDService::queue_scrub_event_msg(PG* pg, Scrub::scrub_prio_t with_priority)
void OSDService::queue_scrub_event_msg(PG* pg,
Scrub::scrub_prio_t with_priority)
{
const auto epoch = pg->get_osdmap_epoch();
auto msg = new MSG_TYPE(pg->get_pgid(), epoch);
Expand All @@ -1782,17 +1785,20 @@ void OSDService::queue_scrub_after_repair(PG* pg, Scrub::scrub_prio_t with_prior

void OSDService::queue_for_rep_scrub(PG* pg,
Scrub::scrub_prio_t with_priority,
unsigned int qu_priority)
unsigned int qu_priority,
Scrub::act_token_t act_token)
{
queue_scrub_event_msg<PGRepScrub>(pg, with_priority, qu_priority);
queue_scrub_event_msg<PGRepScrub>(pg, with_priority, qu_priority, act_token);
}

void OSDService::queue_for_rep_scrub_resched(PG* pg,
Scrub::scrub_prio_t with_priority,
unsigned int qu_priority)
unsigned int qu_priority,
Scrub::act_token_t act_token)
{
// Resulting scrub event: 'SchedReplica'
queue_scrub_event_msg<PGRepScrubResched>(pg, with_priority, qu_priority);
queue_scrub_event_msg<PGRepScrubResched>(pg, with_priority, qu_priority,
act_token);
}

void OSDService::queue_for_scrub_granted(PG* pg, Scrub::scrub_prio_t with_priority)
Expand Down
14 changes: 9 additions & 5 deletions src/osd/OSD.h
Expand Up @@ -638,16 +638,19 @@ class OSDService {

void queue_for_rep_scrub(PG* pg,
Scrub::scrub_prio_t with_high_priority,
unsigned int qu_priority);
unsigned int qu_priority,
Scrub::act_token_t act_token);

/// Signals a change in the number of in-flight recovery writes
void queue_scrub_replica_pushes(PG *pg, Scrub::scrub_prio_t with_priority);

/// (not in Crimson) Queue a SchedReplica event to be sent to the replica, to trigger
/// a re-check of the availability of the scrub map prepared by the backend.
/// (not in Crimson) Queue a SchedReplica event to be sent to the replica, to
/// trigger a re-check of the availability of the scrub map prepared by the
/// backend.
void queue_for_rep_scrub_resched(PG* pg,
Scrub::scrub_prio_t with_high_priority,
unsigned int qu_priority);
unsigned int qu_priority,
Scrub::act_token_t act_token);

void queue_for_pg_delete(spg_t pgid, epoch_t e);
bool try_finish_pg_delete(PG *pg, unsigned old_pg_num);
Expand All @@ -661,7 +664,8 @@ class OSDService {
template <class MSG_TYPE>
void queue_scrub_event_msg(PG* pg,
Scrub::scrub_prio_t with_priority,
unsigned int qu_priority);
unsigned int qu_priority,
Scrub::act_token_t act_token);

/// An alternative version of queue_scrub_event_msg(), in which the queuing priority is
/// provided by the executing scrub (i.e. taken from PgScrubber::m_flags)
Expand Down
21 changes: 20 additions & 1 deletion src/osd/PG.cc
Expand Up @@ -2079,6 +2079,23 @@ void PG::forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued, std::string_view
}
}

/// Forward a token-carrying scrub event to the scrubber object.
/// @param fn            the ScrubPgIF member to invoke
/// @param epoch_queued  the epoch at which the event was queued
/// @param act_token     the replica "activation token" tagging this session
/// @param desc          short human-readable event name, for logging only
void PG::forward_scrub_event(ScrubSafeAPI fn,
			     epoch_t epoch_queued,
			     Scrub::act_token_t act_token,
			     std::string_view desc)
{
  dout(20) << __func__ << ": " << desc << " queued: " << epoch_queued
	   << " token: " << act_token << dendl;

  // the pg might be in the process of being deleted; in that case
  // (or if the scrubber object is gone) the event is dropped
  if (!is_active() || !m_scrubber) {
    dout(5) << __func__ << " refusing to forward. "
	    << (is_clean() ? "(clean) " : "(not clean) ")
	    << (is_active() ? "(active) " : "(not active) ") << dendl;
    return;
  }

  ((*m_scrubber).*fn)(epoch_queued, act_token);
}

void PG::replica_scrub(OpRequestRef op, ThreadPool::TPHandle& handle)
{
dout(10) << __func__ << " (op)" << dendl;
Expand All @@ -2087,12 +2104,14 @@ void PG::replica_scrub(OpRequestRef op, ThreadPool::TPHandle& handle)
}

void PG::replica_scrub(epoch_t epoch_queued,
Scrub::act_token_t act_token,
[[maybe_unused]] ThreadPool::TPHandle& handle)
{
dout(10) << __func__ << " queued at: " << epoch_queued
<< (is_primary() ? " (primary)" : " (replica)") << dendl;
scrub_queued = false;
forward_scrub_event(&ScrubPgIF::send_start_replica, epoch_queued, "StartReplica/nw");
forward_scrub_event(&ScrubPgIF::send_start_replica, epoch_queued, act_token,
"StartReplica/nw");
}

bool PG::ops_blocked_by_scrub() const
Expand Down
19 changes: 15 additions & 4 deletions src/osd/PG.h
Expand Up @@ -394,12 +394,17 @@ class PG : public DoutPrefixProvider, public PeeringState::PeeringListener {
"AfterRepairScrub");
}

void replica_scrub(epoch_t queued, ThreadPool::TPHandle &handle);
void replica_scrub(epoch_t queued,
Scrub::act_token_t act_token,
ThreadPool::TPHandle& handle);

void replica_scrub_resched(epoch_t queued, ThreadPool::TPHandle& handle)
void replica_scrub_resched(epoch_t queued,
Scrub::act_token_t act_token,
ThreadPool::TPHandle& handle)
{
scrub_queued = false;
forward_scrub_event(&ScrubPgIF::send_sched_replica, queued, "SchedReplica");
forward_scrub_event(&ScrubPgIF::send_sched_replica, queued, act_token,
"SchedReplica");
}

void scrub_send_resources_granted(epoch_t queued, ThreadPool::TPHandle& handle)
Expand Down Expand Up @@ -651,8 +656,14 @@ class PG : public DoutPrefixProvider, public PeeringState::PeeringListener {
requested_scrub_t& planned) const;

using ScrubAPI = void (ScrubPgIF::*)(epoch_t epoch_queued);

void forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued, std::string_view desc);
// and for events that carry a meaningful 'activation token'
using ScrubSafeAPI = void (ScrubPgIF::*)(epoch_t epoch_queued,
Scrub::act_token_t act_token);
void forward_scrub_event(ScrubSafeAPI fn,
epoch_t epoch_queued,
Scrub::act_token_t act_token,
std::string_view desc);

public:
virtual void do_request(
Expand Down
70 changes: 58 additions & 12 deletions src/osd/pg_scrubber.cc
Expand Up @@ -221,15 +221,17 @@ void PgScrubber::send_scrub_resched(epoch_t epoch_queued)
dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_start_replica(epoch_t epoch_queued)
void PgScrubber::send_start_replica(epoch_t epoch_queued, Scrub::act_token_t token)
{
dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued
<< " token: " << token << dendl;
if (is_primary()) {
// shouldn't happen. Ignore
dout(1) << "got a replica scrub request while Primary!" << dendl;
return;
}
if (check_interval(epoch_queued)) {

if (check_interval(epoch_queued) && is_token_current(token)) {
m_fsm->my_states();
// save us some time by not waiting for updates if there are none
// to wait for. Affects the transition from NotActive into either
Expand All @@ -242,10 +244,11 @@ void PgScrubber::send_start_replica(epoch_t epoch_queued)
dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_sched_replica(epoch_t epoch_queued)
void PgScrubber::send_sched_replica(epoch_t epoch_queued, Scrub::act_token_t token)
{
dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
if (check_interval(epoch_queued)) {
dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued
<< " token: " << token << dendl;
if (check_interval(epoch_queued) && is_token_current(token)) {
m_fsm->my_states();
m_fsm->process_event(SchedReplica{}); // retest for map availability
}
Expand Down Expand Up @@ -491,9 +494,8 @@ void PgScrubber::reg_next_scrub(const requested_scrub_t& request_flags)

void PgScrubber::unreg_next_scrub()
{
dout(10) << __func__ << " existing-" << m_scrub_reg_stamp << ". was registered? "
<< is_scrub_registered() << dendl;
if (is_scrub_registered()) {
dout(15) << __func__ << " existing-" << m_scrub_reg_stamp << dendl;
m_osds->unreg_pg_scrub(m_pg->info.pgid, m_scrub_reg_stamp);
m_scrub_reg_stamp = utime_t{};
}
Expand Down Expand Up @@ -1034,7 +1036,8 @@ int PgScrubber::build_primary_map_chunk()
int PgScrubber::build_replica_map_chunk()
{
dout(10) << __func__ << " interval start: " << m_interval_start
<< " epoch: " << m_epoch_start << " deep: " << m_is_deep << dendl;
<< " current token: " << m_current_token << " epoch: " << m_epoch_start
<< " deep: " << m_is_deep << dendl;

auto ret = build_scrub_map_chunk(replica_scrubmap, replica_scrubmap_pos, m_start, m_end,
m_is_deep);
Expand All @@ -1046,7 +1049,7 @@ int PgScrubber::build_replica_map_chunk()
// (note: previous version used low priority here. Now switched to using the
// priority of the original message)
m_osds->queue_for_rep_scrub_resched(m_pg, m_replica_request_priority,
m_flags.priority);
m_flags.priority, m_current_token);
break;

case 0: {
Expand Down Expand Up @@ -1239,7 +1242,8 @@ void PgScrubber::replica_scrub_op(OpRequestRef op)
// make sure the FSM is at NotActive
m_fsm->assert_not_active();

m_osds->queue_for_rep_scrub(m_pg, m_replica_request_priority, m_flags.priority);
m_osds->queue_for_rep_scrub(m_pg, m_replica_request_priority, m_flags.priority,
m_current_token);
}

void PgScrubber::set_op_parameters(requested_scrub_t& request)
Expand Down Expand Up @@ -1471,12 +1475,15 @@ void PgScrubber::handle_scrub_reserve_request(OpRequestRef op)
* otherwise the interval would have changed.
* Ostensibly we can discard & redo the reservation. But then we
* will be temporarily releasing the OSD resource - and might not be able to grab it
* again. Thus we simple treat this as a successful new request.
* again. Thus, we simply treat this as a successful new request
* (but mark the fact that if there is a previous request from the primary to
* scrub a specific chunk - that request is now defunct).
*/

if (m_remote_osd_resource.has_value() && m_remote_osd_resource->is_stale()) {
// we are holding a stale reservation from a past epoch
m_remote_osd_resource.reset();
dout(10) << __func__ << " stale reservation request" << dendl;
}

if (request_ep < m_pg->get_same_interval_since()) {
Expand All @@ -1488,6 +1495,15 @@ void PgScrubber::handle_scrub_reserve_request(OpRequestRef op)
if (m_remote_osd_resource.has_value()) {

dout(10) << __func__ << " already reserved." << dendl;

/*
* it might well be that we did not yet finish handling the latest scrub-op from
* our primary. This happens, for example, if 'noscrub' was set via a command, then
* reset. The primary in this scenario will remain in the same interval, but we do need
* to reset our internal state (otherwise - the first renewed 'give me your scrub map'
* from the primary will see us in active state, crashing the OSD).
*/
advance_token();
granted = true;

} else if (m_pg->cct->_conf->osd_scrub_during_recovery ||
Expand Down Expand Up @@ -1539,6 +1555,12 @@ void PgScrubber::handle_scrub_reserve_release(OpRequestRef op)
{
dout(10) << __func__ << " " << *op->get_req() << dendl;
op->mark_started();

/*
* this specific scrub session has terminated. All incoming events carrying the old
* tag will be discarded.
*/
advance_token();
m_remote_osd_resource.reset();
}

Expand Down Expand Up @@ -2010,6 +2032,30 @@ void PgScrubber::reset_internal_state()
m_active = false;
}

// note that only applicable to the Replica:
// Invalidate the current scrub "session" tag. Called when the Primary
// releases our scrub resources (or re-requests them), so that any
// still-queued replica scrub/resched events carrying the old token
// will be recognized as stale and discarded (see is_token_current()).
void PgScrubber::advance_token()
{
  dout(10) << __func__ << " was: " << m_current_token << dendl;
  m_current_token++;

  // when advance_token() is called, it is assumed that no scrubbing takes place.
  // We will, though, verify that. And if we are actually still handling a stale request -
  // both our internal state and the FSM state will be cleared.
  replica_handling_done();
  m_fsm->process_event(FullReset{});
}

/// Is the received activation token the one tagging the current scrub session?
/// A token of 0 is treated as a wildcard (presumably messages that predate, or
/// do not participate in, the tagging mechanism — TODO confirm against callers).
/// Stale tokens are logged and rejected, causing the event to be discarded.
bool PgScrubber::is_token_current(Scrub::act_token_t received_token)
{
  if (received_token == 0 || received_token == m_current_token) {
    return true;
  }
  // fix: close the parenthesis opened in the message text
  dout(5) << __func__ << " obsolete token (" << received_token
	  << " vs current " << m_current_token << ")" << dendl;

  return false;
}

const OSDMapRef& PgScrubber::get_osdmap() const
{
return m_pg->get_osdmap();
Expand Down
22 changes: 20 additions & 2 deletions src/osd/pg_scrubber.h
Expand Up @@ -209,9 +209,9 @@ class PgScrubber : public ScrubPgIF, public ScrubMachineListener {

void send_replica_maps_ready(epoch_t epoch_queued) final;

void send_start_replica(epoch_t epoch_queued) final;
void send_start_replica(epoch_t epoch_queued, Scrub::act_token_t token) final;

void send_sched_replica(epoch_t epoch_queued) final;
void send_sched_replica(epoch_t epoch_queued, Scrub::act_token_t token) final;

void send_replica_pushes_upd(epoch_t epoch_queued) final;

Expand Down Expand Up @@ -441,6 +441,14 @@ class PgScrubber : public ScrubPgIF, public ScrubMachineListener {
private:
void reset_internal_state();

/**
* the current scrubbing operation is done. We should mark that fact, so that
* all events related to the previous operation can be discarded.
*/
void advance_token();

bool is_token_current(Scrub::act_token_t received_token);

void requeue_waiting() const { m_pg->requeue_ops(m_pg->waiting_for_scrub); }

void _scan_snaps(ScrubMap& smap);
Expand Down Expand Up @@ -560,6 +568,16 @@ class PgScrubber : public ScrubPgIF, public ScrubMachineListener {
* discarded.
*/
epoch_t m_epoch_start{0}; ///< the actual epoch when scrubbing started

/**
* (replica) a tag identifying a specific scrub "session". Incremented whenever the
* Primary releases the replica scrub resources.
* When the scrub session is terminated (even if the interval remains unchanged, as
* might happen following an asok no-scrub command), stale scrub-resched messages
* triggered by the backend will be discarded.
*/
Scrub::act_token_t m_current_token{1};

scrub_flags_t m_flags;

bool m_active{false};
Expand Down
4 changes: 2 additions & 2 deletions src/osd/scheduler/OpSchedulerItem.cc
Expand Up @@ -159,7 +159,7 @@ void PGScrubMapsCompared::run(OSD* osd,

void PGRepScrub::run(OSD* osd, OSDShard* sdata, PGRef& pg, ThreadPool::TPHandle& handle)
{
pg->replica_scrub(epoch_queued, handle);
pg->replica_scrub(epoch_queued, activation_index, handle);
pg->unlock();
}

Expand All @@ -168,7 +168,7 @@ void PGRepScrubResched::run(OSD* osd,
PGRef& pg,
ThreadPool::TPHandle& handle)
{
pg->replica_scrub_resched(epoch_queued, handle);
pg->replica_scrub_resched(epoch_queued, activation_index, handle);
pg->unlock();
}

Expand Down

0 comments on commit e4e211b

Please sign in to comment.