forked from redis/redis-rb
-
Notifications
You must be signed in to change notification settings - Fork 10
/
pipeline.rb
106 lines (86 loc) · 2.19 KB
/
pipeline.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
class Redis
  # Fallback for Rubies that do not provide ::BasicObject: build a
  # blank-slate class by undefining every inherited instance method whose
  # name does not start with "__" or "instance_eval".
  unless defined?(::BasicObject)
    class BasicObject
      instance_methods.each { |meth| undef_method(meth) unless meth =~ /\A(__|instance_eval)/ }
    end
  end

  # Accumulates commands as Future objects. #commands exposes the raw
  # commands in the order they were queued, and #finish hands each reply
  # back to the future at the same position.
  class Pipeline
    # The queued Future objects, in call order.
    attr_reader :futures

    def initialize
      @without_reconnect = false
      @shutdown = false
      @futures = []
    end

    # True once #without_reconnect has been invoked on this pipeline.
    def without_reconnect?
      @without_reconnect
    end

    # True when a :shutdown command was queued here, or in a pipeline that
    # was merged in through #call_pipeline.
    def shutdown?
      @shutdown
    end

    # Queues +command+ (an array whose first element is the command name)
    # and returns the Future that will hold its reply. When a block is
    # given it is stored as the future's transformation and applied to the
    # raw reply in Future#_set.
    def call(command, &block)
      # A pipeline that contains a shutdown should not raise ECONNRESET when
      # the connection is gone.
      @shutdown = true if command.first == :shutdown
      future = Future.new(command, block)
      @futures << future
      future
    end

    # Appends the futures of another pipeline to this one, carrying over
    # its shutdown flag. Always returns nil.
    def call_pipeline(pipeline)
      @shutdown = true if pipeline.shutdown?
      @futures.concat(pipeline.futures)
      nil
    end

    # The raw commands of all queued futures, in order.
    def commands
      @futures.map { |f| f._command }
    end

    # Marks the pipeline as "without reconnect" and yields to the given
    # block. The flag is not reset afterwards.
    def without_reconnect(&block)
      @without_reconnect = true
      yield
    end

    # Assigns replies[i] to the i-th future and returns the array of
    # resulting values. Future#_set re-raises a reply that is an Exception,
    # so the first error reply aborts the assignment.
    def finish(replies)
      futures.each_with_index.map do |future, i|
        future._set(replies[i])
      end
    end

    # A pipeline whose commands are wrapped in MULTI ... EXEC.
    class Multi < self
      # +replies+ holds one reply per entry of #commands; the last one is
      # the reply to EXEC, which carries the per-command results.
      #
      # Returns nil when EXEC replied nil, otherwise the values of the
      # futures. Raises the EXEC reply if it is an error, or the first
      # error reply found when fewer results than queued commands came back.
      def finish(replies)
        exec = replies.last

        return if exec.nil? # The transaction failed because of WATCH.

        # EXEC itself failed: surface that error instead of treating it as
        # the list of results.
        raise exec if exec.kind_of?(::Exception)

        # `futures` holds only the user's commands (MULTI and EXEC are added
        # in #commands), so a complete transaction yields exactly
        # futures.size results. Anything shorter would misalign results and
        # futures below.
        if exec.size < futures.size
          # Some command wasn't recognized by Redis.
          raise replies.detect { |r| r.kind_of?(::Exception) }
        end

        super(exec)
      end

      # The queued commands surrounded by [:multi] and [:exec].
      def commands
        [[:multi]] + super + [[:exec]]
      end
    end
  end

  # Raised by Future#value while no reply has been assigned yet.
  class FutureNotReady < RuntimeError
    def initialize
      super("Value will be available once the pipeline executes.")
    end
  end

  # Placeholder for the reply of a queued command. Inherits from
  # BasicObject, so only the methods defined here (plus BasicObject's own)
  # are available on it.
  class Future < BasicObject
    # Single shared exception instance used as the "no reply yet" marker.
    FutureNotReady = ::Redis::FutureNotReady.new

    # +command+ is the raw command array; +transformation+ is an optional
    # callable applied to the raw reply in #_set (nil for none).
    def initialize(command, transformation)
      @command = command
      @transformation = transformation
      @object = FutureNotReady
    end

    def inspect
      "<Redis::Future #{@command.inspect}>"
    end

    # Stores the reply (run through the transformation when one was given)
    # and returns #value, which raises if the stored object is an Exception.
    def _set(object)
      @object = @transformation ? @transformation.call(object) : object
      value
    end

    # The raw command this future was created for.
    def _command
      @command
    end

    # Returns the stored reply. Raises it instead when it is an Exception,
    # which includes the FutureNotReady marker present before #_set is
    # called.
    def value
      # BasicObject does not mix in Kernel, hence the explicit receiver.
      ::Kernel.raise(@object) if @object.kind_of?(::Exception)
      @object
    end
  end
end