Skip to content

Commit

Permalink
Async IO. Crashes the debugger though...
Browse files Browse the repository at this point in the history
  • Loading branch information
Keno committed Mar 2, 2012
1 parent 6e404fc commit 6d49a61
Show file tree
Hide file tree
Showing 26 changed files with 513 additions and 282 deletions.
2 changes: 1 addition & 1 deletion Make.inc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ OSLIBS += -ldl -Wl,-w -framework ApplicationServices
#CFLAGS += -fno-optimize-sibling-calls -fno-inline-functions
endif

OSLIBS += -Wl,--export-dynamic -Wl,--version-script=$(JULIAHOME)/src/julia.expmap
OSLIBS += -Wl,--export-all-symbols -Wl,--version-script=$(JULIAHOME)/src/julia.expmap

# Libraries to link
LIBS = $(shell $(LLVM_CONFIG) --libfiles) $(JULIAHOME)/external/libuv/uv.a -L"D:\Program Files\Microsoft SDKs\Windows\v7.1\Lib" -lntdll -lkernel32 -lWs2_32 -lIphlpapi $(JULIAHOME)/src/flisp/libflisp.a $(JULIAHOME)/src/support/libsupport.a -L$(EXTROOT)/lib -lm $(OSLIBS) -lpthread $(shell $(LLVM_CONFIG) --ldflags)
Expand Down
27 changes: 15 additions & 12 deletions j/client.j
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ function _jl_answer_color()
end

_jl_color_available() =
success(`tput setaf 0`) || has(ENV, "TERM") && matches(r"^xterm", ENV["TERM"])
true || success(`tput setaf 0`) || has(ENV, "TERM") && matches(r"^xterm", ENV["TERM"])

_jl_banner() = print(_jl_have_color ? _jl_banner_color : _jl_banner_plain)

function repl_callback(ast::ANY, show_value)
# use root task to execute user input
del_fd_handler(STDIN.fd)
del_io_handler(STDIN)
put(_jl_repl_channel, (ast, show_value))
end

Expand Down Expand Up @@ -81,16 +81,19 @@ function run_repl()
ccall(:jl_enable_color, Void, ())
end

while true
ccall(:repl_callback_enable, Void, ())
add_fd_handler(STDIN.fd, fd->ccall(:jl_stdin_callback, Void, ()))
(ast, show_value) = take(_jl_repl_channel)
if show_value == -1
# exit flag
break
end
_jl_eval_user_input(ast, show_value!=0)
end
ccall(:repl_callback_enable, Void, ())
add_io_handler(STDIN,()->ccall(:jl_stdin_callback,Void,()))

run_event_loop(globalEventLoop());
#TODO: make into callback
#while true
# (ast, show_value) = take(_jl_repl_channel)
# if show_value == -1
# # exit flag
# break
# end
# _jl_eval_user_input(ast, show_value!=0)
#end

if _jl_have_color
print(_jl_color_normal)
Expand Down
39 changes: 7 additions & 32 deletions j/io.j
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
const sizeof_ios_t = int(ccall(:jl_sizeof_ios_t, Int32, ()))
const sizeof_fd_set = int(ccall(:jl_sizeof_fd_set, Int32, ()))

type IOStream
abstract Stream

type IOStream <: Stream
ios::Array{Uint8,1}

# TODO: delay adding finalizer, e.g. for memio with a small buffer, or
Expand All @@ -15,8 +17,7 @@ type IOStream
end
IOStream() = IOStream(true)

global make_stdout_stream
make_stdout_stream() = new(ccall(:jl_stdout_stream, Any, ()))

end

fd(s::IOStream) = ccall(:jl_ios_fd, Int, (Ptr{Void},), s.ios)
Expand Down Expand Up @@ -67,32 +68,6 @@ memio() = memio(0, true)

convert(T::Type{Ptr}, s::IOStream) = convert(T, s.ios)

current_output_stream() = ccall(:jl_current_output_stream_obj, IOStream, ())

set_current_output_stream(s::IOStream) =
ccall(:jl_set_current_output_stream_obj, Void, (Any,), s)

function with_output_stream(s::IOStream, f::Function, args...)
try
set_current_output_stream(s)
f(args...)
catch e
throw(e)
end
end

# custom version for print_to_*
function _jl_with_output_stream(s::IOStream, f::Function, args...)
try
set_current_output_stream(s)
f(args...)
catch e
# only add finalizer if takebuf doesn't happen
finalizer(s, close)
throw(e)
end
end

takebuf_array(s::IOStream) =
ccall(:jl_takebuf_array, Any, (Ptr{Void},), s.ios)::Array{Uint8,1}

Expand All @@ -101,13 +76,13 @@ takebuf_string(s::IOStream) =

function print_to_array(size::Integer, f::Function, args...)
s = memio(size, false)
_jl_with_output_stream(s, f, args...)
#_jl_with_output_stream(s, f, args...)
takebuf_array(s)
end

function print_to_string(size::Integer, f::Function, args...)
s = memio(size, false)
_jl_with_output_stream(s, f, args...)
#_jl_with_output_stream(s, f, args...)
takebuf_string(s)
end

Expand Down Expand Up @@ -187,7 +162,7 @@ function write(s::IOStream, p::Ptr, nb::Integer)
s.ios, p, uint(nb))
end

function write{T,N}(s::IOStream, a::SubArray{T,N,Array})
function write{T,N}(s::Stream, a::SubArray{T,N,Array})
if !isa(T,BitsKind) || stride(a,1)!=1
return invoke(write, (Any, AbstractArray), s, a)
end
Expand Down
71 changes: 37 additions & 34 deletions j/multi.j
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ function add_workers(PGRP::ProcessGroup, w::Array{Any,1})
w[i].id = PGRP.np+i
send_msg_now(w[i], w[i].id, newlocs)
sockets[w[i].fd] = w[i].socket
add_fd_handler(w[i].fd, handler)
#add_fd_handler(w[i].fd, handler)
end
PGRP.locs = newlocs
PGRP.np += n
Expand All @@ -218,7 +218,7 @@ function _jl_join_pgroup(myid, locs, sockets)
w[i] = Worker(locs[i].host, locs[i].port)
w[i].id = i
sockets[w[i].fd] = w[i].socket
add_fd_handler(w[i].fd, handler)
#add_fd_handler(w[i].fd, handler)
send_msg_now(w[i], :identify_socket, myid)
end
for i = (myid+1):np
Expand Down Expand Up @@ -863,7 +863,7 @@ function accept_handler(accept_fd, sockets)
PGRP = _jl_join_pgroup(_myid, locs, sockets)
PGRP.workers[1] = Worker("", 0, connectfd, sock, 1)
end
add_fd_handler(connectfd, fd->message_handler(fd, sockets))
#add_fd_handler(connectfd, fd->message_handler(fd, sockets))
end
end

Expand Down Expand Up @@ -924,7 +924,7 @@ function message_handler(fd, sockets)
catch e
if isa(e,EOFError)
#print("eof. $(myid()) exiting\n")
del_fd_handler(fd)
#del_fd_handler(fd)
# TODO: remove machine from group
throw(DisconnectException())
else
Expand Down Expand Up @@ -961,7 +961,7 @@ function start_worker(wrfd)
global const Scheduler = current_task()

worker_sockets = HashTable()
add_fd_handler(sockfd, fd->accept_handler(fd, worker_sockets))
#add_fd_handler(sockfd, fd->accept_handler(fd, worker_sockets))

try
event_loop(false)
Expand Down Expand Up @@ -991,7 +991,7 @@ function start_remote_workers(machines, cmds)
let stream = fdio(fd, true)
outs[i] = stream
# redirect console output from workers to the client's stdout
add_fd_handler(fd, fd->write(stdout_stream, readline(stream)))
#add_fd_handler(fd, fd->write(stdout_stream, readline(stream)))
end
end
end
Expand Down Expand Up @@ -1556,44 +1556,47 @@ end
yield() = yieldto(Scheduler)

const _jl_fd_handlers = HashTable()
const _jl_read_handlers = HashTable()

add_fd_handler(fd::Int32, H) = (_jl_fd_handlers[fd]=H)
del_fd_handler(fd::Int32) = del(_jl_fd_handlers, fd)
function io_callback(io::AsyncStream)
_jl_fd_handlers[io](io)
end


function add_io_handler(io::AsyncStream, H)
(_jl_fd_handlers[fd]=H)
ccall(:jl_start_reading,Bool,(Ptr{Int32},Ptr{Void},Ptr{Int32}),io.handle,io.buf.ios,make_callback(io_callback,(),io))
end

function del_io_handler(io::AsyncStream)
ccall(:jl_stop_reading,Bool,(Ptr{Int32},),io.handle)
del(_jl_fd_handlers, fd)
end

#add_fd_handler(fd::Int32, H) = (_jl_fd_handlers[fd]=H)
#del_fd_handler(fd::Int32) = del(_jl_fd_handlers, fd)

const fgcm = createSingleAsyncWork(globalEventLoop(),make_callback(flush_gc_msgs,()));

function _jl_idle_cb()
if !isempty(Workqueue)
perform_work()
else
queue_async(fgcm)
end
end

function event_loop(isclient)
fdset = FDSet()
iserr, lasterr = false, ()

while true
add_idle_cb(globalEventLoop(),make_callback(_jl_idle_cb,()))
while false
try
if iserr
show(lasterr)
iserr, lasterr = false, ()
end
while true
del_all(fdset)
for (fd,_) = _jl_fd_handlers
add(fdset, fd)
end

bored = isempty(Workqueue)
if bored
flush_gc_msgs()
end
nselect = select_read(fdset, bored ? 10.0 : 0.0)
if nselect == 0
if !isempty(Workqueue)
perform_work()
end
else
for fd=int32(0):int32(fdset.nfds-1)
if has(fdset,fd)
h = _jl_fd_handlers[fd]
h(fd)
end
end
end
end
run_event_loop(global_event_loop);
catch e
if isa(e,DisconnectException)
# TODO: wake up tasks waiting for failed process
Expand Down
1 change: 1 addition & 0 deletions j/stage0.j
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ end

include("sysimg.j")

show("sys0.ji")
ccall(:jl_save_system_image, Void, (Ptr{Uint8},Ptr{Uint8}),
cstring("sys0.ji"), cstring("j/start_image.j"))

Expand Down
2 changes: 1 addition & 1 deletion j/start_image.j
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const stderr_stream = fdio(ccall(:jl_stderr, Int32, ()))
# Essential libraries
_jl_libpcre = dlopen("libpcre")
_jl_libgrisu = dlopen("libgrisu")
_jl_libm = dlopen("libm")
#_jl_libm = dlopen("libm")
_jl_libfdm = dlopen("libfdm")
_jl_libamos = dlopen("libamos")
_jl_librandom = dlopen("librandom"); _jl_librandom_init();
Expand Down
Loading

0 comments on commit 6d49a61

Please sign in to comment.