Skip to content

Commit

Permalink
Merge pull request #325 from cloudwu/pthreadlock
Browse files Browse the repository at this point in the history
add pthread lock
  • Loading branch information
cloudwu committed Aug 17, 2015
2 parents 2583af2 + bf8f9b8 commit 77bdfb2
Show file tree
Hide file tree
Showing 18 changed files with 249 additions and 104 deletions.
9 changes: 5 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ CSERVICE_PATH ?= cservice

SKYNET_BUILD_PATH ?= .

CFLAGS = -g -O2 -Wall -I$(LUA_INC) $(MYCFLAGS)
CFLAGS = -g -O2 -Wall -I$(LUA_INC) $(MYCFLAGS)
# CFLAGS += -DUSE_PTHREAD_LOCK

# lua

Expand Down Expand Up @@ -79,7 +80,7 @@ $(LUA_CLIB_PATH)/socketdriver.so : lualib-src/lua-socket.c | $(LUA_CLIB_PATH)
$(CC) $(CFLAGS) $(SHARED) $^ -o $@ -Iskynet-src -Iservice-src

$(LUA_CLIB_PATH)/bson.so : lualib-src/lua-bson.c | $(LUA_CLIB_PATH)
$(CC) $(CFLAGS) $(SHARED) $^ -o $@ -Iskynet-src
$(CC) $(CFLAGS) $(SHARED) -Iskynet-src $^ -o $@ -Iskynet-src

$(LUA_CLIB_PATH)/mongo.so : lualib-src/lua-mongo.c | $(LUA_CLIB_PATH)
$(CC) $(CFLAGS) $(SHARED) $^ -o $@ -Iskynet-src
Expand Down Expand Up @@ -109,7 +110,7 @@ $(LUA_CLIB_PATH)/crypt.so : lualib-src/lua-crypt.c lualib-src/lsha1.c | $(LUA_CL
$(CC) $(CFLAGS) $(SHARED) $^ -o $@

$(LUA_CLIB_PATH)/sharedata.so : lualib-src/lua-sharedata.c | $(LUA_CLIB_PATH)
$(CC) $(CFLAGS) $(SHARED) $^ -o $@
$(CC) $(CFLAGS) $(SHARED) -Iskynet-src $^ -o $@

$(LUA_CLIB_PATH)/stm.so : lualib-src/lua-stm.c | $(LUA_CLIB_PATH)
$(CC) $(CFLAGS) $(SHARED) -Iskynet-src $^ -o $@
Expand All @@ -124,7 +125,7 @@ $(LUA_CLIB_PATH)/mysqlaux.so : lualib-src/lua-mysqlaux.c | $(LUA_CLIB_PATH)
$(CC) $(CFLAGS) $(SHARED) $^ -o $@

$(LUA_CLIB_PATH)/debugchannel.so : lualib-src/lua-debugchannel.c | $(LUA_CLIB_PATH)
$(CC) $(CFLAGS) $(SHARED) $^ -o $@
$(CC) $(CFLAGS) $(SHARED) -Iskynet-src $^ -o $@

clean :
rm -f $(SKYNET_BUILD_PATH)/skynet $(CSERVICE_PATH)/*.so $(LUA_CLIB_PATH)/*.so
Expand Down
3 changes: 2 additions & 1 deletion lualib-src/lua-bson.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include "atomic.h"

#define DEFAULT_CAP 64
#define MAX_NUMBER 1024
Expand Down Expand Up @@ -1140,7 +1141,7 @@ lobjectid(lua_State *L) {
} else {
time_t ti = time(NULL);
// old_counter is a static var, use atom inc.
uint32_t id = __sync_fetch_and_add(&oid_counter,1);
uint32_t id = ATOM_FINC(&oid_counter);

oid[2] = (ti>>24) & 0xff;
oid[3] = (ti>>16) & 0xff;
Expand Down
16 changes: 7 additions & 9 deletions lualib-src/lua-clientsocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,8 @@ lusleep(lua_State *L) {

#define QUEUE_SIZE 1024

#define LOCK(q) while (__sync_lock_test_and_set(&(q)->lock,1)) {}
#define UNLOCK(q) __sync_lock_release(&(q)->lock);

struct queue {
int lock;
pthread_mutex_t lock;
int head;
int tail;
char * queue[QUEUE_SIZE];
Expand All @@ -153,7 +150,7 @@ readline_stdin(void * arg) {
memcpy(str, tmp, n);
str[n] = 0;

LOCK(q);
pthread_mutex_lock(&q->lock);
q->queue[q->tail] = str;

if (++q->tail >= QUEUE_SIZE) {
Expand All @@ -163,24 +160,24 @@ readline_stdin(void * arg) {
// queue overflow
exit(1);
}
UNLOCK(q);
pthread_mutex_unlock(&q->lock);
}
return NULL;
}

static int
lreadstdin(lua_State *L) {
struct queue *q = lua_touserdata(L, lua_upvalueindex(1));
LOCK(q);
pthread_mutex_lock(&q->lock);
if (q->head == q->tail) {
UNLOCK(q);
pthread_mutex_unlock(&q->lock);
return 0;
}
char * str = q->queue[q->head];
if (++q->head >= QUEUE_SIZE) {
q->head = 0;
}
UNLOCK(q);
pthread_mutex_unlock(&q->lock);
lua_pushstring(L, str);
free(str);
return 1;
Expand All @@ -201,6 +198,7 @@ luaopen_clientsocket(lua_State *L) {

struct queue * q = lua_newuserdata(L, sizeof(*q));
memset(q, 0, sizeof(*q));
pthread_mutex_init(&q->lock, NULL);
lua_pushcclosure(L, lreadstdin, 1);
lua_setfield(L, -2, "readstdin");

Expand Down
26 changes: 14 additions & 12 deletions lualib-src/lua-debugchannel.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include "spinlock.h"

#define METANAME "debugchannel"
#define LOCK(q) while (__sync_lock_test_and_set(&(q)->lock,1)) {}
#define UNLOCK(q) __sync_lock_release(&(q)->lock);

struct command {
struct command * next;
size_t sz;
};

struct channel {
int lock;
struct spinlock lock;
int ref;
struct command * head;
struct command * tail;
Expand All @@ -26,28 +25,29 @@ channel_new() {
struct channel * c = malloc(sizeof(*c));
memset(c, 0 , sizeof(*c));
c->ref = 1;
SPIN_INIT(c)

return c;
}

static struct channel *
channel_connect(struct channel *c) {
struct channel * ret = NULL;
LOCK(c)
SPIN_LOCK(c)
if (c->ref == 1) {
++c->ref;
ret = c;
}
UNLOCK(c)
SPIN_UNLOCK(c)
return ret;
}

static struct channel *
channel_release(struct channel *c) {
LOCK(c)
SPIN_LOCK(c)
--c->ref;
if (c->ref > 0) {
UNLOCK(c)
SPIN_UNLOCK(c)
return c;
}
// never unlock while reference is 0
Expand All @@ -59,6 +59,8 @@ channel_release(struct channel *c) {
free(p);
p = next;
}
SPIN_UNLOCK(c)
SPIN_DESTROY(c)
free(c);
return NULL;
}
Expand All @@ -67,9 +69,9 @@ channel_release(struct channel *c) {
static struct command *
channel_read(struct channel *c, double timeout) {
struct command * ret = NULL;
LOCK(c)
SPIN_LOCK(c)
if (c->head == NULL) {
UNLOCK(c)
SPIN_UNLOCK(c)
int ti = (int)(timeout * 100000);
usleep(ti);
return NULL;
Expand All @@ -79,7 +81,7 @@ channel_read(struct channel *c, double timeout) {
if (c->head == NULL) {
c->tail = NULL;
}
UNLOCK(c)
SPIN_UNLOCK(c)

return ret;
}
Expand All @@ -90,14 +92,14 @@ channel_write(struct channel *c, const char * s, size_t sz) {
cmd->sz = sz;
cmd->next = NULL;
memcpy(cmd+1, s, sz);
LOCK(c)
SPIN_LOCK(c)
if (c->tail == NULL) {
c->head = c->tail = cmd;
} else {
c->tail->next = cmd;
c->tail = cmd;
}
UNLOCK(c)
SPIN_UNLOCK(c)
}

struct channel_box {
Expand Down
4 changes: 3 additions & 1 deletion lualib-src/lua-multicast.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <stdint.h>
#include <string.h>

#include "atomic.h"

struct mc_package {
int reference;
uint32_t size;
Expand Down Expand Up @@ -116,7 +118,7 @@ static int
mc_closelocal(lua_State *L) {
struct mc_package *pack = lua_touserdata(L,1);

int ref = __sync_sub_and_fetch(&pack->reference, 1);
int ref = ATOM_DEC(&pack->reference);
if (ref <= 0) {
skynet_free(pack->data);
skynet_free(pack);
Expand Down
9 changes: 5 additions & 4 deletions lualib-src/lua-sharedata.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include "atomic.h"

#define KEYTYPE_INTEGER 0
#define KEYTYPE_STRING 1
Expand Down Expand Up @@ -663,7 +664,7 @@ releaseobj(lua_State *L) {
struct ctrl *c = lua_touserdata(L, 1);
struct table *tbl = c->root;
struct state *s = lua_touserdata(tbl->L, 1);
__sync_fetch_and_sub(&s->ref, 1);
ATOM_DEC(&s->ref);
c->root = NULL;
c->update = NULL;

Expand All @@ -674,7 +675,7 @@ static int
lboxconf(lua_State *L) {
struct table * tbl = get_table(L,1);
struct state * s = lua_touserdata(tbl->L, 1);
__sync_fetch_and_add(&s->ref, 1);
ATOM_INC(&s->ref);

struct ctrl * c = lua_newuserdata(L, sizeof(*c));
c->root = tbl;
Expand Down Expand Up @@ -719,7 +720,7 @@ static int
lincref(lua_State *L) {
struct table *tbl = get_table(L,1);
struct state * s = lua_touserdata(tbl->L, 1);
int ref = __sync_add_and_fetch(&s->ref, 1);
int ref = ATOM_INC(&s->ref);
lua_pushinteger(L , ref);

return 1;
Expand All @@ -729,7 +730,7 @@ static int
ldecref(lua_State *L) {
struct table *tbl = get_table(L,1);
struct state * s = lua_touserdata(tbl->L, 1);
int ref = __sync_sub_and_fetch(&s->ref, 1);
int ref = ATOM_DEC(&s->ref);
lua_pushinteger(L , ref);

return 1;
Expand Down
9 changes: 5 additions & 4 deletions lualib-src/lua-stm.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "rwlock.h"
#include "skynet_malloc.h"
#include "atomic.h"

struct stm_object {
struct rwlock lock;
Expand Down Expand Up @@ -45,7 +46,7 @@ static void
stm_releasecopy(struct stm_copy *copy) {
if (copy == NULL)
return;
if (__sync_sub_and_fetch(&copy->reference, 1) == 0) {
if (ATOM_DEC(&copy->reference) == 0) {
skynet_free(copy->msg);
skynet_free(copy);
}
Expand All @@ -70,7 +71,7 @@ stm_release(struct stm_object *obj) {
static void
stm_releasereader(struct stm_object *obj) {
rwlock_rlock(&obj->lock);
if (__sync_sub_and_fetch(&obj->reference,1) == 0) {
if (ATOM_DEC(&obj->reference) == 0) {
// last reader, no writer. so no need to unlock
assert(obj->copy == NULL);
skynet_free(obj);
Expand All @@ -82,7 +83,7 @@ stm_releasereader(struct stm_object *obj) {
static void
stm_grab(struct stm_object *obj) {
rwlock_rlock(&obj->lock);
int ref = __sync_fetch_and_add(&obj->reference,1);
int ref = ATOM_FINC(&obj->reference);
rwlock_runlock(&obj->lock);
assert(ref > 0);
}
Expand All @@ -92,7 +93,7 @@ stm_copy(struct stm_object *obj) {
rwlock_rlock(&obj->lock);
struct stm_copy * ret = obj->copy;
if (ret) {
int ref = __sync_fetch_and_add(&ret->reference,1);
int ref = ATOM_FINC(&ret->reference);
assert(ref > 0);
}
rwlock_runlock(&obj->lock);
Expand Down
14 changes: 14 additions & 0 deletions skynet-src/atomic.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#ifndef SKYNET_ATOMIC_H
#define SKYNET_ATOMIC_H

#define ATOM_CAS(ptr, oval, nval) __sync_bool_compare_and_swap(ptr, oval, nval)
#define ATOM_CAS_POINTER(ptr, oval, nval) __sync_bool_compare_and_swap(ptr, oval, nval)
#define ATOM_INC(ptr) __sync_add_and_fetch(ptr, 1)
#define ATOM_FINC(ptr) __sync_fetch_and_add(ptr, 1)
#define ATOM_DEC(ptr) __sync_sub_and_fetch(ptr, 1)
#define ATOM_FDEC(ptr) __sync_fetch_and_sub(ptr, 1)
#define ATOM_ADD(ptr,n) __sync_add_and_fetch(ptr, n)
#define ATOM_SUB(ptr,n) __sync_sub_and_fetch(ptr, n)
#define ATOM_AND(ptr,n) __sync_and_and_fetch(ptr, n)

#endif
17 changes: 9 additions & 8 deletions skynet-src/malloc_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "malloc_hook.h"
#include "skynet.h"
#include "atomic.h"

static size_t _used_memory = 0;
static size_t _memory_block = 0;
Expand All @@ -32,11 +33,11 @@ get_allocated_field(uint32_t handle) {
ssize_t old_alloc = data->allocated;
if(old_handle == 0 || old_alloc <= 0) {
// data->allocated may less than zero, because it may not count at start.
if(!__sync_bool_compare_and_swap(&data->handle, old_handle, handle)) {
if(!ATOM_CAS(&data->handle, old_handle, handle)) {
return 0;
}
if (old_alloc < 0) {
__sync_bool_compare_and_swap(&data->allocated, old_alloc, 0);
ATOM_CAS(&data->allocated, old_alloc, 0);
}
}
if(data->handle != handle) {
Expand All @@ -47,21 +48,21 @@ get_allocated_field(uint32_t handle) {

inline static void
update_xmalloc_stat_alloc(uint32_t handle, size_t __n) {
__sync_add_and_fetch(&_used_memory, __n);
__sync_add_and_fetch(&_memory_block, 1);
ATOM_ADD(&_used_memory, __n);
ATOM_INC(&_memory_block);
ssize_t* allocated = get_allocated_field(handle);
if(allocated) {
__sync_add_and_fetch(allocated, __n);
ATOM_ADD(allocated, __n);
}
}

inline static void
update_xmalloc_stat_free(uint32_t handle, size_t __n) {
__sync_sub_and_fetch(&_used_memory, __n);
__sync_sub_and_fetch(&_memory_block, 1);
ATOM_SUB(&_used_memory, __n);
ATOM_DEC(&_memory_block);
ssize_t* allocated = get_allocated_field(handle);
if(allocated) {
__sync_sub_and_fetch(allocated, __n);
ATOM_SUB(allocated, __n);
}
}

Expand Down
Loading

0 comments on commit 77bdfb2

Please sign in to comment.