Skip to content

Commit

Permalink
Add support for multiple Redis servers
Browse files Browse the repository at this point in the history
Two new GUCs were added: "redislog.shuffle_hosts" and "redislog.hosts". The
list of Redis servers is parsed and shuffled on every new connection.

Fixes #6 and #9
  • Loading branch information
leonardoce authored and mnencia committed Oct 27, 2015
1 parent 958f5c4 commit ad88dbf
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ compiler:
script:
- make && sudo make install
- sudo pg_createcluster --start $PGVERSION main -o shared_preload_libraries='redislog' -- -A trust
- psql -U postgres postgres -c 'show redislog.host'
- psql -U postgres postgres -c 'show redislog.hosts'
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ The module can be activated by adding the following parameters in
`postgresql.conf`:

shared_preload_libraries = 'redislog'
redislog.host = '127.0.0.1'
redislog.hosts = '127.0.0.1'
redislog.port = 6379
redislog.key = 'postgres'
redislog.min_error_statement = error
redislog.min_messages = warning
redislog.ship_to_redis_only = true
redislog.shuffle_hosts = false

## TODO

Expand Down
278 changes: 242 additions & 36 deletions redislog.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@
#include "postmaster/syslogger.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/guc.h"
#include "utils/json.h"
#include "utils/ps_status.h"

#include "hiredis/hiredis.h"

#define REDIS_DEFAULT_PORT 6379

/* Allow load of this module in shared libs */
PG_MODULE_MAGIC;

Expand All @@ -56,13 +59,13 @@ void _PG_fini(void);
static emit_log_hook_type prev_log_hook = NULL;

/* GUC Variables */
char *Redislog_host = NULL;
int Redislog_port = 6379;
char *Redislog_hosts = NULL;
int Redislog_timeout = 1000;
char *Redislog_key = NULL;
int Redislog_min_error_statement = ERROR;
int Redislog_min_messages = WARNING;
bool Redislog_ship_to_redis_only = TRUE;
bool Redislog_shuffle_hosts = TRUE;

/* Log timestamp */
#define LOG_TIMESTAMP_LEN 128
Expand All @@ -81,8 +84,11 @@ static int lastPid = 0;
static const char *error_severity(int elevel);

/* Configuration options management */
static void guc_on_assign_reopen_string(const char *newval, void *extra);
static void guc_on_assign_reopen_int(int newval, void *extra);
static bool guc_check_hosts_list(char **newvalue, void **extra, GucSource source);
static void guc_assign_hosts_list(const char *newval, void *extra);
static void split_host_port(const char *token, char **hostName, int *port);
static char **create_host_list(char *hosts_string, int *hosts_count);
static bool host_port_pair_is_correct(const char *str);

/* Redis specific prototypes */
static void redis_log_hook(ErrorData *edata);
Expand Down Expand Up @@ -115,21 +121,95 @@ static const struct config_enum_entry server_message_level_options[] = {
* will be opened on next event.
*/
static void
guc_on_assign_reopen_string(const char *newval, void *extra)
guc_assign_hosts_list(const char *newval, void *extra)
{
redis_close_connection();
}

/*
* Useful for HUP triggered port reassignment: close the connection, a new one
* will be opened on next event.
* Check if the hosts list is syntactically correct
*/
static void
guc_on_assign_reopen_int(int newval, void *extra)
static bool guc_check_hosts_list(char **newvalue, void **extra, GucSource source)
{
redis_close_connection();
char *hosts_string;
int hosts_count;
int i;
char **server_lookup;

hosts_string = pstrdup(*newvalue);
server_lookup = create_host_list(hosts_string, &hosts_count);

if (server_lookup==NULL)
{
GUC_check_errdetail("redislog.hosts list syntax is invalid");
pfree(hosts_string);
return false;
}

if (hosts_count==0)
{
GUC_check_errdetail("redislog.hosts must not be empty");
pfree(server_lookup);
pfree(hosts_string);
return false;
}

for (i=0; i<hosts_count; i++)
{
if (!host_port_pair_is_correct(server_lookup[i]))
{
GUC_check_errdetail("redislog.hosts \"%s\" entry must be of form HOST[:PORT]", server_lookup[i]);
pfree(server_lookup);
pfree(hosts_string);
return false;
}
}

pfree(server_lookup);
pfree(hosts_string);

return true;
}

/*
* Check if the string is of the form HOST[:PORT]
*/
static bool
host_port_pair_is_correct(const char *str)
{
char *p_colon;
int port;

Assert(str!=NULL);

if (strlen(str)==0)
{
/* the string is empty */
return false;
}

p_colon = strchr(str, ':');
if (p_colon==str)
{
/* missing hostname */
return false;
}

if (p_colon==NULL)
{
/* will use the default port */
return true;
}

port = pg_atoi(p_colon+1, sizeof(int32), '\0');
if (port==0)
{
/* port is not valid */
return false;
}

return true;
}

/*
* error_severity
Expand Down Expand Up @@ -200,26 +280,128 @@ static bool
redis_open_connection()
{
struct timeval timeout;
char *hosts_string;
char **server_lookup;
int i, hosts_count;

if (!redis_context)
if (redis_context)
{
/*
* The connection is already opened
*/
return true;
}

hosts_string = pstrdup(Redislog_hosts);
server_lookup = create_host_list(hosts_string, &hosts_count);

if (server_lookup==NULL)
{
/*
* The host list have a syntax error
*/
pfree(hosts_string);
return false;
}


for(i=0; i<hosts_count; i++)
{
char *tok = server_lookup[i];
char *hostname = NULL;
int port;

split_host_port(tok, &hostname, &port);

timeout.tv_sec = Redislog_timeout / 1000;
timeout.tv_usec = Redislog_timeout % 1000 * 1000;
if (Redislog_host[0] == '/')
redis_context = redisConnectUnixWithTimeout(Redislog_host, timeout);

if (hostname[0] == '/')
redis_context = redisConnectUnixWithTimeout(hostname, timeout);
else
redis_context = redisConnectWithTimeout(Redislog_host, Redislog_port, timeout);
redis_context = redisConnectWithTimeout(hostname, port, timeout);

pfree(hostname);

if (redis_context == NULL || redis_context->err)
{
/*
* Something went wrong.
*/
redis_close_connection();
return false;
}
else
{
/*
* target server found
*/
break;
}

}
return true;

pfree(server_lookup);
pfree(hosts_string);

return i<hosts_count;
}


/*
* Create the host list array, that must be freed by the
* caller. Returns NULL if the redislog.hosts list is
* syntactitally incorrect.
*/
static char **
create_host_list(char *hosts_string, int *hosts_count)
{
List *hosts_list;
ListCell *l;
char **server_lookup;
int i;

Assert(hosts_count!=NULL);
Assert(hosts_string!=NULL);

*hosts_count = 0;

if (!SplitIdentifierString(hosts_string, ',', &hosts_list))
{
/*
* The hosts list have been checked in the GUC's check hook
*/
list_free(hosts_list);
return NULL;
}

*hosts_count = list_length(hosts_list);
server_lookup = palloc(*hosts_count * sizeof(char *));
if (Redislog_shuffle_hosts)
{
/* Fisher–Yates shuffle */
for (i=0; i<*hosts_count; i++)
{
int j;

j = random() % (i+1);
if (i != j)
server_lookup[i] = server_lookup[j];
server_lookup[j] = list_nth(hosts_list, i);
}
}
else
{
i = 0;
foreach(l, hosts_list)
{
server_lookup[i] = lfirst(l);
i++;
}
}

list_free(hosts_list);

return server_lookup;
}


Expand Down Expand Up @@ -266,6 +448,29 @@ redis_log_shipper(char *data, int len)
return false;
}

/*
* Parse a string in the form HOST[:PORT]
* The default port is REDIS_DEFAULT_PORT
*/
static void
split_host_port(const char *token, char **hostName, int *port)
{
char *colon;

*hostName = pstrdup(token);
colon = strchr(*hostName, ':');

if (colon==NULL)
{
*port = REDIS_DEFAULT_PORT;
}
else
{
*port = pg_atoi(colon+1, sizeof(int32), '\0');
*colon = '\x0';
}
}

/*
* setup formatted_start_time
* (taken from backend/utils/error/elog.c)
Expand Down Expand Up @@ -589,28 +794,17 @@ void
_PG_init(void)
{
/* Set up GUCs */
DefineCustomStringVariable("redislog.host",
"Redis server host name or IP address.",
NULL,
&Redislog_host,
DefineCustomStringVariable("redislog.hosts",
"List of Redis servers",
"List of HOST[:PORT] pairs separated by commas, ad example:"
"HOST[:PORT][, ...]. If port is not specified the default "
"6379 is assumed",
&Redislog_hosts,
"127.0.0.1",
PGC_SIGHUP,
GUC_NOT_IN_SAMPLE | GUC_SUPERUSER_ONLY,
NULL,
&guc_on_assign_reopen_string,
NULL);

DefineCustomIntVariable("redislog.port",
"Redis server port number.",
NULL,
&Redislog_port,
6379,
0,
65535,
PGC_SIGHUP,
GUC_NOT_IN_SAMPLE | GUC_SUPERUSER_ONLY,
NULL,
&guc_on_assign_reopen_int,
GUC_NOT_IN_SAMPLE | GUC_SUPERUSER_ONLY | GUC_LIST_INPUT,
&guc_check_hosts_list,
&guc_assign_hosts_list,
NULL);

DefineCustomIntVariable("redislog.connection_timeout",
Expand Down Expand Up @@ -679,6 +873,18 @@ _PG_init(void)
NULL,
NULL);

DefineCustomBoolVariable("redislog.shuffle_hosts",
"If true the list of available Redis servers is shuffled",
"Shuffle the list of available Redis server in order to"
"balance events serevrs",
&Redislog_shuffle_hosts,
TRUE,
PGC_SUSET,
GUC_NOT_IN_SAMPLE,
NULL,
NULL,
NULL);

prev_log_hook = emit_log_hook;
emit_log_hook = redis_log_hook;

Expand Down

0 comments on commit ad88dbf

Please sign in to comment.