/
json_drb.rb
385 lines (348 loc) · 13.1 KB
/
json_drb.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
# encoding: ascii-8bit
# Copyright 2014 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
require 'thread'
require 'socket'
require 'json'
require 'drb/acl'
require 'drb/drb'
require 'set'
require 'cosmos/io/json_rpc'
module Cosmos
# JsonDRb implements the JSON-RPC 2.0 Specification to provide an interface
# for both internal and external tools to access the COSMOS server. It
# provides methods to install an access control list to control access to the
# API. It also limits the available methods to a known list of allowable API
# methods.
class JsonDRb
MINIMUM_REQUEST_TIME = 0.0001
FAST_READ = (RUBY_VERSION > "2.1")
@@debug = false
# @return [Integer] The number of JSON-RPC requests processed
attr_accessor :request_count
# @return [Array<String>] List of methods that should be allowed
attr_accessor :method_whitelist
# @return [ACL] The access control list
attr_accessor :acl
def initialize
@listen_socket = nil
@thread = nil
@acl = nil
@object = nil
@method_whitelist = nil
@request_count = 0
@request_times = []
@request_times_index = 0
@request_mutex = Mutex.new
@client_sockets = []
@client_threads = []
@client_pipe_writers = []
@client_mutex = Mutex.new
@thread_reader, @thread_writer = IO.pipe
end
# Returns the number of connected clients
# @return [Integer] The number of connected clients
def num_clients
@client_threads.length
end
# Stops the DRb service by closing the socket and the processing thread
def stop_service
Cosmos.kill_thread(self, @thread)
@thread = nil
Cosmos.close_socket(@listen_socket)
@listen_socket = nil
client_threads = nil
@client_mutex.synchronize do
@client_sockets.each do |client_socket|
Cosmos.close_socket(client_socket)
end
@client_pipe_writers.each do |client_pipe_writer|
client_pipe_writer.write('.')
end
client_threads = @client_threads.clone
end
# This cannot be inside of the client_mutex or the threads will not
# be able to shutdown because they will stick on the client_mutex
client_threads.each do |client_thread|
Cosmos.kill_thread(self, client_thread)
end
@client_mutex.synchronize do
@client_threads.clear
@client_sockets.clear
@client_pipe_writers.clear
end
end
# Gracefully kill the thread
def graceful_kill
@thread_writer.write('.') if @thread
end
# @param hostname [String] The host to start the service on
# @param port [Integer] The port number to listen for connections
# @param object [Object] The object to send the DRb requests to. This
# object must either include the Cosmos::Script module or be the
# CmdTlmServer.
def start_service(hostname = nil, port = nil, object = nil)
if hostname and port and object
@thread_reader, @thread_writer = IO.pipe
@object = object
hostname = '127.0.0.1'.freeze if (hostname.to_s.upcase == 'LOCALHOST'.freeze)
# Create a socket to accept connections from clients
begin
@listen_socket = TCPServer.new(hostname, port)
@listen_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1) unless Kernel.is_windows?
# The address is use error is pretty typical if an existing
# CmdTlmServer is running so explicitly rescue this
rescue Errno::EADDRINUSE
raise "Error binding to port #{port}.\n" +
"Either another application is using this port\n" +
"or the operating system is being slow cleaning up.\n" +
"Make sure all sockets/streams are closed in all applications,\n" +
"wait 1 minute and try again."
# Something else went wrong which is fatal
rescue => error
Logger.error "JsonDRb listen thread unable to be created.\n#{error.formatted}"
Cosmos.handle_fatal_exception(error)
end
# Start the listen thread which accepts connections
@thread = Thread.new do
begin
while true
begin
socket = @listen_socket.accept_nonblock
rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EINTR, Errno::EWOULDBLOCK
read_ready, _ = IO.select([@listen_socket, @thread_reader])
if read_ready and read_ready.include?(@thread_reader)
# Thread should be killed
break
else
retry
end
end
if @acl and !@acl.allow_socket?(socket)
Cosmos.close_socket(socket)
next
end
# Create new thread for new connection
create_client_thread(socket)
end
rescue Exception => error
Logger.error "JsonDRb listen thread unexpectedly died.\n#{error.formatted}"
Cosmos.handle_fatal_exception(error)
end
end
elsif hostname or port or object
raise "0 or 3 parameters must be given"
else
# Client - Noop
end
end
# @return [Thread] The server thread listening for incoming requests
def thread
@thread
end
# Adds a request time to the list. A request time consists of the amount of
# time to receive the request, process it, and send the response. These
# times are used by the {#average_request_time} method to calculate an
# average request time.
#
# @param request_time [Float] Time in seconds for the data transmission
def add_request_time(request_time)
@request_mutex.synchronize do
request_time = MINIMUM_REQUEST_TIME if request_time < MINIMUM_REQUEST_TIME
@request_times[@request_times_index] = request_time
@request_times_index += 1
@request_times_index = 0 if @request_times_index >= 100
end
end
# @return [Float] The average time in seconds for a JSON DRb request to be
# processed and the response sent.
def average_request_time
avg = 0
@request_mutex.synchronize do
avg = @request_times.mean
end
avg
end
# @param socket [Socket] The socket to the client
# @param data [String] Binary data which has already been read from the
# socket.
# @param pipe_reader [IO.pipe] Used to break out of select
# @return [String] The request message
def self.receive_message(socket, data, pipe_reader)
self.get_at_least_x_bytes_of_data(socket, data, 4, pipe_reader)
if data.length >= 4
length = data[0..3].unpack('N'.freeze)[0]
data.replace(data[4..-1])
else
return nil
end
self.get_at_least_x_bytes_of_data(socket, data, length, pipe_reader)
if data.length >= length
message = data[0..(length - 1)]
data.replace(data[length..-1])
return message
else
return nil
end
end
# @param socket [Socket] The socket to the client
# @param current_data [String] Binary data read from the socket
# @param required_num_bytes [Integer] The minimum number of bytes to read
# @param pipe_reader [IO.pipe] Used to break out of select
# before returning
def self.get_at_least_x_bytes_of_data(socket, current_data, required_num_bytes, pipe_reader)
while (current_data.length < required_num_bytes)
if FAST_READ
data = socket.read_nonblock(65535, exception: false)
if data == :wait_readable
IO.fast_select([socket, pipe_reader], nil, nil, nil)
else
current_data << data
end
else
begin
current_data << socket.read_nonblock(65535)
rescue IO::WaitReadable
IO.fast_select([socket, pipe_reader], nil, nil, nil)
end
end
end
end
# @param socket [Socket] The socket to the client
# @param data [String] Binary data to send to the socket
# @param send_timeout [Float] The number of seconds to wait for the send to
# complete
def self.send_data(socket, data, send_timeout = 10.0)
num_bytes_to_send = data.length + 4
total_bytes_sent = 0
bytes_sent = 0
data_to_send = [data.length].pack('N'.freeze) << data.clone
loop do
begin
bytes_sent = socket.write_nonblock(data_to_send[total_bytes_sent..-1])
rescue Errno::EAGAIN, Errno::EWOULDBLOCK
result = IO.fast_select(nil, [socket], nil, send_timeout)
if result
retry
else
raise Timeout::Error, "Send Timeout"
end
end
total_bytes_sent += bytes_sent
break if total_bytes_sent >= num_bytes_to_send
end
end
# @return [Boolean] Whether debug messages are enabled
def self.debug?
@@debug
end
# @param value [Boolean] Whether to enable debug messages
def self.debug=(value)
@@debug = value
end
protected
# Creates a new Thread to service the JSON DRb requests from the client.
#
# @param socket [Socket] The socket which the server accepted from the
# client.
def create_client_thread(socket)
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1)
Thread.new(socket) do |my_socket|
pipe_reader, pipe_writer = IO.pipe
@client_mutex.synchronize do
@client_sockets << my_socket
@client_threads << Thread.current
@client_pipe_writers << pipe_writer
end
data = ''
begin
while true
begin
request_data = JsonDRb.receive_message(my_socket, data, pipe_reader)
start_time = Time.now
@request_count += 1
rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::ENOTSOCK
# Socket was closed
break
end
if request_data
break unless process_request(request_data, my_socket, start_time)
else
# Socket was closed by client
break
end
end
rescue Exception => error
Logger.error "JsonDrb client thread unexpectedly died.\n#{error.formatted}"
end
@client_mutex.synchronize do
Cosmos.close_socket(my_socket)
@client_sockets.delete(my_socket)
@client_threads.delete(Thread.current)
@client_pipe_writers.delete(pipe_writer)
end
end
end
# Process the JSON request data, execute the method, and send the response.
#
# @param request_data [String] The JSON encoded request
# @param my_socket [Socket] The socket to send the response out on
# @param start_time [Time] The time when the initial request was received
def process_request(request_data, my_socket, start_time)
STDOUT.puts request_data if JsonDRb.debug?
begin
request = JsonRpcRequest.from_json(request_data)
response = nil
if (@method_whitelist and @method_whitelist.include?(request.method)) or
(!@method_whitelist and !JsonRpcRequest::DANGEROUS_METHODS.include?(request.method))
begin
result = @object.send(request.method.intern, *request.params)
if request.id
response = JsonRpcSuccessResponse.new(result, request.id)
end
rescue Exception => error
if request.id
if NoMethodError === error
response = JsonRpcErrorResponse.new(
JsonRpcError.new(-32601, "Method not found", error), request.id)
elsif ArgumentError === error
response = JsonRpcErrorResponse.new(
JsonRpcError.new(-32602, "Invalid params", error), request.id)
else
response = JsonRpcErrorResponse.new(
JsonRpcError.new(-1, error.message, error), request.id)
end
end
end
else
if request.id
response = JsonRpcErrorResponse.new(
JsonRpcError.new(-1, "Cannot call unauthorized methods"), request.id)
end
end
process_response(response, my_socket, start_time) if response
rescue => error
response = JsonRpcErrorResponse.new(JsonRpcError.new(-32600, "Invalid Request", error), nil)
process_response(response, my_socket, start_time)
end
true
end
def process_response(response, socket, start_time)
response_data = response.to_json(:allow_nan => true)
STDOUT.puts response_data if JsonDRb.debug?
JsonDRb.send_data(socket, response_data)
end_time = Time.now
request_time = end_time - start_time
add_request_time(request_time)
rescue
# Socket was closed?
return false
end
end
end