Skip to content

Commit

Permalink
Refrain from using atexit() to clean-up our logs semaphore. (#811)
Browse files Browse the repository at this point in the history
* Introduce a semaphore owner (pid_t) to control semaphore clean-up.
* Make sure to set returnCode and errno in runprogram when access fails.
* Fix pg_autoctl do tmux session single semaphore.
  • Loading branch information
DimCitus committed Oct 8, 2021
1 parent 84576fd commit c1dfd70
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 22 deletions.
24 changes: 24 additions & 0 deletions src/bin/lib/subcommands.c/runprogram.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,18 @@ execute_subprogram(Program *prog)
int outpipe[2] = { 0, 0 };
int errpipe[2] = { 0, 0 };

/* first level sanity check */
if (access(prog->program, F_OK | X_OK) == -1)
{
fprintf(stderr, "Failed to find executable program at \"%s\": %s\n",
prog->program,
strerror(errno));

prog->returnCode = -1;
prog->error = errno;
return;
}

/* Flush stdio channels just before fork, to avoid double-output problems */
fflush(stdout);
fflush(stderr);
Expand Down Expand Up @@ -326,6 +338,18 @@ execute_program(Program *prog)
return;
}

/* first level sanity check */
if (access(prog->program, F_OK | X_OK) == -1)
{
fprintf(stderr, "Failed to find executable program at \"%s\": %s\n",
prog->program,
strerror(errno));

prog->returnCode = -1;
prog->error = errno;
return;
}

if (prog->tty == false)
{
/*
Expand Down
5 changes: 0 additions & 5 deletions src/bin/pg_autoctl/azure.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,6 @@ azure_start_command(Program *program)
pid_t fpid;
char command[BUFSIZE] = { 0 };

IntString semIdString = intToString(log_semaphore.semId);

(void) snprintf_program_command_line(program, command, sizeof(command));

if (dryRun)
Expand All @@ -174,9 +172,6 @@ azure_start_command(Program *program)
fflush(stdout);
fflush(stderr);

/* we want to use the same logs semaphore in the sub-processes */
setenv(PG_AUTOCTL_LOG_SEMAPHORE, semIdString.strValue, 1);

/* time to create the node_active sub-process */
fpid = fork();

Expand Down
9 changes: 9 additions & 0 deletions src/bin/pg_autoctl/cli_do_tmux.c
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,15 @@ tmux_start_server(const char *scriptName, const char *binpath)
char tmux[MAXPGPATH] = { 0 };
char command[BUFSIZE] = { 0 };

/*
* Here we are going to remain the parent process of multiple pg_autoctl
* top-level processes, one per node (and the monitor). We don't want all
* those processes to use the same semaphore for logging, so make sure we
* remove ourselves from the environment before we start all the
* sub-processes.
*/
unsetenv(PG_AUTOCTL_LOG_SEMAPHORE);

if (setenv("PG_AUTOCTL_DEBUG", "1", 1) != 0)
{
log_error("Failed to set environment PG_AUTOCTL_DEBUG: %m");
Expand Down
3 changes: 0 additions & 3 deletions src/bin/pg_autoctl/demoapp.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,6 @@ demoapp_run(const char *pguri, DemoAppOptions *demoAppOptions)
int startedClientsCount = 0;
pid_t clientsPidArray[MAX_CLIENTS_COUNT + 1] = { 0 };

IntString semIdString = intToString(log_semaphore.semId);

log_info("Starting %d concurrent clients as sub-processes", clientsCount);


Expand All @@ -195,7 +193,6 @@ demoapp_run(const char *pguri, DemoAppOptions *demoAppOptions)
fflush(stderr);

/* we want to use the same logs semaphore in the sub-processes */
setenv(PG_AUTOCTL_LOG_SEMAPHORE, semIdString.strValue, 1);

for (int index = 0; index <= clientsCount; index++)
{
Expand Down
67 changes: 63 additions & 4 deletions src/bin/pg_autoctl/lock_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,44 @@ semaphore_init(Semaphore *semaphore)
}
else
{
return semaphore_create(semaphore);
bool success = semaphore_create(semaphore);

/*
* Only the main process should unlink the semaphore at exit time.
*
* When we create a semaphore, ensure we put our semId in the expected
* environment variable (PG_AUTOCTL_LOG_SEMAPHORE), and we assign the
* current process' pid as the semaphore owner.
*
* When we open a pre-existing semaphore using PG_AUTOCTL_LOG_SEMAPHORE
* as the semId, the semaphore owner is left to zero.
*
* The atexit(3) function that removes the semaphores only acts when
* the owner is our current pid. That way, in case of an early failure
* in execv(), the semaphore is not dropped from under the main
* program.
*
* A typical way execv() would fail is when calling run_program() on a
* pathname that does not exists.
*
* Per atexit(3) manual page:
*
* When a child process is created via fork(2), it inherits copies of
* its parent's registrations. Upon a successful call to one of the
* exec(3) functions, all registrations are removed.
*
* And that's why it's important that we don't remove the semaphore in
* the atexit() cleanup function when a call to run_command() fails
* early.
*/
if (success)
{
IntString semIdString = intToString(semaphore->semId);

setenv(PG_AUTOCTL_LOG_SEMAPHORE, semIdString.strValue, 1);
}

return success;
}
}

Expand All @@ -64,15 +101,33 @@ semaphore_init(Semaphore *semaphore)
bool
semaphore_finish(Semaphore *semaphore)
{
if (env_exists(PG_AUTOCTL_LOG_SEMAPHORE))
/*
* At initialization time we either create a new semaphore and register
* getpid() as the owner, or we open a previously existing semaphore from
* its semId as found in our environment variable PG_AUTOCTL_LOG_SEMAPHORE.
*
* At finish time (called from the atexit(3) registry), we remove the
* semaphore only when we are the owner of it. We expect semaphore->owner
* to be either zero (0), or to have been filled with our own pid.
*/
if (semaphore->owner == 0)
{
/* there's no semaphore closing protocol in SysV */
return true;
}
else
else if (semaphore->owner == getpid())
{
return semaphore_unlink(semaphore);
}
else
{
log_fatal("BUG: semaphore_finish semId %d owner is %d, getpid is %d",
semaphore->semId,
semaphore->owner,
getpid());

return false;
}
}


Expand All @@ -84,6 +139,7 @@ semaphore_create(Semaphore *semaphore)
{
union semun semun;

semaphore->owner = getpid();
semaphore->semId = semget(IPC_PRIVATE, 1, 0600);

if (semaphore->semId < 0)
Expand Down Expand Up @@ -122,7 +178,10 @@ semaphore_create(Semaphore *semaphore)
bool
semaphore_open(Semaphore *semaphore)
{
char semIdString[BUFSIZE];
char semIdString[BUFSIZE] = { 0 };

/* ensure the owner is set to zero when we re-open an existing semaphore */
semaphore->owner = 0;

if (!get_env_copy(PG_AUTOCTL_LOG_SEMAPHORE, semIdString, BUFSIZE))
{
Expand Down
1 change: 1 addition & 0 deletions src/bin/pg_autoctl/lock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
typedef struct Semaphore
{
int semId;
pid_t owner;
} Semaphore;


Expand Down
2 changes: 1 addition & 1 deletion src/bin/pg_autoctl/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ char *ps_buffer; /* will point to argv area */
size_t ps_buffer_size; /* space determined at run time */
size_t last_status_len; /* use to minimize length of clobber */

Semaphore log_semaphore; /* allows inter-process locking */
Semaphore log_semaphore = { 0 }; /* allows inter-process locking */


static void set_logger(void);
Expand Down
2 changes: 0 additions & 2 deletions src/bin/pg_autoctl/service_keeper.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,8 @@ service_keeper_runprogram(Keeper *keeper)
* --pgdata was parsed from the main command line to start our sub-process.
*/
char *pgdata = keeperOptions.pgSetup.pgdata;
IntString semIdString = intToString(log_semaphore.semId);

setenv(PG_AUTOCTL_DEBUG, "1", 1);
setenv(PG_AUTOCTL_LOG_SEMAPHORE, semIdString.strValue, 1);

args[argsIndex++] = (char *) pg_autoctl_program;
args[argsIndex++] = "do";
Expand Down
2 changes: 0 additions & 2 deletions src/bin/pg_autoctl/service_monitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,8 @@ service_monitor_runprogram(Monitor *monitor)
IS_EMPTY_STRING_BUFFER(monitorOptions.pgSetup.pgdata)
? keeperOptions.pgSetup.pgdata
: monitorOptions.pgSetup.pgdata;
IntString semIdString = intToString(log_semaphore.semId);

setenv(PG_AUTOCTL_DEBUG, "1", 1);
setenv(PG_AUTOCTL_LOG_SEMAPHORE, semIdString.strValue, 1);

args[argsIndex++] = (char *) pg_autoctl_program;
args[argsIndex++] = "do";
Expand Down
3 changes: 0 additions & 3 deletions src/bin/pg_autoctl/service_monitor_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,13 @@ service_monitor_init_start(void *context, pid_t *pid)
* then quit, avoiding to call the atexit() semaphore clean-up
* function.
*/
IntString semIdString = intToString(log_semaphore.semId);

const char *serviceName = createAndRun ?
"pg_autoctl: monitor listener" :
"pg_autoctl: monitor installer";

(void) set_ps_title(serviceName);

setenv(PG_AUTOCTL_LOG_SEMAPHORE, semIdString.strValue, 1);

/* finish the install if necessary */
if (!monitor_install(config->hostname, *pgSetup, false))
{
Expand Down
2 changes: 0 additions & 2 deletions src/bin/pg_autoctl/service_postgres_ctl.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,7 @@ service_postgres_ctl_runprogram()
? keeperOptions.pgSetup.pgdata
: monitorOptions.pgSetup.pgdata;

IntString semIdString = intToString(log_semaphore.semId);
setenv(PG_AUTOCTL_DEBUG, "1", 1);
setenv(PG_AUTOCTL_LOG_SEMAPHORE, semIdString.strValue, 1);

args[argsIndex++] = (char *) pg_autoctl_program;
args[argsIndex++] = "do";
Expand Down

0 comments on commit c1dfd70

Please sign in to comment.