Permalink
Browse files

kick slow clients and write a message (after dropping their pending d…

…ata)
  • Loading branch information...
1 parent ac22e1f commit 7cbcd8553d744f4d01affb2d0d89dd794d192774 @jehiah jehiah committed Dec 22, 2010
Showing with 47 additions and 1 deletion.
  1. +47 −1 pubsub/pubsub.c
View
@@ -8,12 +8,18 @@
#define BUFSZ 1024
#define BOUNDARY "xXPubSubXx"
+#define MAX_PENDING_DATA 1024*1024*50
int ps_debug = 0;
+enum kick_client_enum {
+ CLIENT_OK = 0,
+ KICK_CLIENT = 1,
+};
typedef struct cli {
int multipart;
int websocket;
+ enum kick_client_enum kick_client;
uint64_t connection_id;
time_t connect_time;
struct evbuffer *buf;
@@ -28,6 +34,39 @@ uint64_t msgRecv = 0;
uint64_t msgSent = 0;
+
+int
+is_slow(struct cli *client) {
+ if (client->kick_client == KICK_CLIENT) { return 1; }
+ struct evhttp_connection *evcon;
+ unsigned long output_buffer_length;
+
+ evcon = (struct evhttp_connection *)client->req->evcon;
+ output_buffer_length = (unsigned long)EVBUFFER_LENGTH(evcon->output_buffer);
+ if (output_buffer_length > MAX_PENDING_DATA) {
+ fprintf(stdout, "%lu >> kicking client with %lu pending data\n", client->connection_id, output_buffer_length);
+ client->kick_client = KICK_CLIENT;
+ // clear the clients output buffer
+ evbuffer_drain(evcon->output_buffer, EVBUFFER_LENGTH(evcon->output_buffer));
+ evbuffer_add_printf(evcon->output_buffer, "ERROR_TOO_SLOW. kicked for having %lu pending bytes\n", output_buffer_length);
+ return 1;
+ }
+ return 0;
+}
+
+int
+can_kick(struct cli *client) {
+ if (client->kick_client == CLIENT_OK){return 0;}
+ // if the buffer length is back to zero, we can kick now
+ // our error notice has been pushed to the client
+ struct evhttp_connection *evcon;
+ evcon = (struct evhttp_connection *)client->req->evcon;
+ if (EVBUFFER_LENGTH(evcon->output_buffer) == 0){
+ return 1;
+ }
+ return 0;
+}
+
void
argtoi(struct evkeyvalq *args, char *key, int *val, int def)
{
@@ -129,7 +168,13 @@ void pub_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
TAILQ_FOREACH(client, &clients, entries) {
msgSent++;
evbuffer_drain(client->buf, EVBUFFER_LENGTH(client->buf));
-
+ if (is_slow(client)) {
+ if (can_kick(client)) {
+ evhttp_connection_free(client->req->evcon);
+ continue;
+ }
+ continue;
+ }
if (client->websocket) {
// set to non-chunked so that send_reply_chunked doesn't add \r\n before/after this block
client->req->chunked = 0;
@@ -182,6 +227,7 @@ void sub_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
client->connect_time = time(NULL);
time_struct = gmtime(&client->connect_time);
client->buf = evbuffer_new();
+ client->kick_client = CLIENT_OK;
strftime(buf, 248, "%Y-%m-%d %H:%M:%S", time_struct);

0 comments on commit 7cbcd85

Please sign in to comment.