forked from JuliaLang/julia
-
Notifications
You must be signed in to change notification settings - Fork 0
/
channels.jl
129 lines (110 loc) · 3.66 KB
/
channels.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# This file is a part of Julia. License is MIT: http://julialang.org/license
abstract AbstractChannel{T}
type Channel{T} <: AbstractChannel{T}
cond_take::Condition # waiting for data to become available
cond_put::Condition # waiting for a writeable slot
state::Symbol
data::Array{T,1}
szp1::Int # current channel size plus one
sz_max::Int # maximum size of channel
take_pos::Int # read position
put_pos::Int # write position
function Channel(sz)
sz_max = sz == typemax(Int) ? typemax(Int) - 1 : sz
szp1 = sz > 32 ? 33 : sz+1
new(Condition(), Condition(), :open,
Array(T, szp1), szp1, sz_max, 1, 1)
end
end
const DEF_CHANNEL_SZ=32
Channel(sz::Int = DEF_CHANNEL_SZ) = Channel{Any}(sz)
closed_exception() = InvalidStateException("Channel is closed.", :closed)
function close(c::Channel)
c.state = :closed
notify_error(c::Channel, closed_exception())
c
end
isopen(c::Channel) = (c.state == :open)
type InvalidStateException <: Exception
msg::AbstractString
state::Symbol
end
function put!(c::Channel, v)
!isopen(c) && throw(closed_exception())
d = c.take_pos - c.put_pos
if (d == 1) || (d == -(c.szp1-1))
# grow the channel if possible
if (c.szp1 - 1) < c.sz_max
if ((c.szp1-1) * 2) > c.sz_max
c.szp1 = c.sz_max + 1
else
c.szp1 = ((c.szp1-1) * 2) + 1
end
newdata = Array(eltype(c), c.szp1)
if c.put_pos > c.take_pos
copy!(newdata, 1, c.data, c.take_pos, (c.put_pos - c.take_pos))
c.put_pos = c.put_pos - c.take_pos + 1
else
len_first_part = length(c.data) - c.take_pos + 1
copy!(newdata, 1, c.data, c.take_pos, len_first_part)
copy!(newdata, len_first_part+1, c.data, 1, c.put_pos-1)
c.put_pos = len_first_part + c.put_pos
end
c.take_pos = 1
c.data = newdata
else
wait(c.cond_put)
end
end
c.data[c.put_pos] = v
c.put_pos = (c.put_pos == c.szp1 ? 1 : c.put_pos + 1)
notify(c.cond_take, nothing, true, false) # notify all, since some of the waiters may be on a "fetch" call.
v
end
function fetch(c::Channel)
wait(c)
c.data[c.take_pos]
end
function take!(c::Channel)
!isopen(c) && !isready(c) && throw(closed_exception())
wait(c)
v = c.data[c.take_pos]
c.take_pos = (c.take_pos == c.szp1 ? 1 : c.take_pos + 1)
notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!.
v
end
isready(c::Channel) = (c.take_pos == c.put_pos ? false : true)
function wait(c::Channel)
while !isready(c)
wait(c.cond_take)
end
nothing
end
function notify_error(c::Channel, err)
notify_error(c.cond_take, err)
notify_error(c.cond_put, err)
end
eltype{T}(::Type{Channel{T}}) = T
function n_avail(c::Channel)
if c.put_pos >= c.take_pos
return c.put_pos - c.take_pos
else
return c.szp1 - c.take_pos + c.put_pos
end
end
show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(c.sz_max),sz_curr:$(n_avail(c)))")
start{T}(c::Channel{T}) = Ref{Nullable{T}}()
function done(c::Channel, state::Ref)
try
# we are waiting either for more data or channel to be closed
state[] = take!(c)
return false
catch e
if isa(e, InvalidStateException) && e.state==:closed
return true
else
rethrow(e)
end
end
end
next{T}(c::Channel{T}, state) = (v=get(state[]); state[]=nothing; (v, state))