Skip to content

Commit

Permalink
fix error with cannot add key during iteration and fix detecting cell…
Browse files Browse the repository at this point in the history
…uloid version properly
  • Loading branch information
bogdanRada committed Jul 8, 2016
1 parent a770768 commit e50c93a
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 26 deletions.
2 changes: 1 addition & 1 deletion lib/celluloid_pubsub/base_actor.rb
Expand Up @@ -33,7 +33,7 @@ def celluloid_version
end

def version_less_than_seventeen?
verify_gem_version('celluloid', '0.17', operator: '<')
verify_gem_version(celluloid_version, '0.17', operator: '<')
end

def setup_actor_supervision(class_name, options)
Expand Down
2 changes: 1 addition & 1 deletion lib/celluloid_pubsub/helper.rb
Expand Up @@ -71,7 +71,7 @@ def setup_celluloid_logger
#
# @api private
def setup_celluloid_exception_handler
Celluloid.task_class = Celluloid::TaskThread
Celluloid.task_class = defined?(Celluloid::TaskThread) ? Celluloid::TaskThread : Celluloid::Task::Threaded
Celluloid.exception_handler do |ex|
puts ex unless filtered_error?(ex)
end
Expand Down
51 changes: 30 additions & 21 deletions lib/celluloid_pubsub/reactor.rb
Expand Up @@ -165,18 +165,18 @@ def handle_parsed_websocket_message(json_data)
def delegate_action(json_data)
channel = json_data.fetch('channel', nil)
case json_data['client_action']
when 'unsubscribe_all'
unsubscribe_all
when 'unsubscribe_clients'
async.unsubscribe_clients(channel)
when 'unsubscribe'
async.unsubscribe(channel)
when 'subscribe'
async.start_subscriber(channel, json_data)
when 'publish'
async.publish_event(channel, json_data['data'].to_json)
else
handle_unknown_action(json_data)
when 'unsubscribe_all'
unsubscribe_all
when 'unsubscribe_clients'
async.unsubscribe_clients(channel)
when 'unsubscribe'
async.unsubscribe(channel)
when 'subscribe'
async.start_subscriber(channel, json_data)
when 'publish'
async.publish_event(channel, json_data['data'].to_json)
else
handle_unknown_action(json_data)
end
end

Expand Down Expand Up @@ -221,8 +221,10 @@ def unsubscribe(channel)
log_debug "#{self.class} runs 'unsubscribe' method with #{channel}"
return unless channel.present?
forget_channel(channel)
(@server.subscribers[channel] || []).delete_if do |hash|
hash[:reactor] == Actor.current
@server.mutex.synchronize do
(@server.subscribers[channel].dup || []).delete_if do |hash|
hash[:reactor] == Actor.current
end
end
end

Expand Down Expand Up @@ -295,7 +297,9 @@ def add_subscriber_to_channel(channel, message)
registry_channels = CelluloidPubsub::Registry.channels
@channels << channel
registry_channels << channel unless registry_channels.include?(channel)
@server.subscribers[channel] = channel_subscribers(channel).push(reactor: Actor.current, message: message)
@server.mutex.synchronize do
@server.subscribers[channel] = channel_subscribers(channel).push(reactor: Actor.current, message: message)
end
end

# method for publishing data to a channel
Expand All @@ -308,8 +312,11 @@ def add_subscriber_to_channel(channel, message)
# @api public
def publish_event(current_topic, message)
return if current_topic.blank? || message.blank?
(@server.subscribers[current_topic].dup || []).pmap do |hash|
hash[:reactor].websocket << message
log_debug "#{self.class} tries to publish to #{current_topic} with #{message} into subscribers #{@server.subscribers[current_topic].inspect}"
@server.mutex.synchronize do
(@server.subscribers[current_topic].dup || []).pmap do |hash|
hash[:reactor].websocket << message
end
end
rescue => exception
log_debug("could not publish message #{message} into topic #{current_topic} because of #{exception.inspect}")
Expand Down Expand Up @@ -337,10 +344,12 @@ def unsubscribe_all
# @api public
def unsubscribe_from_channel(channel)
log_debug "#{self.class} runs 'unsubscribe_from_channel' method with #{channel}"
(@server.subscribers[channel].dup || []).pmap do |hash|
reactor = hash[:reactor]
reactor.websocket.close
Celluloid::Actor.kill(reactor)
@server.mutex.synchronize do
(@server.subscribers[channel].dup || []).pmap do |hash|
reactor = hash[:reactor]
reactor.websocket.close
Celluloid::Actor.kill(reactor)
end
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/celluloid_pubsub/version.rb
Expand Up @@ -15,9 +15,9 @@ module VERSION
# major release version
MAJOR = 0
# minor release version
MINOR = 7
MINOR = 8
# tiny release version
TINY = 9
TINY = 0
# prelease version ( set this only if it is a prelease)
PRE = nil

Expand Down
3 changes: 2 additions & 1 deletion lib/celluloid_pubsub/web_server.rb
Expand Up @@ -25,7 +25,7 @@ class WebServer < Reel::Server::HTTP
# The name of the default adapter
CLASSIC_ADAPTER = 'classic'

attr_accessor :server_options, :subscribers
attr_accessor :server_options, :subscribers, :mutex
finalizer :shutdown
# receives a list of options that are used to configure the webserver
#
Expand All @@ -44,6 +44,7 @@ def initialize(options = {})
Celluloid.boot unless Celluloid.running?
@server_options = parse_options(options)
@subscribers = {}
@mutex = Mutex.new
setup_celluloid_logger
log_debug "CelluloidPubsub::WebServer example starting on #{hostname}:#{port}"
super(hostname, port, { spy: spy, backlog: backlog }, &method(:on_connection))
Expand Down

0 comments on commit e50c93a

Please sign in to comment.