Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

typedef struct {
int port;
char *unixsocket;
mode_t unixsocketperm;
char *cluster_address;
char *entry_node_host;
int entry_node_port;
Expand Down
72 changes: 63 additions & 9 deletions src/proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <sys/utsname.h>

#define DEFAULT_PORT 7777
#define DEFAULT_UNIXSOCKETPERM 0
#define DEFAULT_MAX_CLIENTS 10000
#define MAX_THREADS 500
#define DEFAULT_THREADS 8
Expand Down Expand Up @@ -76,6 +77,7 @@
isempty))
#define REQID_PRINTF_FMT "%d:%" PRId64 ":%" PRId64
#define REQID_PRINTF_ARG(r) r->client->thread_id, r->client->id, r->id
#define getClientPeerIP(c) (c->ip ? c->ip : config.unixsocket)
#define PROXY_CMD_LOG_MAX_LEN 4096

typedef struct proxyThread {
Expand Down Expand Up @@ -475,6 +477,14 @@ static sds genInfoString(sds section) {
(proxy.configfile ? proxy.configfile : ""),
(config.auth_user ? config.auth_user : "default")
);
if (proxy.unixsocket_fd != -1) {
info = sdscatprintf(info,
"unix_socket:%s\r\n"
"unix_socket_permissions:%o\r\n",
config.unixsocket,
config.unixsocketperm
);
}
}
if (default_section || all_sections ||
!strcasecmp("clients", section))
Expand Down Expand Up @@ -1417,6 +1427,8 @@ static void printHelp(void) {
" --tcpkeepalive TCP Keep Alive (default: %d)\n"
" --tcp-backlog TCP Backlog (default: %d)\n"
" --daemonize Execute the proxy in background\n"
" --unixsocket <sock_file> UNIX socket path (empty by default)\n"
" --unixsocketperm <mode> UNIX socket permissions (default: %o)\n"
" --disable-multiplexing <opt> When should multiplexing disabled\n"
" (never|auto|always) (default: auto)\n"
" --enable-cross-slot Enable cross-slot queries (warning: cross-slot"
Expand All @@ -1435,7 +1447,7 @@ static void printHelp(void) {
"'debug') \n"
" -h, --help Print this help\n",
DEFAULT_PORT, DEFAULT_MAX_CLIENTS, DEFAULT_THREADS, MAX_THREADS,
DEFAULT_TCP_KEEPALIVE, DEFAULT_TCP_BACKLOG);
DEFAULT_TCP_KEEPALIVE, DEFAULT_TCP_BACKLOG, DEFAULT_UNIXSOCKETPERM);
}

int parseOptions(int argc, char **argv) {
Expand Down Expand Up @@ -1465,7 +1477,16 @@ int parseOptions(int argc, char **argv) {
config.dump_buffer = 1;
else if (!strcmp("--dump-queues", arg))
config.dump_queues = 1;
else if (!strcmp("-c", arg) && !lastarg) {
else if (!strcmp(argv[i], "--unixsocket") && !lastarg)
config.unixsocket = zstrdup(argv[++i]);
else if (!strcmp(argv[i], "--unixsocketperm") && !lastarg) {
errno = 0;
config.unixsocketperm = (mode_t)strtol(argv[++i], NULL, 8);
if (errno || config.unixsocketperm > 0777) {
fprintf(stderr, "Invalid socket file permissions:%s\n", argv[i]);
exit(1);
}
} else if (!strcmp("-c", arg) && !lastarg) {
char *cfgfile = argv[++i];
if (!parseOptionsFromFile(cfgfile)) exit(1);
proxy.configfile = sdsnew(cfgfile);
Expand Down Expand Up @@ -1631,6 +1652,8 @@ static void checkTcpBacklogSettings(void) {

static void initConfig(void) {
config.port = DEFAULT_PORT;
config.unixsocket = NULL;
config.unixsocketperm = DEFAULT_UNIXSOCKETPERM;
config.tcpkeepalive = DEFAULT_TCP_KEEPALIVE;
config.maxclients = DEFAULT_MAX_CLIENTS;
config.num_threads = DEFAULT_THREADS;
Expand Down Expand Up @@ -1774,7 +1797,7 @@ static int processThreadPipeBufferForNewClients(proxyThread *thread) {
errno = 0;
if (!installIOHandler(el, c->fd, AE_READABLE, readQuery, c, 0)) {
proxyLogErr("ERROR: Failed to create read query handler for "
"client %s\n", c->ip);
"client %s\n", getClientPeerIP(c));
errno = EL_INSTALL_HANDLER_FAIL;
freeClient(c);
processed++;
Expand Down Expand Up @@ -2011,7 +2034,7 @@ static client *createClient(int fd, char *ip) {
}
c->status = CLIENT_STATUS_NONE;
c->fd = fd;
c->ip = sdsnew(ip);
c->ip = ip ? sdsnew(ip) : NULL;
c->obuf = sdsempty();
c->reply_array = NULL;
c->current_request = NULL;
Expand Down Expand Up @@ -2484,7 +2507,6 @@ void onClusterNodeDisconnection(clusterNode *node) {
}
}

/* TODO: implement also UNIX socket listener */
static int listen(void) {
int fd_idx = 0;
/* Try to use both IPv6 and IPv4 */
Expand All @@ -2501,6 +2523,18 @@ static int listen(void) {
anetNonBlock(NULL, proxy.fds[fd_idx++]);
else if (errno == EAFNOSUPPORT)
proxyLogWarn("Not listening to IPv4: unsupported\n");
/* UNIX socket listener */
if (config.unixsocket != NULL) {
unlink(config.unixsocket); /* Don't care if this fails */
proxy.unixsocket_fd = anetUnixServer(proxy.neterr, config.unixsocket,
config.unixsocketperm, config.tcp_backlog);
if (proxy.unixsocket_fd != ANET_ERR) {
anetNonBlock(NULL, proxy.unixsocket_fd);
proxy.fds[fd_idx++] = proxy.unixsocket_fd;
} else {
proxyLogWarn("Opening Unix socket: %s\n", proxy.neterr);
}
}
proxy.fd_count = fd_idx;
return fd_idx;
}
Expand Down Expand Up @@ -3441,15 +3475,15 @@ void readQuery(aeEventLoop *el, int fd, void *privdata, int mask){
if (errno == EAGAIN) {
return;
} else {
proxyLogDebug("Error reading from client %s: %s\n", c->ip,
strerror(errno));
proxyLogDebug("Error reading from client %s: %s\n",
getClientPeerIP(c), strerror(errno));
unlinkClient(c); /* TODO: Free? */
return;
}
} else if (nread == 0) {
proxyLogDebug("Client %" PRId64 " from %s closed connection "
"(thread: %d)\n",
c->id, c->ip, c->thread_id);
c->id, getClientPeerIP(c), c->thread_id);
freeClient(c);
return;
}
Expand Down Expand Up @@ -3481,7 +3515,7 @@ static void acceptHandler(int fd, char *ip) {
client *c = createClient(fd, ip);
if (c == NULL) return;
proxyLogDebug("Client %" PRId64 " connected from %s (thread: %d)\n",
c->id, ip, c->thread_id);
c->id, ip ? ip : config.unixsocket, c->thread_id);
proxyThread *thread = proxy.threads[c->thread_id];
assert(thread != NULL);
if (!awakeThreadForNewClient(thread, c)) {
Expand Down Expand Up @@ -3512,6 +3546,24 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)
}
}

void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
int client_fd, max = MAX_ACCEPTS;
while (max--) {
client_fd = anetUnixAccept(proxy.neterr, fd);
if (client_fd == ANET_ERR) {
if (errno != EWOULDBLOCK)
proxyLogWarn("Accepting client connection: %s\n",
proxy.neterr);
return;
}
proxyLogDebug("Accepted connection to %s\n", config.unixsocket);
acceptHandler(client_fd, NULL);
}
}

/* Add the reply received by request `*req` to the `child_replies` rax.
* When all child requests received their replies, call the command's
* reply handler (handleReply), and free all the requests (both parent and
Expand Down Expand Up @@ -3863,6 +3915,7 @@ int main(int argc, char **argv) {
if (strcmp("default", config.auth_user) == 0) config.auth_user = NULL;
else if (config.auth == NULL) config.auth = "";
}
proxy.unixsocket_fd = -1;
proxy.tcp_backlog = config.tcp_backlog;
checkTcpBacklogSettings();
if (!listen()) {
Expand All @@ -3875,6 +3928,7 @@ int main(int argc, char **argv) {
initProxy();
for (i = 0; i < proxy.fd_count; i++) {
if (!installIOHandler(proxy.main_loop, proxy.fds[i], AE_READABLE,
proxy.fds[i] == proxy.unixsocket_fd ? acceptUnixHandler :
acceptTcpHandler, NULL, 0))
{
proxyLogErr("FATAL: Failed to create TCP accept handlers, "
Expand Down
3 changes: 2 additions & 1 deletion src/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ typedef struct clientRequest {

typedef struct {
aeEventLoop *main_loop;
int fds[2];
int fds[3];
int fd_count;
int unixsocket_fd;
int tcp_backlog;
char neterr[ANET_ERR_LEN];
struct proxyThread **threads;
Expand Down