/
concurrent.cr
217 lines (205 loc) · 4.71 KB
/
concurrent.cr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
require "fiber"
require "channel"
require "crystal/scheduler"
require "./concurrent/*"
# Blocks the current fiber for the specified number of seconds.
#
# While this fiber is waiting this time, other ready-to-execute
# fibers might start their execution.
#
# Raises `ArgumentError` if *seconds* is negative. Zero is allowed
# and simply yields to the scheduler.
def sleep(seconds : Number)
  if seconds < 0
    # The guard rejects only negative values (zero passes), so the
    # message must say "not be negative" — the previous "must be
    # positive" wording contradicted the actual check.
    raise ArgumentError.new "Sleep seconds must not be negative"
  end
  Crystal::Scheduler.sleep(seconds.seconds)
end
# Blocks the current Fiber for the specified time span.
#
# While this fiber is waiting this time, other ready-to-execute
# fibers might start their execution.
#
# Unlike the `Number` overload, no validation is performed here;
# the span is handed directly to the scheduler.
def sleep(time : Time::Span)
  Crystal::Scheduler.sleep(time)
end
# Blocks the current fiber forever.
#
# Meanwhile, other ready-to-execute fibers might start their execution.
#
# The fiber yields to the scheduler without ever being re-enqueued,
# so it never resumes.
def sleep
  Crystal::Scheduler.reschedule
end
# Spawns a new fiber that will execute the given block.
#
# The newly created fiber doesn't run as soon as spawned; it is only
# enqueued and starts when the scheduler picks it up.
#
# *name* is an optional label for debugging; when *same_thread* is
# true the fiber is pinned to the spawning thread.
#
# Example:
# ```
# # Write "1" every 1 second and "2" every 2 seconds for 6 seconds.
#
# ch = Channel(Nil).new
#
# spawn do
#   6.times do
#     sleep 1
#     puts 1
#   end
#   ch.send(nil)
# end
#
# spawn do
#   3.times do
#     sleep 2
#     puts 2
#   end
#   ch.send(nil)
# end
#
# 2.times { ch.receive }
# ```
def spawn(*, name : String? = nil, same_thread = false, &block)
  new_fiber = Fiber.new(name, &block)
  # Pin the fiber to the current thread when requested.
  new_fiber.@current_thread.set(Thread.current) if same_thread
  Crystal::Scheduler.enqueue new_fiber
  new_fiber
end
# Spawns a fiber by first creating a `Proc`, passing the *call*'s
# expressions to it, and letting the `Proc` finally invoke the *call*.
#
# Compare this:
#
# ```
# i = 0
# while i < 5
#   spawn { print(i) }
#   i += 1
# end
# Fiber.yield
# # Output: 55555
# ```
#
# To this:
#
# ```
# i = 0
# while i < 5
#   spawn print(i)
#   i += 1
# end
# Fiber.yield
# # Output: 01234
# ```
#
# This is because in the first case all spawned fibers refer to
# the same local variable, while in the second example copies of
# *i* are passed to a `Proc` that eventually invokes the call.
macro spawn(call, *, name = nil, same_thread = false, &block)
  {% if block %}
    {% raise "`spawn(call)` can't be invoked with a block, did you mean `spawn(name: ...) { ... }`?" %}
  {% end %}
  {% if call.is_a?(Call) %}
    # Build a Proc whose parameters receive a *copy* of every positional
    # and named argument at spawn time, so the fiber does not share the
    # caller's local variables (this is what produces "01234" above).
    ->(
      {% for arg, i in call.args %}
        __arg{{i}} : typeof({{arg}}),
      {% end %}
      {% if call.named_args %}
        {% for narg, i in call.named_args %}
          __narg{{i}} : typeof({{narg.value}}),
        {% end %}
      {% end %}
    ) {
      # Inside the Proc, re-issue the original call against the copied
      # arguments, preserving any explicit receiver and named-arg names.
      spawn(name: {{name}}, same_thread: {{same_thread}}) do
        {% if call.receiver %}{{ call.receiver }}.{% end %}{{call.name}}(
          {% for arg, i in call.args %}
            __arg{{i}},
          {% end %}
          {% if call.named_args %}
            {% for narg, i in call.named_args %}
              {{narg.name}}: __narg{{i}},
            {% end %}
          {% end %}
        )
      end
    # Immediately invoke the Proc with the caller's argument expressions,
    # which is what evaluates (and thereby snapshots) them.
    {% if call.named_args %}
      }.call({{*call.args}}, {{*call.named_args.map(&.value)}})
    {% else %}
      }.call({{*call.args}})
    {% end %}
  {% else %}
    # Not a Call node (e.g. a bare expression): nothing to snapshot,
    # just evaluate it inside a new fiber.
    spawn do
      {{call}}
    end
  {% end %}
end
# Wraps around exceptions re-raised from concurrent calls.
# The original exception can be accessed via `#cause`.
#
# Raised by the `parallel` macro when any of its jobs raises.
class ConcurrentExecutionException < Exception
end
# Runs the commands passed as arguments concurrently (in Fibers) and waits
# for them to finish.
#
# ```
# def say(word)
#   puts word
# end
#
# # Will print out the three words concurrently
# parallel(
#   say("concurrency"),
#   say("is"),
#   say("easy")
# )
# ```
#
# Can also be used to conveniently collect the return values of the
# concurrent operations.
#
# ```
# def concurrent_job(word)
#   word
# end
#
# a, b, c =
#   parallel(
#     concurrent_job("concurrency"),
#     concurrent_job("is"),
#     concurrent_job("easy")
#   )
#
# a # => "concurrency"
# b # => "is"
# c # => "easy"
# ```
#
# Due to the concurrent nature of this macro, it is highly recommended
# to handle any exceptions within the concurrent calls. Unhandled
# exceptions raised within the concurrent operations will be re-raised
# inside the parent fiber as `ConcurrentExecutionException`, with the
# `cause` attribute set to the original exception.
macro parallel(*jobs)
  # One completion message per job: the raised exception, or nil on success.
  %channel = Channel(Exception | Nil).new
  {% for job, i in jobs %}
    # `uninitialized` reserves a slot typed from the job expression; it is
    # only read after the channel receive confirms the job finished.
    %ret{i} = uninitialized typeof({{job}})
    spawn do
      begin
        %ret{i} = {{job}}
      rescue e : Exception
        %channel.send e
      else
        %channel.send nil
      end
    end
  {% end %}
  # Wait for every job; re-raise the first failure observed, wrapped so the
  # caller can still reach the original via `#cause`.
  {{ jobs.size }}.times do
    %value = %channel.receive
    if %value.is_a?(Exception)
      raise ConcurrentExecutionException.new(
        "An unhandled error occurred inside a `parallel` call",
        cause: %value
      )
    end
  end
  # All jobs succeeded: return their results as a tuple, in argument order.
  {
    {% for job, i in jobs %}
      %ret{i},
    {% end %}
  }
end