From bf70c4b9b0627367d416427a8adaf6d9c80be7d2 Mon Sep 17 00:00:00 2001 From: "J. Eric Ivancich" Date: Wed, 7 Jun 2017 14:44:13 -0400 Subject: [PATCH] osd/PG: Add two new mClock implementations of the PG sharded operator queue Create an mClock priority queue, which can in turn be used for two new implementations of the PG shards operator queue. The first (mClockOpClassQueue) prioritizes operations based on which class they belong to (recovery, scrub, snaptrim, client op, osd subop). The second (mClockClientQueue) also incorporates the client identifier, in order to promote fairness between clients. In addition, also remove OpQueue's remove_by_filter and all possible associated subclass implementations and tests. Signed-off-by: J. Eric Ivancich --- src/common/OpQueue.h | 9 +- src/common/PrioritizedQueue.h | 63 --- src/common/WeightedPriorityQueue.h | 50 --- src/common/config_opts.h | 28 +- src/common/mClockPriorityQueue.h | 361 ++++++++++++++++++ src/osd/CMakeLists.txt | 5 +- src/osd/OSD.cc | 35 +- src/osd/OSD.h | 167 ++------ src/osd/PGQueueable.cc | 35 ++ src/osd/PGQueueable.h | 148 +++++++ src/osd/mClockClientQueue.cc | 165 ++++++++ src/osd/mClockClientQueue.h | 146 +++++++ src/osd/mClockOpClassQueue.cc | 123 ++++++ src/osd/mClockOpClassQueue.h | 153 ++++++++ src/test/common/CMakeLists.txt | 7 + src/test/common/test_mclock_priority_queue.cc | 318 +++++++++++++++ src/test/common/test_prioritized_queue.cc | 54 --- .../common/test_weighted_priority_queue.cc | 80 ---- src/test/osd/CMakeLists.txt | 22 ++ src/test/osd/TestMClockClientQueue.cc | 187 +++++++++ src/test/osd/TestMClockOpClassQueue.cc | 187 +++++++++ 21 files changed, 1946 insertions(+), 397 deletions(-) create mode 100644 src/common/mClockPriorityQueue.h create mode 100644 src/osd/PGQueueable.cc create mode 100644 src/osd/PGQueueable.h create mode 100644 src/osd/mClockClientQueue.cc create mode 100644 src/osd/mClockClientQueue.h create mode 100644 src/osd/mClockOpClassQueue.cc create mode 100644 src/osd/mClockOpClassQueue.h create mode 100644 src/test/common/test_mclock_priority_queue.cc create mode 100644 src/test/osd/TestMClockClientQueue.cc create mode 100644 src/test/osd/TestMClockOpClassQueue.cc diff --git a/src/common/OpQueue.h b/src/common/OpQueue.h index 208d762b37cf9..3902b329c69d8 100644 --- a/src/common/OpQueue.h +++ b/src/common/OpQueue.h @@ -37,10 +37,11 @@ class OpQueue { public: // How many Ops are in the queue virtual unsigned length() const = 0; - // Ops will be removed f evaluates to true, f may have sideeffects - virtual void remove_by_filter( - std::function f) = 0; - // Ops of this priority should be deleted immediately + // Ops of this class should be deleted immediately. If out isn't + // nullptr then items should be added to the front in + // front-to-back order. The typical strategy is to visit items in + // the queue in *reverse* order and to use *push_front* to insert + // them into out. virtual void remove_by_class(K k, std::list *out) = 0; // Enqueue op in the back of the strict queue virtual void enqueue_strict(K cl, unsigned priority, T item) = 0; diff --git a/src/common/PrioritizedQueue.h b/src/common/PrioritizedQueue.h index 8d9cd95b28e41..816d80ac1aea4 100644 --- a/src/common/PrioritizedQueue.h +++ b/src/common/PrioritizedQueue.h @@ -45,24 +45,6 @@ class PrioritizedQueue : public OpQueue { int64_t min_cost; typedef std::list > ListPairs; - static unsigned filter_list_pairs( - ListPairs *l, - std::function f) { - unsigned ret = 0; - for (typename ListPairs::iterator i = l->end(); - i != l->begin(); - ) { - auto next = i; - --next; - if (f(next->second)) { - ++ret; - l->erase(next); - } else { - i = next; - } - } - return ret; - } struct SubQueue { private: @@ -142,24 +124,6 @@ class PrioritizedQueue : public OpQueue { bool empty() const { return q.empty(); } - void remove_by_filter( - std::function f) { - for (typename Classes::iterator i = q.begin(); - i != q.end(); - ) { - size -= filter_list_pairs(&(i->second), f); - if (i->second.empty()) { - if (cur == i) { - ++cur; - } - q.erase(i++); - } else { - ++i; - } - } - if (cur == q.end()) - cur = q.begin(); - } void remove_by_class(K k, std::list *out) { typename Classes::iterator i = q.find(k); if (i == q.end()) { @@ -251,33 +215,6 @@ class PrioritizedQueue : public OpQueue { return total; } - void remove_by_filter( - std::function f) final { - for (typename SubQueues::iterator i = queue.begin(); - i != queue.end(); - ) { - unsigned priority = i->first; - - i->second.remove_by_filter(f); - if (i->second.empty()) { - ++i; - remove_queue(priority); - } else { - ++i; - } - } - for (typename SubQueues::iterator i = high_queue.begin(); - i != high_queue.end(); - ) { - i->second.remove_by_filter(f); - if (i->second.empty()) { - high_queue.erase(i++); - } else { - ++i; - } - } - } - void remove_by_class(K k, std::list *out = 0) final { for (typename SubQueues::iterator i = queue.begin(); i != queue.end(); diff --git a/src/common/WeightedPriorityQueue.h b/src/common/WeightedPriorityQueue.h index 10d6f0d4514e4..64ac120bfa88a 100644 --- a/src/common/WeightedPriorityQueue.h +++ b/src/common/WeightedPriorityQueue.h @@ -99,22 +99,6 @@ class WeightedPriorityQueue : public OpQueue unsigned get_size() const { return lp.size(); } - unsigned filter_list_pairs(std::function& f) { - unsigned count = 0; - // intrusive containers can't erase with a reverse_iterator - // so we have to walk backwards on our own. Since there is - // no iterator before begin, we have to test at the end. - for (Lit i = --lp.end();; --i) { - if (f(i->item)) { - i = lp.erase_and_dispose(i, DelItem()); - ++count; - } - if (i == lp.begin()) { - break; - } - } - return count; - } unsigned filter_class(std::list* out) { unsigned count = 0; for (Lit i = --lp.end();; --i) { @@ -180,25 +164,6 @@ class WeightedPriorityQueue : public OpQueue check_end(); return ret; } - unsigned filter_list_pairs(std::function& f) { - unsigned count = 0; - // intrusive containers can't erase with a reverse_iterator - // so we have to walk backwards on our own. Since there is - // no iterator before begin, we have to test at the end. - for (Kit i = klasses.begin(); i != klasses.end();) { - count += i->filter_list_pairs(f); - if (i->empty()) { - if (next == i) { - ++next; - } - i = klasses.erase_and_dispose(i, DelItem()); - } else { - ++i; - } - } - check_end(); - return count; - } unsigned filter_class(K& cl, std::list* out) { unsigned count = 0; Kit i = klasses.find(cl, MapKey()); @@ -291,17 +256,6 @@ class WeightedPriorityQueue : public OpQueue } return ret; } - void filter_list_pairs(std::function& f) { - for (Sit i = queues.begin(); i != queues.end();) { - size -= i->filter_list_pairs(f); - if (i->empty()) { - total_prio -= i->key; - i = queues.erase_and_dispose(i, DelItem()); - } else { - ++i; - } - } - } void filter_class(K& cl, std::list* out) { for (Sit i = queues.begin(); i != queues.end();) { size -= i->filter_class(cl, out); @@ -338,10 +292,6 @@ class WeightedPriorityQueue : public OpQueue unsigned length() const final { return strict.size + normal.size; } - void remove_by_filter(std::function f) final { - strict.filter_list_pairs(f); - normal.filter_list_pairs(f); - } void remove_by_class(K cl, std::list* removed = 0) final { strict.filter_class(cl, removed); normal.filter_class(cl, removed); diff --git a/src/common/config_opts.h b/src/common/config_opts.h index c9615c4a6509b..63bdd09721c9e 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -768,9 +768,35 @@ OPTION(osd_op_num_threads_per_shard_ssd, OPT_INT, 2) OPTION(osd_op_num_shards, OPT_INT, 0) OPTION(osd_op_num_shards_hdd, OPT_INT, 5) OPTION(osd_op_num_shards_ssd, OPT_INT, 8) -OPTION(osd_op_queue, OPT_STR, "wpq") // PrioritzedQueue (prio), Weighted Priority Queue (wpq), or debug_random + +// PrioritzedQueue (prio), Weighted Priority Queue (wpq ; default), +// mclock_opclass, mclock_client, or debug_random. "mclock_opclass" +// and "mclock_client" are based on the mClock/dmClock algorithm +// (Gulati, et al. 2010). "mclock_opclass" prioritizes based on the +// class the operation belongs to. "mclock_client" does the same but +// also works to ienforce fairness between clients. "debug_random" +// chooses among all four with equal probability. +OPTION(osd_op_queue, OPT_STR, "wpq") + OPTION(osd_op_queue_cut_off, OPT_STR, "low") // Min priority to go to strict queue. (low, high, debug_random) +// mClock priority queue parameters for five types of ops +OPTION(osd_op_queue_mclock_client_op_res, OPT_DOUBLE, 1000.0) +OPTION(osd_op_queue_mclock_client_op_wgt, OPT_DOUBLE, 500.0) +OPTION(osd_op_queue_mclock_client_op_lim, OPT_DOUBLE, 0.0) +OPTION(osd_op_queue_mclock_osd_subop_res, OPT_DOUBLE, 1000.0) +OPTION(osd_op_queue_mclock_osd_subop_wgt, OPT_DOUBLE, 500.0) +OPTION(osd_op_queue_mclock_osd_subop_lim, OPT_DOUBLE, 0.0) +OPTION(osd_op_queue_mclock_snap_res, OPT_DOUBLE, 0.0) +OPTION(osd_op_queue_mclock_snap_wgt, OPT_DOUBLE, 1.0) +OPTION(osd_op_queue_mclock_snap_lim, OPT_DOUBLE, 0.001) +OPTION(osd_op_queue_mclock_recov_res, OPT_DOUBLE, 0.0) +OPTION(osd_op_queue_mclock_recov_wgt, OPT_DOUBLE, 1.0) +OPTION(osd_op_queue_mclock_recov_lim, OPT_DOUBLE, 0.001) +OPTION(osd_op_queue_mclock_scrub_res, OPT_DOUBLE, 0.0) +OPTION(osd_op_queue_mclock_scrub_wgt, OPT_DOUBLE, 1.0) +OPTION(osd_op_queue_mclock_scrub_lim, OPT_DOUBLE, 0.001) + OPTION(osd_ignore_stale_divergent_priors, OPT_BOOL, false) // do not assert on divergent_prior entries which aren't in the log and whose on-disk objects are newer // Set to true for testing. Users should NOT set this. diff --git a/src/common/mClockPriorityQueue.h b/src/common/mClockPriorityQueue.h new file mode 100644 index 0000000000000..b651cf08f4e85 --- /dev/null +++ b/src/common/mClockPriorityQueue.h @@ -0,0 +1,361 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + + +#include +#include +#include +#include + +#include "common/Formatter.h" +#include "common/OpQueue.h" + +#include "dmclock/src/dmclock_server.h" + +// the following is done to unclobber _ASSERT_H so it returns to the +// way ceph likes it +#include "include/assert.h" + + +namespace ceph { + + namespace dmc = crimson::dmclock; + + template + class mClockQueue : public OpQueue { + + using priority_t = unsigned; + using cost_t = unsigned; + + typedef std::list > ListPairs; + + static unsigned filter_list_pairs(ListPairs *l, + std::function f, + std::list* out = nullptr) { + unsigned ret = 0; + for (typename ListPairs::iterator i = l->end(); + i != l->begin(); + /* no inc */ + ) { + auto next = i; + --next; + if (f(next->second)) { + ++ret; + if (out) out->push_back(next->second); + l->erase(next); + } else { + i = next; + } + } + return ret; + } + + struct SubQueue { + private: + typedef std::map Classes; + // client-class to ordered queue + Classes q; + + unsigned tokens, max_tokens; + int64_t size; + + typename Classes::iterator cur; + + public: + + SubQueue(const SubQueue &other) + : q(other.q), + tokens(other.tokens), + max_tokens(other.max_tokens), + size(other.size), + cur(q.begin()) {} + + SubQueue() + : tokens(0), + max_tokens(0), + size(0), cur(q.begin()) {} + + void set_max_tokens(unsigned mt) { + max_tokens = mt; + } + + unsigned get_max_tokens() const { + return max_tokens; + } + + unsigned num_tokens() const { + return tokens; + } + + void put_tokens(unsigned t) { + tokens += t; + if (tokens > max_tokens) { + tokens = max_tokens; + } + } + + void take_tokens(unsigned t) { + if (tokens > t) { + tokens -= t; + } else { + tokens = 0; + } + } + + void enqueue(K cl, cost_t cost, T item) { + q[cl].push_back(std::make_pair(cost, item)); + if (cur == q.end()) + cur = q.begin(); + size++; + } + + void enqueue_front(K cl, cost_t cost, T item) { + q[cl].push_front(std::make_pair(cost, item)); + if (cur == q.end()) + cur = q.begin(); + size++; + } + + std::pair front() const { + assert(!(q.empty())); + assert(cur != q.end()); + return cur->second.front(); + } + + void pop_front() { + assert(!(q.empty())); + assert(cur != q.end()); + cur->second.pop_front(); + if (cur->second.empty()) { + auto i = cur; + ++cur; + q.erase(i); + } else { + ++cur; + } + if (cur == q.end()) { + cur = q.begin(); + } + size--; + } + + unsigned length() const { + assert(size >= 0); + return (unsigned)size; + } + + bool empty() const { + return q.empty(); + } + + void remove_by_filter(std::function f) { + for (typename Classes::iterator i = q.begin(); + i != q.end(); + /* no-inc */) { + size -= filter_list_pairs(&(i->second), f); + if (i->second.empty()) { + if (cur == i) { + ++cur; + } + i = q.erase(i); + } else { + ++i; + } + } + if (cur == q.end()) cur = q.begin(); + } + + void remove_by_class(K k, std::list *out) { + typename Classes::iterator i = q.find(k); + if (i == q.end()) { + return; + } + size -= i->second.size(); + if (i == cur) { + ++cur; + } + if (out) { + for (auto j = i->second.rbegin(); j != i->second.rend(); ++j) { + out->push_front(j->second); + } + } + q.erase(i); + if (cur == q.end()) cur = q.begin(); + } + + void dump(ceph::Formatter *f) const { + f->dump_int("size", size); + f->dump_int("num_keys", q.size()); + } + }; + + using SubQueues = std::map; + + SubQueues high_queue; + + dmc::PullPriorityQueue queue; + + // when enqueue_front is called, rather than try to re-calc tags + // to put in mClock priority queue, we'll just keep a separate + // list from which we dequeue items first, and only when it's + // empty do we use queue. + std::list> queue_front; + + public: + + mClockQueue( + const typename dmc::PullPriorityQueue::ClientInfoFunc& info_func) : + queue(info_func, true) + { + // empty + } + + unsigned length() const override final { + unsigned total = 0; + total += queue_front.size(); + total += queue.request_count(); + for (auto i = high_queue.cbegin(); i != high_queue.cend(); ++i) { + assert(i->second.length()); + total += i->second.length(); + } + return total; + } + + // be sure to do things in reverse priority order and push_front + // to the list so items end up on list in front-to-back priority + // order + void remove_by_filter(std::function filter_accum) { + queue.remove_by_req_filter(filter_accum, true); + + for (auto i = queue_front.rbegin(); i != queue_front.rend(); /* no-inc */) { + if (filter_accum(i->second)) { + i = decltype(i){ queue_front.erase(std::next(i).base()) }; + } else { + ++i; + } + } + + for (typename SubQueues::iterator i = high_queue.begin(); + i != high_queue.end(); + /* no-inc */ ) { + i->second.remove_by_filter(filter_accum); + if (i->second.empty()) { + i = high_queue.erase(i); + } else { + ++i; + } + } + } + + void remove_by_class(K k, std::list *out = nullptr) override final { + if (out) { + queue.remove_by_client(k, + true, + [&out] (const T& t) { out->push_front(t); }); + } else { + queue.remove_by_client(k, true); + } + + for (auto i = queue_front.rbegin(); i != queue_front.rend(); /* no-inc */) { + if (k == i->first) { + if (nullptr != out) out->push_front(i->second); + i = decltype(i){ queue_front.erase(std::next(i).base()) }; + } else { + ++i; + } + } + + for (auto i = high_queue.begin(); i != high_queue.end(); /* no-inc */) { + i->second.remove_by_class(k, out); + if (i->second.empty()) { + i = high_queue.erase(i); + } else { + ++i; + } + } + } + + void enqueue_strict(K cl, unsigned priority, T item) override final { + high_queue[priority].enqueue(cl, 0, item); + } + + void enqueue_strict_front(K cl, unsigned priority, T item) override final { + high_queue[priority].enqueue_front(cl, 0, item); + } + + void enqueue(K cl, unsigned priority, unsigned cost, T item) override final { + // priority is ignored + queue.add_request(item, cl, cost); + } + + void enqueue_front(K cl, + unsigned priority, + unsigned cost, + T item) override final { + queue_front.emplace_front(std::pair(cl, item)); + } + + bool empty() const override final { + return queue.empty() && high_queue.empty() && queue_front.empty(); + } + + T dequeue() override final { + assert(!empty()); + + if (!(high_queue.empty())) { + T ret = high_queue.rbegin()->second.front().second; + high_queue.rbegin()->second.pop_front(); + if (high_queue.rbegin()->second.empty()) { + high_queue.erase(high_queue.rbegin()->first); + } + return ret; + } + + if (!queue_front.empty()) { + T ret = queue_front.front().second; + queue_front.pop_front(); + return ret; + } + + auto pr = queue.pull_request(); + assert(pr.is_retn()); + auto& retn = pr.get_retn(); + return *(retn.request); + } + + void dump(ceph::Formatter *f) const override final { + f->open_array_section("high_queues"); + for (typename SubQueues::const_iterator p = high_queue.begin(); + p != high_queue.end(); + ++p) { + f->open_object_section("subqueue"); + f->dump_int("priority", p->first); + p->second.dump(f); + f->close_section(); + } + f->close_section(); + + f->open_object_section("queue_front"); + f->dump_int("size", queue_front.size()); + f->close_section(); + + f->open_object_section("queue"); + f->dump_int("size", queue.request_count()); + f->close_section(); + } // dump + }; + +} // namespace ceph diff --git a/src/osd/CMakeLists.txt b/src/osd/CMakeLists.txt index 670223b6cf4e1..3ec6f31a60481 100644 --- a/src/osd/CMakeLists.txt +++ b/src/osd/CMakeLists.txt @@ -29,6 +29,9 @@ set(osd_srcs osd_types.cc ECUtil.cc ExtentCache.cc + mClockOpClassQueue.cc + mClockClientQueue.cc + PGQueueable.cc ${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc ${osd_cyg_functions_src} ${osdc_osd_srcs}) @@ -40,7 +43,7 @@ add_library(osd STATIC ${osd_srcs} $ $ $) -target_link_libraries(osd ${LEVELDB_LIBRARIES} ${CMAKE_DL_LIBS} ${ALLOC_LIBS}) +target_link_libraries(osd ${LEVELDB_LIBRARIES} dmclock ${CMAKE_DL_LIBS} ${ALLOC_LIBS}) if(WITH_LTTNG) add_dependencies(osd osd-tp pg-tp) endif() diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 048da1d234563..ed988ad0fb8e0 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -166,22 +166,6 @@ static ostream& _prefix(std::ostream* _dout, int whoami, epoch_t epoch) { return *_dout << "osd." << whoami << " " << epoch << " "; } -void PGQueueable::RunVis::operator()(const OpRequestRef &op) { - return osd->dequeue_op(pg, op, handle); -} - -void PGQueueable::RunVis::operator()(const PGSnapTrim &op) { - return pg->snap_trimmer(op.epoch_queued); -} - -void PGQueueable::RunVis::operator()(const PGScrub &op) { - return pg->scrub(op.epoch_queued, handle); -} - -void PGQueueable::RunVis::operator()(const PGRecovery &op) { - return osd->do_recovery(pg.get(), op.epoch_queued, op.reserved_pushes, handle); -} - //Initial features in new superblock. //Features here are also automatically upgraded CompatSet OSD::get_osd_initial_compat_set() { @@ -9726,6 +9710,7 @@ void OSD::PeeringWQ::_dequeue(list *out) { in_use.insert(out->begin(), out->end()); } + // ============================================================= #undef dout_context @@ -10118,3 +10103,21 @@ int heap(CephContext& cct, cmdmap_t& cmdmap, Formatter& f, std::ostream& os) }} // namespace ceph::osd_cmds + +std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q) { + switch(q) { + case OSD::io_queue::prioritized: + out << "prioritized"; + break; + case OSD::io_queue::weightedpriority: + out << "weightedpriority"; + break; + case OSD::io_queue::mclock_opclass: + out << "mclock_opclass"; + break; + case OSD::io_queue::mclock_client: + out << "mclock_client"; + break; + } + return out; +} diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 12a9d774c75e0..e5edaccc28154 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -40,6 +40,8 @@ #include "OpRequest.h" #include "Session.h" +#include "osd/PGQueueable.h" + #include #include #include @@ -53,6 +55,8 @@ using namespace std; #include "common/sharedptr_registry.hpp" #include "common/WeightedPriorityQueue.h" #include "common/PrioritizedQueue.h" +#include "osd/mClockOpClassQueue.h" +#include "osd/mClockClientQueue.h" #include "messages/MOSDOp.h" #include "include/Spinlock.h" #include "common/EventTrace.h" @@ -337,123 +341,6 @@ typedef ceph::shared_ptr DeletingStateRef; class OSD; -struct PGScrub { - epoch_t epoch_queued; - explicit PGScrub(epoch_t e) : epoch_queued(e) {} - ostream &operator<<(ostream &rhs) { - return rhs << "PGScrub"; - } -}; - -struct PGSnapTrim { - epoch_t epoch_queued; - explicit PGSnapTrim(epoch_t e) : epoch_queued(e) {} - ostream &operator<<(ostream &rhs) { - return rhs << "PGSnapTrim"; - } -}; - -struct PGRecovery { - epoch_t epoch_queued; - uint64_t reserved_pushes; - PGRecovery(epoch_t e, uint64_t reserved_pushes) - : epoch_queued(e), reserved_pushes(reserved_pushes) {} - ostream &operator<<(ostream &rhs) { - return rhs << "PGRecovery(epoch=" << epoch_queued - << ", reserved_pushes: " << reserved_pushes << ")"; - } -}; - -class PGQueueable { - typedef boost::variant< - OpRequestRef, - PGSnapTrim, - PGScrub, - PGRecovery - > QVariant; - QVariant qvariant; - int cost; - unsigned priority; - utime_t start_time; - entity_inst_t owner; - epoch_t map_epoch; ///< an epoch we expect the PG to exist in - - struct RunVis : public boost::static_visitor<> { - OSD *osd; - PGRef &pg; - ThreadPool::TPHandle &handle; - RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) - : osd(osd), pg(pg), handle(handle) {} - void operator()(const OpRequestRef &op); - void operator()(const PGSnapTrim &op); - void operator()(const PGScrub &op); - void operator()(const PGRecovery &op); - }; - - struct StringifyVis : public boost::static_visitor { - std::string operator()(const OpRequestRef &op) { - return stringify(op); - } - std::string operator()(const PGSnapTrim &op) { - return "PGSnapTrim"; - } - std::string operator()(const PGScrub &op) { - return "PGScrub"; - } - std::string operator()(const PGRecovery &op) { - return "PGRecovery"; - } - }; - friend ostream& operator<<(ostream& out, const PGQueueable& q) { - StringifyVis v; - return out << "PGQueueable(" << boost::apply_visitor(v, q.qvariant) - << " prio " << q.priority << " cost " << q.cost - << " e" << q.map_epoch << ")"; - } - -public: - // cppcheck-suppress noExplicitConstructor - PGQueueable(OpRequestRef op, epoch_t e) - : qvariant(op), cost(op->get_req()->get_cost()), - priority(op->get_req()->get_priority()), - start_time(op->get_req()->get_recv_stamp()), - owner(op->get_req()->get_source_inst()), - map_epoch(e) - {} - PGQueueable( - const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time, - const entity_inst_t &owner, epoch_t e) - : qvariant(op), cost(cost), priority(priority), start_time(start_time), - owner(owner), map_epoch(e) {} - PGQueueable( - const PGScrub &op, int cost, unsigned priority, utime_t start_time, - const entity_inst_t &owner, epoch_t e) - : qvariant(op), cost(cost), priority(priority), start_time(start_time), - owner(owner), map_epoch(e) {} - PGQueueable( - const PGRecovery &op, int cost, unsigned priority, utime_t start_time, - const entity_inst_t &owner, epoch_t e) - : qvariant(op), cost(cost), priority(priority), start_time(start_time), - owner(owner), map_epoch(e) {} - const boost::optional maybe_get_op() const { - const OpRequestRef *op = boost::get(&qvariant); - return op ? OpRequestRef(*op) : boost::optional(); - } - uint64_t get_reserved_pushes() const { - const PGRecovery *op = boost::get(&qvariant); - return op ? op->reserved_pushes : 0; - } - void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) { - RunVis v(osd, pg, handle); - boost::apply_visitor(v, qvariant); - } - unsigned get_priority() const { return priority; } - int get_cost() const { return cost; } - utime_t get_start_time() const { return start_time; } - entity_inst_t get_owner() const { return owner; } - epoch_t get_map_epoch() const { return map_epoch; } -}; - class OSDService { public: OSD *osd; @@ -1694,10 +1581,14 @@ class OSD : public Dispatcher, friend struct C_OpenPGs; // -- op queue -- - enum io_queue { + enum class io_queue { prioritized, - weightedpriority + weightedpriority, + mclock_opclass, + mclock_client, }; + friend std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q); + const io_queue op_queue; const unsigned int op_prio_cutoff; @@ -1722,6 +1613,7 @@ class OSD : public Dispatcher, * and already requeued the items. */ friend class PGQueueable; + class ShardedOpWQ : public ShardedThreadPool::ShardedWQ> { @@ -1774,19 +1666,25 @@ class OSD : public Dispatcher, : sdata_lock(lock_name.c_str(), false, true, false, cct), sdata_op_ordering_lock(ordering_lock.c_str(), false, true, false, cct) { - if (opqueue == weightedpriority) { + if (opqueue == io_queue::weightedpriority) { pqueue = std::unique_ptr ,entity_inst_t>>( new WeightedPriorityQueue,entity_inst_t>( max_tok_per_prio, min_cost)); - } else if (opqueue == prioritized) { + } else if (opqueue == io_queue::prioritized) { pqueue = std::unique_ptr ,entity_inst_t>>( new PrioritizedQueue,entity_inst_t>( max_tok_per_prio, min_cost)); + } else if (opqueue == io_queue::mclock_opclass) { + pqueue = std::unique_ptr + (new ceph::mClockOpClassQueue(cct)); + } else if (opqueue == io_queue::mclock_client) { + pqueue = std::unique_ptr + (new ceph::mClockClientQueue(cct)); } } - }; + }; // struct ShardData vector shard_list; OSD *osd; @@ -1977,7 +1875,7 @@ class OSD : public Dispatcher, OSDMapRef get_osdmap() { return osdmap; } - epoch_t get_osdmap_epoch() { + epoch_t get_osdmap_epoch() const { return osdmap ? osdmap->get_epoch() : 0; } @@ -2368,7 +2266,7 @@ class OSD : public Dispatcher, } } remove_wq; - private: +private: bool ms_can_fast_dispatch_any() const override { return true; } bool ms_can_fast_dispatch(const Message *m) const override { switch (m->get_type()) { @@ -2414,12 +2312,21 @@ class OSD : public Dispatcher, io_queue get_io_queue() const { if (cct->_conf->osd_op_queue == "debug_random") { + static io_queue index_lookup[] = { io_queue::prioritized, + io_queue::weightedpriority, + io_queue::mclock_opclass, + io_queue::mclock_client }; srand(time(NULL)); - return (rand() % 2 < 1) ? prioritized : weightedpriority; + unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0])); + return index_lookup[which]; } else if (cct->_conf->osd_op_queue == "wpq") { - return weightedpriority; + return io_queue::weightedpriority; + } else if (cct->_conf->osd_op_queue == "mclock_opclass") { + return io_queue::mclock_opclass; + } else if (cct->_conf->osd_op_queue == "mclock_client") { + return io_queue::mclock_client; } else { - return prioritized; + return io_queue::prioritized; } } @@ -2507,9 +2414,13 @@ class OSD : public Dispatcher, friend class OSDService; }; + +std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q); + + //compatibility of the executable extern const CompatSet::Feature ceph_osd_feature_compat[]; extern const CompatSet::Feature ceph_osd_feature_ro_compat[]; extern const CompatSet::Feature ceph_osd_feature_incompat[]; -#endif +#endif // CEPH_OSD_H diff --git a/src/osd/PGQueueable.cc b/src/osd/PGQueueable.cc new file mode 100644 index 0000000000000..844cdfc350bfb --- /dev/null +++ b/src/osd/PGQueueable.cc @@ -0,0 +1,35 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include "PG.h" +#include "PGQueueable.h" +#include "OSD.h" + + +void PGQueueable::RunVis::operator()(const OpRequestRef &op) { + osd->dequeue_op(pg, op, handle); +} + +void PGQueueable::RunVis::operator()(const PGSnapTrim &op) { + pg->snap_trimmer(op.epoch_queued); +} + +void PGQueueable::RunVis::operator()(const PGScrub &op) { + pg->scrub(op.epoch_queued, handle); +} + +void PGQueueable::RunVis::operator()(const PGRecovery &op) { + osd->do_recovery(pg.get(), op.epoch_queued, op.reserved_pushes, handle); +} diff --git a/src/osd/PGQueueable.h b/src/osd/PGQueueable.h new file mode 100644 index 0000000000000..cb32e52b782ac --- /dev/null +++ b/src/osd/PGQueueable.h @@ -0,0 +1,148 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#pragma once + +#include + +#include "include/types.h" +#include "include/utime.h" +#include "osd/OpRequest.h" +#include "osd/PG.h" + + +class OSD; + + +struct PGScrub { + epoch_t epoch_queued; + explicit PGScrub(epoch_t e) : epoch_queued(e) {} + ostream &operator<<(ostream &rhs) { + return rhs << "PGScrub"; + } +}; + +struct PGSnapTrim { + epoch_t epoch_queued; + explicit PGSnapTrim(epoch_t e) : epoch_queued(e) {} + ostream &operator<<(ostream &rhs) { + return rhs << "PGSnapTrim"; + } +}; + +struct PGRecovery { + epoch_t epoch_queued; + uint64_t reserved_pushes; + PGRecovery(epoch_t e, uint64_t reserved_pushes) + : epoch_queued(e), reserved_pushes(reserved_pushes) {} + ostream &operator<<(ostream &rhs) { + return rhs << "PGRecovery(epoch=" << epoch_queued + << ", reserved_pushes: " << reserved_pushes << ")"; + } +}; + + +class PGQueueable { + typedef boost::variant< + OpRequestRef, + PGSnapTrim, + PGScrub, + PGRecovery + > QVariant; + QVariant qvariant; + int cost; + unsigned priority; + utime_t start_time; + entity_inst_t owner; + epoch_t map_epoch; ///< an epoch we expect the PG to exist in + + struct RunVis : public boost::static_visitor<> { + OSD *osd; + PGRef &pg; + ThreadPool::TPHandle &handle; + RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) + : osd(osd), pg(pg), handle(handle) {} + void operator()(const OpRequestRef &op); + void operator()(const PGSnapTrim &op); + void operator()(const PGScrub &op); + void operator()(const PGRecovery &op); + }; // struct RunVis + + struct StringifyVis : public boost::static_visitor { + std::string operator()(const OpRequestRef &op) { + return stringify(op); + } + std::string operator()(const PGSnapTrim &op) { + return "PGSnapTrim"; + } + std::string operator()(const PGScrub &op) { + return "PGScrub"; + } + std::string operator()(const PGRecovery &op) { + return "PGRecovery"; + } + }; + + friend ostream& operator<<(ostream& out, const PGQueueable& q) { + StringifyVis v; + return out << "PGQueueable(" << boost::apply_visitor(v, q.qvariant) + << " prio " << q.priority << " cost " << q.cost + << " e" << q.map_epoch << ")"; + } + +public: + + PGQueueable(OpRequestRef op, epoch_t e) + : qvariant(op), cost(op->get_req()->get_cost()), + priority(op->get_req()->get_priority()), + start_time(op->get_req()->get_recv_stamp()), + owner(op->get_req()->get_source_inst()), + map_epoch(e) + {} + PGQueueable( + const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time, + const entity_inst_t &owner, epoch_t e) + : qvariant(op), cost(cost), priority(priority), start_time(start_time), + owner(owner), map_epoch(e) {} + PGQueueable( + const PGScrub &op, int cost, unsigned priority, utime_t start_time, + const entity_inst_t &owner, epoch_t e) + : qvariant(op), cost(cost), priority(priority), start_time(start_time), + owner(owner), map_epoch(e) {} + PGQueueable( + const PGRecovery &op, int cost, unsigned priority, utime_t start_time, + const entity_inst_t &owner, epoch_t e) + : qvariant(op), cost(cost), priority(priority), start_time(start_time), + owner(owner), map_epoch(e) {} + + const boost::optional maybe_get_op() const { + const OpRequestRef *op = boost::get(&qvariant); + return op ? OpRequestRef(*op) : boost::optional(); + } + uint64_t get_reserved_pushes() const { + const PGRecovery *op = boost::get(&qvariant); + return op ? op->reserved_pushes : 0; + } + void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) { + RunVis v(osd, pg, handle); + boost::apply_visitor(v, qvariant); + } + unsigned get_priority() const { return priority; } + int get_cost() const { return cost; } + utime_t get_start_time() const { return start_time; } + entity_inst_t get_owner() const { return owner; } + epoch_t get_map_epoch() const { return map_epoch; } + const QVariant& get_variant() const { return qvariant; } +}; // struct PGQueueable diff --git a/src/osd/mClockClientQueue.cc b/src/osd/mClockClientQueue.cc new file mode 100644 index 0000000000000..71a6631f26a79 --- /dev/null +++ b/src/osd/mClockClientQueue.cc @@ -0,0 +1,165 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include + +#include "osd/mClockClientQueue.h" +#include "common/dout.h" + + +namespace dmc = crimson::dmclock; + + +#define dout_context cct +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix *_dout + + +namespace ceph { + + mClockClientQueue::mclock_op_tags_t::mclock_op_tags_t(CephContext *cct) : + client_op(cct->_conf->osd_op_queue_mclock_client_op_res, + cct->_conf->osd_op_queue_mclock_client_op_wgt, + cct->_conf->osd_op_queue_mclock_client_op_lim), + osd_subop(cct->_conf->osd_op_queue_mclock_osd_subop_res, + cct->_conf->osd_op_queue_mclock_osd_subop_wgt, + cct->_conf->osd_op_queue_mclock_osd_subop_lim), + snaptrim(cct->_conf->osd_op_queue_mclock_snap_res, + cct->_conf->osd_op_queue_mclock_snap_wgt, + cct->_conf->osd_op_queue_mclock_snap_lim), + recov(cct->_conf->osd_op_queue_mclock_recov_res, + cct->_conf->osd_op_queue_mclock_recov_wgt, + cct->_conf->osd_op_queue_mclock_recov_lim), + scrub(cct->_conf->osd_op_queue_mclock_scrub_res, + cct->_conf->osd_op_queue_mclock_scrub_wgt, + cct->_conf->osd_op_queue_mclock_scrub_lim) + { + dout(20) << + "mClockClientQueue settings:: " << + "client_op:" << client_op << + "; osd_subop:" << osd_subop << + "; snaptrim:" << snaptrim << + "; recov:" << recov << + "; scrub:" << scrub << + dendl; + } + + + dmc::ClientInfo + mClockClientQueue::op_class_client_info_f( + const mClockClientQueue::InnerClient& client) + { + switch(client.second) { + case osd_op_type_t::client_op: + return mclock_op_tags->client_op; + case osd_op_type_t::osd_subop: + return mclock_op_tags->osd_subop; + case osd_op_type_t::bg_snaptrim: + return mclock_op_tags->snaptrim; + case osd_op_type_t::bg_recovery: + return mclock_op_tags->recov; + case osd_op_type_t::bg_scrub: + return mclock_op_tags->scrub; + default: + assert(0); + return dmc::ClientInfo(-1, -1, -1); + } + } + + + /* + * class mClockClientQueue + */ + + std::unique_ptr + mClockClientQueue::mclock_op_tags(nullptr); + + mClockClientQueue::pg_queueable_visitor_t + mClockClientQueue::pg_queueable_visitor; + + mClockClientQueue::mClockClientQueue(CephContext *cct) : + queue(&mClockClientQueue::op_class_client_info_f) + { + // manage the singleton + if (!mclock_op_tags) { + mclock_op_tags.reset(new mclock_op_tags_t(cct)); + } + } + + mClockClientQueue::osd_op_type_t + mClockClientQueue::get_osd_op_type(const Request& request) { + osd_op_type_t type = + boost::apply_visitor(pg_queueable_visitor, request.second.get_variant()); + + // if we got client_op back then we need to distinguish between + // a client op and an osd subop. + + if (osd_op_type_t::client_op != type) { + return type; + } else if (MSG_OSD_SUBOP == + boost::get( + request.second.get_variant())->get_req()->get_header().type) { + return osd_op_type_t::osd_subop; + } else { + return osd_op_type_t::client_op; + } + } + + mClockClientQueue::InnerClient + inline mClockClientQueue::get_inner_client(const Client& cl, + const Request& request) { + return InnerClient(cl, get_osd_op_type(request)); + } + + // Formatted output of the queue + inline void mClockClientQueue::dump(ceph::Formatter *f) const { + queue.dump(f); + } + + inline void mClockClientQueue::enqueue_strict(Client cl, + unsigned priority, + Request item) { + queue.enqueue_strict(get_inner_client(cl, item), priority, item); + } + + // Enqueue op in the front of the strict queue + inline void mClockClientQueue::enqueue_strict_front(Client cl, + unsigned priority, + Request item) { + queue.enqueue_strict_front(get_inner_client(cl, item), priority, item); + } + + // Enqueue op in the back of the regular queue + inline void mClockClientQueue::enqueue(Client cl, + unsigned priority, + unsigned cost, + Request item) { + queue.enqueue(get_inner_client(cl, item), priority, cost, item); + } + + // Enqueue the op in the front of the regular queue + inline void mClockClientQueue::enqueue_front(Client cl, + unsigned priority, + unsigned cost, + Request item) { + queue.enqueue_front(get_inner_client(cl, item), priority, cost, item); + } + + // Return an op to be dispatched + inline Request mClockClientQueue::dequeue() { + return queue.dequeue(); + } +} // namespace ceph diff --git a/src/osd/mClockClientQueue.h b/src/osd/mClockClientQueue.h new file mode 100644 index 0000000000000..49b58c8327d39 --- /dev/null +++ b/src/osd/mClockClientQueue.h @@ -0,0 +1,146 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#pragma once + +#include + +#include "boost/variant.hpp" + +#include "common/config.h" +#include "common/ceph_context.h" +#include "osd/PGQueueable.h" + +#include "common/mClockPriorityQueue.h" + + +namespace ceph { + + using Request = std::pair; + using Client = entity_inst_t; + + + // This class exists to bridge the ceph code, which treats the class + // as the client, and the queue, where the class is + // osd_op_type_t. So this adpater class will transform calls + // appropriately. + class mClockClientQueue : public OpQueue { + + enum class osd_op_type_t { + client_op, osd_subop, bg_snaptrim, bg_recovery, bg_scrub }; + + using InnerClient = std::pair; + + using queue_t = mClockQueue; + + queue_t queue; + + struct mclock_op_tags_t { + crimson::dmclock::ClientInfo client_op; + crimson::dmclock::ClientInfo osd_subop; + crimson::dmclock::ClientInfo snaptrim; + crimson::dmclock::ClientInfo recov; + crimson::dmclock::ClientInfo scrub; + + mclock_op_tags_t(CephContext *cct); + }; + + static std::unique_ptr mclock_op_tags; + + public: + + mClockClientQueue(CephContext *cct); + + static crimson::dmclock::ClientInfo + op_class_client_info_f(const InnerClient& client); + + inline unsigned length() const override final { + return queue.length(); + } + + // Ops of this priority should be deleted immediately + inline void remove_by_class(Client cl, + std::list *out) override final { + queue.remove_by_filter( + [&cl, out] (const Request& r) -> bool { + if (cl == r.second.get_owner()) { + out->push_front(r); + return true; + } else { + return false; + } + }); + } + + void enqueue_strict(Client cl, + unsigned priority, + Request item) override final; + + // Enqueue op in the front of the strict queue + void enqueue_strict_front(Client cl, + unsigned priority, + Request item) override final; + + // Enqueue op in the back of the regular queue + void enqueue(Client cl, + unsigned priority, + unsigned cost, + Request item) override final; + + // Enqueue the op in the front of the regular queue + void enqueue_front(Client cl, + unsigned priority, + unsigned cost, + Request item) override final; + + // Return an op to be dispatch + Request dequeue() override final; + + // Returns if the queue is empty + inline bool empty() const override final { + return queue.empty(); + } + + // Formatted output of the queue + void dump(ceph::Formatter *f) const override final; + + protected: + + struct pg_queueable_visitor_t : public boost::static_visitor { + osd_op_type_t operator()(const OpRequestRef& o) const { + // don't know if it's a client_op or a + return osd_op_type_t::client_op; + } + + osd_op_type_t operator()(const PGSnapTrim& o) const { + return osd_op_type_t::bg_snaptrim; + } + + osd_op_type_t operator()(const PGScrub& o) const { + return osd_op_type_t::bg_scrub; + } + + osd_op_type_t operator()(const PGRecovery& o) const { + return osd_op_type_t::bg_recovery; + } + }; // class pg_queueable_visitor_t + + static pg_queueable_visitor_t pg_queueable_visitor; + + osd_op_type_t get_osd_op_type(const Request& request); + InnerClient get_inner_client(const Client& cl, const Request& request); + }; // class mClockClientAdapter + +} // namespace ceph diff --git a/src/osd/mClockOpClassQueue.cc b/src/osd/mClockOpClassQueue.cc new file mode 100644 index 0000000000000..848d8d6992bc5 --- /dev/null +++ b/src/osd/mClockOpClassQueue.cc @@ -0,0 +1,123 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include + +#include "osd/mClockOpClassQueue.h" +#include "common/dout.h" + + +namespace dmc = crimson::dmclock; + + +#define dout_context cct +#define dout_subsys ceph_subsys_osd +#undef dout_prefix +#define dout_prefix *_dout + + +namespace ceph { + + mClockOpClassQueue::mclock_op_tags_t::mclock_op_tags_t(CephContext *cct) : + client_op(cct->_conf->osd_op_queue_mclock_client_op_res, + cct->_conf->osd_op_queue_mclock_client_op_wgt, + cct->_conf->osd_op_queue_mclock_client_op_lim), + osd_subop(cct->_conf->osd_op_queue_mclock_osd_subop_res, + cct->_conf->osd_op_queue_mclock_osd_subop_wgt, + cct->_conf->osd_op_queue_mclock_osd_subop_lim), + snaptrim(cct->_conf->osd_op_queue_mclock_snap_res, + cct->_conf->osd_op_queue_mclock_snap_wgt, + cct->_conf->osd_op_queue_mclock_snap_lim), + recov(cct->_conf->osd_op_queue_mclock_recov_res, + cct->_conf->osd_op_queue_mclock_recov_wgt, + cct->_conf->osd_op_queue_mclock_recov_lim), + scrub(cct->_conf->osd_op_queue_mclock_scrub_res, + cct->_conf->osd_op_queue_mclock_scrub_wgt, + cct->_conf->osd_op_queue_mclock_scrub_lim) + { + dout(20) << + "mClockOpClassQueue settings:: " << + "client_op:" << client_op << + "; osd_subop:" << osd_subop << + "; snaptrim:" << snaptrim << + "; recov:" << recov << + "; scrub:" << scrub << + dendl; + } + + + dmc::ClientInfo + mClockOpClassQueue::op_class_client_info_f(const osd_op_type_t& op_type) { + switch(op_type) { + case osd_op_type_t::client_op: + return mclock_op_tags->client_op; + case osd_op_type_t::osd_subop: + return mclock_op_tags->osd_subop; + case osd_op_type_t::bg_snaptrim: + return mclock_op_tags->snaptrim; + case osd_op_type_t::bg_recovery: + return mclock_op_tags->recov; + case osd_op_type_t::bg_scrub: + return mclock_op_tags->scrub; + default: + assert(0); + return dmc::ClientInfo(-1, -1, -1); + } + } + + /* + * class mClockOpClassQueue + */ + + std::unique_ptr + mClockOpClassQueue::mclock_op_tags(nullptr); + + mClockOpClassQueue::pg_queueable_visitor_t + mClockOpClassQueue::pg_queueable_visitor; + + mClockOpClassQueue::mClockOpClassQueue(CephContext *cct) : + queue(&mClockOpClassQueue::op_class_client_info_f) + { + // manage the singleton + if (!mclock_op_tags) { + mclock_op_tags.reset(new mclock_op_tags_t(cct)); + } + } + + mClockOpClassQueue::osd_op_type_t + mClockOpClassQueue::get_osd_op_type(const Request& request) { + osd_op_type_t type = + boost::apply_visitor(pg_queueable_visitor, request.second.get_variant()); + + // if we got client_op back then we need to distinguish between + // a client op and an osd subop. + + if (osd_op_type_t::client_op != type) { + return type; + } else if (MSG_OSD_SUBOP == + boost::get( + request.second.get_variant())->get_req()->get_header().type) { + return osd_op_type_t::osd_subop; + } else { + return osd_op_type_t::client_op; + } + } + + // Formatted output of the queue + void mClockOpClassQueue::dump(ceph::Formatter *f) const { + queue.dump(f); + } + +} // namespace ceph diff --git a/src/osd/mClockOpClassQueue.h b/src/osd/mClockOpClassQueue.h new file mode 100644 index 0000000000000..1b386fe2da202 --- /dev/null +++ b/src/osd/mClockOpClassQueue.h @@ -0,0 +1,153 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#pragma once + +#include + +#include "boost/variant.hpp" + +#include "common/config.h" +#include "common/ceph_context.h" +#include "osd/PGQueueable.h" + +#include "common/mClockPriorityQueue.h" + + +namespace ceph { + + using Request = std::pair; + using Client = entity_inst_t; + + + // This class exists to bridge the ceph code, which treats the class + // as the client, and the queue, where the class is + // osd_op_type_t. So this adpater class will transform calls + // appropriately. + class mClockOpClassQueue : public OpQueue { + + enum class osd_op_type_t { + client_op, osd_subop, bg_snaptrim, bg_recovery, bg_scrub }; + + using queue_t = mClockQueue; + + queue_t queue; + + struct mclock_op_tags_t { + crimson::dmclock::ClientInfo client_op; + crimson::dmclock::ClientInfo osd_subop; + crimson::dmclock::ClientInfo snaptrim; + crimson::dmclock::ClientInfo recov; + crimson::dmclock::ClientInfo scrub; + + mclock_op_tags_t(CephContext *cct); + }; + + static std::unique_ptr mclock_op_tags; + + public: + + mClockOpClassQueue(CephContext *cct); + + static crimson::dmclock::ClientInfo + op_class_client_info_f(const osd_op_type_t& op_type); + + inline unsigned length() const override final { + return queue.length(); + } + + // Ops of this priority should be deleted immediately + inline void remove_by_class(Client cl, + std::list *out) override final { + queue.remove_by_filter( + [&cl, out] (const Request& r) -> bool { + if (cl == r.second.get_owner()) { + out->push_front(r); + return true; + } else { + return false; + } + }); + } + + inline void enqueue_strict(Client cl, + unsigned priority, + Request item) override final { + queue.enqueue_strict(get_osd_op_type(item), priority, item); + } + + // Enqueue op in the front of the strict queue + inline void enqueue_strict_front(Client cl, + unsigned priority, + Request item) override final { + queue.enqueue_strict_front(get_osd_op_type(item), priority, item); + } + + // Enqueue op in the back of the regular queue + inline void enqueue(Client cl, + unsigned priority, + unsigned cost, + Request item) override final { + queue.enqueue(get_osd_op_type(item), priority, cost, item); + } + + // Enqueue the op in the front of the regular queue + inline void enqueue_front(Client cl, + unsigned priority, + unsigned cost, + Request item) override final { + queue.enqueue_front(get_osd_op_type(item), priority, cost, item); + } + + // Returns if the queue is empty + inline bool empty() const override final { + return queue.empty(); + } + + // Return an op to be dispatch + inline Request dequeue() override final { + return queue.dequeue(); + } + + // Formatted output of the queue + void dump(ceph::Formatter *f) const override final; + + protected: + + struct pg_queueable_visitor_t : public boost::static_visitor { + osd_op_type_t operator()(const OpRequestRef& o) const { + // don't know if it's a client_op or a + return osd_op_type_t::client_op; + } + + osd_op_type_t operator()(const PGSnapTrim& o) const { + return osd_op_type_t::bg_snaptrim; + } + + osd_op_type_t operator()(const PGScrub& o) const { + return osd_op_type_t::bg_scrub; + } + + osd_op_type_t operator()(const PGRecovery& o) const { + return osd_op_type_t::bg_recovery; + } + }; // class pg_queueable_visitor_t + + static pg_queueable_visitor_t pg_queueable_visitor; + + osd_op_type_t get_osd_op_type(const Request& request); + }; // class mClockOpClassAdapter + +} // namespace ceph diff --git a/src/test/common/CMakeLists.txt b/src/test/common/CMakeLists.txt index 45f8a43a9a343..17034b67aee39 100644 --- a/src/test/common/CMakeLists.txt +++ b/src/test/common/CMakeLists.txt @@ -42,6 +42,13 @@ add_executable(unittest_prioritized_queue add_ceph_unittest(unittest_prioritized_queue ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_prioritized_queue) target_link_libraries(unittest_prioritized_queue global ${BLKID_LIBRARIES}) +# unittest_mclock_priority_queue +add_executable(unittest_mclock_priority_queue EXCLUDE_FROM_ALL + test_mclock_priority_queue.cc + ) +add_ceph_unittest(unittest_mclock_priority_queue ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_mclock_priority_queue) +target_link_libraries(unittest_mclock_priority_queue global ${BLKID_LIBRARIES} dmclock) + # unittest_str_map add_executable(unittest_str_map test_str_map.cc diff --git a/src/test/common/test_mclock_priority_queue.cc b/src/test/common/test_mclock_priority_queue.cc new file mode 100644 index 0000000000000..e7ba761787935 --- /dev/null +++ b/src/test/common/test_mclock_priority_queue.cc @@ -0,0 +1,318 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include +#include "gtest/gtest.h" +#include "common/mClockPriorityQueue.h" + + +struct Request { + int value; + Request() = default; + Request(const Request& o) = default; + Request(int value) : + value(value) + {} +}; + + +struct Client { + int client_num; + Client() : + Client(-1) + {} + Client(int client_num) : + client_num(client_num) + {} + friend bool operator<(const Client& r1, const Client& r2) { + return r1.client_num < r2.client_num; + } + friend bool operator==(const Client& r1, const Client& r2) { + return r1.client_num == r2.client_num; + } +}; + + +crimson::dmclock::ClientInfo client_info_func(const Client& c) { + static const crimson::dmclock::ClientInfo + the_info(10.0, 10.0, 10.0); + return the_info; +} + + +TEST(mClockPriorityQueue, Create) +{ + ceph::mClockQueue q(&client_info_func); +} + + +TEST(mClockPriorityQueue, Sizes) +{ + ceph::mClockQueue q(&client_info_func); + + ASSERT_TRUE(q.empty()); + ASSERT_EQ(0u, q.length()); + + Client c1(1); + Client c2(2); + + q.enqueue_strict(c1, 1, Request(1)); + q.enqueue_strict(c2, 2, Request(2)); + q.enqueue_strict(c1, 2, Request(3)); + q.enqueue(c2, 1, 0, Request(4)); + q.enqueue(c1, 2, 0, Request(5)); + q.enqueue_strict(c2, 1, Request(6)); + + ASSERT_FALSE(q.empty()); + ASSERT_EQ(6u, q.length()); + + + for (int i = 0; i < 6; ++i) { + (void) q.dequeue(); + } + + ASSERT_TRUE(q.empty()); + ASSERT_EQ(0u, q.length()); +} + + +TEST(mClockPriorityQueue, JustStrict) +{ + ceph::mClockQueue q(&client_info_func); + + Client c1(1); + Client c2(2); + + q.enqueue_strict(c1, 1, Request(1)); + q.enqueue_strict(c2, 2, Request(2)); + q.enqueue_strict(c1, 2, Request(3)); + q.enqueue_strict(c2, 1, Request(4)); + + Request r; + + r = q.dequeue(); + ASSERT_EQ(2, r.value); + r = q.dequeue(); + ASSERT_EQ(3, r.value); + r = q.dequeue(); + ASSERT_EQ(1, r.value); + r = q.dequeue(); + ASSERT_EQ(4, r.value); +} + + +TEST(mClockPriorityQueue, StrictPriorities) +{ + ceph::mClockQueue q(&client_info_func); + + Client c1(1); + Client c2(2); + + q.enqueue_strict(c1, 1, Request(1)); + q.enqueue_strict(c2, 2, Request(2)); + q.enqueue_strict(c1, 3, Request(3)); + q.enqueue_strict(c2, 4, Request(4)); + + Request r; + + r = q.dequeue(); + ASSERT_EQ(4, r.value); + r = q.dequeue(); + ASSERT_EQ(3, r.value); + r = q.dequeue(); + ASSERT_EQ(2, r.value); + r = q.dequeue(); + ASSERT_EQ(1, r.value); +} + + +TEST(mClockPriorityQueue, JustNotStrict) +{ + ceph::mClockQueue q(&client_info_func); + + Client c1(1); + Client c2(2); + + // non-strict queue ignores priorites, but will divide between + // clients evenly and maintain orders between clients + q.enqueue(c1, 1, 0, Request(1)); + q.enqueue(c1, 2, 0, Request(2)); + q.enqueue(c2, 3, 0, Request(3)); + q.enqueue(c2, 4, 0, Request(4)); + + Request r1, r2; + + r1 = q.dequeue(); + ASSERT_TRUE(1 == r1.value || 3 == r1.value); + + r2 = q.dequeue(); + ASSERT_TRUE(1 == r2.value || 3 == r2.value); + + ASSERT_NE(r1.value, r2.value); + + r1 = q.dequeue(); + ASSERT_TRUE(2 == r1.value || 4 == r1.value); + + r2 = q.dequeue(); + ASSERT_TRUE(2 == r2.value || 4 == r2.value); + + ASSERT_NE(r1.value, r2.value); +} + + +TEST(mClockPriorityQueue, EnqueuFront) +{ + ceph::mClockQueue q(&client_info_func); + + Client c1(1); + Client c2(2); + + // non-strict queue ignores priorites, but will divide between + // clients evenly and maintain orders between clients + q.enqueue(c1, 1, 0, Request(1)); + q.enqueue(c1, 2, 0, Request(2)); + q.enqueue(c2, 3, 0, Request(3)); + q.enqueue(c2, 4, 0, Request(4)); + q.enqueue_strict(c2, 6, Request(6)); + q.enqueue_strict(c1, 7, Request(7)); + + std::list reqs; + + for (uint i = 0; i < 4; ++i) { + reqs.emplace_back(q.dequeue()); + } + + for (uint i = 0; i < 4; ++i) { + Request& r = reqs.front(); + if (r.value > 5) { + q.enqueue_strict_front(r.value == 6 ? c2 : 1, r.value, r); + } else { + q.enqueue_front(r.value <= 2 ? c1 : c2, r.value, 0, r); + } + reqs.pop_front(); + } + + Request r; + + r = q.dequeue(); + ASSERT_EQ(7, r.value); + + r = q.dequeue(); + ASSERT_EQ(6, r.value); + + r = q.dequeue(); + ASSERT_TRUE(1 == r.value || 3 == r.value); + + r = q.dequeue(); + ASSERT_TRUE(1 == r.value || 3 == r.value); + + r = q.dequeue(); + ASSERT_TRUE(2 == r.value || 4 == r.value); + + r = q.dequeue(); + ASSERT_TRUE(2 == r.value || 4 == r.value); +} + + +TEST(mClockPriorityQueue, RemoveByClass) +{ + ceph::mClockQueue q(&client_info_func); + + Client c1(1); + Client c2(2); + Client c3(3); + + q.enqueue(c1, 1, 0, Request(1)); + q.enqueue(c2, 1, 0, Request(2)); + q.enqueue(c3, 1, 0, Request(4)); + q.enqueue_strict(c1, 2, Request(8)); + q.enqueue_strict(c2, 1, Request(16)); + q.enqueue_strict(c3, 3, Request(32)); + q.enqueue(c3, 1, 0, Request(64)); + q.enqueue(c2, 1, 0, Request(128)); + q.enqueue(c1, 1, 0, Request(256)); + + int out_mask = 2 | 16 | 128; + int in_mask = 1 | 8 | 256; + + std::list out; + q.remove_by_class(c2, &out); + + ASSERT_EQ(3u, out.size()); + while (!out.empty()) { + ASSERT_TRUE((out.front().value & out_mask) > 0) << + "had value that was not expected after first removal"; + out.pop_front(); + } + + ASSERT_EQ(6u, q.length()) << "after removal of three from client c2"; + + q.remove_by_class(c3); + + ASSERT_EQ(3u, q.length()) << "after removal of three from client c3"; + while (!q.empty()) { + Request r = q.dequeue(); + ASSERT_TRUE((r.value & in_mask) > 0) << + "had value that was not expected after two removals"; + } +} + + +TEST(mClockPriorityQueue, RemoveByFilter) +{ + ceph::mClockQueue q(&client_info_func); + + Client c1(1); + Client c2(2); + Client c3(3); + + q.enqueue(c1, 1, 0, Request(1)); + q.enqueue(c2, 1, 0, Request(2)); + q.enqueue(c3, 1, 0, Request(3)); + q.enqueue_strict(c1, 2, Request(4)); + q.enqueue_strict(c2, 1, Request(5)); + q.enqueue_strict(c3, 3, Request(6)); + q.enqueue(c3, 1, 0, Request(7)); + q.enqueue(c2, 1, 0, Request(8)); + q.enqueue(c1, 1, 0, Request(9)); + + std::list filtered; + + q.remove_by_filter([&](const Request& r) -> bool { + if (r.value & 2) { + filtered.push_back(r); + return true; + } else { + return false; + } + }); + + ASSERT_EQ(4u, filtered.size()) << + "filter should have removed four elements"; + while (!filtered.empty()) { + ASSERT_TRUE((filtered.front().value & 2) > 0) << + "expect this value to have been filtered out"; + filtered.pop_front(); + } + + ASSERT_EQ(5u, q.length()) << + "filter should have left five remaining elements"; + while (!q.empty()) { + Request r = q.dequeue(); + ASSERT_TRUE((r.value & 2) == 0) << + "expect this value to have been left in"; + } +} diff --git a/src/test/common/test_prioritized_queue.cc b/src/test/common/test_prioritized_queue.cc index 72de64149f8fc..bfe5cb8bdfebc 100644 --- a/src/test/common/test_prioritized_queue.cc +++ b/src/test/common/test_prioritized_queue.cc @@ -161,60 +161,6 @@ TEST_F(PrioritizedQueueTest, fairness_by_class) { } } -template -struct Greater { - const T rhs; - std::list *removed; - explicit Greater(const T& v, std::list *removed) : rhs(v), removed(removed) - {} - bool operator()(const T& lhs) { - if (lhs > rhs) { - if (removed) - removed->push_back(lhs); - return true; - } else { - return false; - } - } -}; - -TEST_F(PrioritizedQueueTest, remove_by_filter) { - const unsigned min_cost = 1; - const unsigned max_tokens_per_subqueue = 50; - PQ pq(max_tokens_per_subqueue, min_cost); - - Greater pred(item_size/2, nullptr); - unsigned num_to_remove = 0; - for (unsigned i = 0; i < item_size; i++) { - const Item& item = items[i]; - pq.enqueue(Klass(1), 0, 10, item); - if (pred(item)) { - num_to_remove++; - } - } - std::list removed; - Greater pred2(item_size/2, &removed); - pq.remove_by_filter(pred2); - - // see if the removed items are expected ones. - for (std::list::iterator it = removed.begin(); - it != removed.end(); - ++it) { - const Item& item = *it; - EXPECT_TRUE(pred(item)); - items.erase(remove(items.begin(), items.end(), item), items.end()); - } - EXPECT_EQ(num_to_remove, removed.size()); - EXPECT_EQ(item_size - num_to_remove, pq.length()); - EXPECT_EQ(item_size - num_to_remove, items.size()); - // see if the remainder are expeceted also. - while (!pq.empty()) { - const Item item = pq.dequeue(); - EXPECT_FALSE(pred(item)); - items.erase(remove(items.begin(), items.end(), item), items.end()); - } - EXPECT_TRUE(items.empty()); -} TEST_F(PrioritizedQueueTest, remove_by_class) { const unsigned min_cost = 1; diff --git a/src/test/common/test_weighted_priority_queue.cc b/src/test/common/test_weighted_priority_queue.cc index b32c2ce2f0db9..9bb87177147c6 100644 --- a/src/test/common/test_weighted_priority_queue.cc +++ b/src/test/common/test_weighted_priority_queue.cc @@ -197,86 +197,6 @@ TEST_F(WeightedPriorityQueueTest, wpq_test_random) { test_queue(rand() % 500 + 500, true); } -template -struct Greater { - const T rhs; - std::list *removed; - Greater(const T &v, std::list *removed) : rhs(v), removed(removed) {} - bool operator()(const T &lhs) { - if (std::get<2>(lhs) > std::get<2>(rhs)) { - if (removed) - removed->push_back(lhs); - return true; - } else { - return false; - } - } -}; - -TEST_F(WeightedPriorityQueueTest, wpq_test_remove_by_filter_null) { - WQ wq(0, 0); - LQ strictq, normq; - unsigned num_items = 100; - fill_queue(wq, strictq, normq, num_items); - // Pick a value that we didn't enqueue - Removed wq_removed; - Greater pred(std::make_tuple(0, 0, 1 << 17), &wq_removed); - wq.remove_by_filter(pred); - EXPECT_EQ(0u, wq_removed.size()); -} - -TEST_F(WeightedPriorityQueueTest, wpq_test_remove_by_filter) { - WQ wq(0, 0); - LQ strictq, normq; - unsigned num_items = 1000; - fill_queue(wq, strictq, normq, num_items); - Greater pred2(std::make_tuple(0, 0, (1 << 16) - (1 << 16)/10), nullptr); - Removed r_strictq, r_normq; - unsigned num_to_remove = 0; - // Figure out from what has been queued what we - // expect to be removed - for (LQ::iterator pi = strictq.begin(); - pi != strictq.end(); ++pi) { - for (KlassItem::iterator ki = pi->second.begin(); - ki != pi->second.end(); ++ki) { - for (ItemList::iterator li = ki->second.begin(); - li != ki->second.end(); ++li) { - if (pred2(li->second)) { - ++num_to_remove; - } - } - } - } - for (LQ::iterator pi = normq.begin(); - pi != normq.end(); ++pi) { - for (KlassItem::iterator ki = pi->second.begin(); - ki != pi->second.end(); ++ki) { - for (ItemList::iterator li = ki->second.begin(); - li != ki->second.end(); ++li) { - if (pred2(li->second)) { - ++num_to_remove; - } - } - } - } - Removed wq_removed; - Greater pred( - std::make_tuple(0, 0, (1 << 16) - (1 << 16)/10), - &wq_removed); - wq.remove_by_filter(pred); - // Check that what was removed was correct - for (Removed::iterator it = wq_removed.begin(); - it != wq_removed.end(); ++it) { - EXPECT_TRUE(pred2(*it)); - } - EXPECT_EQ(num_to_remove, wq_removed.size()); - EXPECT_EQ(num_items - num_to_remove, wq.length()); - // Make sure that none were missed - while (!(wq.empty())) { - EXPECT_FALSE(pred(wq.dequeue())); - } -} - TEST_F(WeightedPriorityQueueTest, wpq_test_remove_by_class_null) { WQ wq(0, 0); LQ strictq, normq; diff --git a/src/test/osd/CMakeLists.txt b/src/test/osd/CMakeLists.txt index 10b59cb513a46..037fa41759ee5 100644 --- a/src/test/osd/CMakeLists.txt +++ b/src/test/osd/CMakeLists.txt @@ -106,3 +106,25 @@ add_executable(unittest_ec_transaction ) add_ceph_unittest(unittest_ec_transaction ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_ec_transaction) target_link_libraries(unittest_ec_transaction osd global ${BLKID_LIBRARIES}) + +# unittest_mclock_op_class_queue +add_executable(unittest_mclock_op_class_queue + TestMClockOpClassQueue.cc +) +add_ceph_unittest(unittest_mclock_op_class_queue + ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_mclock_op_class_queue +) +target_link_libraries(unittest_mclock_op_class_queue + global osd dmclock +) + +# unittest_mclock_client_queue +add_executable(unittest_mclock_client_queue + TestMClockClientQueue.cc +) +add_ceph_unittest(unittest_mclock_client_queue + ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_mclock_client_queue +) +target_link_libraries(unittest_mclock_client_queue + global osd dmclock +) diff --git a/src/test/osd/TestMClockClientQueue.cc b/src/test/osd/TestMClockClientQueue.cc new file mode 100644 index 0000000000000..69d2f38ddb23f --- /dev/null +++ b/src/test/osd/TestMClockClientQueue.cc @@ -0,0 +1,187 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- + +#include + +#include "gtest/gtest.h" +#include "global/global_init.h" +#include "common/common_init.h" + +#include "osd/mClockClientQueue.h" + + +int main(int argc, char **argv) { + std::vector args(argv, argv+argc); + auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + common_init_finish(g_ceph_context); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + + +class MClockClientQueueTest : public testing::Test { +public: + mClockClientQueue q; + + entity_inst_t client1; + entity_inst_t client2; + entity_inst_t client3; + + MClockClientQueueTest() : + q(g_ceph_context), + client1(entity_name_t(CEPH_ENTITY_TYPE_OSD, 1), entity_addr_t()), + client2(entity_name_t(CEPH_ENTITY_TYPE_OSD, 2), entity_addr_t()), + client3(entity_name_t(CEPH_ENTITY_TYPE_CLIENT, 1), entity_addr_t()) + {} + +#if 0 // more work needed here + Request create_client_op(epoch_t e, const entity_inst_t& owner) { + return Request(spg_t(), PGQueueable(OpRequestRef(), e)); + } +#endif + + Request create_snaptrim(epoch_t e, const entity_inst_t& owner) { + return Request(spg_t(), + PGQueueable(PGSnapTrim(e), + 12, 12, + utime_t(), owner, e)); + } + + Request create_scrub(epoch_t e, const entity_inst_t& owner) { + return Request(spg_t(), + PGQueueable(PGScrub(e), + 12, 12, + utime_t(), owner, e)); + } + + Request create_recovery(epoch_t e, const entity_inst_t& owner) { + return Request(spg_t(), + PGQueueable(PGRecovery(e, 64), + 12, 12, + utime_t(), owner, e)); + } +}; + + +TEST_F(MClockClientQueueTest, TestSize) { + ASSERT_TRUE(q.empty()); + ASSERT_EQ(0u, q.length()); + + q.enqueue(client1, 12, 0, create_snaptrim(100, client1)); + q.enqueue_strict(client2, 12, create_snaptrim(101, client2)); + q.enqueue(client2, 12, 0, create_snaptrim(102, client2)); + q.enqueue_strict(client3, 12, create_snaptrim(103, client3)); + q.enqueue(client1, 12, 0, create_snaptrim(104, client1)); + + ASSERT_FALSE(q.empty()); + ASSERT_EQ(5u, q.length()); + + std::list reqs; + + reqs.push_back(q.dequeue()); + reqs.push_back(q.dequeue()); + reqs.push_back(q.dequeue()); + + ASSERT_FALSE(q.empty()); + ASSERT_EQ(2u, q.length()); + + q.enqueue_front(client2, 12, 0, reqs.back()); + reqs.pop_back(); + + q.enqueue_strict_front(client3, 12, reqs.back()); + reqs.pop_back(); + + q.enqueue_strict_front(client2, 12, reqs.back()); + reqs.pop_back(); + + ASSERT_FALSE(q.empty()); + ASSERT_EQ(5u, q.length()); + + for (int i = 0; i < 5; ++i) { + (void) q.dequeue(); + } + + ASSERT_TRUE(q.empty()); + ASSERT_EQ(0u, q.length()); +} + + +TEST_F(MClockClientQueueTest, TestEnqueue) { + q.enqueue(client1, 12, 0, create_snaptrim(100, client1)); + q.enqueue(client2, 12, 0, create_snaptrim(101, client2)); + q.enqueue(client2, 12, 0, create_snaptrim(102, client2)); + q.enqueue(client3, 12, 0, create_snaptrim(103, client3)); + q.enqueue(client1, 12, 0, create_snaptrim(104, client1)); + + Request r = q.dequeue(); + ASSERT_EQ(100u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(101u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(103u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_TRUE(r.second.get_map_epoch() == 102u || + r.second.get_map_epoch() == 104u); + + r = q.dequeue(); + ASSERT_TRUE(r.second.get_map_epoch() == 102u || + r.second.get_map_epoch() == 104u); +} + + +TEST_F(MClockClientQueueTest, TestEnqueueStrict) { + q.enqueue_strict(client1, 12, create_snaptrim(100, client1)); + q.enqueue_strict(client2, 13, create_snaptrim(101, client2)); + q.enqueue_strict(client2, 16, create_snaptrim(102, client2)); + q.enqueue_strict(client3, 14, create_snaptrim(103, client3)); + q.enqueue_strict(client1, 15, create_snaptrim(104, client1)); + + Request r = q.dequeue(); + ASSERT_EQ(102u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(104u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(103u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(101u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(100u, r.second.get_map_epoch()); +} + + +TEST_F(MClockClientQueueTest, TestRemoveByClass) { + q.enqueue(client1, 12, 0, create_snaptrim(100, client1)); + q.enqueue_strict(client2, 12, create_snaptrim(101, client2)); + q.enqueue(client2, 12, 0, create_snaptrim(102, client2)); + q.enqueue_strict(client3, 12, create_snaptrim(103, client3)); + q.enqueue(client1, 12, 0, create_snaptrim(104, client1)); + + std::list filtered_out; + q.remove_by_class(client2, &filtered_out); + + ASSERT_EQ(2u, filtered_out.size()); + while (!filtered_out.empty()) { + auto e = filtered_out.front().second.get_map_epoch() ; + ASSERT_TRUE(e == 101 || e == 102); + filtered_out.pop_front(); + } + + ASSERT_EQ(3u, q.length()); + Request r = q.dequeue(); + ASSERT_EQ(103u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(100u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(104u, r.second.get_map_epoch()); +} diff --git a/src/test/osd/TestMClockOpClassQueue.cc b/src/test/osd/TestMClockOpClassQueue.cc new file mode 100644 index 0000000000000..b524451479093 --- /dev/null +++ b/src/test/osd/TestMClockOpClassQueue.cc @@ -0,0 +1,187 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- + +#include + +#include "gtest/gtest.h" + +#include "global/global_context.h" +#include "global/global_init.h" +#include "common/common_init.h" + +#include "osd/mClockOpClassQueue.h" + + +int main(int argc, char **argv) { + std::vector args(argv, argv+argc); + auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD, + CODE_ENVIRONMENT_UTILITY, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + common_init_finish(g_ceph_context); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + + +class MClockOpClassQueueTest : public testing::Test { +public: + mClockOpClassQueue q; + + entity_inst_t client1; + entity_inst_t client2; + entity_inst_t client3; + + MClockOpClassQueueTest() : + q(g_ceph_context), + client1(entity_name_t(CEPH_ENTITY_TYPE_OSD, 1), entity_addr_t()), + client2(entity_name_t(CEPH_ENTITY_TYPE_OSD, 2), entity_addr_t()), + client3(entity_name_t(CEPH_ENTITY_TYPE_CLIENT, 1), entity_addr_t()) + {} + +#if 0 // more work needed here + Request create_client_op(epoch_t e, const entity_inst_t& owner) { + return Request(spg_t(), PGQueueable(OpRequestRef(), e)); + } +#endif + + Request create_snaptrim(epoch_t e, const entity_inst_t& owner) { + return Request(spg_t(), + PGQueueable(PGSnapTrim(e), + 12, 12, + utime_t(), owner, e)); + } + + Request create_scrub(epoch_t e, const entity_inst_t& owner) { + return Request(spg_t(), + PGQueueable(PGScrub(e), + 12, 12, + utime_t(), owner, e)); + } + + Request create_recovery(epoch_t e, const entity_inst_t& owner) { + return Request(spg_t(), + PGQueueable(PGRecovery(e, 64), + 12, 12, + utime_t(), owner, e)); + } +}; + + +TEST_F(MClockOpClassQueueTest, TestSize) { + ASSERT_TRUE(q.empty()); + ASSERT_EQ(0u, q.length()); + + q.enqueue(client1, 12, 0, create_snaptrim(100, client1)); + q.enqueue_strict(client2, 12, create_snaptrim(101, client2)); + q.enqueue(client2, 12, 0, create_snaptrim(102, client2)); + q.enqueue_strict(client3, 12, create_snaptrim(103, client3)); + q.enqueue(client1, 12, 0, create_snaptrim(104, client1)); + + ASSERT_FALSE(q.empty()); + ASSERT_EQ(5u, q.length()); + + std::list reqs; + + reqs.push_back(q.dequeue()); + reqs.push_back(q.dequeue()); + reqs.push_back(q.dequeue()); + + ASSERT_FALSE(q.empty()); + ASSERT_EQ(2u, q.length()); + + q.enqueue_front(client2, 12, 0, reqs.back()); + reqs.pop_back(); + + q.enqueue_strict_front(client3, 12, reqs.back()); + reqs.pop_back(); + + q.enqueue_strict_front(client2, 12, reqs.back()); + reqs.pop_back(); + + ASSERT_FALSE(q.empty()); + ASSERT_EQ(5u, q.length()); + + for (int i = 0; i < 5; ++i) { + (void) q.dequeue(); + } + + ASSERT_TRUE(q.empty()); + ASSERT_EQ(0u, q.length()); +} + + +TEST_F(MClockOpClassQueueTest, TestEnqueue) { + q.enqueue(client1, 12, 0, create_snaptrim(100, client1)); + q.enqueue(client2, 12, 0, create_snaptrim(101, client2)); + q.enqueue(client2, 12, 0, create_snaptrim(102, client2)); + q.enqueue(client3, 12, 0, create_snaptrim(103, client3)); + q.enqueue(client1, 12, 0, create_snaptrim(104, client1)); + + Request r = q.dequeue(); + ASSERT_EQ(100u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(101u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(102u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(103u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(104u, r.second.get_map_epoch()); +} + + +TEST_F(MClockOpClassQueueTest, TestEnqueueStrict) { + q.enqueue_strict(client1, 12, create_snaptrim(100, client1)); + q.enqueue_strict(client2, 13, create_snaptrim(101, client2)); + q.enqueue_strict(client2, 16, create_snaptrim(102, client2)); + q.enqueue_strict(client3, 14, create_snaptrim(103, client3)); + q.enqueue_strict(client1, 15, create_snaptrim(104, client1)); + + Request r = q.dequeue(); + ASSERT_EQ(102u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(104u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(103u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(101u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(100u, r.second.get_map_epoch()); +} + + +TEST_F(MClockOpClassQueueTest, TestRemoveByClass) { + q.enqueue(client1, 12, 0, create_snaptrim(100, client1)); + q.enqueue_strict(client2, 12, create_snaptrim(101, client2)); + q.enqueue(client2, 12, 0, create_snaptrim(102, client2)); + q.enqueue_strict(client3, 12, create_snaptrim(103, client3)); + q.enqueue(client1, 12, 0, create_snaptrim(104, client1)); + + std::list filtered_out; + q.remove_by_class(client2, &filtered_out); + + ASSERT_EQ(2u, filtered_out.size()); + while (!filtered_out.empty()) { + auto e = filtered_out.front().second.get_map_epoch() ; + ASSERT_TRUE(e == 101 || e == 102); + filtered_out.pop_front(); + } + + ASSERT_EQ(3u, q.length()); + Request r = q.dequeue(); + ASSERT_EQ(103u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(100u, r.second.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(104u, r.second.get_map_epoch()); +}