-
Notifications
You must be signed in to change notification settings - Fork 0
/
executor.rb
95 lines (71 loc) · 2.13 KB
/
executor.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
module MultiRedis
  # Queues operations and runs their steps against a redis client.
  #
  # Steps of type :call are executed directly; steps whose type is listed in
  # TYPES are executed together, for all queued operations, inside one redis
  # block of that name (e.g. a single `redis.pipelined { ... }`).
  class Executor
    # Step types that are executed inside a redis block named after the type.
    # Frozen so the dispatch list cannot be altered at runtime. Intended for
    # internal use, but left publicly reachable for backward compatibility.
    TYPES = [:pipelined, :multi].freeze

    # @param options [Hash] :redis - client to use; when absent, #execute
    #   falls back to MultiRedis.redis
    def initialize(options = {})
      @operations = []
      @redis = options[:redis]
    end

    # Queues an operation together with the arguments its steps are run with.
    #
    # @param operation [#steps, #future] operation to queue
    # @param args [Array] passed through to every step of the operation
    def add(operation, *args)
      @operations << { op: operation, args: args }
    end

    # Runs every queued operation to completion and returns their results.
    #
    # @param options [Hash] currently unused; kept for interface compatibility
    # @return [Array] one entry per queued operation, in queue order, holding
    #   the result of the last step that was executed for it (nil if none ran)
    def execute(options = {})
      redis = @redis || MultiRedis.redis

      executions = @operations.map do |entry|
        OperationExecution.new(entry[:op], entry[:args], redis)
      end

      # Safeguard against an infinite loop: every round advances at least one
      # step when progress is possible, so the total step count bounds the
      # number of rounds. A step whose type is neither :call nor in TYPES is
      # never advanced; the budget then ends the loop and that operation's
      # result is whatever its last executed step returned.
      rounds_left = @operations.inject(0) { |sum, entry| sum + entry[:op].steps.length }

      until executions.all?(&:done?) || rounds_left < 1
        rounds_left -= 1
        executions.each { |oe| oe.execute_current_step while oe.next?(:call) }
        TYPES.each { |type| run_grouped_steps(redis, executions, type) }
      end

      executions.each(&:resolve_operation_future!)
      executions.map(&:final_results)
    end

    private

    # Executes, inside a single `redis.<type>` block, the current step of every
    # execution that is waiting on a step of the given type, then resolves the
    # futures of all executions. Does nothing when no execution is waiting on
    # that type.
    def run_grouped_steps(redis, executions, type)
      return unless executions.any? { |oe| oe.next?(type) }

      shared_results = [] # TODO: use shared context object
      # public_send: only the client's public API should be reachable here.
      redis.public_send(type) do
        executions.each do |oe|
          oe.execute_current_step(shared_results) if oe.next?(type)
        end
      end
      executions.each(&:resolve_futures!)
    end

    # Tracks the progress of one operation through its steps. Intended for
    # internal use by Executor.
    class OperationExecution
      # Result of the most recently executed step (nil before any step ran).
      attr_reader :final_results

      # @param operation [#steps, #future] operation being executed
      # @param args [Array] arguments forwarded to each step
      # @param redis redis client handed to the execution Context
      def initialize(operation, args, redis)
        @operation = operation
        @args = args
        @context = Context.new(redis)
        @steps = operation.steps
        @current_index = 0
      end

      # @return [Boolean] true once there is no step left to execute
      def done?
        !current_step
      end

      # @return truthy when a step remains and its type equals +type+
      def next?(type)
        current_step && current_step.type == type
      end

      # Executes the current step through the context, advances to the next
      # step and records the step's result as the latest final result.
      #
      # @param shared_results [Array, nil] forwarded as-is to Context#execute
      # @return the result of the executed step
      def execute_current_step(shared_results = nil)
        results = @context.execute(shared_results, current_step, *@args)
        @current_index += 1
        @final_results = results
      end

      # Delegates to the context's resolve_futures!.
      def resolve_futures!
        @context.resolve_futures!
      end

      # Stores the final results in the operation's future, if it has one.
      def resolve_operation_future!
        @operation.future.value = @final_results if @operation.future
      end

      private

      # @return the step at the current index, or nil when all steps are done
      def current_step
        @steps[@current_index]
      end
    end
  end
end