Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Nov 14, 2016
1 parent d8897a2 commit 1f931c0
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 18 deletions.
13 changes: 7 additions & 6 deletions distributed/batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ class BatchedSend(object):
more than one message every interval milliseconds. We send lists of
messages.
Batching several messages at once helps performance when sending
a myriad of tiny messages.
Example
-------
>>> stream = yield connect(ip, port)
Expand Down Expand Up @@ -74,12 +77,12 @@ def _background_send(self):
continue
payload, self.buffer = self.buffer, []
self.batch_count += 1
self.next_deadline = self.loop.time() + self.interval
try:
yield write(self.stream, payload)
except Exception:
logger.exception("Error in batched write")
break
self.next_deadline = self.loop.time() + self.interval

self.stopped.set()

Expand All @@ -93,7 +96,9 @@ def send(self, msg):

self.message_count += 1
self.buffer.append(msg)
self.waker.set()
# Avoid spurious wakeups if possible
if self.next_deadline is None:
self.waker.set()

@gen.coroutine
def close(self, ignore_closed=False):
Expand All @@ -105,10 +110,6 @@ def close(self, ignore_closed=False):
yield self.stopped.wait()
try:
if self.buffer:
if self.next_deadline is not None:
delay = self.next_deadline - self.loop.time()
if delay > 0:
yield gen.sleep(delay)
self.buffer, payload = [], self.buffer
yield write(self.stream, payload)
except StreamClosedError:
Expand Down
15 changes: 3 additions & 12 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,23 +231,14 @@ def write(stream, msg):
logger.exception(e)
raise

futures = []

lengths = ([struct.pack('Q', len(frames))] +
[struct.pack('Q', len(frame)) for frame in frames])
futures.append(stream.write(b''.join(lengths)))
stream.write(b''.join(lengths))

for frame in frames[:-1]:
futures.append(stream.write(frame))

futures.append(stream.write(frames[-1]))
stream.write(frame)

while stream._write_buffer:
try:
yield gen.with_timeout(timedelta(seconds=0.01), futures[-1])
break
except gen.TimeoutError:
pass
yield stream.write(frames[-1])


def pingpong(stream):
Expand Down

0 comments on commit 1f931c0

Please sign in to comment.