Skip to content

Commit

Permalink
Revert "Reduce number of sockets created by closing off one half"
Browse files Browse the repository at this point in the history
This reverts commit a571e93.
This revert is needed as the commit breaks the IPC communication during startup , see #3081
  • Loading branch information
bogdan-iancu committed May 8, 2023
1 parent c9f5eff commit afeb858
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 86 deletions.
8 changes: 4 additions & 4 deletions ipc.h
Expand Up @@ -28,12 +28,12 @@ extern int ipc_shared_fd_read;
#define IPC_TYPE_NONE (-1)
#define ipc_bad_handler_type(htype) ((htype) < 0)

#define IPC_FD_READ(_proc_no) pt[_proc_no].ipc_pipe
#define IPC_FD_WRITE(_proc_no) pt[_proc_no].ipc_pipe
#define IPC_FD_READ(_proc_no) pt[_proc_no].ipc_pipe[0]
#define IPC_FD_WRITE(_proc_no) pt[_proc_no].ipc_pipe[1]
#define IPC_FD_READ_SELF IPC_FD_READ(process_no)
#define IPC_FD_READ_SHARED ipc_shared_fd_read
#define IPC_FD_SYNC_READ(_proc_no) pt[_proc_no].ipc_sync_pipe
#define IPC_FD_SYNC_WRITE(_proc_no) pt[_proc_no].ipc_sync_pipe
#define IPC_FD_SYNC_READ(_proc_no) pt[_proc_no].ipc_sync_pipe[0]
#define IPC_FD_SYNC_WRITE(_proc_no) pt[_proc_no].ipc_sync_pipe[1]
#define IPC_FD_SYNC_READ_SELF IPC_FD_SYNC_READ(process_no)

/* prototype of IPC handler - function called by the IPC engine
Expand Down
97 changes: 18 additions & 79 deletions pt.c
Expand Up @@ -119,8 +119,8 @@ int init_multi_proc_support(void)
/* reset fds to prevent bogus ops */
pt[i].unix_sock = -1;
pt[i].pid = -1;
pt[i].ipc_pipe = -1;
pt[i].ipc_sync_pipe = -1;
pt[i].ipc_pipe[0] = pt[i].ipc_pipe[1] = -1;
pt[i].ipc_sync_pipe[0] = pt[i].ipc_sync_pipe[1] = -1;
}

/* create the load-related stats (initially marked as hidden */
Expand Down Expand Up @@ -235,8 +235,8 @@ void reset_process_slot( int p_id )
pt[p_id].desc[0] = 0;
pt[p_id].flags = 0;

pt[p_id].ipc_pipe = -1;
pt[p_id].ipc_sync_pipe = -1;
pt[p_id].ipc_pipe[0] = pt[p_id].ipc_pipe[1] = -1;
pt[p_id].ipc_sync_pipe[0] = pt[p_id].ipc_sync_pipe[1] = -1;
pt[p_id].unix_sock = -1;

pt[p_id].log_level = pt[p_id].default_log_level = 0; /*not really needed*/
Expand All @@ -257,63 +257,6 @@ void reset_process_slot( int p_id )
#endif
}

static int close_unused_pipes(int proc_no, int idx, int is_parent)
{
int *fd;

fd = &pt[proc_no].ipc_pipe_holder[idx];
if (is_parent || *fd != -1) {
if (close(*fd) != 0) {
LM_BUG("failed to close pt[%d].ipc_pipe_holder[%d]"
" = %d, errno = %d\n", proc_no, idx, *fd, errno);
return -1;
}
if (is_parent)
*fd = -1;
}
fd = &pt[proc_no].ipc_sync_pipe_holder[idx];
if (is_parent || *fd != -1) {
if (close(*fd) != 0) {
LM_BUG("failed to close pt[%d].ipc_sync_pipe_holder[%d]"
" = %d, errno = %d\n", proc_no, idx, *fd, errno);
return -1;
}
if (is_parent)
*fd = -1;
}
return 0;
}

static int setup_child_ipc_pipes(int proc_no)
{
int fd1, fd2, eval = 0;

fd2 = pt[proc_no].ipc_pipe;
if (fd2 != -1) {
fd1 = pt[proc_no].ipc_pipe_holder[0];
if (dup2(fd1, fd2) < 0) {
LM_BUG("failed to dup2(%d, pt[%d].ipc_pipe"
" = %d), errno = %d\n", fd1, proc_no, fd2,
errno);
eval = -1;
}
}
fd2 = pt[proc_no].ipc_sync_pipe;
if (fd2 != -1) {
fd1 = pt[proc_no].ipc_sync_pipe_holder[0];
if (dup2(fd1, fd2) < 0) {
LM_BUG("failed to dup2(%d, pt[%d].ipc_sync_pipe"
" = %d), errno = %d\n", fd1, proc_no, fd2,
errno);
eval = -1;
}
}
for (int i = 0; i < counted_max_processes; i++) {
if (close_unused_pipes(i, 0, 0) != 0)
eval = -1;
}
return eval;
}

enum {CHLD_STARTING, CHLD_OK, CHLD_FAILED};

Expand All @@ -325,9 +268,6 @@ static __attribute__((__noreturn__)) void child_startup_failed(void)

static int internal_fork_child_setup(const struct internal_fork_params *ifpp)
{
if (setup_child_ipc_pipes(process_no) != 0)
return -1;

init_log_level();

tcp_connect_proc_to_tcp_main(process_no, 1);
Expand Down Expand Up @@ -377,16 +317,18 @@ int internal_fork(const struct internal_fork_params *ifpp)
/* set the IPC pipes */
if ( (ifpp->flags & OSS_PROC_NO_IPC) ) {
/* advertise no IPC to the rest of the procs */
pt[new_idx].ipc_pipe = -1;
pt[new_idx].ipc_sync_pipe = -1;
for (int i = 0; i < 2; i++) {
if (close_unused_pipes(new_idx, i, 1) != 0)
return -1;
}
pt[new_idx].ipc_pipe[0] = -1;
pt[new_idx].ipc_pipe[1] = -1;
pt[new_idx].ipc_sync_pipe[0] = -1;
pt[new_idx].ipc_sync_pipe[1] = -1;
/* NOTE: the IPC fds will remain open in the other processes,
* but they will not be known */
} else {
/* activate the IPC pipes */
pt[new_idx].ipc_pipe=pt[new_idx].ipc_pipe_holder[1];
pt[new_idx].ipc_sync_pipe=pt[new_idx].ipc_sync_pipe_holder[1];
pt[new_idx].ipc_pipe[0]=pt[new_idx].ipc_pipe_holder[0];
pt[new_idx].ipc_pipe[1]=pt[new_idx].ipc_pipe_holder[1];
pt[new_idx].ipc_sync_pipe[0]=pt[new_idx].ipc_sync_pipe_holder[0];
pt[new_idx].ipc_sync_pipe[1]=pt[new_idx].ipc_sync_pipe_holder[1];
}

pt[new_idx].pid = 0;
Expand Down Expand Up @@ -460,11 +402,6 @@ int internal_fork(const struct internal_fork_params *ifpp)
goto child_is_down;
}
pt[new_idx].flags |= OSS_PROC_IS_RUNNING;
if ( (ifpp->flags & OSS_PROC_NO_IPC)==0 ) {
/* close the child's end of the pipes */
if (close_unused_pipes(new_idx, 0, 1) != 0)
return -1;
}
tcp_connect_proc_to_tcp_main( new_idx, 0);
return new_idx;
child_is_down:
Expand Down Expand Up @@ -538,8 +475,10 @@ int count_child_processes(void)
void dynamic_process_final_exit(void)
{
/* prevent any more IPC */
pt[process_no].ipc_pipe = -1;
pt[process_no].ipc_sync_pipe = -1;
pt[process_no].ipc_pipe[0] = -1;
pt[process_no].ipc_pipe[1] = -1;
pt[process_no].ipc_sync_pipe[0] = -1;
pt[process_no].ipc_sync_pipe[1] = -1;

/* clear the per-process connection from the DB queues */
ql_force_process_disconnect(process_no);
Expand Down
8 changes: 5 additions & 3 deletions pt.h
Expand Up @@ -49,8 +49,10 @@ struct process_table {
/* various flags describing properties of this process */
unsigned int flags;

/* pipe used by the process to receive designated jobs (used by IPC) */
int ipc_pipe;
/* pipe used by the process to receive designated jobs (used by IPC)
* [1] for writting into by other process,
* [0] to listen on by this process */
int ipc_pipe[2];
/* same as above, but the holder used when the corresponding process
* does not exist */
int ipc_pipe_holder[2];
Expand All @@ -59,7 +61,7 @@ struct process_table {
* this pipe should only be used by a process to synchronously receive a
* message after he knows that some other process will send it for sure,
* and there's no other job that can overlap in the meantime */
int ipc_sync_pipe;
int ipc_sync_pipe[2];
/* same as above, but holder for non-existing processes */
int ipc_sync_pipe_holder[2];

Expand Down

0 comments on commit afeb858

Please sign in to comment.