@@ -19,10 +19,19 @@ Result HttpAsyncServer::start(AsyncEventLoop& loop, StringSpan address, uint16_t
1919 asyncServerAccept.callback .bind <HttpAsyncServer, &HttpAsyncServer::onNewClient>(*this );
2020 SC_TRY (asyncServerAccept.start (*eventLoop, serverSocket));
2121 started = true ;
22-
2322 return httpServer.start (memory);
2423}
2524
25+ void HttpAsyncServer::setUseAsyncStreams (bool useAsyncStreams, Span<AsyncReadableStream::Request> readQueue,
26+ Span<AsyncWritableStream::Request> writeQueue, Span<AsyncBufferView> buffers)
27+ {
28+ readQueues = readQueue;
29+ writeQueues = writeQueue;
30+ useStreams = useAsyncStreams;
31+
32+ buffersPool.buffers = buffers;
33+ }
34+
2635Result HttpAsyncServer::stopAsync ()
2736{
2837 stopping = true ;
@@ -34,10 +43,7 @@ Result HttpAsyncServer::stopAsync()
3443
3544 for (HttpServerClient& it : httpServer.clients )
3645 {
37- if (it.state != HttpServerClient::State::Free)
38- {
39- closeAsync (it);
40- }
46+ closeAsync (it);
4147 }
4248 return Result (true );
4349}
@@ -72,19 +78,64 @@ void HttpAsyncServer::onNewClient(AsyncSocketAccept::Result& result)
7278 SC_ASSERT_RELEASE (httpServer.allocateClient (idx));
7379
7480 HttpServerClient& client = httpServer.clients [idx];
81+ client.socket = move (acceptedClient);
82+ if (useStreams)
83+ {
84+ const size_t readQueueLen = readQueues.sizeInElements () / httpServer.clients .sizeInElements ();
85+ const size_t writeQueueLen = writeQueues.sizeInElements () / httpServer.clients .sizeInElements ();
86+ SC_TRUST_RESULT (readQueueLen > 0 );
87+ SC_TRUST_RESULT (writeQueueLen > 0 );
88+ Span<AsyncReadableStream::Request> readQueue;
89+ Span<AsyncWritableStream::Request> writeQueue;
90+ SC_TRUST_RESULT (readQueues.sliceStartLength (idx * readQueueLen, readQueueLen, readQueue));
91+ SC_TRUST_RESULT (writeQueues.sliceStartLength (idx * writeQueueLen, writeQueueLen, writeQueue));
92+ SC_TRUST_RESULT (client.readableSocketStream .init (buffersPool, readQueue, *eventLoop, client.socket ));
93+ SC_TRUST_RESULT (client.writableSocketStream .init (buffersPool, writeQueue, *eventLoop, client.socket ));
94+
95+ auto onData = [this , idx](AsyncBufferView::ID bufferID) { onStreamReceive (httpServer.clients [idx], bufferID); };
96+ SC_TRUST_RESULT (client.readableSocketStream .eventData .addListener (onData));
97+ SC_TRUST_RESULT (client.readableSocketStream .start ());
98+ }
99+ else
100+ {
101+ client.asyncSend .setDebugName (client.debugName );
102+ client.asyncReceive .setDebugName (client.debugName );
103+ client.asyncReceive .callback .bind <HttpAsyncServer, &HttpAsyncServer::onReceive>(*this );
75104
76- client.socket = move (acceptedClient);
77- client.asyncSend .setDebugName (client.debugName );
78- client.asyncReceive .setDebugName (client.debugName );
79- client.asyncReceive .callback .bind <HttpAsyncServer, &HttpAsyncServer::onReceive>(*this );
80-
81- // This cannot fail because start reports only incorrect API usage (AsyncRequest already in use etc.)
82- SC_TRUST_RESULT (client.asyncReceive .start (*eventLoop, client.socket , client.request .availableHeader ));
105+ // This cannot fail because start reports only incorrect API usage (AsyncRequest already in use etc.)
106+ SC_TRUST_RESULT (client.asyncReceive .start (*eventLoop, client.socket , client.request .availableHeader ));
107+ }
83108
84109 // Only reactivate asyncAccept if arena is not full (otherwise it's being reactivated in closeAsync)
85110 result.reactivateRequest (httpServer.canAcceptMoreClients ());
86111}
87112
113+ void HttpAsyncServer::onStreamReceive (HttpServerClient& client, AsyncBufferView::ID bufferID)
114+ {
115+ Span<char > readData;
116+ buffersPool.getWritableData (bufferID, readData);
117+ // TODO: Handle error for available headers not big enough
118+ SC_ASSERT_RELEASE (readData.sizeInBytes () <= client.request .availableHeader .sizeInBytes ());
119+ ::memcpy (client.request.availableHeader.data(), readData.data(), readData.sizeInBytes());
120+
121+ if (not client.request .parse (httpServer.maxHeaderSize , readData))
122+ {
123+ // TODO: Invoke on error
124+ return ;
125+ }
126+ else if (client.request .allHeadersReceived ())
127+ {
128+ httpServer.onRequest (client.request , client.response );
129+ }
130+
131+ if (client.response .mustBeFlushed ())
132+ {
133+ auto onAfterWrite = [this , &client](AsyncBufferView::ID) { closeAsync (client); };
134+ SC_TRUST_RESULT (client.writableStream ->write (client.response .getSpan (), onAfterWrite));
135+ client.writableStream ->end (); // TODO: This must be called only if actually ended...
136+ }
137+ }
138+
88139void HttpAsyncServer::onReceive (AsyncSocketReceive::Result& result)
89140{
90141 SC_COMPILER_WARNING_PUSH_OFFSETOF
@@ -98,10 +149,19 @@ void HttpAsyncServer::onReceive(AsyncSocketReceive::Result& result)
98149 return ;
99150 }
100151
101- const size_t idx = static_cast <size_t >(&client - &httpServer.clients [0 ]);
102- Span<char > outspan = httpServer.processClientReceivedData (idx, readData);
103- if (not outspan.empty ())
152+ if (not client.request .parse (httpServer.maxHeaderSize , readData))
104153 {
154+ // TODO: Invoke on error
155+ return ;
156+ }
157+ else if (client.request .allHeadersReceived ())
158+ {
159+ httpServer.onRequest (client.request , client.response );
160+ }
161+
162+ if (client.response .mustBeFlushed ())
163+ {
164+ Span<const char > outspan = client.response .getSpan ();
105165 client.asyncSend .setDebugName (client.debugName );
106166 client.asyncSend .callback .bind <HttpAsyncServer, &HttpAsyncServer::onAfterSend>(*this );
107167 auto res = client.asyncSend .start (*eventLoop, client.socket , outspan);
@@ -130,15 +190,28 @@ void HttpAsyncServer::onAfterSend(AsyncSocketSend::Result& result)
130190
131191void HttpAsyncServer::closeAsync (HttpServerClient& requestClient)
132192{
133- if (not requestClient.asyncSend .isFree ())
193+ if (requestClient.state == HttpServerClient::State::Free)
194+ {
195+ return ;
196+ }
197+
198+ if (useStreams)
134199 {
135- (void )requestClient.asyncSend .stop (*eventLoop);
200+ requestClient.readableSocketStream .destroy (); // emits 'eventClose' cancelling pending reads
201+ requestClient.writableSocketStream .end (); // emits 'eventFinish' cancelling pending writes
136202 }
137- if ( not requestClient. asyncReceive . isFree ())
203+ else
138204 {
139- (void )requestClient.asyncReceive .stop (*eventLoop);
205+ if (not requestClient.asyncSend .isFree ())
206+ {
207+ (void )requestClient.asyncSend .stop (*eventLoop);
208+ }
209+ if (not requestClient.asyncReceive .isFree ())
210+ {
211+ (void )requestClient.asyncReceive .stop (*eventLoop);
212+ }
213+ SC_TRUST_RESULT (requestClient.socket .close ());
140214 }
141- SC_TRUST_RESULT (requestClient.socket .close ());
142215 const bool wasFull = not httpServer.canAcceptMoreClients ();
143216
144217 SC_TRUST_RESULT (httpServer.deallocateClient (requestClient));
0 commit comments