Skip to content

Commit

Permalink
Merge pull request #1567 from mattsta/fix-cluster-join
Browse files Browse the repository at this point in the history
Bind source address for cluster communication
  • Loading branch information
antirez committed Mar 10, 2014
2 parents 0f1f257 + e5b1e7b commit 3b0edb8
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 7 deletions.
32 changes: 28 additions & 4 deletions src/anet.c
Expand Up @@ -234,11 +234,12 @@ static int anetCreateSocket(char *err, int domain) {

#define ANET_CONNECT_NONE 0
#define ANET_CONNECT_NONBLOCK 1
static int anetTcpGenericConnect(char *err, char *addr, int port, int flags)
static int anetTcpGenericConnect(char *err, char *addr, int port,
char *source_addr, int flags)
{
int s = ANET_ERR, rv;
char portstr[6]; /* strlen("65535") + 1; */
struct addrinfo hints, *servinfo, *p;
struct addrinfo hints, *servinfo, *bservinfo, *p, *b;

snprintf(portstr,sizeof(portstr),"%d",port);
memset(&hints,0,sizeof(hints));
Expand All @@ -258,6 +259,24 @@ static int anetTcpGenericConnect(char *err, char *addr, int port, int flags)
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
if (flags & ANET_CONNECT_NONBLOCK && anetNonBlock(err,s) != ANET_OK)
goto error;
if (source_addr) {
int bound = 0;
/* Using getaddrinfo saves us from self-determining IPv4 vs IPv6 */
if ((rv = getaddrinfo(source_addr, NULL, &hints, &bservinfo)) != 0) {
anetSetError(err, "%s", gai_strerror(rv));
goto end;
}
for (b = bservinfo; b != NULL; b = b->ai_next) {
if (bind(s,b->ai_addr,b->ai_addrlen) != -1) {
bound = 1;
break;
}
}
if (!bound) {
anetSetError(err, "bind: %s", strerror(errno));
goto end;
}
}
if (connect(s,p->ai_addr,p->ai_addrlen) == -1) {
/* If the socket is non-blocking, it is ok for connect() to
* return an EINPROGRESS error here. */
Expand Down Expand Up @@ -287,12 +306,17 @@ static int anetTcpGenericConnect(char *err, char *addr, int port, int flags)

int anetTcpConnect(char *err, char *addr, int port)
{
return anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONE);
return anetTcpGenericConnect(err,addr,port,NULL,ANET_CONNECT_NONE);
}

int anetTcpNonBlockConnect(char *err, char *addr, int port)
{
return anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONBLOCK);
return anetTcpGenericConnect(err,addr,port,NULL,ANET_CONNECT_NONBLOCK);
}

int anetTcpNonBlockBindConnect(char *err, char *addr, int port, char *source_addr)
{
return anetTcpGenericConnect(err,addr,port,source_addr,ANET_CONNECT_NONBLOCK);
}

int anetUnixGenericConnect(char *err, char *path, int flags)
Expand Down
1 change: 1 addition & 0 deletions src/anet.h
Expand Up @@ -45,6 +45,7 @@

int anetTcpConnect(char *err, char *addr, int port);
int anetTcpNonBlockConnect(char *err, char *addr, int port);
int anetTcpNonBlockBindConnect(char *err, char *addr, int port, char *source_addr);
int anetUnixConnect(char *err, char *path);
int anetUnixNonBlockConnect(char *err, char *path);
int anetRead(int fd, char *buf, int count);
Expand Down
11 changes: 8 additions & 3 deletions src/cluster.c
Expand Up @@ -2407,9 +2407,14 @@ void clusterCron(void) {
mstime_t old_ping_sent;
clusterLink *link;

fd = anetTcpNonBlockConnect(server.neterr, node->ip,
node->port+REDIS_CLUSTER_PORT_INCR);
if (fd == -1) continue;
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->port+REDIS_CLUSTER_PORT_INCR, server.bindaddr[0]);
if (fd == -1) {
redisLog(REDIS_DEBUG, "Unable to connect to "
"Cluster Client [%s]:%d", node->ip,
node->port+REDIS_CLUSTER_PORT_INCR);
continue;
}
link = createClusterLink(node);
link->fd = fd;
node->link = link;
Expand Down

0 comments on commit 3b0edb8

Please sign in to comment.