Skip to content

Commit

Permalink
A big update that needs your help with testing.
Browse files Browse the repository at this point in the history
This passes all of my various smoke tests. I'd consider this a very
pre-alpha that could use some more abuse and eye balls. If you find a
bug be sure and open an issue so I can try and get it fixed.

There was a dead lock when processing jobs. I refactored quite a bit of
the concurrency code to try and simplify and streamline things. I'm now
using opaque pointers considerably to try and limit the scope of
critical sections.

The basic concurrency concept is such:

There are a given number of workers that can process requests for JS
execution. This number of workers can be interrogated and changed using
emonk:num_workers/0, emonk:add_worker/0, and emonk:rem_worker/0.

A Context is required to run JavaScript code. It is created via
emonk:new_context/0 and emonk:new_context/1. The optional parameter is
for specifying a stack size in bytes.

Once a context is created you can eval code with emonk:eval/2,3 or call
functions in the global scope with emonk:call/3,4. Generally speaking
you'll eval some code and then call some named function. The eval call
takes a context on which to run, the script as a binary, and an optional
timeout. The call function takes a context, a function name, an array of
arguments, and an optional timeout.

The general effect of eval and call is to place a job request onto a
queue associated with the context. A context will then process the jobs
in the order retrieved and send the results back to Erlang.

Contexts are processed in a round robin order and a context can only
process a single job at a time. So, if you have a very active context
with lots of requests, and a request comes into an idle context, the
idle context will jump in front of the busy one minus the job it might
currently be processing.

Hopefully that makes sense.
  • Loading branch information
davisp committed Aug 3, 2010
1 parent 35df4db commit 60bdbcb
Show file tree
Hide file tree
Showing 19 changed files with 995 additions and 728 deletions.
1 change: 1 addition & 0 deletions c_src/alias.h
Expand Up @@ -3,5 +3,6 @@

typedef ERL_NIF_TERM ENTERM;
typedef const ERL_NIF_TERM CENTERM;
typedef ErlNifBinary ENBINARY;

#endif // Included alias.h
35 changes: 0 additions & 35 deletions c_src/job.c

This file was deleted.

35 changes: 0 additions & 35 deletions c_src/job.h

This file was deleted.

174 changes: 28 additions & 146 deletions c_src/main.c
Expand Up @@ -4,206 +4,88 @@

#include "alias.h"
#include "state.h"
#include "vm.h"
#include "worker.h"

static int
load(ErlNifEnv* env, void** priv, ENTERM load_info)
{
state_t* state = state_create(env);
if(state == NULL)
{
return -1;
}
state_ptr state = state_create(env);
if(state == NULL) return -1;

*priv = (void*) state;

return 0;
}

static void
unload(ErlNifEnv* env, void* priv)
{
state_t* state = (state_t*) priv;
state_destroy(state);
state_destroy((state_ptr) priv);
}

static ENTERM
num_workers(ErlNifEnv* env, int argc, CENTERM argv[])
{
state_ptr state = (state_ptr) enif_priv_data(env);
return enif_make_uint(env, state_num_workers(state));
}

static ENTERM
add_worker(ErlNifEnv* env, int argc, CENTERM argv[])
{
state_t* state = (state_t*) enif_priv_data(env);
state_ptr state = (state_ptr) enif_priv_data(env);

if(!state_add_worker(state))
{
return state->atom_error;
return state_error(state);
}

return state->atom_ok;
return state_ok(state);
}

static ENTERM
rem_worker(ErlNifEnv* env, int argc, CENTERM argv[])
{
state_t* state = (state_t*) enif_priv_data(env);
state_ptr state = (state_ptr) enif_priv_data(env);

if(!state_rem_worker(state))
{
return state->atom_error;
return state_error(state);
}

return state->atom_ok;
return state_ok(state);
}

static ENTERM
num_workers(ErlNifEnv* env, int argc, CENTERM argv[])
create_ctx(ErlNifEnv* env, int argc, CENTERM argv[])
{
state_t* state = (state_t*) enif_priv_data(env);
return enif_make_int(env, state_num_workers(state));
}

static ENTERM
create_ctx(ErlNifEnv* env, int argc, CENTERM argv[0])
{
state_t* state = (state_t*) enif_priv_data(env);
size_t stack_size;
vm_t* vm;
state_ptr state = (state_ptr) enif_priv_data(env);
unsigned int stack_size;
vm_ptr vm;
ENTERM ret;

if(argc != 1 || !enif_get_uint(env, argv[0], &stack_size))
{
return enif_make_badarg(env);
}

enif_mutex_lock(state->lock);
vm = enif_alloc_resource(state->ctx_res, sizeof(vm_t));
if(vm == NULL) return enif_make_badarg(env);
vm = vm_init(state, (size_t) stack_size);
if(vm == NULL) return state_error(state);

ret = enif_make_resource(env, vm);
enif_release_resource(vm);

if(!vm_init(vm, state->runtime, stack_size))
{
return enif_make_badarg(env);
}

enif_mutex_unlock(state->lock);

return enif_make_tuple2(env, state->atom_ok, ret);
}

static ENTERM
eval(ErlNifEnv* env, int argc, CENTERM argv[])
{
state_t* state = (state_t*) enif_priv_data(env);
vm_t* vm;
job_t* job;
req_t* req;
ErlNifBinary script;

if(argc != 4) return enif_make_badarg(env);

if(!enif_get_resource(env, argv[0], state->ctx_res, (void**) &vm))
{
return enif_make_badarg(env);
}


job = job_create();
if(job == NULL) return enif_make_badarg(env);
job->type = job_eval;
job->ref = enif_make_copy(job->env, argv[1]);
if(!enif_get_local_pid(env, argv[2], &job->pid)) goto error;
if(!enif_inspect_binary(env, argv[3], &script)) goto error;
if(!enif_alloc_binary(script.size, &(job->script))) goto error;
memcpy(job->script.data, script.data, script.size);

enif_mutex_lock(vm->lock);

if(!queue_push(vm->jobs, job))
{
enif_mutex_unlock(vm->lock);
goto error;
}
job = NULL;

if(vm->status == vm_idle)
{
req = req_create(req_exec, vm);
if(!queue_push(state->requests, req))
{
enif_mutex_unlock(vm->lock);
goto error;
}
req = NULL;
}

enif_mutex_unlock(vm->lock);

return state->atom_ok;

error:
if(job != NULL) job_destroy(job);
if(req != NULL) req_destroy(req);
return enif_make_badarg(env);
}

static ENTERM
call(ErlNifEnv* env, int argc, CENTERM argv[])
{
state_t* state = (state_t*) enif_priv_data(env);
vm_t* vm;
job_t* job;
req_t* req;

if(argc != 5) return enif_make_badarg(env);

if(!enif_get_resource(env, argv[0], state->ctx_res, (void**) &vm))
{
return enif_make_badarg(env);
}

job = job_create();
if(job == NULL) return enif_make_badarg(env);
job->type = job_call;
job->ref = enif_make_copy(job->env, argv[1]);
if(!enif_get_local_pid(env, argv[2], &job->pid)) goto error;
job->name = enif_make_copy(job->env, argv[3]);
job->args = enif_make_copy(job->env, argv[4]);

enif_mutex_lock(vm->lock);

if(!queue_push(vm->jobs, job))
{
enif_mutex_unlock(vm->lock);
goto error;
}
job = NULL;

if(vm->status == vm_idle)
{
req = req_create(req_exec, vm);
if(!queue_push(state->requests, req))
{
enif_mutex_unlock(vm->lock);
goto error;
}
req = NULL;
}

enif_mutex_unlock(vm->lock);

return state->atom_ok;

error:
if(job != NULL) job_destroy(job);
if(req != NULL) req_destroy(req);
return enif_make_badarg(env);
return enif_make_tuple2(env, state_ok(state), ret);
}

static ErlNifFunc nif_funcs[] = {
{"add_worker", 0, add_worker},
{"rem_worker", 0, rem_worker},
{"num_workers", 0, num_workers},
{"create_ctx", 1, create_ctx},
{"eval", 4, eval},
{"call", 5, call}
{"eval", 4, vm_add_eval},
{"call", 5, vm_add_call}
};

ERL_NIF_INIT(emonk, nif_funcs, &load, NULL, NULL, unload);
Expand Down

0 comments on commit 60bdbcb

Please sign in to comment.