Skip to content

Commit

Permalink
rcu: add basic read-copy-update implementation
Browse files Browse the repository at this point in the history
This adds fairly basic support for rcu.

Declaring:

   mutex mtx;
   rcu_ptr<my_object> my_ptr;

Read-side:

   WITH_LOCK(rcu_read_lock) {
      const my_object* p = my_ptr.read();
      // do things with *p
      // but don't block!
   }

Write-side:

  WITH_LOCK(mtx) {
    my_object* old = my_ptr.read_by_owner();
    my_object* p = new my_object;
    // ...
    my_ptr.assign(p);
    rcu_dispose(old);  // or rcu_defer(some_func, old);
  }
  • Loading branch information
avikivity committed Aug 11, 2013
1 parent 6f77fcf commit 94b6979
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.mk
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ objects += core/semaphore.o
objects += core/condvar.o
objects += core/eventlist.o
objects += core/debug.o
objects += core/rcu.o
objects += drivers/pci.o
objects += core/mempool.o
objects += core/alloctracker.o
Expand Down
121 changes: 121 additions & 0 deletions core/rcu.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#include <osv/rcu.hh>
#include <osv/mutex.h>
#include <vector>
#include <boost/algorithm/cxx11/all_of.hpp>

namespace osv {

namespace rcu {

// Protects 'callbacks' below.
mutex mtx;
// Functions deferred by rcu_defer(), waiting for a grace period to
// elapse before the garbage collector thread runs them.
std::vector<std::function<void ()>> callbacks;
void collect_garbage();
// Runs collect_garbage(); created and started by rcu_init().
sched::thread* garbage_collector_thread;

// Per-cpu helper thread used to detect quiescent states: since the
// read-side lock disables preemption, the fact that this thread got to
// run on its cpu means no read-side critical section is in progress
// there.
class cpu_quiescent_state_thread {
public:
    cpu_quiescent_state_thread(sched::cpu* cpu);
    // Ask this cpu to report passing a quiescent state for
    // 'generation' (generations increase monotonically).
    void request(uint64_t generation);
    // Returns true if this cpu has passed a quiescent state for
    // 'generation' (or any later one).
    bool check(uint64_t generation);
private:
    void work();
private:
    sched::thread _t;
    // Latest generation this cpu has acknowledged.
    std::atomic<uint64_t> _generation = { 0 };
    // Latest generation requested of this cpu.
    std::atomic<uint64_t> _request = { 0 };
};

// One quiescent-state thread per cpu, registered by the notifier below
// as cpus come up.
std::vector<cpu_quiescent_state_thread*> cpu_quiescent_state_threads;

// FIXME: hot-remove cpus
// FIXME: locking for the vector
sched::cpu::notifier cpu_notifier([] {
    cpu_quiescent_state_threads.push_back(new cpu_quiescent_state_thread(sched::cpu::current()));
});

// Creates and immediately starts the helper thread, pinned to 'cpu'.
cpu_quiescent_state_thread::cpu_quiescent_state_thread(sched::cpu* cpu)
    : _t([=] { work(); }, sched::thread::attr(cpu))
{
    _t.start();
}

// Ask this cpu to acknowledge 'generation'.  A CAS loop ensures
// _request only moves forward, even if several callers race with
// different generations.
void cpu_quiescent_state_thread::request(uint64_t generation)
{
    auto r = _request.load(std::memory_order_relaxed);
    // Advance _request only if 'generation' is newer; on CAS failure
    // 'r' is refreshed with the current value and we re-check.
    while (generation > r && !_request.compare_exchange_weak(r, generation, std::memory_order_relaxed)) {
        // nothing to do
    }
    _t.wake();
}

// Has this cpu already acknowledged the given generation (or a later
// one)?
bool cpu_quiescent_state_thread::check(uint64_t generation)
{
    auto acked = _generation.load(std::memory_order_relaxed);
    return acked >= generation;
}

// Thread body: wait until a generation newer than the one we last
// acknowledged is requested.  Merely being scheduled on this cpu means
// no preemption-disabled reader is running here, so we can acknowledge
// the request immediately and poke the garbage collector.
void cpu_quiescent_state_thread::work()
{
    while (true) {
        sched::thread::wait_until([&] {
            return _generation.load(std::memory_order_relaxed) < _request.load(std::memory_order_relaxed);
        });
        auto r = _request.load(std::memory_order_relaxed);
        _generation.store(r, std::memory_order_relaxed);
        // Let the collector re-evaluate whether the grace period ended.
        garbage_collector_thread->wake();
    }
}

// Returns true when every cpu's quiescent-state thread has
// acknowledged (at least) the given generation.
bool all_at_generation(uint64_t generation)
{
    for (auto it = cpu_quiescent_state_threads.begin();
            it != cpu_quiescent_state_threads.end(); ++it) {
        if (!(*it)->check(generation)) {
            return false;
        }
    }
    return true;
}

// Blocks until every cpu has passed through a quiescent state, i.e.
// until all read-side critical sections that existed at entry have
// completed.
//
// NOTE(review): 'generation' is an unsynchronized function-local
// static; this assumes a single caller (the garbage collector thread)
// -- confirm before adding more callers.
void await_grace_period()
{
    static uint64_t generation = 0;
    ++generation;
    for (auto cqst : cpu_quiescent_state_threads) {
        cqst->request(generation);
    }
    // Captureless lambda is fine: 'generation' has static storage.
    sched::thread::wait_until([] { return all_at_generation(generation); });
}

// Garbage collector thread body: repeatedly take the pending callback
// batch, wait for a full grace period, and only then run the callbacks
// (typically deleting the retired objects).
void collect_garbage()
{
    while (true) {
        std::vector<std::function<void ()>> now;
        WITH_LOCK(mtx) {
            sched::thread::wait_until(mtx, [] { return !callbacks.empty(); });
            // Take the whole batch; new callbacks accumulate in
            // 'callbacks' while we wait out the grace period below.
            now = std::move(callbacks);
        }
        // Must not hold mtx here -- rcu_defer() needs it to enqueue.
        await_grace_period();
        for (auto& c : now) {
            c();
        }
    }
}

}

using namespace rcu;

// Queues 'func' to run after the next grace period, i.e. after all
// current read-side critical sections have completed.  The callback is
// executed on the garbage collector thread, so it must not assume any
// particular thread context.
void rcu_defer(std::function<void ()>&& func)
{
    WITH_LOCK(mtx) {
        // The parameter is an rvalue reference: move the (possibly
        // heap-allocated) closure into the queue instead of copying it.
        callbacks.push_back(std::move(func));
    }
    garbage_collector_thread->wake();
}

// Creates and starts the garbage collector thread.  Must be called
// once at boot, before the first rcu_dispose()/rcu_defer().
void rcu_init()
{
    garbage_collector_thread = new sched::thread(collect_garbage);
    garbage_collector_thread->start();
}

}
157 changes: 157 additions & 0 deletions include/osv/rcu.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#ifndef RCU_HH_
#define RCU_HH_

#include <sched.hh>
#include <atomic>
#include <memory>
#include <functional>

// Read-copy-update implementation
//
// This provides a reader/writer synchronization mechanism with
// a very lightweight read path, and a very expensive write path.
//
// The basic premise behind RCU is that the read path only indicates
// the critical section in which the protected data is used (in our
// implementation, by disabling preemption). The write path never
// modifies the data directly; instead it creates a new copy and
// replaces the old copy atomically. The old copy is then disposed
// of after all readers have finished using it.
//
//
// How to use:
//
// Declaring:
//
// mutex mtx;
// rcu_ptr<my_object> my_ptr;
//
// Read-side:
//
// WITH_LOCK(rcu_read_lock) {
// const my_object* p = my_ptr.read();
// // do things with *p
// // but don't block!
// }
//
// Write-side:
//
// WITH_LOCK(mtx) {
// my_object* old = my_ptr.read_by_owner();
// my_object* p = new my_object;
// // ...
// my_ptr.assign(p);
// rcu_dispose(old); // or rcu_defer(some_func, old);
// }
//

namespace osv {

// Lock type marking rcu read-side critical sections, for use with
// WITH_LOCK(rcu_read_lock).  "Locking" just disables preemption (see
// lock()/unlock() below), so the critical section must not block.
class rcu_lock_type {
public:
    static void lock();
    static void unlock();
};

// Global instance; all readers share it (it carries no state).
extern rcu_lock_type rcu_read_lock;

// A pointer to an rcu-protected object of type T.  Readers dereference
// it under rcu_read_lock; the single mutator replaces it wholesale
// with assign().
template <typename T>
class rcu_ptr {
public:
    // Access contents for reading.  Note: must be only called once
    // for an object within a lock()/unlock() pair.
    T* read() const;
    // Update contents.  Note: must not be called concurrently with
    // other assign() calls to the same object.
    void assign(T* p);
    // Access contents; must be called with exclusive access wrt. the
    // mutator (i.e. in the same context as assign()).
    T* read_by_owner();
    // Check if the pointer is non-null; can be done outside
    // rcu_read_lock.
    operator bool() const;
private:
    std::atomic<T*> _ptr;
};

// Calls 'delete p' when it is safe to do so
template <typename T>
void rcu_dispose(T* p);

// Calls 'delete[] p' when it is safe to do so.
// (Renamed from 'dispose_array': the definition below is
// rcu_dispose_array, so the old declaration named a function that was
// never defined.)
template <typename T>
void rcu_dispose_array(T* p);

// Calls 'func(p)' when it is safe to do so.
// ('static' dropped: the definition below is not static, and internal
// linkage is pointless for a header-defined template.)
template <typename T, typename functor>
void rcu_defer(functor func, T* p);

// Calls 'func()' when it is safe to do so
void rcu_defer(std::function<void ()>&& func);

// Starts the garbage collector thread; call once at boot before any
// rcu_dispose()/rcu_defer().
void rcu_init();

///////////////

// Enter an rcu read-side critical section by disabling preemption;
// the section must therefore not sleep or block.
inline void rcu_lock_type::lock()
{
    sched::preempt_disable();
}

// Leave the rcu read-side critical section, re-enabling preemption.
inline void rcu_lock_type::unlock()
{
    sched::preempt_enable();
}

// Read-side accessor.  memory_order_consume orders dependent loads
// after the pointer load, pairing with the release store in assign()
// so the reader sees a fully constructed object.
template <typename T>
inline
T* rcu_ptr<T>::read() const
{
    return _ptr.load(std::memory_order_consume);
}

// Publish a new object.  The release store pairs with the consume
// load in read(), making the object's initialization visible to
// readers before the new pointer value.
template <typename T>
inline
void rcu_ptr<T>::assign(T* p)
{
    _ptr.store(p, std::memory_order_release);
}

// Null check only -- the contents are not accessed, so a relaxed load
// is enough and no rcu_read_lock is required.
template <typename T>
inline
rcu_ptr<T>::operator bool() const
{
    return _ptr.load(std::memory_order_relaxed) != nullptr;
}

// Schedules 'delete p' for after the next grace period, via the
// generic rcu_defer() with the standard deleter as the functor.
template <typename T>
inline
void rcu_dispose(T* p)
{
    rcu_defer(std::default_delete<T>(), p);
}

// Owner-side accessor: the caller already serializes with assign()
// (e.g. holds the mutator's mutex), so a relaxed load suffices.
template <typename T>
inline
T* rcu_ptr<T>::read_by_owner()
{
    return _ptr.load(std::memory_order_relaxed);
}

// Schedules 'delete[] p' for after the next grace period, using the
// array form of the standard deleter.
template <typename T>
inline
void rcu_dispose_array(T* p)
{
    rcu_defer(std::default_delete<T[]>(), p);
}

// Schedules 'func(p)' for after the next grace period by wrapping the
// call in a closure ('func' and 'p' captured by value) and forwarding
// to the std::function overload.
template <typename T, typename functor>
inline
void rcu_defer(functor func, T* p)
{
    rcu_defer([=] { func(p); });
}

}

#endif /* RCU_HH_ */
4 changes: 4 additions & 0 deletions loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
#include "arch.hh"
#include "osv/trace.hh"
#include <osv/power.hh>
#include <osv/rcu.hh>
#include "mempool.hh"

using namespace osv;

asm(".pushsection \".debug_gdb_scripts\", \"MS\",@progbits,1 \n"
".byte 1 \n"
".asciz \"scripts/loader.py\" \n"
Expand Down Expand Up @@ -228,6 +231,7 @@ void main_cont(int ac, char** av)
memory::enable_debug_allocator();
enable_trace();
sched::init_detached_threads_reaper();
rcu_init();

vfs_init();
ramdisk_init();
Expand Down

0 comments on commit 94b6979

Please sign in to comment.