Skip to content

Commit

Permalink
Merge pull request #48 from VCTLabs/separate-redis-contexts
Browse files Browse the repository at this point in the history
subscribe/unsubscribe commands need separate context from rest
  • Loading branch information
SJLC committed May 4, 2023
2 parents de2fe92 + 72d41b7 commit 0b9a87f
Showing 1 changed file with 75 additions and 14 deletions.
89 changes: 75 additions & 14 deletions src/redis_ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@
#define STDERR_DEBUG_FIELD "stderr_debug"
#define SETTINGS_WRITER_FIELD "settings_writer"

/* need to keep one redis context dedicated to subscribe/unsibscribe commands,
* while all other command types can share a redis context */
enum redis_cmd_class
{
REDIS_NORMAL_CMDS = 0,
REDIS_SUB_UNSUB_CMDS
};

struct redis_ipc_config
{
int debug_verbosity;
Expand All @@ -39,7 +47,8 @@ struct redis_ipc_per_thread
char *component; // component name used in redis
char *thread; // thread name used in redis
char *result_queue_path; // based on component and thread
redisContext *redis_state; // state for connection to redis server
redisContext *redis_state; // state for primary connection to redis server
redisContext *redis_sub_state; // state for subscriptions to redis server
const char *redis_socket_path; // path for connection to redis server
unsigned int command_ctr; // counter for number of commands sent by thread
int force_quiet; // force stderr prints off
Expand Down Expand Up @@ -173,6 +182,11 @@ void cleanup_per_thread_info(struct redis_ipc_per_thread *thread_info)
// closes connection and frees memory related to this connection
redisFree(thread_info->redis_state);
}
if (thread_info->redis_sub_state)
{
// closes connection and frees memory related to this connection
redisFree(thread_info->redis_sub_state);
}
}

// initialize per-thread state
Expand Down Expand Up @@ -203,10 +217,14 @@ int redis_ipc_init(const char *this_component, const char *this_thread)
if (new_info->redis_socket_path == NULL)
new_info->redis_socket_path = RIPC_SERVER_PATH;

// each thread gets its own redis connection
// each thread needs a pair of redis connections,
// since subscribe/unsubscribe commands need separate context from the rest
new_info->redis_state = redisConnectUnix(new_info->redis_socket_path);
if (new_info->redis_state == NULL || new_info->redis_state->err)
goto redis_ipc_init_failed;
new_info->redis_sub_state = redisConnectUnix(new_info->redis_socket_path);
if (new_info->redis_sub_state == NULL || new_info->redis_sub_state->err)
goto redis_ipc_init_failed;

// each thread gets its own results queue in redis
memset(result_queue_path, 0, sizeof(result_queue_path));
Expand Down Expand Up @@ -337,19 +355,25 @@ int stderr_debug_is_enabled()

// check for errors in redis command execution;
// reset connection or free reply object if error is found
redisReply * validate_redis_reply(redisReply *reply)
redisReply * validate_redis_reply(redisReply *reply, enum redis_cmd_class cmd_class)
{
struct redis_ipc_per_thread *thread_info = get_per_thread_info();
redisReply *validated_reply = reply;
redisContext *redis_state = NULL;

if (cmd_class == REDIS_SUB_UNSUB_CMDS)
redis_state = thread_info->redis_sub_state;
else
redis_state = thread_info->redis_state;

// error in connection to server
if (reply == NULL)
{
// must reconnect to redis server after an error
// coverity[CWE-476] NOT null if redis_ipc_init() ran in this thread
stderr_debug("[ERROR] '%s', need to reconnect", thread_info->redis_state->errstr);
redisFree(thread_info->redis_state);
thread_info->redis_state = redisConnectUnix(RIPC_SERVER_PATH);
redisFree(redis_state);
redis_state = redisConnectUnix(RIPC_SERVER_PATH);
}
// error in command
else if (reply->type == REDIS_REPLY_ERROR)
Expand Down Expand Up @@ -388,7 +412,7 @@ redisReply * redis_command(const char *format, ...)

// check for redis errors and avoid returning an error string
// (returns NULL instead, harder to overlook...)
reply = validate_redis_reply(reply);
reply = validate_redis_reply(reply, REDIS_NORMAL_CMDS);

return reply;
}
Expand Down Expand Up @@ -418,7 +442,38 @@ redisReply * redis_command_from_list(int argc, const char **argv)

// check for redis errors and avoid returning an error string
// (returns NULL instead, harder to overlook...)
reply = validate_redis_reply(reply);
reply = validate_redis_reply(reply, REDIS_NORMAL_CMDS);

return reply;
}

// run redis subscribe/unsubscribe command and return reply object if command succeeds
//
// if return value is non-null,
// caller is responsible for calling freeReplyObject() when done with reply
redisReply * redis_subscription_command(const char *format, ...)
{
struct redis_ipc_per_thread *thread_info = get_per_thread_info();
redisReply *reply = NULL;
va_list argp;

if (stderr_debug_is_enabled())
{
va_start(argp, format);
fprintf(stderr, "(%s:subscriptions) ", thread_info->component);
vfprintf(stderr, format, argp);
fprintf(stderr, "\n");
va_end(argp);
}

va_start(argp, format);
// coverity[CWE-476] NOT null if redis_ipc_init() ran in this thread
reply = redisvCommand(thread_info->redis_sub_state, format, argp);
va_end(argp);

// check for redis errors and avoid returning an error string
// (returns NULL instead, harder to overlook...)
reply = validate_redis_reply(reply, REDIS_SUB_UNSUB_CMDS);

return reply;
}
Expand Down Expand Up @@ -1177,7 +1232,7 @@ static int redis_subscribe(const char *channel_path)
int ret = RIPC_FAIL;

// don't forget to free reply later
reply = redis_command("PSUBSCRIBE %s", channel_path);
reply = redis_subscription_command("PSUBSCRIBE %s", channel_path);

if (reply != NULL)
{
Expand Down Expand Up @@ -1310,7 +1365,7 @@ static int redis_unsubscribe(char *channel_path)
int ret = RIPC_FAIL;

// don't forget to free reply later
reply = redis_command("PUNSUBSCRIBE %s", channel_path);
reply = redis_subscription_command("PUNSUBSCRIBE %s", channel_path);

if (reply != NULL)
{
Expand Down Expand Up @@ -1409,14 +1464,15 @@ json_object * redis_ipc_get_message_timeout(struct timeval timeout)
if (thread_info == NULL)
goto redis_get_channel_message_finish;

// block until a message is available or timeout is reached
redisSetTimeout(thread_info->redis_state, timeout);
ret = redisGetReply(thread_info->redis_state, &reply);
// block until a message is available or timeout is reached,
// using special redis context for subscriptions
redisSetTimeout(thread_info->redis_sub_state, timeout);
ret = redisGetReply(thread_info->redis_sub_state, &reply);
if (ret != REDIS_OK)
{
// must reconnect to redis server after an error
redisFree(thread_info->redis_state);
thread_info->redis_state = redisConnectUnix(RIPC_SERVER_PATH);
redisFree(thread_info->redis_sub_state);
thread_info->redis_sub_state = redisConnectUnix(RIPC_SERVER_PATH);

goto redis_get_channel_message_finish;
}
Expand All @@ -1440,6 +1496,11 @@ json_object * redis_ipc_get_message_timeout(struct timeval timeout)

// parse message back into json object
message = json_tokener_parse(message_str);
if (message == NULL)
{
// message is plain string, probably settings change notification
message = json_object_new_string(message_str);
}

redis_get_channel_message_finish:
if (reply != NULL)
Expand Down

0 comments on commit 0b9a87f

Please sign in to comment.