Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some fixes #1

Merged
merged 5 commits into from
Aug 6, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions skynet_handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,20 @@ skynet_handle_register(struct skynet_context *ctx) {
for (;;) {
int i;
for (i=0;i<s->slot_size;i++) {
int hash = (i+s->handle_index) & (s->slot_size-1);
uint32_t handle = (i+s->handle_index) & HANDLE_MASK;
int hash = handle & (s->slot_size-1);
if (s->slot[hash] == NULL) {
s->slot[hash] = ctx;
uint32_t handle = s->handle_index + i ;
skynet_context_init(ctx, handle);
s->handle_index = handle + 1;

rwlock_wunlock(&s->lock);

s->handle_index = handle + 1;

return (handle & HANDLE_MASK) | s->harbor;
handle |= s->harbor;
skynet_context_init(ctx, handle);
return handle;
}
}
assert((s->slot_size*2 - 1) <= HANDLE_MASK);
struct skynet_context ** new_slot = malloc(s->slot_size * 2 * sizeof(struct skynet_context *));
memset(new_slot, 0, s->slot_size * 2 * sizeof(struct skynet_context *));
for (i=0;i<s->slot_size;i++) {
Expand All @@ -76,18 +77,17 @@ skynet_handle_retire(uint32_t handle) {
s->slot[hash] = NULL;

int i;
int j=0;
for (i=0;i<s->name_count;i++,j++) {
int j=0, n=s->name_count;
for (i=0; i<n; ++i) {
if (s->name[i].handle == handle) {
free(s->name[i].name);
++j;
--s->name_count;
} else {
if (i!=j) {
s->name[i] = s->name[j];
}
continue;
} else if (i!=j) {
s->name[j] = s->name[i];
}
++j;
}
s->name_count = j;
}

rwlock_wunlock(&s->lock);
Expand Down Expand Up @@ -128,7 +128,6 @@ skynet_handle_findname(const char * name) {
int c = strcmp(n->name, name);
if (c==0) {
handle = n->handle;
handle |= s->harbor;
break;
}
if (c<0) {
Expand Down
45 changes: 35 additions & 10 deletions skynet_harbor.c
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,36 @@ _report_zmq_error(int rc) {
}
}

static int
_isdecimal(int c) {
return c>='0' && c<='9';
}

// Name-updating protocols:
//
// 1) harbor_id=harbor_address
// 2) context_name=context_handle
static int
_split_name(uint8_t *buf, int len, int *np) {
uint8_t *sep;
if (len > 0 && _isdecimal(buf[0])) {
int i=0;
int n=0;
do {
n = n*10 + (buf[i]-'0');
} while(++i<len && _isdecimal(buf[i]));
if (i < len && buf[i] == '=') {
buf[i] = '\0';
*np = n;
return i;
}
} else if ((sep = memchr(buf, '=', len)) != NULL) {
*sep = '\0';
return (int)(sep-buf);
}
return -1;
}

// Always in main harbor thread
static void
_name_update() {
Expand All @@ -321,17 +351,11 @@ _name_update() {
int rc = zmq_recv(Z->zmq_local,&content,0);
_report_zmq_error(rc);
int sz = zmq_msg_size(&content);
int i;
int n = 0;
uint8_t * buffer = zmq_msg_data(&content);
for (i=0;i<sz;i++) {
if (buffer[i] == '=') {
buffer[i] = '\0';
break;
}
n = n * 10 + (buffer[i] - '0');
}
if (i==sz) {

int n = 0;
int i = _split_name(buffer, sz, &n);
if (i == -1) {
char tmp[sz+1];
memcpy(tmp,buffer,sz);
tmp[sz] = '\0';
Expand Down Expand Up @@ -526,6 +550,7 @@ _remote_send() {
// double check
if (!skynet_remotemq_pop(Z->queue,&msg)) {
printf("goback %x\n",msg.destination);
__sync_lock_test_and_set(&Z->notice_event, 1);
goto _goback;
}
}
Expand Down
14 changes: 11 additions & 3 deletions skynet_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,18 @@ skynet_context_message_dispatch(void) {

assert(ctx->in_global_queue);

int re_q = 1;
struct skynet_message msg;
if (skynet_mq_pop(q,&msg)) {
// empty queue
__sync_lock_release(&ctx->in_global_queue);
skynet_context_release(ctx);
return 0;
if (skynet_mq_pop(q,&msg)) {
skynet_context_release(ctx);
return 0;
}
if (__sync_lock_test_and_set(&ctx->in_global_queue, 1)) {
re_q = 0;
}
}

if (ctx->cb == NULL) {
Expand All @@ -178,7 +184,9 @@ skynet_context_message_dispatch(void) {

skynet_context_release(ctx);

skynet_globalmq_push(q);
if (re_q) {
skynet_globalmq_push(q);
}

return 0;
}
Expand Down