Skip to content

Commit

Permalink
rest_client: add optional asynchronous capabilities to rest_get()
Browse files Browse the repository at this point in the history
rest_get() may now be used in an asynchronous manner, thanks to libcurl's
multi interface. Its syntax is identical to its blocking counterpart.

Scripting example:
    async(rest_get("URL", "$var(res)"), my_resume_route);
  • Loading branch information
liviuchircu authored and razvancrainea committed Jan 14, 2015
1 parent 651f0e2 commit c3d9ceb
Show file tree
Hide file tree
Showing 4 changed files with 442 additions and 33 deletions.
6 changes: 6 additions & 0 deletions modules/rest_client/rest_cb.c
Expand Up @@ -86,6 +86,12 @@ size_t header_func(char *ptr, size_t size, size_t nmemb, void *userdata)
left--;
}

st->s = pkg_realloc(st->s, left);
if (!st->s) {
LM_ERR("no more pkg mem\n");
return E_OUT_OF_MEM;
}

st->len = left;
memcpy(st->s, ptr, left);

Expand Down
145 changes: 137 additions & 8 deletions modules/rest_client/rest_client.c
Expand Up @@ -29,6 +29,7 @@
#include <stdlib.h>
#include <curl/curl.h>

#include "../../async.h"
#include "../../sr_module.h"
#include "../../dprint.h"
#include "../../mem/mem.h"
Expand All @@ -53,6 +54,7 @@ int ssl_verifyhost = 1;
* Module initialization and cleanup
*/
static int mod_init(void);
static int child_init(int rank);
static void mod_destroy(void);

/*
Expand All @@ -69,6 +71,23 @@ static int w_rest_get(struct sip_msg *msg, char *gp_url, char *body_pv,
static int w_rest_post(struct sip_msg *msg, char *gp_url, char *gp_body,
char *gp_ctype, char *body_pv, char *ctype_pv, char *code_pv);

static int w_async_rest_get(struct sip_msg *msg, async_resume_module **resume_f,
void **resume_param, char *gp_url,
char *body_pv, char *type_pv, char *code_pv);
static int w_async_rest_post(struct sip_msg *msg, async_resume_module **resume_f,
void **resume_param, char *gp_url, char *gp_body,
char *gp_type, char *body_pv, char *type_pv, char *code_pv);


static acmd_export_t acmds[] = {
{ "rest_get", (acmd_function)w_async_rest_get, 2, fixup_rest_get },
{ "rest_get", (acmd_function)w_async_rest_get, 3, fixup_rest_get },
{ "rest_get", (acmd_function)w_async_rest_get, 4, fixup_rest_get },
{ "rest_post", (acmd_function)w_async_rest_post, 4, fixup_rest_post },
{ "rest_post", (acmd_function)w_async_rest_post, 5, fixup_rest_post },
{ "rest_post", (acmd_function)w_async_rest_post, 6, fixup_rest_post },
{ 0, 0, 0, 0 }
};

/*
* Exported functions
Expand Down Expand Up @@ -119,30 +138,93 @@ struct module_exports exports = {
DEFAULT_DLFLAGS, /* dlopen flags */
NULL, /* OpenSIPS module dependencies */
cmds, /* Exported functions */
0, /* Exported async functions */
acmds, /* Exported async functions */
params, /* Exported parameters */
0, /* exported statistics */
0, /* exported MI functions */
0, /* exported pseudo-variables */
0, /* extra processes */
NULL, /* exported statistics */
NULL, /* exported MI functions */
NULL, /* exported pseudo-variables */
NULL, /* extra processes */
mod_init, /* module initialization function */
0, /* response function*/
NULL, /* response function*/
mod_destroy,
0 /* per-child init function */
child_init,/* per-child init function */
};

static void *osips_malloc(size_t size)
{
void *p = pkg_malloc(size);

return p;
}

static void *osips_calloc(size_t nmemb, size_t size)
{
void *p = pkg_malloc(nmemb * size);
if (p)
memset(p, '\0', nmemb * size);

return p;
}

static void *osips_realloc(void *ptr, size_t size)
{
void *p = pkg_realloc(ptr, size);

return p;
}

static char *osips_strdup(const char *cp)
{
char *rval;
int len;

len = strlen(cp) + 1;
rval = pkg_malloc(len);
if (!rval)
return NULL;

memcpy(rval, cp, len);
return rval;
}

static void osips_free(void *ptr)
{
if (ptr)
pkg_free(ptr);
}

static int mod_init(void)
{
LM_DBG("Initializing...\n");

curl_global_init(CURL_GLOBAL_ALL);
curl_global_init_mem(CURL_GLOBAL_ALL,
osips_malloc,
osips_free,
osips_realloc,
osips_strdup,
osips_calloc);

multi_handle = curl_multi_init();

LM_INFO("Module initialized!\n");

return 0;
}

static int child_init(int rank)
{
if (rank <= PROC_MAIN)
return 0;

multi_handle = curl_multi_init();
if (!multi_handle) {
LM_ERR("failed to init CURLM handle\n");
return -1;
}

return 0;
}

static void mod_destroy(void)
{
curl_global_cleanup();
Expand Down Expand Up @@ -225,3 +307,50 @@ static int w_rest_post(struct sip_msg *msg, char *gp_url, char *gp_body,
(pv_spec_p)ctype_pv, (pv_spec_p)code_pv);
}

static int w_async_rest_get(struct sip_msg *msg, async_resume_module **resume_f,
void **resume_param, char *gp_url,
char *body_pv, char *ctype_pv, char *code_pv)
{
rest_async_param *param;
str url;
int read_fd;

if (fixup_get_svalue(msg, (gparam_p)gp_url, &url) != 0) {
LM_ERR("Invalid HTTP URL pseudo variable!\n");
return -1;
}

LM_DBG("async rest get %.*s %p %p %p\n", url.len, url.s, body_pv, ctype_pv, code_pv);

param = shm_malloc(sizeof *param);
if (!param) {
LM_ERR("no more shm\n");
return -1;
}
memset(param, '\0', sizeof *param);

read_fd = start_async_get(msg, url.s, &param->handle, &param->body,
ctype_pv ? &param->ctype : NULL);

if (read_fd < 0) {
*resume_param = NULL;
*resume_f = NULL;
} else {
*resume_f = resume_async_get;

param->body_pv = (pv_spec_p)body_pv;
param->ctype_pv = (pv_spec_p)ctype_pv;
param->code_pv = (pv_spec_p)code_pv;
*resume_param = param;
}

return read_fd;
}

static int w_async_rest_post(struct sip_msg *msg, async_resume_module **resume_f,
void **resume_param, char *gp_url, char *gp_body,
char *gp_type, char *body_pv, char *type_pv, char *code_pv)
{
return 0;
}

0 comments on commit c3d9ceb

Please sign in to comment.