forked from redis/redis-rb
-
Notifications
You must be signed in to change notification settings - Fork 10
/
client.rb
284 lines (233 loc) · 6.13 KB
/
client.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
class Redis
class Client
MINUS = "-".freeze
PLUS = "+".freeze
COLON = ":".freeze
DOLLAR = "$".freeze
ASTERISK = "*".freeze
attr_accessor :db, :host, :port, :password, :logger
attr :timeout
def initialize(options = {})
@host = options[:host] || "127.0.0.1"
@port = (options[:port] || 6379).to_i
@db = (options[:db] || 0).to_i
@timeout = (options[:timeout] || 5).to_i
@password = options[:password]
@logger = options[:logger]
@sock = nil
end
def connect
connect_to(@host, @port)
call(:auth, @password) if @password
call(:select, @db) if @db != 0
@sock
end
def id
"redis://#{host}:#{port}/#{db}"
end
def call(*args)
process(args) do
read
end
end
def call_loop(*args)
process(args) do
loop { yield(read) }
end
end
def call_pipelined(commands)
process(*commands) do
Array.new(commands.size) { read }
end
end
def call_without_timeout(*args)
without_socket_timeout do
call(*args)
end
end
def process(*commands)
logging(commands) do
ensure_connected do
@sock.write(join_commands(commands))
yield if block_given?
end
end
end
def connected?
!! @sock
end
def disconnect
return unless connected?
begin
@sock.close
rescue
ensure
@sock = nil
end
end
def reconnect
disconnect
connect
end
def read
# We read the first byte using read() mainly because gets() is
# immune to raw socket timeouts.
begin
reply_type = @sock.read(1)
rescue Errno::EAGAIN
# We want to make sure it reconnects on the next command after the
# timeout. Otherwise the server may reply in the meantime leaving
# the protocol in a desync status.
disconnect
raise Errno::EAGAIN, "Timeout reading from the socket"
end
raise Errno::ECONNRESET, "Connection lost" unless reply_type
format_reply(reply_type, @sock.gets)
end
def without_socket_timeout
ensure_connected do
begin
self.timeout = 0
yield
ensure
self.timeout = @timeout if connected?
end
end
end
protected
def build_command(name, *args)
command = []
command << "*#{args.size + 1}"
command << "$#{string_size name}"
command << name
args.each do |arg|
arg = arg.to_s
command << "$#{string_size arg}"
command << arg
end
command
end
def deprecated(old, new = nil, trace = caller[0])
message = "The method #{old} is deprecated and will be removed in 2.0"
message << " - use #{new} instead" if new
Redis.deprecate(message, trace)
end
COMMAND_DELIMITER = "\r\n"
def join_commands(commands)
commands.map do |command|
build_command(*command).join(COMMAND_DELIMITER) + COMMAND_DELIMITER
end.join(COMMAND_DELIMITER) + COMMAND_DELIMITER
end
if "".respond_to?(:bytesize)
def string_size(string)
string.to_s.bytesize
end
else
def string_size(string)
string.to_s.size
end
end
def format_reply(reply_type, line)
case reply_type
when MINUS then format_error_reply(line)
when PLUS then format_status_reply(line)
when COLON then format_integer_reply(line)
when DOLLAR then format_bulk_reply(line)
when ASTERISK then format_multi_bulk_reply(line)
else raise ProtocolError.new(reply_type)
end
end
def format_error_reply(line)
raise "-" + line.strip
end
def format_status_reply(line)
line.strip
end
def format_integer_reply(line)
line.to_i
end
def format_bulk_reply(line)
bulklen = line.to_i
return if bulklen == -1
reply = @sock.read(bulklen)
@sock.read(2) # Discard CRLF.
reply
end
def format_multi_bulk_reply(line)
n = line.to_i
return if n == -1
reply = []
n.times { reply << read }
reply
end
def logging(commands)
return yield unless @logger && @logger.debug?
begin
commands.each do |name, *args|
@logger.debug("Redis >> #{name.to_s.upcase} #{args.join(" ")}")
end
t1 = Time.now
yield
ensure
@logger.debug("Redis >> %0.2fms" % ((Time.now - t1) * 1000))
end
end
if defined?(Timeout)
TimeoutError = Timeout::Error
else
TimeoutError = Exception
end
def connect_to(host, port)
begin
@sock = TCPSocket.new(host, port)
rescue TimeoutError
@sock = nil
raise
end
@sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
# If the timeout is set we set the low level socket options in order
# to make sure a blocking read will return after the specified number
# of seconds. This hack is from memcached ruby client.
self.timeout = @timeout
rescue Errno::ECONNREFUSED
raise Errno::ECONNREFUSED, "Unable to connect to Redis on #{host}:#{port}"
end
def timeout=(timeout)
secs = Integer(timeout)
usecs = Integer((timeout - secs) * 1_000_000)
optval = [secs, usecs].pack("l_2")
begin
@sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval
@sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval
rescue Errno::ENOPROTOOPT
end
end
def ensure_connected
connect unless connected?
begin
yield
rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED
if reconnect
yield
else
raise Errno::ECONNRESET
end
end
end
class ThreadSafe < self
def initialize(*args)
require "monitor"
super(*args)
@mutex = ::Monitor.new
end
def synchronize(&block)
@mutex.synchronize(&block)
end
def ensure_connected(&block)
super do
synchronize(&block)
end
end
end
end
end