-
Notifications
You must be signed in to change notification settings - Fork 133
/
redis.rb
478 lines (410 loc) · 15.6 KB
/
redis.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
# frozen_string_literal: true
require "redis"
require "digest"
module MessageBus
module Backends
# The Redis backend stores published messages in Redis sorted sets (using
# ZADD, where the score is the message ID), one for each channel (where
# the full message is stored), and also in a global backlog as a simple
# pointer to the respective channel and channel-specific ID. In addition,
# publication publishes full messages to a Redis PubSub channel; this is
# used for actively subscribed message_bus servers to consume published
# messages in real-time while connected and forward them to subscribers,
# while catch-up is performed from the backlog sorted sets.
#
# Message lookup is performed using the Redis ZRANGEBYSCORE command, and
# backlog trimming uses ZREMRANGEBYSCORE. The last used channel-specific
# and global IDs are stored as integers in simple Redis keys and
# incremented on publication.
#
# Publication is implemented using a Lua script to ensure that it is
# atomic and messages are not corrupted by parallel publication.
#
# @note This backend diverges from the standard in Base in the following ways:
#
# * `max_backlog_age` options in this backend differ from the behaviour of
# other backends, in that either no messages are removed (when
# publications happen more regularly than this time-frame) or all
# messages are removed (when no publication happens during this
# time-frame).
#
# @see Base general information about message_bus backends
class Redis < Base
# Raised internally while replaying the global backlog when a message is
# encountered whose global ID does not directly follow the previous one.
# Carries the highest ID that was successfully processed so the caller can
# retry the replay from that point.
class BackLogOutOfOrder < StandardError
  # @return [Integer] highest message ID processed before order broke
  attr_accessor :highest_id

  def initialize(highest_id)
    @highest_id = highest_id
  end
end
# @param [Hash] redis_config in addition to the options listed, see https://github.com/redis/redis-rb for other available options
# @option redis_config [Logger] :logger a logger to which logs will be output
# @option redis_config [Boolean] :enable_redis_logger (false) whether or not to enable logging by the underlying Redis library
# @option redis_config [Integer] :clear_every (1) the interval of publications between which the backlog will not be cleared
# @param [Integer] max_backlog_size the largest permitted size (number of messages) for per-channel backlogs; beyond this capacity, old messages will be dropped.
def initialize(redis_config = {}, max_backlog_size = 1000)
  @redis_config = redis_config.dup
  # Remove the message_bus-only option from OUR copy of the hash: deleting
  # from the caller's hash (as previously done) is a surprising side effect,
  # and leaving it inside @redis_config would forward an unknown key to
  # ::Redis.new (redis-client >= 5 rejects unrecognised options).
  @clear_every = @redis_config.delete(:clear_every) || 1
  @logger = @redis_config[:logger]
  # NOTE(review): :enable_redis_logger itself stays in @redis_config and is
  # not stripped by new_redis_connection's denylist — verify the Redis
  # client tolerates it when upgrading the redis gem.
  @redis_config[:logger] = nil unless @redis_config[:enable_redis_logger]
  @max_backlog_size = max_backlog_size
  @max_global_backlog_size = 2000
  @max_in_memory_publish_backlog = 1000
  @in_memory_backlog = []
  @lock = Mutex.new
  @flush_backlog_thread = nil
  @pub_redis = nil
  @subscribed = false
  # after 7 days inactive backlogs will be removed
  @max_backlog_age = 604_800
end
# Reconnects to Redis; used after a process fork, typically triggered by a
# forking webserver. Dropping the socket forces a lazy reconnect on next use.
# @see Base#after_fork
def after_fork
  @pub_redis.disconnect! if @pub_redis
end
# (see Base#reset!)
#
# Deletes every message_bus key (all "__mb_*" keys), wiping backlogs AND
# the ID counter keys.
def reset!
  pub_redis.keys("__mb_*").each do |key|
    pub_redis.del(key)
  end
end
# (see Base#destroy)
#
# Closes the publishing connection if one was ever opened.
def destroy
  @pub_redis.disconnect! if @pub_redis
end
# Deletes all backlogs and their data. Does not delete ID pointers, so new
# publications will get IDs that continue from the last publication before
# the expiry. Use with extreme caution.
# @see Base#expire_all_backlogs!
def expire_all_backlogs!
  pub_redis.keys("__mb_*backlog_n").each do |key|
    pub_redis.del(key)
  end
end
# Note, the script takes care of all expiry of keys, however
# we do not expire the global backlog key cause we have no simple way to determine what it should be on publish
# we do not provide a mechanism to set a global max backlog age, only a per-channel which we can override on publish
#
# Atomic publish script. Increments both the global and the per-channel ID
# counters, stores the encoded message in the channel's sorted set (score =
# channel-local ID) plus a "<id>|<channel>" pointer in the global sorted
# set (score = global ID), broadcasts the payload over PubSub, refreshes
# key TTLs, and finally trims both backlogs (trimming is only attempted
# every `clear_every` publications to limit ZREMRANGEBYSCORE cost).
#
# ARGV: 1 = encoded message (without IDs), 2 = max backlog age (seconds),
#       3 = per-channel max backlog size, 4 = global max backlog size,
#       5 = channel name, 6 = clear_every interval
# KEYS: 1 = global ID counter, 2 = channel ID counter, 3 = channel backlog
#       zset, 4 = global backlog zset, 5 = PubSub channel name
LUA_PUBLISH = <<LUA
local start_payload = ARGV[1]
local max_backlog_age = ARGV[2]
local max_backlog_size = tonumber(ARGV[3])
local max_global_backlog_size = tonumber(ARGV[4])
local channel = ARGV[5]
local clear_every = ARGV[6]
local global_id_key = KEYS[1]
local backlog_id_key = KEYS[2]
local backlog_key = KEYS[3]
local global_backlog_key = KEYS[4]
local redis_channel_name = KEYS[5]
local global_id = redis.call("INCR", global_id_key)
local backlog_id = redis.call("INCR", backlog_id_key)
local payload = table.concat({ global_id, backlog_id, start_payload }, "|")
local global_backlog_message = table.concat({ backlog_id, channel }, "|")
redis.call("ZADD", backlog_key, backlog_id, payload)
redis.call("EXPIRE", backlog_key, max_backlog_age)
redis.call("ZADD", global_backlog_key, global_id, global_backlog_message)
redis.call("EXPIRE", global_backlog_key, max_backlog_age)
redis.call("PUBLISH", redis_channel_name, payload)
redis.call("EXPIRE", backlog_id_key, max_backlog_age)
if backlog_id > max_backlog_size and backlog_id % clear_every == 0 then
redis.call("ZREMRANGEBYSCORE", backlog_key, 1, backlog_id - max_backlog_size)
end
if global_id > max_global_backlog_size and global_id % clear_every == 0 then
redis.call("ZREMRANGEBYSCORE", global_backlog_key, 1, global_id - max_global_backlog_size)
end
return backlog_id
LUA
# Precomputed digest so publication can use the cheap EVALSHA path; the
# script source is only uploaded via EVAL when the server replies NOSCRIPT
# (see #cached_eval).
LUA_PUBLISH_SHA1 = Digest::SHA1.hexdigest(LUA_PUBLISH)
# (see Base#publish)
#
# Publishes atomically via the LUA_PUBLISH script. When Redis reports
# READONLY (e.g. we are talking to a replica after a failover) the message
# is optionally queued in memory and a background thread is started to
# flush the queue once the master becomes writable again.
def publish(channel, data, opts = nil)
  queue_in_memory = (opts && opts[:queue_in_memory]) != false
  max_backlog_age = (opts && opts[:max_backlog_age]) || self.max_backlog_age
  max_backlog_size = (opts && opts[:max_backlog_size]) || self.max_backlog_size
  redis = pub_redis
  backlog_id_key = backlog_id_key(channel)
  backlog_key = backlog_key(channel)
  msg = MessageBus::Message.new nil, nil, channel, data
  cached_eval(
    redis,
    LUA_PUBLISH,
    LUA_PUBLISH_SHA1,
    argv: [
      msg.encode_without_ids,
      max_backlog_age,
      max_backlog_size,
      max_global_backlog_size,
      channel,
      clear_every,
    ],
    keys: [
      global_id_key,
      backlog_id_key,
      backlog_key,
      global_backlog_key,
      redis_channel_name,
    ],
  )
rescue ::Redis::CommandError => e
  if queue_in_memory && e.message =~ /READONLY/
    @lock.synchronize do
      @in_memory_backlog << [channel, data]
      if @in_memory_backlog.length > @max_in_memory_publish_backlog
        @in_memory_backlog.delete_at(0)
        # FIX: join the backtrace with a real newline ("\n"); the
        # single-quoted '\n' used previously emitted a literal backslash-n.
        @logger.warn(
          "Dropping old message cause max_in_memory_publish_backlog is full: #{e.message}\n#{e.backtrace.join("\n")}",
        )
      end
    end
    # Double-checked under the lock so only one flusher thread is spawned.
    if @flush_backlog_thread.nil?
      @lock.synchronize do
        if @flush_backlog_thread.nil?
          @flush_backlog_thread = Thread.new { ensure_backlog_flushed }
        end
      end
    end
    nil
  else
    raise
  end
end
# (see Base#last_id)
#
# Reads the channel's ID counter; a missing key yields 0 (nil.to_i).
def last_id(channel)
  pub_redis.get(backlog_id_key(channel)).to_i
end
# (see Base#last_ids)
#
# Batch variant of #last_id using a single MGET round trip; missing
# counters come back as 0.
def last_ids(*channels)
  return [] if channels.empty?

  counter_keys = channels.map { |channel| backlog_id_key(channel) }
  pub_redis.mget(*counter_keys).map(&:to_i)
end
# (see Base#backlog)
#
# Fetches every message in the channel's backlog with an ID strictly
# greater than +last_id+ (scores are channel-local IDs).
def backlog(channel, last_id = 0)
  payloads = pub_redis.zrangebyscore(backlog_key(channel), last_id.to_i + 1, "+inf")
  payloads.map { |payload| MessageBus::Message.decode(payload) }
end
# (see Base#global_backlog)
#
# Replays the global backlog after +last_id+. Each entry is only a pointer
# ("<channel_id>|<channel>") into a channel backlog, so the full message is
# re-fetched per entry; pointers whose message has since expired are
# dropped from the result.
def global_backlog(last_id = 0)
  entries = pub_redis.zrangebyscore(global_backlog_key, last_id.to_i + 1, "+inf")
  messages = entries.map do |entry|
    # The channel name may itself contain "|", so split at the first
    # separator only.
    id_part, channel = entry.split("|", 2)
    get_message(channel, id_part.to_i)
  end
  messages.compact
end
# (see Base#get_message)
#
# Looks up one message by its channel-local ID via an exact-score range
# query; returns nil when the message is absent (trimmed or expired).
def get_message(channel, message_id)
  rows = pub_redis.zrangebyscore(backlog_key(channel), message_id, message_id)
  payload = rows && rows[0]
  payload ? MessageBus::Message.decode(payload) : nil
end
# (see Base#subscribe)
#
# Trivial implementation for now: piggy-backs on the single global
# subscription and filters for the requested channel; could cut down on
# connections if we only have one global subscriber.
def subscribe(channel, last_id = nil)
  raise ArgumentError unless block_given?

  if last_id
    # Best-effort translation of the channel-local ID into a global one.
    # The global stream always runs ahead of any per-channel stream, so the
    # worst case is replaying a few already-seen messages.
    reference = get_message(channel, last_id)
    last_id = reference.global_id if reference
  end

  global_subscribe(last_id) do |message|
    yield message if message.channel == channel
  end
end
# (see Base#global_unsubscribe)
#
# Tells the active global subscriber to stop by broadcasting UNSUB_MESSAGE
# over a dedicated, short-lived connection, which is always closed again.
def global_unsubscribe
  unsub_connection = new_redis_connection
  unsub_connection.publish(redis_channel_name, UNSUB_MESSAGE)
ensure
  unsub_connection&.disconnect!
  @subscribed = false
end
# (see Base#global_subscribe)
#
# Long-running consumer loop: catches up from the global backlog, then
# listens on the PubSub channel, yielding each decoded message to the
# block. On any error the connection is dropped and the whole loop retried
# after one second. Exits only when UNSUB_MESSAGE is received.
def global_subscribe(last_id = nil, &blk)
  raise ArgumentError unless block_given?

  highest_id = last_id
  # Replays the backlog from highest_id. Out-of-order replays are retried a
  # few times with a short random sleep (raise_error is true while
  # retries > 0); after that, process_global_backlog skips gaps silently.
  clear_backlog =
    lambda do
      retries = 4
      begin
        highest_id = process_global_backlog(highest_id, retries > 0, &blk)
      rescue BackLogOutOfOrder => e
        highest_id = e.highest_id
        retries -= 1
        sleep(rand(50) / 1000.0)
        retry
      end
    end
  begin
    global_redis = new_redis_connection
    # Catch up once before subscribing (only when we have a resume point)...
    clear_backlog.call(&blk) if highest_id
    global_redis.subscribe(redis_channel_name) do |on|
      on.subscribe do
        # ...and again after SUBSCRIBE is confirmed, to cover anything
        # published in the window between the two calls.
        clear_backlog.call(&blk) if highest_id
        @subscribed = true
      end
      on.unsubscribe { @subscribed = false }
      on.message do |_c, m|
        if m == UNSUB_MESSAGE
          @subscribed = false
          global_redis.unsubscribe
          # Non-local return: exits global_subscribe from inside the
          # subscribe callback once unsubscribed.
          return # rubocop:disable Lint/NonLocalExitFromIterator
        end
        m = MessageBus::Message.decode m
        # we have 3 options
        #
        # 1. message came in the correct order GREAT, just deal with it
        # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog
        # 3. message came in the incorrect order and is lowest than current highest id, reset
        if highest_id.nil? || m.global_id == highest_id + 1
          highest_id = m.global_id
          yield m
        else
          clear_backlog.call(&blk)
        end
      end
    end
  rescue => error
    @logger.warn "#{error} subscribe failed, reconnecting in 1 second. Call stack #{error.backtrace.join("\n")}"
    sleep 1
    global_redis&.disconnect!
    retry
  ensure
    global_redis&.disconnect!
  end
end
private
# Builds a fresh ::Redis connection from the stored configuration.
#
# message_bus-specific options are stripped first: redis-client (the Redis
# gem >= 5) no longer accepts arbitrary params and errors out on anything
# unknown.
# https://github.com/redis-rb/redis-client/blob/4c8e05acfb3477c1651138a4924616e79e6116f2/lib/redis_client/config.rb#L21-L39
#
# We should be doing the opposite and allowlisting Redis params (or
# splitting the object up); this denylist is the smallest change that stays
# backwards compatible.
def new_redis_connection
  message_bus_keys = %i[
    backend
    logger
    long_polling_enabled
    long_polling_interval
    backend_options
    base_route
    client_message_filters
    site_id_lookup
    group_ids_lookup
    user_id_lookup
    transport_codec
  ]
  config = @redis_config.reject { |key, _value| message_bus_keys.include?(key) }
  ::Redis.new(config)
end
# redis connection used for publishing messages; created lazily on first
# use and memoized for the life of the process (reset via #after_fork).
def pub_redis
  @pub_redis = new_redis_connection if @pub_redis.nil?
  @pub_redis
end
# Name of the PubSub channel used for live delivery; scoped by the
# configured database number so message_bus instances on different DBs of
# one server do not receive each other's traffic.
def redis_channel_name
  db = @redis_config[:db] || 0
  "_message_bus_#{db}"
end

# Sorted set holding the full backlog for one channel (score = channel ID).
def backlog_key(channel)
  "__mb_backlog_n_#{channel}"
end

# Counter key holding the last channel-specific message ID.
def backlog_id_key(channel)
  "__mb_backlog_id_n_#{channel}"
end

# Counter key holding the last globally-issued message ID.
def global_id_key
  "__mb_global_id_n"
end

# Sorted set of "<id>|<channel>" pointers keyed by global ID.
def global_backlog_key
  "__mb_global_backlog_n"
end
# Replays the global backlog starting just after +highest_id+, yielding
# each message in order and returning the new highest global ID seen.
#
# When +raise_error+ is true, a gap in the ID sequence raises
# BackLogOutOfOrder (carrying the last good ID) so the caller can retry;
# when false, gaps are tolerated and any message newer than the cursor is
# yielded anyway.
def process_global_backlog(highest_id, raise_error)
  # If our cursor is ahead of the server's counter (e.g. Redis was
  # flushed), restart from the beginning.
  highest_id = 0 if highest_id > pub_redis.get(global_id_key).to_i
  global_backlog(highest_id).each do |message|
    in_sequence = (message.global_id == highest_id + 1)
    unless in_sequence
      raise BackLogOutOfOrder.new(highest_id) if raise_error
      # Stale duplicate — already processed, skip it.
      next if message.global_id <= highest_id
    end
    yield message
    highest_id = message.global_id
  end
  highest_id
end
# Drains @in_memory_backlog — messages queued while Redis was read-only
# (e.g. mid-failover) — by re-publishing each entry in FIFO order. Runs on
# the dedicated @flush_backlog_thread and loops until the queue is empty,
# sleeping while the server is still read-only. Undeliverable messages
# (non-READONLY errors) are logged and dropped so the queue cannot wedge.
def ensure_backlog_flushed
  flushed = false
  while !flushed
    try_again = false
    if is_readonly?
      sleep 1
      next
    end
    @lock.synchronize do
      if @in_memory_backlog.length == 0
        flushed = true
        break
      end
      begin
        # TODO recover special options
        publish(*@in_memory_backlog[0], queue_in_memory: false)
      rescue ::Redis::CommandError => e
        if e.message =~ /^READONLY/
          # Became read-only again mid-flush: keep the message and retry.
          try_again = true
        else
          # FIX: join the backtrace with a real newline ("\n"); the
          # single-quoted '\n' used previously emitted a literal
          # backslash-n into the log.
          @logger.warn(
            "Dropping undeliverable message: #{e.message}\n#{e.backtrace.join("\n")}",
          )
        end
      rescue => e
        @logger.warn(
          "Dropping undeliverable message: #{e.message}\n#{e.backtrace.join("\n")}",
        )
      end
      @in_memory_backlog.delete_at(0) unless try_again
    end
  end
ensure
  @lock.synchronize { @flush_backlog_thread = nil }
end
# Runs a Lua script on +redis+, preferring the cheap EVALSHA path and
# transparently uploading the script with a full EVAL the first time the
# server reports it as unknown (NOSCRIPT).
#
# @param redis the connection to run the script on
# @param script [String] the Lua source
# @param script_sha1 [String] precomputed SHA1 digest of +script+
# @param params [Hash] :keys / :argv options forwarded to the Redis call
def cached_eval(redis, script, script_sha1, params)
  redis.evalsha(script_sha1, params)
rescue ::Redis::CommandError => e
  raise unless e.to_s =~ /^NOSCRIPT/

  redis.eval(script, params)
end
# Probes whether the connected server refuses writes (i.e. we are talking
# to a read-only replica) by attempting to SET a scratch key. Returns true
# on a READONLY error, false when the write succeeds.
def is_readonly?
  key = "__mb_is_readonly"
  # disconnect to force a reconnect when attempting to set the key
  # in case we are not connected to the correct server
  # which can happen when sharing ips
  pub_redis.disconnect!
  pub_redis.set(key, "1")
  false
rescue ::Redis::CommandError => e
  true if e.message =~ /^READONLY/
end
# Register this class in the shared backend registry under the :redis key.
MessageBus::BACKENDS[:redis] = self
end
end
end