Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added inter-task communication channels #12264

Merged
merged 1 commit into from
Jul 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions base/REPL.jl
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ abstract AbstractREPL
answer_color(::AbstractREPL) = ""

type REPLBackend
repl_channel::RemoteRef
response_channel::RemoteRef
repl_channel::Channel
response_channel::Channel
in_eval::Bool
ans
backend_task::Task
Expand Down Expand Up @@ -75,7 +75,7 @@ function eval_user_input(ast::ANY, backend::REPLBackend)
end
end

function start_repl_backend(repl_channel::RemoteRef, response_channel::RemoteRef)
function start_repl_backend(repl_channel::Channel, response_channel::Channel)
backend = REPLBackend(repl_channel, response_channel, false, nothing)
backend.backend_task = @schedule begin
# include looks at this to determine the relative include path
Expand Down Expand Up @@ -154,13 +154,13 @@ end

# A reference to a backend
immutable REPLBackendRef
repl_channel::RemoteRef
response_channel::RemoteRef
repl_channel::Channel
response_channel::Channel
end

function run_repl(repl::AbstractREPL)
repl_channel = RemoteRef()
response_channel = RemoteRef()
repl_channel = Channel(1)
response_channel = Channel(1)
backend = start_repl_backend(repl_channel, response_channel)
run_frontend(repl, REPLBackendRef(repl_channel,response_channel))
backend
Expand Down Expand Up @@ -665,7 +665,7 @@ function setup_interface(repl::LineEditREPL; hascolor = repl.hascolor, extra_rep
#
# Usage:
#
# repl_channel,response_channel = RemoteRef(),RemoteRef()
# repl_channel,response_channel = Channel(),Channel()
# start_repl_backend(repl_channel, response_channel)
# setup_interface(REPLDisplay(t),repl_channel,response_channel)
#
Expand Down Expand Up @@ -894,8 +894,8 @@ input_color(r::StreamREPL) = r.input_color
function run_repl(stream::AsyncStream)
repl =
@async begin
repl_channel = RemoteRef()
response_channel = RemoteRef()
repl_channel = Channel(1)
response_channel = Channel(1)
start_repl_backend(repl_channel, response_channel)
StreamREPL_frontend(repl, repl_channel, response_channel)
end
Expand Down
137 changes: 137 additions & 0 deletions base/channels.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# This file is a part of Julia. License is MIT: http://julialang.org/license

abstract AbstractChannel{T}

type Channel{T} <: AbstractChannel{T}
cond_take::Condition # waiting for data to become available
cond_put::Condition # waiting for a writeable slot
state::Symbol

data::Array{T,1}
szp1::Int # current channel size plus one
sz_max::Int # maximum size of channel
take_pos::Int # read position
put_pos::Int # write position

function Channel(sz)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should throw an error rather the silently truncate. I feel the user should also be able. To define a Chanel length greater than 32.

I guess you could also throw a nicer error checking if the Chanel size is greater than or equal to 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can define a channel size greater than 32. data is a circular buffer that is managed via szp1, take_pos and put_pos. The initial allocation is 32, It is grown on demand to a maximum size of sz_max. Even a channel of typemax(Int) will only allocate storage for 32 objects initially.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A further optimization that should be done is halving data once the current number of objects is less than 50% of data length

sz_max = sz == typemax(Int) ? typemax(Int) - 1 : sz
szp1 = sz > 32 ? 33 : sz+1
new(Condition(), Condition(), :open,
Array(T, szp1), szp1, sz_max, 1, 1)
end
end

const DEF_CHANNEL_SZ=32

Channel() = Channel(DEF_CHANNEL_SZ)
Channel(sz::Int) = Channel{Any}(sz)

closed_exception() = InvalidStateException("Channel is closed.", :closed)
function close(c::Channel)
c.state = :closed
notify_error(c::Channel, closed_exception())
c
end
isopen(c::Channel) = (c.state == :open)

type InvalidStateException <: Exception
msg::AbstractString
state
end
InvalidStateException() = InvalidStateException("")
InvalidStateException(msg) = InvalidStateException(msg, 0)

function put!(c::Channel, v)
!isopen(c) && throw(closed_exception())
d = c.take_pos - c.put_pos
if (d == 1) || (d == -(c.szp1-1))
# grow the channel if possible
if (c.szp1 - 1) < c.sz_max
if ((c.szp1-1) * 2) > c.sz_max
c.szp1 = c.sz_max + 1
else
c.szp1 = ((c.szp1-1) * 2) + 1
end
newdata = Array(eltype(c), c.szp1)
if c.put_pos > c.take_pos
copy!(newdata, 1, c.data, c.take_pos, (c.put_pos - c.take_pos))
c.put_pos = c.put_pos - c.take_pos + 1
else
len_first_part = length(c.data) - c.take_pos + 1
copy!(newdata, 1, c.data, c.take_pos, len_first_part)
copy!(newdata, len_first_part+1, c.data, 1, c.put_pos-1)
c.put_pos = len_first_part + c.put_pos
end
c.take_pos = 1
c.data = newdata
else
wait(c.cond_put)
end
end

c.data[c.put_pos] = v
c.put_pos = (c.put_pos == c.szp1 ? 1 : c.put_pos + 1)
notify(c.cond_take, nothing, true, false) # notify all, since some of the waiters may be on a "fetch" call.
v
end

function fetch(c::Channel)
wait(c)
c.data[c.take_pos]
end

function take!(c::Channel)
!isopen(c) && !isready(c) && throw(closed_exception())
while !isready(c)
wait(c.cond_take)
end
v = c.data[c.take_pos]
c.take_pos = (c.take_pos == c.szp1 ? 1 : c.take_pos + 1)
notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!.
v
end

isready(c::Channel) = (c.take_pos == c.put_pos ? false : true)

function wait(c::Channel)
while !isready(c)
wait(c.cond_take)
end
nothing
end

function notify_error(c::Channel, err)
notify_error(c.cond_take, err)
notify_error(c.cond_put, err)
end

eltype{T}(c::Channel{T}) = T

function length(c::Channel)
if c.put_pos >= c.take_pos
return c.put_pos - c.take_pos
else
return c.szp1 - c.take_pos + c.put_pos
end
end

size(c::Channel) = c.sz_max

show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(size(c)),sz_curr:$(length(c)))")

start{T}(c::Channel{T}) = Ref{Nullable{T}}(Nullable{T}())
function done(c::Channel, state::Ref)
try
# we are waiting either for more data or channel to be closed
state.x = take!(c)
return false
catch e
if isa(e, InvalidStateException) && e.state==:closed
return true
else
rethrow(e)
end
end
end
next{T}(c::Channel{T}, state) = (get(state.x), Ref{Nullable{T}}(Nullable{T}()))

2 changes: 2 additions & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export
BufferStream,
CartesianIndex,
CartesianRange,
Channel,
Cmd,
Colon,
Complex,
Expand Down Expand Up @@ -157,6 +158,7 @@ export
DimensionMismatch,
EOFError,
ErrorException,
InvalidStateException,
KeyError,
LoadError,
MethodError,
Expand Down
6 changes: 3 additions & 3 deletions base/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ precompile(Base.ProcessGroup, (Int, Array{Any,1}, Array{Any,1}))
precompile(Base.REPL.(:(==)), (Base.REPL.REPLDisplay{Base.REPL.LineEditREPL}, Base.REPL.REPLDisplay{Base.REPL.LineEditREPL}))
precompile(Base.REPL.LineEditREPL, (Base.Terminals.TTYTerminal, Bool, ASCIIString, ASCIIString, ASCIIString, ASCIIString, ASCIIString, Bool, Bool, Bool, Bool))
precompile(Base.REPL.LineEditREPL, (Base.Terminals.TTYTerminal,))
precompile(Base.REPL.REPLBackendRef, (RemoteRef, RemoteRef))
precompile(Base.REPL.REPLBackendRef, (Channel, Channel))
precompile(Base.REPL.REPLDisplay, (Base.REPL.BasicREPL,))
precompile(Base.REPL.REPLDisplay, (Base.REPL.LineEditREPL,))
precompile(Base.REPL.add_history, (Base.REPL.REPLHistoryProvider, Base.LineEdit.PromptState))
Expand All @@ -135,9 +135,9 @@ precompile(Base.REPL.respond, (Function, Base.REPL.LineEditREPL, Base.LineEdit.P
precompile(Base.REPL.return_callback, (Base.LineEdit.PromptState,))
precompile(Base.REPL.run_repl, (Base.REPL.LineEditREPL,))
precompile(Base.REPL.send_to_backend, (Expr, Base.REPL.REPLBackendRef))
precompile(Base.REPL.send_to_backend, (Expr, RemoteRef, RemoteRef))
precompile(Base.REPL.send_to_backend, (Expr, Channel, Channel))
precompile(Base.REPL.send_to_backend, (Symbol, Base.REPL.REPLBackendRef))
precompile(Base.REPL.start_repl_backend, (RemoteRef, RemoteRef))
precompile(Base.REPL.start_repl_backend, (Channel, Channel))
precompile(Base.REPLCompletions.complete_methods, (ASCIIString,))
precompile(Base.REPLCompletions.complete_symbol, (ASCIIString, Function))
precompile(Base.REPLCompletions.completions, (ASCIIString, Int))
Expand Down
1 change: 1 addition & 0 deletions base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ importall .Enums
# concurrency and parallelism
include("serialize.jl")
importall .Serializer
include("channels.jl")
include("multi.jl")
include("managers.jl")

Expand Down
3 changes: 2 additions & 1 deletion base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ function wait(c::Condition)
end
end

function notify(c::Condition, arg::ANY=nothing; all=true, error=false)
notify(c::Condition, arg::ANY=nothing; all=true, error=false) = notify(c, arg, all, error)
function notify(c::Condition, arg, all, error)
if all
for t in c.waitq
schedule(t, arg, error=error)
Expand Down
84 changes: 43 additions & 41 deletions doc/manual/control-flow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -600,47 +600,49 @@ Built-in :exc:`Exception`\ s
:exc:`Exception`\ s are thrown when an unexpected condition has occurred. The
built-in :exc:`Exception`\ s listed below all interrupt the normal flow of control.

+---------------------------+
| :exc:`Exception` |
+===========================+
| :exc:`ArgumentError` |
+---------------------------+
| :exc:`BoundsError` |
+---------------------------+
| :exc:`DivideError` |
+---------------------------+
| :exc:`DomainError` |
+---------------------------+
| :exc:`EOFError` |
+---------------------------+
| :exc:`ErrorException` |
+---------------------------+
| :exc:`InexactError` |
+---------------------------+
| :exc:`InterruptException` |
+---------------------------+
| :exc:`KeyError` |
+---------------------------+
| :exc:`LoadError` |
+---------------------------+
| :exc:`OutOfMemoryError` |
+---------------------------+
| :exc:`ReadOnlyMemoryError`|
+---------------------------+
| :exc:`MethodError` |
+---------------------------+
| :exc:`OverflowError` |
+---------------------------+
| :exc:`ParseError` |
+---------------------------+
| :exc:`SystemError` |
+---------------------------+
| :exc:`TypeError` |
+---------------------------+
| :exc:`UndefRefError` |
+---------------------------+
| :exc:`UndefVarError` |
+---------------------------+
+------------------------------+
| :exc:`Exception` |
+==============================+
| :exc:`ArgumentError` |
+------------------------------+
| :exc:`BoundsError` |
+------------------------------+
| :exc:`DivideError` |
+------------------------------+
| :exc:`DomainError` |
+------------------------------+
| :exc:`EOFError` |
+------------------------------+
| :exc:`ErrorException` |
+------------------------------+
| :exc:`InexactError` |
+------------------------------+
| :exc:`InterruptException` |
+------------------------------+
| :exc:`InvalidStateException` |
+------------------------------+
| :exc:`KeyError` |
+------------------------------+
| :exc:`LoadError` |
+------------------------------+
| :exc:`OutOfMemoryError` |
+------------------------------+
| :exc:`ReadOnlyMemoryError` |
+------------------------------+
| :exc:`MethodError` |
+------------------------------+
| :exc:`OverflowError` |
+------------------------------+
| :exc:`ParseError` |
+------------------------------+
| :exc:`SystemError` |
+------------------------------+
| :exc:`TypeError` |
+------------------------------+
| :exc:`UndefRefError` |
+------------------------------+
| :exc:`UndefVarError` |
+------------------------------+


For example, the :func:`sqrt` function throws a :exc:`DomainError` if applied to a
Expand Down
18 changes: 18 additions & 0 deletions doc/manual/parallel-computing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,24 @@ preemptively. This means context switches only occur at well-defined
points: in this case, when :func:`remotecall_fetch` is called.


Channels
--------
Channels provide for a fast means of inter-task communication. A
``Channel(T::Type, n::Int)`` is a shared queue of maximum length ``n``
holding objects of type ``T``. Multiple readers can read off the channel
via ``fetch`` and ``take!``. Multiple writers can add to the channel via
``put!``. ``isready`` tests for the prescence of any object in
the channel, while ``wait`` waits for an object to become available.
``close`` closes a Channel. On a closed channel, ``put!`` will fail,
while ``take!`` and ``fetch`` successfully return any existing values
till it is emptied.

A Channel can be used as an iterable object in a ``for`` loop, in which
case the loop runs as long as the channel has data or is open. The loop
variable takes on all values added to the channel. An empty, closed channel
causes the ``for`` loop to terminate.


Shared Arrays (Experimental)
-----------------------------------------------

Expand Down
Loading