Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files

- Introduce stomputil.c|h and add a few more queue functions

  • Loading branch information
decke
decke committed May 18, 2012
1 parent 7272d87 commit 74800232ef848c6b84d7dace88b1a6e3ac2763d8
Showing with 217 additions and 110 deletions.
  1. +1 −1 Makefile
  2. +1 −0 server.c
  3. +35 −106 stomp.c
  4. +0 −3 stomp.h
  5. +144 −0 stomputil.c
  6. +36 −0 stomputil.h
@@ -20,7 +20,7 @@ SRC+= leveldb.c
CFLAGS+=-I${LOCALBASE}/include
LDFLAGS+=-L${LOCALBASE}/lib/event2 -L${LOCALBASE}/lib

SRC+= log.c util.c server.c common.c stomp.c
SRC+= log.c util.c server.c common.c stomp.c stomputil.c
OBJS= ${SRC:.c=.o}

all: redqd
@@ -56,6 +56,7 @@
#include "server.h"
#include "client.h"
#include "stomp.h"
#include "stomputil.h"
#include "leveldb.h"

struct event_base *base;
141 stomp.c
@@ -41,6 +41,8 @@
#include "server.h"
#include "client.h"
#include "stomp.h"
#include "stomputil.h"
#include "leveldb.h"

/* internal data structs */
struct CommandHandler
@@ -216,7 +218,7 @@ int stomp_disconnect(struct client *client)

int stomp_subscribe(struct client *client)
{
struct queue *entry, *tmp_entry;
struct queue *entry;
const char *queuename;

client->response_cmd = STOMP_CMD_NONE;
@@ -228,24 +230,16 @@ int stomp_subscribe(struct client *client)
return 1;
}

for (entry = TAILQ_FIRST(&queues); entry != NULL; entry = tmp_entry) {
tmp_entry = TAILQ_NEXT(entry, entries);
if (strcmp(entry->queuename, queuename) == 0){
entry = tmp_entry;
break;
}
}

entry = stomp_find_queue(queuename);
if (entry == NULL){
entry = malloc(sizeof(*entry));
entry->queuename = malloc(strlen(queuename)+1);
strcpy(entry->queuename, queuename);
TAILQ_INIT(&entry->subscribers);
TAILQ_INSERT_TAIL(&queues, entry, entries);
entry = stomp_add_queue(queuename);
if(entry == NULL){
client->response_cmd = STOMP_CMD_ERROR;
evhttp_add_header(client->response_headers, "message", "Could not create destination");
return 1;
}
}

/* TODO: check if already subscribed */

TAILQ_INSERT_TAIL(&entry->subscribers, client, entries);

return 0;
@@ -264,107 +258,42 @@ int stomp_send(struct client *client)
return 1;
}

TAILQ_FOREACH(queue, &queues, entries) {
if(strcmp(queue->queuename, queuename) == 0)
break;
}

if(queue != NULL){
/* Send it out to the subscribers */
TAILQ_FOREACH(subscriber, &queue->subscribers, entries){
subscriber->response_cmd = STOMP_CMD_MESSAGE;
subscriber->response_headers = client->request_headers;
subscriber->response = client->request_body;

stomp_handle_response(subscriber);

subscriber->response_cmd = STOMP_CMD_NONE;
subscriber->response_headers = NULL;
subscriber->response = NULL;
queue = stomp_find_queue(queuename);
if (queue == NULL){
queue = stomp_add_queue(queuename);
if(queue == NULL){
client->response_cmd = STOMP_CMD_ERROR;
evhttp_add_header(client->response_headers, "message", "Creating destination failed");
return 1;
}
}
else if(strncmp(queuename, "/topic/", 7) != 0){
/* TODO: Store message in LevelDB */

loginfo("No active subscribers on that queue");
}

client->response_cmd = STOMP_CMD_NONE;
client->response = NULL;

return 0;
}


int stomp_parse_headers(struct evkeyvalq *headers, char *request)
{
char *line;
size_t line_length;
char *skey, *svalue;
struct evbuffer *buffer;

buffer = evbuffer_new();

evbuffer_add(buffer, request, strlen(request));

while ((line = evbuffer_readln(buffer, &line_length, EVBUFFER_EOL_CRLF)) != NULL) {
skey = NULL;
svalue = NULL;

if(strchr(line, ':') == NULL){
continue;
}

/* Processing of header lines */
svalue = line;
skey = strsep(&svalue, ":");
if (svalue == NULL){
free(line);
evbuffer_free(buffer);
return 2;
}

svalue += strspn(svalue, " ");

/* TODO: check if header with same name already parsed */

if (evhttp_add_header(headers, skey, svalue) == -1){
free(line);
evbuffer_free(buffer);
#ifdef WITH_LEVELDB
if(strncmp(queuename, "/topic/", 7) != 0){
if(leveldb_add_message(queue, client->request) != 0){
client->response_cmd = STOMP_CMD_ERROR;
evhttp_add_header(client->response_headers, "message", "Storing message failed");
return 1;
}

free(line);
}
#endif

evbuffer_free(buffer);

return 0;
}


void stomp_free_client(struct client *client)
{
/* TODO: remove all subscriptions */
/* TODO: free all allocated memory */
/* Send it out to the subscribers */
TAILQ_FOREACH(subscriber, &queue->subscribers, entries){
subscriber->response_cmd = STOMP_CMD_MESSAGE;
subscriber->response_headers = client->request_headers;
subscriber->response = client->request_body;

struct client *entry, *tmp_entry;
stomp_handle_response(subscriber);

for (entry = TAILQ_FIRST(&clients); entry != NULL; entry = tmp_entry) {
tmp_entry = TAILQ_NEXT(entry, entries);
if ((void *)tmp_entry != NULL && client->fd == tmp_entry->fd) {
TAILQ_REMOVE(&clients, entry, entries);
free(entry);
}
subscriber->response_cmd = STOMP_CMD_NONE;
subscriber->response_headers = NULL;
subscriber->response = NULL;
}

client->authenticated = 0;
logwarn("Free client %d", client->fd);
client->response_cmd = STOMP_CMD_NONE;
client->response = NULL;

if(client->response_cmd == STOMP_CMD_DISCONNECT){
bufferevent_free(client->bev);
close(client->fd);
free(client);
}
return 0;
}

@@ -56,8 +56,5 @@ extern int stomp_send(struct client *client);

extern int stomp_handle_request(struct client *client);
extern int stomp_handle_response(struct client *client);
extern int stomp_parse_headers(struct evkeyvalq *headers, char *request);
extern void stomp_free_client(struct client *client);


#endif /* _STOMP_H_ */
@@ -0,0 +1,144 @@
/*
* Copyright (C) 2011 Bernhard Froehlich <decke@bluelife.at>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Author's name may not be used endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include <stdlib.h>
#include <string.h>
#include <sys/queue.h>
#include <unistd.h>

#include <event2/event.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/http.h>
#include <event2/keyvalq_struct.h>

#include "client.h"
#include "log.h"
#include "stomp.h"
#include "stomputil.h"


struct queue* stomp_add_queue(const char *queuename)
{
struct queue *entry;

if(queuename == NULL || strlen(queuename) > 512)
return NULL;

entry = malloc(sizeof(*entry));
entry->queuename = malloc(strlen(queuename)+1);
strcpy(entry->queuename, queuename);

TAILQ_INIT(&entry->subscribers);
TAILQ_INSERT_TAIL(&queues, entry, entries);

return entry;
}

struct queue* stomp_find_queue(const char *queuename)
{
struct queue *queue;

TAILQ_FOREACH(queue, &queues, entries) {
if(strcmp(queue->queuename, queuename) == 0){
return queue;
}
}

return NULL;
}

void stomp_free_client(struct client *client)
{
/* TODO: remove all subscriptions */
/* TODO: free all allocated memory */

struct client *entry, *tmp_entry;

for (entry = TAILQ_FIRST(&clients); entry != NULL; entry = tmp_entry) {
tmp_entry = TAILQ_NEXT(entry, entries);
if ((void *)tmp_entry != NULL && client->fd == tmp_entry->fd) {
TAILQ_REMOVE(&clients, entry, entries);
free(entry);
}
}

client->authenticated = 0;
logwarn("Free client %d", client->fd);

if(client->response_cmd == STOMP_CMD_DISCONNECT){
bufferevent_free(client->bev);
close(client->fd);
free(client);
}
}

int stomp_parse_headers(struct evkeyvalq *headers, char *request)
{
char *line;
size_t line_length;
char *skey, *svalue;
struct evbuffer *buffer;

buffer = evbuffer_new();

evbuffer_add(buffer, request, strlen(request));

while ((line = evbuffer_readln(buffer, &line_length, EVBUFFER_EOL_CRLF)) != NULL) {
skey = NULL;
svalue = NULL;

if(strchr(line, ':') == NULL){
continue;
}

/* Processing of header lines */
svalue = line;
skey = strsep(&svalue, ":");
if (svalue == NULL){
free(line);
evbuffer_free(buffer);
return 2;
}

svalue += strspn(svalue, " ");

/* TODO: check if header with same name already parsed */

if (evhttp_add_header(headers, skey, svalue) == -1){
free(line);
evbuffer_free(buffer);
return 1;
}

free(line);
}

evbuffer_free(buffer);

return 0;
}

@@ -0,0 +1,36 @@
/*
* Copyright (C) 2011 Bernhard Froehlich <decke@bluelife.at>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Author's name may not be used endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef _STOMPUTIL_H_
#define _STOMPUTIL_H_

extern struct queue* stomp_add_queue(const char *queuename);
extern struct queue* stomp_find_queue(const char *queuename);

extern int stomp_parse_headers(struct evkeyvalq *headers, char *request);
extern void stomp_free_client(struct client *client);

#endif /* _STOMPUTIL_H_ */

0 comments on commit 7480023

Please sign in to comment.
You can’t perform that action at this time.