Skip to content

Commit

Permalink
refactor codes
Browse files Browse the repository at this point in the history
  • Loading branch information
ideawu committed Sep 7, 2017
1 parent c032b5a commit 7bc40f0
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 84 deletions.
4 changes: 3 additions & 1 deletion src/comet/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
include ../../build.mk

OBJS = subscriber.o channel.o server.o
OBJS = subscriber.o presence.o channel.o server.o
CFLAGS += -I ../

all: $(OBJS)
Expand All @@ -12,6 +12,8 @@ all: $(OBJS)

subscriber.o: subscriber.h subscriber.cpp
${CXX} -c $(CFLAGS) subscriber.cpp
presence.o: presence.h presence.cpp
${CXX} -c $(CFLAGS) presence.cpp
channel.o: channel.h channel.cpp
${CXX} -c $(CFLAGS) channel.cpp
server.o: server.h server.cpp
Expand Down
62 changes: 62 additions & 0 deletions src/comet/http_query.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
Copyright (c) 2012-2017 The icomet Authors. All rights reserved.
Use of this source code is governed by a BSD-style license that can be
found in the LICENSE file.
*/
#ifndef ICOMET_HTTP_QUERY_H
#define ICOMET_HTTP_QUERY_H

#include <evhttp.h>
#include <event2/http.h>

class HttpQuery{
private:
struct evkeyvalq _get;
struct evkeyvalq _post;
bool _has_post;
public:
HttpQuery(struct evhttp_request *req){
_has_post = false;
if(evhttp_request_get_command(req) == EVHTTP_REQ_POST){
evbuffer *body_evb = evhttp_request_get_input_buffer(req);
size_t len = evbuffer_get_length(body_evb);
if(len > 0){
_has_post = true;
char *data = (char *)malloc(len + 1);
evbuffer_copyout(body_evb, data, len);
data[len] = '\0';
evhttp_parse_query_str(data, &_post);
free(data);
}
}
evhttp_parse_query(evhttp_request_get_uri(req), &_get);
}
~HttpQuery(){
evhttp_clear_headers(&_get);
if(_has_post){
evhttp_clear_headers(&_post);
}
}
int get_int(const char *name, int def){
if(_has_post){
const char *val = evhttp_find_header(&_post, name);
if(val){
return atoi(val);
}
}
const char *val = evhttp_find_header(&_get, name);
return val? atoi(val) : def;
}
const char* get_str(const char *name, const char *def){
if(_has_post){
const char *val = evhttp_find_header(&_post, name);
if(val){
return val;
}
}
const char *val = evhttp_find_header(&_get, name);
return val? val : def;
}
};

#endif
33 changes: 33 additions & 0 deletions src/comet/presence.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright (c) 2012-2017 The icomet Authors. All rights reserved.
Use of this source code is governed by a BSD-style license that can be
found in the LICENSE file.
*/
#include "presence.h"
#include "server.h"
#include "util/log.h"
#include "server_config.h"
#include <http-internal.h>

static void connection_closecb(struct evhttp_connection *evcon, void *arg){
log_info("presence subscriber disconnected");
PresenceSubscriber *psub = (PresenceSubscriber *)arg;
psub->close();
}

void PresenceSubscriber::start(){
log_info("%s:%d psub, psubs: %d", req->remote_host, req->remote_port, serv->psubs.size);
bufferevent_enable(req->evcon->bufev, EV_READ);
evhttp_connection_set_closecb(req->evcon, connection_closecb, this);

evhttp_send_reply_start(req, HTTP_OK, "OK");
}

void PresenceSubscriber::close(){
log_info("%s:%d psub_end", req->remote_host, req->remote_port);
if(req->evcon){
evhttp_connection_set_closecb(req->evcon, NULL, NULL);
}
evhttp_send_reply_end(req);
serv->psub_end(this);
}
3 changes: 3 additions & 0 deletions src/comet/presence.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ class PresenceSubscriber

Server *serv;
struct evhttp_request *req;

void start();
void close();
};

#endif
82 changes: 2 additions & 80 deletions src/comet/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,56 +12,7 @@ found in the LICENSE file.
#include "server_config.h"
#include "util/log.h"
#include "util/list.h"

class HttpQuery{
private:
struct evkeyvalq _get;
struct evkeyvalq _post;
bool _has_post;
public:
HttpQuery(struct evhttp_request *req){
_has_post = false;
if(evhttp_request_get_command(req) == EVHTTP_REQ_POST){
evbuffer *body_evb = evhttp_request_get_input_buffer(req);
size_t len = evbuffer_get_length(body_evb);
if(len > 0){
_has_post = true;
char *data = (char *)malloc(len + 1);
evbuffer_copyout(body_evb, data, len);
data[len] = '\0';
evhttp_parse_query_str(data, &_post);
free(data);
}
}
evhttp_parse_query(evhttp_request_get_uri(req), &_get);
}
~HttpQuery(){
evhttp_clear_headers(&_get);
if(_has_post){
evhttp_clear_headers(&_post);
}
}
int get_int(const char *name, int def){
if(_has_post){
const char *val = evhttp_find_header(&_post, name);
if(val){
return atoi(val);
}
}
const char *val = evhttp_find_header(&_get, name);
return val? atoi(val) : def;
}
const char* get_str(const char *name, const char *def){
if(_has_post){
const char *val = evhttp_find_header(&_post, name);
if(val){
return val;
}
}
const char *val = evhttp_find_header(&_get, name);
return val? val : def;
}
};
#include "http_query.h"

Server::Server(){
this->auth = AUTH_NONE;
Expand Down Expand Up @@ -170,35 +121,17 @@ void Server::add_presence(PresenceType type, const std::string &cname){
}
}

static void on_psub_disconnect(struct evhttp_connection *evcon, void *arg){
log_info("presence subscriber disconnected");
PresenceSubscriber *psub = (PresenceSubscriber *)arg;
Server *serv = psub->serv;
serv->psub_end(psub);
}

int Server::psub(struct evhttp_request *req){
bufferevent_enable(req->evcon->bufev, EV_READ);

PresenceSubscriber *psub = new PresenceSubscriber();
psub->req = req;
psub->serv = this;
psubs.push_back(psub);
log_info("%s:%d psub, psubs: %d", req->remote_host, req->remote_port, psubs.size);

evhttp_send_reply_start(req, HTTP_OK, "OK");
evhttp_connection_set_closecb(req->evcon, on_psub_disconnect, psub);
psub->start();
return 0;
}

int Server::psub_end(PresenceSubscriber *psub){
struct evhttp_request *req = psub->req;
if(req->evcon){
evhttp_connection_set_closecb(req->evcon, NULL, NULL);
}
evhttp_send_reply_end(req);
psubs.remove(psub);
log_info("%s:%d psub_end, psubs: %d", req->remote_host, req->remote_port, psubs.size);
delete psub;
return 0;
}
Expand Down Expand Up @@ -233,16 +166,6 @@ int Server::sub(struct evhttp_request *req, Subscriber::Type sub_type){
return 0;
}

evhttp_add_header(req->output_headers, "Expires", "0");
evhttp_add_header(req->output_headers, "Cache-Control", "no-cache");
if(sub_type == Subscriber::SSE){
evhttp_add_header(req->output_headers, "Content-Type", "text/event-stream; charset=utf-8");
// 允许客户端跨域请求
evhttp_add_header(req->output_headers, "Access-Control-Allow-Origin", "*");
}else{
evhttp_add_header(req->output_headers, "Content-Type", "application/json; charset=utf-8");
}

HttpQuery query(req);
int seq = query.get_int("seq", 0);
int noop = query.get_int("noop", 0);
Expand Down Expand Up @@ -279,7 +202,6 @@ int Server::sub(struct evhttp_request *req, Subscriber::Type sub_type){
Subscriber *sub = new Subscriber();
sub->req = req;
sub->type = sub_type;
sub->idle = 0;
sub->seq_next = seq;
sub->seq_noop = noop;
sub->callback = cb;
Expand Down
3 changes: 1 addition & 2 deletions src/comet/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ class Server
Channel* new_channel(const std::string &cname);
void free_channel(Channel *channel);

LinkedList<PresenceSubscriber *> psubs;

void add_presence(PresenceType type, const std::string &cname);
//void flush_presence();

Expand All @@ -47,6 +45,7 @@ class Server
};

int auth;
LinkedList<PresenceSubscriber *> psubs;

Server();
~Server();
Expand Down
17 changes: 16 additions & 1 deletion src/comet/subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ static std::string iframe_chunk_suffix = ");</script>";

Subscriber::Subscriber(){
req = NULL;
idle = 0;
}

Subscriber::~Subscriber(){
Expand All @@ -27,6 +28,18 @@ static void connection_closecb(struct evhttp_connection *evcon, void *arg){
sub->close();
}

static void add_http_header(int sub_type, struct evhttp_request *req){
evhttp_add_header(req->output_headers, "Expires", "0");
evhttp_add_header(req->output_headers, "Cache-Control", "no-cache");
if(sub_type == Subscriber::SSE){
evhttp_add_header(req->output_headers, "Content-Type", "text/event-stream; charset=utf-8");
// 允许客户端跨域请求
evhttp_add_header(req->output_headers, "Access-Control-Allow-Origin", "*");
}else{
evhttp_add_header(req->output_headers, "Content-Type", "application/json; charset=utf-8");
}
}

void Subscriber::start(){
log_debug("%s:%d sub %s, seq: %d, subs: %d",
req->remote_host, req->remote_port,
Expand All @@ -38,8 +51,9 @@ void Subscriber::start(){
// 有两个地方需要调用 evhttp_send_reply_end(), 一是服务端主动关闭,二是客户端主动关闭
// 客户端主动关闭会触发 connection_closecb
evhttp_connection_set_closecb(req->evcon, connection_closecb, this);
evhttp_add_header(req->output_headers, "Connection", "keep-alive");

add_http_header(this->type, req);
evhttp_add_header(req->output_headers, "Connection", "keep-alive");
evhttp_send_reply_start(req, HTTP_OK, "OK");

if(this->type == Subscriber::IFRAME){
Expand Down Expand Up @@ -195,6 +209,7 @@ void Subscriber::send_msg(struct evhttp_request *req, const char *type, const st
}

void Subscriber::send_error_reply(int sub_type, struct evhttp_request *req, const char *cb, const std::string &cname, const char *type, const char *content){
add_http_header(sub_type, req);
evhttp_send_reply_start(req, HTTP_OK, "OK");
if(sub_type == Subscriber::IFRAME){
struct evbuffer *buf = evhttp_request_get_output_buffer(req);
Expand Down

0 comments on commit 7bc40f0

Please sign in to comment.