/
future.cr
153 lines (128 loc) · 3 KB
/
future.cr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# :nodoc:
#
# Holds the outcome of a block that is run at most once: right away in a
# spawned fiber, after a delay, or on demand by the first fiber that calls `#get`.
class Concurrent::Future(R)
  # Lifecycle of a future. Declaration order matters: the guards in this class
  # compare states with `>=`.
  enum State
    Idle
    Delayed
    Running
    Completed
    Canceled
  end

  @value : R?
  @error : Exception?
  @delay : Float64
  @cancel_msg : String?

  # Creates a future for *block*.
  #
  # When *run_immediately* is true the computation is spawned in a new fiber
  # at once; otherwise it is started by the first call to `#get`. A positive
  # *delay* is handed to `sleep` before the block is called.
  def initialize(run_immediately = true, delay = 0.0, &@block : -> R)
    @state = State::Idle
    @value = nil
    @error = nil
    @channel = Channel(Nil).new
    @delay = delay.to_f
    @cancel_msg = nil
    spawn_compute if run_immediately
  end

  # Waits until the future is completed or canceled, then returns the block's
  # value.
  #
  # Re-raises the exception rescued from the block, or raises
  # `Concurrent::CanceledError` when the future was canceled.
  def get
    wait
    value_or_raise
  end

  # Returns `true` when the block finished without raising.
  def success?
    completed? && @error.nil?
  end

  # Returns `true` when the block finished by raising.
  def failure?
    completed? && !@error.nil?
  end

  def canceled?
    @state == State::Canceled
  end

  def completed?
    @state == State::Completed
  end

  def running?
    @state == State::Running
  end

  def delayed?
    @state == State::Delayed
  end

  def idle?
    @state == State::Idle
  end

  # Cancels the future unless it is already completed or canceled.
  #
  # *msg* becomes the message of the `Concurrent::CanceledError` raised by
  # `#get`. Closing the channel wakes every fiber blocked in `#get`.
  # Always returns `nil`.
  def cancel(msg = "Future canceled, you reached the [End of Time]")
    return if @state >= State::Completed
    @state = State::Canceled
    @cancel_msg = msg
    @channel.close
    nil
  end

  # Runs the computation in the calling fiber (the lazy path used by `#wait`).
  private def compute
    return if @state >= State::Delayed
    # Leave `Idle` before the block can suspend this fiber; otherwise another
    # fiber calling `#get` in the meantime would pass the guard above and call
    # the block a second time.
    @state = @delay > 0 ? State::Delayed : State::Running
    run_compute
  end

  # Runs the computation in a newly spawned fiber.
  private def spawn_compute
    return if @state >= State::Delayed
    @state = @delay > 0 ? State::Delayed : State::Running
    spawn { run_compute }
  end

  # Sleeps for the configured delay, then calls the block and records its
  # value or the exception it raised.
  private def run_compute
    delay = @delay
    sleep delay if delay > 0

    # `cancel` may have been called while sleeping or, for a spawned future,
    # before this fiber was first scheduled. In both cases the block must not run.
    return if @state == State::Canceled
    @state = State::Running

    begin
      @value = @block.call
    rescue ex
      @error = ex
    ensure
      # A cancellation issued while the block was running stays in effect.
      @state = State::Completed unless @state == State::Canceled
      # Closing is what releases fibers blocked in `#wait`.
      @channel.close
    end
  end

  # Blocks the calling fiber until the future is completed or canceled,
  # starting the computation first if nobody has yet.
  private def wait
    return if @state >= State::Completed
    compute
    # Nothing is ever sent on the channel; `receive?` returns once it is closed.
    @channel.receive?
  end

  # Translates the recorded outcome into a return value or a raised exception.
  private def value_or_raise
    raise Concurrent::CanceledError.new(@cancel_msg) if @state == State::Canceled

    error = @error
    value = @value
    if !error.nil?
      raise error
    elsif value.is_a?(R)
      value
    else
      raise "Compiler bug"
    end
  end
end
# Starts a background `Fiber` that waits for *delay* to elapse and then computes *&block*.
# Returns the `Concurrent::Future` tracking the computation; fibers calling `get` on it
# wait for the result.
# Calling `cancel` before the delay has elapsed keeps *&block* from being called.
# ```
# d = delay(1) { Process.kill(Signal::KILL, Process.pid) }
# # ... long operations ...
# d.cancel
# ```
def delay(delay, &block : -> _)
  Concurrent::Future.new(run_immediately: true, delay: delay, &block)
end
# Starts computing *&block* in a background `Fiber` right away and returns the
# `Concurrent::Future` tracking it. Fibers calling `get` on the result wait
# until the computation has finished.
# ```
# f = future { `echo hello` }
# # ... other actions ...
# f.get # => "hello\n"
# ```
def future(&block : -> _)
  Concurrent::Future.new(run_immediately: true, &block)
end
# Wraps *&block* in a `Concurrent::Future` without starting it.
# Nothing runs until `get` is called on the result; *&block* is then computed
# as part of that `get` call.
# ```
# l = lazy { expensive_computation }
# spawn { maybe_use_computation(l) }
# spawn { maybe_use_computation(l) }
# ```
def lazy(&block : -> _)
  Concurrent::Future.new(run_immediately: false, &block)
end