Permalink
Browse files

pubsub_filtered, stream_filter: new Makefile, refactor shared code

while refactoring, fixed:
free parsed fields when appropriate
error out if limit of parsed fields exceeded

also bump version
  • Loading branch information...
1 parent f6f6fe0 commit b54526a698a10c7e54b537506a3a7cbe0b9137f5 Pierce Lopez committed Jun 29, 2012
@@ -0,0 +1,3 @@
+/pubsub_filtered
+/stream_filter
+/build/
View
@@ -1,25 +1,49 @@
-LIBEVENT ?= /usr/local
TARGET ?= /usr/local
+LIBEVENT ?= /usr/local
LIBSIMPLEHTTP ?= /usr/local
+BLDDIR = build
+
+BINARIES = pubsub_filtered stream_filter
+
+SRCS_pubsub_filtered = pubsub_filtered.c md5.c shared.c
+SRCS_stream_filter = stream_filter.c md5.c shared.c
+
+all: $(BINARIES)
CFLAGS = -I. -I$(LIBSIMPLEHTTP)/include -I.. -I$(LIBEVENT)/include -g
-LIBS = -L. -L$(LIBSIMPLEHTTP)/lib -L../simplehttp -L$(LIBEVENT)/lib -levent -lsimplehttp -ljson -lpcre -lm -lpubsubclient -lcrypto
+LDFLAGS_pubsub_filtered = -L. -L$(LIBSIMPLEHTTP)/lib -L../simplehttp -L$(LIBEVENT)/lib -levent -lsimplehttp -ljson -lpcre -lm -lpubsubclient -lcrypto
+LDFLAGS_stream_filter = -L. -L$(LIBSIMPLEHTTP)/lib -L../simplehttp -lsimplehttp -ljson
+
+OBJS_pubsub_filtered := $(patsubst %.c, $(BLDDIR)/%.o, $(SRCS_pubsub_filtered))
+OBJS_stream_filter := $(patsubst %.c, $(BLDDIR)/%.o, $(SRCS_stream_filter))
+OBJS_all := $(OBJS_pubsub_filtered) $(OBJS_stream_filter)
-LIBS_STREAM_FILTER = -L. -L$(LIBSIMPLEHTTP)/lib -L../simplehttp -lsimplehttp -ljson
+DEPS := $(patsubst %.o, %.o.deps, $(OBJS_all))
+# sort removes duplicates
+-include $(sort $(DEPS))
-pubsub_filtered: pubsub_filtered.c md5.c
- $(CC) $(CFLAGS) -o $@ md5.c $< $(LIBS)
+pubsub_filtered: $(OBJS_pubsub_filtered)
+stream_filter: $(OBJS_stream_filter)
-stream_filter: stream_filter.c md5.c
- $(CC) $(CFLAGS) -o $@ md5.c $< $(LIBS_STREAM_FILTER)
+$(BINARIES): %:
+ @echo " LD " $@
+ @mkdir -p $(dir $@)
+ @$(CC) -o $@ $(OBJS_$@) $(LDFLAGS_$@)
-install:
- /usr/bin/install -d $(TARGET)/bin/
- /usr/bin/install pubsub_filtered $(TARGET)/bin/
+$(BLDDIR)/%.o: %.c
+ @echo " CC " $@
+ @mkdir -p $(dir $@)
+ @$(CC) -o $@ -c $< $(CFLAGS) -MMD -MF $@.deps
-install_stream_filter:
- /usr/bin/install -d $(TARGET)/bin/
- /usr/bin/install stream_filter $(TARGET)/bin/
+install: $(patsubst %, $(TARGET)/bin/%, $(BINARIES))
+
+$(TARGET)/bin/%: %
+ @echo " INSTALL " $@
+ @/usr/bin/install -d $(dir $@)
+ @/usr/bin/install -D $< $@
clean:
- rm -rf *.o pubsub_filtered stream_filter *.dSYM
+ @echo " RM " $(BINARIES) $(OBJS_all) $(DEPS)
+ @rm -rf $(BINARIES) $(OBJS_all) $(DEPS)
+
+.PHONY: all install clean
@@ -7,21 +7,23 @@
#include <simplehttp/simplehttp.h>
#include <pubsubclient/pubsubclient.h>
#include <json/json.h>
-#include "http-internal.h"
-#include "md5.h"
-#include "pcre.h"
#include <openssl/sha.h>
#include <openssl/evp.h>
#include <openssl/buffer.h>
+#include "shared.h"
+#include "http-internal.h"
+#include "pcre.h"
+
#define DEBUG 1
#define SUCCESS 0
#define FAILURE 1
-#define VERSION "1.2"
+#define VERSION "1.3"
#define RECONNECT_SECS 5
#define ADDR_BUFSZ 256
+#define MAX_FIELDS 64
#define BOUNDARY "xXPubSubXx"
#define MAX_PENDING_DATA 1024*1024*50
#define OVECCOUNT 30 /* should be a multiple of 3 */
@@ -54,8 +56,6 @@ typedef struct cli {
} cli;
TAILQ_HEAD(, cli) clients;
-char *md5_hash(const char *string);
-
void error_cb(int status_code, void *arg);
void source_reconnect_cb(int fd, short what, void *ctx);
void reconnect_to_source(int retryNow);
@@ -65,7 +65,6 @@ int filter_message(char *subject, pcre *filter, struct json_object *json_in);
int parse_encrypted_fields(char *str);
int parse_blacklisted_fields(char *str);
-int parse_fields(const char *str, char **field_array);
int can_kick(struct cli *client);
int is_slow(struct cli *client);
@@ -85,13 +84,13 @@ static struct timeval reconnect_tv = {RECONNECT_SECS, 0};
static struct evhttp_connection *evhttp_source_connection = NULL;
static struct evhttp_request *evhttp_source_request = NULL;
-static char *encrypted_fields[64];
+static char *encrypted_fields[MAX_FIELDS];
static int num_encrypted_fields = 0;
-static char *blacklisted_fields[64];
+static char *blacklisted_fields[MAX_FIELDS];
+static int num_blacklisted_fields = 0;
static char *expected_key = NULL;
static char *expected_value = NULL;
static pcre *expected_value_regex = NULL;
-static int num_blacklisted_fields = 0;
char *base64(const unsigned char *input, int length)
@@ -116,92 +115,26 @@ char *base64(const unsigned char *input, int length)
}
/*
- * Parse a comma-delimited string and populate
- * the blacklisted_fields array with the results.
- *
- * See parse_fields().
+ * populate the blacklisted_fields array
*/
int parse_blacklisted_fields(char *str)
{
- int i;
-
- num_blacklisted_fields = parse_fields(str, blacklisted_fields);
-
- for (i = 0; i < num_blacklisted_fields; i++) {
- fprintf(stdout, "Blacklist field: \"%s\"\n", blacklisted_fields[i]);
- }
-
- return 1;
+ num_blacklisted_fields = parse_fields(str, blacklisted_fields, MAX_FIELDS, "Blacklist", stdout);
+ return (num_blacklisted_fields <= MAX_FIELDS);
}
-
/*
- * Parse a comma-delimited string and populate
- * the encrypted_fields array with the results.
- *
- * See parse_fields().
+ * populate the encrypted_fields array with the results.
*/
int parse_encrypted_fields(char *str)
{
- int i;
- num_encrypted_fields = parse_fields(str, encrypted_fields);
-
- for (i = 0; i < num_encrypted_fields; i++) {
- fprintf(stdout, "Encrypted field: \"%s\"\n", encrypted_fields[i]);
- }
-
- return 1;
-}
-
-/*
- * Parse a comma-delimited list of strings and put them
- * in an char array. Array better have enough slots
- * because I didn't have time to work out the memory allocation.
- */
-int parse_fields(const char *str, char **field_array)
-{
- int i;
- const char delim[] = ",";
- char *tok, *str_ptr, *save_ptr;
-
- if (!str) {
- return;
- }
-
- str_ptr = strdup(str);
-
- tok = strtok_r(str_ptr, delim, &save_ptr);
-
- i = 0;
- while (tok != NULL) {
- field_array[i] = strdup(tok);
- tok = strtok_r(NULL, delim, &save_ptr);
- i++;
- }
-
- return i;
-}
-
-/* md5 encrypt a string */
-char *md5_hash(const char *string)
-{
- char *output = calloc(1, 33 * sizeof(char));
- struct cvs_MD5Context context;
- unsigned char checksum[16];
- int i;
-
- cvs_MD5Init (&context);
- cvs_MD5Update (&context, string, strlen(string));
- cvs_MD5Final (checksum, &context);
- for (i = 0; i < 16; i++) {
- sprintf(&output[i * 2], "%02x", (unsigned int) checksum[i]);
- }
- output[32] = '\0';
- return output;
+ num_encrypted_fields = parse_fields(str, encrypted_fields, MAX_FIELDS, "Encrypted", stdout);
+ return (num_encrypted_fields <= MAX_FIELDS);
}
/*
* Filters a JSON object
+ * return non-zero if message should be kept
*/
int filter_message(char *subject, pcre *filter, struct json_object *json_in)
{
@@ -235,7 +168,6 @@ int filter_message(char *subject, pcre *filter, struct json_object *json_in)
return 1;
}
-
/*
* Callback for each fetched pubsub message.
*/
@@ -263,48 +195,22 @@ void process_message_cb(char *source, void *arg)
return;
}
- if (expected_value != NULL) {
- element = json_object_object_get(json_in, expected_key);
- if (element == NULL) {
- json_object_put(json_in);
- return;
- }
- if (json_object_is_type(element, json_type_null)) {
- json_object_put(json_in);
- return;
- }
- raw_string = json_object_get_string(element);
- if (raw_string == NULL || !strlen(raw_string) || strcmp(raw_string, expected_value) != 0) {
- json_object_put(json_in);
- return;
- }
+ // filter
+ if (expected_value && !filter_message_simple(expected_key, expected_value, json_in)) {
+ json_object_put(json_in);
+ return;
}
- // filter
if (expected_value_regex && !filter_message(expected_key, expected_value_regex, json_in)) {
+ json_object_put(json_in);
return;
}
- // loop through the fields we need to encrypt
- for (i = 0; i < num_encrypted_fields; i++) {
- field_key = encrypted_fields[i];
- element = json_object_object_get(json_in, field_key);
- if (element) {
- raw_string = json_object_get_string(element);
- } else {
- continue;
- }
- encrypted_string = md5_hash(raw_string);
- //if (DEBUG)fprintf(stdout, "encrypting %s \"%s\" => \"%s\"\n", field_key, raw_string, encrypted_string);
- json_object_object_add(json_in, field_key, json_object_new_string(encrypted_string));
- free(encrypted_string);
- }
- // loop through and remove the blacklisted fields
- for (i = 0; i < num_blacklisted_fields; i++) {
- field_key = blacklisted_fields[i];
- //if (DEBUG)fprintf(stdout, "removing %s\n", field_key);
- json_object_object_del(json_in, field_key);
- }
+ // remove the blacklisted fields
+ delete_fields(blacklisted_fields, num_blacklisted_fields, json_in);
+
+ // fields we need to encrypt
+ encrypt_fields(encrypted_fields, num_encrypted_fields, json_in);
json_out = json_object_to_json_string(json_in);
//if (DEBUG)fprintf(stdout, "json_out = %d bytes\n" , strlen(json_out));
@@ -724,6 +630,8 @@ int main(int argc, char **argv)
free(pubsub_url);
free(source_address);
free(source_path);
+ free_fields(blacklisted_fields, num_blacklisted_fields);
+ free_fields(encrypted_fields, num_encrypted_fields);
return 0;
}
Oops, something went wrong.

0 comments on commit b54526a

Please sign in to comment.