Skip to content
This repository
Browse code

lookupKeyByPattern() used by SORT GET/BY rewritten. Fixes issue #460.

lookupKeyByPattern() was implemented with a trick to speedup the lookup
process allocating two fake Redis obejcts on the stack. However now that
we propagate expires to the slave as DEL operations the lookup of the
key may result into a call to expireIfNeeded() having the stack
allocated object as argument, that may in turn use it to create the
protocol to send to the slave. But since this fake obejcts are
inherently read-only this is a problem.

As a side effect of this fix there are no longer size limits in the
pattern to be used with GET/BY option of SORT.

See antirez#460 for bug details.
  • Loading branch information...
commit 8e5e8f0eca221e0e5e048963bfb6ed21b6f3733d 1 parent 7b4dc9b
Salvatore Sanfilippo antirez authored

Showing 1 changed file with 42 additions and 32 deletions. Show diff stats Hide diff stats

  1. +42 32 src/sort.c
74 src/sort.c
@@ -8,21 +8,27 @@ redisSortOperation *createSortOperation(int type, robj *pattern) {
8 8 return so;
9 9 }
10 10
11   -/* Return the value associated to the key with a name obtained
12   - * substituting the first occurence of '*' in 'pattern' with 'subst'.
  11 +/* Return the value associated to the key with a name obtained using
  12 + * the following rules:
  13 + *
  14 + * 1) The first occurence of '*' in 'pattern' is substituted with 'subst'.
  15 + *
  16 + * 2) If 'pattern' matches the "->" string, everything on the left of
  17 + * the arrow is treated as the name of an hash field, and the part on the
  18 + * left as the key name containing an hash. The value of the specified
  19 + * field is returned.
  20 + *
  21 + * 3) If 'pattern' equals "#", the function simply returns 'subst' itself so
  22 + * that the SORT command can be used like: SORT key GET # to retrieve
  23 + * the Set/List elements directly.
  24 + *
13 25 * The returned object will always have its refcount increased by 1
14 26 * when it is non-NULL. */
15 27 robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
16   - char *p, *f;
  28 + char *p, *f, *k;
17 29 sds spat, ssub;
18   - robj keyobj, fieldobj, *o;
  30 + robj *keyobj, *fieldobj, *o;
19 31 int prefixlen, sublen, postfixlen, fieldlen;
20   - /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
21   - struct {
22   - int len;
23   - int free;
24   - char buf[REDIS_SORTKEY_MAX+1];
25   - } keyname, fieldname;
26 32
27 33 /* If the pattern is "#" return the substitution object itself in order
28 34 * to implement the "SORT ... GET #" feature. */
@@ -36,9 +42,10 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
36 42 * a decoded object on the fly. Otherwise getDecodedObject will just
37 43 * increment the ref count, that we'll decrement later. */
38 44 subst = getDecodedObject(subst);
39   -
40 45 ssub = subst->ptr;
41   - if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
  46 +
  47 + /* If we can't find '*' in the pattern we return NULL as to GET a
  48 + * fixed key does not make sense. */
42 49 p = strchr(spat,'*');
43 50 if (!p) {
44 51 decrRefCount(subst);
@@ -46,46 +53,49 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
46 53 }
47 54
48 55 /* Find out if we're dealing with a hash dereference. */
49   - if ((f = strstr(p+1, "->")) != NULL) {
50   - fieldlen = sdslen(spat)-(f-spat);
51   - /* this also copies \0 character */
52   - memcpy(fieldname.buf,f+2,fieldlen-1);
53   - fieldname.len = fieldlen-2;
  56 + if ((f = strstr(p+1, "->")) != NULL && *(f+2) != '\0') {
  57 + fieldlen = sdslen(spat)-(f-spat)-2;
  58 + fieldobj = createStringObject(f+2,fieldlen);
54 59 } else {
55 60 fieldlen = 0;
56 61 }
57 62
  63 + /* Perform the '*' substitution. */
58 64 prefixlen = p-spat;
59 65 sublen = sdslen(ssub);
60   - postfixlen = sdslen(spat)-(prefixlen+1)-fieldlen;
61   - memcpy(keyname.buf,spat,prefixlen);
62   - memcpy(keyname.buf+prefixlen,ssub,sublen);
63   - memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
64   - keyname.buf[prefixlen+sublen+postfixlen] = '\0';
65   - keyname.len = prefixlen+sublen+postfixlen;
66   - decrRefCount(subst);
  66 + postfixlen = sdslen(spat)-(prefixlen+1)-(fieldlen ? fieldlen+2 : 0);
  67 + keyobj = createStringObject(NULL,prefixlen+sublen+postfixlen);
  68 + k = keyobj->ptr;
  69 + memcpy(k,spat,prefixlen);
  70 + memcpy(k+prefixlen,ssub,sublen);
  71 + memcpy(k+prefixlen+sublen,p+1,postfixlen);
  72 + decrRefCount(subst); /* Incremented by decodeObject() */
67 73
68 74 /* Lookup substituted key */
69   - initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(struct sdshdr)));
70   - o = lookupKeyRead(db,&keyobj);
71   - if (o == NULL) return NULL;
  75 + o = lookupKeyRead(db,keyobj);
  76 + if (o == NULL) goto noobj;
72 77
73 78 if (fieldlen > 0) {
74   - if (o->type != REDIS_HASH || fieldname.len < 1) return NULL;
  79 + if (o->type != REDIS_HASH) goto noobj;
75 80
76 81 /* Retrieve value from hash by the field name. This operation
77 82 * already increases the refcount of the returned object. */
78   - initStaticStringObject(fieldobj,((char*)&fieldname)+(sizeof(struct sdshdr)));
79   - o = hashTypeGetObject(o, &fieldobj);
  83 + o = hashTypeGetObject(o, fieldobj);
80 84 } else {
81   - if (o->type != REDIS_STRING) return NULL;
  85 + if (o->type != REDIS_STRING) goto noobj;
82 86
83 87 /* Every object that this function returns needs to have its refcount
84 88 * increased. sortCommand decreases it again. */
85 89 incrRefCount(o);
86 90 }
87   -
  91 + decrRefCount(keyobj);
  92 + if (fieldlen) decrRefCount(fieldobj);
88 93 return o;
  94 +
  95 +noobj:
  96 + decrRefCount(keyobj);
  97 + if (fieldlen) decrRefCount(fieldobj);
  98 + return NULL;
89 99 }
90 100
91 101 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with

0 comments on commit 8e5e8f0

Please sign in to comment.
Something went wrong with that request. Please try again.