diff --git a/src/pubsub.c b/src/pubsub.c index 3c9a2229e8cc..53f5f8d4e6b5 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -274,6 +274,27 @@ int pubsubPublishMessage(robj *channel, robj *message) { * Pubsub commands implementation *----------------------------------------------------------------------------*/ +robj *llast(redisClient *c, robj *key) { + listTypeEntry entry; + robj *o, *value = NULL; + long llen; + o = lookupKeyRead(c->db, key); + if(o != NULL && o->encoding == REDIS_ENCODING_QUICKLIST) { + llen = listTypeLength(o); + listTypeIterator *iter = listTypeInitIterator(o, llen-1, REDIS_TAIL); + listTypeNext(iter, &entry); + quicklistEntry *qe = &entry.entry; + if (qe->value) { + value = createStringObject((char *)qe->value, + qe->sz); + } else { + value = createStringObjectFromLongLong(qe->longval); + } + listTypeReleaseIterator(iter); + } + return value; +} + void subscribeCommand(redisClient *c) { int j; @@ -282,14 +303,15 @@ void subscribeCommand(redisClient *c) { c->flags |= REDIS_PUBSUB; /* Send last received message on the subscribed channel(s) */ - robj *o; + robj *value; for (j = 1; j < c->argc; j++) { - o = lookupKeyRead(c->db, c->argv[j]); - if(o != NULL) { + value = llast(c, c->argv[j]); + if (value != NULL) { addReply(c,shared.mbulkhdr[3]); addReply(c,shared.messagebulk); addReplyBulk(c,c->argv[j]); - addReplyBulk(c,o); + addReplyBulk(c,value); + decrRefCount(value); } } } @@ -314,6 +336,7 @@ void psubscribeCommand(redisClient *c) { c->flags |= REDIS_PUBSUB; /* Send last received message on the channel(s) matching subscribed patterns */ + robj *value; for (j = 1; j < c->argc; j++) { robj *pat = c->argv[j]; dictIterator *di = dictGetIterator(server.pubsub_channels); @@ -326,13 +349,14 @@ void psubscribeCommand(redisClient *c) { (char*)channel, sdslen(channel), 0)) { - robj *o = lookupKeyRead(c->db, cobj); - if(o != NULL) { - addReply(c,shared.mbulkhdr[4]); - addReply(c,shared.pmessagebulk); - addReplyBulk(c,pat); - addReplyBulk(c,cobj); - addReplyBulk(c,o); + value = llast(c, cobj); + if (value != NULL) { + addReply(c,shared.mbulkhdr[4]); + addReply(c,shared.pmessagebulk); + addReplyBulk(c,pat); + addReplyBulk(c,cobj); + addReplyBulk(c,value); + decrRefCount(value); } } } @@ -345,7 +369,6 @@ void punsubscribeCommand(redisClient *c) { pubsubUnsubscribeAllPatterns(c,1); } else { int j; - for (j = 1; j < c->argc; j++) pubsubUnsubscribePattern(c,c->argv[j],1); } @@ -359,8 +382,27 @@ void publishCommand(redisClient *c) { else forceCommandPropagation(c,REDIS_PROPAGATE_REPL); - /* Persist last published message in channel specific key */ - setKey(c->db, c->argv[1], c->argv[2]); + if (receivers == 0) { + int j, pushed = 0, where = REDIS_TAIL; + robj *lobj = lookupKeyWrite(c->db,c->argv[1]); + for (j = 2; j < c->argc; j++) { + c->argv[j] = tryObjectEncoding(c->argv[j]); + if (!lobj) { + lobj = createQuicklistObject(); + quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size, + server.list_compress_depth); + dbAdd(c->db,c->argv[1],lobj); + } + listTypePush(lobj,c->argv[j],where); + pushed++; + } + if (pushed) { + char *event = (where == REDIS_HEAD) ? "lpush" : "rpush"; + signalModifiedKey(c->db,c->argv[1]); + notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id); + } + server.dirty += pushed; + } addReplyLongLong(c,receivers); }