Skip to content

Commit

Permalink
* FIX [mqtt/protocol] fix #6 & remove commented code.
Browse files Browse the repository at this point in the history
  • Loading branch information
JaylinYu committed Feb 1, 2022
1 parent 5663049 commit a104d42
Showing 1 changed file with 27 additions and 88 deletions.
115 changes: 27 additions & 88 deletions src/mqtt/protocol/mqtt/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ static void mqtt_sock_recv(void *arg, nni_aio *aio);
static void mqtt_send_cb(void *arg);
static void mqtt_recv_cb(void *arg);
static void mqtt_timer_cb(void *arg);
static void mqtt_run_recv_queue(mqtt_sock_t *s);

static int mqtt_pipe_init(void *arg, nni_pipe *pipe, void *s);
static void mqtt_pipe_fini(void *arg);
Expand Down Expand Up @@ -83,7 +82,7 @@ struct mqtt_sock_s {
mqtt_ctx_t master; // to which we delegate send/recv calls
mqtt_pipe_t * mqtt_pipe;
nni_list recv_queue; // ctx pending to receive
nni_list send_queue; // ctx pending to send
// nni_list send_queue; // ctx pending to send
};

/******************************************************************************
Expand Down Expand Up @@ -111,25 +110,13 @@ mqtt_sock_init(void *arg, nni_sock *sock)

s->mqtt_pipe = NULL;
NNI_LIST_INIT(&s->recv_queue, mqtt_ctx_t, rqnode);
NNI_LIST_INIT(&s->send_queue, mqtt_ctx_t, sqnode);
// NNI_LIST_INIT(&s->send_queue, mqtt_ctx_t, sqnode);
}

static void
mqtt_sock_fini(void *arg)
{
mqtt_sock_t *s = arg;
// work_t * work;

// nni_mtx_lock(&s->mtx);
// NNI_ASSERT(nni_list_empty(&s->recv_queue));

// while (NULL != (work = nni_list_first(&s->free_list))) {
// nni_list_remove(&s->free_list, work);
// work_fini(work);
// nni_free(work, sizeof(work_t));
// }
// nni_mtx_unlock(&s->mtx);

mqtt_ctx_fini(&s->master);
nni_mtx_fini(&s->mtx);
}
Expand Down Expand Up @@ -162,16 +149,6 @@ mqtt_sock_recv(void *arg, nni_aio *aio)
mqtt_ctx_recv(&s->master, aio);
}

// Note: This routine should be called with the sock lock held.
static inline void
mqtt_sock_close_work_queue(mqtt_sock_t *s, nni_list *queue)
{
nni_msg * msg;
while (NULL != (msg = nni_list_first(queue))) {
nni_msg_free(msg);
}
}

/******************************************************************************
* Pipe Implementation *
******************************************************************************/
Expand All @@ -195,8 +172,8 @@ mqtt_pipe_init(void *arg, nni_pipe *pipe, void *s)
// accidental collision across restarts.
nni_id_map_init(&p->sent_unack, 0x0000u, 0xffffu, true);
nni_id_map_init(&p->recv_unack, 0x0000u, 0xffffu, true);
nni_lmq_init(&p->recv_messages, 1024); // remove hard code value
nni_lmq_init(&p->send_messages, 1024); // remove hard code value
nni_lmq_init(&p->recv_messages, 6); // remove hard code value
nni_lmq_init(&p->send_messages, 6); // remove hard code value

return (0);
}
Expand Down Expand Up @@ -234,14 +211,13 @@ mqtt_pipe_start(void *arg)

nni_mtx_lock(&s->mtx);
s->mqtt_pipe = p;
// mqtt_send_start(s);
nni_mtx_unlock(&s->mtx);
//initiate the resend timer
nni_sleep_aio(s->retry, &p->time_aio);
nni_pipe_recv(p->pipe, &p->recv_aio);
if ((c = nni_list_first(&s->send_queue)) != NULL) {
mqtt_ctx_send(c, c->saio);
}
// if ((c = nni_list_first(&s->send_queue)) != NULL) {
// mqtt_ctx_send(c, c->saio);
// }
return (0);
}

Expand All @@ -258,9 +234,14 @@ void
mqtt_close_unack_msg_cb(void *arg)
{
nni_msg * msg = arg;
nni_aio * aio = NULL;

aio = nni_mqtt_msg_get_aio(msg);
if (aio) {
nni_aio_finish_error(aio, NNG_ECLOSED);
}
nni_msg_free(msg);
//TODO trigger aio inside msg?
// mqtt_sock_close_work(work->mqtt_sock, work);

}

static void
Expand All @@ -274,7 +255,8 @@ mqtt_pipe_close(void *arg)
nni_aio_close(&p->send_aio);
nni_aio_close(&p->recv_aio);
nni_aio_close(&p->time_aio);
// mqtt_sock_close_work_queue(s, &s->recv_queue);
nni_lmq_flush(&p->recv_messages);
nni_lmq_flush(&p->send_messages);
//TODO free msg for each map
nni_id_map_foreach(&p->sent_unack, mqtt_close_unack_msg_cb);
nni_id_map_foreach(&p->recv_unack, mqtt_close_unack_msg_cb);
Expand Down Expand Up @@ -464,7 +446,8 @@ mqtt_recv_cb(void *arg)
break;

case NNG_MQTT_PINGRESP:
// do nothing
// free msg
nni_msg_free(msg);
nni_mtx_unlock(&s->mtx);
return;

Expand Down Expand Up @@ -587,21 +570,6 @@ mqtt_recv_cb(void *arg)
return;
}

// if (work_is_error(work)) {
// // protocol error, just close the connection
// nni_mtx_unlock(&s->mtx);
// nni_aio_finish_error(work->user_aio, NNG_EPROTO);
// nni_pipe_close(p->pipe);
// return;
// } else if (work_is_final(work)) {
// // good news, protocol state machine run to the end
// nni_aio *aio = work->user_aio;
// mqtt_sock_free_work(s, work);
// nni_mtx_unlock(&s->mtx);
// nni_aio_finish(aio, 0, 0);
// return;
// }

nni_mtx_unlock(&s->mtx);
if (user_aio) {
nni_aio_finish(user_aio, 0, 0);
Expand All @@ -610,30 +578,6 @@ mqtt_recv_cb(void *arg)
return;
}

// Note: This routine should be called with the sock lock held.
// static void
// mqtt_run_recv_queue(mqtt_sock_t *s)
// {
// work_t * work = nni_list_first(&s->recv_queue);
// mqtt_pipe_t *p = s->mqtt_pipe;
// nni_msg * msg;

// while (NULL != work) {
// if (0 != nni_lmq_get(&p->recv_messages, &msg)) {
// break;
// }
// nni_list_remove(&s->recv_queue, work);
// // nni_pipe_recv(p->pipe, &work->recv_aio);
// nni_aio_set_msg(work->user_aio, msg);
// nni_aio_finish(work->user_aio, 0,
// nni_msg_header_len(msg) + nni_msg_len(msg));
// mqtt_sock_free_work(s, work);
// work = nni_list_first(&s->recv_queue);
// }

// return;
// }

/******************************************************************************
* Context Implementation *
******************************************************************************/
Expand All @@ -659,7 +603,7 @@ mqtt_ctx_fini(void *arg)
nni_mtx_lock(&s->mtx);
if ((aio = ctx->saio) != NULL) {
ctx->saio = NULL;
nni_list_remove(&s->send_queue, ctx);
// nni_list_remove(&s->send_queue, ctx);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
if ((aio = ctx->raio) != NULL) {
Expand Down Expand Up @@ -695,11 +639,15 @@ mqtt_ctx_send(void *arg, nni_aio *aio)

if (p == NULL) {
// connection is not established yet
// cache ctx for next
ctx->saio = aio;
ctx->raio = NULL;
nni_list_append(&s->send_queue, ctx);
// cache ctx for next or just finish_error?
nni_mtx_unlock(&s->mtx);
nni_aio_set_msg(aio, NULL);
nni_aio_finish(aio, 0, nni_msg_len(msg));
nni_msg_free(msg);
// ctx->saio = aio;
// ctx->raio = NULL;
// nni_list_append(&s->send_queue, ctx);
// nni_mtx_unlock(&s->mtx);
return;
}
msg = nni_aio_get_msg(aio);
Expand Down Expand Up @@ -808,15 +756,6 @@ mqtt_ctx_recv(void *arg, nni_aio *aio)
nni_list_append(&s->recv_queue, ctx);
nni_mtx_unlock(&s->mtx);
return;
// msg = nni_aio_get_msg(&p->recv_aio);
// if (msg == NULL) {
// nni_mtx_unlock(&s->mtx);
// return;
// }
// nni_aio_set_msg(&p->recv_aio, NULL);
// nni_mtx_unlock(&s->mtx);
// nni_aio_set_msg(aio, msg);
// nni_aio_finish(aio, 0, nni_msg_len(msg));
}

/******************************************************************************
Expand Down

0 comments on commit a104d42

Please sign in to comment.