Skip to content

Commit

Permalink
Add down-scaling support for the auto-scaling
Browse files Browse the repository at this point in the history
  • Loading branch information
bogdan-iancu committed Feb 6, 2019
1 parent f583b7a commit e2032c7
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 22 deletions.
5 changes: 3 additions & 2 deletions main.c
Original file line number Diff line number Diff line change
Expand Up @@ -540,9 +540,10 @@ void handle_sigs(void)
LM_DBG("unknown child process %d ended. Ignoring\n",chld);
continue;
}
if (pt[i].flags & OSS_PROC_DYNAMIC) {
LM_DBG("dynamic forked process %d/%d ended with "
if (pt[i].flags & OSS_PROC_SELFEXIT) {
LM_WARN("process %d/%d did selfexit with "
"status %d\n", i, chld, WTERMSIG(chld_status));
reset_process_slot(i);
continue;
}
do_exit = 1;
Expand Down
6 changes: 5 additions & 1 deletion net/net_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1614,7 +1614,11 @@ static void tcp_main_server(void)
}
}
/* add all the unix sockets used for communcation with other opensips
* processes (get fd, new connection a.s.o) */
* processes (get fd, new connection a.s.o)
* NOTE: we add even the socks for the inactive/unfork processes - the
* socks are already created, but the triggering is from proc to
* main, having them into reactor is harmless - thye will never
* trigger as there is no proc on the other end to write us */
for (n=1; n<counted_max_processes; n++) {
/* skip myslef (as process) and -1 socks (disabled)
(we can't have 0, we never close it!) */
Expand Down
57 changes: 53 additions & 4 deletions net/net_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@

/* if the UDP network layer is used or not by some protos */
static int udp_disabled = 1;
/* flag per process to control the termination stages */
static int _termination_in_progress = 0;

extern void handle_sigs(void);

Expand Down Expand Up @@ -284,6 +286,14 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type)
n = -1;
break;
}

if (reactor_is_empty() && _termination_in_progress==1) {
LM_WARN("reactor got empty while termination in progress\n");
ipc_handle_all_pending_jobs(IPC_FD_READ_SELF);
if (reactor_is_empty())
dynamic_process_final_exit();
}

pt_become_idle();
return n;
}
Expand Down Expand Up @@ -329,12 +339,12 @@ int udp_proc_reactor_init( struct socket_info *si )
}


int fork_dynamic_udp_process(void *si_filter)
static int fork_dynamic_udp_process(void *si_filter)
{
struct socket_info *si = (struct socket_info*)si_filter;
int p_id;

if ( (p_id=internal_fork( "UDP receiver", 0, TYPE_UDP))<0 ) {
if ((p_id=internal_fork( "UDP receiver", OSS_PROC_DYNAMIC, TYPE_UDP))<0) {
LM_CRIT("cannot fork UDP process\n");
return(-1);
} else if (p_id==0) {
Expand All @@ -359,14 +369,52 @@ int fork_dynamic_udp_process(void *si_filter)
error:
report_failure_status();
LM_ERR("Initializing new process failed, exiting with error \n");
exit(-1);
pt[process_no].flags |= OSS_PROC_SELFEXIT;
exit( -1);
} else {
/*parent/main*/
return p_id;
}
}


static void udp_process_graceful_terminate(int sender, void *param)
{
/* we accept this only from the main proccess */
if (sender!=0) {
LM_BUG("graceful terminate received from a non-main process!!\n");
return;
}
LM_NOTICE("process %d received RPC to terminate from Main\n",process_no);

/*remove from reactor all the shared fds, so we stop reading from them */

/*remove timer jobs pipe */
reactor_del_reader( timer_fd_out, -1, 0);

/*remove IPC dispatcher pipe */
reactor_del_reader( IPC_FD_READ_SHARED, -1, 0);

/*remove network interface */
reactor_del_reader( bind_address->socket, -1, 0);

/*remove private IPC pipe */
reactor_del_reader( IPC_FD_READ_SELF, -1, 0);

/* let's drain the private IPC */
ipc_handle_all_pending_jobs(IPC_FD_READ_SELF);

/* what is left now is the reactor are async fd's, so we need to
* wait to complete all of them */
if (reactor_is_empty())
dynamic_process_final_exit();

/* the exit will be triggered by the reactor, when empty */
_termination_in_progress = 1;
LM_WARN("reactor not empty, waiting for pending async\n");
}


/* starts all UDP related processes */
int udp_start_processes(int *chd_rank, int *startup_done)
{
Expand All @@ -385,7 +433,8 @@ int udp_start_processes(int *chd_rank, int *startup_done)

if (enable_dynamic_workers &&
create_process_group( TYPE_UDP, si, si->workers_min,
si->workers_max, fork_dynamic_udp_process)!=0)
si->workers_max, fork_dynamic_udp_process,
udp_process_graceful_terminate)!=0)
LM_ERR("failed to create group of UDP processes for <%.*s>, "
"auto forking will not be possible\n",
si->name.len, si->name.s);
Expand Down
70 changes: 58 additions & 12 deletions pt.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "mem/shm_mem.h"
#include "net/net_tcp.h"
#include "net/net_udp.h"
#include "db/db_insertq.h"
#include "socket_info.h"
#include "sr_module.h"
#include "dprint.h"
Expand Down Expand Up @@ -189,7 +190,7 @@ void set_proc_attrs( char *fmt, ...)
* it can be reused later
* WARNING: this should be called only by main process and when it is 100%
* that the process mapped on this slot is not running anymore */
static void _reset_process_slot( int p_id )
void reset_process_slot( int p_id )
{
if (is_main==0) {
LM_BUG("buggy call from non-main process!!!");
Expand Down Expand Up @@ -281,7 +282,7 @@ int internal_fork(char *proc_desc, unsigned int flags,
if ( (pid=fork())<0 ){
LM_CRIT("cannot fork \"%s\" process (%d: %s)\n",proc_desc,
errno, strerror(errno));
_reset_process_slot( new_idx );
reset_process_slot( new_idx );
return -1;
}

Expand Down Expand Up @@ -354,6 +355,7 @@ struct process_group {
enum process_type type;
struct socket_info *si_filter;
fork_new_process_f *fork_func;
terminate_process_f *term_func;
unsigned int max_procs;
unsigned int min_procs;
/* some reference to a profile to give us params for fork/rip procs */
Expand All @@ -364,17 +366,17 @@ struct process_group {
struct process_group *next;
};

#define PG_HISTORY_DEFAULT_SIZE 6 /*to be replaced with val from profile*/
#define PG_HIGH_MIN_SCORE 3 /*to be replaced with val from profile*/
#define PG_HLOAD_TRESHOLD 70 /*to be replaced with val from profile*/
#define PG_HISTORY_DEFAULT_SIZE 5 /*to be replaced with val from profile*/
#define PG_HIGH_MIN_SCORE 4 /*to be replaced with val from profile*/
#define PG_HLOAD_TRESHOLD 50 /*to be replaced with val from profile*/
#define PG_LLOAD_TRESHOLD 20 /*to be replaced with val from profile*/

struct process_group *pg_head = NULL;

int create_process_group(enum process_type type,
struct socket_info *si_filter,
unsigned int min_procs, unsigned int max_procs,
fork_new_process_f *f)
fork_new_process_f *f1, terminate_process_f *f2)
{
struct process_group *pg, *it;

Expand All @@ -394,7 +396,8 @@ int create_process_group(enum process_type type,
pg->si_filter = si_filter;
pg->max_procs = max_procs;
pg->min_procs = min_procs;
pg->fork_func = f;
pg->fork_func = f1;
pg->term_func = f2;
pg->next = NULL;

pg->history_size = PG_HISTORY_DEFAULT_SIZE;
Expand All @@ -414,20 +417,39 @@ int create_process_group(enum process_type type,
}


void rescale_group_history(struct process_group *pg, unsigned int idx,
int org_size, int offset)
{
unsigned int k;
unsigned char old;

k = idx;
do {
old = pg->history_map[k] ;
pg->history_map[k] = (pg->history_map[k]*org_size)/(org_size+offset);
LM_DBG("rescaling old %d to %d [idx %d]\n",
old, pg->history_map[k], k);

k = k ? (k-1) : (PG_HISTORY_DEFAULT_SIZE-1) ;
} while(k!=idx);
}


void check_and_adjust_number_of_workers(void)
{
struct process_group *pg;
unsigned int i, k, idx;
unsigned int load;
unsigned int procs_no;
unsigned char cnt_under, cnt_over;
int p_id;
int p_id, last_idx_in_pg;

/* iterate all the groups we have */
for ( pg=pg_head ; pg ; pg=pg->next ) {

load = 0;
procs_no = 0;
last_idx_in_pg = -1;

/* find the processes belonging to this group */
for ( i=0 ; i<counted_max_processes ; i++) {
Expand All @@ -436,6 +458,7 @@ void check_and_adjust_number_of_workers(void)
continue;

load += get_stat_val( pt[i].load_rt );
last_idx_in_pg = i;
procs_no++;

}
Expand Down Expand Up @@ -463,31 +486,37 @@ void check_and_adjust_number_of_workers(void)
/* decide what to do */
if (cnt_over>=PG_HIGH_MIN_SCORE) {
if (procs_no<pg->max_procs) {
LM_DBG("score %d/%d -> forking new proc in group %d "
LM_NOTICE("score %d/%d -> forking new proc in group %d "
"(with %d procs)\n", cnt_over, PG_HISTORY_DEFAULT_SIZE,
pg->type, procs_no);
/* we need to fork one more process here */
if ( (p_id=pg->fork_func(pg->si_filter))<0 ||
wait_for_one_child()<0 ) {
LM_ERR("failed to fork new process for group %d "
"(current %d procs)\n",pg->type,procs_no);
if (p_id>0)
_reset_process_slot( p_id );
} else {
rescale_group_history( pg, idx, procs_no, +1);
pg->no_downscale_cycles = 10*PG_HISTORY_DEFAULT_SIZE;
}
}
} else if (cnt_under==PG_HISTORY_DEFAULT_SIZE) {
if (procs_no>pg->min_procs && procs_no!=1 &&
pg->no_downscale_cycles==0) {
/* try to estimate the load after downscaling */
load = (pg->history_map[idx]*procs_no) / (procs_no-1);
load = 0;
k = idx;
do {
load += pg->history_map[k];
k = k ? (k-1) : (PG_HISTORY_DEFAULT_SIZE-1) ;
} while(k!=idx);
load = (load*procs_no) / (procs_no-1);
if (load<PG_HLOAD_TRESHOLD) {
/* down scale one more process here */
LM_DBG("score %d/%d -> ripping one proc from group %d "
"(with %d procs), estimated load -> %d\n", cnt_under,
PG_HISTORY_DEFAULT_SIZE, pg->type, procs_no,
load );
ipc_send_rpc( last_idx_in_pg, pg->term_func, NULL);
}
}
}
Expand All @@ -496,3 +525,20 @@ void check_and_adjust_number_of_workers(void)
if (pg->no_downscale_cycles) pg->no_downscale_cycles--;
}
}


void dynamic_process_final_exit(void)
{
/* prevent any more IPC */
pt[process_no].ipc_pipe[0] = -1;
pt[process_no].ipc_pipe[1] = -1;

/* clear the per-process connection from the DB queues */
ql_force_process_disconnect(process_no);

/* mark myself as DYNAMIC (just in case) to have an err-less terminatio */
pt[process_no].flags |= OSS_PROC_SELFEXIT;

/* the process slot in the proc table will be purge on SIGCHLG by main */
exit(0);
}
10 changes: 8 additions & 2 deletions pt.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

#include "pt_load.h"
#include "socket_info.h"
#include "ipc.h"

#define MAX_PT_DESC 128

Expand Down Expand Up @@ -87,7 +88,6 @@ struct process_table {
struct proc_load_info load;
};

typedef int (*forked_proc_func)(int i);

extern struct process_table *pt;
extern int process_no;
Expand All @@ -103,6 +103,7 @@ int count_init_child_processes(void);
#define OSS_PROC_DOING_DUMP (1<<3) /* this process is writing a corefile */
#define OSS_PROC_DYNAMIC (1<<4) /* proc was created at runtime */
#define OSS_PROC_IS_RUNNING (1<<5) /* proc is running */
#define OSS_PROC_SELFEXIT (1<<6) /* proc does controlled exit */

#define is_process_running(_idx) \
( (pt[_idx].flags&OSS_PROC_IS_RUNNING)?1:0 )
Expand Down Expand Up @@ -131,12 +132,17 @@ inline static int get_process_ID_by_PID(pid_t pid)


typedef int (fork_new_process_f)(void *);
typedef ipc_rpc_f terminate_process_f;

int create_process_group(enum process_type type,
struct socket_info *si_filter,
unsigned int min_procs, unsigned int max_procs,
fork_new_process_f *f);
fork_new_process_f *f1, terminate_process_f *f2);

void check_and_adjust_number_of_workers(void);

void reset_process_slot(int p_id);

void dynamic_process_final_exit(void);

#endif
2 changes: 1 addition & 1 deletion timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ int start_timer_extra_processes(int *chd_rank)
pid_t pid;

if (enable_dynamic_workers &&
create_process_group( TYPE_TIMER, NULL, 0, 0,NULL)!=0)
create_process_group( TYPE_TIMER, NULL, 0, 0,NULL,NULL)!=0)
LM_ERR("failed to create group of TIMER processes, "
"auto forking will not be possible\n");

Expand Down

0 comments on commit e2032c7

Please sign in to comment.