Skip to content

Commit

Permalink
fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
changchang committed Mar 19, 2013
1 parent db58bae commit 66dddb7
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 37 deletions.
9 changes: 8 additions & 1 deletion include/pomelo-private/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ int pc_run(pc_client_t *client);
* Wait a condition notify of client, used internal only.
*
* @param client client instance.
* @param timeout wait timeout value or -1 for wait forever.
* @param timeout wait timeout value or 0 for wait forever.
*/
void pc__cond_wait(pc_client_t *client, uint64_t timeout);

Expand Down Expand Up @@ -159,4 +159,11 @@ void pc__default_msg_encode_done_cb(pc_client_t * client, pc_buf_t buf);
void pc__pkg_cb(pc_pkg_type type, const char *data, size_t len,
void *attach);

/**
* Clear the client instance.
*
* @param client client instance.
*/
void pc__client_clear(pc_client_t *client);

#endif /* PC_INTERNAL_H */
6 changes: 3 additions & 3 deletions include/pomelo.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,9 @@ struct pc_client_s {
pc_msg_parse_done_cb parse_msg_done;
pc_msg_encode_cb encode_msg;
pc_msg_encode_done_cb encode_msg_done;
uv_timer_t heartbeat_timer;
uv_timer_t timeout_timer;
uv_async_t close_async;
uv_timer_t *heartbeat_timer;
uv_timer_t *timeout_timer;
uv_async_t *close_async;
uv_mutex_t mutex;
uv_cond_t cond;
uv_mutex_t listener_mutex;
Expand Down
78 changes: 52 additions & 26 deletions src/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
#include <stdio.h>
#include <stddef.h>
#include <string.h>
#include <unistd.h>
#include "pomelo.h"
#include "pomelo-private/listener.h"
#include "pomelo-protocol/package.h"
#include "pomelo-protocol/message.h"
#include "pomelo-private/transport.h"
#include "pomelo-private/internal.h"
#include "pomelo-private/common.h"
#include "pomelo-private/ngx-queue.h"

static void pc__client_init(pc_client_t *client);
Expand Down Expand Up @@ -55,22 +57,35 @@ void pc__client_init(pc_client_t *client) {
abort();
}

if(uv_timer_init(client->uv_loop, &client->heartbeat_timer)) {
client->heartbeat_timer = (uv_timer_t *)malloc(sizeof(uv_timer_t));
if(client->heartbeat_timer == NULL) {
fprintf(stderr, "Fail to malloc client->heartbeat_timer.\n");
abort();
}
if(uv_timer_init(client->uv_loop, client->heartbeat_timer)) {
fprintf(stderr, "Fail to init client->heartbeat_timer.\n");
abort();
}
client->heartbeat_timer.timer_cb = pc__heartbeat_cb;
client->heartbeat_timer.data = client;
client->heartbeat_timer->timer_cb = pc__heartbeat_cb;
client->heartbeat_timer->data = client;
client->heartbeat = 0;

if(uv_timer_init(client->uv_loop, &client->timeout_timer)) {
client->timeout_timer = (uv_timer_t *)malloc(sizeof(uv_timer_t));
if(client->timeout_timer == NULL) {
fprintf(stderr, "Fail to malloc client->timeout_timer.\n");
abort();
}
if(uv_timer_init(client->uv_loop, client->timeout_timer)) {
fprintf(stderr, "Fail to init client->timeout_timer.\n");
abort();
}
client->timeout_timer.timer_cb = pc__timeout_cb;
client->timeout_timer.data = client;
client->timeout_timer->timer_cb = pc__timeout_cb;
client->timeout_timer->data = client;
client->timeout = 0;

uv_async_init(client->uv_loop, &client->close_async, pc__close_async_cb);
client->close_async.data = client;
client->close_async = (uv_async_t *)malloc(sizeof(uv_async_t));
uv_async_init(client->uv_loop, client->close_async, pc__close_async_cb);
client->close_async->data = client;
uv_mutex_init(&client->mutex);
uv_cond_init(&client->cond);
uv_mutex_init(&client->listener_mutex);
Expand All @@ -90,11 +105,6 @@ void pc__client_init(pc_client_t *client) {
* Clear all inner resource of Pomelo client
*/
void pc__client_clear(pc_client_t *client) {
if(client->uv_loop) {
free(client->uv_loop);
client->uv_loop = NULL;
}

if(client->listeners) {
pc_map_destroy(client->listeners);
client->listeners = NULL;
Expand Down Expand Up @@ -139,7 +149,7 @@ void pc_client_stop(pc_client_t *client) {
return;
}

if(PC_TP_ST_CLOSED == client->state) {
if(PC_ST_CLOSED == client->state) {
return;
}

Expand All @@ -149,9 +159,20 @@ void pc_client_stop(pc_client_t *client) {
client->transport = NULL;
}

uv_close((uv_handle_t *)&client->heartbeat_timer, NULL);
uv_close((uv_handle_t *)&client->timeout_timer, NULL);
uv_close((uv_handle_t *)&client->close_async, NULL);
if(client->heartbeat_timer != NULL) {
uv_close((uv_handle_t *)client->heartbeat_timer, pc__handle_close_cb);
client->heartbeat_timer = NULL;
client->heartbeat = 0;
}
if(client->timeout_timer != NULL) {
uv_close((uv_handle_t *)client->timeout_timer, pc__handle_close_cb);
client->timeout_timer = NULL;
client->timeout = 0;
}
if(client->close_async != NULL) {
uv_close((uv_handle_t *)client->close_async, pc__handle_close_cb);
client->close_async = NULL;
}
}

void pc_client_destroy(pc_client_t *client) {
Expand All @@ -170,13 +191,19 @@ void pc_client_destroy(pc_client_t *client) {
// 1. asyn worker thread
// 2. wait cond until signal or timeout
// 3. free client
uv_async_send(&client->close_async);
uv_async_send(client->close_async);

pc__cond_wait(client, 3000);
pc__cond_wait(client, 3);

if(PC_ST_CLOSED != client->state) {
pc_client_stop(client);
pc__client_clear(client);
// wait uv_loop_t stop
sleep(1);
if(client->uv_loop) {
free(client->uv_loop);
client->uv_loop = NULL;
}
}
free(client);
}
Expand Down Expand Up @@ -237,7 +264,7 @@ int pc_add_listener(pc_client_t *client, const char *event,
}
listener->cb = event_cb;

uv_mutex_lock(&client->mutex);
uv_mutex_lock(&client->listener_mutex);
ngx_queue_t *head = pc_map_get(client->listeners, event);

if(head == NULL) {
Expand All @@ -254,13 +281,13 @@ int pc_add_listener(pc_client_t *client, const char *event,
}

ngx_queue_insert_tail(head, &listener->queue);
uv_mutex_unlock(&client->mutex);
uv_mutex_unlock(&client->listener_mutex);

return 0;
}

void pc_remove_listener(pc_client_t *client, const char *event, pc_event_cb cb) {
uv_mutex_lock(&client->mutex);
uv_mutex_lock(&client->listener_mutex);
ngx_queue_t *head = (ngx_queue_t *)pc_map_get(client->listeners, event);
if(head == NULL) {
return;
Expand All @@ -282,11 +309,11 @@ void pc_remove_listener(pc_client_t *client, const char *event, pc_event_cb cb)
pc_map_del(client->listeners, event);
free(head);
}
uv_mutex_unlock(&client->mutex);
uv_mutex_unlock(&client->listener_mutex);
}

void pc_emit_event(pc_client_t *client, const char *event, void *data) {
uv_mutex_lock(&client->mutex);
uv_mutex_lock(&client->listener_mutex);
ngx_queue_t *head = (ngx_queue_t *)pc_map_get(client->listeners, event);
if(head == NULL) {
return;
Expand All @@ -299,7 +326,7 @@ void pc_emit_event(pc_client_t *client, const char *event, void *data) {
listener = ngx_queue_data(item, pc_listener_t, queue);
listener->cb(client, event, data);
}
uv_mutex_unlock(&client->mutex);
uv_mutex_unlock(&client->listener_mutex);
}

int pc_run(pc_client_t *client) {
Expand Down Expand Up @@ -342,5 +369,4 @@ void pc__release_requests(pc_map_t *map, const char* key, void *value) {
void pc__close_async_cb(uv_async_t *handle, int status) {
pc_client_t *client = (pc_client_t *)handle->data;
pc_client_stop(client);
pc__client_clear(client);
}
20 changes: 19 additions & 1 deletion src/map.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,25 @@ int pc_map_set(pc_map_t *map, const char *key, void *value) {

size_t hash = pc__hash(key);

ngx_queue_insert_tail(&map->buckets[hash % map->capacity], &pair->queue);
ngx_queue_t *head = &map->buckets[hash % map->capacity];
ngx_queue_t *q = NULL;
pc__pair_t *old_pair = NULL;
ngx_queue_foreach(q, head) {
old_pair = ngx_queue_data(q, pc__pair_t, queue);
if(!strcmp(old_pair->key, key)) {
ngx_queue_remove(q);
ngx_queue_init(q);
} else {
old_pair = NULL;
}
}

ngx_queue_insert_tail(head, &pair->queue);

if(old_pair) {
map->release_value(map, old_pair->key, old_pair->value);
free(old_pair);
}

return 0;
error:
Expand Down
4 changes: 2 additions & 2 deletions src/pkg-handshake.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ int pc__handshake_resp(pc_client_t *client,
if(hb > 0) {
client->heartbeat = hb * 1000;
client->timeout = client->heartbeat * PC_HEARTBEAT_TIMEOUT_FACTOR;
uv_timer_set_repeat(&client->heartbeat_timer, client->heartbeat);
uv_timer_set_repeat(&client->timeout_timer, client->timeout);
uv_timer_set_repeat(client->heartbeat_timer, client->heartbeat);
uv_timer_set_repeat(client->timeout_timer, client->timeout);
} else {
client->heartbeat = -1;
client->timeout = -1;
Expand Down
6 changes: 3 additions & 3 deletions src/pkg-heartbeat.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ int pc__binary_write(pc_client_t *client, const char *data, size_t len,
static void pc__heartbeat_req_cb(uv_write_t* req, int status);

int pc__heartbeat(pc_client_t *client) {
uv_timer_stop(&client->timeout_timer);
uv_timer_again(&client->heartbeat_timer);
uv_timer_stop(client->timeout_timer);
uv_timer_again(client->heartbeat_timer);
return 0;
}

Expand Down Expand Up @@ -54,7 +54,7 @@ void pc__heartbeat_cb(uv_timer_t* heartbeat_timer, int status) {
return;
}

uv_timer_again(&client->timeout_timer);
uv_timer_again(client->timeout_timer);
}

void pc__timeout_cb(uv_timer_t* timeout_timer, int status) {
Expand Down
4 changes: 3 additions & 1 deletion src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
void pc__cond_wait(pc_client_t *client, uint64_t timeout) {
uv_mutex_lock(&client->mutex);
if(timeout > 0) {
uv_cond_timedwait(&client->cond, &client->mutex, 3000);
timeout *= 1e9;
uv_cond_timedwait(&client->cond, &client->mutex, timeout);
} else {
uv_cond_wait(&client->cond, &client->mutex);
}
Expand Down Expand Up @@ -34,5 +35,6 @@ void pc__worker(void *arg) {
client->state = PC_ST_CLOSED;

pc_emit_event(client, PC_EVENT_DISCONNECT, NULL);
pc__client_clear(client);
pc__cond_broadcast(client);
}

0 comments on commit 66dddb7

Please sign in to comment.