diff --git a/Makefile b/Makefile index b92a1a333..98f7d1c91 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,8 @@ CSERVICE_PATH ?= cservice SKYNET_BUILD_PATH ?= . -CFLAGS = -g -O2 -Wall -I$(LUA_INC) $(MYCFLAGS) +CFLAGS = -g -O2 -Wall -I$(LUA_INC) $(MYCFLAGS) +# CFLAGS += -DUSE_PTHREAD_LOCK # lua @@ -79,7 +80,7 @@ $(LUA_CLIB_PATH)/socketdriver.so : lualib-src/lua-socket.c | $(LUA_CLIB_PATH) $(CC) $(CFLAGS) $(SHARED) $^ -o $@ -Iskynet-src -Iservice-src $(LUA_CLIB_PATH)/bson.so : lualib-src/lua-bson.c | $(LUA_CLIB_PATH) - $(CC) $(CFLAGS) $(SHARED) $^ -o $@ -Iskynet-src + $(CC) $(CFLAGS) $(SHARED) -Iskynet-src $^ -o $@ -Iskynet-src $(LUA_CLIB_PATH)/mongo.so : lualib-src/lua-mongo.c | $(LUA_CLIB_PATH) $(CC) $(CFLAGS) $(SHARED) $^ -o $@ -Iskynet-src @@ -109,7 +110,7 @@ $(LUA_CLIB_PATH)/crypt.so : lualib-src/lua-crypt.c lualib-src/lsha1.c | $(LUA_CL $(CC) $(CFLAGS) $(SHARED) $^ -o $@ $(LUA_CLIB_PATH)/sharedata.so : lualib-src/lua-sharedata.c | $(LUA_CLIB_PATH) - $(CC) $(CFLAGS) $(SHARED) $^ -o $@ + $(CC) $(CFLAGS) $(SHARED) -Iskynet-src $^ -o $@ $(LUA_CLIB_PATH)/stm.so : lualib-src/lua-stm.c | $(LUA_CLIB_PATH) $(CC) $(CFLAGS) $(SHARED) -Iskynet-src $^ -o $@ @@ -124,7 +125,7 @@ $(LUA_CLIB_PATH)/mysqlaux.so : lualib-src/lua-mysqlaux.c | $(LUA_CLIB_PATH) $(CC) $(CFLAGS) $(SHARED) $^ -o $@ $(LUA_CLIB_PATH)/debugchannel.so : lualib-src/lua-debugchannel.c | $(LUA_CLIB_PATH) - $(CC) $(CFLAGS) $(SHARED) $^ -o $@ + $(CC) $(CFLAGS) $(SHARED) -Iskynet-src $^ -o $@ clean : rm -f $(SKYNET_BUILD_PATH)/skynet $(CSERVICE_PATH)/*.so $(LUA_CLIB_PATH)/*.so diff --git a/lualib-src/lua-bson.c b/lualib-src/lua-bson.c index 9071f56d4..0fc269852 100644 --- a/lualib-src/lua-bson.c +++ b/lualib-src/lua-bson.c @@ -8,6 +8,7 @@ #include #include #include +#include "atomic.h" #define DEFAULT_CAP 64 #define MAX_NUMBER 1024 @@ -1140,7 +1141,7 @@ lobjectid(lua_State *L) { } else { time_t ti = time(NULL); // old_counter is a static var, use atom inc. - uint32_t id = __sync_fetch_and_add(&oid_counter,1); + uint32_t id = ATOM_FINC(&oid_counter); oid[2] = (ti>>24) & 0xff; oid[3] = (ti>>16) & 0xff; diff --git a/lualib-src/lua-clientsocket.c b/lualib-src/lua-clientsocket.c index 89bfe1f86..fa1a88e8c 100644 --- a/lualib-src/lua-clientsocket.c +++ b/lualib-src/lua-clientsocket.c @@ -128,11 +128,8 @@ lusleep(lua_State *L) { #define QUEUE_SIZE 1024 -#define LOCK(q) while (__sync_lock_test_and_set(&(q)->lock,1)) {} -#define UNLOCK(q) __sync_lock_release(&(q)->lock); - struct queue { - int lock; + pthread_mutex_t lock; int head; int tail; char * queue[QUEUE_SIZE]; @@ -153,7 +150,7 @@ readline_stdin(void * arg) { memcpy(str, tmp, n); str[n] = 0; - LOCK(q); + pthread_mutex_lock(&q->lock); q->queue[q->tail] = str; if (++q->tail >= QUEUE_SIZE) { @@ -163,7 +160,7 @@ readline_stdin(void * arg) { // queue overflow exit(1); } - UNLOCK(q); + pthread_mutex_unlock(&q->lock); } return NULL; } @@ -171,16 +168,16 @@ readline_stdin(void * arg) { static int lreadstdin(lua_State *L) { struct queue *q = lua_touserdata(L, lua_upvalueindex(1)); - LOCK(q); + pthread_mutex_lock(&q->lock); if (q->head == q->tail) { - UNLOCK(q); + pthread_mutex_unlock(&q->lock); return 0; } char * str = q->queue[q->head]; if (++q->head >= QUEUE_SIZE) { q->head = 0; } - UNLOCK(q); + pthread_mutex_unlock(&q->lock); lua_pushstring(L, str); free(str); return 1; @@ -201,6 +198,7 @@ luaopen_clientsocket(lua_State *L) { struct queue * q = lua_newuserdata(L, sizeof(*q)); memset(q, 0, sizeof(*q)); + pthread_mutex_init(&q->lock, NULL); lua_pushcclosure(L, lreadstdin, 1); lua_setfield(L, -2, "readstdin"); diff --git a/lualib-src/lua-debugchannel.c b/lualib-src/lua-debugchannel.c index 9e4539d3c..ad7e1f1b5 100644 --- a/lualib-src/lua-debugchannel.c +++ b/lualib-src/lua-debugchannel.c @@ -4,10 +4,9 @@ #include #include #include +#include "spinlock.h" #define METANAME "debugchannel" -#define LOCK(q) while (__sync_lock_test_and_set(&(q)->lock,1)) {} -#define UNLOCK(q) __sync_lock_release(&(q)->lock); struct command { struct command * next; @@ -15,7 +14,7 @@ struct command { }; struct channel { - int lock; + struct spinlock lock; int ref; struct command * head; struct command * tail; @@ -26,6 +25,7 @@ channel_new() { struct channel * c = malloc(sizeof(*c)); memset(c, 0 , sizeof(*c)); c->ref = 1; + SPIN_INIT(c) return c; } @@ -33,21 +33,21 @@ channel_new() { static struct channel * channel_connect(struct channel *c) { struct channel * ret = NULL; - LOCK(c) + SPIN_LOCK(c) if (c->ref == 1) { ++c->ref; ret = c; } - UNLOCK(c) + SPIN_UNLOCK(c) return ret; } static struct channel * channel_release(struct channel *c) { - LOCK(c) + SPIN_LOCK(c) --c->ref; if (c->ref > 0) { - UNLOCK(c) + SPIN_UNLOCK(c) return c; } // never unlock while reference is 0 @@ -59,6 +59,8 @@ channel_release(struct channel *c) { free(p); p = next; } + SPIN_UNLOCK(c) + SPIN_DESTROY(c) free(c); return NULL; } @@ -67,9 +69,9 @@ channel_release(struct channel *c) { static struct command * channel_read(struct channel *c, double timeout) { struct command * ret = NULL; - LOCK(c) + SPIN_LOCK(c) if (c->head == NULL) { - UNLOCK(c) + SPIN_UNLOCK(c) int ti = (int)(timeout * 100000); usleep(ti); return NULL; @@ -79,7 +81,7 @@ channel_read(struct channel *c, double timeout) { if (c->head == NULL) { c->tail = NULL; } - UNLOCK(c) + SPIN_UNLOCK(c) return ret; } @@ -90,14 +92,14 @@ channel_write(struct channel *c, const char * s, size_t sz) { cmd->sz = sz; cmd->next = NULL; memcpy(cmd+1, s, sz); - LOCK(c) + SPIN_LOCK(c) if (c->tail == NULL) { c->head = c->tail = cmd; } else { c->tail->next = cmd; c->tail = cmd; } - UNLOCK(c) + SPIN_UNLOCK(c) } struct channel_box { diff --git a/lualib-src/lua-multicast.c b/lualib-src/lua-multicast.c index 599616f9a..cff8d4349 100644 --- a/lualib-src/lua-multicast.c +++ b/lualib-src/lua-multicast.c @@ -5,6 +5,8 @@ #include #include +#include "atomic.h" + struct mc_package { int reference; uint32_t size; @@ -116,7 +118,7 @@ static int mc_closelocal(lua_State *L) { struct mc_package *pack = lua_touserdata(L,1); - int ref = __sync_sub_and_fetch(&pack->reference, 1); + int ref = ATOM_DEC(&pack->reference); if (ref <= 0) { skynet_free(pack->data); skynet_free(pack); diff --git a/lualib-src/lua-sharedata.c b/lualib-src/lua-sharedata.c index e7a27c57e..99cadd15a 100644 --- a/lualib-src/lua-sharedata.c +++ b/lualib-src/lua-sharedata.c @@ -4,6 +4,7 @@ #include #include #include +#include "atomic.h" #define KEYTYPE_INTEGER 0 #define KEYTYPE_STRING 1 @@ -663,7 +664,7 @@ releaseobj(lua_State *L) { struct ctrl *c = lua_touserdata(L, 1); struct table *tbl = c->root; struct state *s = lua_touserdata(tbl->L, 1); - __sync_fetch_and_sub(&s->ref, 1); + ATOM_DEC(&s->ref); c->root = NULL; c->update = NULL; @@ -674,7 +675,7 @@ static int lboxconf(lua_State *L) { struct table * tbl = get_table(L,1); struct state * s = lua_touserdata(tbl->L, 1); - __sync_fetch_and_add(&s->ref, 1); + ATOM_INC(&s->ref); struct ctrl * c = lua_newuserdata(L, sizeof(*c)); c->root = tbl; @@ -719,7 +720,7 @@ static int lincref(lua_State *L) { struct table *tbl = get_table(L,1); struct state * s = lua_touserdata(tbl->L, 1); - int ref = __sync_add_and_fetch(&s->ref, 1); + int ref = ATOM_INC(&s->ref); lua_pushinteger(L , ref); return 1; @@ -729,7 +730,7 @@ static int ldecref(lua_State *L) { struct table *tbl = get_table(L,1); struct state * s = lua_touserdata(tbl->L, 1); - int ref = __sync_sub_and_fetch(&s->ref, 1); + int ref = ATOM_DEC(&s->ref); lua_pushinteger(L , ref); return 1; diff --git a/lualib-src/lua-stm.c b/lualib-src/lua-stm.c index b38fbcd10..e175d1006 100644 --- a/lualib-src/lua-stm.c +++ b/lualib-src/lua-stm.c @@ -7,6 +7,7 @@ #include "rwlock.h" #include "skynet_malloc.h" +#include "atomic.h" struct stm_object { struct rwlock lock; @@ -45,7 +46,7 @@ static void stm_releasecopy(struct stm_copy *copy) { if (copy == NULL) return; - if (__sync_sub_and_fetch(©->reference, 1) == 0) { + if (ATOM_DEC(©->reference) == 0) { skynet_free(copy->msg); skynet_free(copy); } @@ -70,7 +71,7 @@ stm_release(struct stm_object *obj) { static void stm_releasereader(struct stm_object *obj) { rwlock_rlock(&obj->lock); - if (__sync_sub_and_fetch(&obj->reference,1) == 0) { + if (ATOM_DEC(&obj->reference) == 0) { // last reader, no writer. so no need to unlock assert(obj->copy == NULL); skynet_free(obj); @@ -82,7 +83,7 @@ stm_releasereader(struct stm_object *obj) { static void stm_grab(struct stm_object *obj) { rwlock_rlock(&obj->lock); - int ref = __sync_fetch_and_add(&obj->reference,1); + int ref = ATOM_FINC(&obj->reference); rwlock_runlock(&obj->lock); assert(ref > 0); } @@ -92,7 +93,7 @@ stm_copy(struct stm_object *obj) { rwlock_rlock(&obj->lock); struct stm_copy * ret = obj->copy; if (ret) { - int ref = __sync_fetch_and_add(&ret->reference,1); + int ref = ATOM_FINC(&ret->reference); assert(ref > 0); } rwlock_runlock(&obj->lock); diff --git a/skynet-src/atomic.h b/skynet-src/atomic.h new file mode 100644 index 000000000..f79c71488 --- /dev/null +++ b/skynet-src/atomic.h @@ -0,0 +1,14 @@ +#ifndef SKYNET_ATOMIC_H +#define SKYNET_ATOMIC_H + +#define ATOM_CAS(ptr, oval, nval) __sync_bool_compare_and_swap(ptr, oval, nval) +#define ATOM_CAS_POINTER(ptr, oval, nval) __sync_bool_compare_and_swap(ptr, oval, nval) +#define ATOM_INC(ptr) __sync_add_and_fetch(ptr, 1) +#define ATOM_FINC(ptr) __sync_fetch_and_add(ptr, 1) +#define ATOM_DEC(ptr) __sync_sub_and_fetch(ptr, 1) +#define ATOM_FDEC(ptr) __sync_fetch_and_sub(ptr, 1) +#define ATOM_ADD(ptr,n) __sync_add_and_fetch(ptr, n) +#define ATOM_SUB(ptr,n) __sync_sub_and_fetch(ptr, n) +#define ATOM_AND(ptr,n) __sync_and_and_fetch(ptr, n) + +#endif diff --git a/skynet-src/malloc_hook.c b/skynet-src/malloc_hook.c index d9ad6f32c..3b0721689 100644 --- a/skynet-src/malloc_hook.c +++ b/skynet-src/malloc_hook.c @@ -6,6 +6,7 @@ #include "malloc_hook.h" #include "skynet.h" +#include "atomic.h" static size_t _used_memory = 0; static size_t _memory_block = 0; @@ -32,11 +33,11 @@ get_allocated_field(uint32_t handle) { ssize_t old_alloc = data->allocated; if(old_handle == 0 || old_alloc <= 0) { // data->allocated may less than zero, because it may not count at start. - if(!__sync_bool_compare_and_swap(&data->handle, old_handle, handle)) { + if(!ATOM_CAS(&data->handle, old_handle, handle)) { return 0; } if (old_alloc < 0) { - __sync_bool_compare_and_swap(&data->allocated, old_alloc, 0); + ATOM_CAS(&data->allocated, old_alloc, 0); } } if(data->handle != handle) { @@ -47,21 +48,21 @@ get_allocated_field(uint32_t handle) { inline static void update_xmalloc_stat_alloc(uint32_t handle, size_t __n) { - __sync_add_and_fetch(&_used_memory, __n); - __sync_add_and_fetch(&_memory_block, 1); + ATOM_ADD(&_used_memory, __n); + ATOM_INC(&_memory_block); ssize_t* allocated = get_allocated_field(handle); if(allocated) { - __sync_add_and_fetch(allocated, __n); + ATOM_ADD(allocated, __n); } } inline static void update_xmalloc_stat_free(uint32_t handle, size_t __n) { - __sync_sub_and_fetch(&_used_memory, __n); - __sync_sub_and_fetch(&_memory_block, 1); + ATOM_SUB(&_used_memory, __n); + ATOM_DEC(&_memory_block); ssize_t* allocated = get_allocated_field(handle); if(allocated) { - __sync_sub_and_fetch(allocated, __n); + ATOM_SUB(allocated, __n); } } diff --git a/skynet-src/rwlock.h b/skynet-src/rwlock.h index e366f18c5..5a995918d 100644 --- a/skynet-src/rwlock.h +++ b/skynet-src/rwlock.h @@ -1,6 +1,8 @@ #ifndef SKYNET_RWLOCK_H #define SKYNET_RWLOCK_H +#ifndef USE_PTHREAD_LOCK + struct rwlock { int write; int read; @@ -45,4 +47,42 @@ rwlock_runlock(struct rwlock *lock) { __sync_sub_and_fetch(&lock->read,1); } +#else + +#include + +// only for some platform doesn't have __sync_* +// todo: check the result of pthread api + +struct rwlock { + pthread_rwlock_t lock; +}; + +static inline void +rwlock_init(struct rwlock *lock) { + pthread_rwlock_init(&lock->lock, NULL); +} + +static inline void +rwlock_rlock(struct rwlock *lock) { + pthread_rwlock_rdlock(&lock->lock); +} + +static inline void +rwlock_wlock(struct rwlock *lock) { + pthread_rwlock_wrlock(&lock->lock); +} + +static inline void +rwlock_wunlock(struct rwlock *lock) { + pthread_rwlock_unlock(&lock->lock); +} + +static inline void +rwlock_runlock(struct rwlock *lock) { + pthread_rwlock_unlock(&lock->lock); +} + +#endif + #endif diff --git a/skynet-src/skynet_env.c b/skynet-src/skynet_env.c index 9dbbcf9c3..a5882f21d 100644 --- a/skynet-src/skynet_env.c +++ b/skynet-src/skynet_env.c @@ -1,5 +1,6 @@ #include "skynet.h" #include "skynet_env.h" +#include "spinlock.h" #include #include @@ -8,18 +9,15 @@ #include struct skynet_env { - int lock; + struct spinlock lock; lua_State *L; }; static struct skynet_env *E = NULL; -#define LOCK(q) while (__sync_lock_test_and_set(&(q)->lock,1)) {} -#define UNLOCK(q) __sync_lock_release(&(q)->lock); - const char * skynet_getenv(const char *key) { - LOCK(E) + SPIN_LOCK(E) lua_State *L = E->L; @@ -27,14 +25,14 @@ skynet_getenv(const char *key) { const char * result = lua_tostring(L, -1); lua_pop(L, 1); - UNLOCK(E) + SPIN_UNLOCK(E) return result; } void skynet_setenv(const char *key, const char *value) { - LOCK(E) + SPIN_LOCK(E) lua_State *L = E->L; lua_getglobal(L, key); @@ -43,12 +41,12 @@ skynet_setenv(const char *key, const char *value) { lua_pushstring(L,value); lua_setglobal(L,key); - UNLOCK(E) + SPIN_UNLOCK(E) } void skynet_env_init() { E = skynet_malloc(sizeof(*E)); - E->lock = 0; + SPIN_INIT(E) E->L = luaL_newstate(); } diff --git a/skynet-src/skynet_module.c b/skynet-src/skynet_module.c index 2da346294..d586427f0 100644 --- a/skynet-src/skynet_module.c +++ b/skynet-src/skynet_module.c @@ -1,6 +1,7 @@ #include "skynet.h" #include "skynet_module.h" +#include "spinlock.h" #include #include @@ -13,7 +14,7 @@ struct modules { int count; - int lock; + struct spinlock lock; const char * path; struct skynet_module m[MAX_MODULE_TYPE]; }; @@ -95,7 +96,7 @@ skynet_module_query(const char * name) { if (result) return result; - while(__sync_lock_test_and_set(&M->lock,1)) {} + SPIN_LOCK(M) result = _query(name); // double check @@ -114,21 +115,22 @@ skynet_module_query(const char * name) { } } - __sync_lock_release(&M->lock); + SPIN_UNLOCK(M) return result; } void skynet_module_insert(struct skynet_module *mod) { - while(__sync_lock_test_and_set(&M->lock,1)) {} + SPIN_LOCK(M) struct skynet_module * m = _query(mod->name); assert(m == NULL && M->count < MAX_MODULE_TYPE); int index = M->count; M->m[index] = *mod; ++M->count; - __sync_lock_release(&M->lock); + + SPIN_UNLOCK(M) } void * @@ -164,7 +166,8 @@ skynet_module_init(const char *path) { struct modules *m = skynet_malloc(sizeof(*m)); m->count = 0; m->path = skynet_strdup(path); - m->lock = 0; + + SPIN_INIT(m) M = m; } diff --git a/skynet-src/skynet_monitor.c b/skynet-src/skynet_monitor.c index 9ee302e5f..8d47fc895 100644 --- a/skynet-src/skynet_monitor.c +++ b/skynet-src/skynet_monitor.c @@ -3,6 +3,7 @@ #include "skynet_monitor.h" #include "skynet_server.h" #include "skynet.h" +#include "atomic.h" #include #include @@ -30,7 +31,7 @@ void skynet_monitor_trigger(struct skynet_monitor *sm, uint32_t source, uint32_t destination) { sm->source = source; sm->destination = destination; - __sync_fetch_and_add(&sm->version , 1); + ATOM_INC(&sm->version); } void diff --git a/skynet-src/skynet_mq.c b/skynet-src/skynet_mq.c index ba0c61e61..157f33d8b 100644 --- a/skynet-src/skynet_mq.c +++ b/skynet-src/skynet_mq.c @@ -1,6 +1,7 @@ #include "skynet.h" #include "skynet_mq.h" #include "skynet_handle.h" +#include "spinlock.h" #include #include @@ -18,11 +19,11 @@ #define MQ_OVERLOAD 1024 struct message_queue { + struct spinlock lock; uint32_t handle; int cap; int head; int tail; - int lock; int release; int in_global; int overload; @@ -34,19 +35,16 @@ struct message_queue { struct global_queue { struct message_queue *head; struct message_queue *tail; - int lock; + struct spinlock lock; }; static struct global_queue *Q = NULL; -#define LOCK(q) while (__sync_lock_test_and_set(&(q)->lock,1)) {} -#define UNLOCK(q) __sync_lock_release(&(q)->lock); - void skynet_globalmq_push(struct message_queue * queue) { struct global_queue *q= Q; - LOCK(q) + SPIN_LOCK(q) assert(queue->next == NULL); if(q->tail) { q->tail->next = queue; @@ -54,14 +52,14 @@ skynet_globalmq_push(struct message_queue * queue) { } else { q->head = q->tail = queue; } - UNLOCK(q) + SPIN_UNLOCK(q) } struct message_queue * skynet_globalmq_pop() { struct global_queue *q = Q; - LOCK(q) + SPIN_LOCK(q) struct message_queue *mq = q->head; if(mq) { q->head = mq->next; @@ -71,7 +69,7 @@ skynet_globalmq_pop() { } mq->next = NULL; } - UNLOCK(q) + SPIN_UNLOCK(q) return mq; } @@ -83,7 +81,7 @@ skynet_mq_create(uint32_t handle) { q->cap = DEFAULT_QUEUE_SIZE; q->head = 0; q->tail = 0; - q->lock = 0; + SPIN_INIT(q) // When the queue is create (always between service create and service init) , // set in_global flag to avoid push it to global queue . // If the service init success, skynet_context_new will call skynet_mq_force_push to push it to global queue. @@ -100,6 +98,7 @@ skynet_mq_create(uint32_t handle) { static void _release(struct message_queue *q) { assert(q->next == NULL); + SPIN_DESTROY(q) skynet_free(q->queue); skynet_free(q); } @@ -113,11 +112,11 @@ int skynet_mq_length(struct message_queue *q) { int head, tail,cap; - LOCK(q) + SPIN_LOCK(q) head = q->head; tail = q->tail; cap = q->cap; - UNLOCK(q) + SPIN_UNLOCK(q) if (head <= tail) { return tail - head; @@ -138,7 +137,7 @@ skynet_mq_overload(struct message_queue *q) { int skynet_mq_pop(struct message_queue *q, struct skynet_message *message) { int ret = 1; - LOCK(q) + SPIN_LOCK(q) if (q->head != q->tail) { *message = q->queue[q->head++]; @@ -167,7 +166,7 @@ skynet_mq_pop(struct message_queue *q, struct skynet_message *message) { q->in_global = 0; } - UNLOCK(q) + SPIN_UNLOCK(q) return ret; } @@ -190,7 +189,7 @@ expand_queue(struct message_queue *q) { void skynet_mq_push(struct message_queue *q, struct skynet_message *message) { assert(message); - LOCK(q) + SPIN_LOCK(q) q->queue[q->tail] = *message; if (++ q->tail >= q->cap) { @@ -206,25 +205,26 @@ skynet_mq_push(struct message_queue *q, struct skynet_message *message) { skynet_globalmq_push(q); } - UNLOCK(q) + SPIN_UNLOCK(q) } void skynet_mq_init() { struct global_queue *q = skynet_malloc(sizeof(*q)); memset(q,0,sizeof(*q)); + SPIN_INIT(q); Q=q; } void skynet_mq_mark_release(struct message_queue *q) { - LOCK(q) + SPIN_LOCK(q) assert(q->release == 0); q->release = 1; if (q->in_global != MQ_IN_GLOBAL) { skynet_globalmq_push(q); } - UNLOCK(q) + SPIN_UNLOCK(q) } static void @@ -238,13 +238,13 @@ _drop_queue(struct message_queue *q, message_drop drop_func, void *ud) { void skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud) { - LOCK(q) + SPIN_LOCK(q) if (q->release) { - UNLOCK(q) + SPIN_UNLOCK(q) _drop_queue(q, drop_func, ud); } else { skynet_globalmq_push(q); - UNLOCK(q) + SPIN_UNLOCK(q) } } diff --git a/skynet-src/skynet_server.c b/skynet-src/skynet_server.c index 523311938..4972e7a40 100644 --- a/skynet-src/skynet_server.c +++ b/skynet-src/skynet_server.c @@ -10,6 +10,8 @@ #include "skynet_monitor.h" #include "skynet_imp.h" #include "skynet_log.h" +#include "spinlock.h" +#include "atomic.h" #include @@ -21,16 +23,18 @@ #ifdef CALLING_CHECK -#define CHECKCALLING_BEGIN(ctx) assert(__sync_lock_test_and_set(&ctx->calling,1) == 0); -#define CHECKCALLING_END(ctx) __sync_lock_release(&ctx->calling); -#define CHECKCALLING_INIT(ctx) ctx->calling = 0; -#define CHECKCALLING_DECL int calling; +#define CHECKCALLING_BEGIN(ctx) if (!(spinlock_trylock(&ctx->calling))) { assert(0); } +#define CHECKCALLING_END(ctx) spinlock_unlock(&ctx->calling); +#define CHECKCALLING_INIT(ctx) spinlock_init(&ctx->calling); +#define CHECKCALLING_DESTROY(ctx) spinlock_destroy(&ctx->calling); +#define CHECKCALLING_DECL struct spinlock calling; #else #define CHECKCALLING_BEGIN(ctx) #define CHECKCALLING_END(ctx) #define CHECKCALLING_INIT(ctx) +#define CHECKCALLING_DESTROY(ctx) #define CHECKCALLING_DECL #endif @@ -68,12 +72,12 @@ skynet_context_total() { static void context_inc() { - __sync_fetch_and_add(&G_NODE.total,1); + ATOM_INC(&G_NODE.total); } static void context_dec() { - __sync_fetch_and_sub(&G_NODE.total,1); + ATOM_DEC(&G_NODE.total); } uint32_t @@ -179,7 +183,7 @@ skynet_context_newsession(struct skynet_context *ctx) { void skynet_context_grab(struct skynet_context *ctx) { - __sync_add_and_fetch(&ctx->ref,1); + ATOM_INC(&ctx->ref); } void @@ -197,13 +201,14 @@ delete_context(struct skynet_context *ctx) { } skynet_module_instance_release(ctx->mod, ctx->instance); skynet_mq_mark_release(ctx->queue); + CHECKCALLING_DESTROY(ctx) skynet_free(ctx); context_dec(); } struct skynet_context * skynet_context_release(struct skynet_context *ctx) { - if (__sync_sub_and_fetch(&ctx->ref,1) == 0) { + if (ATOM_DEC(&ctx->ref) == 0) { delete_context(ctx); return NULL; } @@ -560,7 +565,7 @@ cmd_logon(struct skynet_context * context, const char * param) { if (lastf == NULL) { f = skynet_log_open(context, handle); if (f) { - if (!__sync_bool_compare_and_swap(&ctx->logfile, NULL, f)) { + if (!ATOM_CAS_POINTER(&ctx->logfile, NULL, f)) { // logfile opens in other thread, close this one. fclose(f); } @@ -581,7 +586,7 @@ cmd_logoff(struct skynet_context * context, const char * param) { FILE * f = ctx->logfile; if (f) { // logfile may close in other thread - if (__sync_bool_compare_and_swap(&ctx->logfile, f, NULL)) { + if (ATOM_CAS_POINTER(&ctx->logfile, f, NULL)) { skynet_log_close(context, f, handle); } } diff --git a/skynet-src/skynet_timer.c b/skynet-src/skynet_timer.c index f23114dce..c9a113aee 100644 --- a/skynet-src/skynet_timer.c +++ b/skynet-src/skynet_timer.c @@ -4,6 +4,7 @@ #include "skynet_mq.h" #include "skynet_server.h" #include "skynet_handle.h" +#include "spinlock.h" #include #include @@ -17,9 +18,6 @@ typedef void (*timer_execute_func)(void *ud,void *arg); -#define LOCK(q) while (__sync_lock_test_and_set(&(q)->lock,1)) {} -#define UNLOCK(q) __sync_lock_release(&(q)->lock); - #define TIME_NEAR_SHIFT 8 #define TIME_NEAR (1 << TIME_NEAR_SHIFT) #define TIME_LEVEL_SHIFT 6 @@ -45,7 +43,7 @@ struct link_list { struct timer { struct link_list near[TIME_NEAR]; struct link_list t[4][TIME_LEVEL]; - int lock; + struct spinlock lock; uint32_t time; uint32_t current; uint32_t starttime; @@ -97,12 +95,12 @@ timer_add(struct timer *T,void *arg,size_t sz,int time) { struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz); memcpy(node+1,arg,sz); - LOCK(T); + SPIN_LOCK(T); node->expire=time+T->time; add_node(T,node); - UNLOCK(T); + SPIN_UNLOCK(T); } static void @@ -162,16 +160,16 @@ timer_execute(struct timer *T) { while (T->near[idx].head.next) { struct timer_node *current = link_clear(&T->near[idx]); - UNLOCK(T); + SPIN_UNLOCK(T); // dispatch_list don't need lock T dispatch_list(current); - LOCK(T); + SPIN_LOCK(T); } } static void timer_update(struct timer *T) { - LOCK(T); + SPIN_LOCK(T); // try to dispatch timeout 0 (rare condition) timer_execute(T); @@ -181,7 +179,7 @@ timer_update(struct timer *T) { timer_execute(T); - UNLOCK(T); + SPIN_UNLOCK(T); } static struct timer * @@ -201,7 +199,8 @@ timer_create_timer() { } } - r->lock = 0; + SPIN_INIT(r) + r->current = 0; return r; diff --git a/skynet-src/socket_server.c b/skynet-src/socket_server.c index 78f9b47ba..b2454d2e4 100644 --- a/skynet-src/socket_server.c +++ b/skynet-src/socket_server.c @@ -2,6 +2,7 @@ #include "socket_server.h" #include "socket_poll.h" +#include "atomic.h" #include #include @@ -237,13 +238,13 @@ static int reserve_id(struct socket_server *ss) { int i; for (i=0;ialloc_id), 1); + int id = ATOM_INC(&(ss->alloc_id)); if (id < 0) { - id = __sync_and_and_fetch(&(ss->alloc_id), 0x7fffffff); + id = ATOM_AND(&(ss->alloc_id), 0x7fffffff); } struct socket *s = &ss->slot[HASH_ID(id)]; if (s->type == SOCKET_TYPE_INVALID) { - if (__sync_bool_compare_and_swap(&s->type, SOCKET_TYPE_INVALID, SOCKET_TYPE_RESERVE)) { + if (ATOM_CAS(&s->type, SOCKET_TYPE_INVALID, SOCKET_TYPE_RESERVE)) { s->id = id; s->fd = -1; return id; diff --git a/skynet-src/spinlock.h b/skynet-src/spinlock.h new file mode 100644 index 000000000..d9744c3f0 --- /dev/null +++ b/skynet-src/spinlock.h @@ -0,0 +1,77 @@ +#ifndef SKYNET_SPINLOCK_H +#define SKYNET_SPINLOCK_H + +#define SPIN_INIT(q) spinlock_init(&(q)->lock); +#define SPIN_LOCK(q) spinlock_lock(&(q)->lock); +#define SPIN_UNLOCK(q) spinlock_unlock(&(q)->lock); +#define SPIN_DESTROY(q) spinlock_destroy(&(q)->lock); + +#ifndef USE_PTHREAD_LOCK + +struct spinlock { + int lock; +}; + +static inline void +spinlock_init(struct spinlock *lock) { + lock->lock = 0; +} + +static inline void +spinlock_lock(struct spinlock *lock) { + while (__sync_lock_test_and_set(&lock->lock,1)) {} +} + +static inline int +spinlock_trylock(struct spinlock *lock) { + return __sync_lock_test_and_set(&lock->lock,1) == 0; +} + +static inline void +spinlock_unlock(struct spinlock *lock) { + __sync_lock_release(&lock->lock); +} + +static inline void +spinlock_destroy(struct spinlock *lock) { +} + +#else + +#include + +// we use mutex instead of spinlock for some reason +// you can also replace to pthread_spinlock + +struct spinlock { + pthread_mutex_t lock; +}; + +static inline void +spinlock_init(struct spinlock *lock) { + pthread_mutex_init(&lock->lock, NULL); +} + +static inline void +spinlock_lock(struct spinlock *lock) { + pthread_mutex_lock(&lock->lock); +} + +static inline int +spinlock_trylock(struct spinlock *lock) { + return pthread_mutex_trylock(&lock->lock) == 0; +} + +static inline void +spinlock_unlock(struct spinlock *lock) { + pthread_mutex_unlock(&lock->lock); +} + +static inline void +spinlock_destroy(struct spinlock *lock) { + pthread_mutex_destroy(&lock->lock); +} + +#endif + +#endif