Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rbd-replay: remove boost dependency #21202

Merged
merged 1 commit into from
Apr 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
66 changes: 34 additions & 32 deletions src/rbd_replay/Replayer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

#include "Replayer.hpp"
#include "common/errno.h"
#include "include/scope_guard.h"
#include "rbd_replay/ActionTypes.h"
#include "rbd_replay/BufferReader.h"
#include <boost/foreach.hpp>
#include <boost/thread/thread.hpp>
#include <boost/scope_exit.hpp>
#include <chrono>
#include <condition_variable>
#include <thread>
#include <fstream>
#include "global/global_context.h"
#include "rbd_replay_debug.hpp"
Expand Down Expand Up @@ -58,7 +60,7 @@ Worker::Worker(Replayer &replayer)
}

void Worker::start() {
m_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&Worker::run, this)));
m_thread = std::make_shared<std::thread>(&Worker::run, this);
}

// Should only be called by StopThreadAction
Expand All @@ -77,7 +79,7 @@ void Worker::send(Action::ptr action) {

void Worker::add_pending(PendingIO::ptr io) {
assert(io);
boost::mutex::scoped_lock lock(m_pending_ios_mutex);
std::scoped_lock lock{m_pending_ios_mutex};
assertf(m_pending_ios.count(io->id()) == 0, "id = %d", io->id());
m_pending_ios[io->id()] = io;
}
Expand All @@ -92,7 +94,7 @@ void Worker::run() {
m_replayer.set_action_complete(action->id());
}
{
boost::mutex::scoped_lock lock(m_pending_ios_mutex);
std::unique_lock lock{m_pending_ios_mutex};
bool first_time = true;
while (!m_pending_ios.empty()) {
if (!first_time) {
Expand All @@ -102,7 +104,7 @@ void Worker::run() {
dout(THREAD_LEVEL) << "> " << p.first << dendl;
}
}
m_pending_ios_empty.timed_wait(lock, boost::posix_time::seconds(1));
m_pending_ios_empty.wait_for(lock, std::chrono::seconds(1));
first_time = false;
}
}
Expand All @@ -113,7 +115,7 @@ void Worker::run() {
void Worker::remove_pending(PendingIO::ptr io) {
assert(io);
m_replayer.set_action_complete(io->id());
boost::mutex::scoped_lock lock(m_pending_ios_mutex);
std::scoped_lock lock{m_pending_ios_mutex};
size_t num_erased = m_pending_ios.erase(io->id());
assertf(num_erased == 1, "id = %d", io->id());
if (m_pending_ios.empty()) {
Expand Down Expand Up @@ -218,9 +220,7 @@ void Replayer::run(const std::string& replay_file) {
<< cpp_strerror(errno) << std::endl;
exit(1);
}
BOOST_SCOPE_EXIT( (fd) ) {
close(fd);
} BOOST_SCOPE_EXIT_END;
auto close_fd = make_scope_guard([fd] { close(fd); });

BufferReader buffer_reader(fd);
bool versioned = is_versioned_replay(buffer_reader);
Expand Down Expand Up @@ -283,19 +283,19 @@ void Replayer::run(const std::string& replay_file) {


librbd::Image* Replayer::get_image(imagectx_id_t imagectx_id) {
boost::shared_lock<boost::shared_mutex> lock(m_images_mutex);
std::scoped_lock lock{m_images_mutex};
return m_images[imagectx_id];
}

void Replayer::put_image(imagectx_id_t imagectx_id, librbd::Image *image) {
assert(image);
boost::unique_lock<boost::shared_mutex> lock(m_images_mutex);
std::unique_lock lock{m_images_mutex};
assert(m_images.count(imagectx_id) == 0);
m_images[imagectx_id] = image;
}

void Replayer::erase_image(imagectx_id_t imagectx_id) {
boost::unique_lock<boost::shared_mutex> lock(m_images_mutex);
std::unique_lock lock{m_images_mutex};
librbd::Image* image = m_images[imagectx_id];
if (m_dump_perf_counters) {
string command = "perf dump";
Expand All @@ -313,55 +313,59 @@ void Replayer::erase_image(imagectx_id_t imagectx_id) {

void Replayer::set_action_complete(action_id_t id) {
dout(DEPGRAPH_LEVEL) << "ActionTracker::set_complete(" << id << ")" << dendl;
boost::system_time now(boost::get_system_time());
auto now = std::chrono::system_clock::now();
action_tracker_d &tracker = tracker_for(id);
boost::unique_lock<boost::shared_mutex> lock(tracker.mutex);
std::unique_lock lock{tracker.mutex};
assert(tracker.actions.count(id) == 0);
tracker.actions[id] = now;
tracker.condition.notify_all();
}

bool Replayer::is_action_complete(action_id_t id) {
action_tracker_d &tracker = tracker_for(id);
boost::shared_lock<boost::shared_mutex> lock(tracker.mutex);
std::shared_lock lock{tracker.mutex};
return tracker.actions.count(id) > 0;
}

void Replayer::wait_for_actions(const action::Dependencies &deps) {
boost::posix_time::ptime release_time(boost::posix_time::neg_infin);
BOOST_FOREACH(const action::Dependency &dep, deps) {
auto release_time = std::chrono::time_point<std::chrono::system_clock>::min();
for(auto& dep : deps) {
dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl;
boost::system_time start_time(boost::get_system_time());
auto start_time = std::chrono::system_clock::now();
action_tracker_d &tracker = tracker_for(dep.id);
boost::shared_lock<boost::shared_mutex> lock(tracker.mutex);
std::unique_lock lock{tracker.mutex};
bool first_time = true;
while (tracker.actions.count(dep.id) == 0) {
if (!first_time) {
dout(DEPGRAPH_LEVEL) << "Still waiting for " << dep.id << dendl;
}
tracker.condition.timed_wait(lock, boost::posix_time::seconds(1));
tracker.condition.wait_for(lock, std::chrono::seconds(1));
first_time = false;
}
boost::system_time action_completed_time(tracker.actions[dep.id]);
auto action_completed_time(tracker.actions[dep.id]);
lock.unlock();
boost::system_time end_time(boost::get_system_time());
long long micros = (end_time - start_time).total_microseconds();
auto end_time = std::chrono::system_clock::now();
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();
dout(DEPGRAPH_LEVEL) << "Finished waiting for " << dep.id << " after " << micros << " microseconds" << dendl;
// Apparently the nanoseconds constructor is optional:
// http://www.boost.org/doc/libs/1_46_0/doc/html/date_time/details.html#compile_options
boost::system_time sub_release_time(action_completed_time + boost::posix_time::microseconds(dep.time_delta * m_latency_multiplier / 1000));
auto sub_release_time{action_completed_time +
std::chrono::microseconds{static_cast<long long>(dep.time_delta * m_latency_multiplier / 1000)}};
if (sub_release_time > release_time) {
release_time = sub_release_time;
}
}
if (release_time > boost::get_system_time()) {
dout(SLEEP_LEVEL) << "Sleeping for " << (release_time - boost::get_system_time()).total_microseconds() << " microseconds" << dendl;
boost::this_thread::sleep(release_time);
if (release_time > std::chrono::system_clock::now()) {
auto sleep_for = release_time - std::chrono::system_clock::now();
dout(SLEEP_LEVEL) << "Sleeping for "
<< std::chrono::duration_cast<std::chrono::microseconds>(sleep_for).count()
<< " microseconds" << dendl;
std::this_thread::sleep_until(release_time);
}
}

void Replayer::clear_images() {
boost::unique_lock<boost::shared_mutex> lock(m_images_mutex);
std::shared_lock lock{m_images_mutex};
if (m_dump_perf_counters && !m_images.empty()) {
string command = "perf dump";
cmdmap_t cmdmap;
Expand All @@ -370,10 +374,8 @@ void Replayer::clear_images() {
g_ceph_context->do_command(command, cmdmap, format, &out);
out.write_stream(cout);
cout << std::endl;
cout.flush();
}
pair<imagectx_id_t, librbd::Image*> p;
BOOST_FOREACH(p, m_images) {
for (auto& p : m_images) {
delete p.second;
}
m_images.clear();
Expand Down
20 changes: 11 additions & 9 deletions src/rbd_replay/Replayer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
#ifndef _INCLUDED_RBD_REPLAY_REPLAYER_HPP
#define _INCLUDED_RBD_REPLAY_REPLAYER_HPP

#include <boost/thread/mutex.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <chrono>
#include <mutex>
#include <thread>
#include <condition_variable>
#include "rbd_replay/ActionTypes.h"
#include "BoundedBuffer.hpp"
#include "ImageNameMap.hpp"
Expand Down Expand Up @@ -67,10 +69,10 @@ class Worker : public ActionCtx {

Replayer &m_replayer;
BoundedBuffer<Action::ptr> m_buffer;
boost::shared_ptr<boost::thread> m_thread;
std::shared_ptr<std::thread> m_thread;
std::map<action_id_t, PendingIO::ptr> m_pending_ios;
boost::mutex m_pending_ios_mutex;
boost::condition m_pending_ios_empty;
std::mutex m_pending_ios_mutex;
std::condition_variable_any m_pending_ios_empty;
bool m_done;
};

Expand Down Expand Up @@ -128,9 +130,9 @@ class Replayer {
private:
struct action_tracker_d {
/// Maps an action ID to the time the action completed
std::map<action_id_t, boost::system_time> actions;
boost::shared_mutex mutex;
boost::condition condition;
std::map<action_id_t, std::chrono::system_clock::time_point> actions;
std::shared_mutex mutex;
std::condition_variable_any condition;
};

void clear_images();
Expand All @@ -151,7 +153,7 @@ class Replayer {
bool m_dump_perf_counters;

std::map<imagectx_id_t, librbd::Image*> m_images;
boost::shared_mutex m_images_mutex;
std::shared_mutex m_images_mutex;

/// Actions are hashed across the trackers by ID.
/// Number of trackers should probably be larger than the number of cores and prime.
Expand Down