Skip to content

Commit

Permalink
BITOP command 10x speed improvement.
Browse files Browse the repository at this point in the history
This commit adds a fast-path to the BITOP that can be used for all the
bytes from 0 to the minimal length of the string, and if there are
at max 16 input keys.

Often the intersected bitmaps are roughly the same size, so this
optimization can provide a 10x speed boost to most real world usages
of the command.

Bytes are processed four full words at a time, in loops specialized
for the specific BITOP sub-command, without the need to check for
length issues with the inputs (since we run this algorithm only as far
as there is data from all the keys at the same time).

The remaining part of the string is intersected in the usual way using
the slow but generic algorith.

It is possible to do better than this with inputs that are not roughly
the same size, sorting the input keys by length, by initializing the
result string in a smarter way, and noticing that the final part of the
output string composed of only data from the longest string does not
need any proecessing since AND, OR and XOR against an empty string does
not alter the output (zero in the first case, and the original string in
the other two cases).

More implementations will be implemented later likely, but this should
be enough to release Redis 2.6-RC4 with bitops merged in.

Note: this commit also adds better testing for BITOP NOT command, that
is currently the faster and hard to optimize further since it just
flips the bits of a single input string.
  • Loading branch information
antirez committed May 24, 2012
1 parent fa4a5d5 commit d866803
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 2 deletions.
70 changes: 69 additions & 1 deletion src/bitops.c
Expand Up @@ -162,6 +162,7 @@ void bitopCommand(redisClient *c) {
robj **objects; /* Array of soruce objects. */
unsigned char **src; /* Array of source strings pointers. */
long *len, maxlen = 0; /* Array of length of src strings, and max len. */
long minlen = 0; /* Min len among the input keys. */
unsigned char *res = NULL; /* Resulting string. */

/* Parse the operation name. */
Expand Down Expand Up @@ -213,6 +214,7 @@ void bitopCommand(redisClient *c) {
src[j] = objects[j]->ptr;
len[j] = sdslen(objects[j]->ptr);
if (len[j] > maxlen) maxlen = len[j];
if (j == 0 || len[j] < minlen) minlen = len[j];
}

/* Compute the bit operation, if at least one string is not empty. */
Expand All @@ -221,7 +223,73 @@ void bitopCommand(redisClient *c) {
unsigned char output, byte;
long i;

for (j = 0; j < maxlen; j++) {
/* Fast path: as far as we have data for all the input bitmaps we
* can take a fast path that performs much better than the
* vanilla algorithm. */
j = 0;
if (minlen && numkeys <= 16) {
unsigned long *lp[16];
unsigned long *lres = (unsigned long*) res;

/* Note: sds pointer is always aligned to 8 byte boundary. */
memcpy(lp,src,sizeof(unsigned long*)*numkeys);
memcpy(res,src[0],minlen);

/* Different branches per different operations for speed (sorry). */
if (op == BITOP_AND) {
while(minlen >= sizeof(unsigned long)*4) {
for (i = 1; i < numkeys; i++) {
lres[0] &= lp[i][0];
lres[1] &= lp[i][1];
lres[2] &= lp[i][2];
lres[3] &= lp[i][3];
lp[i]+=4;
}
lres+=4;
j += sizeof(unsigned long)*4;
minlen -= sizeof(unsigned long)*4;
}
} else if (op == BITOP_OR) {
while(minlen >= sizeof(unsigned long)*4) {
for (i = 1; i < numkeys; i++) {
lres[0] |= lp[i][0];
lres[1] |= lp[i][1];
lres[2] |= lp[i][2];
lres[3] |= lp[i][3];
lp[i]+=4;
}
lres+=4;
j += sizeof(unsigned long)*4;
minlen -= sizeof(unsigned long)*4;
}
} else if (op == BITOP_XOR) {
while(minlen >= sizeof(unsigned long)*4) {
for (i = 1; i < numkeys; i++) {
lres[0] ^= lp[i][0];
lres[1] ^= lp[i][1];
lres[2] ^= lp[i][2];
lres[3] ^= lp[i][3];
lp[i]+=4;
}
lres+=4;
j += sizeof(unsigned long)*4;
minlen -= sizeof(unsigned long)*4;
}
} else if (op == BITOP_NOT) {
while(minlen >= sizeof(unsigned long)*4) {
lres[0] = ~lres[0];
lres[1] = ~lres[1];
lres[2] = ~lres[2];
lres[3] = ~lres[3];
lres+=4;
j += sizeof(unsigned long)*4;
minlen -= sizeof(unsigned long)*4;
}
}
}

/* j is set to the next byte to process by the previous loop. */
for (; j < maxlen; j++) {
output = (len[0] <= j) ? 0 : src[0][j];
if (op == BITOP_NOT) output = ~output;
for (i = 1; i < numkeys; i++) {
Expand Down
12 changes: 11 additions & 1 deletion tests/unit/bitops.tcl
Expand Up @@ -24,13 +24,13 @@ proc simulate_bit_op {op args} {
set out {}
for {set x 0} {$x < $maxlen} {incr x} {
set bit [string range $b(0) $x $x]
if {$op eq {not}} {set bit [expr {!$bit}]}
for {set j 1} {$j < $count} {incr j} {
set bit2 [string range $b($j) $x $x]
switch $op {
and {set bit [expr {$bit & $bit2}]}
or {set bit [expr {$bit | $bit2}]}
xor {set bit [expr {$bit ^ $bit2}]}
not {set bit [expr {!$bit}]}
}
}
append out $bit
Expand Down Expand Up @@ -135,6 +135,16 @@ start_server {tags {"bitops"}} {
}
}

test {BITOP NOT fuzzing} {
for {set i 0} {$i < 10} {incr i} {
r flushall
set str [randstring 0 1000]
r set str $str
r bitop not target str
assert_equal [r get target] [simulate_bit_op not $str]
}
}

test {BITOP with integer encoded source objects} {
r set a 1
r set b 2
Expand Down

0 comments on commit d866803

Please sign in to comment.