forked from adamcooke/procodile
/
tcp_proxy.cr
155 lines (137 loc) 路 4.42 KB
/
tcp_proxy.cr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
module Procodile
class TCPProxy
def self.start(supervisor)
proxy = new(supervisor)
proxy.start
proxy
end
def initialize(@supervisor : Procodile::Supervisor)
@listeners = {} of TCPServer => Procodile::Process
@stopped_processes = [] of Procodile::Process
@sp_reader, @sp_writer = IO.pipe
end
def start
@supervisor.config.processes.each { |_, p| add_process(p) }
Thread.new do
listen
Procodile.log nil, "proxy", "Stopped listening on all ports"
end
end
def add_process(process)
if process.proxy?
@listeners[TCPServer.new(process.proxy_address.not_nil!, process.proxy_port.not_nil!)] = process
Procodile.log nil, "proxy", "Proxying traffic on #{process.proxy_address}:#{process.proxy_port} to #{process.name}".color(32)
@sp_writer.write(".".to_slice)
end
rescue e
Procodile.log nil, "proxy", "Exception: #{e.class}: #{e.message}"
Procodile.log nil, "proxy", e.backtrace[0, 5].join("\n")
end
def remove_process(process)
@stopped_processes << process
@sp_writer.write(".".to_slice)
end
def listen
loop do
# IO.select IOs, timeout 30 seconds
io = IO.select([@sp_reader] + @listeners.keys, nil, nil, 30)
if io && io.first
# if IO.select not Timeout, do
io.first.each do |i|
if io == @sp_reader
io.read_nonblock(999)
next
end
Thread.new(io) do |io|
handle_client(client: io.accept, server: io)
end
end
end
@stopped_processes.reject do |process|
if io = @listeners.key(process)
Procodile.log nil, "proxy", "Stopped proxy listener for #{process.name}"
io.close
@listeners.delete(io)
end
true
end
end
rescue e
Procodile.log nil, "proxy", "Exception: #{e.class}: #{e.message}"
Procodile.log nil, "proxy", e.backtrace[0, 5].join("\n")
end
def listen1
sleep_chan = Channel(Nil).new
sp_reader_chan = Channel(Int).new
listener_chan = Channel(Nil).new
spawn do
loop do
sleep 30
sleep_chan.send(Nil)
end
end
spawn do
loop { sp_reader_chan.send @sp_reader.read(999) }
end
@listeners.keys.each do |io|
_io = io
spawn do
loop { listener_chan.send handle_client(client: _io.accept, server: _io) }
end
end
loop do
select
when sp_reader_chan.receive
when listener_chan.receive
when sleep_chan.receive
end
@stopped_processes.reject do |process|
if io = @listeners.key(process)
Procodile.log nil, "proxy", "Stopped proxy listener for #{process.name}"
io.close
@listeners.delete(io)
end
true
end
end
end
def handle_client(client, server) : Nil
process = @listeners[server]
instances = @supervisor.processes[process]? || [] of String
if instances.empty?
Procodile.log nil, "proxy", "There are no processes running for #{process.name}"
else
instance = instances[rand(instances.size)]
backend_socket = TCPSocket.new("127.0.0.1", instance.port) rescue nil
if backend_socket.nil?
Procodile.log nil, "proxy", "Could not connect to #{instance.description}:#{instance.port}"
return
end
readers = {:backend => backend_socket, :client => client}
loop do
io = IO.select(readers.values, nil, nil, 0.5)
if io && io.first
io.first.each do |io|
readers.each_key do |key|
next unless readers[key] == io
opposite_side = key == :client ? :backend : :client
if io.eof?
readers[opposite_side].shutdown(Socket::SHUT_WR) rescue nil
readers.delete(opposite_side)
else
readers[opposite_side].write(io.readpartial(1024)) rescue nil
end
end
end
end
end
end
rescue e
Procodile.log nil, "proxy", "Exception: #{e.class}: #{e.message}"
Procodile.log nil, "proxy", e.backtrace[0, 5].join("\n")
ensure
backend_socket.close rescue nil
client.close rescue nil
end
end
end