Skip to content

Commit

Permalink
add spinlocks, add tryjoin, add libev submodule. sleeping is working …
Browse files Browse the repository at this point in the history
…using the libev backend
  • Loading branch information
Brian Watling committed Dec 3, 2011
1 parent 29ace63 commit 8fd903a
Show file tree
Hide file tree
Showing 43 changed files with 55,031 additions and 26 deletions.
13 changes: 8 additions & 5 deletions Makefile
Expand Up @@ -7,17 +7,18 @@ CFILES = \
fiber_context.c \
fiber_manager.c \
fiber_mutex.c \
fiber_spinlock.c \
fiber_cond.c \
fiber.c \
fiber_barrier.c \
fiber_io.c \
fiber_event.c \
fiber_event_ev.c \
work_stealing_deque.c \

PTHREAD_CFILES = \
fiber_pthread.c \

LDFLAGS += -lev
LDFLAGS += -lm

OS ?= $(shell uname -s)

Expand All @@ -40,7 +41,7 @@ ifeq ($(ARCH),x86)
CFLAGS += -m32 -march=i686 -DARCH_x86
endif

CFLAGS += -Werror -Wall -Iinclude -ggdb -O0
CFLAGS += -Wall -Iinclude -Isubmodules/libev -ggdb -O0

USE_VALGRIND ?= 0
ifeq ($(USE_VALGRIND),1)
Expand All @@ -61,7 +62,7 @@ endif

ifeq ($(OS),Linux)
CFLAGS += -DLINUX
LDFLAGSAFTER += -ldl -lev
LDFLAGSAFTER += -ldl -lm
endif

USE_COMPILER_THREAD_LOCAL ?= 1
Expand All @@ -77,6 +78,8 @@ CFLAGS += -DFIBER_FAST_SWITCHING
endif

TESTS= \
test_tryjoin \
test_sleep \
test_io \
test_context \
test_context_speed \
Expand Down Expand Up @@ -119,7 +122,7 @@ bin/test_%.o: test_%.c $(INCLUDES) $(TESTINCLUDES)
$(CC) $(CFLAGS) -Isrc -c $< -o $@

bin/test_%: bin/test_%.o libfiber.so
$(CC) $(LDFLAGS) $(CFLAGS) -L. -Lbin $^ -o $@ -lpthread
$(CC) $(LDFLAGS) $(CFLAGS) -L. -Lbin $^ -o $@ -lpthread $(LDFLAGSAFTER)

bin/%.o: %.c $(INCLUDES)
$(CC) $(CFLAGS) -c $< -o $@
Expand Down
2 changes: 2 additions & 0 deletions include/fiber.h
Expand Up @@ -47,6 +47,8 @@ extern fiber_t* fiber_create_from_thread();

extern int fiber_join(fiber_t* f, void** result);

extern int fiber_tryjoin(fiber_t* f, void** result);

extern int fiber_yield();

extern int fiber_detach(fiber_t* f);
Expand Down
2 changes: 2 additions & 0 deletions include/fiber_manager.h
Expand Up @@ -3,6 +3,7 @@

#include "fiber.h"
#include "fiber_mutex.h"
#include "fiber_spinlock.h"
#include "work_stealing_deque.h"
#include "mpsc_fifo.h"

Expand All @@ -20,6 +21,7 @@ typedef struct fiber_manager
fiber_t* volatile to_schedule;
fiber_mpsc_to_push_t mpsc_to_push;
fiber_mutex_t* volatile mutex_to_unlock;
fiber_spinlock_t* volatile spinlock_to_unlock;
void** volatile set_wait_location;
void* volatile set_wait_value;
wsd_work_stealing_deque_t* queue_one;
Expand Down
41 changes: 41 additions & 0 deletions include/fiber_spinlock.h
@@ -0,0 +1,41 @@
#ifndef _FIBER_SPINLOCK_H_
#define _FIBER_SPINLOCK_H_

/*
Author: Brian Watling
Email: brianwatling@hotmail.com
Website: https://github.com/brianwatling
Description: A spin lock for fibers. This is meant to be used in places
where a fiber does not want to or cannot perform a context
switch. A fiber will spin on this lock without yielding to
other fibers, but will yield to other system threads after
spinning for a while.
*/

#include <stddef.h>

typedef struct fiber_spinlock
{
volatile int state;
} fiber_spinlock_t;

#ifdef __cplusplus
extern "C" {
#endif

extern int fiber_spinlock_init(fiber_spinlock_t* spinlock);

extern int fiber_spinlock_destroy(fiber_spinlock_t* spinlock);

extern int fiber_spinlock_lock(fiber_spinlock_t* spinlock);

extern int fiber_spinlock_trylock(fiber_spinlock_t* spinlock, size_t tries);

extern int fiber_spinlock_unlock(fiber_spinlock_t* spinlock);

#ifdef __cplusplus
}
#endif

#endif
32 changes: 32 additions & 0 deletions src/fiber.c
Expand Up @@ -162,6 +162,38 @@ int fiber_join(fiber_t* f, void** result)
return FIBER_SUCCESS;
}

int fiber_tryjoin(fiber_t* f, void** result)
{
assert(f);
if(result) {
*result = NULL;
}
if(f->detach_state == FIBER_DETACH_DETACHED) {
return FIBER_ERROR;
}

if(f->detach_state == FIBER_DETACH_WAIT_FOR_JOINER) {
//here we've read that the fiber is waiting to be joined.
//if the fiber is still waiting to be joined after we atmically change its state,
//then we can go ahead and wake it up. if the fiber's state has changed, we can
//assume the fiber has been detached or has be joined by some other fiber
const int old_state = atomic_exchange_int((int*)&f->detach_state, FIBER_DETACH_WAIT_TO_JOIN);
if(old_state == FIBER_DETACH_WAIT_FOR_JOINER) {
//the other fiber is waiting for us to join
if(result) {
*result = f->result;
}
load_load_barrier();
fiber_t* const to_schedule = fiber_manager_clear_or_wait(fiber_manager_get(), (void**)&f->join_info);
to_schedule->state = FIBER_STATE_READY;
fiber_manager_schedule(fiber_manager_get(), to_schedule);
return FIBER_SUCCESS;
}
}

return FIBER_ERROR;
}

int fiber_yield()
{
fiber_manager_yield(fiber_manager_get());
Expand Down
38 changes: 18 additions & 20 deletions src/fiber_event.c → src/fiber_event_ev.c
@@ -1,19 +1,20 @@
#include "fiber_event.h"
#include "fiber.h"
#include "fiber_manager.h"
#include "fiber_mutex.h"
#include "fiber_spinlock.h"
#include <stdio.h>
#include <ev.h>
#include <assert.h>
#include <unistd.h>
#ifndef __USE_GNU
#define __USE_GNU
#endif
#include <dlfcn.h>
#include <stdio.h>
#define EV_STANDALONE 1
#include <ev.c>

static struct ev_loop* volatile fiber_loop = NULL;
static fiber_mutex_t fiber_loop_mutex;
static fiber_spinlock_t fiber_loop_spinlock;
static volatile size_t num_events_triggered = 0;

typedef int (*usleepFnType) (useconds_t);
Expand All @@ -25,18 +26,18 @@ int fiber_event_init()

fiber_loop = ev_loop_new(EVFLAG_AUTO);
assert(fiber_loop);
assert(fiber_mutex_init(&fiber_loop_mutex));
fiber_spinlock_init(&fiber_loop_spinlock);

return fiber_loop ? FIBER_SUCCESS : FIBER_ERROR;
}

void fiber_event_destroy()
{
if(fiber_loop) {
fiber_mutex_lock(&fiber_loop_mutex);
fiber_spinlock_lock(&fiber_loop_spinlock);
ev_loop_destroy(fiber_loop);
fiber_loop = NULL;
fiber_mutex_unlock(&fiber_loop_mutex);
fiber_spinlock_unlock(&fiber_loop_spinlock);
}
}

Expand All @@ -60,17 +61,20 @@ void fiber_poll_events(uint32_t seconds, uint32_t useconds)
return;
}

fiber_mutex_lock(&fiber_loop_mutex);
if(!fiber_spinlock_trylock(&fiber_loop_spinlock, 128)) {
do_real_sleep(seconds, useconds);
return;
}

if(!fiber_loop) {
fiber_mutex_unlock(&fiber_loop_mutex);
fiber_spinlock_unlock(&fiber_loop_spinlock);
return;
}

num_events_triggered = 0;
ev_run(fiber_loop, EVRUN_NOWAIT);
const size_t local_copy = num_events_triggered;
fiber_mutex_unlock(&fiber_loop_mutex);
fiber_spinlock_unlock(&fiber_loop_spinlock);

if(!local_copy) {
do_real_sleep(seconds, useconds);
Expand Down Expand Up @@ -98,7 +102,7 @@ int fiber_wait_for_event(int fd, uint32_t events)
}
ev_io_init(&fd_event, fd_ready, fd, poll_events);

fiber_mutex_lock(&fiber_loop_mutex);
fiber_spinlock_lock(&fiber_loop_spinlock);

fiber_manager_t* const manager = fiber_manager_get();
fiber_t* const this_fiber = manager->current_fiber;
Expand All @@ -107,13 +111,10 @@ int fiber_wait_for_event(int fd, uint32_t events)
ev_io_start(fiber_loop, &fd_event);

this_fiber->state = FIBER_STATE_WAITING;
manager->mutex_to_unlock = &fiber_loop_mutex;
manager->spinlock_to_unlock = &fiber_loop_spinlock;

fiber_manager_yield(manager);

fiber_mutex_lock(&fiber_loop_mutex);
fiber_mutex_unlock(&fiber_loop_mutex);

return FIBER_SUCCESS;
}

Expand All @@ -137,9 +138,9 @@ int fiber_sleep(uint32_t seconds, uint32_t useconds)
const double sleep_time = seconds + useconds * 0.000001;
ev_timer_init(&timer_event, &timer_trigger, sleep_time, 0);

fiber_mutex_lock(&fiber_loop_mutex);
fiber_spinlock_lock(&fiber_loop_spinlock);
if(!fiber_loop) {
fiber_mutex_unlock(&fiber_loop_mutex);
fiber_spinlock_unlock(&fiber_loop_spinlock);
do_real_sleep(seconds, useconds);
return FIBER_SUCCESS;
}
Expand All @@ -152,13 +153,10 @@ int fiber_sleep(uint32_t seconds, uint32_t useconds)
ev_timer_start(fiber_loop, &timer_event);

this_fiber->state = FIBER_STATE_WAITING;
manager->mutex_to_unlock = &fiber_loop_mutex;
manager->spinlock_to_unlock = &fiber_loop_spinlock;

fiber_manager_yield(manager);

fiber_mutex_lock(&fiber_loop_mutex);
fiber_mutex_unlock(&fiber_loop_mutex);

return FIBER_SUCCESS;
}

9 changes: 8 additions & 1 deletion src/fiber_manager.c
Expand Up @@ -130,7 +130,8 @@ void fiber_manager_yield(fiber_manager_t* manager)
fiber_manager_switch_to(manager, manager->current_fiber, new_fiber);
break;
}
} else if(FIBER_STATE_WAITING == manager->current_fiber->state) {
} else if(FIBER_STATE_WAITING == manager->current_fiber->state
|| FIBER_STATE_DONE == manager->current_fiber->state) {
if(!manager->maintenance_fiber) {
manager->maintenance_fiber = fiber_create_no_sched(102400, &fiber_manager_thread_func, manager);
fiber_detach(manager->maintenance_fiber);
Expand Down Expand Up @@ -348,6 +349,12 @@ void fiber_manager_do_maintenance()
fiber_mutex_unlock_internal(to_unlock);
}

if(manager->spinlock_to_unlock) {
fiber_spinlock_t* const to_unlock = manager->spinlock_to_unlock;
manager->spinlock_to_unlock = NULL;
fiber_spinlock_unlock(to_unlock);
}

if(manager->set_wait_location) {
*manager->set_wait_location = manager->set_wait_value;
manager->set_wait_location = NULL;
Expand Down
57 changes: 57 additions & 0 deletions src/fiber_spinlock.c
@@ -0,0 +1,57 @@
#include "fiber_spinlock.h"
#include "fiber.h"
#include "sched.h"

int fiber_spinlock_init(fiber_spinlock_t* spinlock)
{
assert(spinlock);
spinlock->state = 0;
return FIBER_SUCCESS;
}

int fiber_spinlock_destroy(fiber_spinlock_t* spinlock)
{
assert(spinlock);
return FIBER_SUCCESS;
}

int fiber_spinlock_lock(fiber_spinlock_t* spinlock)
{
assert(spinlock);
unsigned int counter = 0;
while(!__sync_bool_compare_and_swap(&spinlock->state, 0, 1)) {
while(spinlock->state) {
++counter;
if(counter > 128) {
sched_yield();
counter = 0;
}
}
}
load_load_barrier();
return FIBER_SUCCESS;
}

int fiber_spinlock_trylock(fiber_spinlock_t* spinlock, size_t tries)
{
assert(spinlock);
while(!__sync_bool_compare_and_swap(&spinlock->state, 0, 1)) {
if(tries == 0) {
return FIBER_ERROR;
}
--tries;
while(spinlock->state && tries > 0) {
--tries;
}
}
load_load_barrier();
return FIBER_SUCCESS;
}

int fiber_spinlock_unlock(fiber_spinlock_t* spinlock)
{
assert(spinlock);
store_load_barrier();//flush this fiber's writes
spinlock->state = 0;
}

0 comments on commit 8fd903a

Please sign in to comment.