Skip to content

Commit

Permalink
finish type coding
Browse files Browse the repository at this point in the history
  • Loading branch information
daTokenizer committed Nov 11, 2016
1 parent 6e3a2e7 commit bbfe7f8
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 65 deletions.
145 changes: 85 additions & 60 deletions hardcore_low_level.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ ElementListNode* _createNewNode(RedisModuleString* element, RedisModuleString* e
ElementListNode* newNode
= (ElementListNode*)RedisModule_Alloc(sizeof(ElementListNode));

newNode->element_id = RedisModule_Strdup(element_id);
newNode->element_id = element_id;
newNode->element = element;
newNode->expiration = expiration;
newNode->next = NULL;
Expand Down Expand Up @@ -133,8 +133,8 @@ ElementListNode* _listFind(ElementList* list, RedisModuleString* element_id)
void deleteNode(ElementListNode* node)
{
// free everything else related to the node
RedisModule_Free(node->element_id);
RedisModule_Free(node->element);
RedisModule_FreeString(node->element_id); // TODO: in this needed
RedisModule_FreeString(node->element); // TODO: in this needed
RedisModule_Free(node);
}

Expand Down Expand Up @@ -199,20 +199,26 @@ Dehydrator* _createDehydrator(RedisModuleString* dehydrator_name)

Dehydrator* getDehydrator(RedisModuleString* dehydrator_name)
{
// // get key dehydrator_name
// // TODO:
// if (dehydrator_name does not EXIST)
// {
// Dehydrator* dehydrator = _createDehydrator(dehydrator_name);
// //save into redis under dehydrator_name
// return dehydrator;
// }
// else if (key contains somthing that is not a dehydrator)
// {
// return NULL;
// }
// return content of the key;
return NULL;
// get key dehydrator_name
RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
REDISMODULE_READ|REDISMODULE_WRITE);
int type = RedisModule_KeyType(key);
if (type != REDISMODULE_KEYTYPE_EMPTY &&
RedisModule_ModuleTypeGetType(key) != DehydratorType)
{
// key contains somthing that is not a dehydrator
return NULL// TODO: RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
}
if (type == REDISMODULE_KEYTYPE_EMPTY)
{
Dehydrator* dehydrator = _createDehydrator(dehydrator_name);
RedisModule_ModuleTypeSetValue(key, DehydratorType, dehydrator);
return dehydrator;
}
else
{
return RedisModule_ModuleTypeGetValue(key);
}
}


Expand Down Expand Up @@ -242,6 +248,8 @@ void deleteDehydrator(Dehydrator* dehydrator)
}
kh_destroy(32, dehydrator->element_nodes);

RedisModule_FreeString(dehydrator->name);

// delete the dehydrator
RedisModule_Free(dehydrator);
}
Expand Down Expand Up @@ -308,24 +316,26 @@ char* _toQueueName(int ttl)
void DehydratorTypeRdbSave(RedisModuleIO *rdb, void *value)
{
Dehydrator *dehy = value;
RedisModule_SaveStringBuffer(rdb, dehy->name);
int queue_num = kh_size(dehy->timeout_queues);
RedisModule_SaveUnsigned(rdb, queue_num);
RedisModule_SaveString(rdb, dehy->name);
RedisModule_SaveUnsigned(rdb, kh_size(dehydrator->timeout_queues));
// for each timeout_queue in timeout_queues
khiter_t k;
for (k = kh_begin(dehydrator->timeout_queues); k != kh_end(dehydrator->timeout_queues); ++k)
{
if (!kh_exist(dehydrator->timeout_queues, k)) continue;
ElementList* list = kh_value(dehydrator->timeout_queues, k);
int ttl = kh_key(dehydrator->timeout_queues, k);
RedisModule_SaveUnsigned(rdb, ttl);
RedisModule_SaveUnsigned(rdb, list->len);
ElementListNode* node = list->head;
boolean done_with_queue = false;
while (!done_with_queue)
{
if ((node != NULL))
{
RedisModule_SaveUnsigned(rdb, head->expiration);
RedisModule_SaveStringBuffer(rdb, head->element_id);
head->element;
RedisModule_SaveString(rdb, head->element_id);
RedisModule_SaveString(rdb, head->element);
}
else
{
Expand All @@ -334,49 +344,64 @@ void DehydratorTypeRdbSave(RedisModuleIO *rdb, void *value)
node = node->next;
}
}
}

void *DehydratorTypeRdbLoad(RedisModuleIO *rdb, int encver)
{
if (encver != 0) { return NULL; }
khiter_t k;
RedisModuleString* name = RedisModule_LoadString(rdb);
Dehydrator *dehy = _createDehydrator(name);
//create an ElementListNode
uint64_t queue_num = RedisModule_LoadUnsigned(rdb);
while(queue_num--)
{
ElementListNode* _listPop(ElementList* list) {
ElementList* timeout_queue = _createNewList()
uint64_t ttl = RedisModule_LoadUnsigned(rdb);

uint64_t node_num = RedisModule_LoadUnsigned(rdb);
while(node_num--)
{
RedisModuleString* element = RedisModule_LoadString(rdb);
RedisModuleString* element_id = RedisModule_LoadString(rdb);
uint64_t expiration = RedisModule_LoadUnsigned(rdb);

ElementListNode* node = _createNewNode(element, element_id, expiration);

_listPush(timeout_queue, node);

// mark element dehytion location in element_nodes
int retval;
k = kh_put(32, dehydrator->element_nodes, element_id, &retval);
kh_value(dehydrator->element_nodes, k) = node;
}

int retval;
k = kh_put(16, dehydrator->timeout_queues, ttl, &retval);
kh_value(dehydrator->timeout_queues, k) = timeout_queue;
}

while(node) { // TODO: all of this should be replaced
RedisModule_SaveSigned(rdb,node->value);
node = node->next;
}
return dehy;
}

void DehydratorTypeAofRewrite(RedisModuleIO *aof, RedisModuleString *key, void *value) {
REDISMODULE_NOT_USED(aof);
REDISMODULE_NOT_USED(key);
REDISMODULE_NOT_USED(value);
// TODO: stub
// struct DehydratorTypeObject *hto = value;
// struct DehydratorTypeNode *node = hto->head;
// while(node) {
// RedisModule_EmitAOF(aof,"HELLOTYPE.INSERT","sl",key,node->value);
// node = node->next;
// }
}

void DehydratorTypeDigest(RedisModuleDigest *digest, void *value) {
REDISMODULE_NOT_USED(digest);
REDISMODULE_NOT_USED(value);
/* TODO: The DIGEST module interface is yet not implemented. */
}
//
//
// void *DehydratorTypeRdbLoad(RedisModuleIO *rdb, int encver) {
// if (encver != 0) {
// /* RedisModule_Log("warning","Can't load data with version %d", encver);*/
// return NULL;
// }
// int lenptr;
// char* name = RedisModule_LoadStringBuffer(rdb, &lenptr);
// Dehydrator *dehy = _createDehydrator(name);
// while(elements--) {
// int64_t ele = RedisModule_LoadSigned(rdb);
// _createNewNode
// DehydratorTypeInsert(dehy,ele);
// }
// return dehy;
// }

//
// void DehydratorTypeAofRewrite(RedisModuleIO *aof, RedisModuleString *key, void *value) {
// struct DehydratorTypeObject *hto = value;
// struct DehydratorTypeNode *node = hto->head;
// while(node) {
// RedisModule_EmitAOF(aof,"HELLOTYPE.INSERT","sl",key,node->value);
// node = node->next;
// }
// }
//
// void DehydratorTypeDigest(RedisModuleDigest *digest, void *value) {
// REDISMODULE_NOT_USED(digest);
// REDISMODULE_NOT_USED(value);
// /* TODO: The DIGEST module interface is yet not implemented. */
// }

void DehydratorTypeFree(void *value)
{
Expand Down
10 changes: 5 additions & 5 deletions module.c
Original file line number Diff line number Diff line change
Expand Up @@ -591,11 +591,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx)
}

DehydratorType = RedisModule_CreateDataType(ctx, "dehy-type", 0,
HelloTypeRdbLoad,
HelloTypeRdbSave,
HelloTypeAofRewrite,
HelloTypeDigest,
HelloTypeFree
DehydratorTypeRdbLoad,
DehydratorTypeRdbSave,
DehydratorTypeAofRewrite,
DehydratorTypeDigest,
DehydratorTypeFree
);
if (DehydratorType == NULL) return REDISMODULE_ERR;

Expand Down

0 comments on commit bbfe7f8

Please sign in to comment.