Permalink
Browse files

Persist last published message on pubsub channels

Ability to fetch last published message:

redis> publish channel1 c1m1
(integer) 0
redis> get channel1
"c1m1"
redis> publish channel1 c1m2
(integer) 0
redis> get channel 1
"c1m2"

Auto push last published message to client upon successful subscription either
by using `SUBSCRIBE` or `PSUBSCRIBE`.  Example:

redis> subscribe channel1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel1"
3) (integer) 1
4) "message"
5) "channel1"
6) "c1m2"
  • Loading branch information...
1 parent 37260bc commit 9e546d3d408a98d61c9aeca68bbd4f7b56c8c968 @abhinavsingh committed Mar 30, 2015
Showing with 42 additions and 0 deletions.
  1. +42 −0 src/pubsub.c
View
@@ -280,6 +280,18 @@ void subscribeCommand(redisClient *c) {
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
c->flags |= REDIS_PUBSUB;
+
+ /* Send last received message on the subscribed channel(s) */
+ robj *o;
+ for (j = 1; j < c->argc; j++) {
+ o = lookupKeyRead(c->db, c->argv[j]);
+ if(o != NULL) {
+ addReply(c,shared.mbulkhdr[3]);
+ addReply(c,shared.messagebulk);
+ addReplyBulk(c,c->argv[j]);
+ addReplyBulk(c,o);
+ }
+ }
}
void unsubscribeCommand(redisClient *c) {
@@ -300,6 +312,32 @@ void psubscribeCommand(redisClient *c) {
for (j = 1; j < c->argc; j++)
pubsubSubscribePattern(c,c->argv[j]);
c->flags |= REDIS_PUBSUB;
+
+ /* Send last received message on the channel(s) matching subscribed patterns */
+ for (j = 1; j < c->argc; j++) {
+ robj *pat = c->argv[j];
+ dictIterator *di = dictGetIterator(server.pubsub_channels);
+ dictEntry *de;
+ while((de = dictNext(di)) != NULL) {
+ robj *cobj = dictGetKey(de);
+ sds channel = cobj->ptr;
+ if (stringmatchlen((char*)pat->ptr,
+ sdslen(pat->ptr),
+ (char*)channel,
+ sdslen(channel), 0))
+ {
+ robj *o = lookupKeyRead(c->db, cobj);
+ if(o != NULL) {
+ addReply(c,shared.mbulkhdr[4]);
+ addReply(c,shared.pmessagebulk);
+ addReplyBulk(c,pat);
+ addReplyBulk(c,cobj);
+ addReplyBulk(c,o);
+ }
+ }
+ }
+ dictReleaseIterator(di);
+ }
}
void punsubscribeCommand(redisClient *c) {
@@ -320,6 +358,10 @@ void publishCommand(redisClient *c) {
clusterPropagatePublish(c->argv[1],c->argv[2]);
else
forceCommandPropagation(c,REDIS_PROPAGATE_REPL);
+
+ /* Persist last published message in channel specific key */
+ setKey(c->db, c->argv[1], c->argv[2]);
+
addReplyLongLong(c,receivers);
}

1 comment on commit 9e546d3

Please sign in to comment.