Skip to content

Commit

Permalink
improve resolution (close #12)
Browse files Browse the repository at this point in the history
  • Loading branch information
daTokenizer committed Nov 24, 2016
1 parent a2c94f9 commit b66b596
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 37 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ endif

all: module.so

module.so:
module.so: FORCE
$(MAKE) -C ./src
cp ./src/module.so .

Expand Down
73 changes: 46 additions & 27 deletions src/module.c
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
#include "redismodule.h"
#include "rmutil/util.h"
#include "rmutil/strings.h"
#include "rmutil/test_util.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdint.h>
#include <stdbool.h>
#include <stdio.h>
#include <time.h>
#include "rmutil/util.h"
#include "khash.h"
#include "rmutil/util.h"
#include "rmutil/strings.h"
#include "rmutil/test_util.h"
#include "rmutil/alloc.h"


void printRedisStr(RedisModuleString *str, const char* name) {
printf("%s = %s\n", name, RedisModule_StringPtrLen(str, NULL));
}

//##########################################################
//#
//# C Utilities
//#
//#########################################################

char* string_append(char* a, const char* b)
{
Expand All @@ -29,9 +28,30 @@ char* string_append(char* a, const char* b)
return retstr;
}

// #define _POSIX_C_SOURCE 200809L

#include <inttypes.h>
#include <math.h>

long long current_time_ms (void)
{
long ms; // Milliseconds
time_t s; // Seconds
struct timespec spec;

clock_gettime(CLOCK_REALTIME, &spec);

s = spec.tv_sec*1000;
ms = round(spec.tv_nsec / 1.0e6); // Convert nanoseconds to milliseconds

return s+ms;
}



//##########################################################
//#
//# Linked List
//# Linked List Definitions
//#
//#########################################################

Expand Down Expand Up @@ -279,7 +299,7 @@ char* printList(ElementList* list)

//##########################################################
//#
//# Utilities
//# Dehydrator Utilities
//#
//#########################################################

Expand Down Expand Up @@ -596,7 +616,7 @@ int TimeToNextCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_OK;
}

time_t now = time(0);
time_t now = current_time_ms();
int time_to_next = -1;

khiter_t k;
Expand Down Expand Up @@ -698,7 +718,6 @@ int PushCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
// timeout str to int ttl
long long ttl;
int rep = RedisModule_StringToLongLong(timeout, &ttl);

if (rep == REDISMODULE_ERR) { return REDISMODULE_ERR; }

// get key dehydrator_name
Expand Down Expand Up @@ -740,7 +759,7 @@ int PushCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
RedisModuleString* saved_element = RedisModule_CreateStringFromString(ctx, element);

//create an ElementListNode
node = _createNewNode(saved_element, saved_element_id, ttl, time(0) + ttl);
node = _createNewNode(saved_element, saved_element_id, ttl, current_time_ms() + ttl);

// push to tail of the list
_listPush(timeout_queue, node);
Expand Down Expand Up @@ -825,7 +844,7 @@ int PollCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)

RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
int expired_element_num = 0;
time_t now = time(0);
time_t now = current_time_ms();
// for each timeout_queue in timeout_queues
khiter_t k;
for (k = kh_begin(dehydrator->timeout_queues); k != kh_end(dehydrator->timeout_queues); ++k)
Expand Down Expand Up @@ -875,7 +894,7 @@ int TestLook(RedisModuleCtx *ctx)


RedisModuleCallReply *push1 =
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_look", "test_element", "payload", "100");
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_look", "test_element", "payload", "100000");
RMUtil_Assert(RedisModule_CallReplyType(push1) != REDISMODULE_REPLY_ERROR);

RedisModuleCallReply *check2 =
Expand Down Expand Up @@ -908,7 +927,7 @@ int TestUpdate(RedisModuleCtx *ctx)
RMUtil_Assert(RedisModule_CallReplyType(check1) == REDISMODULE_REPLY_ERROR);

RedisModuleCallReply *push1 =
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_update", "test_element", "some payload", "100");
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_update", "test_element", "some payload", "100000");
RMUtil_Assert(RedisModule_CallReplyType(push1) != REDISMODULE_REPLY_ERROR);

RedisModuleCallReply *check2 =
Expand Down Expand Up @@ -941,20 +960,20 @@ int TestTimeToNext(RedisModuleCtx *ctx)
printf("Testing TTN - ");

RedisModuleCallReply *push1 =
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_ttn", "ttn_test_element", "payload", "3");
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_ttn", "ttn_test_element", "payload", "3000");
RMUtil_Assert(RedisModule_CallReplyType(push1) != REDISMODULE_REPLY_ERROR);

RedisModuleCallReply *check1 =
RedisModule_Call(ctx, "REDE.ttn", "c", "TEST_DEHYDRATOR_ttn");
RMUtil_Assert(RedisModule_CallReplyType(check1) != REDISMODULE_REPLY_ERROR);
RMUtil_Assert(RedisModule_CallReplyInteger(check1) == 3);
RMUtil_Assert(RedisModule_CallReplyInteger(check1) == 3000);

sleep(2);

RedisModuleCallReply *check2 =
RedisModule_Call(ctx, "REDE.ttn", "c", "TEST_DEHYDRATOR_ttn");
RMUtil_Assert(RedisModule_CallReplyType(check2) != REDISMODULE_REPLY_ERROR);
RMUtil_Assert(RedisModule_CallReplyInteger(check2) == 1);
RMUtil_Assert(RedisModule_CallReplyInteger(check2) == 1000);

sleep(2);

Expand Down Expand Up @@ -983,7 +1002,7 @@ int TestPush(RedisModuleCtx *ctx)
// RMUtil_Assert(RedisModule_CreateStringFromCallReply(check1) == NULL);

RedisModuleCallReply *push1 =
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_push", "push_test_element", "payload", "1");
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_push", "push_test_element", "payload", "1000");
RMUtil_Assert(RedisModule_CallReplyType(push1) != REDISMODULE_REPLY_ERROR);

RedisModuleCallReply *check2 =
Expand All @@ -1009,7 +1028,7 @@ int TestPull(RedisModuleCtx *ctx)
char * bad_store_key = "pull_test_bad_element";

RedisModuleCallReply *push1 =
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_pull", store_key, "payload", "100");
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_pull", store_key, "payload", "100000");
RMUtil_Assert(RedisModule_CallReplyType(push1) != REDISMODULE_REPLY_ERROR);


Expand Down Expand Up @@ -1057,22 +1076,22 @@ int TestPoll(RedisModuleCtx *ctx)
// push elements 1, 4, 7 & 3a (for 1, 4, 7 & 3 seconds)
// 1
RedisModuleCallReply *push1 =
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_poll", "e1", "element_1", "1");
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_poll", "e1", "element_1", "1000");
RMUtil_Assert(RedisModule_CallReplyType(push1) != REDISMODULE_REPLY_ERROR);

// 4
RedisModuleCallReply *push4 =
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_poll", "e4", "element_4", "4");
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_poll", "e4", "element_4", "4000");
RMUtil_Assert(RedisModule_CallReplyType(push4) != REDISMODULE_REPLY_ERROR);

// 7
RedisModuleCallReply *push7 =
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_poll", "e7", "element_7", "7");
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_poll", "e7", "element_7", "7000");
RMUtil_Assert(RedisModule_CallReplyType(push7) != REDISMODULE_REPLY_ERROR);

// 3a
RedisModuleCallReply *push3a =
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_poll", "e3a", "element_3a", "3");
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_poll", "e3a", "element_3a", "3000");
RMUtil_Assert(RedisModule_CallReplyType(push3a) != REDISMODULE_REPLY_ERROR);

// pull question 7
Expand All @@ -1091,7 +1110,7 @@ int TestPoll(RedisModuleCtx *ctx)
// push element 3b (for 3 seconds)
// 3b
RedisModuleCallReply *push_three_b =
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_poll", "e3b", "element_3b", "3");
RedisModule_Call(ctx, "REDE.push", "cccc", "TEST_DEHYDRATOR_poll", "e3b", "element_3b", "3000");
RMUtil_Assert(RedisModule_CallReplyType(push_three_b) != REDISMODULE_REPLY_ERROR);

// poll (t=1) - we expect only element 1 to pop out
Expand Down
8 changes: 4 additions & 4 deletions tests/helloworld.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
def helloworld(redis_service):
redis_service.execute_command("DEL", "helloworld_dehydrator")
# push some data into the dehydrator
redis_service.execute_command("rede.push", "helloworld_dehydrator", "x", "world", 1)
redis_service.execute_command("rede.push", "helloworld_dehydrator", "y", "goodbye",2)
redis_service.execute_command("rede.push", "helloworld_dehydrator", "z", "derp", 3)
redis_service.execute_command("rede.push", "helloworld_dehydrator", "x", "world", 1000)
redis_service.execute_command("rede.push", "helloworld_dehydrator", "y", "goodbye",2000)
redis_service.execute_command("rede.push", "helloworld_dehydrator", "z", "derp", 3000)

# pull unneeded data before it expires
redis_service.execute_command("rede.pull", "helloworld_dehydrator", "y")
Expand All @@ -23,7 +23,7 @@ def helloworld(redis_service):
time.sleep(1)
t1_poll_result = redis_service.execute_command("rede.poll", "helloworld_dehydrator")

time.sleep(redis_service.execute_command("rede.ttn", "helloworld_dehydrator"))
time.sleep(redis_service.execute_command("rede.ttn", "helloworld_dehydrator")/1000.0)
t3_poll_result = redis_service.execute_command("rede.poll", "helloworld_dehydrator")

print t3_poll_result[0], t1_poll_result[0]
Expand Down
10 changes: 5 additions & 5 deletions tests/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ def function_test_dehydrator(redis_service):
sys.stdout.write("module functional test (external) - ")
sys.stdout.flush()
# "push elements a,b & c (for 1,3 & 7 seconds)"
redis_service.execute_command("rede.push", "python_test_dehydrator", "a", "test_element a", 1)
redis_service.execute_command("rede.push", "python_test_dehydrator", "b", "test_element b",3)
redis_service.execute_command("rede.push", "python_test_dehydrator", "c", "test_element c", 7)
redis_service.execute_command("rede.push", "python_test_dehydrator", "a", "test_element a", 1000)
redis_service.execute_command("rede.push", "python_test_dehydrator", "b", "test_element b",3000)
redis_service.execute_command("rede.push", "python_test_dehydrator", "c", "test_element c", 7000)
# "pull element b"
redis_service.execute_command("rede.pull", "python_test_dehydrator", "b")
# "poll (t=0) - no element should pop out right away"
Expand Down Expand Up @@ -48,7 +48,7 @@ def load_test_dehydrator(redis_service, cycles=1000000, timeouts=[1,2,4,16,32,10
start = time.time()
# test push
for i in range(cycles):
redis_service.execute_command("rede.push", "python_load_test_dehydrator", "%d" % i, "payload", random.choice(timeouts))
redis_service.execute_command("rede.push", "python_load_test_dehydrator", "%d" % i, "payload", random.choice(timeouts)*1000)
push_end = time.time()

print "measuring PULL"
Expand All @@ -61,7 +61,7 @@ def load_test_dehydrator(redis_service, cycles=1000000, timeouts=[1,2,4,16,32,10
end_i = cycles/3
for j in range(3):
for i in range(start_i,end_i):
redis_service.execute_command("rede.push", "python_load_test_dehydrator", "%d" % i, "payload", (3-j+random.choice([1,2,3])))
redis_service.execute_command("rede.push", "python_load_test_dehydrator", "%d" % i, "payload", 1000*(3-j+random.choice([1,2,3])))
start_i += cycles/3
end_i += cycles/3
time.sleep(1)
Expand Down

0 comments on commit b66b596

Please sign in to comment.