Skip to content

Commit

Permalink
Queue write only after processing all buffers (#445)
Browse files Browse the repository at this point in the history
Allows `writelines` to leverage vectorized IO within uvloop to send
multiple buffers in one `sendmsg` call.

* Update license copyright

Co-authored-by: Fantix King <fantix.king@gmail.com>
  • Loading branch information
jakirkham and fantix committed Sep 9, 2022
1 parent 089f6cb commit 9c6ecb6
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 11 deletions.
2 changes: 1 addition & 1 deletion LICENSE-APACHE
@@ -1,4 +1,4 @@
Copyright (c) 2015-present MagicStack Inc. http://magic.io
Copyright (C) 2016-present the uvloop authors and contributors.

Apache License
Version 2.0, January 2004
Expand Down
2 changes: 1 addition & 1 deletion LICENSE-MIT
@@ -1,6 +1,6 @@
The MIT License

Copyright (c) 2015-present MagicStack Inc. http://magic.io
Copyright (C) 2016-present the uvloop authors and contributors.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
11 changes: 9 additions & 2 deletions uvloop/handles/stream.pxd
Expand Up @@ -21,7 +21,6 @@ cdef class UVStream(UVBaseTransport):
cdef inline _init(self, Loop loop, object protocol, Server server,
object waiter, object context)

cdef inline _exec_write(self)

cdef inline _shutdown(self)
cdef inline _accept(self, UVStream server)
Expand All @@ -31,7 +30,15 @@ cdef class UVStream(UVBaseTransport):
cdef inline __reading_started(self)
cdef inline __reading_stopped(self)

cdef inline _write(self, object data)
# The user API firstly calls _buffer_write() to buffer up user data chunks,
# potentially multiple times in writelines(), and then call _start_write()
# to start writing either immediately or in the next iteration.
cdef inline _buffer_write(self, object data)
cdef inline _start_write(self)

# _exec_write() is the method that does the actual send, and _try_write()
# is a fast-path used in _exec_write() to send a single chunk.
cdef inline _exec_write(self)
cdef inline _try_write(self, object data)

cdef _close(self)
Expand Down
17 changes: 10 additions & 7 deletions uvloop/handles/stream.pyx
Expand Up @@ -162,7 +162,7 @@ cdef class _StreamWriteContext:
PyObject_GetBuffer(
buf, &p_pybufs[py_bufs_len], PyBUF_SIMPLE)
except Exception:
# This shouldn't ever happen, as `UVStream._write`
# This shouldn't ever happen, as `UVStream._buffer_write`
# casts non-bytes objects to `memoryviews`.
ctx.py_bufs_len = py_bufs_len
ctx.free_bufs()
Expand Down Expand Up @@ -407,7 +407,7 @@ cdef class UVStream(UVBaseTransport):

return written

cdef inline _write(self, object data):
cdef inline _buffer_write(self, object data):
cdef int dlen

if not PyBytes_CheckExact(data):
Expand All @@ -420,6 +420,7 @@ cdef class UVStream(UVBaseTransport):
self._buffer_size += dlen
self._buffer.append(data)

cdef inline _start_write(self):
if (not self._protocol_paused and
(<uv.uv_stream_t*>self._handle).write_queue_size == 0 and
self._buffer_size > self._high_water):
Expand All @@ -443,10 +444,10 @@ cdef class UVStream(UVBaseTransport):
# If not all of the data was sent successfully,
# we might need to pause the protocol.
self._maybe_pause_protocol()
return

self._maybe_pause_protocol()
self._loop._queue_write(self)
elif self._buffer_size > 0:
self._maybe_pause_protocol()
self._loop._queue_write(self)

cdef inline _exec_write(self):
cdef:
Expand Down Expand Up @@ -679,7 +680,8 @@ cdef class UVStream(UVBaseTransport):
if self._conn_lost:
self._conn_lost += 1
return
self._write(buf)
self._buffer_write(buf)
self._start_write()

def writelines(self, bufs):
self._ensure_alive()
Expand All @@ -690,7 +692,8 @@ cdef class UVStream(UVBaseTransport):
self._conn_lost += 1
return
for buf in bufs:
self._write(buf)
self._buffer_write(buf)
self._start_write()

def write_eof(self):
self._ensure_alive()
Expand Down

0 comments on commit 9c6ecb6

Please sign in to comment.