-
Notifications
You must be signed in to change notification settings - Fork 0
/
reactor.rb
538 lines (497 loc) · 16.3 KB
/
reactor.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
# encoding: utf-8
# frozen_string_literal: true
require_relative './registry'
require_relative './helper'
module CelluloidPubsub
# The reactor handles new connections. Based on what the client sends it either subscribes to a channel
# or will publish to a channel or just dispatch to the server if command is neither subscribe, publish or unsubscribe
#
# @!attribute websocket
# @return [Reel::WebSocket] websocket connection
#
# @!attribute server
# @return [CelluloidPubsub::Webserver] the server actor to which the reactor is connected to
#
# @!attribute channels
# @return [Array] array of channels to which the current reactor has subscribed to
class Reactor
include CelluloidPubsub::BaseActor
# available actions that can be delegated
AVAILABLE_ACTIONS = %w[unsubscribe_clients unsubscribe subscribe publish unsubscribe_all].freeze
# The websocket connection received from the server
# @return [Reel::WebSocket] websocket connection
attr_accessor :websocket
# The server instance to which this reactor is linked to
# @return [CelluloidPubsub::Webserver] the server actor to which the reactor is connected to
attr_accessor :server
# The channels to which this reactor has subscribed to
# @return [Array] array of channels to which the current reactor has subscribed to
attr_accessor :channels
# The same options passed to the server are available on the reactor too
# @return [Hash] Hash with all the options passed to the server
attr_reader :options
finalizer :shutdown
trap_exit :actor_died
# rececives a new socket connection from the server
# and listens for messages
#
# @param [Reel::WebSocket] websocket
#
# @return [void]
#
# @api public
def work(websocket, server)
initialize_data(websocket, server)
async.run
end
# initializes the actor
#
# @param [Reel::WebSocket] websocket
# @param [CelluloidPubsub::WebServer] server
#
# @return [Celluloid::Actor] returns the actor
#
# @api public
def initialize_data(websocket, server)
@websocket = websocket
@server = server
@options = @server.server_options
@channels = []
@shutting_down = false
setup_celluloid_logger
log_debug "#{self.class} Streaming changes for #{websocket.url} #{websocket.class.name}"
yield(websocket, server) if block_given?
cell_actor
end
# the method will return the file path of the log file where debug messages will be printed
#
#
# @return [String] returns the file path of the log file where debug messages will be printed
#
# @api public
def log_file_path
@log_file_path ||= options.fetch('log_file_path', nil)
end
# the method will return the log level of the logger
#
# @return [Integer, nil] return the log level used by the logger ( default is 1 - info)
#
# @api public
def log_level
@log_level ||= options['log_level'] || ::Logger::Severity::INFO
end
# the method will return options needed when configuring an adapter
# @see celluloid_pubsub_redis_adapter for more information
#
# @return [Hash] returns options needed by the adapter
#
# @api public
def adapter_options
@adapter_options ||= options['adapter_options'] || {}
end
# the method will return true if the actor is shutting down
#
#
# @return [Boolean] returns true if the actor is shutting down
#
# @api public
def shutting_down?
@shutting_down == true
end
# the method will return true if debug is enabled
#
#
# @return [Boolean] returns true if debug is enabled otherwise false
#
# @api public
def debug_enabled?
@debug_enabled = options.fetch('enable_debug', false)
@debug_enabled == true
end
# reads from the socket the message
# and dispatches it to the handle_websocket_message method
# @see #handle_websocket_message
#
# @return [void]
#
# @api public
#
# :nocov:
def run
loop do
break if shutting_down? || actor_dead?(Actor.current) || @websocket.closed? || actor_dead?(@server)
message = try_read_websocket
handle_websocket_message(message) if message.present?
end
end
# :nocov:
# will try to read the message from the websocket
# and if it fails will log the exception if debug is enabled
#
# @return [void]
#
# @api public
#
def try_read_websocket
@websocket.closed? ? nil : @websocket.read
rescue StandardError
nil
end
# the method will return the reactor's class name used in debug messages
#
#
# @return [Class] returns the reactor's class name used in debug messages
#
# @api public
def reactor_class
self.class
end
# method used to parse a JSON object into a Hash object
#
# @param [JSON] message
#
# @return [Hash]
#
# @api public
def parse_json_data(message)
log_debug "#{reactor_class} received #{message}"
JSON.parse(message)
rescue StandardError => e
log_debug "#{reactor_class} could not parse #{message} because of #{e.inspect}"
message
end
# method that handles the message received from the websocket connection
# first will try to parse the message {#parse_json_data} and then it will dispatch
# it to another method that will decide depending the message what action
# should the reactor take {#handle_parsed_websocket_message}
#
# @see #parse_json_data
# @see #handle_parsed_websocket_message
#
# @param [JSON] message
#
# @return [void]
#
# @api public
def handle_websocket_message(message)
log_debug "#{reactor_class} read message #{message}"
json_data = parse_json_data(message)
handle_parsed_websocket_message(json_data)
end
# method that checks if the data is a Hash
#
# if the data is a hash then will stringify the keys and will call the method {#delegate_action}
# that will handle the message, otherwise will call the method {#handle_unknown_action}
#
# @see #delegate_action
# @see #handle_unknown_action
#
# @param [Hash] json_data
#
# @return [void]
#
# @api public
def handle_parsed_websocket_message(json_data)
data = json_data.is_a?(Hash) ? json_data.stringify_keys : {}
if CelluloidPubsub::Reactor::AVAILABLE_ACTIONS.include?(data['client_action'].to_s)
log_debug "#{self.class} finds actions for #{json_data}"
delegate_action(data) if data['client_action'].present?
else
handle_unknown_action(data['channel'], json_data)
end
end
# method that checks if the data is a Hash
#
# if the data is a hash then will stringify the keys and will call the method {#delegate_action}
# that will handle the message, otherwise will call the method {#handle_unknown_action}
#
# @see #delegate_action
# @see #handle_unknown_action
#
# @param [Hash] json_data
# @option json_data [String] :client_action The action based on which the reactor will decide what action should make
#
# Possible values are:
# unsubscribe_all
# unsubscribe_clients
# unsubscribe
# subscribe
# publish
#
#
# @return [void]
#
# @api public
def delegate_action(json_data)
async.send(json_data['client_action'], json_data['channel'], json_data)
end
# the method will delegate the message to the server in an asyncronous way by sending the current actor and the message
# @see CelluloidPubsub::WebServer#handle_dispatched_message
#
# @param [Hash] json_data
#
# @return [void]
#
# @api public
def handle_unknown_action(channel, json_data)
log_debug "Trying to dispatch to server #{json_data} on channel #{channel}"
@server.async.handle_dispatched_message(Actor.current, json_data)
end
# if the reactor has unsubscribed from all his channels will close the websocket connection,
# otherwise will delete the channel from his channel list
#
# @param [String] channel The channel that needs to be deleted from the reactor's list of subscribed channels
#
# @return [void]
#
# @api public
def forget_channel(channel)
if @channels.blank?
@websocket.close
else
@channels.delete(channel)
end
end
# the method will unsubscribe a client by closing the websocket connection if has unscribed from all channels
# and deleting the reactor from the channel list on the server
#
# @param [String] channel
#
# @return [void]
#
# @api public
def unsubscribe(channel, _json_data)
log_debug "#{self.class} runs 'unsubscribe' method with #{channel}"
return unless channel.present?
forget_channel(channel)
delete_server_subscribers(channel)
end
# the method will delete the reactor from the channel list on the server
#
# @param [String] channel
#
# @return [void]
#
# @api public
def delete_server_subscribers(channel)
@server.mutex.synchronize do
(@server.subscribers[channel] || []).delete_if do |hash|
hash[:reactor] == Actor.current
end
end
end
# the method will unsubscribe all clients subscribed to a channel by closing the
#
# @param [String] channel
#
# @return [void]
#
# @api public
def unsubscribe_clients(channel, _json_data)
log_debug "#{self.class} runs 'unsubscribe_clients' method with #{channel}"
return if channel.blank?
unsubscribe_from_channel(channel)
@server.subscribers[channel] = []
end
# the method will terminate the current actor
#
#
# @return [void]
#
# @api public
def shutdown
@shutting_down = true
log_debug "#{self.class} tries to 'shutdown'"
@websocket.close if close_websocket? && @websocket.present? && !@websocket.closed?
terminate
end
# In tests we sometimes mock the websocket to b e a double
# in which case we can't call close on the double
# because it's outside of the test
# @see Celluloid.shutdown in spec_helper
#
# @return [void]
#
# @api public
def close_websocket?
return true if ENV['RACK_ENV'] != 'test'
defined?(Rspec::Mocks::Double) && !websocket.is_a?(Rspec::Mocks::Double)
end
# this method will add the current actor to the list of the subscribers {#add_subscriber_to_channel}
# and will write to the socket a message for succesful subscription
#
# @see #add_subscriber_to_channel
#
# @param [String] channel
# @param [Object] message
#
# @return [void]
#
# @api public
def subscribe(channel, message)
return unless channel.present?
add_subscriber_to_channel(channel, message)
log_debug "#{self.class} subscribed to #{channel} with #{message}"
@websocket << message.merge('client_action' => 'successful_subscription', 'channel' => channel).to_json if @server.adapter == CelluloidPubsub::WebServer::CLASSIC_ADAPTER
end
# this method will write to the socket all messages that were published
# to that channel before the actor subscribed
#
# @param [String] channel
# @return [void]
#
# @api public
def send_unpublished(channel)
return if (messages = unpublished_messages(channel)).blank?
messages.each do |msg|
@websocket << msg.to_json
end
end
# the method clears all the messages left unpublished in a channel
#
# @param [String] channel
#
# @return [void]
#
# @api public
def clear_unpublished_messages(channel)
CelluloidPubsub::Registry.messages[channel] = []
end
# the method will return a list of all unpublished messages in a channel
#
# @param [String] channel
#
# @return [Array] the list of messages that were not published
#
# @api public
def unpublished_messages(channel)
(messages = CelluloidPubsub::Registry.messages[channel]).present? ? messages : []
end
# this method will return a list of all subscribers to a particular channel or a empty array
#
#
# @param [String] channel The channel that will be used to fetch all subscribers from this channel
#
# @return [Array] returns a list of all subscribers to a particular channel or a empty array
#
# @api public
def channel_subscribers(channel)
@server.subscribers[channel] || []
end
# adds the curent actor the list of the subscribers for a particular channel
# and registers the new channel
#
# @param [String] channel
# @param [Object] message
#
# @return [void]
#
# @api public
def add_subscriber_to_channel(channel, message)
registry_channels = CelluloidPubsub::Registry.channels
@channels << channel
registry_channels << channel unless registry_channels.include?(channel)
@server.mutex.synchronize do
@server.subscribers[channel] = channel_subscribers(channel).push(reactor: Actor.current, message: message)
end
end
# method for publishing data to a channel
#
# @param [String] current_topic The Channel to which the reactor instance {CelluloidPubsub::Reactor} will publish the message to
# @param [Object] json_data The additional data that contains the message that needs to be sent
#
# @return [void]
#
# @api public
def publish(current_topic, json_data)
message = json_data['data'].to_json
return if current_topic.blank? || message.blank?
server_publish_event(current_topic, message)
rescue StandardError => e
log_debug("could not publish message #{message} into topic #{current_topic} because of #{e.inspect}")
end
# the method will publish to all subsribers of a channel a message
#
# @param [String] current_topic
# @param [#to_s] message
#
# @return [void]
#
# @api public
def server_publish_event(current_topic, message)
if (subscribers = @server.subscribers[current_topic]).present?
subscribers.dup.pmap do |hash|
hash[:reactor].websocket << message
end
else
save_unpublished_message(current_topic, message)
end
end
# the method save the message for a specific channel if there are no subscribers
#
# @param [String] current_topic
# @param [#to_s] message
#
# @return [void]
#
# @api public
def save_unpublished_message(current_topic, message)
@server.timers_mutex.synchronize do
(CelluloidPubsub::Registry.messages[current_topic] ||= []) << message
end
end
# unsubscribes all actors from all channels and terminates the current actor
#
# @param [String] _channel NOT USED - needed to maintain compatibility with the other methods
# @param [Object] json_data NOT USED - needed to maintain compatibility with the other methods
#
# @return [void]
#
# @api public
def unsubscribe_all(_channel, json_data)
log_debug "#{self.class} runs 'unsubscribe_all' method"
CelluloidPubsub::Registry.channels.dup.pmap do |channel|
unsubscribe_clients(channel, json_data)
end
log_debug 'clearing connections'
shutdown
end
# unsubscribes all actors from the specified chanel
#
# @param [String] channel
# @return [void]
#
# @api public
def unsubscribe_from_channel(channel)
log_debug "#{self.class} runs 'unsubscribe_from_channel' method with #{channel}"
server_kill_reactors(channel)
end
# kills all reactors registered on a channel and closes their websocket connection
#
# @param [String] channel
# @return [void]
#
# @api public
def server_kill_reactors(channel)
@server.mutex.synchronize do
(@server.subscribers[channel] || []).dup.pmap do |hash|
reactor = hash[:reactor]
reactor.websocket.close
Celluloid::Actor.kill(reactor)
end
end
end
# method called when the actor is exiting
#
# @param [actor] actor - the current actor
# @param [Hash] reason - the reason it crashed
#
# @return [void]
#
# @api public
def actor_died(actor, reason)
@shutting_down = true
log_debug "Oh no! #{actor.inspect} has died because of a #{reason.class}"
end
end
end