forked from petergoldstein/dalli
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.rb
296 lines (266 loc) · 9.91 KB
/
client.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# encoding: ascii
module Dalli
class Client
##
# Dalli::Client is the main class which developers will use to interact with
# the memcached server. Usage:
#
# Dalli::Client.new(['localhost:11211:10', 'cache-2.example.com:11211:5', '192.168.0.1:22122:5'],
# :threadsafe => true, :failover => true, :expires_in => 300)
#
# servers is an Array of "host:port:weight" where weight allows you to distribute cache unevenly.
# Both weight and port are optional. If you pass in nil, Dalli will default to 'localhost:11211'.
# Note that the <tt>MEMCACHE_SERVERS</tt> environment variable will override the servers parameter for use
# in managed environments like Heroku.
#
# You can also provide a Unix socket as an argument, for example:
#
# Dalli::Client.new("/tmp/memcached.sock")
#
# Initial testing shows that Unix sockets are about twice as fast as TCP sockets
# but Unix sockets only work on localhost.
#
# Options:
# - :failover - if a server is down, look for and store values on another server in the ring. Default: true.
# - :threadsafe - ensure that only one thread is actively using a socket at a time. Default: true.
# - :expires_in - default TTL in seconds if you do not pass TTL as a parameter to an individual operation, defaults to 0 or forever
# - :compression - defaults to false, if true Dalli will compress values larger than 100 bytes before
# sending them to memcached.
# - :async - assume its running inside the EM reactor. Requires em-synchrony to be installed. Default: false.
#
def initialize(servers=nil, options={})
@servers = env_servers || servers || '127.0.0.1:11211'
@options = { :expires_in => 0 }.merge(options)
self.extend(Dalli::Client::MemcacheClientCompatibility) if Dalli::Client.compatibility_mode
@ring = nil
end
##
# Turn on compatibility mode, which mixes in methods in memcache_client_compatibility.rb
# This value is set to true in memcache-client.rb.
def self.compatibility_mode
@compatibility_mode ||= false
end
def self.compatibility_mode=(compatibility_mode)
require 'dalli/compatibility'
@compatibility_mode = compatibility_mode
end
#
# The standard memcached instruction set
#
##
# Turn on quiet aka noreply support.
# All relevant operations within this block will be effectively
# pipelined as Dalli will use 'quiet' operations where possible.
# Currently supports the set, add, replace and delete operations.
def multi
old, Thread.current[:dalli_multi] = Thread.current[:dalli_multi], true
yield
ensure
Thread.current[:dalli_multi] = old
end
def get(key, options=nil)
resp = perform(:get, key)
(!resp || resp == 'Not found') ? nil : resp
end
##
# Fetch multiple keys efficiently.
# Returns a hash of { 'key' => 'value', 'key2' => 'value1' }
def get_multi(*keys)
return {} if keys.empty?
options = nil
options = keys.pop if keys.last.is_a?(Hash) || keys.last.nil?
ring.lock do
keys.flatten.each do |key|
begin
perform(:getkq, key)
rescue DalliError, NetworkError => e
Dalli.logger.debug { e.message }
Dalli.logger.debug { "unable to get key #{key}" }
end
end
values = {}
ring.servers.each do |server|
next unless server.alive?
begin
server.request(:noop).each_pair do |key, value|
values[key_without_namespace(key)] = value
end
rescue DalliError, NetworkError => e
Dalli.logger.debug { e.message }
Dalli.logger.debug { "results from this server will be missing" }
end
end
values
end
end
def fetch(key, ttl=nil, options=nil)
ttl ||= @options[:expires_in]
val = get(key, options)
if val.nil? && block_given?
val = yield
add(key, val, ttl, options)
end
val
end
##
# compare and swap values using optimistic locking.
# Fetch the existing value for key.
# If it exists, yield the value to the block.
# Add the block's return value as the new value for the key.
# Add will fail if someone else changed the value.
#
# Returns:
# - nil if the key did not exist.
# - false if the value was changed by someone else.
# - true if the value was successfully updated.
def cas(key, ttl=nil, options=nil, &block)
ttl ||= @options[:expires_in]
(value, cas) = perform(:cas, key)
value = (!value || value == 'Not found') ? nil : value
if value
newvalue = block.call(value)
perform(:set, key, newvalue, ttl, cas, options)
end
end
def set(key, value, ttl=nil, options=nil)
raise "Invalid API usage, please require 'dalli/memcache-client' for compatibility, see Upgrade.md" if options == true
ttl ||= @options[:expires_in]
perform(:set, key, value, ttl, 0, options)
end
##
# Conditionally add a key/value pair, if the key does not already exist
# on the server. Returns true if the operation succeeded.
def add(key, value, ttl=nil, options=nil)
ttl ||= @options[:expires_in]
perform(:add, key, value, ttl, options)
end
##
# Conditionally add a key/value pair, only if the key already exists
# on the server. Returns true if the operation succeeded.
def replace(key, value, ttl=nil, options=nil)
ttl ||= @options[:expires_in]
perform(:replace, key, value, ttl, options)
end
def delete(key)
perform(:delete, key)
end
##
# Append value to the value already stored on the server for 'key'.
# Appending only works for values stored with :raw => true.
def append(key, value)
perform(:append, key, value.to_s)
end
##
# Prepend value to the value already stored on the server for 'key'.
# Prepending only works for values stored with :raw => true.
def prepend(key, value)
perform(:prepend, key, value.to_s)
end
def flush(delay=0)
time = -delay
ring.servers.map { |s| s.request(:flush, time += delay) }
end
# deprecated, please use #flush.
alias_method :flush_all, :flush
##
# Incr adds the given amount to the counter on the memcached server.
# Amt must be a positive integer value.
#
# If default is nil, the counter must already exist or the operation
# will fail and will return nil. Otherwise this method will return
# the new value for the counter.
#
# Note that the ttl will only apply if the counter does not already
# exist. To increase an existing counter and update its TTL, use
# #cas.
def incr(key, amt=1, ttl=nil, default=nil)
raise ArgumentError, "Positive values only: #{amt}" if amt < 0
ttl ||= @options[:expires_in]
perform(:incr, key, amt.to_i, ttl, default)
end
##
# Decr subtracts the given amount from the counter on the memcached server.
# Amt must be a positive integer value.
#
# memcached counters are unsigned and cannot hold negative values. Calling
# decr on a counter which is 0 will just return 0.
#
# If default is nil, the counter must already exist or the operation
# will fail and will return nil. Otherwise this method will return
# the new value for the counter.
#
# Note that the ttl will only apply if the counter does not already
# exist. To decrease an existing counter and update its TTL, use
# #cas.
def decr(key, amt=1, ttl=nil, default=nil)
raise ArgumentError, "Positive values only: #{amt}" if amt < 0
ttl ||= @options[:expires_in]
perform(:decr, key, amt.to_i, ttl, default)
end
##
# Collect the stats for each server.
# Returns a hash like { 'hostname:port' => { 'stat1' => 'value1', ... }, 'hostname2:port' => { ... } }
def stats
values = {}
ring.servers.each do |server|
values["#{server.hostname}:#{server.port}"] = server.alive? ? server.request(:stats) : nil
end
values
end
##
# Reset stats for each server.
def reset_stats
ring.servers.map do |server|
server.alive? ? server.request(:reset_stats) : nil
end
end
##
# Close our connection to each server.
# If you perform another operation after this, the connections will be re-established.
def close
if @ring
@ring.servers.each { |s| s.close }
@ring = nil
end
end
alias_method :reset, :close
private
def ring
@ring ||= Dalli::Ring.new(
Array(@servers).map do |s|
Dalli::Server.new(s, @options)
end, @options
)
end
def env_servers
ENV['MEMCACHE_SERVERS'] ? ENV['MEMCACHE_SERVERS'].split(',') : nil
end
# Chokepoint method for instrumentation
def perform(op, key, *args)
key = key.to_s
validate_key(key)
key = key_with_namespace(key)
begin
server = ring.server_for_key(key)
server.request(op, key, *args)
rescue NetworkError => e
Dalli.logger.debug { e.message }
Dalli.logger.debug { "retrying request with new server" }
retry
end
end
def validate_key(key)
raise ArgumentError, "illegal character in key #{key}" if key.respond_to?(:ascii_only?) && !key.ascii_only?
raise ArgumentError, "illegal character in key #{key}" if key =~ /\s/
raise ArgumentError, "illegal character in key #{key}" if key =~ /[\x00-\x20\x80-\xFF]/
raise ArgumentError, "key cannot be blank" if key.nil? || key.strip.size == 0
raise ArgumentError, "key too long #{key.inspect}" if key.length > 250
end
def key_with_namespace(key)
@options[:namespace] ? "#{@options[:namespace]}:#{key}" : key
end
def key_without_namespace(key)
@options[:namespace] ? key.gsub(%r(\A#{@options[:namespace]}:), '') : key
end
end
end