-
Notifications
You must be signed in to change notification settings - Fork 13
/
future.rb
111 lines (95 loc) · 2.84 KB
/
future.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
require 'thread'
require 'delegate'
require 'forwardable'
module Futuroscope
  # A Future is an object that gets initialized with a block and behaves
  # exactly like the block's result, "borrowing" that result from the future.
  # Accessing it blocks until the result is ready, and returns instantly once
  # the block has already finished running.
  #
  # Because the class inherits from Delegator, any method that is not defined
  # here is forwarded to the resolved value (obtained through __getobj__).
  class Future < ::Delegator
    extend ::Forwardable

    # Public: Returns the value most recently assigned through
    # #worker_thread= (nil initially and again once #resolve! has finished).
    attr_reader :worker_thread

    # Public: Records the thread associated with this future.
    #
    # The assignment is done while holding the future's mutex, which is the
    # same mutex #resolve! holds while the block runs.
    #
    # thread - The value to store as the worker thread.
    #
    # Returns the assigned thread.
    def worker_thread=(thread)
      @mutex.synchronize { @worker_thread = thread }
    end

    # Initializes a future with a block and hands it to the pool.
    #
    # Examples:
    #
    #   future = Futuroscope::Future.new { sleep(1); :edballs }
    #   sleep(1)
    #   puts future
    #   => :edballs
    #   # Assuming the pool runs the block in a background thread, this
    #   # takes about 1 second overall rather than 2.
    #
    # pool  - A pool where all the futures will be scheduled. It must respond
    #         to push, depend and done_with.
    # block - A block that will be run in the background.
    #
    # Returns a Future
    def initialize(pool = ::Futuroscope.default_pool, &block)
      @worker_finished = ConditionVariable.new
      @pool = pool
      @block = block
      @mutex = ::Mutex.new
      @worker_thread = nil
      # Pushed last so the pool only ever sees a fully initialized future.
      @pool.push self
    end

    # Semipublic: Forces this future to be run.
    #
    # Runs the block while holding the mutex and stores either its value or
    # the exception it raised. Every exception (::Exception, not only
    # StandardError) is captured on purpose so it can be re-raised in the
    # thread that later asks for the value.
    #
    # Returns nothing meaningful. Re-raises whatever @pool.done_with raises.
    def resolve!
      @mutex.synchronize do
        begin
          Thread.handle_interrupt(DeadlockError => :immediate) do
            @resolved_future = { value: @block.call }
          end
        rescue ::Exception => e
          @resolved_future = { exception: e }
        ensure
          begin
            @pool.done_with self
          ensure
            # Nested ensure: even if the pool callback raises, the worker
            # reference is cleared and waiters are woken up, otherwise they
            # would stay blocked on the condition variable forever.
            @worker_thread = nil
            @worker_finished.broadcast
          end
        end
      end
    end

    # Semipublic: Returns the future's value. Will wait for the future to be
    # completed or return its value otherwise. Can be called multiple times.
    #
    # Returns the Future's block execution result.
    # Raises the exception raised by the block, if any.
    def __getobj__
      resolved_future_value_or_raise[:value]
    end

    # Semipublic: Marks the future as resolved with the given object.
    #
    # obj - The object to use as the future's value.
    #
    # Returns the stored result Hash.
    def __setobj__(obj)
      @resolved_future = { value: obj }
    end

    # Semipublic: Serializes only the resolved result, waiting for it first.
    #
    # Returns the result Hash ({ value: ... }).
    # Raises the exception raised by the block, if any.
    def marshal_dump
      resolved_future_value_or_raise
    end

    # Semipublic: Restores a future from a dumped result. Only the result is
    # restored; the mutex, pool and block are not.
    #
    # value - The Hash previously returned by #marshal_dump.
    #
    # Returns the restored result Hash.
    def marshal_load(value)
      @resolved_future = value
    end

    # Public: Tells whether a result (value or exception) is already stored.
    #
    # Returns true or false.
    def resolved?
      instance_variable_defined? :@resolved_future
    end

    # These methods are defined by Delegator/Kernel on the future itself, so
    # they are forwarded explicitly to make the future mimic its value.
    def_delegators :__getobj__, :class, :kind_of?, :is_a?, :nil?
    alias_method :future_value, :__getobj__

    private

    # Internal: Fetches the stored result, re-raising a captured exception.
    #
    # ::Kernel.raise is called explicitly because Delegator does not expose
    # Kernel's private methods; a bare `raise` would be delegated to the
    # wrapped value.
    #
    # Returns the result Hash when it holds a value.
    def resolved_future_value_or_raise
      resolved_future.tap do |resolved|
        ::Kernel.raise resolved[:exception] if resolved.has_key?(:exception)
      end
    end

    # Internal: Returns the result Hash, blocking until it is available. The
    # pool is notified through depend before the calling thread waits.
    def resolved_future
      unless resolved?
        @pool.depend self
        wait_until_resolved
      end
      @resolved_future
    end

    # Internal: Blocks the calling thread until a result has been stored.
    #
    # The predicate is re-checked in a loop: ConditionVariable#wait may
    # return without a matching broadcast (e.g. after Thread#wakeup), and
    # returning early would hand a missing result to the caller.
    def wait_until_resolved
      return if resolved?

      @mutex.synchronize do
        @worker_finished.wait(@mutex) until resolved?
      end
    end
  end
end