Skip to content

Commit

Permalink
Make sync reassembler more readable.
Browse files Browse the repository at this point in the history
No logic changes.
  • Loading branch information
aaugustin committed Nov 3, 2023
1 parent 2431e09 commit ec3bd2a
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions src/websockets/sync/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ def __init__(self) -> None:
# queue for transferring frames from the writing thread (library code)
# to the reading thread (user code). We're buffering when chunks_queue
# is None and streaming when it's a SimpleQueue. None is a sentinel
# value marking the end of the stream, superseding message_complete.
# value marking the end of the message, superseding message_complete.

# Stream data from frames belonging to the same message.
# Remove quotes around type when dropping Python < 3.9.
self.chunks_queue: Optional["queue.SimpleQueue[Optional[Data]]"] = None

# This flag marks the end of the stream.
# This flag marks the end of the connection.
self.closed = False

def get(self, timeout: Optional[float] = None) -> Data:
Expand Down Expand Up @@ -108,12 +108,12 @@ def get(self, timeout: Optional[float] = None) -> Data:
# mypy cannot figure out that chunks have the proper type.
message: Data = joiner.join(self.chunks) # type: ignore

assert not self.message_fetched.is_set()
self.message_fetched.set()

self.chunks = []
assert self.chunks_queue is None

assert not self.message_fetched.is_set()
self.message_fetched.set()

return message

def get_iter(self) -> Iterator[Data]:
Expand Down Expand Up @@ -169,26 +169,26 @@ def get_iter(self) -> Iterator[Data]:
with self.mutex:
self.get_in_progress = False

assert self.message_complete.is_set()
self.message_complete.clear()

# get_iter() was unblocked by close() rather than put().
if self.closed:
raise EOFError("stream of frames ended")

assert not self.message_fetched.is_set()
self.message_fetched.set()
assert self.message_complete.is_set()
self.message_complete.clear()

assert self.chunks == []
self.chunks_queue = None

assert not self.message_fetched.is_set()
self.message_fetched.set()

def put(self, frame: Frame) -> None:
"""
Add ``frame`` to the next message.
When ``frame`` is the final frame in a message, :meth:`put` waits until
the message is fetched, either by calling :meth:`get` or by fully
consuming the return value of :meth:`get_iter`.
the message is fetched, which can be achieved by calling :meth:`get` or
by fully consuming the return value of :meth:`get_iter`.
:meth:`put` assumes that the stream of frames respects the protocol. If
it doesn't, the behavior is undefined.
Expand Down Expand Up @@ -247,13 +247,13 @@ def put(self, frame: Frame) -> None:
with self.mutex:
self.put_in_progress = False

assert self.message_fetched.is_set()
self.message_fetched.clear()

# put() was unblocked by close() rather than get() or get_iter().
if self.closed:
raise EOFError("stream of frames ended")

assert self.message_fetched.is_set()
self.message_fetched.clear()

self.decoder = None

def close(self) -> None:
Expand Down

0 comments on commit ec3bd2a

Please sign in to comment.