Skip to content

Commit

Permalink
Merge pull request #12264 from JuliaLang/amitm/channels_only
Browse files Browse the repository at this point in the history
Added inter-task communication channels
  • Loading branch information
amitmurthy committed Jul 28, 2015
2 parents 9390c71 + 21ca799 commit 9738275
Show file tree
Hide file tree
Showing 13 changed files with 325 additions and 70 deletions.
20 changes: 10 additions & 10 deletions base/REPL.jl
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ abstract AbstractREPL
answer_color(::AbstractREPL) = ""

type REPLBackend
repl_channel::RemoteRef
response_channel::RemoteRef
repl_channel::Channel
response_channel::Channel
in_eval::Bool
ans
backend_task::Task
Expand Down Expand Up @@ -75,7 +75,7 @@ function eval_user_input(ast::ANY, backend::REPLBackend)
end
end

function start_repl_backend(repl_channel::RemoteRef, response_channel::RemoteRef)
function start_repl_backend(repl_channel::Channel, response_channel::Channel)
backend = REPLBackend(repl_channel, response_channel, false, nothing)
backend.backend_task = @schedule begin
# include looks at this to determine the relative include path
Expand Down Expand Up @@ -154,13 +154,13 @@ end

# A reference to a backend
immutable REPLBackendRef
repl_channel::RemoteRef
response_channel::RemoteRef
repl_channel::Channel
response_channel::Channel
end

function run_repl(repl::AbstractREPL)
repl_channel = RemoteRef()
response_channel = RemoteRef()
repl_channel = Channel(1)
response_channel = Channel(1)
backend = start_repl_backend(repl_channel, response_channel)
run_frontend(repl, REPLBackendRef(repl_channel,response_channel))
backend
Expand Down Expand Up @@ -665,7 +665,7 @@ function setup_interface(repl::LineEditREPL; hascolor = repl.hascolor, extra_rep
#
# Usage:
#
# repl_channel,response_channel = RemoteRef(),RemoteRef()
# repl_channel,response_channel = Channel(),Channel()
# start_repl_backend(repl_channel, response_channel)
# setup_interface(REPLDisplay(t),repl_channel,response_channel)
#
Expand Down Expand Up @@ -894,8 +894,8 @@ input_color(r::StreamREPL) = r.input_color
function run_repl(stream::AsyncStream)
repl =
@async begin
repl_channel = RemoteRef()
response_channel = RemoteRef()
repl_channel = Channel(1)
response_channel = Channel(1)
start_repl_backend(repl_channel, response_channel)
StreamREPL_frontend(repl, repl_channel, response_channel)
end
Expand Down
137 changes: 137 additions & 0 deletions base/channels.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# This file is a part of Julia. License is MIT: http://julialang.org/license

abstract AbstractChannel{T}

type Channel{T} <: AbstractChannel{T}
cond_take::Condition # waiting for data to become available
cond_put::Condition # waiting for a writeable slot
state::Symbol

data::Array{T,1}
szp1::Int # current channel size plus one
sz_max::Int # maximum size of channel
take_pos::Int # read position
put_pos::Int # write position

function Channel(sz)
sz_max = sz == typemax(Int) ? typemax(Int) - 1 : sz
szp1 = sz > 32 ? 33 : sz+1
new(Condition(), Condition(), :open,
Array(T, szp1), szp1, sz_max, 1, 1)
end
end

const DEF_CHANNEL_SZ=32

Channel() = Channel(DEF_CHANNEL_SZ)
Channel(sz::Int) = Channel{Any}(sz)

closed_exception() = InvalidStateException("Channel is closed.", :closed)
function close(c::Channel)
c.state = :closed
notify_error(c::Channel, closed_exception())
c
end
isopen(c::Channel) = (c.state == :open)

type InvalidStateException <: Exception
msg::AbstractString
state
end
InvalidStateException() = InvalidStateException("")
InvalidStateException(msg) = InvalidStateException(msg, 0)

function put!(c::Channel, v)
!isopen(c) && throw(closed_exception())
d = c.take_pos - c.put_pos
if (d == 1) || (d == -(c.szp1-1))
# grow the channel if possible
if (c.szp1 - 1) < c.sz_max
if ((c.szp1-1) * 2) > c.sz_max
c.szp1 = c.sz_max + 1
else
c.szp1 = ((c.szp1-1) * 2) + 1
end
newdata = Array(eltype(c), c.szp1)
if c.put_pos > c.take_pos
copy!(newdata, 1, c.data, c.take_pos, (c.put_pos - c.take_pos))
c.put_pos = c.put_pos - c.take_pos + 1
else
len_first_part = length(c.data) - c.take_pos + 1
copy!(newdata, 1, c.data, c.take_pos, len_first_part)
copy!(newdata, len_first_part+1, c.data, 1, c.put_pos-1)
c.put_pos = len_first_part + c.put_pos
end
c.take_pos = 1
c.data = newdata
else
wait(c.cond_put)
end
end

c.data[c.put_pos] = v
c.put_pos = (c.put_pos == c.szp1 ? 1 : c.put_pos + 1)
notify(c.cond_take, nothing, true, false) # notify all, since some of the waiters may be on a "fetch" call.
v
end

function fetch(c::Channel)
wait(c)
c.data[c.take_pos]
end

function take!(c::Channel)
!isopen(c) && !isready(c) && throw(closed_exception())
while !isready(c)
wait(c.cond_take)
end
v = c.data[c.take_pos]
c.take_pos = (c.take_pos == c.szp1 ? 1 : c.take_pos + 1)
notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!.
v
end

isready(c::Channel) = (c.take_pos == c.put_pos ? false : true)

function wait(c::Channel)
while !isready(c)
wait(c.cond_take)
end
nothing
end

function notify_error(c::Channel, err)
notify_error(c.cond_take, err)
notify_error(c.cond_put, err)
end

eltype{T}(c::Channel{T}) = T

function length(c::Channel)
if c.put_pos >= c.take_pos
return c.put_pos - c.take_pos
else
return c.szp1 - c.take_pos + c.put_pos
end
end

size(c::Channel) = c.sz_max

show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(size(c)),sz_curr:$(length(c)))")

start{T}(c::Channel{T}) = Ref{Nullable{T}}(Nullable{T}())
function done(c::Channel, state::Ref)
try
# we are waiting either for more data or channel to be closed
state.x = take!(c)
return false
catch e
if isa(e, InvalidStateException) && e.state==:closed
return true
else
rethrow(e)
end
end
end
next{T}(c::Channel{T}, state) = (get(state.x), Ref{Nullable{T}}(Nullable{T}()))

2 changes: 2 additions & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export
BufferStream,
CartesianIndex,
CartesianRange,
Channel,
Cmd,
Colon,
Complex,
Expand Down Expand Up @@ -157,6 +158,7 @@ export
DimensionMismatch,
EOFError,
ErrorException,
InvalidStateException,
KeyError,
LoadError,
MethodError,
Expand Down
6 changes: 3 additions & 3 deletions base/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ precompile(Base.ProcessGroup, (Int, Array{Any,1}, Array{Any,1}))
precompile(Base.REPL.(:(==)), (Base.REPL.REPLDisplay{Base.REPL.LineEditREPL}, Base.REPL.REPLDisplay{Base.REPL.LineEditREPL}))
precompile(Base.REPL.LineEditREPL, (Base.Terminals.TTYTerminal, Bool, ASCIIString, ASCIIString, ASCIIString, ASCIIString, ASCIIString, Bool, Bool, Bool, Bool))
precompile(Base.REPL.LineEditREPL, (Base.Terminals.TTYTerminal,))
precompile(Base.REPL.REPLBackendRef, (RemoteRef, RemoteRef))
precompile(Base.REPL.REPLBackendRef, (Channel, Channel))
precompile(Base.REPL.REPLDisplay, (Base.REPL.BasicREPL,))
precompile(Base.REPL.REPLDisplay, (Base.REPL.LineEditREPL,))
precompile(Base.REPL.add_history, (Base.REPL.REPLHistoryProvider, Base.LineEdit.PromptState))
Expand All @@ -135,9 +135,9 @@ precompile(Base.REPL.respond, (Function, Base.REPL.LineEditREPL, Base.LineEdit.P
precompile(Base.REPL.return_callback, (Base.LineEdit.PromptState,))
precompile(Base.REPL.run_repl, (Base.REPL.LineEditREPL,))
precompile(Base.REPL.send_to_backend, (Expr, Base.REPL.REPLBackendRef))
precompile(Base.REPL.send_to_backend, (Expr, RemoteRef, RemoteRef))
precompile(Base.REPL.send_to_backend, (Expr, Channel, Channel))
precompile(Base.REPL.send_to_backend, (Symbol, Base.REPL.REPLBackendRef))
precompile(Base.REPL.start_repl_backend, (RemoteRef, RemoteRef))
precompile(Base.REPL.start_repl_backend, (Channel, Channel))
precompile(Base.REPLCompletions.complete_methods, (ASCIIString,))
precompile(Base.REPLCompletions.complete_symbol, (ASCIIString, Function))
precompile(Base.REPLCompletions.completions, (ASCIIString, Int))
Expand Down
1 change: 1 addition & 0 deletions base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ importall .Enums
# concurrency and parallelism
include("serialize.jl")
importall .Serializer
include("channels.jl")
include("multi.jl")
include("managers.jl")

Expand Down
3 changes: 2 additions & 1 deletion base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ function wait(c::Condition)
end
end

function notify(c::Condition, arg::ANY=nothing; all=true, error=false)
notify(c::Condition, arg::ANY=nothing; all=true, error=false) = notify(c, arg, all, error)
function notify(c::Condition, arg, all, error)
if all
for t in c.waitq
schedule(t, arg, error=error)
Expand Down
84 changes: 43 additions & 41 deletions doc/manual/control-flow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -600,47 +600,49 @@ Built-in :exc:`Exception`\ s
:exc:`Exception`\ s are thrown when an unexpected condition has occurred. The
built-in :exc:`Exception`\ s listed below all interrupt the normal flow of control.

+---------------------------+
| :exc:`Exception` |
+===========================+
| :exc:`ArgumentError` |
+---------------------------+
| :exc:`BoundsError` |
+---------------------------+
| :exc:`DivideError` |
+---------------------------+
| :exc:`DomainError` |
+---------------------------+
| :exc:`EOFError` |
+---------------------------+
| :exc:`ErrorException` |
+---------------------------+
| :exc:`InexactError` |
+---------------------------+
| :exc:`InterruptException` |
+---------------------------+
| :exc:`KeyError` |
+---------------------------+
| :exc:`LoadError` |
+---------------------------+
| :exc:`OutOfMemoryError` |
+---------------------------+
| :exc:`ReadOnlyMemoryError`|
+---------------------------+
| :exc:`MethodError` |
+---------------------------+
| :exc:`OverflowError` |
+---------------------------+
| :exc:`ParseError` |
+---------------------------+
| :exc:`SystemError` |
+---------------------------+
| :exc:`TypeError` |
+---------------------------+
| :exc:`UndefRefError` |
+---------------------------+
| :exc:`UndefVarError` |
+---------------------------+
+------------------------------+
| :exc:`Exception` |
+==============================+
| :exc:`ArgumentError` |
+------------------------------+
| :exc:`BoundsError` |
+------------------------------+
| :exc:`DivideError` |
+------------------------------+
| :exc:`DomainError` |
+------------------------------+
| :exc:`EOFError` |
+------------------------------+
| :exc:`ErrorException` |
+------------------------------+
| :exc:`InexactError` |
+------------------------------+
| :exc:`InterruptException` |
+------------------------------+
| :exc:`InvalidStateException` |
+------------------------------+
| :exc:`KeyError` |
+------------------------------+
| :exc:`LoadError` |
+------------------------------+
| :exc:`OutOfMemoryError` |
+------------------------------+
| :exc:`ReadOnlyMemoryError` |
+------------------------------+
| :exc:`MethodError` |
+------------------------------+
| :exc:`OverflowError` |
+------------------------------+
| :exc:`ParseError` |
+------------------------------+
| :exc:`SystemError` |
+------------------------------+
| :exc:`TypeError` |
+------------------------------+
| :exc:`UndefRefError` |
+------------------------------+
| :exc:`UndefVarError` |
+------------------------------+


For example, the :func:`sqrt` function throws a :exc:`DomainError` if applied to a
Expand Down
18 changes: 18 additions & 0 deletions doc/manual/parallel-computing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,24 @@ preemptively. This means context switches only occur at well-defined
points: in this case, when :func:`remotecall_fetch` is called.


Channels
--------
Channels provide for a fast means of inter-task communication. A
``Channel(T::Type, n::Int)`` is a shared queue of maximum length ``n``
holding objects of type ``T``. Multiple readers can read off the channel
via ``fetch`` and ``take!``. Multiple writers can add to the channel via
``put!``. ``isready`` tests for the prescence of any object in
the channel, while ``wait`` waits for an object to become available.
``close`` closes a Channel. On a closed channel, ``put!`` will fail,
while ``take!`` and ``fetch`` successfully return any existing values
till it is emptied.

A Channel can be used as an iterable object in a ``for`` loop, in which
case the loop runs as long as the channel has data or is open. The loop
variable takes on all values added to the channel. An empty, closed channel
causes the ``for`` loop to terminate.


Shared Arrays (Experimental)
-----------------------------------------------

Expand Down
Loading

0 comments on commit 9738275

Please sign in to comment.