Skip to content

Commit

Permalink
Add option to run event loop once
Browse files Browse the repository at this point in the history
  • Loading branch information
Keno committed Mar 20, 2013
1 parent 98e003b commit 6b5ad9a
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 4 deletions.
20 changes: 19 additions & 1 deletion base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
include("uv_constants.jl")

## types ##

typealias Executable Union(Vector{ByteString},Function)
typealias Callback Union(Function,Bool)
type WaitTask
Expand Down Expand Up @@ -113,6 +112,7 @@ end
wait_connect_filter(w::AsyncStream, args...) = !w.open
wait_readable_filter(w::AsyncStream, args...) = nb_available(w.buffer) <= 0
wait_readnb_filter(w::(AsyncStream,Int), args...) = w[1].open && (nb_available(w[1].buffer) < w[2])
wait_readbyte_filter(w::(AsyncStream,Uint8), args...) = w[1].open && (search(w[1].buffer,w[2]) <= 0)
wait_readline_filter(w::AsyncStream, args...) = w.open && (search(w.buffer,'\n') <= 0)

function wait(forwhat::Vector, notify_list_name, filter_fcn)
Expand Down Expand Up @@ -143,6 +143,7 @@ wait_readable(x) = wait(x, :readnotify, wait_readable_filter)
wait_readline(x) = wait(x, :readnotify, wait_readline_filter)
wait_readnb(x::(AsyncStream,Int)) = wait(x, :readnotify, wait_readnb_filter)
wait_readnb(x::AsyncStream,b::Int) = wait_readnb((x,b))
wait_readbyte(x::AsyncStream,c::Uint8) = wait((x,c), :readnotify, wait_readbyte_filter)

#from `connect`
function _uv_hook_connectcb(sock::AsyncStream, status::Int32)
Expand Down Expand Up @@ -306,7 +307,15 @@ eventloop() = ccall(:jl_global_event_loop,Ptr{Void},())
function run_event_loop(loop::Ptr{Void})
ccall(:jl_run_event_loop,Void,(Ptr{Void},),loop)
end
function process_events(loop::Ptr{Void})
ccall(:jl_process_events,Int32,(Ptr{Void},),loop)
end
function run_event_loop_once(loop::Ptr{Void})
ccall(:jl_run_once,Int32,(Ptr{Void},),loop)
end
process_events() = process_events(eventloop())
run_event_loop() = run_event_loop(eventloop())
run_event_loop_once() = run_event_loop_once(eventloop())

##pipe functions
malloc_pipe() = c_malloc(_sizeof_uv_pipe)
Expand Down Expand Up @@ -382,6 +391,7 @@ end
function read(this::AsyncStream,::Type{Uint8})
buf = this.buffer
assert(buf.seekable == false)
start_reading(this)

This comment has been minimized.

Copy link
@JeffBezanson

JeffBezanson Mar 20, 2013

Member

We should really get rid of this. It seems crazy to call this on every read operation.

This comment has been minimized.

Copy link
@vtjnash

vtjnash Mar 20, 2013

Member

i think it's ok to call, but it really should be checking a flag to short-circuit the call to libuv when the stream is already prepped for reading

This comment has been minimized.

Copy link
@JeffBezanson

JeffBezanson Mar 20, 2013

Member

of course, a function that reads only 1 byte has to start with a check for buffered data and do a fast path in that case.
would it be bad to do start_reading once whenever a readable stream is opened so we can remove the function?

This comment has been minimized.

Copy link
@vtjnash

vtjnash Mar 20, 2013

Member
wait_readnb(this,1)
read(buf,Uint8)
end
Expand All @@ -394,6 +404,14 @@ function readline(this::AsyncStream)
readline(buf)
end

function readuntil(this::AsyncStream,c::Uint8)
buf = this.buffer
assert(buf.seekable == false)
start_reading(this)
wait_readbyte(this,c)
readuntil(buf,c)
end

function finish_read(pipe::NamedPipe)
close(pipe) #handles to UV and ios will be invalid after this point
end
Expand Down
9 changes: 7 additions & 2 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,19 @@ DLLEXPORT uv_tcp_t *jl_make_tcp(uv_loop_t* loop, jl_value_t *julia_struct)

/** This file contains wrappers for most of libuv's stream functionailty. Once we can allocate structs in Julia, this file will be removed */

DLLEXPORT int jl_run_once(uv_loop_t *loop)

This comment has been minimized.

Copy link
@vtjnash

vtjnash Mar 20, 2013

Member

um, why? i wanted to delete jl_process_events

This comment has been minimized.

Copy link
@Keno

Keno Mar 20, 2013

Author Member

I needed more control in an application that did lots and lots of compute, but also needs some IO buffers filled in the background. I'm not quite sure what the right level of abstraction to expose to the user is here.

{
if (loop) return uv_run(loop,UV_RUN_ONCE);
}

DLLEXPORT void jl_run_event_loop(uv_loop_t *loop)
{
if (loop) uv_run(loop,UV_RUN_DEFAULT);
}

DLLEXPORT void jl_process_events(uv_loop_t *loop)
DLLEXPORT int jl_process_events(uv_loop_t *loop)
{
if (loop) uv_run(loop,UV_RUN_NOWAIT);
if (loop) return uv_run(loop,UV_RUN_NOWAIT);

This comment has been minimized.

Copy link
@vtjnash

vtjnash Mar 20, 2013

Member

you are missing a return statement for the else branch

This comment has been minimized.

Copy link
@JeffBezanson

JeffBezanson Mar 20, 2013

Member

it even gives a compiler warning

This comment has been minimized.

Copy link
@Keno

Keno Mar 20, 2013

Author Member

Sorry about that. Fixed.

}

DLLEXPORT uv_pipe_t *jl_init_pipe(uv_pipe_t *pipe, int writable, int julia_only, jl_value_t *julia_struct)
Expand Down
1 change: 1 addition & 0 deletions src/julia.expmap
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@
jl_reshape_array;
jl_restore_system_image;
jl_run_event_loop;
jl_run_once;
jl_save_system_image;
jl_set_const;
jl_set_current_module;
Expand Down
3 changes: 2 additions & 1 deletion src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,8 @@ DLLEXPORT int jl_spawn(char *name, char **argv, uv_loop_t *loop,
uv_handle_type stdout_type,uv_pipe_t *stdout_pipe,
uv_handle_type stderr_type,uv_pipe_t *stderr_pipe);
DLLEXPORT void jl_run_event_loop(uv_loop_t *loop);
DLLEXPORT void jl_process_events(uv_loop_t *loop);
DLLEXPORT int jl_run_once(uv_loop_t *loop);
DLLEXPORT int jl_process_events(uv_loop_t *loop);

DLLEXPORT uv_loop_t *jl_global_event_loop();

Expand Down

0 comments on commit 6b5ad9a

Please sign in to comment.