diff --git a/doc/rados/operations/control.rst b/doc/rados/operations/control.rst index d78f70d585947..99d315295bc25 100644 --- a/doc/rados/operations/control.rst +++ b/doc/rados/operations/control.rst @@ -255,8 +255,6 @@ Changes a pool setting. :: Valid fields are: * ``size``: Sets the number of copies of data in the pool. - * ``crash_replay_interval``: The number of seconds to allow - clients to replay acknowledged but uncommited requests. * ``pg_num``: The placement group number. * ``pgp_num``: Effective number when calculating pg placement. * ``crush_ruleset``: rule number for mapping placement. diff --git a/doc/rados/operations/pools.rst b/doc/rados/operations/pools.rst index 4bdaaa34d6126..7b015ede4783a 100644 --- a/doc/rados/operations/pools.rst +++ b/doc/rados/operations/pools.rst @@ -275,15 +275,6 @@ You may set values for the following keys: :Type: Integer :Version: ``0.54`` and above -.. _crash_replay_interval: - -``crash_replay_interval`` - -:Description: The number of seconds to allow clients to replay acknowledged, - but uncommitted requests. - -:Type: Integer - .. _pg_num: ``pg_num`` @@ -573,12 +564,6 @@ You may get values for the following keys: :Type: Integer :Version: ``0.54`` and above -``crash_replay_interval`` - -:Description: see crash_replay_interval_ - -:Type: Integer - ``pg_num`` :Description: see pg_num_ diff --git a/qa/workunits/cephtool/test.sh b/qa/workunits/cephtool/test.sh index 27f5ecbc1c5c9..24fec8d7a57db 100755 --- a/qa/workunits/cephtool/test.sh +++ b/qa/workunits/cephtool/test.sh @@ -801,11 +801,6 @@ function test_mon_mds() # the "current_epoch + 1" checking below if they're generating updates fail_all_mds $FS_NAME - # Check for default crash_replay_interval set automatically in 'fs new' - #This may vary based on ceph.conf (e.g., it's 5 in teuthology runs) - #ceph osd dump | grep fs_data > $TMPFILE - #check_response "crash_replay_interval 45 " - ceph mds compat show expect_false ceph mds deactivate 2 ceph mds dump @@ -1441,7 +1436,7 @@ function test_mon_osd_pool_set() wait_for_clean ceph osd pool get $TEST_POOL_GETSET all - for s in pg_num pgp_num size min_size crash_replay_interval crush_rule crush_ruleset; do + for s in pg_num pgp_num size min_size crush_rule crush_ruleset; do ceph osd pool get $TEST_POOL_GETSET $s done diff --git a/qa/workunits/rest/test.py b/qa/workunits/rest/test.py index a6a52fc587467..7f10c9cf66853 100755 --- a/qa/workunits/rest/test.py +++ b/qa/workunits/rest/test.py @@ -398,7 +398,7 @@ def expect_nofail(url, method, respcode, contenttype, extra_hdrs=None, expect('osd/reweight?id=0&weight=-1', 'PUT', 400, '') expect('osd/reweight?id=0&weight=1', 'PUT', 200, '') - for v in ['pg_num', 'pgp_num', 'size', 'min_size', 'crash_replay_interval', + for v in ['pg_num', 'pgp_num', 'size', 'min_size', 'crush_ruleset']: r = expect('osd/pool/get.json?pool=rbd&var=' + v, 'GET', 200, 'json') assert(v in r.myjson['output']) diff --git a/src/mon/MDSMonitor.cc b/src/mon/MDSMonitor.cc index 43a0bb93b2409..c19d7a70d8b92 100644 --- a/src/mon/MDSMonitor.cc +++ b/src/mon/MDSMonitor.cc @@ -1643,22 +1643,6 @@ int MDSMonitor::management_command( return r; } - // Automatically set crash_replay_interval on data pool if it - // isn't already set. - if (data_pool->get_crash_replay_interval() == 0) { - // We will be changing osdmon's state and requesting the osdmon to - // propose. We thus need to make sure the osdmon is writeable before - // we do this, waiting if it's not. - if (!mon->osdmon()->is_writeable()) { - mon->osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op)); - return -EAGAIN; - } - - r = mon->osdmon()->set_crash_replay_interval(data, g_conf->osd_default_data_pool_replay_window); - assert(r == 0); // We just did get_pg_pool so it must exist and be settable - request_proposal(mon->osdmon()); - } - // All checks passed, go ahead and create. create_new_fs(pending_fsmap, fs_name, metadata, data); ss << "new fs with metadata pool " << metadata << " and data pool " << data; diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index 8c6c5fb0fa828..9c62b404648f6 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -5125,34 +5125,6 @@ int OSDMonitor::parse_osd_id(const char *s, stringstream *pss) } -/** - * Special setter for crash_replay_interval on a pool. Equivalent to - * using prepare_command_pool_set, but in a form convenient for use - * from MDSMonitor rather than from an administrative command. - */ -int OSDMonitor::set_crash_replay_interval(const int64_t pool_id, const uint32_t cri) -{ - pg_pool_t p; - if (pending_inc.new_pools.count(pool_id)) { - p = pending_inc.new_pools[pool_id]; - } else { - const pg_pool_t *p_ptr = osdmap.get_pg_pool(pool_id); - if (p_ptr == NULL) { - return -ENOENT; - } else { - p = *p_ptr; - } - } - - dout(10) << "Set pool " << pool_id << " crash_replay_interval=" << cri << dendl; - p.crash_replay_interval = cri; - p.last_change = pending_inc.epoch; - pending_inc.new_pools[pool_id] = p; - - return 0; -} - - int OSDMonitor::prepare_command_pool_set(map &cmdmap, stringstream& ss) { diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h index 103806175242a..5b354bca8fde6 100644 --- a/src/mon/OSDMonitor.h +++ b/src/mon/OSDMonitor.h @@ -411,7 +411,6 @@ class OSDMonitor : public PaxosService { bool prepare_command(MonOpRequestRef op); bool prepare_command_impl(MonOpRequestRef op, map& cmdmap); - int set_crash_replay_interval(const int64_t pool_id, const uint32_t cri); int prepare_command_pool_set(map &cmdmap, stringstream& ss); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 9c795aff7a4ab..70f39fde5b499 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1716,7 +1716,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_, cct->_conf->osd_command_thread_timeout, cct->_conf->osd_command_thread_suicide_timeout, &command_tp), - replay_queue_lock("OSD::replay_queue_lock"), remove_wq( cct, store, @@ -2521,8 +2520,6 @@ void OSD::create_logger() "Client write operations"); // client writes osd_plb.add_u64_counter(l_osd_op_w_inb, "op_w_in_bytes", "Client data written"); // client write in bytes - osd_plb.add_time_avg(l_osd_op_w_rlat, "op_w_rlat", - "Client write operation readable/applied latency"); // client write readable/applied latency osd_plb.add_time_avg(l_osd_op_w_lat, "op_w_latency", "Latency of write operation (including queue time)"); // client write latency osd_plb.add_time_avg(l_osd_op_w_process_lat, "op_w_process_latency", @@ -2535,8 +2532,6 @@ void OSD::create_logger() "Client read-modify-write operations write in"); // client rmw in bytes osd_plb.add_u64_counter(l_osd_op_rw_outb,"op_rw_out_bytes", "Client read-modify-write operations read out "); // client rmw out bytes - osd_plb.add_time_avg(l_osd_op_rw_rlat,"op_rw_rlat", - "Client read-modify-write operation readable/applied latency"); // client rmw readable/applied latency osd_plb.add_time_avg(l_osd_op_rw_lat, "op_rw_latency", "Latency of read-modify-write operation (including queue time)"); // client rmw latency osd_plb.add_time_avg(l_osd_op_rw_process_lat, "op_rw_process_latency", @@ -4389,10 +4384,6 @@ void OSD::tick() start_boot(); } - if (is_active()) { - check_replay_queue(); - } - do_waiters(); tick_timer.add_event_after(OSD_TICK_INTERVAL, new C_Tick(this)); @@ -8452,44 +8443,6 @@ void OSD::_remove_pg(PG *pg) // ========================================================= // RECOVERY -/* - * caller holds osd_lock - */ -void OSD::check_replay_queue() -{ - assert(osd_lock.is_locked()); - - utime_t now = ceph_clock_now(); - list< pair > pgids; - replay_queue_lock.Lock(); - while (!replay_queue.empty() && - replay_queue.front().second <= now) { - pgids.push_back(replay_queue.front()); - replay_queue.pop_front(); - } - replay_queue_lock.Unlock(); - - for (list< pair >::iterator p = pgids.begin(); p != pgids.end(); ++p) { - spg_t pgid = p->first; - pg_map_lock.get_read(); - if (pg_map.count(pgid)) { - PG *pg = _lookup_lock_pg_with_map_lock_held(pgid); - pg_map_lock.unlock(); - dout(10) << "check_replay_queue " << *pg << dendl; - if ((pg->is_active() || pg->is_activating()) && - pg->is_replay() && - pg->is_primary() && - pg->replay_until == p->second) { - pg->replay_queued_ops(); - } - pg->unlock(); - } else { - pg_map_lock.unlock(); - dout(10) << "check_replay_queue pgid " << pgid << " (not found)" << dendl; - } - } -} - void OSDService::_maybe_queue_recovery() { assert(recovery_lock.is_locked_by_me()); uint64_t available_pushes; @@ -8864,10 +8817,8 @@ void OSD::handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap) bool OSD::op_is_discardable(MOSDOp *op) { // drop client request if they are not connected and can't get the - // reply anyway. unless this is a replayed op, in which case we - // want to do what we can to apply it. - if (!op->get_connection()->is_connected() && - op->get_version().version == 0) { + // reply anyway. + if (!op->get_connection()->is_connected()) { return true; } return false; @@ -9337,7 +9288,7 @@ int OSD::init_op_flags(OpRequestRef& op) iter->op.watch.op == CEPH_OSD_WATCH_OP_PING)) { /* This a bit odd. PING isn't actually a write. It can't * result in an update to the object_info. PINGs also aren'ty - * replayed, so there's no reason to write out a log entry + * resent, so there's no reason to write out a log entry * * However, we pipeline them behind writes, so let's force * the write_ordered flag. diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 7e089c9868be8..d404d0254e5c1 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -73,14 +73,12 @@ enum { l_osd_op_r_prepare_lat, l_osd_op_w, l_osd_op_w_inb, - l_osd_op_w_rlat, l_osd_op_w_lat, l_osd_op_w_process_lat, l_osd_op_w_prepare_lat, l_osd_op_rw, l_osd_op_rw_inb, l_osd_op_rw_outb, - l_osd_op_rw_rlat, l_osd_op_rw_lat, l_osd_op_rw_process_lat, l_osd_op_rw_prepare_lat, @@ -975,7 +973,7 @@ class OSDService { } } } - // replay / delayed pg activation + // delayed pg activation void queue_for_recovery(PG *pg, bool front = false) { Mutex::Locker l(recovery_lock); if (front) { @@ -2313,10 +2311,6 @@ class OSD : public Dispatcher, void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved, ThreadPool::TPHandle &handle); - Mutex replay_queue_lock; - list< pair > replay_queue; - - void check_replay_queue(); // -- scrubbing -- void sched_scrub(); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index fd9a45fc0c75d..b63d1d789f581 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1551,24 +1551,6 @@ void PG::activate(ObjectStore::Transaction& t, assert(scrubber.callbacks.empty()); assert(callbacks_for_degraded_object.empty()); - // -- crash recovery? - if (acting.size() >= pool.info.min_size && - is_primary() && - pool.info.crash_replay_interval > 0 && - may_need_replay(get_osdmap())) { - replay_until = ceph_clock_now(); - replay_until += pool.info.crash_replay_interval; - dout(10) << "activate starting replay interval for " << pool.info.crash_replay_interval - << " until " << replay_until << dendl; - state_set(PG_STATE_REPLAY); - - // TODOSAM: osd->osd-> is no good - osd->osd->replay_queue_lock.Lock(); - osd->osd->replay_queue.push_back(pair( - info.pgid, replay_until)); - osd->osd->replay_queue_lock.Unlock(); - } - // twiddle pg state state_clear(PG_STATE_DOWN); @@ -1939,41 +1921,6 @@ void PG::queue_op(OpRequestRef& op) } } -void PG::replay_queued_ops() -{ - assert(is_replay()); - assert(is_active() || is_activating()); - eversion_t c = info.last_update; - list replay; - dout(10) << "replay_queued_ops" << dendl; - state_clear(PG_STATE_REPLAY); - - for (map::iterator p = replay_queue.begin(); - p != replay_queue.end(); - ++p) { - if (p->first.version != c.version+1) { - dout(10) << "activate replay " << p->first - << " skipping " << c.version+1 - p->first.version - << " ops" - << dendl; - c = p->first; - } - dout(10) << "activate replay " << p->first << " " - << *p->second->get_req() << dendl; - replay.push_back(p->second); - } - replay_queue.clear(); - if (is_active()) { - requeue_ops(replay); - requeue_ops(waiting_for_active); - assert(waiting_for_peered.empty()); - } else { - waiting_for_active.splice(waiting_for_active.begin(), replay); - } - - publish_stats_to_osd(); -} - void PG::_activate_committed(epoch_t epoch, epoch_t activation_epoch) { lock(); @@ -2259,34 +2206,14 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue) } } -static void split_replay_queue( - map *from, - map *to, - unsigned match, - unsigned bits) -{ - for (map::iterator i = from->begin(); - i != from->end(); - ) { - if (OSD::split_request(i->second, match, bits)) { - to->insert(*i); - from->erase(i++); - } else { - ++i; - } - } -} - void PG::split_ops(PG *child, unsigned split_bits) { unsigned match = child->info.pgid.ps(); assert(waiting_for_all_missing.empty()); assert(waiting_for_cache_not_full.empty()); assert(waiting_for_unreadable_object.empty()); assert(waiting_for_degraded_object.empty()); - assert(waiting_for_ack.empty()); assert(waiting_for_ondisk.empty()); assert(waiting_for_active.empty()); - split_replay_queue(&replay_queue, &(child->replay_queue), match, split_bits); osd->dequeue_pg(this, &waiting_for_peered); @@ -2507,13 +2434,13 @@ void PG::update_heartbeat_peers() bool PG::check_in_progress_op( const osd_reqid_t &r, - eversion_t *replay_version, + eversion_t *version, version_t *user_version, int *return_code) const { return ( - projected_log.get_request(r, replay_version, user_version, return_code) || - pg_log.get_log().get_request(r, replay_version, user_version, return_code)); + projected_log.get_request(r, version, user_version, return_code) || + pg_log.get_log().get_request(r, version, user_version, return_code)); } void PG::_update_calc_stats() @@ -4857,86 +4784,6 @@ void PG::fulfill_log( osd->send_message_osd_cluster(mlog, con.get()); } - -// true if all OSDs in prior intervals may have crashed, and we need to replay -// false positives are okay, false negatives are not. -bool PG::may_need_replay(const OSDMapRef osdmap) const -{ - bool crashed = false; - - for (map::const_reverse_iterator p = past_intervals.rbegin(); - p != past_intervals.rend(); - ++p) { - const pg_interval_t &interval = p->second; - dout(10) << "may_need_replay " << interval << dendl; - - if (interval.last < info.history.last_epoch_started) - break; // we don't care - - if (interval.acting.empty()) - continue; - - if (!interval.maybe_went_rw) - continue; - - // look at whether any of the osds during this interval survived - // past the end of the interval (i.e., didn't crash and - // potentially fail to COMMIT a write that it ACKed). - bool any_survived_interval = false; - - // consider ACTING osds - for (unsigned i=0; iexists(o)) - pinfo = &osdmap->get_info(o); - - // does this osd appear to have survived through the end of the - // interval? - if (pinfo) { - if (pinfo->up_from <= interval.first && pinfo->up_thru > interval.last) { - dout(10) << "may_need_replay osd." << o - << " up_from " << pinfo->up_from << " up_thru " << pinfo->up_thru - << " survived the interval" << dendl; - any_survived_interval = true; - } - else if (pinfo->up_from <= interval.first && - (std::find(acting.begin(), acting.end(), o) != acting.end() || - std::find(up.begin(), up.end(), o) != up.end())) { - dout(10) << "may_need_replay osd." << o - << " up_from " << pinfo->up_from << " and is in acting|up," - << " assumed to have survived the interval" << dendl; - // (if it hasn't, we will rebuild PriorSet) - any_survived_interval = true; - } - else if (pinfo->up_from > interval.last && - pinfo->last_clean_begin <= interval.first && - pinfo->last_clean_end > interval.last) { - dout(10) << "may_need_replay prior osd." << o - << " up_from " << pinfo->up_from - << " and last clean interval [" - << pinfo->last_clean_begin << "," << pinfo->last_clean_end - << ") survived the interval" << dendl; - any_survived_interval = true; - } - } - } - - if (!any_survived_interval) { - dout(3) << "may_need_replay no known survivors of interval " - << interval.first << "-" << interval.last - << ", may need replay" << dendl; - crashed = true; - break; - } - } - - return crashed; -} - void PG::check_full_transition(OSDMapRef lastmap, OSDMapRef osdmap) { bool changed = false; @@ -5197,15 +5044,6 @@ void PG::start_peering_interval( if (was_old_primary != is_primary()) { state_clear(PG_STATE_CLEAN); clear_publish_stats(); - - // take replay queue waiters - list ls; - for (map::iterator it = replay_queue.begin(); - it != replay_queue.end(); - ++it) - ls.push_back(it->second); - replay_queue.clear(); - requeue_ops(ls); } on_role_change(); @@ -5426,15 +5264,6 @@ bool PG::can_discard_op(OpRequestRef& op) return true; } - if (is_replay()) { - if (m->get_version().version > 0) { - dout(7) << " queueing replay at " << m->get_version() - << " for " << *m << dendl; - replay_queue[m->get_version()] = op; - op->mark_delayed("waiting for replay"); - return true; - } - } return false; } diff --git a/src/osd/PG.h b/src/osd/PG.h index 5860e267eeaeb..607fadad70390 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -521,8 +521,6 @@ class PG : protected DoutPrefixProvider { set recovering_oids; #endif - utime_t replay_until; - protected: int role; // 0 = primary, 1 = replica, -1=none. unsigned state; // PG_STATE_* @@ -593,8 +591,6 @@ class PG : protected DoutPrefixProvider { friend std::ostream& operator<<(std::ostream& oss, const struct PriorSet &prior); - bool may_need_replay(const OSDMapRef osdmap) const; - public: struct BufferedRecoveryMessages { @@ -858,9 +854,8 @@ class PG : protected DoutPrefixProvider { map, hobject_t::BitwiseComparator> callbacks_for_degraded_object; map > > waiting_for_ack, waiting_for_ondisk; + list > > waiting_for_ondisk; - map replay_queue; void split_ops(PG *child, unsigned split_bits); void requeue_object_waiters(map, hobject_t::BitwiseComparator>& m); @@ -1036,7 +1031,6 @@ class PG : protected DoutPrefixProvider { bool choose_acting(pg_shard_t &auth_log_shard, bool *history_les_bound); void build_might_have_unfound(); - void replay_queued_ops(); void activate( ObjectStore::Transaction& t, epoch_t activation_epoch, @@ -2172,7 +2166,6 @@ class PG : protected DoutPrefixProvider { bool is_activating() const { return state_test(PG_STATE_ACTIVATING); } bool is_peering() const { return state_test(PG_STATE_PEERING); } bool is_down() const { return state_test(PG_STATE_DOWN); } - bool is_replay() const { return state_test(PG_STATE_REPLAY); } bool is_clean() const { return state_test(PG_STATE_CLEAN); } bool is_degraded() const { return state_test(PG_STATE_DEGRADED); } bool is_undersized() const { return state_test(PG_STATE_UNDERSIZED); } @@ -2224,7 +2217,7 @@ class PG : protected DoutPrefixProvider { PGLog::IndexedLog projected_log; bool check_in_progress_op( const osd_reqid_t &r, - eversion_t *replay_version, + eversion_t *version, version_t *user_version, int *return_code) const; eversion_t projected_last_update; diff --git a/src/osd/PGLog.h b/src/osd/PGLog.h index 17fb39d405ae0..713d3d8ce3562 100644 --- a/src/osd/PGLog.h +++ b/src/osd/PGLog.h @@ -258,10 +258,10 @@ struct PGLog : DoutPrefixProvider { bool get_request( const osd_reqid_t &r, - eversion_t *replay_version, + eversion_t *version, version_t *user_version, int *return_code) const { - assert(replay_version); + assert(version); assert(user_version); assert(return_code); ceph::unordered_map::const_iterator p; @@ -270,7 +270,7 @@ struct PGLog : DoutPrefixProvider { } p = caller_ops.find(r); if (p != caller_ops.end()) { - *replay_version = p->second->version; + *version = p->second->version; *user_version = p->second->user_version; *return_code = p->second->return_code; return true; @@ -288,7 +288,7 @@ struct PGLog : DoutPrefixProvider { i != p->second->extra_reqids.end(); ++i) { if (i->first == r) { - *replay_version = p->second->version; + *version = p->second->version; *user_version = i->second; *return_code = p->second->return_code; return true; diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 0f7c1b70035c0..7e2405b11e921 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -1637,12 +1637,6 @@ void PrimaryLogPG::do_request( op->mark_delayed("waiting for active"); return; } - if (is_replay()) { - dout(20) << " replay, waiting for active on " << op << dendl; - waiting_for_active.push_back(op); - op->mark_delayed("waiting for replay end"); - return; - } // verify client features if ((pool.info.has_tiers() || pool.info.is_tier()) && !op->has_feature(CEPH_FEATURE_OSD_CACHEPOOL)) { @@ -1955,38 +1949,27 @@ void PrimaryLogPG::do_op(OpRequestRef& op) return; } - // dup/replay? + // dup/resent? if (op->may_write() || op->may_cache()) { // warning: we will get back *a* request for this reqid, but not // necessarily the most recent. this happens with flush and // promote ops, but we can't possible have both in our log where // the original request is still not stable on disk, so for our // purposes here it doesn't matter which one we get. - eversion_t replay_version; + eversion_t version; version_t user_version; int return_code = 0; bool got = check_in_progress_op( - m->get_reqid(), &replay_version, &user_version, &return_code); + m->get_reqid(), &version, &user_version, &return_code); if (got) { dout(3) << __func__ << " dup " << m->get_reqid() - << " was " << replay_version << dendl; - if (already_complete(replay_version)) { - osd->reply_op_error(op, return_code, replay_version, user_version); + << " version " << version << dendl; + if (already_complete(version)) { + osd->reply_op_error(op, return_code, version, user_version); } else { - if (m->wants_ack()) { - if (already_ack(replay_version)) { - MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false); - reply->add_flags(CEPH_OSD_FLAG_ACK); - reply->set_reply_versions(replay_version, user_version); - osd->send_message_osd_client(reply, m->get_connection()); - } else { - dout(10) << " waiting for " << replay_version << " to ack" << dendl; - waiting_for_ack[replay_version].push_back(make_pair(op, user_version)); - } - } - dout(10) << " waiting for " << replay_version << " to commit" << dendl; + dout(10) << " waiting for " << version << " to commit" << dendl; // always queue ondisk waiters, so that we can requeue if needed - waiting_for_ondisk[replay_version].push_back(make_pair(op, user_version)); + waiting_for_ondisk[version].push_back(make_pair(op, user_version)); op->mark_delayed("waiting for ondisk"); } return; @@ -3262,39 +3245,13 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx) // no need to capture PG ref, repop cancel will handle that // Can capture the ctx by pointer, it's owned by the repop - ctx->register_on_applied( - [m, ctx, this](){ - if (m && m->wants_ack() && !ctx->sent_ack && !ctx->sent_disk) { - // send ack - MOSDOpReply *reply = ctx->reply; - if (reply) - ctx->reply = NULL; - else { - reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true); - reply->set_reply_versions(ctx->at_version, - ctx->user_at_version); - } - reply->add_flags(CEPH_OSD_FLAG_ACK); - dout(10) << " sending ack: " << *m << " " << reply << dendl; - osd->send_message_osd_client(reply, m->get_connection()); - ctx->sent_ack = true; - } - - // note the write is now readable (for rlatency calc). note - // that this will only be defined if the write is readable - // _prior_ to being committed; it will not get set with - // writeahead journaling, for instance. - if (ctx->readable_stamp == utime_t()) - ctx->readable_stamp = ceph_clock_now(); - }); ctx->register_on_commit( [m, ctx, this](){ if (ctx->op) log_op_stats( ctx); - if (m && m->wants_ondisk() && !ctx->sent_disk) { - // send commit. + if (m && (m->wants_ondisk() || m->wants_ack()) && !ctx->sent_reply) { MOSDOpReply *reply = ctx->reply; if (reply) ctx->reply = NULL; @@ -3304,9 +3261,9 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx) ctx->user_at_version); } reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - dout(10) << " sending commit on " << *m << " " << reply << dendl; + dout(10) << " sending reply on " << *m << " " << reply << dendl; osd->send_message_osd_client(reply, m->get_connection()); - ctx->sent_disk = true; + ctx->sent_reply = true; ctx->op->mark_commit_sent(); } }); @@ -3357,12 +3314,6 @@ void PrimaryLogPG::log_op_stats(OpContext *ctx) utime_t process_latency = now; process_latency -= ctx->op->get_dequeued_time(); - utime_t rlatency; - if (ctx->readable_stamp != utime_t()) { - rlatency = ctx->readable_stamp; - rlatency -= ctx->op->get_req()->get_recv_stamp(); - } - uint64_t inb = ctx->bytes_written; uint64_t outb = ctx->bytes_read; @@ -3379,8 +3330,6 @@ void PrimaryLogPG::log_op_stats(OpContext *ctx) osd->logger->inc(l_osd_op_rw_outb, outb); osd->logger->tinc(l_osd_op_rw_lat, latency); osd->logger->tinc(l_osd_op_rw_process_lat, process_latency); - if (rlatency != utime_t()) - osd->logger->tinc(l_osd_op_rw_rlat, rlatency); } else if (op->may_read()) { osd->logger->inc(l_osd_op_r); osd->logger->inc(l_osd_op_r_outb, outb); @@ -3391,15 +3340,12 @@ void PrimaryLogPG::log_op_stats(OpContext *ctx) osd->logger->inc(l_osd_op_w_inb, inb); osd->logger->tinc(l_osd_op_w_lat, latency); osd->logger->tinc(l_osd_op_w_process_lat, process_latency); - if (rlatency != utime_t()) - osd->logger->tinc(l_osd_op_w_rlat, rlatency); } else ceph_abort(); dout(15) << "log_op_stats " << *m << " inb " << inb << " outb " << outb - << " rlat " << rlatency << " lat " << latency << dendl; } @@ -8546,12 +8492,6 @@ void PrimaryLogPG::eval_repop(RepGather *repop) } waiting_for_ondisk.erase(repop->v); } - - // clear out acks, we sent the commits above - if (waiting_for_ack.count(repop->v)) { - assert(waiting_for_ack.begin()->first == repop->v); - waiting_for_ack.erase(repop->v); - } } // applied? @@ -10087,7 +10027,6 @@ void PrimaryLogPG::apply_and_flush_repops(bool requeue) } waiting_for_ondisk.clear(); - waiting_for_ack.clear(); } void PrimaryLogPG::on_flushed() @@ -11624,7 +11563,6 @@ void PrimaryLogPG::hit_set_clear() dout(20) << __func__ << dendl; hit_set.reset(); hit_set_start_stamp = utime_t(); - hit_set_flushing.clear(); } void PrimaryLogPG::hit_set_setup() @@ -11773,7 +11711,6 @@ void PrimaryLogPG::hit_set_persist() utime_t now = ceph_clock_now(); hobject_t oid; - time_t flush_time = 0; // If any archives are degraded we skip this persist request // account for the additional entry being added below @@ -11835,19 +11772,8 @@ void PrimaryLogPG::hit_set_persist() hit_set_in_memory_trim(size); } - // hold a ref until it is flushed to disk - hit_set_flushing[new_hset.begin] = hit_set; - flush_time = new_hset.begin; - ObjectContextRef obc = get_object_context(oid, true); OpContextUPtr ctx = simple_opc_create(obc); - if (flush_time != 0) { - PrimaryLogPGRef pg(this); - ctx->register_on_applied( - [pg, flush_time]() { - pg->hit_set_flushing.erase(flush_time); - }); - } ctx->at_version = get_next_version(); ctx->updated_hset_history = info.hit_set; @@ -12180,12 +12106,6 @@ void PrimaryLogPG::agent_load_hit_sets() break; } - // check if it's still in flight - if (hit_set_flushing.count(p->begin)) { - agent_state->add_hit_set(p->begin.sec(), hit_set_flushing[p->begin]); - continue; - } - hobject_t oid = get_hit_set_archive_object(p->begin, p->end, p->using_gmt); if (is_unreadable_object(oid)) { dout(10) << __func__ << " unreadable " << oid << ", waiting" << dendl; diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h index 019e871e12d7e..50b32365153b1 100644 --- a/src/osd/PrimaryLogPG.h +++ b/src/osd/PrimaryLogPG.h @@ -512,7 +512,6 @@ class PrimaryLogPG : public PG, public PGBackend::Listener { MOSDOpReply *reply; - utime_t readable_stamp; // when applied on all replicas PrimaryLogPG *pg; int num_read; ///< count read ops @@ -545,8 +544,7 @@ class PrimaryLogPG : public PG, public PGBackend::Listener { on_committed.emplace_back(std::forward(f)); } - bool sent_ack; - bool sent_disk; + bool sent_reply; // pending async reads -> list, @@ -582,7 +580,7 @@ class PrimaryLogPG : public PG, public PGBackend::Listener { num_read(0), num_write(0), copy_cb(NULL), - sent_ack(false), sent_disk(false), + sent_reply(false), async_read_result(0), inflightreads(0), lock_type(ObjectContext::RWState::RWNONE) { @@ -872,7 +870,6 @@ class PrimaryLogPG : public PG, public PGBackend::Listener { HitSetRef hit_set; ///< currently accumulating HitSet utime_t hit_set_start_stamp; ///< time the current HitSet started recording - map hit_set_flushing; ///< currently being written, not yet readable void hit_set_clear(); ///< discard any HitSet state void hit_set_setup(); ///< initialize HitSet state