Skip to content

Commit

Permalink
Merge remote-tracking branch 'antirez/unstable' into pubsub-persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinavsingh committed Apr 15, 2015
2 parents 6fdfc98 + 6c60526 commit 2e54948
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 14 deletions.
7 changes: 7 additions & 0 deletions src/bio.c
Expand Up @@ -142,6 +142,13 @@ void *bioProcessBackgroundJobs(void *arg) {
unsigned long type = (unsigned long) arg;
sigset_t sigset;

/* Check that the type is within the right interval. */
if (type >= REDIS_BIO_NUM_OPS) {
redisLog(REDIS_WARNING,
"Warning: bio thread started with wrong type %lu",type);
return NULL;
}

/* Make the thread killable at any time, so that bioKillThreads()
* can work reliably. */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
Expand Down
44 changes: 35 additions & 9 deletions src/networking.c
Expand Up @@ -135,23 +135,49 @@ redisClient *createClient(int fd) {
* returns REDIS_OK, and make sure to install the write handler in our event
* loop so that when the socket is writable new data gets written.
*
* If the client should not receive new data, because it is a fake client,
* a master, a slave not yet online, or because the setup of the write handler
* failed, the function returns REDIS_ERR.
* If the client should not receive new data, because it is a fake client
* (used to load AOF in memory), a master or because the setup of the write
* handler failed, the function returns REDIS_ERR.
*
* The function may return REDIS_OK without actually installing the write
* event handler in the following cases:
*
* 1) The event handler should already be installed since the output buffer
* already contained something.
* 2) The client is a slave but not yet online, so we want to just accumulate
* writes in the buffer but not actually sending them yet.
*
* Typically gets called every time a reply is built, before adding more
* data to the clients output buffers. If the function returns REDIS_ERR no
* data should be appended to the output buffers. */
int prepareClientToWrite(redisClient *c) {
/* If it's the Lua client we always return ok without installing any
* handler since there is no socket at all. */
if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;

/* Masters don't receive replies, unless REDIS_MASTER_FORCE_REPLY flag
* is set. */
if ((c->flags & REDIS_MASTER) &&
!(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;
if (c->fd <= 0) return REDIS_ERR; /* Fake client */

if (c->fd <= 0) return REDIS_ERR; /* Fake client for AOF loading. */

/* Only install the handler if not already installed and, in case of
* slaves, if the client can actually receive writes. */
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
c->replstate == REDIS_REPL_ONLINE) &&
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
(c->replstate == REDIS_REPL_ONLINE && !c->repl_put_online_on_ack)))
{
/* Try to install the write handler. */
if (aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
return REDIS_ERR;
}
}

/* Authorize the caller to queue in the output buffer of this client. */
return REDIS_OK;
}

Expand Down Expand Up @@ -454,10 +480,10 @@ void addReplyLongLongWithPrefix(redisClient *c, long long ll, char prefix) {
/* Things like $3\r\n or *2\r\n are emitted very often by the protocol
* so we have a few shared objects to use if the integer is small
* like it is most of the times. */
if (prefix == '*' && ll < REDIS_SHARED_BULKHDR_LEN) {
if (prefix == '*' && ll < REDIS_SHARED_BULKHDR_LEN && ll >= 0) {
addReply(c,shared.mbulkhdr[ll]);
return;
} else if (prefix == '$' && ll < REDIS_SHARED_BULKHDR_LEN) {
} else if (prefix == '$' && ll < REDIS_SHARED_BULKHDR_LEN && ll >= 0) {
addReply(c,shared.bulkhdr[ll]);
return;
}
Expand Down
6 changes: 4 additions & 2 deletions src/replication.c
Expand Up @@ -652,15 +652,16 @@ void replconfCommand(redisClient *c) {
*
* It does a few things:
*
* 1) Put the slave in ONLINE state.
* 1) Put the slave in ONLINE state (useless when the function is called
* because state is already ONLINE but repl_put_online_on_ack is true).
* 2) Make sure the writable event is re-installed, since calling the SYNC
* command disables it, so that we can accumulate output buffer without
* sending it to the slave.
* 3) Update the count of good slaves. */
void putSlaveOnline(redisClient *slave) {
slave->replstate = REDIS_REPL_ONLINE;
slave->repl_put_online_on_ack = 0;
slave->repl_ack_time = server.unixtime;
slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
sendReplyToClient, slave) == AE_ERR) {
redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
Expand Down Expand Up @@ -773,6 +774,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
* is technically online now. */
slave->replstate = REDIS_REPL_ONLINE;
slave->repl_put_online_on_ack = 1;
slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
} else {
if (bgsaveerr != REDIS_OK) {
freeClient(slave);
Expand Down
18 changes: 16 additions & 2 deletions src/t_set.c
Expand Up @@ -144,7 +144,11 @@ void setTypeReleaseIterator(setTypeIterator *si) {
* Since set elements can be internally be stored as redis objects or
* simple arrays of integers, setTypeNext returns the encoding of the
* set object you are iterating, and will populate the appropriate pointer
* (eobj) or (llobj) accordingly.
* (objele) or (llele) accordingly.
*
* Note that both the objele and llele pointers should be passed and cannot
* be NULL since the function will try to defensively populate the non
* used field with values which are easy to trap if misused.
*
* When there are no longer elements -1 is returned.
* Returned objects ref count is not incremented, so this function is
Expand All @@ -154,9 +158,13 @@ int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele) {
dictEntry *de = dictNext(si->di);
if (de == NULL) return -1;
*objele = dictGetKey(de);
*llele = -123456789; /* Not needed. Defensive. */
} else if (si->encoding == REDIS_ENCODING_INTSET) {
if (!intsetGet(si->subject->ptr,si->ii++,llele))
return -1;
*objele = NULL; /* Not needed. Defensive. */
} else {
redisPanic("Wrong set encoding in setTypeNext");
}
return si->encoding;
}
Expand Down Expand Up @@ -197,15 +205,21 @@ robj *setTypeNextObject(setTypeIterator *si) {
* field of the object and is used by the caller to check if the
* int64_t pointer or the redis object pointer was populated.
*
* Note that both the objele and llele pointers should be passed and cannot
* be NULL since the function will try to defensively populate the non
* used field with values which are easy to trap if misused.
*
* When an object is returned (the set was a real set) the ref count
* of the object is not incremented so this function can be considered
* copy on write friendly. */
int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele) {
if (setobj->encoding == REDIS_ENCODING_HT) {
dictEntry *de = dictGetRandomKey(setobj->ptr);
*objele = dictGetKey(de);
*llele = -123456789; /* Not needed. Defensive. */
} else if (setobj->encoding == REDIS_ENCODING_INTSET) {
*llele = intsetRandom(setobj->ptr);
*objele = NULL; /* Not needed. Defensive. */
} else {
redisPanic("Unknown set encoding");
}
Expand Down Expand Up @@ -240,7 +254,7 @@ void setTypeConvert(robj *setobj, int enc) {

/* To add the elements we extract integers and create redis objects */
si = setTypeInitIterator(setobj);
while (setTypeNext(si,NULL,&intele) != -1) {
while (setTypeNext(si,&element,&intele) != -1) {
element = createStringObjectFromLongLong(intele);
redisAssertWithInfo(NULL,element,
dictAdd(d,element,NULL) == DICT_OK);
Expand Down
1 change: 1 addition & 0 deletions tests/cluster/run.tcl
Expand Up @@ -17,6 +17,7 @@ proc main {} {
}
run_tests
cleanup
end_tests
}

if {[catch main e]} {
Expand Down
14 changes: 14 additions & 0 deletions tests/instances.tcl
Expand Up @@ -19,6 +19,7 @@ set ::verbose 0
set ::valgrind 0
set ::pause_on_error 0
set ::simulate_error 0
set ::failed 0
set ::sentinel_instances {}
set ::redis_instances {}
set ::sentinel_base_port 20000
Expand Down Expand Up @@ -231,6 +232,7 @@ proc test {descr code} {
flush stdout

if {[catch {set retval [uplevel 1 $code]} error]} {
incr ::failed
if {[string match "assertion:*" $error]} {
set msg [string range $error 10 end]
puts [colorstr red $msg]
Expand All @@ -246,6 +248,7 @@ proc test {descr code} {
}
}

# Execute all the units inside the 'tests' directory.
proc run_tests {} {
set tests [lsort [glob ../tests/*]]
foreach test $tests {
Expand All @@ -258,6 +261,17 @@ proc run_tests {} {
}
}

# Print a message and exists with 0 / 1 according to zero or more failures.
proc end_tests {} {
if {$::failed == 0} {
puts "GOOD! No errors."
exit 0
} else {
puts "WARNING $::failed tests faield."
exit 1
}
}

# The "S" command is used to interact with the N-th Sentinel.
# The general form is:
#
Expand Down
1 change: 1 addition & 0 deletions tests/sentinel/run.tcl
Expand Up @@ -13,6 +13,7 @@ proc main {} {
spawn_instance redis $::redis_base_port $::instances_count
run_tests
cleanup
end_tests
}

if {[catch main e]} {
Expand Down
7 changes: 6 additions & 1 deletion tests/support/server.tcl
Expand Up @@ -54,10 +54,15 @@ proc kill_server config {

# kill server and wait for the process to be totally exited
catch {exec kill $pid}
if {$::valgrind} {
set max_wait 60000
} else {
set max_wait 10000
}
while {[is_alive $config]} {
incr wait 10

if {$wait >= 5000} {
if {$wait >= $max_wait} {
puts "Forcing process $pid to exit..."
catch {exec kill -KILL $pid}
} elseif {$wait % 1000 == 0} {
Expand Down

0 comments on commit 2e54948

Please sign in to comment.