-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
server.cr
540 lines (488 loc) · 17 KB
/
server.cr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
require "socket"
require "uri"
require "./server/context"
require "./server/handler"
require "./server/response"
require "./server/request_processor"
require "./common"
require "log"
{% unless flag?(:without_openssl) %}
require "openssl"
{% end %}
# A concurrent HTTP server implementation.
#
# A server is initialized with a handler chain responsible for processing each
# incoming request.
#
# ```
# require "http/server"
#
# server = HTTP::Server.new do |context|
# context.response.content_type = "text/plain"
# context.response.print "Hello world!"
# end
#
# address = server.bind_tcp 8080
# puts "Listening on http://#{address}"
# server.listen
# ```
#
# ## Request processing
#
# The handler chain receives an instance of `HTTP::Server::Context` that holds
# the `HTTP::Request` to process and a `HTTP::Server::Response` which it can
# configure and write to.
#
# Each connection is processed concurrently in a separate `Fiber` and can handle
# multiple subsequent requests-response cycles with connection keep-alive.
#
# ### Handler chain
#
# The handler given to a server can simply be a block that receives an `HTTP::Server::Context`,
# or it can be an instance of `HTTP::Handler`. An `HTTP::Handler` has a `#next`
# method to forward processing to the next handler in the chain.
#
# For example, an initial handler might handle exceptions raised from subsequent
# handlers and return a `500 Server Error` status (see `HTTP::ErrorHandler`).
# The next handler might log all incoming requests (see `HTTP::LogHandler`).
# And the final handler deals with routing and application logic.
#
# ```
# require "http/server"
#
# server = HTTP::Server.new([
# HTTP::ErrorHandler.new,
# HTTP::LogHandler.new,
# HTTP::CompressHandler.new,
# HTTP::StaticFileHandler.new("."),
# ])
#
# server.bind_tcp "127.0.0.1", 8080
# server.listen
# ```
#
# ### Response object
#
# The `HTTP::Server::Response` object has `status` and `headers` properties that can be
# configured before writing the response body. Once any response output has been
# written, changing the `status` and `headers` properties has no effect.
#
# The `HTTP::Server::Response` is a write-only `IO`, so all `IO` methods are available
# on it for sending the response body.
#
# ## Binding to sockets
#
# The server can be bound to one or more server sockets (see `#bind`)
#
# Supported types:
#
# * TCP socket: `#bind_tcp`, `#bind_unused_port`
# * TCP socket with TLS/SSL: `#bind_tls`
# * Unix socket `#bind_unix`
#
# `#bind(uri : URI)` and `#bind(uri : String)` parse socket configuration for
# one of these types from an `URI`. This can be useful for injecting plain text
# configuration values.
#
# Each of these methods returns the `Socket::Address` that was added to this
# server.
#
# ```
# require "http/server"
#
# server = HTTP::Server.new do |context|
# context.response.content_type = "text/plain"
# context.response.print "Hello world!"
# end
#
# address = server.bind_tcp "0.0.0.0", 8080
# puts "Listening on http://#{address}"
# server.listen
# ```
#
# It is also possible to bind a generic `Socket::Server` using
# `#bind(socket : Socket::Server)` which can be used for custom network protocol
# configurations.
#
# ## Server loop
#
# After defining all server sockets to listen to, the server can be started by
# calling `#listen`. This call blocks until the server is closed.
#
# A server can be closed by calling `#close`. This closes the server sockets and
# stops processing any new requests, even on connections with keep-alive enabled.
# Currently processing requests are not interrupted but also not waited for.
# In order to give them some grace period for finishing, the calling context
# can add a timeout like `sleep 10.seconds` after `#listen` returns.
#
# ### Reusing connections
#
# The request processor supports reusing a connection for subsequent
# requests. This is used by default for HTTP/1.1 or when requested by
# the `Connection: keep-alive` header. This is signalled by this header being
# set on the `HTTP::Server::Response` when it's passed into the handler chain.
#
# If in the handler chain this header is overridden to `Connection: close`, then
# the connection will not be reused after the request has been processed.
#
# Reusing the connection also requires that the request body (if present) is
# entirely consumed in the handler chain. Otherwise the connection will be closed.
class HTTP::Server
Log = ::Log.for("http.server")
@sockets = [] of Socket::Server
# Returns `true` if this server is closed.
getter? closed : Bool = false
# Returns `true` if this server is listening on its sockets.
getter? listening : Bool = false
# Creates a new HTTP server with the given block as handler.
def self.new(&handler : HTTP::Handler::HandlerProc) : self
new(handler)
end
# Creates a new HTTP server with a handler chain constructed from the *handlers*
# array and the given block.
def self.new(handlers : Array(HTTP::Handler), &handler : HTTP::Handler::HandlerProc) : self
new(HTTP::Server.build_middleware(handlers, handler))
end
# Creates a new HTTP server with the *handlers* array as handler chain.
def self.new(handlers : Array(HTTP::Handler)) : self
new(HTTP::Server.build_middleware(handlers))
end
# Creates a new HTTP server with the given *handler*.
def initialize(handler : HTTP::Handler | HTTP::Handler::HandlerProc)
@processor = RequestProcessor.new(handler)
end
# Returns the maximum permitted size for the request line in an HTTP request.
#
# The request line is the first line of a request, consisting of method,
# resource and HTTP version and the delimiting line break.
# If the request line has a larger byte size than the permitted size,
# the server responds with the status code `414 URI Too Long` (see `HTTP::Status::URI_TOO_LONG`).
#
# Default: `HTTP::MAX_REQUEST_LINE_SIZE`
def max_request_line_size : Int32
@processor.max_request_line_size
end
# Sets the maximum permitted size for the request line in an HTTP request.
def max_request_line_size=(size : Int32)
@processor.max_request_line_size = size
end
# Returns the maximum permitted combined size for the headers in an HTTP request.
#
# When parsing a request, the server keeps track of the amount of total bytes
# consumed for all headers (including line breaks).
# If combined byte size of all headers is larger than the permitted size,
# the server responds with the status code `432 Request Header Fields Too Large`
# (see `HTTP::Status::REQUEST_HEADER_FIELDS_TOO_LARGE`).
#
# Default: `HTTP::MAX_HEADERS_SIZE`
def max_headers_size : Int32
@processor.max_headers_size
end
# Sets the maximum permitted combined size for the headers in an HTTP request.
def max_headers_size=(size : Int32)
@processor.max_headers_size = size
end
# Creates a `TCPServer` listening on `host:port` and adds it as a socket, returning the local address
# and port the server listens on.
#
# ```
# require "http/server"
#
# server = HTTP::Server.new { }
# server.bind_tcp("127.0.0.100", 8080) # => Socket::IPAddress.new("127.0.0.100", 8080)
# ```
#
# If *reuse_port* is `true`, it enables the `SO_REUSEPORT` socket option,
# which allows multiple processes to bind to the same port.
def bind_tcp(host : String, port : Int32, reuse_port : Bool = false) : Socket::IPAddress
tcp_server = TCPServer.new(host, port, reuse_port: reuse_port)
begin
bind(tcp_server)
rescue exc
tcp_server.close
raise exc
end
tcp_server.local_address
end
# Creates a `TCPServer` listening on `127.0.0.1:port` and adds it as a socket,
# returning the local address and port the server listens on.
#
# ```
# require "http/server"
#
# server = HTTP::Server.new { }
# server.bind_tcp(8080) # => Socket::IPAddress.new("127.0.0.1", 8080)
# ```
#
# If *reuse_port* is `true`, it enables the `SO_REUSEPORT` socket option,
# which allows multiple processes to bind to the same port.
def bind_tcp(port : Int32, reuse_port : Bool = false) : Socket::IPAddress
bind_tcp Socket::IPAddress::LOOPBACK, port, reuse_port
end
# Creates a `TCPServer` listening on *address* and adds it as a socket, returning the local address
# and port the server listens on.
#
# ```
# require "http/server"
#
# server = HTTP::Server.new { }
# server.bind_tcp(Socket::IPAddress.new("127.0.0.100", 8080)) # => Socket::IPAddress.new("127.0.0.100", 8080)
# server.bind_tcp(Socket::IPAddress.new("127.0.0.100", 0)) # => Socket::IPAddress.new("127.0.0.100", 35487)
# ```
#
# If *reuse_port* is `true`, it enables the `SO_REUSEPORT` socket option,
# which allows multiple processes to bind to the same port.
def bind_tcp(address : Socket::IPAddress, reuse_port : Bool = false) : Socket::IPAddress
bind_tcp(address.address, address.port, reuse_port: reuse_port)
end
# Creates a `TCPServer` listening on an unused port and adds it as a socket.
#
# Returns the `Socket::IPAddress` with the determined port number.
#
# ```
# require "http/server"
#
# server = HTTP::Server.new { }
# server.bind_unused_port # => Socket::IPAddress.new("127.0.0.1", 12345)
# ```
def bind_unused_port(host : String = Socket::IPAddress::LOOPBACK, reuse_port : Bool = false) : Socket::IPAddress
bind_tcp host, 0, reuse_port
end
# Creates a `UNIXServer` bound to *path* and adds it as a socket.
#
# ```
# require "http/server"
#
# server = HTTP::Server.new { }
# server.bind_unix "/tmp/my-socket.sock"
# ```
def bind_unix(path : String) : Socket::UNIXAddress
server = UNIXServer.new(path)
begin
bind(server)
rescue exc
server.close
raise exc
end
server.local_address
end
# Creates a `UNIXServer` bound to *address* and adds it as a socket.
#
# ```
# require "http/server"
#
# server = HTTP::Server.new { }
# server.bind_unix(Socket::UNIXAddress.new("/tmp/my-socket.sock"))
# ```
def bind_unix(address : Socket::UNIXAddress) : Socket::UNIXAddress
bind_unix(address.path)
end
{% unless flag?(:without_openssl) %}
# Creates an `OpenSSL::SSL::Server` and adds it as a socket.
#
# The SSL server wraps a `TCPServer` listening on `host:port`.
#
# ```
# require "http/server"
#
# server = HTTP::Server.new { }
# context = OpenSSL::SSL::Context::Server.new
# context.certificate_chain = "openssl.crt"
# context.private_key = "openssl.key"
# server.bind_tls "127.0.0.1", 8080, context
# ```
def bind_tls(host : String, port : Int32, context : OpenSSL::SSL::Context::Server, reuse_port : Bool = false) : Socket::IPAddress
tcp_server = TCPServer.new(host, port, reuse_port: reuse_port)
server = OpenSSL::SSL::Server.new(tcp_server, context)
server.start_immediately = false
begin
bind(server)
rescue exc
server.close
raise exc
end
tcp_server.local_address
end
# Creates an `OpenSSL::SSL::Server` and adds it as a socket.
#
# The SSL server wraps a `TCPServer` listening on an unused port on *host*.
#
# ```
# require "http/server"
#
# server = HTTP::Server.new { }
# context = OpenSSL::SSL::Context::Server.new
# context.certificate_chain = "openssl.crt"
# context.private_key = "openssl.key"
# address = server.bind_tls "127.0.0.1", context
# ```
def bind_tls(host : String, context : OpenSSL::SSL::Context::Server) : Socket::IPAddress
bind_tls(host, 0, context)
end
# Creates an `OpenSSL::SSL::Server` and adds it as a socket.
#
# The SSL server wraps a `TCPServer` listening on an unused port on *host*.
#
# ```
# require "http/server"
#
# server = HTTP::Server.new { }
# context = OpenSSL::SSL::Context::Server.new
# context.certificate_chain = "openssl.crt"
# context.private_key = "openssl.key"
# address = server.bind_tls Socket::IPAddress.new("127.0.0.1", 8000), context
# ```
def bind_tls(address : Socket::IPAddress, context : OpenSSL::SSL::Context::Server) : Socket::IPAddress
bind_tls(address.address, address.port, context)
end
{% end %}
# Parses a socket configuration from *uri* and adds it to this server.
# Returns the effective address it is bound to.
#
# ```
# require "http/server"
#
# server = HTTP::Server.new { }
# server.bind("tcp://localhost:80") # => Socket::IPAddress.new("127.0.0.1", 8080)
# server.bind("unix:///tmp/server.sock") # => Socket::UNIXAddress.new("/tmp/server.sock")
# server.bind("tls://127.0.0.1:443?key=private.key&cert=certificate.cert&ca=ca.crt") # => Socket::IPAddress.new("127.0.0.1", 443)
# ```
def bind(uri : String) : Socket::Address
bind(URI.parse(uri))
end
# :ditto:
def bind(uri : URI) : Socket::Address
case uri.scheme
when "tcp"
bind_tcp(Socket::IPAddress.parse(uri))
when "unix"
bind_unix(Socket::UNIXAddress.parse(uri))
when "tls", "ssl"
address = Socket::IPAddress.parse(uri)
{% unless flag?(:without_openssl) %}
context = OpenSSL::SSL::Context::Server.from_hash(uri.query_params)
bind_tls(address, context)
{% else %}
raise ArgumentError.new "Unsupported socket type: #{uri.scheme} (program was compiled without openssl support)"
{% end %}
else
raise ArgumentError.new "Unsupported socket type: #{uri.scheme}"
end
end
# Adds a `Socket::Server` *socket* to this server.
def bind(socket : Socket::Server) : Nil
raise "Can't add socket to running server" if listening?
raise "Can't add socket to closed server" if closed?
@sockets << socket
end
# Enumerates all addresses this server is bound to.
def each_address(&block : Socket::Address ->)
@sockets.each do |socket|
yield socket.local_address
end
end
def addresses : Array(Socket::Address)
array = [] of Socket::Address
each_address do |address|
array << address
end
array
end
# Creates a `TCPServer` listening on `127.0.0.1:port`, adds it as a socket
# and starts the server. Blocks until the server is closed.
#
# See `#bind(port : Int32)` for details.
def listen(port : Int32, reuse_port : Bool = false)
bind_tcp(port, reuse_port)
listen
end
# Creates a `TCPServer` listening on `host:port`, adds it as a socket
# and starts the server. Blocks until the server is closed.
#
# See `#bind(host : String, port : Int32)` for details.
def listen(host : String, port : Int32, reuse_port : Bool = false)
bind_tcp(host, port, reuse_port)
listen
end
# Starts the server. Blocks until the server is closed.
def listen : Nil
raise "Can't re-start closed server" if closed?
raise "Can't start server with no sockets to listen to, use HTTP::Server#bind first" if @sockets.empty?
raise "Can't start running server" if listening?
@listening = true
done = Channel(Nil).new
@sockets.each do |socket|
spawn do
loop do
io = begin
socket.accept?
rescue e
handle_exception(e)
next
end
if io
# a non nillable version of the closured io
_io = io
spawn handle_client(_io)
else
break
end
end
ensure
done.send nil
end
end
@sockets.size.times { done.receive }
end
# Gracefully terminates the server. It will process currently accepted
# requests, but it won't accept new connections.
def close : Nil
raise "Can't close server, it's already closed" if closed?
@closed = true
@processor.close
@sockets.each do |socket|
socket.close
rescue
# ignore exception on close
end
@listening = false
@sockets.clear
end
private def handle_client(io : IO)
if io.is_a?(IO::Buffered)
io.sync = false
end
{% unless flag?(:without_openssl) %}
if io.is_a?(OpenSSL::SSL::Socket::Server)
begin
io.accept
rescue ex
Log.debug(exception: ex) { "Error during SSL handshake" }
return
end
end
{% end %}
@processor.process(io, io)
ensure
{% begin %}
begin
io.close
rescue IO::Error{% unless flag?(:without_openssl) %} | OpenSSL::SSL::Error{% end %}
end
{% end %}
end
# This method handles exceptions raised at `Socket#accept?`.
private def handle_exception(e : Exception)
# TODO: This needs more refinement. Not every exception is an actual server
# error and should be logged as such. Client malfunction should only be informational.
# See https://github.com/crystal-lang/crystal/pull/9034#discussion_r407038999
Log.error(exception: e) { "Error while connecting a new socket" }
end
# Builds all handlers as the middleware for `HTTP::Server`.
def self.build_middleware(handlers, last_handler : (Context ->)? = nil)
raise ArgumentError.new "You must specify at least one HTTP Handler." if handlers.empty?
0.upto(handlers.size - 2) { |i| handlers[i].next = handlers[i + 1] }
handlers.last.next = last_handler if last_handler
handlers.first
end
end