Skip to content

Commit

Permalink
Add persistence support for undelivered PubSub messages.
Browse files Browse the repository at this point in the history
Refer to
http://abhinavsingh.com/customizing-redis-pubsub-for-message-persistence-part-2/
for details. Here is a simple demo of what happens to published messages on
channels with no active subscribers:

$ ./src/redis-cli
127.0.0.1:6379> publish persistent-channel this
(integer) 0
127.0.0.1:6379> publish persistent-channel is
(integer) 0
127.0.0.1:6379> publish persistent-channel gonna
(integer) 0
127.0.0.1:6379> publish persistent-channel be
(integer) 0
127.0.0.1:6379> publish persistent-channel awesome
(integer) 0
127.0.0.1:6379> lrange persistent-channel 0 -1
1) "this"
2) "is"
3) "gonna"
4) "be"
5) "awesome"
127.0.0.1:6379>
  • Loading branch information
abhinavsingh committed Apr 4, 2015
1 parent 9e546d3 commit 6fdfc98
Showing 1 changed file with 56 additions and 14 deletions.
70 changes: 56 additions & 14 deletions src/pubsub.c
Expand Up @@ -274,6 +274,27 @@ int pubsubPublishMessage(robj *channel, robj *message) {
* Pubsub commands implementation * Pubsub commands implementation
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/


robj *llast(redisClient *c, robj *key) {
listTypeEntry entry;
robj *o, *value = NULL;
long llen;
o = lookupKeyRead(c->db, key);
if(o != NULL && o->encoding == REDIS_ENCODING_QUICKLIST) {
llen = listTypeLength(o);
listTypeIterator *iter = listTypeInitIterator(o, llen-1, REDIS_TAIL);
listTypeNext(iter, &entry);
quicklistEntry *qe = &entry.entry;
if (qe->value) {
value = createStringObject((char *)qe->value,
qe->sz);
} else {
value = createStringObjectFromLongLong(qe->longval);
}
listTypeReleaseIterator(iter);
}
return value;
}

void subscribeCommand(redisClient *c) { void subscribeCommand(redisClient *c) {
int j; int j;


Expand All @@ -282,14 +303,15 @@ void subscribeCommand(redisClient *c) {
c->flags |= REDIS_PUBSUB; c->flags |= REDIS_PUBSUB;


/* Send last received message on the subscribed channel(s) */ /* Send last received message on the subscribed channel(s) */
robj *o; robj *value;
for (j = 1; j < c->argc; j++) { for (j = 1; j < c->argc; j++) {
o = lookupKeyRead(c->db, c->argv[j]); value = llast(c, c->argv[j]);
if(o != NULL) { if (value != NULL) {
addReply(c,shared.mbulkhdr[3]); addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.messagebulk); addReply(c,shared.messagebulk);
addReplyBulk(c,c->argv[j]); addReplyBulk(c,c->argv[j]);
addReplyBulk(c,o); addReplyBulk(c,value);
decrRefCount(value);
} }
} }
} }
Expand All @@ -314,6 +336,7 @@ void psubscribeCommand(redisClient *c) {
c->flags |= REDIS_PUBSUB; c->flags |= REDIS_PUBSUB;


/* Send last received message on the channel(s) matching subscribed patterns */ /* Send last received message on the channel(s) matching subscribed patterns */
robj *value;
for (j = 1; j < c->argc; j++) { for (j = 1; j < c->argc; j++) {
robj *pat = c->argv[j]; robj *pat = c->argv[j];
dictIterator *di = dictGetIterator(server.pubsub_channels); dictIterator *di = dictGetIterator(server.pubsub_channels);
Expand All @@ -326,13 +349,14 @@ void psubscribeCommand(redisClient *c) {
(char*)channel, (char*)channel,
sdslen(channel), 0)) sdslen(channel), 0))
{ {
robj *o = lookupKeyRead(c->db, cobj); value = llast(c, cobj);
if(o != NULL) { if (value != NULL) {
addReply(c,shared.mbulkhdr[4]); addReply(c,shared.mbulkhdr[4]);
addReply(c,shared.pmessagebulk); addReply(c,shared.pmessagebulk);
addReplyBulk(c,pat); addReplyBulk(c,pat);
addReplyBulk(c,cobj); addReplyBulk(c,cobj);
addReplyBulk(c,o); addReplyBulk(c,value);
decrRefCount(value);
} }
} }
} }
Expand All @@ -345,7 +369,6 @@ void punsubscribeCommand(redisClient *c) {
pubsubUnsubscribeAllPatterns(c,1); pubsubUnsubscribeAllPatterns(c,1);
} else { } else {
int j; int j;

for (j = 1; j < c->argc; j++) for (j = 1; j < c->argc; j++)
pubsubUnsubscribePattern(c,c->argv[j],1); pubsubUnsubscribePattern(c,c->argv[j],1);
} }
Expand All @@ -359,8 +382,27 @@ void publishCommand(redisClient *c) {
else else
forceCommandPropagation(c,REDIS_PROPAGATE_REPL); forceCommandPropagation(c,REDIS_PROPAGATE_REPL);


/* Persist last published message in channel specific key */ if (receivers == 0) {
setKey(c->db, c->argv[1], c->argv[2]); int j, pushed = 0, where = REDIS_TAIL;
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
for (j = 2; j < c->argc; j++) {
c->argv[j] = tryObjectEncoding(c->argv[j]);
if (!lobj) {
lobj = createQuicklistObject();
quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
dbAdd(c->db,c->argv[1],lobj);
}
listTypePush(lobj,c->argv[j],where);
pushed++;
}
if (pushed) {
char *event = (where == REDIS_HEAD) ? "lpush" : "rpush";
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id);
}
server.dirty += pushed;
}


addReplyLongLong(c,receivers); addReplyLongLong(c,receivers);
} }
Expand Down

0 comments on commit 6fdfc98

Please sign in to comment.