Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files

- Implement leveldb_load_queue()

- Add MAXQUEUELEN
  • Loading branch information...
decke
decke committed May 18, 2012
1 parent 7480023 commit 2b7cb8d042761f40abc948bce654cf85c6ed0263
Showing with 80 additions and 3 deletions.
  1. +58 −2 leveldb.c
  2. +1 −0 leveldb.h
  3. +2 −0 stomp.h
  4. +18 −1 stomputil.c
  5. +1 −0 stomputil.h
@@ -25,6 +25,7 @@
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* atomic_fetchadd */
@@ -36,6 +37,7 @@
#include "log.h"
#include "util.h"
#include "client.h"
#include "stomp.h"

#define CheckNoError(err) \
if ((err) != NULL) { \
@@ -99,12 +101,12 @@ int leveldb_free(void)
int leveldb_add_message(struct queue *queue, char *message)
{
leveldb_writebatch_t *wb;
char key[256];
char key[MAXQUEUELEN+10];
char value[16];
char *error = NULL;
int seq;

if(strlen(queue->queuename) > strlen(key)-10){
if(strlen(queue->queuename) >= MAXQUEUELEN){
logerror("LevelDB add_message failed: Queuename too long");
return 1;
}
@@ -130,6 +132,60 @@ int leveldb_add_message(struct queue *queue, char *message)
return 1;
}

loginfo("Added message %d to %s: %.20s", queue->write, queue->queuename, message);

return 0;
}

int leveldb_load_queue(struct queue *queue)
{
char key[MAXQUEUELEN+10];
char *value;
char *error = NULL;
size_t value_len;

if(strlen(queue->queuename) >= MAXQUEUELEN){
logerror("LevelDB add_message failed: Queuename too long");
return 1;
}

/* queuename.read */
snprintf(key, sizeof(key)-1, "%s.read", queue->queuename);
key[sizeof(key)-1] = '\0';

value = leveldb_get(db, roptions, key, strlen(key), &value_len, &error);
if(error != NULL){
logerror("LevelDB load_queue failed: %s", error);
return 1;
}

if(value != NULL){
queue->read = atoi(value)+1;
free(value);
value = NULL;
}
else
queue->read = 1;


/* queuename.write */
snprintf(key, sizeof(key)-1, "%s.write", queue->queuename);
key[sizeof(key)-1] = '\0';

value = leveldb_get(db, roptions, key, strlen(key), &value_len, &error);
if(error != NULL){
logerror("LevelDB load_queue failed: %s", error);
return 1;
}

if(value != NULL){
queue->write = atoi(value)+1;
free(value);
value = NULL;
}
else
queue->write = 1;

return 0;
}

@@ -33,5 +33,6 @@ extern int leveldb_free(void);
extern int leveldb_add_message(struct queue *queue, char *message);
extern char* leveldb_get_message(struct queue *queue);
extern int leveldb_ack_message(struct queue *queue);
extern int leveldb_load_queue(struct queue *queue);

#endif /* _LEVELDB_H_ */
@@ -30,6 +30,8 @@
#include <event2/buffer.h>
#include <event2/http.h>

#define MAXQUEUELEN 128

enum stomp_direction {
STOMP_IN = 1,
STOMP_OUT
@@ -39,13 +39,14 @@
#include "log.h"
#include "stomp.h"
#include "stomputil.h"
#include "leveldb.h"


struct queue* stomp_add_queue(const char *queuename)
{
struct queue *entry;

if(queuename == NULL || strlen(queuename) > 512)
if(queuename == NULL || strlen(queuename) >= MAXQUEUELEN)
return NULL;

entry = malloc(sizeof(*entry));
@@ -54,6 +55,16 @@ struct queue* stomp_add_queue(const char *queuename)

TAILQ_INIT(&entry->subscribers);
TAILQ_INSERT_TAIL(&queues, entry, entries);

#ifdef WITH_LEVELDB
if(leveldb_load_queue(entry) != 0){
stomp_free_queue(entry);
return NULL;
}
#else
entry->read = 1;
entry->write = 1;
#endif

return entry;
}
@@ -71,6 +82,12 @@ struct queue* stomp_find_queue(const char *queuename)
return NULL;
}

void stomp_free_queue(struct queue *queue)
{
/* TODO: implement stomp_free_queue */
;
}

void stomp_free_client(struct client *client)
{
/* TODO: remove all subscriptions */
@@ -29,6 +29,7 @@

extern struct queue* stomp_add_queue(const char *queuename);
extern struct queue* stomp_find_queue(const char *queuename);
extern void stomp_free_queue(struct queue *queue);

extern int stomp_parse_headers(struct evkeyvalq *headers, char *request);
extern void stomp_free_client(struct client *client);

0 comments on commit 2b7cb8d

Please sign in to comment.
You can’t perform that action at this time.