forked from puma/puma
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.rb
648 lines (529 loc) · 17 KB
/
server.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
require 'rack'
require 'stringio'
require 'puma/thread_pool'
require 'puma/const'
require 'puma/events'
require 'puma/null_io'
require 'puma/compat'
require 'puma/puma_http11'
require 'socket'
module Puma
# The HTTP Server itself. Serves out a single Rack app.
class Server
include Puma::Const
attr_reader :thread
attr_reader :events
attr_accessor :app
attr_accessor :min_threads
attr_accessor :max_threads
attr_accessor :persistent_timeout
attr_accessor :auto_trim_time
# Create a server for the rack app +app+.
#
# +events+ is an object which will be called when certain error events occur
# to be handled. See Puma::Events for the list of current methods to implement.
#
# Server#run returns a thread that you can join on to wait for the server
# to do its work.
#
def initialize(app, events=Events::DEFAULT)
  @app = app
  @events = events

  # Control pipe: #stop, #halt and #begin_restart write a command to @notify,
  # and the listen loop started by #run selects on @check to receive it.
  @check, @notify = IO.pipe
  @ios = [@check]

  @status = :stop
  @min_threads = 0
  @max_threads = 16
  @auto_trim_time = 1

  @thread = nil
  @thread_pool = nil

  # Keep-alive handling: #process_client waits on @persistent_check next to
  # the client socket; closing @persistent_wakeup makes that wait return.
  @persistent_timeout = PERSISTENT_TIMEOUT
  @persistent_check, @persistent_wakeup = IO.pipe

  # Paths of UNIX domain sockets to unlink when the listen loop exits.
  @unix_paths = []

  # Template env copied for every request.
  @proto_env = {
    "rack.version".freeze => Rack::VERSION,
    "rack.errors".freeze => events.stderr,
    "rack.multithread".freeze => true,
    "rack.multiprocess".freeze => false,
    # The app is invoked once per request for the life of the process, so
    # this must not advertise a one-shot (CGI-style) invocation.
    "rack.run_once".freeze => false,

    "SCRIPT_NAME".freeze => ENV['SCRIPT_NAME'] || "",

    # Rack blows up if this is an empty string, and Rack::Lint
    # blows up if it's nil. So 'text/plain' seems like the most
    # sensible default value.
    "CONTENT_TYPE".freeze => "text/plain",

    "QUERY_STRING".freeze => "",
    SERVER_PROTOCOL => HTTP_11,
    SERVER_SOFTWARE => PUMA_VERSION,
    GATEWAY_INTERFACE => CGI_VER
  }

  # Per-listener env overrides, keyed by listening socket (see #add_ssl_listener).
  @envs = {}

  ENV['RACK_ENV'] ||= "development"
end
# On Linux, use TCP_CORK to better control how the TCP stack
# packetizes our stream. This improves both latency and throughput.
#
# On every other platform both methods are no-ops.
#
case RUBY_PLATFORM
when /linux/
  # setsockopt arguments used below:
  #   6   == Socket::IPPROTO_TCP
  #   3   == TCP_CORK
  #   1/0 == turn on/off

  # Start corking +socket+; anything that is not a TCPSocket is ignored.
  def cork_socket(socket)
    return unless socket.kind_of? TCPSocket
    socket.setsockopt(6, 3, 1)
  end

  # Stop corking +socket+; anything that is not a TCPSocket is ignored.
  def uncork_socket(socket)
    return unless socket.kind_of? TCPSocket
    socket.setsockopt(6, 3, 0)
  end
else
  # Corking is not used on this platform.
  def cork_socket(socket)
  end

  # Corking is not used on this platform.
  def uncork_socket(socket)
  end
end
# Open a TCP listening socket on +host+:+port+ and register it with the
# server's set of watched IOs. Returns the new TCPServer.
#
# When +optimize_for_latency+ is true (the default) TCP_NODELAY is set on
# the listening socket so clients are optimized for latency over throughput.
#
# +backlog+ indicates how many unaccepted connections the kernel should
# allow to accumulate before returning connection refused.
#
def add_tcp_listener(host, port, optimize_for_latency=true, backlog=1024)
  server = TCPServer.new(host, port)

  server.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) if optimize_for_latency
  server.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true)
  server.listen backlog

  @ios << server
  server
end
# Wrap the already-open file descriptor +fd+ in a TCPServer, register it
# with the server's watched IOs and return it. +host+ and +port+ are
# accepted but not used here.
#
def inherit_tcp_listener(host, port, fd)
  TCPServer.for_fd(fd).tap { |server| @ios << server }
end
# Open a TCP listening socket on +host+:+port+, wrap it in an SSL server
# using the OpenSSL context +ctx+ and register the SSL server with the
# watched IOs. Requests accepted through it use a copy of the prototype env
# with HTTPS_KEY set.
#
# +optimize_for_latency+ and +backlog+ behave as in #add_tcp_listener.
#
# Returns the underlying TCPServer (not the SSL wrapper).
#
def add_ssl_listener(host, port, ctx, optimize_for_latency=true, backlog=1024)
  tcp_server = TCPServer.new(host, port)

  tcp_server.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) if optimize_for_latency
  tcp_server.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true)
  tcp_server.listen backlog

  ssl_server = OpenSSL::SSL::SSLServer.new(tcp_server, ctx)

  ssl_env = @proto_env.dup
  ssl_env[HTTPS_KEY] = HTTPS
  @envs[ssl_server] = ssl_env

  @ios << ssl_server
  tcp_server
end
# Wrap the already-open file descriptor +fd+ in a TCPServer, put an SSL
# server using the OpenSSL context +ctx+ on top of it and register the SSL
# server with the watched IOs.
#
# As in #add_ssl_listener, the SSL server gets its own copy of the prototype
# env with HTTPS_KEY set, so requests arriving on an inherited SSL socket are
# reported to the app as HTTPS.
#
# Returns the underlying TCPServer (not the SSL wrapper).
#
def inherited_ssl_listener(fd, ctx)
  s = TCPServer.for_fd(fd)
  ssl = OpenSSL::SSL::SSLServer.new(s, ctx)

  env = @proto_env.dup
  env[HTTPS_KEY] = HTTPS
  @envs[ssl] = env

  @ios << ssl
  s
end
# Tell the server to listen on +path+ as a UNIX domain socket.
#
# +umask+ is the process umask in effect while the socket file is created;
# it defaults to 0 so that anyone can connect. The previous umask is always
# restored, whether or not creating the socket succeeds.
#
# Returns the new UNIXServer. If the socket cannot be created the error is
# raised to the caller and nothing is registered, so a later shutdown will
# not unlink a path this server never created.
#
def add_unix_listener(path, umask=nil)
  # Let anyone connect by default
  umask ||= 0

  # Taken outside the begin block so the ensure below always has a real
  # previous mask to restore.
  old_mask = File.umask(umask)
  begin
    s = UNIXServer.new(path)
  ensure
    File.umask old_mask
  end

  @unix_paths << path
  @ios << s
  s
end
# Record +path+ for cleanup, wrap the already-open file descriptor +fd+ in a
# UNIXServer, register it with the watched IOs and return it.
#
def inherit_unix_listener(path, fd)
  @unix_paths.push(path)

  server = UNIXServer.for_fd(fd)
  @ios.push(server)
  server
end
# The thread pool's +backlog+ value, or nil when no pool exists yet
# (the pool is created by #run).
#
def backlog
  pool = @thread_pool
  pool && pool.backlog
end
# The thread pool's +spawned+ value, or nil when no pool exists yet
# (the pool is created by #run).
#
def running
  pool = @thread_pool
  pool ? pool.spawned : pool
end
# Runs the server. It returns the thread used so you can join it.
# The thread is always available via #thread to be join'd
#
# A thread pool is created to process accepted clients, and a listen thread
# selects on every registered listener plus the control pipe until a
# stop/halt/restart command changes @status. On exit the listeners are
# closed and the UNIX socket paths unlinked, unless a restart was requested.
#
def run
  BasicSocket.do_not_reverse_lookup = true

  @status = :run

  @thread_pool = ThreadPool.new(@min_threads, @max_threads) do |client, env|
    process_client(client, env)
  end

  if @auto_trim_time
    @thread_pool.auto_trim!(@auto_trim_time)
  end

  @thread = Thread.new do
    begin
      check = @check
      sockets = @ios
      pool = @thread_pool

      while @status == :run
        begin
          ios = IO.select sockets
          ios.first.each do |sock|
            if sock == check
              # Stop draining this batch once a command changed @status;
              # the while condition above then ends the loop.
              break if handle_check
            else
              # Hand the connection to the pool together with the env
              # template registered for this listener (if any).
              pool << [sock.accept, @envs.fetch(sock, @proto_env)]
            end
          end
        rescue Errno::ECONNABORTED
          # The client closed the socket even before accept, so there is no
          # connection object to clean up; just go back to selecting.
        rescue Object => e
          @events.unknown_error self, e, "Listen loop"
        end
      end

      # Only a stop waits for outstanding requests to finish.
      graceful_shutdown if @status == :stop
    ensure
      # The listeners are left open across a restart.
      unless @status == :restart
        @ios.each { |i| i.close }
        @unix_paths.each { |i| File.unlink i }
      end
    end
  end

  return @thread
end
# :nodoc:
#
# Read one command byte from the control pipe and update @status to match.
# Returns true when the byte was a stop, halt or restart command, and false
# (leaving @status untouched) for anything else.
#
def handle_check
  requested =
    case @check.read(1)
    when STOP_COMMAND then :stop
    when HALT_COMMAND then :halt
    when RESTART_COMMAND then :restart
    end

  return false unless requested

  @status = requested
  true
end
# Given a connection on +client+, handle the incoming requests.
#
# This method supports HTTP Keep-Alive so it may, depending on if the client
# indicates that it supports keep alive, wait for another request before
# returning.
#
# +proto_env+ is the env template of the listener that accepted +client+;
# every request on the connection, including pipelined ones, starts from a
# fresh copy of it.
#
# The socket is closed on the way out unless the app answered :async.
#
def process_client(client, proto_env)
  parser = HttpParser.new
  close_socket = true

  begin
    while true
      parser.reset

      env = proto_env.dup
      data = client.readpartial(CHUNK_SIZE)
      nparsed = 0

      # Assumption: nparsed will always be less since data will get filled
      # with more after each parsing. If it doesn't get more then there was
      # a problem with the read operation on the client socket.
      # Effect is to stop processing when the socket can't fill the buffer
      # for further parsing.
      while nparsed < data.bytesize
        nparsed = parser.execute(env, data, nparsed)

        if parser.finished?
          cl = env[CONTENT_LENGTH]

          case handle_request(env, client, parser.body, cl)
          when false
            return
          when :async
            # The app took over the socket; leave it open.
            close_socket = false
            return
          end

          nparsed += parser.body.bytesize if cl

          if data.bytesize > nparsed
            # More data is already buffered (a pipelined request): drop what
            # was consumed and start over with this listener's env template.
            data.slice!(0, nparsed)
            parser.reset
            env = proto_env.dup
            nparsed = 0
          else
            # Wait for the next request, the keep-alive timeout, or the
            # shutdown wakeup, whichever comes first.
            unless ret = IO.select([client, @persistent_check], nil, nil, @persistent_timeout)
              raise EOFError, "Timed out persistent connection"
            end

            return if ret.first.include? @persistent_check
          end
        else
          # Parser is not done, queue up more data to read and continue parsing
          chunk = client.readpartial(CHUNK_SIZE)
          return if !chunk or chunk.length == 0  # read failed, stop processing

          data << chunk
          if data.bytesize >= MAX_HEADER
            raise HttpParserError,
              "HEADER is longer than allowed, aborting client early."
          end
        end
      end
    end

  # The client disconnected while we were reading data
  rescue EOFError, SystemCallError
    # Swallow them. The ensure tries to close +client+ down

  # The client doesn't know HTTP well
  rescue HttpParserError => e
    @events.parse_error self, env, e

  # Server error
  rescue StandardError => e
    @events.unknown_error self, e, "Read"

  ensure
    begin
      client.close if close_socket
    rescue IOError, SystemCallError
      # Already closed
    rescue StandardError => e
      @events.unknown_error self, e, "Client"
    end
  end
end
# Given a Hash +env+ for the request read from +client+, add
# and fixup keys to comply with Rack's env guidelines.
#
# Sets SERVER_NAME and SERVER_PORT from the Host header (falling back to
# LOCALHOST and PORT_80), makes sure REQUEST_PATH and PATH_INFO are present,
# and sets REMOTE_ADDR from the client's peer address.
#
# Raises if no request path can be determined.
#
def normalize_env(env, client)
  if host = env[HTTP_HOST]
    # A bracketed IPv6 literal ("[::1]:3000") contains colons of its own, so
    # for those the port separator is the first colon at or after the
    # closing bracket rather than the first colon in the header.
    bracket = host[0, 1] == "[" ? host.index("]") : nil
    colon = bracket ? host.index(":", bracket) : host.index(":")

    if colon
      env[SERVER_NAME] = host[0, colon]
      env[SERVER_PORT] = host[colon+1, host.bytesize]
    else
      env[SERVER_NAME] = host
      env[SERVER_PORT] = PORT_80
    end
  else
    env[SERVER_NAME] = LOCALHOST
    env[SERVER_PORT] = PORT_80
  end

  unless env[REQUEST_PATH]
    # The request line may carry a full URI including the host; take the
    # path component from it.
    uri = URI.parse(env[REQUEST_URI])
    env[REQUEST_PATH] = uri.path

    raise "No REQUEST PATH" unless env[REQUEST_PATH]
  end

  env[PATH_INFO] = env[REQUEST_PATH]

  # From http://www.ietf.org/rfc/rfc3875 :
  # "Script authors should be aware that the REMOTE_ADDR and
  # REMOTE_HOST meta-variables (see sections 4.1.8 and 4.1.9)
  # may not identify the ultimate source of the request.
  # They identify the client for the immediate request to the
  # server; that client may be a proxy, gateway, or other
  # intermediary acting on behalf of the actual source client."
  #
  env[REMOTE_ADDR] = client.peeraddr.last
end
# The object used for a request with no body. All requests with
# no body share this one object since it has no state.
EmptyBody = NullIO.new
# Given the request +env+ from +client+ and a partial request body
# in +body+, finish reading the body if there is one and invoke
# the rack app. Then construct the response and write it back to
# +client+
#
# +cl+ is the previously fetched Content-Length header if there
# was one. This is an optimization to keep from having to look
# it up again.
#
# Returns false if the request body could not be read, :async if the app
# answered with status -1, and otherwise whether the connection should be
# kept alive for another request.
#
def handle_request(env, client, body, cl)
  normalize_env env, client

  env[PUMA_SOCKET] = client

  if cl
    body = read_body env, client, body, cl
    return false unless body
  else
    body = EmptyBody
  end

  env[RACK_INPUT] = body
  env[RACK_URL_SCHEME] = env[HTTPS_KEY] ? HTTPS : HTTP

  # A rack extension. If the app writes #call'ables to this
  # array, we will invoke them when the request is done.
  #
  after_reply = env[RACK_AFTER_REPLY] = []

  begin
    begin
      status, headers, res_body = @app.call(env)

      if status == -1
        unless headers.empty? and res_body == []
          raise "async response must have empty headers and body"
        end

        return :async
      end
    rescue => e
      @events.unknown_error self, e, "Rack app"

      status, headers, res_body = lowlevel_error(e)
    end

    content_length = nil
    no_body = false

    # A single-element body lets us compute Content-Length ourselves.
    if res_body.kind_of? Array and res_body.size == 1
      content_length = res_body[0].bytesize
    end

    cork_socket client

    if env[HTTP_VERSION] == HTTP_11
      allow_chunked = true
      keep_alive = env[HTTP_CONNECTION] != CLOSE
      include_keepalive_header = false

      # An optimization. The most common response is 200, so we can
      # reply with the proper 200 status without having to compute
      # the response header.
      #
      if status == 200
        client.write HTTP_11_200
      else
        client.write "HTTP/1.1 "
        client.write status.to_s
        client.write " "
        client.write HTTP_STATUS_CODES[status]
        client.write "\r\n"

        no_body = status < 200 || STATUS_WITH_NO_ENTITY_BODY[status]
      end
    else
      allow_chunked = false
      keep_alive = env[HTTP_CONNECTION] == KEEP_ALIVE
      include_keepalive_header = keep_alive

      # Same optimization as above for HTTP/1.0
      #
      if status == 200
        client.write HTTP_10_200
      else
        # Answer in the protocol version this branch is handling, matching
        # what the 200 fast path above sends.
        client.write "HTTP/1.0 "
        client.write status.to_s
        client.write " "
        client.write HTTP_STATUS_CODES[status]
        client.write "\r\n"

        no_body = status < 200 || STATUS_WITH_NO_ENTITY_BODY[status]
      end
    end

    colon = COLON
    line_ending = LINE_END

    headers.each do |k, vs|
      case k
      when CONTENT_LENGTH2
        # Remembered and written once below instead of here.
        content_length = vs
        next
      when TRANSFER_ENCODING
        # The app does its own transfer encoding; don't add ours.
        allow_chunked = false
        content_length = nil
      when CONTENT_TYPE
        next if no_body
      end

      # A header value may hold several newline-separated values; each one
      # is written as its own header line.
      vs.split(NEWLINE).each do |v|
        client.write k
        client.write colon
        client.write v
        client.write line_ending
      end
    end

    if no_body
      client.write line_ending
      return keep_alive
    end

    if include_keepalive_header
      client.write CONNECTION_KEEP_ALIVE
    elsif !keep_alive
      client.write CONNECTION_CLOSE
    end

    if content_length
      client.write CONTENT_LENGTH_S
      client.write content_length.to_s
      client.write line_ending
      chunked = false
    elsif allow_chunked
      client.write TRANSFER_ENCODING_CHUNKED
      chunked = true
    end

    client.write line_ending

    res_body.each do |part|
      if chunked
        # A zero-length chunk is the end-of-body marker in chunked encoding,
        # so an empty part must not be written as a chunk or the client would
        # treat the response as finished.
        next if part.bytesize == 0

        client.write part.bytesize.to_s(16)
        client.write line_ending
        client.write part
        client.write line_ending
      else
        client.write part
      end

      client.flush
    end

    if chunked
      client.write CLOSE_CHUNKED
      client.flush
    end

  ensure
    uncork_socket client

    body.close
    res_body.close if res_body.respond_to? :close

    after_reply.each { |o| o.call }
  end

  return keep_alive
end
# Given the request +env+ from +client+ and the partial body +body+
# plus a potential Content-Length value +cl+, finish reading
# the body and return it.
#
# If more than MAX_BODY bytes remain to be read, a Tempfile object is used
# for the body, otherwise a StringIO is used.
#
# Returns the body rewound to its start, or nil if +client+ hands back no
# data. Errors raised while reading propagate to the caller after the
# partially filled stream has been closed.
#
def read_body(env, client, body, cl)
  content_length = cl.to_i

  remain = content_length - body.bytesize

  # Everything was already received along with the headers.
  return StringIO.new(body) if remain <= 0

  # Use a Tempfile if there is a lot of data left
  if remain > MAX_BODY
    stream = Tempfile.new(Const::PUMA_TMP_BASE)
    stream.binmode
  else
    # The body[0,0] trick is to get an empty string in the same
    # encoding as body.
    stream = StringIO.new body[0,0]
  end

  complete = false

  begin
    stream.write body

    while remain > 0
      # Never ask for more than is still missing: a read may return fewer
      # bytes than requested, and anything past the declared length belongs
      # to the next request on the connection.
      want = remain < CHUNK_SIZE ? remain : CHUNK_SIZE
      chunk = client.readpartial(want)

      # No chunk means a closed socket
      return nil unless chunk

      remain -= stream.write(chunk)
    end

    stream.rewind
    complete = true

    return stream
  ensure
    # Don't leave a half-written StringIO/Tempfile open when the read fails.
    stream.close unless complete
  end
end
# A fallback rack response if +@app+ raises an exception: a 500 with no
# headers whose single body part contains the error message followed by
# its backtrace.
#
def lowlevel_error(e)
  summary = "Puma caught this error: #{e}"
  trace = e.backtrace.join("\n")

  [500, {}, ["#{summary}\n#{trace}"]]
end
# Wait for all outstanding requests to finish by shutting the thread pool
# down. Does nothing when no pool has been created.
#
def graceful_shutdown
  return unless @thread_pool

  @thread_pool.shutdown
end
# Stops the acceptor thread and then causes the worker threads to finish
# off the request queue before finally exiting.
#
# Closes the keep-alive wakeup pipe and sends STOP_COMMAND over the control
# pipe. With +sync+ true, also joins the server thread (if there is one).
#
def stop(sync=false)
  @persistent_wakeup.close
  @notify << STOP_COMMAND

  return unless sync

  server_thread = @thread
  server_thread.join if server_thread
end
# Closes the keep-alive wakeup pipe and sends HALT_COMMAND over the control
# pipe. With +sync+ true, also joins the server thread (if there is one).
#
def halt(sync=false)
  @persistent_wakeup.close
  @notify << HALT_COMMAND

  return unless sync

  server_thread = @thread
  server_thread.join if server_thread
end
# Closes the keep-alive wakeup pipe and sends RESTART_COMMAND over the
# control pipe. Does not wait for the server thread.
#
def begin_restart
  wakeup, notifier = @persistent_wakeup, @notify

  wakeup.close
  notifier << RESTART_COMMAND
end
end
end