Skip to content

Commit

Permalink
add response_write_callback
Browse files Browse the repository at this point in the history
  • Loading branch information
caosiyang committed Aug 20, 2012
1 parent 58b7b9e commit 28e3b97
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 81 deletions.
161 changes: 88 additions & 73 deletions connection.cpp
Expand Up @@ -46,32 +46,36 @@ void ws_serve_loop(ws_conn_t *conn) {
//read the websocket request
void accept_websocket_request(ws_conn_t *conn) {
LOG("%s", __func__);
if (conn && conn->bev) {
//void(*readcb)(struct bufferevent*, void*) = NULL;
//void(*writecb)(struct bufferevent*, void*) = NULL;
//void(*eventcb)(struct bufferevent*, short, void*) = NULL;
//void *cbarg = NULL;
//bufferevent_getcb(conn->bev, &readcb, &writecb, &eventcb, &cbarg);
bufferevent_setcb(conn->bev, request_read_cb, write_cb, close_cb, conn);
bufferevent_setwatermark(conn->bev, EV_READ, 1, 1);
bufferevent_setwatermark(conn->bev, EV_WRITE, 0, 0);
bufferevent_enable(conn->bev, EV_READ | EV_WRITE);
if (conn) {
if (conn->bev) {
//void(*readcb)(struct bufferevent*, void*) = NULL;
//void(*writecb)(struct bufferevent*, void*) = NULL;
//void(*eventcb)(struct bufferevent*, short, void*) = NULL;
//void *cbarg = NULL;
//bufferevent_getcb(conn->bev, &readcb, &writecb, &eventcb, &cbarg);
bufferevent_setcb(conn->bev, request_read_cb, response_write_cb, close_cb, conn);
bufferevent_setwatermark(conn->bev, EV_READ, 1, 1);
bufferevent_setwatermark(conn->bev, EV_WRITE, 0, 0);
bufferevent_enable(conn->bev, EV_READ);
}
}
}


void respond_websocket_request(ws_conn_t *conn) {
//handshake
ws_req_t ws_req;
//parse request
parse_websocket_request(conn->ws_req_str.c_str(), &ws_req);
//generate response
conn->ws_resp_str = generate_websocket_response(&ws_req);
bufferevent_write(conn->bev, conn->ws_resp_str.c_str(), conn->ws_resp_str.length());
bufferevent_disable(conn->bev, EV_READ);
if (conn) {
ws_req_t ws_req;
parse_websocket_request(conn->ws_req_str.c_str(), &ws_req); //parse request
conn->ws_resp_str = generate_websocket_response(&ws_req); //generate response

if (!conn->ws_resp_str.empty()) {
bufferevent_write(conn->bev, conn->ws_resp_str.c_str(), conn->ws_resp_str.length());
}
}
}


//request read callback
void request_read_cb(struct bufferevent *bev, void *ctx) {
ws_conn_t *conn = (ws_conn_t*)ctx;
char c;
Expand All @@ -84,43 +88,52 @@ void request_read_cb(struct bufferevent *bev, void *ctx) {
//}
//check if receive request completely
if (n >= 4 && conn->ws_req_str.substr(n - 4) == "\r\n\r\n") {
bufferevent_disable(conn->bev, EV_READ);
respond_websocket_request(conn);
}
}


//response write callback
void response_write_cb(struct bufferevent *bev, void *ctx) {
LOG("%s", __func__);
if (ctx) {
ws_conn_t *conn = (ws_conn_t*)ctx;
if (conn->handshake_cb_unit.cb) {
websocket_cb cb = conn->handshake_cb_unit.cb;
void *cbarg = conn->handshake_cb_unit.cbarg;
cb(cbarg);
}

respond_websocket_request(conn);
LOG("%s", conn->ws_req_str.c_str());
LOG("%s", conn->ws_resp_str.c_str());

//frame recv loop
frame_recv_loop(conn);
frame_recv_loop(conn); //frame receive loop
}
}


//send a frame
int32_t send_a_frame(ws_conn_t *conn, const frame_buffer_t *fb) {
if (bufferevent_write(conn->bev, fb->data, fb->len) == fb->len) {
return 0;
}
return -1;

inline int32_t send_a_frame(ws_conn_t *conn, const frame_buffer_t *fb) {
return bufferevent_write(conn->bev, fb->data, fb->len) == fb->len ? 0 : -1;
}


void frame_recv_loop(ws_conn_t *conn) {
conn->step = ONE;
conn->ntoread = 2;
bufferevent_setcb(conn->bev, frame_read_cb, write_cb, close_cb, conn);
bufferevent_setwatermark(conn->bev, EV_READ, conn->ntoread, conn->ntoread);
bufferevent_enable(conn->bev, EV_READ);
if (conn) {
conn->step = ONE;
conn->ntoread = 2;
bufferevent_setcb(conn->bev, frame_read_cb, write_cb, close_cb, conn);
bufferevent_setwatermark(conn->bev, EV_READ, conn->ntoread, conn->ntoread);
bufferevent_setwatermark(conn->bev, EV_WRITE, 0, 0);
bufferevent_enable(conn->bev, EV_READ);
}
}


void frame_read_cb(struct bufferevent *bev, void *ctx) {
//LOG("%s", __func__);
LOG("%s", __func__);
if (!ctx) {
return;
}
ws_conn_t *conn = (ws_conn_t*)ctx;

switch (conn->step) {
Expand All @@ -129,30 +142,31 @@ void frame_read_cb(struct bufferevent *bev, void *ctx) {
LOG("---- STEP 1 ----");
char tmp[conn->ntoread];
bufferevent_read(bev, tmp, conn->ntoread);
//LOG("i read %d bytes", conn->ntoread);
//parse header
if (parse_frame_header(tmp, conn->frame) == 0) {
LOG("FIN = %d", conn->frame->fin);
LOG("OPCODE = %d", conn->frame->opcode);
LOG("MASK = %d", conn->frame->mask);
LOG("PAYLOAD_LEN = %d", conn->frame->payload_len);
}

//payload_len is [0, 127]
if (conn->frame->payload_len <= 125) {
conn->step = THREE;
conn->ntoread = 4;
bufferevent_setwatermark(bev, EV_READ, conn->ntoread, conn->ntoread);
}
else if (conn->frame->payload_len == 126) {
conn->step = TWO;
conn->ntoread = 2;
bufferevent_setwatermark(bev, EV_READ, conn->ntoread, conn->ntoread);
LOG("FIN = %lu", conn->frame->fin);
LOG("OPCODE = %lu", conn->frame->opcode);
LOG("MASK = %lu", conn->frame->mask);
LOG("PAYLOAD_LEN = %lu", conn->frame->payload_len);
//payload_len is [0, 127]
if (conn->frame->payload_len <= 125) {
conn->step = THREE;
conn->ntoread = 4;
bufferevent_setwatermark(bev, EV_READ, conn->ntoread, conn->ntoread);
} else if (conn->frame->payload_len == 126) {
conn->step = TWO;
conn->ntoread = 2;
bufferevent_setwatermark(bev, EV_READ, conn->ntoread, conn->ntoread);
} else if (conn->frame->payload_len == 127) {
conn->step = TWO;
conn->ntoread = 8;
bufferevent_setwatermark(bev, EV_READ, conn->ntoread, conn->ntoread);
}
}
else if (conn->frame->payload_len == 127) {
conn->step = TWO;
conn->ntoread = 8;
bufferevent_setwatermark(bev, EV_READ, conn->ntoread, conn->ntoread);
//TODO
//validate frame header
if (!is_frame_valid(conn->frame)) {
return;
}
break;
}
Expand All @@ -162,14 +176,12 @@ void frame_read_cb(struct bufferevent *bev, void *ctx) {
LOG("---- STEP 2 ----");
char tmp[conn->ntoread];
bufferevent_read(bev, tmp, conn->ntoread);
//LOG("i read %d bytes", conn->ntoread);
if (conn->frame->payload_len == 126) {
conn->frame->payload_len = ntohs(*(uint16_t*)tmp);
LOG("PAYLOAD_LEN = %d", conn->frame->payload_len);
}
if (conn->frame->payload_len == 127) {
LOG("PAYLOAD_LEN = %lu", conn->frame->payload_len);
} else if (conn->frame->payload_len == 127) {
conn->frame->payload_len = myntohll(*(uint64_t*)tmp);
LOG("PAYLOAD_LEN = %d", conn->frame->payload_len);
LOG("PAYLOAD_LEN = %llu", conn->frame->payload_len);
}
conn->step = THREE;
conn->ntoread = 4;
Expand All @@ -187,8 +199,7 @@ void frame_read_cb(struct bufferevent *bev, void *ctx) {
conn->step = FOUR;
conn->ntoread = conn->frame->payload_len;
bufferevent_setwatermark(bev, EV_READ, conn->ntoread, conn->ntoread);
}
else if (conn->frame->payload_len == 0) {
} else if (conn->frame->payload_len == 0) {
/*recv a whole frame*/
if (conn->frame->mask == 0) {
//recv an unmasked frame
Expand Down Expand Up @@ -330,21 +341,25 @@ void ws_conn_setcb(ws_conn_t *conn, enum CBTYPE cbtype, websocket_cb cb, void *c

void write_cb(struct bufferevent *bev, void *ctx) {
//LOG("%s", __func__);
ws_conn_t *conn = (ws_conn_t*)ctx;
if (conn->write_cb_unit.cb) {
websocket_cb cb = conn->write_cb_unit.cb;
void *cbarg = conn->write_cb_unit.cbarg;
cb(cbarg);
if (ctx) {
ws_conn_t *conn = (ws_conn_t*)ctx;
if (conn->write_cb_unit.cb) {
websocket_cb cb = conn->write_cb_unit.cb;
void *cbarg = conn->write_cb_unit.cbarg;
cb(cbarg);
}
}
}


void close_cb(struct bufferevent *bev, short what, void *ctx) {
//LOG("%s", __func__);
ws_conn_t *conn = (ws_conn_t*)ctx;
if (conn->close_cb_unit.cb) {
websocket_cb cb = conn->close_cb_unit.cb;
void *cbarg = conn->close_cb_unit.cbarg;
cb(cbarg);
if (ctx) {
ws_conn_t *conn = (ws_conn_t*)ctx;
if (conn->close_cb_unit.cb) {
websocket_cb cb = conn->close_cb_unit.cb;
void *cbarg = conn->close_cb_unit.cbarg;
cb(cbarg);
}
}
}
8 changes: 6 additions & 2 deletions connection.h
Expand Up @@ -33,7 +33,7 @@ typedef struct websocket_connection {
string ws_req_str;
string ws_resp_str;
enum Step step;
uint32_t ntoread;
uint64_t ntoread;
frame_t *frame; //current frame
ws_cb_unit handshake_cb_unit;
ws_cb_unit frame_recv_cb_unit;
Expand Down Expand Up @@ -78,7 +78,7 @@ void respond_websocket_request(ws_conn_t *conn);


//send a frame
int32_t send_a_frame(ws_conn_t *conn, const frame_buffer_t *fb);
inline int32_t send_a_frame(ws_conn_t *conn, const frame_buffer_t *fb);


//receive a frame
Expand All @@ -89,6 +89,10 @@ void frame_recv_loop(ws_conn_t *conn);
void request_read_cb(struct bufferevent *bev, void *ctx);


//response write callback
void response_write_cb(struct bufferevent *bev, void *ctx);


//frame read callback
void frame_read_cb(struct bufferevent *bev, void *ctx);

Expand Down
8 changes: 8 additions & 0 deletions frame.cpp
Expand Up @@ -20,6 +20,14 @@ void frame_free(frame_t *frame) {
}


bool is_frame_valid(const frame_t *frame) {
if (frame && frame->fin <= 1 && frame->opcode <= 0xf && frame->mask == 1) {
return true;
}
return false;
}


#if 0
int32_t frame_set(frame_t *frame, uint8_t fin, uint8_t opcode, uint64_t payload_len, const char *payload_data) {
if (!frame || fin > 1 || opcode > 0xf) {
Expand Down
3 changes: 3 additions & 0 deletions frame.h
Expand Up @@ -38,6 +38,9 @@ frame_t *frame_new();
void frame_free(frame_t *frame);


bool is_frame_valid(const frame_t *frame);


#if 0
int32_t frame_set(frame_t *frame,
uint8_t fin,
Expand Down
14 changes: 8 additions & 6 deletions websocket.cpp
Expand Up @@ -89,12 +89,14 @@ void print_websocket_request(const ws_req_t *ws_req) {

string generate_websocket_response(const ws_req_t *ws_req) {
string resp;
resp += "HTTP/1.1 101 WebSocket Protocol HandShake\r\n";
resp += "Connection: Upgrade\r\n";
resp += "Upgrade: WebSocket\r\n";
resp += "Server: WebChat Demo Server\r\n";
resp += "Sec-WebSocket-Accept: " + generate_key(ws_req->sec_websocket_key) + "\r\n";
resp += "\r\n";
if (ws_req) {
resp += "HTTP/1.1 101 WebSocket Protocol HandShake\r\n";
resp += "Connection: Upgrade\r\n";
resp += "Upgrade: WebSocket\r\n";
resp += "Server: WebChat Demo Server\r\n";
resp += "Sec-WebSocket-Accept: " + generate_key(ws_req->sec_websocket_key) + "\r\n";
resp += "\r\n";
}
return resp;
}

Expand Down

0 comments on commit 28e3b97

Please sign in to comment.