/
tcp.rb
430 lines (356 loc) · 13.1 KB
/
tcp.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
#--
# Copyright (C)2007-10 Tony Arcieri
# You can redistribute this under the terms of the Ruby license
# See file LICENSE for details
#++
module Revactor
# The TCP module holds all Revactor functionality related to the
# Transmission Control Protocol, including drop-in replacements
# for Ruby TCP Sockets which can operate concurrently using Actors.
module TCP
# Number of seconds to wait for a connection
CONNECT_TIMEOUT = 10
# Raised when a read from a client or server fails
class ReadError < StandardError; end
# Raised when a write to a client or server fails
class WriteError < StandardError; end
# Raised when a connection to a remote server fails
class ConnectError < StandardError; end
# Raised when hostname resolution for a remote server fails
class ResolveError < ConnectError; end
# Connect to the specified host and port. Host may be a domain name
# or IP address. Accepts the following options:
#
# :active - Controls how data is read from the socket. See the
# documentation for Revactor::TCP::Socket#active=
#
def self.connect(host, port, options = {})
socket = Socket.connect host, port, options
socket.attach Coolio::Loop.default
Actor.receive do |filter|
filter.when(T[Object, socket]) do |message, _|
case message
when :tcp_connected
return socket
when :tcp_connect_failed
raise ConnectError, "connection refused"
when :tcp_resolve_failed
raise ResolveError, "couldn't resolve #{host}"
else raise "unexpected message for #{socket.inspect}: #{message}"
end
end
filter.after(CONNECT_TIMEOUT) do
raise ConnectError, "connection timed out"
end
end
end
# Listen on the specified address and port. Accepts the following options:
#
# :active - Default active setting for new connections. See the
# documentation Coolio::TCP::Socket#active= for more info
#
# :controller - The controlling actor, default Actor.current
#
# :filter - An symbol/class or array of symbols/classes which implement
# #encode and #decode methods to transform data sent and
# received data respectively via Revactor::TCP::Socket.
# See the "Filters" section in the README for more information
#
def self.listen(addr, port, options = {})
Listener.new(addr, port, options).attach(Coolio::Loop.default).disable
end
# TCP socket class, returned by Revactor::TCP.connect and
# Revactor::TCP::Listener#accept
class Socket < Coolio::TCPSocket
attr_reader :controller
class << self
# Connect to the specified host and port. Host may be a domain name
# or IP address. Accepts the following options:
#
# :active - Controls how data is read from the socket. See the
# documentation for #active=
#
# :controller - The controlling actor, default Actor.current
#
# :filter - An symbol/class or array of symbols/classes which implement
# #encode and #decode methods to transform data sent and
# received data respectively via Revactor::TCP::Socket.
# See the "Filters" section in the README for more information
#
def connect(host, port, options = {})
options[:active] ||= false
options[:controller] ||= Actor.current
super.instance_eval {
@active, @controller = options[:active], options[:controller]
@filterset = [*initialize_filter(options[:filter])]
self
}
end
end
def initialize(socket, options = {})
super(socket)
@active ||= options[:active] || false
@controller ||= options[:controller] || Actor.current
@filterset ||= [*initialize_filter(options[:filter])]
@receiver = @controller
@read_buffer = IO::Buffer.new
end
def inspect
"#<#{self.class}:0x#{object_id.to_s(16)} #{@remote_host}:#{@remote_port}>"
end
# Enable or disable active mode data reception. State can be any
# of the following:
#
# true - All received data is sent to the controlling actor
# false - Receiving data is disabled
# :once - A single message will be sent to the controlling actor
# then active mode will be disabled
#
def active=(state)
unless @receiver == @controller
raise "cannot change active state during a synchronous call"
end
unless [true, false, :once].include? state
raise ArgumentError, "must be true, false, or :once"
end
if [true, :once].include?(state)
unless @read_buffer.empty?
@receiver << [:tcp, self, @read_buffer.read]
return if state == :once
end
enable unless enabled?
end
@active = state
end
# Is the socket in active mode?
def active?; @active; end
# Set the controlling actor
def controller=(controller)
raise ArgumentError, "controller must be an actor" unless controller.is_a? Actor
@receiver = controller if @receiver == @controller
@controller = controller
end
# Read data from the socket synchronously. If a length is specified
# then the call blocks until the given length has been read. Otherwise
# the call blocks until it receives any data.
def read(length = nil, options = {})
# Only one synchronous call allowed at a time
raise "already being called synchronously" unless @receiver == @controller
unless @read_buffer.empty? or (length and @read_buffer.size < length)
return @read_buffer.read(length)
end
active = @active
@active = :once
@receiver = Actor.current
enable unless enabled?
loop do
Actor.receive do |filter|
filter.when(T[:tcp, self]) do |_, _, data|
if length.nil?
@receiver = @controller
@active = active
enable if @active
return data
end
@read_buffer << data
if @read_buffer.size >= length
@receiver = @controller
@active = active
enable if @active
return @read_buffer.read(length)
end
end
filter.when(T[:tcp_closed, self]) do
unless @receiver == @controller
@receiver = @controller
@receiver << T[:tcp_closed, self]
end
raise EOFError, "connection closed"
end
if timeout = options[:timeout]
filter.after(timeout) { raise ReadError, "read timed out" }
end
end
end
end
# Write data to the socket. The call blocks until all data has been written.
def write(data, options = {})
# Only one synchronous call allowed at a time
raise "already being called synchronously" unless @receiver == @controller
active = @active
@active = false
@receiver = Actor.current
disable if @active
super(encode(data))
Actor.receive do |filter|
filter.when(T[:tcp_write_complete, self]) do
@receiver = @controller
@active = active
enable if @active and not enabled?
return data.size
end
filter.when(T[:tcp_closed, self]) do
@active = false
raise EOFError, "connection closed"
end
if timeout = options[:timeout]
filter.after(timeout) { raise WriteError, "write timed out" }
end
end
end
alias_method :<<, :write
#########
protected
#########
#
# Filter setup
#
# Initialize filters
def initialize_filter(filter)
case filter
when NilClass
[]
when Tuple
name, *args = filter
case name
when Class
name.new(*args)
when Symbol
symbol_to_filter(name).new(*args)
else raise ArgumentError, "unrecognized filter type: #{name.class}"
end
when Array
filter.map { |f| initialize_filter f }
when Class
filter.new
when Symbol
symbol_to_filter(filter).new
end
end
# Lookup filters referenced as symbols
def symbol_to_filter(filter)
case filter
when :line then Revactor::Filter::Line
when :packet then Revactor::Filter::Packet
else raise ArgumentError, "unrecognized filter type: #{filter}"
end
end
# Decode data through the filter chain
def decode(data)
@filterset.inject([data]) do |a, filter|
a.inject([]) do |a2, d|
a2 + filter.decode(d)
end
end
end
# Encode data through the filter chain
def encode(message)
@filterset.reverse.inject(message) { |m, filter| filter.encode(*m) }
end
#
# Coolio::TCPSocket callback
#
def on_connect
@receiver << T[:tcp_connected, self]
end
def on_connect_failed
@receiver << T[:tcp_connect_failed, self]
end
def on_resolve_failed
@receiver << T[:tcp_resolve_failed, self]
end
def on_close
@receiver << T[:tcp_closed, self]
end
def on_read(data)
# Run incoming message through the filter chain
message = decode(data)
if message.is_a?(Array) and not message.empty?
message.each { |msg| @receiver << T[:tcp, self, msg] }
elsif message and not message.empty?
@receiver << T[:tcp, self, message]
else return
end
if @active == :once
@active = false
disable
end
end
def on_write_complete
@receiver << T[:tcp_write_complete, self]
end
end
# TCP Listener returned from Revactor::TCP.listen
class Listener < Coolio::TCPListener
attr_reader :controller
# Listen on the specified address and port. Accepts the following options:
#
# :active - Default active setting for new connections. See the
# documentation Coolio::TCP::Socket#active= for more info
#
# :controller - The controlling actor, default Actor.current
#
# :filter - An symbol/class or array of symbols/classes which implement
# #encode and #decode methods to transform data sent and
# received data respectively via Revactor::TCP::Socket.
# See the "Filters" section in the README for more information
#
def initialize(host, port, options = {})
super(host, port)
opts = {
:active => false,
:controller => Actor.current
}.merge(options)
@active, @controller = opts[:active], opts[:controller]
@filterset = options[:filter]
@accepting = false
end
def inspect
"#<#{self.class}:0x#{object_id.to_s(16)}>"
end
# Change the default active setting for newly accepted connections
def active=(state)
unless [true, false, :once].include? state
raise ArgumentError, "must be true, false, or :once"
end
@active = state
end
# Will newly accepted connections be active?
def active?; @active; end
# Change the default controller for newly accepted connections
def controller=(controller)
raise ArgumentError, "controller must be an actor" unless controller.is_a? Actor
@controller = controller
end
# Accept an incoming connection
def accept
raise "another actor is already accepting" if @accepting
@accepting = true
@receiver = Actor.current
enable
Actor.receive do |filter|
filter.when(T[:tcp_connection, self]) do |_, _, sock|
@accepting = false
return sock
end
end
end
#########
protected
#########
#
# Coolio::TCPListener callbacks
#
def on_connection(socket)
sock = Socket.new(socket,
:controller => @controller,
:active => @active,
:filter => @filterset
)
sock.attach(evloop)
@receiver << T[:tcp_connection, self, sock]
disable
end
end
end
end