Skip to content

Commit

Permalink
Add exec() with full async support.
Browse files Browse the repository at this point in the history
This is the first module function that uses the async support from core - in "sleeps" in async reactor until data in available (for stdout), allowing other tasks to be handled in the mean while by the process.
  • Loading branch information
bogdan-iancu committed Dec 19, 2014
1 parent 51d76a9 commit e7a090f
Show file tree
Hide file tree
Showing 3 changed files with 269 additions and 8 deletions.
179 changes: 175 additions & 4 deletions modules/exec/exec.c
@@ -1,8 +1,4 @@
/*
*
* $Id$
*
*
* Copyright (C) 2001-2003 FhG Fokus
*
* This file is part of opensips, a free SIP server.
Expand Down Expand Up @@ -35,6 +31,7 @@
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
/*
#include <sys/resource.h>
*/
Expand Down Expand Up @@ -649,3 +646,177 @@ int exec_sync(struct sip_msg* msg, str* command, str* input, gparam_p outvar, gp

return ret;
}


int start_async_exec(struct sip_msg* msg, str* command, str* input, gparam_p outvar)
{
pid_t pid;
FILE *pin, *pout;
int val, fd;

if (input || outvar) {
pid = __popen3(command->s, input ? &pin : NULL,
outvar ? &pout : NULL,
NULL);
} else {
pid = fork();
if (pid == 0) {
/* child process*/
execl("/bin/sh", "/bin/sh", "-c", command->s, NULL);
exit(-1);
}
if (pid<0) {
/*error of fork*/
LM_ERR("failed to fork (%s)\n",strerror(errno));
goto error;
}
}

if (input->len) {
if ( (val=fwrite(input->s, 1, input->len, pin)) != input->len) {
LM_ERR("failed to write all (%d needed, %d written) to input pipe,"
" but continuing\n",input->len,val);
}

if (ferror(pin)) {
LM_ERR("failure detected (%s), continuing..\n",strerror(errno));
}
pclose(pin);
}

/* set time to kill on the new process */
schedule_to_kill(pid);

if (outvar==NULL) {
/* nothing to wait for, simply return "no-async" indication */
return -1;
}

/* prepare the read FD and make it non-blocking */
if ( (fd=dup( fileno( pout ) ))<0 ) {
LM_ERR("dup failed: (%d) %s\n", errno, strerror(errno));
goto error;
}
val = fcntl( fd, F_GETFL);
if (val==-1){
LM_ERR("fnctl failed: (%d) %s\n", errno, strerror(errno));
goto error2;
}
if (fcntl( fd , F_SETFL, val|O_NONBLOCK)==-1){
LM_ERR("set non-blocking failed: (%d) %s\n",
errno, strerror(errno));
goto error2;
}

fclose(pout);

/* async started with success */
return fd;

error2:
close(fd);
error:
/* async failed */
if (outvar)
pclose(pout);
return -1;
}


enum async_ret_code resume_async_exec(int fd, struct sip_msg *msg, void *param)
{
#define MAX_LINE_SIZE 1024
char buf[MAX_LINE_SIZE+1];
exec_async_param *p = (exec_async_param*)param;
pv_value_t outval;
char *s1, *s2;
int n, len;

if (p->buf) {
memcpy( buf, p->buf, p->buf_len);
len = p->buf_len;
shm_free(p->buf);
p->buf = NULL;
} else {
len = 0;
}

do {
n=read( fd, buf+len, MAX_LINE_SIZE-len);
LM_DBG(" read %d [%.*s] \n",n, n<0?0:n,buf+len);
if (n<0) {
if (errno==EINTR) continue;
if (errno==EAGAIN || errno==EWOULDBLOCK) {
/* nothing more to read */
if (len) {
/* store what is left */
if ((p->buf=(char*)shm_malloc(len))==NULL) {
LM_ERR("failed to allocate buffer\n");
goto error;
}
memcpy( p->buf, buf, len);
p->buf_len = len;
LM_DBG(" storing %d [%.*s] \n", p->buf_len, p->buf_len, p->buf);
}
/* async should continue */
return ASYNC_CONTINUE;
}
LM_ERR("read failed with %d (%s)\n",errno, strerror(errno));
/* terminate everything */
goto error;
}
/* EOF ? */
if (n==0) {
if (len) {
/* take whatever is left in buffer and push it as var */
outval.flags = PV_VAL_STR;
outval.rs.s = buf;
outval.rs.len = len;
LM_DBG("setting var [%.*s]\n",outval.rs.len,outval.rs.s);
if (pv_set_value(msg, &p->outvar->v.pve->spec, 0, &outval) < 0) {
LM_ERR("failed to set variable :(, continuing \n");
}
}
break;
}
/* succesful reading ( n>0 ) */
LM_DBG(" having %d [%.*s] \n", len+n, len+n, buf);
if (n+len==MAX_LINE_SIZE) {
/* we have full buffer, pack it as a line */
buf[n+len] = '\n';
n++;
}
/* search for '\n' in the newly read data */
s1 = buf;
while ( (buf+len+n-s1>0) && ((s2=q_memchr(s1, '\n', buf+len+n-s1))!=NULL) ) {
/* push it as var */
outval.flags = PV_VAL_STR;
outval.rs.s = s1;
outval.rs.len = s2-s1;
LM_DBG("setting var [%.*s]\n",outval.rs.len,outval.rs.s);
if (pv_set_value(msg, &p->outvar->v.pve->spec, 0, &outval) < 0) {
LM_ERR("failed to set variable :(, continuing \n");
}
s1 = s2+1;
}
/* any data consumed ? */
if ( s1!=buf+len ) {
/* yes -> shift the whole buffer to left */
len = buf+len+n-s1;
if (len) memmove( buf, s1, len);
} else {
/* no -> increase the len of the buffer */
len += n;
}
}while(1);

/* done with the async */
shm_free(param);
return ASYNC_DONE;

error:
shm_free(param);
return ASYNC_ERROR;
#undef MAX_LINE_SIZE
}

8 changes: 8 additions & 0 deletions modules/exec/exec.h
Expand Up @@ -41,6 +41,12 @@ typedef struct _exec_list {
exec_cmd_t *first, *last;
} exec_list_t, *exec_list_p;

typedef struct _exec_async_param {
gparam_p outvar;
char *buf;
int buf_len;
} exec_async_param;

/* list head */
extern exec_list_p exec_async_list;

Expand All @@ -53,6 +59,8 @@ int exec_msg(struct sip_msg *msg, char *cmd );
int exec_avp(struct sip_msg *msg, char *cmd, pvname_list_p avpl);
int exec_getenv(struct sip_msg *msg, char *cmd, pvname_list_p avpl);
int exec_sync(struct sip_msg* msg, str* command, str* input, gparam_p outvar, gparam_p errvar);
int start_async_exec(struct sip_msg* msg, str* command, str* input, gparam_p outvar);
enum async_ret_code resume_async_exec(int fd, struct sip_msg *msg, void *param);

#endif

90 changes: 86 additions & 4 deletions modules/exec/exec_mod.c
@@ -1,8 +1,6 @@
/*
* execution module
*
* $Id$
*
* Copyright (C) 2001-2003 FhG Fokus
*
* This file is part of opensips, a free SIP server.
Expand Down Expand Up @@ -57,6 +55,9 @@ inline static int w_exec_msg(struct sip_msg* msg, char* cmd, char* foo);
inline static int w_exec_avp(struct sip_msg* msg, char* cmd, char* avpl);
inline static int w_exec_getenv(struct sip_msg* msg, char* cmd, char* avpl);
inline static int w_exec(struct sip_msg* msg, char* cmd, char* in, char* out, char* err, char* avp_env);
inline static int w_async_exec(struct sip_msg* msg,
async_resume_module **resume_f, void **resume_param,
char *cmd, char* out, char* in, char* err, char* avp_env );

static int exec_avp_fixup(void** param, int param_no);
static int exec_fixup(void** param, int param_no);
Expand All @@ -66,6 +67,15 @@ inline static void exec_shutdown(void);
/*
* Exported functions
*/
static acmd_export_t acmds[] = {
{"exec", (acmd_function)w_async_exec, 5, exec_fixup },
{"exec", (acmd_function)w_async_exec, 4, exec_fixup },
{"exec", (acmd_function)w_async_exec, 3, exec_fixup },
{"exec", (acmd_function)w_async_exec, 2, exec_fixup },
{"exec", (acmd_function)w_async_exec, 1, exec_fixup },
{0, 0, 0, 0}
};

static cmd_export_t cmds[] = {
{"exec_dset", (cmd_function)w_exec_dset, 1, fixup_spve_null, 0,
REQUEST_ROUTE|FAILURE_ROUTE},
Expand Down Expand Up @@ -110,7 +120,7 @@ struct module_exports exports= {
DEFAULT_DLFLAGS,/* dlopen flags */
NULL, /* OpenSIPS module dependencies */
cmds, /* Exported functions */
0, /* Exported async functions */
acmds, /* Exported async functions */
params, /* Exported parameters */
0, /* exported statistics */
0, /* exported MI functions */
Expand Down Expand Up @@ -479,7 +489,6 @@ static struct hf_wrapper* get_avp_values_list(struct sip_msg* msg, pv_param_p av

static int w_exec(struct sip_msg* msg, char* cmd, char* in, char* out, char* err ,char* avp_env)
{

str command;
str input = {NULL, 0};
int ret;
Expand Down Expand Up @@ -528,3 +537,76 @@ static int w_exec(struct sip_msg* msg, char* cmd, char* in, char* out, char* err

return ret;
}


static int w_async_exec(struct sip_msg* msg, async_resume_module **resume_f, void **resume_param,
char* cmd, char* out, char* in, char* err, char* avp_env)
{
str command;
str input = {NULL, 0};
struct hf_wrapper *hf=0;
environment_t* backup_env=0;
gparam_p outvar = (gparam_p)out;
exec_async_param *param;
int ret;

if (msg == 0 || cmd == 0)
return -1;

/* fetch command */
if(fixup_get_svalue(msg, (gparam_p)cmd, &command)!=0) {
LM_ERR("invalid command parameter\n");
return -1;
}

/* fetch input */
if (in != NULL) {
if (pv_printf_s(msg, (pv_elem_p)in, &input)!=0)
return -1;
}

if (avp_env != NULL) {
if ((hf=get_avp_values_list(msg, &((gparam_p)avp_env)->v.pve->spec.pvp)) == 0)
return -1;
backup_env=replace_env(hf);
if (!backup_env) {
LM_ERR("replace env failed\n");
release_vars(hf);
release_hf_struct(hf);
return -1;
}
release_hf_struct(hf);
}

/* better do this alloc now (before starting the async) to avoid
* the unplesant situation of having the async started and have a
* memory failure -> tricky to recover */
param = (exec_async_param*)shm_malloc(sizeof(exec_async_param));
if(param==NULL) {
LM_ERR("failed to allocate new async param\n");
if (backup_env) unset_env(backup_env);
return -1;
}

ret = start_async_exec(msg, &command, &input, outvar);

if (backup_env)
unset_env(backup_env);

/* populate resume point (if async started) */
if (ret>0) {
param->outvar = outvar;
/* that ^^^^ is save as "out" is a in private mem, but in all
* processes (set before forking) */
param->buf = NULL;
*resume_param = (void*)param;
*resume_f = resume_async_exec;
} else {
shm_free(param);
*resume_param = NULL;
*resume_f = NULL;
}

return ret;
}

0 comments on commit e7a090f

Please sign in to comment.