/
server.rb
173 lines (149 loc) · 4.28 KB
/
server.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
require 'eventmachine'
require 'dkbrpc/connection_id'
require "dkbrpc/fast_message_protocol"
require 'dkbrpc/outgoing_connection'
require 'dkbrpc/incoming_connection'
require 'dkbrpc/id'
require 'dkbrpc/default'
module Dkbrpc
class ClientProxy
def initialize(outgoing_connection)
@remote_connection = outgoing_connection
end
def method_missing(method, * args, & block)
EventMachine::schedule do
@remote_connection.remote_call(method, args, & block)
end
end
def errback(& block)
@remote_connection.errbacks << block if block
end
def conn_id
@remote_connection.conn_id
end
end
class Server
attr_reader :connections
attr_reader :conn_id_generator
attr_reader :msg_id_generator
attr_reader :errback
attr_reader :insecure_methods
attr_accessor :external_protocol
def initialize(host, port, api, insecure_methods=Default::INSECURE_METHODS)
@host = host
@port = port
@api = api
@connections = []
@unbind_block = Proc.new do |connection|
@connections.delete(connection)
end
@conn_id_generator = Id.new
@msg_id_generator = Id.new
@insecure_methods = insecure_methods
end
def start(& callback)
EventMachine::schedule do
begin
@server_signature = EventMachine::start_server(@host, @port, Listener) do |connection|
connection.conn_id_generator = @conn_id_generator
connection.msg_id_generator = @msg_id_generator
connection.api = @api
connection.new_connection_callback = callback
connection.errbacks = @errback.nil? ? [] : [@errback]
connection.unbindback = @unbind_block
connection.insecure_methods = @insecure_methods
connection.external_protocol = @external_protocol
@connections << connection
end
rescue Exception => e
@errback.call(e) if @errback
end
end
end
def stop(& callback)
EventMachine::schedule do
EventMachine::next_tick do
close_all_connections
stop_server(callback)
end
end
end
def errback(& block)
@errback = block
end
private #################################################################################
def close_all_connections
@connections.each do |connection|
connection.close_connection
end
end
def stop_server(callback)
if @server_signature
EventMachine::stop_server(@server_signature)
@server_signature = nil
callback.call(true) if callback
else
callback.call(false) if callback
end
end
end
module Listener
INCOMING_CONNECTION = "4"[0]
OUTGOING_CONNECTION = "5"[0]
attr_accessor :conn_id_generator
attr_reader :conn_id
attr_accessor :msg_id_generator
attr_accessor :api
attr_accessor :new_connection_callback
attr_accessor :callback
attr_accessor :errbacks
attr_accessor :unbindback
attr_accessor :insecure_methods
attr_accessor :external_protocol
include ConnectionId
def post_init
@buffer = ""
end
def receive_data data
@buffer << data
handshake(@buffer) if @conn_id.nil?
end
def unbind
call_errbacks(ConnectionError.new)
@unbindback.call(self) if @unbindback
end
def call_errbacks(message)
@errbacks.each do |e|
e.call(message)
end
end
private
def handle_incoming
@conn_id = @conn_id_generator.next
self.extend(IncomingConnection)
switch_protocol
send_data(@conn_id)
end
def handle_outgoing(buffer)
if complete_id?(buffer[1..-1])
@conn_id = extract_id(buffer[1..-1])
self.extend(OutgoingConnection)
switch_protocol
send_message(@conn_id)
@new_connection_callback.call(ClientProxy.new(self)) if @new_connection_callback
end
end
def handshake(buffer)
if buffer[0] == INCOMING_CONNECTION
handle_incoming
elsif buffer[0] == OUTGOING_CONNECTION
handle_outgoing(buffer)
else
@external_protocol.handle_connection(buffer, self) if @external_protocol
end
end
def switch_protocol
Dkbrpc::FastMessageProtocol.install(self)
end
end
end