Skip to content

Commit

Permalink
Merge pull request #14839 from chardan/jfw-wip-halflife_atomic_t-ebirah
Browse files Browse the repository at this point in the history
rgw: migrate atomic_t to std::atomic<> (ebirah)

Reviewed-by: Casey Bodley <cbodley@redhat.com>
  • Loading branch information
cbodley committed May 1, 2017
2 parents 0e30e3e + 0aefb5a commit 1e0b919
Show file tree
Hide file tree
Showing 31 changed files with 128 additions and 88 deletions.
5 changes: 5 additions & 0 deletions src/common/RefCountedObj.h
Expand Up @@ -21,6 +21,11 @@
#include "common/ceph_context.h"
#include "common/valgrind.h"

#include <atomic>

// re-include our assert to clobber the system one; fix dout:
#include "include/assert.h"

struct RefCountedObject {
private:
mutable atomic_t nref;
Expand Down
4 changes: 2 additions & 2 deletions src/rgw/rgw_bucket.cc
Expand Up @@ -1950,11 +1950,11 @@ int RGWDataChangesLog::trim_entries(const real_time& start_time, const real_time

bool RGWDataChangesLog::going_down()
{
return (down_flag.read() != 0);
return down_flag;
}

RGWDataChangesLog::~RGWDataChangesLog() {
down_flag.set(1);
down_flag = true;
renew_thread->stop();
renew_thread->join();
delete renew_thread;
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_bucket.h
Expand Up @@ -404,7 +404,7 @@ class RGWDataChangesLog {
RWLock modified_lock;
map<int, set<string> > modified_shards;

atomic_t down_flag;
std::atomic<bool> down_flag = { false };

struct ChangeStatus {
real_time cur_expiration;
Expand Down
17 changes: 9 additions & 8 deletions src/rgw/rgw_coroutine.cc
@@ -1,10 +1,11 @@


#include "common/ceph_json.h"

#include "rgw_coroutine.h"
#include "rgw_boost_asio_yield.h"

// re-include our assert to clobber the system one; fix dout:
#include "include/assert.h"

#define dout_subsys ceph_subsys_rgw

Expand Down Expand Up @@ -68,7 +69,7 @@ int RGWCompletionManager::get_next(void **user_info)
Mutex::Locker l(lock);
while (complete_reqs.empty()) {
cond.Wait(lock);
if (going_down.read() != 0) {
if (going_down) {
return -ECANCELED;
}
}
Expand All @@ -94,7 +95,7 @@ void RGWCompletionManager::go_down()
for (auto cn : cns) {
cn->unregister();
}
going_down.set(1);
going_down = true;
cond.Signal();
}

Expand Down Expand Up @@ -460,7 +461,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
bool canceled = false; // set on going_down
RGWCoroutinesEnv env;

uint64_t run_context = run_context_count.inc();
uint64_t run_context = ++run_context_count;

lock.get_write();
set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
Expand All @@ -475,7 +476,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
env.manager = this;
env.scheduled_stacks = &scheduled_stacks;

for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down.read();) {
for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
lock.get_write();

RGWCoroutinesStack *stack = *iter;
Expand Down Expand Up @@ -566,7 +567,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
}
if (going_down.read() > 0) {
if (going_down) {
ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
ret = -ECANCELED;
canceled = true;
Expand All @@ -585,7 +586,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
}

lock.get_write();
if (!context_stacks.empty() && !going_down.read()) {
if (!context_stacks.empty() && !going_down) {
JSONFormatter formatter(true);
formatter.open_array_section("context_stacks");
for (auto& s : context_stacks) {
Expand All @@ -595,7 +596,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
lderr(cct) << __func__ << "(): ERROR: deadlock detected, dumping remaining coroutines:\n";
formatter.flush(*_dout);
*_dout << dendl;
assert(context_stacks.empty() || going_down.read()); // assert on deadlock
assert(context_stacks.empty() || going_down); // assert on deadlock
}

for (auto stack : context_stacks) {
Expand Down
11 changes: 7 additions & 4 deletions src/rgw/rgw_coroutine.h
Expand Up @@ -22,6 +22,8 @@
#include "rgw_common.h"
#include "rgw_boost_asio_coroutine.h"

#include <atomic>

#define RGW_ASYNC_OPS_MGR_WINDOW 100

class RGWCoroutinesStack;
Expand All @@ -39,7 +41,7 @@ class RGWCompletionManager : public RefCountedObject {

SafeTimer timer;

atomic_t going_down;
std::atomic<bool> going_down = { false };

map<void *, void *> waiters;

Expand Down Expand Up @@ -506,9 +508,9 @@ class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocket

class RGWCoroutinesManager {
CephContext *cct;
atomic_t going_down;
std::atomic<bool> going_down = { false };

atomic64_t run_context_count;
std::atomic<int64_t> run_context_count = { 0 };
map<uint64_t, set<RGWCoroutinesStack *> > run_contexts;

RWLock lock;
Expand Down Expand Up @@ -542,7 +544,8 @@ class RGWCoroutinesManager {
int run(list<RGWCoroutinesStack *>& ops);
int run(RGWCoroutine *op);
void stop() {
if (going_down.inc() == 1) {
bool expected = false;
if (going_down.compare_exchange_strong(expected, true)) {
completion_mgr->go_down();
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/rgw/rgw_cr_rados.cc
Expand Up @@ -65,7 +65,7 @@ void RGWAsyncRadosProcessor::start() {
}

void RGWAsyncRadosProcessor::stop() {
going_down.set(1);
going_down = true;
m_tp.drain(&req_wq);
m_tp.stop();
for (auto iter = m_req_queue.begin(); iter != m_req_queue.end(); ++iter) {
Expand Down Expand Up @@ -627,7 +627,7 @@ int RGWContinuousLeaseCR::operate()
return set_cr_done();
}
reenter(this) {
while (!going_down.read()) {
while (!going_down) {
yield call(new RGWSimpleRadosLockCR(async_rados, store, obj, lock_name, cookie, interval));

caller->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
Expand Down
10 changes: 6 additions & 4 deletions src/rgw/rgw_cr_rados.h
Expand Up @@ -6,6 +6,8 @@
#include "common/WorkQueue.h"
#include "common/Throttle.h"

#include <atomic>

class RGWAsyncRadosRequest : public RefCountedObject {
RGWCoroutine *caller;
RGWAioCompletionNotifier *notifier;
Expand Down Expand Up @@ -57,7 +59,7 @@ class RGWAsyncRadosRequest : public RefCountedObject {

class RGWAsyncRadosProcessor {
deque<RGWAsyncRadosRequest *> m_req_queue;
atomic_t going_down;
std::atomic<bool> going_down = { false };
protected:
RGWRados *store;
ThreadPool m_tp;
Expand Down Expand Up @@ -91,7 +93,7 @@ class RGWAsyncRadosProcessor {
void queue(RGWAsyncRadosRequest *req);

bool is_going_down() {
return (going_down.read() != 0);
return going_down;
}
};

Expand Down Expand Up @@ -1014,7 +1016,7 @@ class RGWContinuousLeaseCR : public RGWCoroutine {
int interval;

Mutex lock;
atomic_t going_down;
std::atomic<bool> going_down = { false };
bool locked{false};

RGWCoroutine *caller;
Expand Down Expand Up @@ -1044,7 +1046,7 @@ class RGWContinuousLeaseCR : public RGWCoroutine {
}

void go_down() {
going_down.set(1);
going_down = true;
wakeup();
}

Expand Down
4 changes: 3 additions & 1 deletion src/rgw/rgw_file.cc
Expand Up @@ -27,6 +27,8 @@
#include "rgw_file.h"
#include "rgw_lib_frontend.h"

#include <atomic>

#define dout_subsys ceph_subsys_rgw

using namespace rgw;
Expand All @@ -37,7 +39,7 @@ namespace rgw {

const string RGWFileHandle::root_name = "/";

atomic<uint32_t> RGWLibFS::fs_inst_counter;
std::atomic<uint32_t> RGWLibFS::fs_inst_counter;

uint32_t RGWLibFS::write_completion_interval_s = 10;

Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_file.h
Expand Up @@ -745,7 +745,7 @@ namespace rgw {
RGWUserInfo user;
RGWAccessKey key; // XXXX acc_key

static atomic<uint32_t> fs_inst_counter;
static std::atomic<uint32_t> fs_inst_counter;

static uint32_t write_completion_interval_s;
std::string fsid;
Expand Down
4 changes: 2 additions & 2 deletions src/rgw/rgw_gc.cc
Expand Up @@ -253,7 +253,7 @@ int RGWGC::process()

bool RGWGC::going_down()
{
return (down_flag.read() != 0);
return down_flag;
}

void RGWGC::start_processor()
Expand All @@ -264,7 +264,7 @@ void RGWGC::start_processor()

void RGWGC::stop_processor()
{
down_flag.set(1);
down_flag = true;
if (worker) {
worker->stop();
worker->join();
Expand Down
5 changes: 3 additions & 2 deletions src/rgw/rgw_gc.h
Expand Up @@ -6,7 +6,6 @@


#include "include/types.h"
#include "include/atomic.h"
#include "include/rados/librados.hpp"
#include "common/Mutex.h"
#include "common/Cond.h"
Expand All @@ -15,12 +14,14 @@
#include "rgw_rados.h"
#include "cls/rgw/cls_rgw_types.h"

#include <atomic>

class RGWGC {
CephContext *cct;
RGWRados *store;
int max_objs;
string *obj_names;
atomic_t down_flag;
std::atomic<bool> down_flag = { false };

int tag_index(const string& tag);

Expand Down
16 changes: 9 additions & 7 deletions src/rgw/rgw_http_client.cc
Expand Up @@ -16,6 +16,8 @@

#include "rgw_coroutine.h"

#include <atomic>

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw

Expand All @@ -24,7 +26,7 @@ struct rgw_http_req_data : public RefCountedObject {
curl_slist *h;
uint64_t id;
int ret;
atomic_t done;
std::atomic<bool> done = { false };
RGWHTTPClient *client;
void *user_info;
bool registered;
Expand Down Expand Up @@ -58,12 +60,12 @@ struct rgw_http_req_data : public RefCountedObject {

easy_handle = NULL;
h = NULL;
done.set(1);
done = true;
cond.Signal();
}

bool is_done() {
return done.read() != 0;
return done;
}

int get_retcode() {
Expand Down Expand Up @@ -900,14 +902,14 @@ int RGWHTTPManager::set_threaded()

void RGWHTTPManager::stop()
{
if (is_stopped.read()) {
if (is_stopped) {
return;
}

is_stopped.set(1);
is_stopped = true;

if (is_threaded) {
going_down.set(1);
going_down = true;
signal_thread();
reqs_thread->join();
delete reqs_thread;
Expand Down Expand Up @@ -935,7 +937,7 @@ void *RGWHTTPManager::reqs_thread_entry()

ldout(cct, 20) << __func__ << ": start" << dendl;

while (!going_down.read()) {
while (!going_down) {
int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
if (ret < 0) {
dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
Expand Down
9 changes: 5 additions & 4 deletions src/rgw/rgw_http_client.h
Expand Up @@ -6,10 +6,11 @@

#include "common/RWLock.h"
#include "common/Cond.h"
#include "include/atomic.h"
#include "rgw_common.h"
#include "rgw_string.h"

#include <atomic>

using param_pair_t = pair<string, string>;
using param_vec_t = vector<param_pair_t>;

Expand All @@ -33,7 +34,7 @@ class RGWHTTPClient
string last_url;
bool verify_ssl; // Do not validate self signed certificates, default to false

atomic_t stopped;
std::atomic<unsigned> stopped { 0 };

protected:
CephContext *cct;
Expand Down Expand Up @@ -219,8 +220,8 @@ class RGWHTTPManager {
RGWCompletionManager *completion_mgr;
void *multi_handle;
bool is_threaded;
atomic_t going_down;
atomic_t is_stopped;
std::atomic<unsigned> going_down { 0 };
std::atomic<unsigned> is_stopped { 0 };

RWLock reqs_lock;
map<uint64_t, rgw_http_req_data *> reqs;
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_keystone.cc
Expand Up @@ -613,7 +613,7 @@ int TokenCache::RevokeThread::check_revoked()

bool TokenCache::going_down() const
{
return (down_flag.read() != 0);
return down_flag;
}

void* TokenCache::RevokeThread::entry()
Expand Down

0 comments on commit 1e0b919

Please sign in to comment.