Skip to content

Commit

Permalink
Support write/close of STDIN for async proc.
Browse files Browse the repository at this point in the history
  • Loading branch information
jnthn committed Aug 14, 2014
1 parent e97226e commit e953112
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 24 deletions.
1 change: 1 addition & 0 deletions src/6model/bootstrap.c
Expand Up @@ -566,6 +566,7 @@ static void string_consts(MVMThreadContext *tc) {
string_creator(stderr_chars, "stderr_chars");
string_creator(stderr_bytes, "stderr_bytes");
string_creator(buf_type, "buf_type");
string_creator(write, "write");
}

/* Drives the overall bootstrap process. */
Expand Down
1 change: 1 addition & 0 deletions src/core/instance.h
Expand Up @@ -65,6 +65,7 @@ struct MVMStringConsts {
MVMString *stderr_chars;
MVMString *stderr_bytes;
MVMString *buf_type;
MVMString *write;
};

struct MVMReprRegistry {
Expand Down
2 changes: 1 addition & 1 deletion src/io/asyncsocket.c
Expand Up @@ -341,7 +341,7 @@ static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_
}

/* Marks objects for a write task. */
void write_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) {
static void write_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) {
WriteInfo *wi = (WriteInfo *)data;
MVM_gc_worklist_add(tc, worklist, &wi->handle);
MVM_gc_worklist_add(tc, worklist, &wi->str_data);
Expand Down
295 changes: 272 additions & 23 deletions src/io/procops.c
Expand Up @@ -352,17 +352,274 @@ typedef struct {
MVMint64 signal;
} MVMIOAsyncProcessData;

/* Info we convey about an async spawn task. */
typedef struct {
MVMThreadContext *tc;
int work_idx;
MVMObject *handle;
MVMObject *callbacks;
char *prog;
char *cwd;
char **env;
char **args;
MVMDecodeStream *ds_stdout;
MVMDecodeStream *ds_stderr;
MVMuint32 seq_stdout;
MVMuint32 seq_stderr;
uv_stream_t *stdin_handle;
} SpawnInfo;

/* Info we convey about a write task. */
typedef struct {
MVMOSHandle *handle;
MVMString *str_data;
MVMObject *buf_data;
uv_write_t *req;
uv_buf_t buf;
MVMThreadContext *tc;
int work_idx;
} SpawnWriteInfo;

/* Completion handler for an asynchronous write. */
static void on_write(uv_write_t *req, int status) {
SpawnWriteInfo *wi = (SpawnWriteInfo *)req->data;
MVMThreadContext *tc = wi->tc;
MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
MVMAsyncTask *t = (MVMAsyncTask *)MVM_repr_at_pos_o(tc,
tc->instance->event_loop_active, wi->work_idx);
MVM_repr_push_o(tc, arr, t->body.schedulee);
if (status >= 0) {
MVMROOT(tc, arr, {
MVMROOT(tc, t, {
MVMObject *bytes_box = MVM_repr_box_int(tc,
tc->instance->boot_types.BOOTInt,
wi->buf.len);
MVM_repr_push_o(tc, arr, bytes_box);
});
});
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
}
else {
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
MVMROOT(tc, arr, {
MVMROOT(tc, t, {
MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
tc->instance->VMString, uv_strerror(status));
MVMObject *msg_box = MVM_repr_box_str(tc,
tc->instance->boot_types.BOOTStr, msg_str);
MVM_repr_push_o(tc, arr, msg_box);
});
});
}
MVM_repr_push_o(tc, t->body.queue, arr);
if (wi->str_data)
free(wi->buf.base);
free(wi->req);
}

/* Does setup work for an asynchronous write. */
static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
MVMIOAsyncProcessData *handle_data;
MVMAsyncTask *spawn_task;
SpawnInfo *si;
char *output;
int output_size, r;

/* Add to work in progress. */
SpawnWriteInfo *wi = (SpawnWriteInfo *)data;
wi->tc = tc;
wi->work_idx = MVM_repr_elems(tc, tc->instance->event_loop_active);
MVM_repr_push_o(tc, tc->instance->event_loop_active, async_task);

/* Encode the string, or extract buf data. */
if (wi->str_data) {
MVMuint64 output_size_64;
output = MVM_string_utf8_encode(tc, wi->str_data, &output_size_64);
output_size = (int)output_size_64;
}
else {
MVMArray *buffer = (MVMArray *)wi->buf_data;
output = buffer->body.slots.i8 + buffer->body.start;
output_size = (int)buffer->body.elems;
}

/* Create and initialize write request. */
wi->req = malloc(sizeof(uv_write_t));
wi->buf = uv_buf_init(output, output_size);
wi->req->data = data;
handle_data = (MVMIOAsyncProcessData *)wi->handle->body.data;
spawn_task = (MVMAsyncTask *)handle_data->async_task;
si = spawn_task ? (SpawnInfo *)spawn_task->body.data : NULL;
if (!si || !si->stdin_handle || (r = uv_write(wi->req, si->stdin_handle, &(wi->buf), 1, on_write)) < 0) {
/* Error; need to notify. */
MVMROOT(tc, async_task, {
MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
MVMAsyncTask *t = (MVMAsyncTask *)async_task;
MVM_repr_push_o(tc, arr, t->body.schedulee);
MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
MVMROOT(tc, arr, {
MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
tc->instance->VMString, (si && si->stdin_handle
? uv_strerror(r)
: "This process is not opened for write"));
MVMObject *msg_box = MVM_repr_box_str(tc,
tc->instance->boot_types.BOOTStr, msg_str);
MVM_repr_push_o(tc, arr, msg_box);
});
MVM_repr_push_o(tc, t->body.queue, arr);
});

/* Cleanup handle. */
free(wi->req);
wi->req = NULL;
}
}

/* Marks objects for a write task. */
static void write_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) {
SpawnWriteInfo *wi = (SpawnWriteInfo *)data;
MVM_gc_worklist_add(tc, worklist, &wi->handle);
MVM_gc_worklist_add(tc, worklist, &wi->str_data);
MVM_gc_worklist_add(tc, worklist, &wi->buf_data);
}

/* Frees info for a write task. */
static void write_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
if (data)
free(data);
}

/* Operations table for async write task. */
static const MVMAsyncTaskOps write_op_table = {
write_setup,
NULL,
write_gc_mark,
write_gc_free
};

static MVMAsyncTask * write_str(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue,
MVMObject *schedulee, MVMString *s, MVMObject *async_type) {
MVMAsyncTask *task;
SpawnWriteInfo *wi;

/* Validate REPRs. */
if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
MVM_exception_throw_adhoc(tc,
"asyncwritestr target queue must have ConcBlockingQueue REPR");
if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
MVM_exception_throw_adhoc(tc,
"asyncwritestr result type must have REPR AsyncTask");

/* Create async task handle. */
MVMROOT(tc, queue, {
MVMROOT(tc, schedulee, {
MVMROOT(tc, h, {
MVMROOT(tc, s, {
task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
});
});
});
});
MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
task->body.ops = &write_op_table;
wi = calloc(1, sizeof(SpawnWriteInfo));
MVM_ASSIGN_REF(tc, &(task->common.header), wi->handle, h);
MVM_ASSIGN_REF(tc, &(task->common.header), wi->str_data, s);
task->body.data = wi;

/* Hand the task off to the event loop. */
MVM_io_eventloop_queue_work(tc, (MVMObject *)task);

return task;
}

static MVMAsyncTask * write_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue,
MVMObject *schedulee, MVMObject *buffer, MVMObject *async_type) {
MVMAsyncTask *task;
SpawnWriteInfo *wi;

/* Validate REPRs. */
if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
MVM_exception_throw_adhoc(tc,
"asyncwritebytes target queue must have ConcBlockingQueue REPR");
if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
MVM_exception_throw_adhoc(tc,
"asyncwritebytes result type must have REPR AsyncTask");
if (!IS_CONCRETE(buffer) || REPR(buffer)->ID != MVM_REPR_ID_MVMArray)
MVM_exception_throw_adhoc(tc, "asyncwritebytes requires a native array to read from");
if (((MVMArrayREPRData *)STABLE(buffer)->REPR_data)->slot_type != MVM_ARRAY_U8
&& ((MVMArrayREPRData *)STABLE(buffer)->REPR_data)->slot_type != MVM_ARRAY_I8)
MVM_exception_throw_adhoc(tc, "asyncwritebytes requires a native array of uint8 or int8");

/* Create async task handle. */
MVMROOT(tc, queue, {
MVMROOT(tc, schedulee, {
MVMROOT(tc, h, {
MVMROOT(tc, buffer, {
task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
});
});
});
});
MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
task->body.ops = &write_op_table;
wi = calloc(1, sizeof(SpawnWriteInfo));
MVM_ASSIGN_REF(tc, &(task->common.header), wi->handle, h);
MVM_ASSIGN_REF(tc, &(task->common.header), wi->buf_data, buffer);
task->body.data = wi;

/* Hand the task off to the event loop. */
MVM_io_eventloop_queue_work(tc, (MVMObject *)task);

return task;
}

/* Marks an async handle. */
static void proc_async_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) {
MVMIOAsyncProcessData *apd = (MVMIOAsyncProcessData *)data;
if (data)
MVM_gc_worklist_add(tc, worklist, &(apd->async_task));
}

/* Does an asynchronous close (since it must run on the event loop). */
static void close_cb(uv_handle_t *handle) {
free(handle);
}
static void close_perform(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
uv_close((uv_handle_t *)data, close_cb);
}

/* Operations table for async close task. */
static const MVMAsyncTaskOps close_op_table = {
close_perform,
NULL,
NULL,
NULL
};

static void close_stdin(MVMThreadContext *tc, MVMOSHandle *h) {
MVMIOAsyncProcessData *handle_data = (MVMIOAsyncProcessData *)h->body.data;
MVMAsyncTask *spawn_task = (MVMAsyncTask *)handle_data->async_task;
SpawnInfo *si = spawn_task ? (SpawnInfo *)spawn_task->body.data : NULL;
if (si->stdin_handle) {
MVMAsyncTask *task;
MVMROOT(tc, h, {
task = (MVMAsyncTask *)MVM_repr_alloc_init(tc,
tc->instance->boot_types.BOOTAsync);
});
task->body.ops = &close_op_table;
task->body.data = si->stdin_handle;
MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
}
}

/* IO ops table, for async process, populated with functions. */
static const MVMIOAsyncWritable proc_async_writable = { /*write_str*/NULL, /*write_bytes*/NULL };
static const MVMIOAsyncWritable proc_async_writable = { write_str, write_bytes };
static const MVMIOClosable closable = { close_stdin };
static const MVMIOOps proc_op_table = {
NULL,
&closable,
NULL,
NULL,
NULL,
Expand All @@ -376,22 +633,6 @@ static const MVMIOOps proc_op_table = {
NULL
};

/* Info we convey about an async spawn task. */
typedef struct {
MVMThreadContext *tc;
int work_idx;
MVMObject *handle;
MVMObject *callbacks;
char *prog;
char *cwd;
char **env;
char **args;
MVMDecodeStream *ds_stdout;
MVMDecodeStream *ds_stderr;
MVMuint32 seq_stdout;
MVMuint32 seq_stderr;
} SpawnInfo;

static void spawn_async_close(uv_handle_t *handle) {
free(handle);
}
Expand Down Expand Up @@ -553,13 +794,22 @@ static void spawn_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_
si->work_idx = MVM_repr_elems(tc, tc->instance->event_loop_active);
MVM_repr_push_o(tc, tc->instance->event_loop_active, async_task);

/* TODO: support write */
process_stdio[0].flags = UV_INHERIT_FD;
process_stdio[0].data.fd = 0;
/* Create input/output handles as needed. */
if (MVM_repr_exists_key(tc, si->callbacks, tc->instance->str_consts.write)) {
uv_pipe_t *pipe = malloc(sizeof(uv_pipe_t));
uv_pipe_init(tc->loop, pipe, 0);
pipe->data = si;
process_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
process_stdio[0].data.stream = (uv_stream_t *)pipe;
si->stdin_handle = (uv_stream_t *)pipe;
}
else {
process_stdio[0].flags = UV_INHERIT_FD;
process_stdio[0].data.fd = 0;
}
if (MVM_repr_exists_key(tc, si->callbacks, tc->instance->str_consts.stdout_chars)) {
uv_pipe_t *pipe = malloc(sizeof(uv_pipe_t));
uv_pipe_init(tc->loop, pipe, 0);
uv_pipe_open(pipe, 0);
pipe->data = si;
process_stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
process_stdio[1].data.stream = (uv_stream_t *)pipe;
Expand All @@ -570,7 +820,6 @@ static void spawn_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_
else if (MVM_repr_exists_key(tc, si->callbacks, tc->instance->str_consts.stdout_bytes)) {
uv_pipe_t *pipe = malloc(sizeof(uv_pipe_t));
uv_pipe_init(tc->loop, pipe, 0);
uv_pipe_open(pipe, 0);
pipe->data = si;
process_stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
process_stdio[1].data.stream = (uv_stream_t *)pipe;
Expand Down

0 comments on commit e953112

Please sign in to comment.