Commit 322fd51

Move server I/O to async event loop, reserve thread pool for app code
1 parent 8dc3a4e commit 322fd51

File tree

8 files changed: +841 -179 lines


plain/plain/runtime/global_settings.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -191,6 +191,7 @@
 ]
 SERVER_GRACEFUL_TIMEOUT: int = 30
 SERVER_SENDFILE: bool = True
+SERVER_CONNECTIONS: int = 1000

 # MARK: Preflight Checks
```
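A cap like `SERVER_CONNECTIONS` is commonly enforced with an `asyncio.Semaphore` gating new connections. The sketch below demonstrates only the generic pattern; Plain's actual enforcement is not shown in this diff, and every name here is illustrative:

```python
import asyncio

# Generic sketch of capping concurrent connections with a semaphore. This
# is a common pattern for a setting like SERVER_CONNECTIONS; it is NOT
# Plain's actual implementation, which this diff does not show.

async def serve_one(limiter: asyncio.Semaphore, active: list[int], peak: list[int]) -> None:
    async with limiter:            # the (cap + 1)th concurrent connection waits here
        active[0] += 1
        peak[0] = max(peak[0], active[0])
        await asyncio.sleep(0)     # stand-in for actually handling the connection
        active[0] -= 1

async def main(n_connections: int = 10, cap: int = 3) -> int:
    limiter = asyncio.Semaphore(cap)
    active, peak = [0], [0]
    await asyncio.gather(
        *(serve_one(limiter, active, peak) for _ in range(n_connections))
    )
    return peak[0]

print(asyncio.run(main()))  # 3 -- concurrency never exceeds the cap
```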

plain/plain/server/README.md

Lines changed: 19 additions & 4 deletions
```diff
@@ -32,13 +32,13 @@ plain server --reload
 The server uses two levels of concurrency:

 - **Workers** are separate OS processes. Each worker runs independently with its own memory. The default is `0` (auto), which spawns one worker per CPU core.
-- **Threads** run inside each worker. Threads share memory within a worker and handle concurrent requests using a thread pool. The default is 4 threads per worker.
+- **Threads** run inside each worker. Threads handle application code (middleware and views) using a thread pool. All network I/O (accepting connections, reading requests, writing responses, TLS, keepalive) is handled asynchronously on the event loop without consuming threads. The default is 4 threads per worker.

 Total concurrent requests = `workers × threads`. On a 4-core machine with the defaults, that's `4 × 4 = 16` concurrent requests.

 **When to adjust workers:** Workers provide true parallelism since each is a separate process with its own Python GIL. More workers means more memory usage but better CPU utilization. Use `--workers 0` (the default) to match your CPU cores, or set an explicit number.

-**When to adjust threads:** Threads are efficient for I/O-bound work (database queries, external API calls) since they release the GIL while waiting. Most web applications are I/O-bound, so the default of 4 threads works well. Increase threads if your application spends a lot of time waiting on I/O. Decrease to 1 if you need to avoid thread-safety concerns.
+**When to adjust threads:** Threads are used exclusively for running your application code (middleware and views). This means `SERVER_THREADS` directly controls how many views can execute in parallel — it's not shared with I/O operations. Increase threads if your views spend a lot of time waiting on I/O (database queries, external API calls). Decrease to 1 if you need to avoid thread-safety concerns.

 **Long-lived connections:** Async views (SSE, WebSocket) run on the worker's event loop instead of occupying a thread pool slot. This means long-lived connections don't reduce your capacity for regular requests.

```
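The capacity arithmetic in the README (`workers × threads`) is simple enough to state as code; `total_concurrent_requests` is an illustrative name, not a Plain API:

```python
def total_concurrent_requests(workers: int, threads: int) -> int:
    # Each worker process runs its own thread pool; capacity multiplies.
    return workers * threads

# 4-core machine with the defaults: --workers 0 resolves to one worker
# per core, and SERVER_THREADS defaults to 4.
print(total_concurrent_requests(workers=4, threads=4))  # 16
```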
````diff
@@ -82,6 +82,7 @@ SERVER_ACCESS_LOG = True
 SERVER_ACCESS_LOG_FIELDS = ["method", "path", "query", "status", "duration_ms", "size", "ip", "user_agent", "referer"]
 SERVER_GRACEFUL_TIMEOUT = 30
 SERVER_SENDFILE = True
+SERVER_CONNECTIONS = 1000
 ```

 Settings can also be set via environment variables with the `PLAIN_` prefix (e.g., `PLAIN_SERVER_WORKERS=4`).
````
````diff
@@ -193,21 +194,35 @@ plain server --timeout 120

 ## Architecture

+Each worker process runs an asyncio event loop that handles all network I/O. A thread pool is reserved exclusively for application code.
+
 ```mermaid
 graph TD
     A[Arbiter] -->|fork per core| W[Worker]
     W --> EL[asyncio event loop]
     EL -->|accept| C[Connection]
     C -->|wait readable| EL
+    C -->|TLS handshake| TP_TLS[Thread pool]
+    TP_TLS --> EL
     C -->|TLS ALPN| P{Protocol?}
     P -->|h2| H2[HTTP/2 handler]
-    P -->|http/1.1| H1[Parse in thread pool]
+    P -->|http/1.1| HDR[Read headers async]
+    HDR --> BODY{Body size?}
+    BODY -->|"small (≤ limit)"| PRE[Pre-buffer body async]
+    BODY -->|"large (> limit)"| BRIDGE[AsyncBridgeUnreader]
+    PRE --> PARSE[Parse request]
+    BRIDGE -->|"parse in thread pool"| PARSE
     H2 -->|"h2 codec (sans-I/O)"| STREAMS[Multiplexed streams]
     STREAMS -->|per stream| TP[Thread pool]
-    H1 --> TP
+    PARSE --> TP
     TP --> MW[before_request + view + after_response]
+    MW -->|write response async| EL
 ```

+**Request body handling:** Small request bodies (≤ `DATA_UPLOAD_MAX_MEMORY_SIZE`, default 2.5MB) are pre-buffered on the event loop before parsing. Large bodies use `AsyncBridgeUnreader` which streams data lazily from the socket — the parser runs in the thread pool and bridges back to the event loop for socket reads. This keeps memory bounded while supporting large file uploads through multipart streaming to temp files.
+
+**Async views note:** Async views that read the request body work with pre-buffered (small) requests. For large bodies on the bridge path, body reads must happen in the thread pool (sync views). If you need async views to handle large uploads, increase `DATA_UPLOAD_MAX_MEMORY_SIZE` to cover your expected body sizes.
+
 ## Installation

 The server module is included with Plain. No additional installation is required.
````

plain/plain/server/arbiter.py

Lines changed: 3 additions & 1 deletion
```diff
@@ -106,7 +106,9 @@ def _start(self) -> None:
             plain.runtime.__version__,
         )

-        check_worker_config(self.app.threads, self.log)
+        from plain.runtime import settings
+
+        check_worker_config(self.app.threads, settings.SERVER_CONNECTIONS, self.log)

     def _handle_signal(self, sig: int, frame: object) -> None:
         self._shutdown_event.set()
```
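The diff shows `check_worker_config` gaining a connections argument but not its body. A hypothetical sketch of a check it might perform, assuming only the signature visible at the call site; the warning condition is invented for illustration:

```python
import logging

def check_worker_config(threads: int, connections: int, log: logging.Logger) -> None:
    # Hypothetical body -- the real check_worker_config is not shown in
    # this diff. One sensible invariant: the connection cap should be at
    # least as large as the thread pool, or some threads can never be fed.
    if connections < threads:
        log.warning(
            "SERVER_CONNECTIONS (%d) is lower than the thread count (%d); "
            "some worker threads will never receive a request.",
            connections,
            threads,
        )

log = logging.getLogger("plain.server")
check_worker_config(threads=4, connections=1000, log=log)  # defaults: no warning
```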

plain/plain/server/http/response.py

Lines changed: 76 additions & 5 deletions
```diff
@@ -6,6 +6,7 @@
 # See the LICENSE for more information.
 #
 # Vendored and modified for Plain.
+import asyncio
 import io
 import logging
 import os
@@ -107,13 +108,10 @@ def create_request(
 ) -> HttpRequest:
     """Build a plain.http.Request directly from the server's parsed HTTP message."""

-    # Handle 100-continue before merging headers
+    # Extract Host header (100-continue is handled during async body reading)
     host = None
     for hdr_name, hdr_value in req.headers:
-        if hdr_name == "EXPECT":
-            if hdr_value.lower() == "100-continue":
-                sock.send(b"HTTP/1.1 100 Continue\r\n\r\n")
-        elif hdr_name == "HOST":
+        if hdr_name == "HOST":
             host = hdr_value

     headers = _merge_headers(req.headers)
@@ -420,3 +418,76 @@ def close(self) -> None:
             self.send_headers()
         if self.chunked:
             util.write_chunk(self.sock, b"")
+
+    # ------------------------------------------------------------------
+    # Async write methods — use loop.sock_sendall() instead of blocking
+    # sendall(). The socket must be non-blocking (managed by asyncio).
+    # ------------------------------------------------------------------
+
+    async def async_send_headers(self) -> None:
+        if self.headers_sent:
+            return
+        tosend = self.default_headers()
+        tosend.extend([f"{k}: {v}\r\n" for k, v in self.headers])
+        header_str = "{}\r\n".format("".join(tosend))
+        await util.async_sendall(self.sock, util.to_bytestring(header_str, "latin-1"))
+        self.headers_sent = True
+
+    async def async_write(self, arg: bytes) -> None:
+        await self.async_send_headers()
+        if not isinstance(arg, bytes):
+            raise TypeError(f"{arg!r} is not a byte")
+        arglen = len(arg)
+        tosend = arglen
+        if self.response_length is not None:
+            if self.sent >= self.response_length:
+                return
+            tosend = min(self.response_length - self.sent, tosend)
+            if tosend < arglen:
+                arg = arg[:tosend]
+
+        if self.chunked and tosend == 0:
+            return
+
+        self.sent += tosend
+        if self.chunked:
+            chunk_size = f"{len(arg):X}\r\n"
+            chunk = b"".join([chunk_size.encode("utf-8"), arg, b"\r\n"])
+            await util.async_sendall(self.sock, chunk)
+        else:
+            await util.async_sendall(self.sock, arg)
+
+    async def async_write_response(self, http_response: Any) -> None:
+        """Write a plain.http.ResponseBase using async I/O."""
+        self.prepare_response(http_response)
+
+        if (
+            isinstance(http_response, FileResponse)
+            and http_response.file_to_stream is not None
+        ):
+            file_wrapper = FileWrapper(
+                http_response.file_to_stream, http_response.block_size
+            )
+            http_response.file_to_stream.close = http_response.close
+            # Read file chunks in the default executor (not the app thread pool)
+            # to avoid blocking the event loop. File reads are fast and shouldn't
+            # contend with app threads.
+            loop = asyncio.get_running_loop()
+            while True:
+                chunk = await loop.run_in_executor(
+                    None, file_wrapper.filelike.read, file_wrapper.blksize
+                )
+                if not chunk:
+                    break
+                await self.async_write(chunk)
+        else:
+            for chunk in http_response:
+                await self.async_write(chunk)
+
+        await self.async_close()
+
+    async def async_close(self) -> None:
+        if not self.headers_sent:
+            await self.async_send_headers()
+        if self.chunked:
+            await util.async_sendall(self.sock, b"0\r\n\r\n")
```
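`util.async_sendall` itself is not part of this diff; the comment in the hunk says the async write methods use `loop.sock_sendall()` on a non-blocking socket. A minimal self-contained sketch of that pattern over a local socket pair (the wrapper shape is an assumption):

```python
import asyncio
import socket

async def async_sendall(sock: socket.socket, data: bytes) -> None:
    # Likely shape of util.async_sendall (assumption -- it is not in this
    # diff): the socket is non-blocking and sock_sendall keeps writing,
    # yielding to the event loop, until every byte is flushed.
    loop = asyncio.get_running_loop()
    await loop.sock_sendall(sock, data)

async def demo() -> bytes:
    # A local socket pair shows the call in isolation.
    a, b = socket.socketpair()
    a.setblocking(False)
    b.setblocking(False)
    await async_sendall(a, b"HTTP/1.1 200 OK\r\n\r\n")
    a.close()
    loop = asyncio.get_running_loop()
    received = await loop.sock_recv(b, 1024)
    b.close()
    return received

print(asyncio.run(demo()))  # b'HTTP/1.1 200 OK\r\n\r\n'
```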

plain/plain/server/http/unreader.py

Lines changed: 68 additions & 3 deletions
```diff
@@ -6,14 +6,13 @@
 # See the LICENSE for more information.
 #
 # Vendored and modified for Plain.
+import asyncio
 import io
 import os
 import socket
 from collections.abc import Iterable, Iterator
-from typing import TYPE_CHECKING

-if TYPE_CHECKING:
-    pass
+from .. import util

 # Classes that can undo reading data from
 # a given type of data source.
@@ -86,3 +85,69 @@ def chunk(self) -> bytes:
         except StopIteration:
             self.iter = None
             return b""
+
+
+class BufferUnreader(Unreader):
+    """Unreader backed by pre-read bytes with no socket I/O.
+
+    Used when headers and body have been read asynchronously on the
+    event loop and the data is already in memory. The parser reads
+    headers from the buffer and sets up body readers (ChunkedReader,
+    LengthReader) that also read from this buffer.
+    """
+
+    def __init__(self, data: bytes) -> None:
+        super().__init__()
+        self.buf.write(data)
+
+    def chunk(self) -> bytes:
+        # All data is pre-buffered; nothing more to read.
+        return b""
+
+
+class AsyncBridgeUnreader(Unreader):
+    """Unreader that bridges async socket reads to sync parser reads.
+
+    Used for large request bodies that shouldn't be fully pre-buffered.
+    Headers and any initial body bytes are in the buffer. When the buffer
+    is exhausted, chunk() bridges to the event loop via
+    run_coroutine_threadsafe for lazy socket reads.
+
+    IMPORTANT: chunk() blocks the calling thread, so this unreader must
+    only be used from a thread pool — never from the event loop thread.
+    """
+
+    def __init__(
+        self,
+        data: bytes,
+        sock: socket.socket,
+        loop: asyncio.AbstractEventLoop,
+        timeout: float = 30,
+    ) -> None:
+        super().__init__()
+        self.buf.write(data)
+        self._sock = sock
+        self._loop = loop
+        self._timeout = timeout
+        self._eof = False
+        self.socket_bytes_read = 0
+
+    def chunk(self) -> bytes:
+        if self._eof:
+            return b""
+        future = asyncio.run_coroutine_threadsafe(
+            util.async_recv(self._sock, 8192), self._loop
+        )
+        try:
+            # On Python 3.11+, concurrent.futures.TimeoutError is
+            # builtins.TimeoutError so this except clause catches it.
+            data = future.result(timeout=self._timeout)
+        except TimeoutError:
+            future.cancel()
+            self._eof = True
+            raise
+        if not data:
+            self._eof = True
+        else:
+            self.socket_bytes_read += len(data)
+        return data
```
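`AsyncBridgeUnreader.chunk()` leans on `asyncio.run_coroutine_threadsafe`, which schedules a coroutine onto a loop running in another thread and hands back a `concurrent.futures.Future` the calling thread can block on. A self-contained demonstration of that bridge, with an `asyncio.Queue` standing in for the socket read (names and data are illustrative):

```python
import asyncio
import threading

async def read_chunk(queue: asyncio.Queue) -> bytes:
    # Stands in for util.async_recv(sock, 8192): an awaitable read that
    # must execute on the event loop thread.
    return await queue.get()

def main() -> list[bytes]:
    # Run an event loop in a background thread, like the worker does.
    loop = asyncio.new_event_loop()
    t = threading.Thread(target=loop.run_forever, daemon=True)
    t.start()

    queue: asyncio.Queue = asyncio.Queue()
    for item in (b"hello ", b"world", b""):  # two chunks, then EOF
        loop.call_soon_threadsafe(queue.put_nowait, item)

    chunks = []
    while True:
        # The bridge: block this (worker) thread on a coroutine scheduled
        # onto the loop thread, exactly as AsyncBridgeUnreader.chunk() does.
        future = asyncio.run_coroutine_threadsafe(read_chunk(queue), loop)
        data = future.result(timeout=5)
        if not data:
            break
        chunks.append(data)

    loop.call_soon_threadsafe(loop.stop)
    t.join()
    loop.close()
    return chunks

print(main())  # [b'hello ', b'world']
```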
