Skip to content

Commit

Permalink
Support or ConnectionStateMachine and ChannelStateMachine
Browse files Browse the repository at this point in the history
Fixes #52
  • Loading branch information
mattheworiordan committed Sep 4, 2015
1 parent 1a5fffa commit aa05eba
Show file tree
Hide file tree
Showing 19 changed files with 343 additions and 87 deletions.
40 changes: 40 additions & 0 deletions lib/ably/models/channel_state_change.rb
@@ -0,0 +1,40 @@
module Ably::Models
# ChannelStateChange is a class that is emitted by the {Ably::Realtime::Channel} object
# when a state change occurs
#
# @!attribute [r] current
# @return [Connection::STATE] Current connection state
# @!attribute [r] previous
# @return [Connection::STATE] Previous connection state
# @!attribute [r] reason
# @return [Ably::Models::ErrorInfo] Object describing the reason for a state change when not initiated by the consumer of the client library
#
class ChannelStateChange
include Ably::Modules::ModelCommon

def initialize(hash_object)
unless (hash_object.keys - [:current, :previous, :reason]).empty?
raise ArgumentError, 'Invalid attributes, expecting :current, :previous, :reason'
end

@hash_object = {
current: hash_object.fetch(:current),
previous: hash_object.fetch(:previous),
retry_in: hash_object[:retry_in],
reason: hash_object[:reason]
}
rescue KeyError => e
raise ArgumentError, e
end

%w(current previous reason).each do |attribute|
define_method attribute do
@hash_object[attribute.to_sym]
end
end

def to_s
"ChannelStateChange: current state #{current}, previous state #{previous}"
end
end
end
42 changes: 42 additions & 0 deletions lib/ably/models/connection_state_change.rb
@@ -0,0 +1,42 @@
module Ably::Models
# ConnectionStateChange is a class that is emitted by the {Ably::Realtime::Connection} object
# when a state change occurs
#
# @!attribute [r] current
# @return [Connection::STATE] Current connection state
# @!attribute [r] previous
# @return [Connection::STATE] Previous connection state
# @!attribute [r] retry_in
# @return [Integer] Time in seconds until the connection will reattempt to connect when in the +:disconnected+ or +:suspended+ state
# @!attribute [r] reason
# @return [Ably::Models::ErrorInfo] Object describing the reason for a state change when not initiated by the consumer of the client library
#
class ConnectionStateChange
include Ably::Modules::ModelCommon

def initialize(hash_object)
unless (hash_object.keys - [:current, :previous, :retry_in, :reason]).empty?
raise ArgumentError, 'Invalid attributes, expecting :current, :previous, :retry_in, :reason'
end

@hash_object = {
current: hash_object.fetch(:current),
previous: hash_object.fetch(:previous),
retry_in: hash_object[:retry_in],
reason: hash_object[:reason]
}
rescue KeyError => e
raise ArgumentError, e
end

%w(current previous retry_in reason).each do |attribute|
define_method attribute do
@hash_object[attribute.to_sym]
end
end

def to_s
"ConnectionStateChange: current state #{current}, previous state #{previous}"
end
end
end
5 changes: 4 additions & 1 deletion lib/ably/modules/state_emitter.rb
Expand Up @@ -141,7 +141,10 @@ def unsafe_once_state_changed(&block)
#
def deferrable_for_state_change_to(target_state)
Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
once_or_if(target_state, else: proc { |*args| deferrable.fail self, *args }) do
fail_proc = Proc.new do |state_change|
deferrable.fail self, state_change.reason
end
once_or_if(target_state, else: fail_proc) do
yield self if block_given?
deferrable.succeed self
end
Expand Down
32 changes: 28 additions & 4 deletions lib/ably/modules/uses_state_machine.rb
Expand Up @@ -13,17 +13,17 @@ module UsesStateMachine
#
# @return [Boolean] true if new_state can be transitioned to by state machine
# @api private
def transition_state_machine(new_state, emit_object = nil)
state_machine.transition_state(new_state, emit_object)
def transition_state_machine(new_state, emit_params = {})
state_machine.transition_state(new_state, emit_object(new_state, emit_params))
end

# Call #transition_to! on the StateMachine
# An exception wil be raised if new_state cannot be transitioned to by state machine
#
# @return [void]
# @api private
def transition_state_machine!(new_state, emit_object = nil)
state_machine.transition_to!(new_state, emit_object)
def transition_state_machine!(new_state, emit_params = {})
state_machine.transition_to!(new_state, emit_object(new_state, emit_params))
end

# Provides an internal method for this object's state to match the StateMachine's current state.
Expand Down Expand Up @@ -70,5 +70,29 @@ def log_state_machine_state_change
logger.debug "#{self.class.name}: Transitioned to #{state_machine.current_state}"
end
end

def emit_object(new_state, emit_params)
if self.class.emits_klass
self.class.emits_klass.new((emit_params || {}).merge(current: STATE(new_state), previous: STATE(state_machine.current_state)))
else
emit_params
end
end

def self.included(base)
base.extend(ClassMethods)
end

module ClassMethods
def emits_klass
@emits_klass ||= if @emits_klass_name
Object.const_get @emits_klass_name
end
end

def ensure_state_machine_emits(klass)
@emits_klass_name = klass
end
end
end
end
5 changes: 5 additions & 0 deletions lib/ably/realtime/channel.rb
Expand Up @@ -46,6 +46,7 @@ class Channel
)
include Ably::Modules::StateEmitter
include Ably::Modules::UsesStateMachine
ensure_state_machine_emits 'Ably::Models::ChannelStateChange'

# Max number of messages to bundle in a single ProtocolMessage
MAX_PROTOCOL_MESSAGE_BATCH_SIZE = 50
Expand Down Expand Up @@ -287,6 +288,10 @@ def logger
client.logger
end

# As we are using a state machine, do not allow change_state to be used
# #transition_state_machine must be used instead
private :change_state

private
attr_reader :queue

Expand Down
8 changes: 4 additions & 4 deletions lib/ably/realtime/channel/channel_manager.rb
Expand Up @@ -27,7 +27,7 @@ def attach
# Commence attachment
def detach(error = nil)
if connection.closed? || connection.connecting? || connection.suspended?
channel.transition_state_machine :detached, error
channel.transition_state_machine :detached, reason: error
elsif can_transition_to?(:detached)
send_detach_protocol_message
end
Expand All @@ -51,7 +51,7 @@ def emit_error(error)

# Detach a channel as a result of an error
def suspend(error)
channel.transition_state_machine! :detaching, error
channel.transition_state_machine! :detaching, reason: error
end

# When a channel is no longer attached or has failed,
Expand Down Expand Up @@ -141,13 +141,13 @@ def setup_connection_event_handlers

connection.unsafe_on(:suspended) do |error|
if can_transition_to?(:detaching)
channel.transition_state_machine :detaching, Ably::Exceptions::ConnectionSuspended.new('Connection suspended', nil, 80002, error)
channel.transition_state_machine :detaching, reason: Ably::Exceptions::ConnectionSuspended.new('Connection suspended', nil, 80002, error)
end
end

connection.unsafe_on(:failed) do |error|
if can_transition_to?(:failed)
channel.transition_state_machine :failed, Ably::Exceptions::ConnectionFailed.new('Connection failed', nil, 80002, error)
channel.transition_state_machine :failed, reason: Ably::Exceptions::ConnectionFailed.new('Connection failed', nil, 80002, error)
end
end

Expand Down
31 changes: 20 additions & 11 deletions lib/ably/realtime/channel/channel_state_machine.rb
Expand Up @@ -35,39 +35,48 @@ class ChannelStateMachine
end

before_transition(to: [:attached]) do |channel, current_transition|
channel.manager.attached current_transition.metadata
channel.manager.attached current_transition.metadata.reason
end

after_transition(to: [:detaching]) do |channel, current_transition|
channel.manager.detach current_transition.metadata
err = error_from_state_change(current_transition)
channel.manager.detach err
end

after_transition(to: [:detached]) do |channel, current_transition|
channel.manager.fail_messages_awaiting_ack nil_unless_error(current_transition.metadata)
channel.manager.emit_error current_transition.metadata if is_error_type?(current_transition.metadata)
err = error_from_state_change(current_transition)
channel.manager.fail_messages_awaiting_ack err
channel.manager.emit_error err if err
end

after_transition(to: [:failed]) do |channel, current_transition|
channel.manager.fail_messages_awaiting_ack nil_unless_error(current_transition.metadata)
channel.manager.emit_error current_transition.metadata if is_error_type?(current_transition.metadata)
err = error_from_state_change(current_transition)
channel.manager.fail_messages_awaiting_ack err
channel.manager.emit_error err if err
end

# Transitions responsible for updating channel#error_reason
before_transition(to: [:failed]) do |channel, current_transition|
channel.set_failed_channel_error_reason current_transition.metadata if is_error_type?(current_transition.metadata)
err = error_from_state_change(current_transition)
channel.set_failed_channel_error_reason err if err
end

before_transition(to: [:attached, :detached]) do |channel, current_transition|
if is_error_type?(current_transition.metadata)
channel.set_failed_channel_error_reason current_transition.metadata
err = error_from_state_change(current_transition)
if err
channel.set_failed_channel_error_reason err
else
# Attached & Detached are "healthy" final states so reset the error reason
channel.clear_error_reason
end
end

def self.nil_unless_error(error_object)
error_object if is_error_type?(error_object)
def self.error_from_state_change(current_transition)
# ChannelStateChange object is always passed in current_transition metadata object
connection_state_change = current_transition.metadata
# Reason attribute contains errors
err = connection_state_change && connection_state_change.reason
err if is_error_type?(err)
end

private
Expand Down
8 changes: 4 additions & 4 deletions lib/ably/realtime/client/incoming_message_dispatcher.rb
Expand Up @@ -68,10 +68,10 @@ def dispatch_protocol_message(*args)

when ACTION.Connect
when ACTION.Connected
connection.transition_state_machine :connected, protocol_message unless connection.connected?
connection.transition_state_machine :connected, reason: protocol_message unless connection.connected?

when ACTION.Disconnect, ACTION.Disconnected
connection.transition_state_machine :disconnected, protocol_message.error unless connection.disconnected?
connection.transition_state_machine :disconnected, reason: protocol_message.error unless connection.disconnected?

when ACTION.Close
when ACTION.Closed
Expand All @@ -87,7 +87,7 @@ def dispatch_protocol_message(*args)
when ACTION.Attach
when ACTION.Attached
get_channel(protocol_message.channel).tap do |channel|
channel.transition_state_machine :attached, protocol_message unless channel.attached?
channel.transition_state_machine :attached, reason: protocol_message unless channel.attached?
end

when ACTION.Detach
Expand Down Expand Up @@ -125,7 +125,7 @@ def dispatch_protocol_message(*args)
def dispatch_channel_error(protocol_message)
logger.warn "Channel Error message received: #{protocol_message.error}"
if !protocol_message.has_message_serial?
get_channel(protocol_message.channel).transition_state_machine :failed, protocol_message.error
get_channel(protocol_message.channel).transition_state_machine :failed, reason: protocol_message.error
else
logger.fatal "Cannot process ProtocolMessage as not yet implemented: #{protocol_message}"
end
Expand Down
1 change: 1 addition & 0 deletions lib/ably/realtime/connection.rb
Expand Up @@ -55,6 +55,7 @@ class Connection
)
include Ably::Modules::StateEmitter
include Ably::Modules::UsesStateMachine
ensure_state_machine_emits 'Ably::Models::ConnectionStateChange'

# Expected format for a connection recover key
RECOVER_REGEX = /^(?<recover>[\w-]+):(?<connection_serial>\-?\w+)$/
Expand Down

0 comments on commit aa05eba

Please sign in to comment.