Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use enif_consume_timeslice and don't monopolize scheduler thread #49

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
95 changes: 82 additions & 13 deletions c_src/decoder.c
Expand Up @@ -47,7 +47,6 @@ typedef struct {
jiffy_st* atoms;

ERL_NIF_TERM arg;
ErlNifBinary bin;

int is_partial;

Expand All @@ -59,22 +58,37 @@ typedef struct {
char* st_data;
int st_size;
int st_top;

int is_resource;
size_t reds;
} Decoder;


void
dec_init(Decoder* d, ErlNifEnv* env, ERL_NIF_TERM arg, ErlNifBinary* bin)
dec_init_bin(Decoder* d, ErlNifEnv* env, ERL_NIF_TERM arg, ErlNifBinary* bin)
{
d->arg = arg;

d->p = (char*) bin->data;
d->u = bin->data;
d->len = bin->size;

d->env = env;
}

int
dec_init(Decoder* d, ErlNifEnv* env, ERL_NIF_TERM arg,
ERL_NIF_TERM opts, ErlNifBinary* bin)
{
int i;
ERL_NIF_TERM val;

d->env = env;
d->atoms = enif_priv_data(env);
d->arg = arg;

d->is_partial = 0;

d->p = (char*) bin->data;
d->u = bin->data;
d->len = bin->size;
dec_init_bin(d, env, arg, bin);
d->i = 0;

d->st_data = (char*) enif_alloc(STACK_SIZE_INC * sizeof(char));
Expand All @@ -87,11 +101,26 @@ dec_init(Decoder* d, ErlNifEnv* env, ERL_NIF_TERM arg, ErlNifBinary* bin)

d->st_data[0] = st_value;
d->st_top++;

d->reds = REDUCTIONS;

if(!enif_is_list(env, opts)) {
return 0;
}
while(enif_get_list_cell(env, opts, &val, &opts)) {
if(!get_reductions(env, val, d->atoms, &d->reds)) {
return 0;
}
}

d->is_resource = 0;
return 1;
}

void
dec_destroy(Decoder* d)
dec_destroy(ErlNifEnv* env, void* dec)
{
Decoder* d = dec;
if(d->st_data != NULL) {
enif_free(d->st_data);
}
Expand Down Expand Up @@ -604,26 +633,60 @@ make_array(ErlNifEnv* env, ERL_NIF_TERM list)
return ret;
}

static ERL_NIF_TERM
dec_yield(Decoder* d, ERL_NIF_TERM objs, ERL_NIF_TERM curr)
{
Decoder* dec = d;
if(!d->is_resource) {
dec = enif_alloc_resource(d->atoms->res_decoder, sizeof(Decoder));
*dec = *d;
dec->is_resource = 1;
}
ERL_NIF_TERM val = enif_make_resource(d->env, dec);
return enif_make_tuple4(d->env,
d->atoms->atom_partial, val, objs, curr);
}

ERL_NIF_TERM
decode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
Decoder dec;
Decoder* d = &dec;

ErlNifBinary bin;

ERL_NIF_TERM objs = enif_make_list(env, 0);
ERL_NIF_TERM curr = enif_make_list(env, 0);
ERL_NIF_TERM objs;
ERL_NIF_TERM curr;
ERL_NIF_TERM val;
ERL_NIF_TERM ret;

if(argc != 1) {
if(argc != 2) {
return enif_make_badarg(env);
} else if(!enif_inspect_binary(env, argv[0], &bin)) {
return enif_make_badarg(env);
}

dec_init(d, env, argv[0], &bin);
int arity;
ERL_NIF_TERM* args;
if(enif_get_tuple(env, argv[1], &arity, (const ERL_NIF_TERM **) &args)) {
jiffy_st *priv = enif_priv_data(env);
if(arity != 3 ) {
return enif_make_badarg(env);
}
if(!enif_get_resource(env, args[0], priv->res_decoder, (void **) &d)) {
return enif_make_badarg(env);
}
objs = args[1];
curr = args[2];
dec_init_bin(d, env, argv[0], &bin);
} else {
objs = enif_make_list(env, 0);
curr = enif_make_list(env, 0);
if (!dec_init(d, env, argv[0], argv[1], &bin)) {
return enif_make_badarg(env);
}
}

size_t processed = d->i;

//fprintf(stderr, "Parsing:\r\n");
while(d->i < bin.size) {
Expand Down Expand Up @@ -897,6 +960,11 @@ decode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
ret = dec_error(d, "invalid_internal_state");
goto done;
}
if(dec_curr(d) != st_done) {
if(jiffy_consume_timeslice(env, d->reds, d->i, &processed)) {
return dec_yield(d, objs, curr);
}
}
}

if(dec_curr(d) != st_done) {
Expand All @@ -908,7 +976,8 @@ decode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}

done:
dec_destroy(d);
jiffy_consume_timeslice(env, d->reds, d->i, &processed);
dec_destroy(env, d);

return ret;
}
63 changes: 53 additions & 10 deletions c_src/encoder.c
Expand Up @@ -36,12 +36,16 @@ typedef struct {

int iolen;
ERL_NIF_TERM iolist;
size_t iosize;
ErlNifBinary* curr;


char* p;
unsigned char* u;
size_t i;

int is_resource;
size_t reds;
} Encoder;


Expand All @@ -61,7 +65,7 @@ static char* shifts[NUM_SHIFTS] = {


int
enc_init(Encoder* e, ErlNifEnv* env, ERL_NIF_TERM opts, ErlNifBinary* bin)
enc_init(Encoder* e, ErlNifEnv* env, ERL_NIF_TERM opts)
{
ERL_NIF_TERM val;

Expand All @@ -71,6 +75,7 @@ enc_init(Encoder* e, ErlNifEnv* env, ERL_NIF_TERM opts, ErlNifBinary* bin)
e->pretty = 0;
e->shiftcnt = 0;
e->count = 0;
e->reds = REDUCTIONS;

if(!enif_is_list(env, opts)) {
return 0;
Expand All @@ -83,15 +88,16 @@ enc_init(Encoder* e, ErlNifEnv* env, ERL_NIF_TERM opts, ErlNifBinary* bin)
e->pretty = 1;
} else if(enif_compare(val, e->atoms->atom_force_utf8) == 0) {
// Ignore, handled in Erlang
} else {
} else if(!get_reductions(env, val, e->atoms, &e->reds)) {
return 0;
}
}

e->iolen = 0;
e->iolist = enif_make_list(env, 0);
e->curr = bin;
if(!enif_alloc_binary(BIN_INC_SIZE, e->curr)) {
e->iosize = 0;
e->curr = enif_alloc(sizeof(ErlNifBinary));
if(!e->curr || !enif_alloc_binary(BIN_INC_SIZE, e->curr)) {
return 0;
}

Expand All @@ -101,15 +107,19 @@ enc_init(Encoder* e, ErlNifEnv* env, ERL_NIF_TERM opts, ErlNifBinary* bin)
e->u = (unsigned char*) e->curr->data;
e->i = 0;

e->is_resource = 0;

return 1;
}

void
enc_destroy(Encoder* e)
enc_destroy(ErlNifEnv* env, void* enc)
{
Encoder *e = enc;
if(e->curr != NULL) {
enif_release_binary(e->curr);
}
enif_free(e->curr);
}

ERL_NIF_TERM
Expand Down Expand Up @@ -189,6 +199,7 @@ enc_unknown(Encoder* e, ERL_NIF_TERM value)

e->iolist = enif_make_list_cell(e->env, value, e->iolist);
e->iolen++;
e->iosize += e->i;

// Reinitialize our binary for the next buffer.
e->curr = bin;
Expand Down Expand Up @@ -493,13 +504,26 @@ enc_comma(Encoder* e)
return 1;
}

static ERL_NIF_TERM
enc_yield(Encoder* e, ERL_NIF_TERM stack)
{
Encoder* enc = e;
if(!e->is_resource) {
enc = enif_alloc_resource(e->atoms->res_encoder, sizeof(Encoder));
*enc = *e;
enc->is_resource = 1;
}
ERL_NIF_TERM val = enif_make_resource(e->env, enc);
return enif_make_tuple4(e->env, e->atoms->atom_partial, val, stack, e->iolist);
}


ERL_NIF_TERM
encode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
Encoder enc;
Encoder* e = &enc;

ErlNifBinary bin;
ERL_NIF_TERM ret;

ERL_NIF_TERM stack;
Expand All @@ -514,11 +538,26 @@ encode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
return enif_make_badarg(env);
}

if(!enc_init(e, env, argv[1], &bin)) {
return enif_make_badarg(env);
jiffy_st *priv = enif_priv_data(env);
if(!enif_get_resource(env, argv[0], priv->res_encoder, (void **) &e)) {
if(!enc_init(e, env, argv[1])) {
return enif_make_badarg(env);
}
stack = enif_make_list(env, 1, argv[0]);
} else {
int arity;
ERL_NIF_TERM* args;
if(!enif_get_tuple(env, argv[1], &arity, (const ERL_NIF_TERM **) &args)) {
return enif_make_badarg(env);
} else if(arity != 2) {
return enif_make_badarg(env);
}
stack = args[0];
e->iolist = args[1];
e->env = env;
}

stack = enif_make_list(env, 1, argv[0]);
size_t processed = e->iosize + e->i;

while(!enif_is_empty_list(env, stack)) {
if(!enif_get_list_cell(env, stack, &curr, &stack)) {
Expand Down Expand Up @@ -690,6 +729,9 @@ encode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
goto done;
}
}
if(jiffy_consume_timeslice(env, e->reds, e->iosize + e->i, &processed)) {
return enc_yield(e, stack);
}
}

if(!enc_done(e, &item)) {
Expand All @@ -704,6 +746,7 @@ encode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}

done:
enc_destroy(e);
jiffy_consume_timeslice(env, e->reds, e->i, &processed);
enc_destroy(env, e);
return ret;
}
8 changes: 7 additions & 1 deletion c_src/jiffy.c
Expand Up @@ -23,11 +23,17 @@ load(ErlNifEnv* env, void** priv, ERL_NIF_TERM info)
st->atom_uescape = make_atom(env, "uescape");
st->atom_pretty = make_atom(env, "pretty");
st->atom_force_utf8 = make_atom(env, "force_utf8");
st->atom_reductions = make_atom(env, "reductions");

// Markers used in encoding
st->ref_object = make_atom(env, "$object_ref$");
st->ref_array = make_atom(env, "$array_ref$");

st->res_encoder = enif_open_resource_type(env, "jiffy", "encoder",
enc_destroy, ERL_NIF_RT_CREATE, NULL);
st->res_decoder = enif_open_resource_type(env, "jiffy", "decoder",
dec_destroy, ERL_NIF_RT_CREATE, NULL);

*priv = (void*) st;

return 0;
Expand All @@ -54,7 +60,7 @@ unload(ErlNifEnv* env, void* priv)

static ErlNifFunc funcs[] =
{
{"nif_decode", 1, decode},
{"nif_decode", 2, decode},
{"nif_encode", 2, encode}
};

Expand Down
10 changes: 10 additions & 0 deletions c_src/jiffy.h
Expand Up @@ -6,6 +6,8 @@

#include "erl_nif.h"

#define REDUCTIONS 1000

typedef struct {
ERL_NIF_TERM atom_ok;
ERL_NIF_TERM atom_error;
Expand All @@ -19,17 +21,25 @@ typedef struct {
ERL_NIF_TERM atom_uescape;
ERL_NIF_TERM atom_pretty;
ERL_NIF_TERM atom_force_utf8;
ERL_NIF_TERM atom_reductions;

ERL_NIF_TERM ref_object;
ERL_NIF_TERM ref_array;

ErlNifResourceType *res_encoder;
ErlNifResourceType *res_decoder;
} jiffy_st;

ERL_NIF_TERM make_atom(ErlNifEnv* env, const char* name);
ERL_NIF_TERM make_ok(jiffy_st* st, ErlNifEnv* env, ERL_NIF_TERM data);
ERL_NIF_TERM make_error(jiffy_st* st, ErlNifEnv* env, const char* error);
int get_reductions(ErlNifEnv *env, ERL_NIF_TERM term, jiffy_st* st, size_t* val);
int jiffy_consume_timeslice(ErlNifEnv *env, size_t reds, size_t cur, size_t* proc);

ERL_NIF_TERM decode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM encode(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
void enc_destroy(ErlNifEnv* env, void* e);
void dec_destroy(ErlNifEnv* env, void* d);

int int_from_hex(const unsigned char* p);
int int_to_hex(int val, char* p);
Expand Down