-
Notifications
You must be signed in to change notification settings - Fork 19
/
incoming_message_dispatcher.rb
187 lines (155 loc) · 7.13 KB
/
incoming_message_dispatcher.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
module Ably::Realtime
class Client
# IncomingMessageDispatcher is a (private) class that is used to dispatch {Ably::Models::ProtocolMessage} that are
# received from Ably via the {Ably::Realtime::Connection}
class IncomingMessageDispatcher
ACTION = Ably::Models::ProtocolMessage::ACTION
def initialize(client, connection)
@client = client
@connection = connection
subscribe_to_incoming_protocol_messages
end
private
attr_reader :client, :connection
def channels
client.channels
end
def get_channel(channel_name)
channels.fetch(channel_name) do
logger.warn "Received channel message for non-existent channel"
Ably::Realtime::Models::NilChannel.new
end
end
def logger
client.logger
end
def dispatch_protocol_message(*args)
protocol_message = args.first
unless protocol_message.kind_of?(Ably::Models::ProtocolMessage)
raise ArgumentError, "Expected a ProtocolMessage. Received #{protocol_message}"
end
unless [:nack, :error].include?(protocol_message.action)
logger.debug "#{protocol_message.action} received: #{protocol_message}"
end
if [:sync, :presence, :message].any? { |prevent_duplicate| protocol_message.action == prevent_duplicate }
if connection.serial && protocol_message.has_connection_serial? && protocol_message.connection_serial <= connection.serial
error_target = if protocol_message.channel
get_channel(protocol_message.channel)
else
connection
end
error_message = "Protocol error, duplicate message received for serial #{protocol_message.connection_serial}"
error_target.emit :error, Ably::Exceptions::ProtocolError.new(error_message, 400, 80013)
logger.error error_message
return
end
end
update_connection_recovery_info protocol_message
case protocol_message.action
when ACTION.Heartbeat
when ACTION.Ack
ack_pending_queue_for_message_serial(protocol_message) if protocol_message.has_message_serial?
when ACTION.Nack
logger.warn "NACK received: #{protocol_message}"
nack_pending_queue_for_message_serial(protocol_message) if protocol_message.has_message_serial?
when ACTION.Connect
when ACTION.Connected
connection.transition_state_machine :connected, reason: protocol_message unless connection.connected?
when ACTION.Disconnect, ACTION.Disconnected
connection.transition_state_machine :disconnected, reason: protocol_message.error unless connection.disconnected?
when ACTION.Close
when ACTION.Closed
connection.transition_state_machine :closed unless connection.closed?
when ACTION.Error
if protocol_message.channel && !protocol_message.has_message_serial?
dispatch_channel_error protocol_message
else
process_connection_error protocol_message
end
when ACTION.Attach
when ACTION.Attached
get_channel(protocol_message.channel).tap do |channel|
channel.transition_state_machine :attached, reason: protocol_message unless channel.attached?
end
when ACTION.Detach
when ACTION.Detached
get_channel(protocol_message.channel).tap do |channel|
channel.transition_state_machine :detached unless channel.detached?
end
when ACTION.Sync
presence = get_channel(protocol_message.channel).presence
protocol_message.presence.each do |presence_message|
presence.__incoming_msgbus__.publish :sync, presence_message
end
presence.members.update_sync_serial protocol_message.channel_serial
when ACTION.Presence
presence = get_channel(protocol_message.channel).presence
protocol_message.presence.each do |presence_message|
presence.__incoming_msgbus__.publish :presence, presence_message
end
when ACTION.Message
channel = get_channel(protocol_message.channel)
protocol_message.messages.each do |message|
channel.__incoming_msgbus__.publish :message, message
end
else
error = Ably::Exceptions::ProtocolError.new("Protocol Message Action #{protocol_message.action} is unsupported by this MessageDispatcher", 400, 80013)
client.connection.emit :error, error
logger.fatal error.message
end
end
def dispatch_channel_error(protocol_message)
logger.warn "Channel Error message received: #{protocol_message.error}"
if !protocol_message.has_message_serial?
get_channel(protocol_message.channel).transition_state_machine :failed, reason: protocol_message.error
else
logger.fatal "Cannot process ProtocolMessage as not yet implemented: #{protocol_message}"
end
end
def process_connection_error(protocol_message)
connection.manager.error_received_from_server protocol_message.error
end
def update_connection_recovery_info(protocol_message)
connection.update_connection_serial protocol_message.connection_serial if protocol_message.has_connection_serial?
end
def ack_pending_queue_for_message_serial(ack_protocol_message)
drop_pending_queue_from_ack(ack_protocol_message) do |protocol_message|
ack_messages protocol_message.messages
ack_messages protocol_message.presence
end
end
def nack_pending_queue_for_message_serial(nack_protocol_message)
drop_pending_queue_from_ack(nack_protocol_message) do |protocol_message|
nack_messages protocol_message.messages, nack_protocol_message
nack_messages protocol_message.presence, nack_protocol_message
end
end
def ack_messages(messages)
messages.each do |message|
logger.debug "Calling ACK success callbacks for #{message.class.name} - #{message.to_json}"
message.succeed message
end
end
def nack_messages(messages, protocol_message)
messages.each do |message|
logger.debug "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json}, protocol message: #{protocol_message}"
message.fail message, protocol_message.error
end
end
def drop_pending_queue_from_ack(ack_protocol_message)
message_serial_up_to = ack_protocol_message.message_serial + ack_protocol_message.count - 1
connection.__pending_message_ack_queue__.drop_while do |protocol_message|
if protocol_message.message_serial <= message_serial_up_to
yield protocol_message
true
end
end
end
def subscribe_to_incoming_protocol_messages
connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |*args|
dispatch_protocol_message *args
end
end
end
end
end