From afeb858654a3c26089c5c199bb6a4ba9b6dd2087 Mon Sep 17 00:00:00 2001 From: Bogdan-Andrei Iancu Date: Mon, 8 May 2023 10:28:33 +0300 Subject: [PATCH] Revert "Reduce number of sockets created by closing off one half" This reverts commit a571e938e57b39b5e9759af6a9eda47f8aa081e2. This revert is needed as the commit breaks the IPC communication during startup , see #3081 --- ipc.h | 8 ++--- pt.c | 97 +++++++++++------------------------------------------------ pt.h | 8 +++-- 3 files changed, 27 insertions(+), 86 deletions(-) diff --git a/ipc.h b/ipc.h index d5a06c01622..ff24342b382 100644 --- a/ipc.h +++ b/ipc.h @@ -28,12 +28,12 @@ extern int ipc_shared_fd_read; #define IPC_TYPE_NONE (-1) #define ipc_bad_handler_type(htype) ((htype) < 0) -#define IPC_FD_READ(_proc_no) pt[_proc_no].ipc_pipe -#define IPC_FD_WRITE(_proc_no) pt[_proc_no].ipc_pipe +#define IPC_FD_READ(_proc_no) pt[_proc_no].ipc_pipe[0] +#define IPC_FD_WRITE(_proc_no) pt[_proc_no].ipc_pipe[1] #define IPC_FD_READ_SELF IPC_FD_READ(process_no) #define IPC_FD_READ_SHARED ipc_shared_fd_read -#define IPC_FD_SYNC_READ(_proc_no) pt[_proc_no].ipc_sync_pipe -#define IPC_FD_SYNC_WRITE(_proc_no) pt[_proc_no].ipc_sync_pipe +#define IPC_FD_SYNC_READ(_proc_no) pt[_proc_no].ipc_sync_pipe[0] +#define IPC_FD_SYNC_WRITE(_proc_no) pt[_proc_no].ipc_sync_pipe[1] #define IPC_FD_SYNC_READ_SELF IPC_FD_SYNC_READ(process_no) /* prototype of IPC handler - function called by the IPC engine diff --git a/pt.c b/pt.c index 302bafc6500..c8e34e2f685 100644 --- a/pt.c +++ b/pt.c @@ -119,8 +119,8 @@ int init_multi_proc_support(void) /* reset fds to prevent bogus ops */ pt[i].unix_sock = -1; pt[i].pid = -1; - pt[i].ipc_pipe = -1; - pt[i].ipc_sync_pipe = -1; + pt[i].ipc_pipe[0] = pt[i].ipc_pipe[1] = -1; + pt[i].ipc_sync_pipe[0] = pt[i].ipc_sync_pipe[1] = -1; } /* create the load-related stats (initially marked as hidden */ @@ -235,8 +235,8 @@ void reset_process_slot( int p_id ) pt[p_id].desc[0] = 0; pt[p_id].flags = 0; - pt[p_id].ipc_pipe = -1; - pt[p_id].ipc_sync_pipe = -1; + pt[p_id].ipc_pipe[0] = pt[p_id].ipc_pipe[1] = -1; + pt[p_id].ipc_sync_pipe[0] = pt[p_id].ipc_sync_pipe[1] = -1; pt[p_id].unix_sock = -1; pt[p_id].log_level = pt[p_id].default_log_level = 0; /*not really needed*/ @@ -257,63 +257,6 @@ void reset_process_slot( int p_id ) #endif } -static int close_unused_pipes(int proc_no, int idx, int is_parent) -{ - int *fd; - - fd = &pt[proc_no].ipc_pipe_holder[idx]; - if (is_parent || *fd != -1) { - if (close(*fd) != 0) { - LM_BUG("failed to close pt[%d].ipc_pipe_holder[%d]" - " = %d, errno = %d\n", proc_no, idx, *fd, errno); - return -1; - } - if (is_parent) - *fd = -1; - } - fd = &pt[proc_no].ipc_sync_pipe_holder[idx]; - if (is_parent || *fd != -1) { - if (close(*fd) != 0) { - LM_BUG("failed to close pt[%d].ipc_sync_pipe_holder[%d]" - " = %d, errno = %d\n", proc_no, idx, *fd, errno); - return -1; - } - if (is_parent) - *fd = -1; - } - return 0; -} - -static int setup_child_ipc_pipes(int proc_no) -{ - int fd1, fd2, eval = 0; - - fd2 = pt[proc_no].ipc_pipe; - if (fd2 != -1) { - fd1 = pt[proc_no].ipc_pipe_holder[0]; - if (dup2(fd1, fd2) < 0) { - LM_BUG("failed to dup2(%d, pt[%d].ipc_pipe" - " = %d), errno = %d\n", fd1, proc_no, fd2, - errno); - eval = -1; - } - } - fd2 = pt[proc_no].ipc_sync_pipe; - if (fd2 != -1) { - fd1 = pt[proc_no].ipc_sync_pipe_holder[0]; - if (dup2(fd1, fd2) < 0) { - LM_BUG("failed to dup2(%d, pt[%d].ipc_sync_pipe" - " = %d), errno = %d\n", fd1, proc_no, fd2, - errno); - eval = -1; - } - } - for (int i = 0; i < counted_max_processes; i++) { - if (close_unused_pipes(i, 0, 0) != 0) - eval = -1; - } - return eval; -} enum {CHLD_STARTING, CHLD_OK, CHLD_FAILED}; @@ -325,9 +268,6 @@ static __attribute__((__noreturn__)) void child_startup_failed(void) static int internal_fork_child_setup(const struct internal_fork_params *ifpp) { - if (setup_child_ipc_pipes(process_no) != 0) - return -1; - init_log_level(); tcp_connect_proc_to_tcp_main(process_no, 1); @@ -377,16 +317,18 @@ int internal_fork(const struct internal_fork_params *ifpp) /* set the IPC pipes */ if ( (ifpp->flags & OSS_PROC_NO_IPC) ) { /* advertise no IPC to the rest of the procs */ - pt[new_idx].ipc_pipe = -1; - pt[new_idx].ipc_sync_pipe = -1; - for (int i = 0; i < 2; i++) { - if (close_unused_pipes(new_idx, i, 1) != 0) - return -1; - } + pt[new_idx].ipc_pipe[0] = -1; + pt[new_idx].ipc_pipe[1] = -1; + pt[new_idx].ipc_sync_pipe[0] = -1; + pt[new_idx].ipc_sync_pipe[1] = -1; + /* NOTE: the IPC fds will remain open in the other processes, + * but they will not be known */ } else { /* activate the IPC pipes */ - pt[new_idx].ipc_pipe=pt[new_idx].ipc_pipe_holder[1]; - pt[new_idx].ipc_sync_pipe=pt[new_idx].ipc_sync_pipe_holder[1]; + pt[new_idx].ipc_pipe[0]=pt[new_idx].ipc_pipe_holder[0]; + pt[new_idx].ipc_pipe[1]=pt[new_idx].ipc_pipe_holder[1]; + pt[new_idx].ipc_sync_pipe[0]=pt[new_idx].ipc_sync_pipe_holder[0]; + pt[new_idx].ipc_sync_pipe[1]=pt[new_idx].ipc_sync_pipe_holder[1]; } pt[new_idx].pid = 0; @@ -460,11 +402,6 @@ int internal_fork(const struct internal_fork_params *ifpp) goto child_is_down; } pt[new_idx].flags |= OSS_PROC_IS_RUNNING; - if ( (ifpp->flags & OSS_PROC_NO_IPC)==0 ) { - /* close the child's end of the pipes */ - if (close_unused_pipes(new_idx, 0, 1) != 0) - return -1; - } tcp_connect_proc_to_tcp_main( new_idx, 0); return new_idx; child_is_down: @@ -538,8 +475,10 @@ int count_child_processes(void) void dynamic_process_final_exit(void) { /* prevent any more IPC */ - pt[process_no].ipc_pipe = -1; - pt[process_no].ipc_sync_pipe = -1; + pt[process_no].ipc_pipe[0] = -1; + pt[process_no].ipc_pipe[1] = -1; + pt[process_no].ipc_sync_pipe[0] = -1; + pt[process_no].ipc_sync_pipe[1] = -1; /* clear the per-process connection from the DB queues */ ql_force_process_disconnect(process_no); diff --git a/pt.h b/pt.h index a12452c3f54..09c26bd4ac1 100644 --- a/pt.h +++ b/pt.h @@ -49,8 +49,10 @@ struct process_table { /* various flags describing properties of this process */ unsigned int flags; - /* pipe used by the process to receive designated jobs (used by IPC) */ - int ipc_pipe; + /* pipe used by the process to receive designated jobs (used by IPC) + * [1] for writting into by other process, + * [0] to listen on by this process */ + int ipc_pipe[2]; /* same as above, but the holder used when the corresponding process * does not exist */ int ipc_pipe_holder[2]; @@ -59,7 +61,7 @@ struct process_table { * this pipe should only be used by a process to synchronously receive a * message after he knows that some other process will send it for sure, * and there's no other job that can overlap in the meantime */ - int ipc_sync_pipe; + int ipc_sync_pipe[2]; /* same as above, but holder for non-existing processes */ int ipc_sync_pipe_holder[2];