-
Notifications
You must be signed in to change notification settings - Fork 231
/
client_handler.rb
106 lines (88 loc) · 3.42 KB
/
client_handler.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
require 'socket'
require 'json'
# This class is a little confusing. See the docs/ directory for guidance.
module Zeus
class Server
class ClientHandler
def datasource ; @listener ; end
def on_datasource_event ; handle_server_connection ; end
def close_child_socket ; end
def close_parent_socket ; @listener.close ; end
REATTEMPT_HANDSHAKE = 204
def initialize(acceptor_commands, server)
@server = server
@acceptor_commands = acceptor_commands
@listener = UNIXServer.new(Zeus::SOCKET_NAME)
@listener.listen(10)
rescue Errno::EADDRINUSE
Zeus.ui.error "Zeus appears to be already running in this project. If not, remove #{Zeus::SOCKET_NAME} and try again."
exit 1
end
private
# See docs/client_server_handshake.md for details
def handle_server_connection
s_client = @listener.accept
data = JSON.parse(s_client.readline.chomp)
command, arguments = data.values_at('command', 'arguments')
client_terminal = s_client.recv_io
exit_status_socket = s_client.recv_io
Thread.new {
# This is a little ugly. Gist: Try to handshake the client to the acceptor.
# If the acceptor is not booted yet, this will hang until it is, then terminate with
# REATTEMPT_HANDSHAKE. We catch that exit code and try once more.
begin
loop do
pid = fork { handshake_client_to_acceptor(s_client, command, arguments, client_terminal, exit_status_socket) ; exit }
Process.wait(pid)
break if $?.exitstatus != REATTEMPT_HANDSHAKE
end
ensure
client_terminal.close
s_client.close
end
}
end
def handshake_client_to_acceptor(s_client, command, arguments, client_terminal, exit_status_socket)
unless @acceptor_commands.include?(command.to_s)
msg = "no such command `#{command}`."
return exit_with_message(s_client, client_terminal, msg)
end
unless acceptor = send_io_to_acceptor(client_terminal, exit_status_socket, command)
wait_for_acceptor(s_client, client_terminal, command)
exit REATTEMPT_HANDSHAKE
end
Zeus.ui.info "accepting connection for #{command}"
acceptor.socket.puts arguments.to_json
pid = acceptor.socket.readline.chomp.to_i
s_client.puts pid
s_client.close
end
def exit_with_message(s_client, client_terminal, msg)
s_client << "0\n"
client_terminal << "[zeus] #{msg}\n"
client_terminal.close
s_client.close
exit 1
end
def wait_for_acceptor(s_client, client_terminal, command)
s_client << "0\n"
client_terminal << "[zeus] waiting for `#{command}` to finish booting...\n"
s, r = UNIXSocket.pair
s << {type: 'wait', command: command}.to_json << "\n"
@server.__CHILD__register_acceptor(r)
s.readline # wait until acceptor is booted
end
def send_io_to_acceptor(io, io2, command)
return false unless acceptor = @server.__CHILD__find_acceptor_for_command(command)
return false unless usock = UNIXSocket.for_fd(acceptor.socket.fileno)
usock.send_io(io)
usock.send_io(io2)
io.close
io2.close
return acceptor
rescue Errno::EPIPE
return false
end
end
end
end