-
Notifications
You must be signed in to change notification settings - Fork 10
/
ConnectionManager.h
354 lines (305 loc) · 16.3 KB
/
ConnectionManager.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
/*
* Copyright(c) Sophist Solutions, Inc. 1990-2022. All rights reserved
*/
#ifndef _Stroika_Framework_WebServer_ConnectionManager_h_
#define _Stroika_Framework_WebServer_ConnectionManager_h_ 1
#include "../StroikaPreComp.h"
#include <list>
#include <memory>
#include "../../Foundation/Common/Property.h"
#include "../../Foundation/Containers/Collection.h"
#include "../../Foundation/Containers/Set.h"
#include "../../Foundation/Execution/Synchronized.h"
#include "../../Foundation/Execution/ThreadPool.h"
#include "../../Foundation/Execution/UpdatableWaitForIOReady.h"
#include "../../Foundation/IO/Network/HTTP/Headers.h"
#include "../../Foundation/IO/Network/Listener.h"
#include "../../Foundation/IO/Network/SocketAddress.h"
#include "CORS.h"
#include "Connection.h"
#include "Request.h"
#include "Response.h"
#include "Router.h"
/**
* \version <a href="Code-Status.md#Beta">Beta</a>
*/
namespace Stroika::Frameworks::WebServer {
// Bring Foundation names into the WebServer namespace so the declarations below can use them unqualified
// (e.g. String, Collection, Socket, Headers, Iterable).
// NOTE(review): because this is a header, the using-directive and using-declarations below are also visible
// (inside Stroika::Frameworks::WebServer) to every translation unit that includes this file.
using namespace Stroika::Foundation;
using Characters::String;
using Common::KeyValuePair;
using Containers::Collection;
using Containers::Set;
using IO::Network::ConnectionOrientedStreamSocket;
using IO::Network::Socket;
using IO::Network::HTTP::Headers;
using Traversal::Iterable;
/**
* This class is a useful helper for managing a set of connections. You can start it and stop it
* (it maintains internal threads). And you can hand it Connections, along with a set of handlers,
* and it will monitor the connections, and when any is ready with more input, it will assign the
* appropriate handler to handle the request, and produce the response.
*
* \note The connection manager supports HTTP keep-alives, to keep the connection open.
*
* \note Default ordering for interceptors:
* interceptors += earlyInterceptors;
* interceptors += beforeInterceptors;
* interceptors += router;
* interceptors += afterInterceptors;
*
* \note \em Thread-Safety <a href="Thread-Safety.md#Internally-Synchronized-Thread-Safety">Internally-Synchronized-Thread-Safety</a>
*
* \note The declaration order of the data members at the end of this class is significant (see the
* "SUBTLE DATA MEMBER ORDERING NOTE" near the end of the class) - do not reorder them casually.
*
* TODO:
* @todo We could allow updating most of these parameters. Some easily (like ServerHeader). Some would require stopping the
* connections, and rebuilding the list of threads etc (because the bindings must be redone). For now, KISS,
* and only allow modifications as needed.
*/
class ConnectionManager {
public:
/**
* \brief ConnectionManager::Options specify things like default headers, caching policies, binding flags (not bindings), thread count
*
* Every field is an optional<>; an unset field means "use the default" (see the kDefault_XXX constants
* at the end of this struct, and ConnectionManager::options for the 'effective' values).
*/
struct Options {
/**
* This is the max number of TCP connections the webserver will allow to keep around, before starting
* to reject new connections.
*
* \note NYI tracking and rejecting extra connections - just used as a hint for other values
*/
optional<unsigned int> fMaxConnections;
/**
* This is basically the number of threads to use. It can be automatically inferred from fMaxConnections
* but can be specified explicitly.
*/
optional<unsigned int> fMaxConcurrentlyHandledConnections;
/**
* If bindFlags omitted, it defaults to kDefault_BindFlags
*/
optional<Socket::BindFlags> fBindFlags;
/**
* fDefaultResponseHeaders may be used to specify initial{default} values for some HTTP Response headers.
* It can be used for specifying any non-standard HTTP headers to add to all responses (like X-Foobar: value).
* It can be used to specify a default 'Server' header.
* This is equivalent to adding an interceptor early in the interceptor chain, and calling
* response.rwHeaders = *fDefaultResponseHeaders;
*
* Probably not a good idea to specify things like contentLength, etc here. That may produce bad results.
* (@todo consider listing a stronger set of requirements for what headers can and cannot be set?)
*
* This is also a good place to add defaults like:
* "Content-Type": DataExchange::InternetMediaTypes::kOctetStream (which was done automatically by Stroika before 2.1b10)
*/
optional<Headers> fDefaultResponseHeaders;
/**
* fDefaultGETResponseHeaders - like fDefaultResponseHeaders - may be used to specify initial{default} values for some HTTP headers,
* but it is only applied to GET requests (in addition to those applied from fDefaultResponseHeaders, which apply to all responses).
*
* An example of something that makes sense to apply here would be Cache-Control settings (since these are typically unneeded
* on other HTTP methods).
*/
optional<Headers> fDefaultGETResponseHeaders;
/**
* Options for how the HTTP Server handles CORS (mostly HTTP OPTIONS requests)
*/
optional<CORSOptions> fCORS;
/**
* \brief sets the initial value for each Response. Harmless except for the slight performance cost (won't always work) - see Response::autoComputeETag
*
* defaults to kDefault_AutoComputeETagResponse
*/
optional<bool> fAutoComputeETagResponse;
/**
* This feature causes sockets to automatically flush their data - and avoid connection reset - when possible.
* This makes the closing socket process more costly and slow, so is optional, but is on by default because it makes
* communications more reliable.
*
* Turn this on - especially - if you see connection reset when clients talk to the Stroika web-server (TCP RST sent).
*
* \note - this defaults to 2 seconds (kDefault_AutomaticTCPDisconnectOnClose)
*/
optional<Time::DurationSecondsType> fAutomaticTCPDisconnectOnClose;
/**
* @see Socket::SetLinger () - SO_LINGER
*/
optional<int> fLinger;
/**
* mostly for debugging - so it is easier to segregate names of threads if you have
* multiple thread pools/connection managers
*/
optional<String> fThreadPoolName;
/**
* The number of new TCP connections the kernel will buffer before the application has a chance to accept.
*
* This can typically be left unset, and defaults to be based on fMaxConnections.
*
* The default for tcp backlog is a little less than max # of connections. What makes
* sense depends on ratio of incoming connections to the lifetime of those calls. If high, make this more than
* number of connections. If low, then can be less than max# of connections.
*
* @see http://man7.org/linux/man-pages/man2/listen.2.html
*/
optional<unsigned int> fTCPBacklog;
// Default values referenced by the field documentation above (used when the corresponding optional is not set).
// NOTE(review): exactly how/where each default is applied is in the implementation, not visible in this header.
static constexpr unsigned int kDefault_MaxConnections{25};
static constexpr Socket::BindFlags kDefault_BindFlags{};
// On compilers with the named inline-static-member bug, these two are only declared here (their definitions
// must then live elsewhere - presumably the .cpp file); otherwise they are defined inline right here.
#if qCompiler_cpp17InlineStaticMemberOfClassDoubleDeleteAtExit_Buggy
static const Headers kDefault_Headers;
static const Common::ConstantProperty<CORSOptions> kDefault_CORS;
#else
static inline const Headers kDefault_Headers{Iterable<KeyValuePair<String, String>>{{IO::Network::HTTP::HeaderName::kServer, L"Stroika/2.1"sv}}};
static inline const Common::ConstantProperty<CORSOptions> kDefault_CORS{[] () { return kDefault_CORSOptions; }};
#endif
static constexpr bool kDefault_AutoComputeETagResponse{true};
static constexpr Time::DurationSecondsType kDefault_AutomaticTCPDisconnectOnClose{2.0};
static constexpr optional<int> kDefault_Linger{nullopt}; // intentionally optional-valued
};
// Options instance used as the default constructor argument below (defined after the class, at namespace scope).
static const Options kDefaultOptions;
public:
/**
* Construct a ConnectionManager from a single bind address (or a collection of bind addresses), the routes,
* and an Options object (kDefaultOptions if omitted).
*
* NOTE(review): presumably construction starts listening on the given address(es) and dispatches requests
* via the given routes - the implementation is not in this header; confirm there.
*
* ConnectionManager is not copyable (copy construction and copy assignment are deleted).
*/
ConnectionManager (const SocketAddress& bindAddress, const Sequence<Route>& routes, const Options& options = kDefaultOptions);
ConnectionManager (const Traversal::Iterable<SocketAddress>& bindAddresses, const Sequence<Route>& routes, const Options& options = kDefaultOptions);
ConnectionManager (const ConnectionManager&) = delete;
// The destructor is only declared out-of-line when qDefaultTracingOn is set; otherwise it is defaulted.
#if qDefaultTracingOn
~ConnectionManager ();
#else
~ConnectionManager () = default;
#endif
public:
nonvirtual ConnectionManager& operator= (const ConnectionManager&) = delete;
public:
/**
* Returns the 'effective' options after applying defaults, not (generally) the original options.
*/
Common::ReadOnlyProperty<const Options&> options;
public:
/**
* This defaults to @DefaultFaultInterceptor, but can be set to 'missing' or any other fault handler. Note also - that
* all interceptors can engage in fault handling. This is just meant to provide a simple one-stop-shop for how to
* handle faults in one place.
*/
Common::Property<optional<Interceptor>> defaultErrorHandler;
public:
/**
* Get the list of early interceptors. These default to:
* earlyInterceptors += ServerHeadersInterceptor_{serverHeader, corsSupportMode};
* if (defaultErrorHandler) {
* earlyInterceptors += *defaultErrorHandler;
* }
*
* @see beforeInterceptors, afterInterceptors, AddInterceptor, RemoveInterceptor to maintain the list of interceptors
*/
Common::Property<Sequence<Interceptor>> earlyInterceptors;
public:
/**
* Get the list of interceptors before the private ConnectionManager interceptors (e.g. router).
*
* @see earlyInterceptors, afterInterceptors, AddInterceptor, RemoveInterceptor to maintain the list of interceptors
*/
Common::Property<Sequence<Interceptor>> beforeInterceptors;
public:
/**
* Get the list of interceptors after the private ConnectionManager interceptors (e.g. router).
*
* @see earlyInterceptors, beforeInterceptors, AddInterceptor, RemoveInterceptor to maintain the list of interceptors
*/
Common::Property<Sequence<Interceptor>> afterInterceptors;
public:
/**
* Selects where AddInterceptor () places a new interceptor.
*
* These 'before' and 'after' values are relative to the router, which is towards the end of the chain.
*
* NOTE(review): the exact list (early/before/after) and end (front/back) each enumerator maps to is decided
* in the AddInterceptor implementation, which is not visible in this header - confirm there.
*/
enum class InterceptorAddRelativeTo {
ePrependsToEarly,
ePrepend,
eAppend,
eAfterBeforeInterceptors,
};
// Class-scope aliases, so callers can write ConnectionManager::ePrepend rather than
// ConnectionManager::InterceptorAddRelativeTo::ePrepend.
static constexpr InterceptorAddRelativeTo ePrependsToEarly = InterceptorAddRelativeTo::ePrependsToEarly;
static constexpr InterceptorAddRelativeTo ePrepend = InterceptorAddRelativeTo::ePrepend;
static constexpr InterceptorAddRelativeTo eAppend = InterceptorAddRelativeTo::eAppend;
static constexpr InterceptorAddRelativeTo eAfterBeforeInterceptors = InterceptorAddRelativeTo::eAfterBeforeInterceptors;
public:
/**
* Add the interceptor 'i' to this ConnectionManager, at the position selected by 'relativeTo'
* (see InterceptorAddRelativeTo).
*/
nonvirtual void AddInterceptor (const Interceptor& i, InterceptorAddRelativeTo relativeTo);
public:
/**
* Remove the interceptor 'i'.
*
* NOTE(review): which of the early/before/after lists are searched, and what happens if 'i' is not present,
* is not visible in this header - confirm in the implementation.
*/
nonvirtual void RemoveInterceptor (const Interceptor& i);
public:
/**
* Abort the given connection.
*
* NOTE(review): exact semantics (e.g. whether in-progress handling is interrupted) are defined by the
* implementation, not visible in this header.
*/
nonvirtual void AbortConnection (const shared_ptr<Connection>& conn);
public:
/**
* We need some sort of status flag on connections - saying if they are OPEN or not - or done.
* But this will return just those which are not 'done'. Of course - due to asynchrony,
* by the time one looks at the list, some may already be done.
*/
Common::ReadOnlyProperty<Collection<shared_ptr<Connection>>> pConnections;
public:
/**
* Here active refers to being currently processed, reading data, writing data or computing answers. This means
* assigned into thread pool for handling.
*/
Common::ReadOnlyProperty<Collection<shared_ptr<Connection>>> pActiveConnections;
private:
// NOTE(review): presumably invoked for each newly accepted socket (see fListener_) - confirm in the implementation
nonvirtual void onConnect_ (const ConnectionOrientedStreamSocket::Ptr& s);
private:
// NOTE(review): presumably the body run by fWaitForReadyConnectionThread_ - confirm in the implementation
nonvirtual void WaitForReadyConnectionLoop_ ();
private:
// NOTE(review): presumably rebuilds fInterceptorChain_ from the early/before/router/after interceptors - confirm in the implementation
nonvirtual void FixupInterceptorChain_ ();
private:
// Inactive connections are those we are waiting (select/epoll) for incoming data; these are stored in fInactiveSockSetPoller_
nonvirtual Collection<shared_ptr<Connection>> GetInactiveConnections_ () const;
private:
// NOTE(review): presumably swaps oldValue for newValue in fEarlyInterceptors_ when defaultErrorHandler changes - confirm in the implementation
nonvirtual void ReplaceInEarlyInterceptor_ (const optional<Interceptor>& oldValue, const optional<Interceptor>& newValue);
private:
// Data members. NOTE(review): by name these look like the backing storage for the public properties above
// (options, defaultErrorHandler, earlyInterceptors, ...); the wiring is done in the constructor, not visible here.
Options fEffectiveOptions_;
Execution::Synchronized<optional<Interceptor>> fDefaultErrorHandler_;
Execution::Synchronized<Sequence<Interceptor>> fEarlyInterceptors_;
Execution::Synchronized<Sequence<Interceptor>> fBeforeInterceptors_;
Execution::Synchronized<Sequence<Interceptor>> fAfterInterceptors_;
Execution::Synchronized<optional<Time::DurationSecondsType>> fAutomaticTCPDisconnectOnClose_;
Router fRouter_;
InterceptorChain fInterceptorChain_; // no need to synchronize cuz internally synchronized
// Active connections are those actively in the readheaders/readbody, dispatch/handle code
Execution::Synchronized<Collection<shared_ptr<Connection>>> fActiveConnections_;
// Traits for UpdatableWaitForIOReady: the high-level element is a shared_ptr<Connection>, and the thing actually
// polled is that connection's native socket handle.
struct MyWaitForIOReady_Traits_ {
using HighLevelType = shared_ptr<Connection>;
static inline auto GetSDKPollable (const HighLevelType& t)
{
return t->socket ().GetNativeSocket ();
}
};
// No need to lock fInactiveSockSetPoller_ since it's internally synchronized
Execution::UpdatableWaitForIOReady<shared_ptr<Connection>, MyWaitForIOReady_Traits_> fInactiveSockSetPoller_{};
/*
* SUBTLE DATA MEMBER ORDERING NOTE!
* We count on the THREADS that run and manipulate all the above data members being declared AFTER those data
* members in this object. This is just for the convenient ordering that imposes on construction and destruction
* (C++ destroys data members in the reverse of their declaration order):
* the threads (declared below) are automatically shut down on destruction before the data they reference (above).
*
* Same with the listener, as this is basically a thread invoking calls on the above data members.
*/
// we may eventually want two thread pools - one for managing bookkeeping/monitoring harvests, and one for actually handling
// connections. Or maybe a single thread for the bookkeeping, and the pool for handling ongoing connections?
//
// But for now - KISS
//
// Note - for now - we don't even handle 'accepting' connections in the threadpool!!! - just one thread
Execution::ThreadPool fActiveConnectionThreads_;
Execution::Thread::CleanupPtr fWaitForReadyConnectionThread_{Execution::Thread::CleanupPtr::eAbortBeforeWaiting};
// Note: this must be declared after the threadpool so it's shut down on destruction before the thread pool, and doesn't try to launch
// new tasks into an already destroyed threadpool.
IO::Network::Listener fListener_;
};
// Out-of-class definition of the static member ConnectionManager::kDefaultOptions; 'inline' allows this definition
// to live in the header. It is default-initialized, so every optional<> field of Options starts out empty (unset).
inline const ConnectionManager::Options ConnectionManager::kDefaultOptions;
}
/*
********************************************************************************
***************************** Implementation Details ***************************
********************************************************************************
*/
#include "ConnectionManager.inl"
#endif /*_Stroika_Framework_WebServer_ConnectionManager_h_*/