Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ceph: remove atomic_t #14502

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/client/MetaRequest.h
Expand Up @@ -8,13 +8,14 @@
#include "include/types.h"
#include "include/xlist.h"
#include "include/filepath.h"
#include "include/atomic.h"
#include "mds/mdstypes.h"
#include "InodeRef.h"
#include "UserPerm.h"

#include "messages/MClientRequest.h"

#include <atomic>

class MClientReply;
class Dentry;
class dir_result_t;
Expand Down Expand Up @@ -48,7 +49,7 @@ struct MetaRequest {
__u32 sent_on_mseq; // mseq at last submission of this request
int num_fwd; // # of times i've been forwarded
int retry_attempt;
atomic_t ref;
std::atomic<unsigned> ref { 0 };

MClientReply *reply; // the reply
bool kick;
Expand Down Expand Up @@ -153,13 +154,13 @@ struct MetaRequest {
Dentry *old_dentry();

MetaRequest* get() {
ref.inc();
ref++;
return this;
}

/// psuedo-private put method; use Client::put_request()
bool _put() {
int v = ref.dec();
int v = --ref;
return v == 0;
}

Expand Down
7 changes: 4 additions & 3 deletions src/client/hypertable/CephBroker.cc
Expand Up @@ -38,10 +38,11 @@
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>
#include <atomic>

using namespace Hypertable;

atomic_t CephBroker::ms_next_fd = ATOMIC_INIT(0);
std::atomic<unsigned> CephBroker::ms_next_fd = { 0 };

/* A thread-safe version of strerror */
static std::string cpp_strerror(int err)
Expand Down Expand Up @@ -111,7 +112,7 @@ void CephBroker::open(ResponseCallbackOpen *cb, const char *fname,

make_abs_path(fname, abspath);

fd = atomic_inc_return(&ms_next_fd);
fd = ++ms_next_fd;

if ((ceph_fd = ceph_open(cmount, abspath.c_str(), O_RDONLY, 0)) < 0) {
report_error(cb, -ceph_fd);
Expand Down Expand Up @@ -141,7 +142,7 @@ void CephBroker::create(ResponseCallbackOpen *cb, const char *fname, uint32_t fl
HT_DEBUGF("create file='%s' flags=%u bufsz=%d replication=%d blksz=%lld",
fname, flags, bufsz, (int)replication, (Lld)blksz);

fd = atomic_inc_return(&ms_next_fd);
fd = ++ms_next_fd;

if (flags & Filesystem::OPEN_FLAG_OVERWRITE)
oflags = O_WRONLY | O_CREAT | O_TRUNC;
Expand Down
5 changes: 3 additions & 2 deletions src/client/hypertable/CephBroker.h
Expand Up @@ -29,13 +29,14 @@ extern "C" {
}

#include "Common/String.h"
#include "Common/atomic.h"
#include "Common/Properties.h"

#include "DfsBroker/Lib/Broker.h"

#include <cephfs/libcephfs.h>

#include <atomic>

namespace Hypertable {
using namespace DfsBroker;
/**
Expand Down Expand Up @@ -97,7 +98,7 @@ namespace Hypertable {

private:
struct ceph_mount_info *cmount;
static atomic_t ms_next_fd;
static std::atomic<unsigned> ms_next_fd { 0 };

virtual void report_error(ResponseCallback *cb, int error);

Expand Down
4 changes: 4 additions & 0 deletions src/common/Finisher.cc
Expand Up @@ -5,6 +5,10 @@
#include "Finisher.h"

#include "common/debug.h"

// re-include our assert to clobber the system one; fix dout:
#include "include/assert.h"

#define dout_subsys ceph_subsys_finisher
#undef dout_prefix
#define dout_prefix *_dout << "finisher(" << this << ") "
Expand Down
1 change: 0 additions & 1 deletion src/common/Finisher.h
Expand Up @@ -15,7 +15,6 @@
#ifndef CEPH_FINISHER_H
#define CEPH_FINISHER_H

#include "include/atomic.h"
#include "common/Mutex.h"
#include "common/Cond.h"
#include "common/Thread.h"
Expand Down
22 changes: 11 additions & 11 deletions src/common/HeartbeatMap.cc
Expand Up @@ -76,13 +76,13 @@ bool HeartbeatMap::_check(const heartbeat_handle_d *h, const char *who, time_t n
bool healthy = true;
time_t was;

was = h->timeout.read();
was = h->timeout;
if (was && was < now) {
ldout(m_cct, 1) << who << " '" << h->name << "'"
<< " had timed out after " << h->grace << dendl;
healthy = false;
}
was = h->suicide_timeout.read();
was = h->suicide_timeout;
if (was && was < now) {
ldout(m_cct, 1) << who << " '" << h->name << "'"
<< " had suicide timed out after " << h->suicide_grace << dendl;
Expand All @@ -100,13 +100,13 @@ void HeartbeatMap::reset_timeout(heartbeat_handle_d *h, time_t grace, time_t sui
time_t now = time(NULL);
_check(h, "reset_timeout", now);

h->timeout.set(now + grace);
h->timeout = now + grace;
h->grace = grace;

if (suicide_grace)
h->suicide_timeout.set(now + suicide_grace);
h->suicide_timeout = now + suicide_grace;
else
h->suicide_timeout.set(0);
h->suicide_timeout = 0;
h->suicide_grace = suicide_grace;
}

Expand All @@ -115,8 +115,8 @@ void HeartbeatMap::clear_timeout(heartbeat_handle_d *h)
ldout(m_cct, 20) << "clear_timeout '" << h->name << "'" << dendl;
time_t now = time(NULL);
_check(h, "clear_timeout", now);
h->timeout.set(0);
h->suicide_timeout.set(0);
h->timeout = 0;
h->suicide_timeout = 0;
}

bool HeartbeatMap::is_healthy()
Expand Down Expand Up @@ -149,8 +149,8 @@ bool HeartbeatMap::is_healthy()
}
m_rwlock.put_read();

m_unhealthy_workers.set(unhealthy);
m_total_workers.set(total);
m_unhealthy_workers = unhealthy;
m_total_workers = total;

ldout(m_cct, 20) << "is_healthy = " << (healthy ? "healthy" : "NOT HEALTHY")
<< ", total workers: " << total << ", number of unhealthy: " << unhealthy << dendl;
Expand All @@ -159,12 +159,12 @@ bool HeartbeatMap::is_healthy()

int HeartbeatMap::get_unhealthy_workers() const
{
return m_unhealthy_workers.read();
return m_unhealthy_workers;
}

int HeartbeatMap::get_total_workers() const
{
return m_total_workers.read();
return m_total_workers;
}

void HeartbeatMap::check_touch_file()
Expand Down
9 changes: 4 additions & 5 deletions src/common/HeartbeatMap.h
Expand Up @@ -17,12 +17,11 @@

#include <pthread.h>

#include <atomic>
#include <string>
#include <list>
#include <time.h>

#include "include/atomic.h"

#include "RWLock.h"

class CephContext;
Expand All @@ -43,7 +42,7 @@ namespace ceph {
struct heartbeat_handle_d {
const std::string name;
pthread_t thread_id;
atomic_t timeout, suicide_timeout;
std::atomic<unsigned> timeout = { 0 }, suicide_timeout = { 0 };
time_t grace, suicide_grace;
std::list<heartbeat_handle_d*>::iterator list_item;

Expand Down Expand Up @@ -83,8 +82,8 @@ class HeartbeatMap {
RWLock m_rwlock;
time_t m_inject_unhealthy_until;
std::list<heartbeat_handle_d*> m_workers;
atomic_t m_unhealthy_workers;
atomic_t m_total_workers;
std::atomic<unsigned> m_unhealthy_workers = { 0 };
std::atomic<unsigned> m_total_workers = { 0 };

bool _check(const heartbeat_handle_d *h, const char *who, time_t now);
};
Expand Down
3 changes: 3 additions & 0 deletions src/common/OutputDataSocket.cc
Expand Up @@ -41,6 +41,9 @@

#include "include/compat.h"

// re-include our assert to clobber the system one; fix dout:
#include "include/assert.h"

#define dout_subsys ceph_subsys_asok
#undef dout_prefix
#define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
Expand Down
14 changes: 7 additions & 7 deletions src/common/QueueRing.h
@@ -1,12 +1,12 @@
#ifndef QUEUE_RING_H
#define QUEUE_RING_H

#include <list>
#include <vector>
#include "common/Mutex.h"
#include "common/Cond.h"


#include <list>
#include <atomic>
#include <vector>

template <class T>
class QueueRing {
Expand Down Expand Up @@ -43,18 +43,18 @@ class QueueRing {

std::vector<QueueBucket> buckets;
int num_buckets;
atomic_t cur_read_bucket;
atomic_t cur_write_bucket;
std::atomic<unsigned> cur_read_bucket = { 0 };
std::atomic<unsigned> cur_write_bucket = { 0 };
public:
QueueRing(int n) : buckets(n), num_buckets(n) {
}

void enqueue(const T& entry) {
buckets[cur_write_bucket.inc() % num_buckets].enqueue(entry);
buckets[++cur_write_bucket % num_buckets].enqueue(entry);
};

void dequeue(T *entry) {
buckets[cur_read_bucket.inc() % num_buckets].dequeue(entry);
buckets[++cur_read_bucket % num_buckets].dequeue(entry);
}
};

Expand Down
25 changes: 13 additions & 12 deletions src/common/RWLock.h
Expand Up @@ -21,15 +21,16 @@
#include <string>
#include <include/assert.h>
#include "lockdep.h"
#include "include/atomic.h"
#include "common/valgrind.h"

#include <atomic>

class RWLock final
{
mutable pthread_rwlock_t L;
std::string name;
mutable int id;
mutable atomic_t nrlock, nwlock;
mutable std::atomic<unsigned> nrlock = { 0 }, nwlock = { 0 };
bool track, lockdep;

std::string unique_name(const char* name) const;
Expand Down Expand Up @@ -65,12 +66,12 @@ class RWLock final

bool is_locked() const {
assert(track);
return (nrlock.read() > 0) || (nwlock.read() > 0);
return (nrlock > 0) || (nwlock > 0);
}

bool is_wlocked() const {
assert(track);
return (nwlock.read() > 0);
return (nwlock > 0);
}
~RWLock() {
// The following check is racy but we are about to destroy
Expand All @@ -85,11 +86,11 @@ class RWLock final

void unlock(bool lockdep=true) const {
if (track) {
if (nwlock.read() > 0) {
nwlock.dec();
if (nwlock > 0) {
nwlock--;
} else {
assert(nrlock.read() > 0);
nrlock.dec();
assert(nrlock > 0);
nrlock--;
}
}
if (lockdep && this->lockdep && g_lockdep)
Expand All @@ -105,12 +106,12 @@ class RWLock final
assert(r == 0);
if (lockdep && g_lockdep) id = lockdep_locked(name.c_str(), id);
if (track)
nrlock.inc();
nrlock++;
}
bool try_get_read() const {
if (pthread_rwlock_tryrdlock(&L) == 0) {
if (track)
nrlock.inc();
nrlock++;
if (lockdep && g_lockdep) id = lockdep_locked(name.c_str(), id);
return true;
}
Expand All @@ -129,15 +130,15 @@ class RWLock final
if (lockdep && this->lockdep && g_lockdep)
id = lockdep_locked(name.c_str(), id);
if (track)
nwlock.inc();
nwlock++;

}
bool try_get_write(bool lockdep=true) {
if (pthread_rwlock_trywrlock(&L) == 0) {
if (lockdep && this->lockdep && g_lockdep)
id = lockdep_locked(name.c_str(), id);
if (track)
nwlock.inc();
nwlock++;
return true;
}
return false;
Expand Down