
Rework BatchedSend logic #661

Merged 10 commits into dask:master on Nov 16, 2016

Conversation

@pitrou (Member) commented Nov 14, 2016:

Does away with the timeout and looking up a private attribute on IOStream.
Refs PR #653.

            except Exception:
                logger.exception("Error in batched write")
                break
            self.next_deadline = self.loop.time() + self.interval
@mrocklin (Member):

We might want to base the next deadline on when we started the last send rather than when we finished it.

@pitrou (Member Author):

Well, I don't know. What are the intended semantics?

@mrocklin (Member):

This class generally tries to solve the problem of streams on which we want to send thousands of small messages per second, such as might occur in the following situation:

futures = []
for x in range(10000):
    future = client.submit(inc, x)
    futures.append(future)

(or on the worker-to-scheduler side, as the worker reports status updates)

We found that these situations were significantly faster if we never sent two messages within a few milliseconds of each other, preferring instead to batch them. If it has been more than a few milliseconds since the last payload was dispatched and the last payload has finished then I think we should be able to send again.
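(For reference, a rough sketch of the batching pattern described above. The class and helper names are illustrative, not the actual distributed.batched implementation, and write is assumed to be a coroutine like distributed.core.write.)

from tornado import gen

class BatchedSendSketch(object):
    # Illustrative only: coalesce queued messages and never send
    # twice within `interval` seconds of each other.
    def __init__(self, stream, loop, interval=0.002):
        self.stream = stream
        self.loop = loop
        self.interval = interval
        self.buffer = []
        self.next_deadline = None

    def send(self, msg):
        # Cheap and non-blocking: just queue the message; the background
        # coroutine decides when to actually write.
        self.buffer.append(msg)

    @gen.coroutine
    def _background_send(self, write):
        while True:
            if not self.buffer:
                yield gen.sleep(self.interval)
                continue
            # Respect the deadline left by the previous send.
            if self.next_deadline is not None:
                delay = self.next_deadline - self.loop.time()
                if delay > 0:
                    yield gen.sleep(delay)
            payload, self.buffer = self.buffer, []
            # Base the next deadline on when this send starts
            # (the option being discussed above).
            self.next_deadline = self.loop.time() + self.interval
            yield write(self.stream, payload)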

@pitrou (Member Author), Nov 14, 2016:

Then the yield write(...) wasn't really useful in the previous version? There's no need to wait on the write future if we want to base the deadline on the start of the write operation.

Related question: what is with gen.with_timeout(timedelta(seconds=0.01),...) in d.core.write?

@mrocklin (Member):

That is related to what was going on here and is also a possible source of error. Some explanation here: #653

@pitrou (Member Author):

Oh, right. We can remove it.

            if self.next_deadline is not None:
                delay = self.next_deadline - self.loop.time()
                if delay > 0:
                    yield gen.sleep(delay)
@mrocklin (Member):

Why this added delay?

@pitrou (Member Author):

It mirrors the yield self.last_send that was here previously. Perhaps I'm misunderstanding the intent :-)

@@ -37,7 +37,7 @@ def handle_stream(self, stream, address):
                 self.count += 1
                 yield write(stream, msg)
         except StreamClosedError as e:
-            pass
+            return
@mrocklin (Member):

I would expect this to be a syntax error in Python 2

@pitrou (Member Author):

No, only a return with an explicit value is forbidden inside a generator; a bare return is allowed and simply exits the coroutine.
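(A minimal illustration of the Python 2 rule being discussed; a Tornado coroutine is a generator underneath, so the same restriction applies.)

def ok():
    yield 1
    return            # fine in Python 2: just stops the generator

# def broken():
#     yield 1
#     return 42       # SyntaxError in Python 2: 'return' with argument
#                     # inside a generator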

@mrocklin (Member):

This looks pretty nice to me

@pitrou (Member Author) commented Nov 14, 2016:

By the way, BatchedStream doesn't seem to be used anymore; perhaps we should remove it?

@mrocklin (Member):

Removing it sounds fine to me. Generally I am happy to yield to your judgment on anything related to this issue. I suspect that you have a lot more experience here than I do.

@mrocklin (Member):

Testing failures here are unrelated. Addressing them in #662.

                break
            except gen.TimeoutError:
                pass
    yield stream.write(frames[-1])
@mrocklin (Member):

I think that this could fail if we write to the same stream in another coroutine. Ideally we shouldn't do this. Normally the rpc class creates new streams as necessary to handle concurrent communications to the same destination. All cases that I can find when a coroutine writes directly to a stream it creates and owns that stream exclusively.

Still though, we were running into problems in the wild that suggested that this might be an issue.

@pitrou (Member Author):

I'm not sure how that's different from the old code, though? It would also wait on futures[-1] and only catch timeout errors.

@mrocklin (Member):

We would raise a timeout error if the future didn't complete quickly and then fall back to checking if the stream's write_buffer was empty.
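(For reference, roughly the shape of the old behaviour being described; the helper name is made up, and stream._write_buffer is the private Tornado attribute this PR does away with.)

from datetime import timedelta
from tornado import gen

@gen.coroutine
def wait_until_drained(stream, write_future):
    # Wait briefly on the write future...
    try:
        yield gen.with_timeout(timedelta(seconds=0.01), write_future)
    except gen.TimeoutError:
        # ...then fall back to polling the stream's private write buffer,
        # since IOStream may never resolve earlier write futures.
        while len(stream._write_buffer) > 0:
            yield gen.sleep(0.01)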

@pitrou (Member Author):

I'm a bit surprised that this would make a difference. What were the symptoms of the problems?

@mrocklin (Member):

Somewhere some coroutine is stuck waiting on yield write(...). This first occurred on yield self.last_write within BatchedSend, and resulted in messages waiting in the worker's message buffer.

@pitrou (Member Author):

You're right. That's because IOStream.write can forget previous futures. So how about we don't wait for the write at all? We could simply yield gen.moment so that write() remains a coroutine...
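(A sketch of the fire-and-forget variant being proposed here, assuming frames is the list of byte strings to send: issue the writes without waiting for them to drain, and yield gen.moment only so that write() stays a coroutine.)

from tornado import gen

@gen.coroutine
def write(stream, frames):
    for frame in frames:
        stream.write(frame)    # not yielded: let data queue up in
                               # Tornado's own write buffer
    yield gen.moment           # keep write() a coroutine; yield control once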

@mrocklin (Member):

I think we would want to yield on the write if we were going to apply backpressure. We're not doing this yet at other stages though so yes, I suspect that that would work fine. We're moving the data pile-up from the BatchedSend buffer to the Tornado write buffer, which is probably appropriate anyway.

@pitrou (Member Author):

Turns out we must wait for the write() to be issued before closing the stream. This is gonna be a bit hairy...

@mrocklin (Member):

More or less hairy than polling on the _write_buffer?

@pitrou (Member Author):

I think the solution is to flush the stream explicitly before closing. Let me try it out.

    This is recommended before closing the stream.
    """
    if stream.writing():
        yield stream.write(b'')
@mrocklin (Member):

How do we know that this will complete?

@pitrou (Member Author):

The API expects that write() isn't called before flush() completes.

@mrocklin (Member):

Are we confident that this expectation is fulfilled?

@pitrou (Member Author):

I've updated the docstring to better inform the reader. But perhaps we only want to expose close() so that we don't make any further mistakes. What do you think?

@mrocklin (Member):

I'm mostly concerned about us as users. Only very advanced dask/distributed users should use read/write/close directly.

However, given that we've had problems reported it's possible that we aren't handling everything well internally.

I have no problem with read/write/close as an API generally.

"""
if not stream.closed():
try:
flush(stream)
@mrocklin (Member):

Should we yield on this?

@pitrou (Member Author):

Yes, you're right, my bad.
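(One plausible shape of the corrected close path, with the missing yield added; the flush helper mirrors the diff above, and the exact exception handling is a guess rather than the merged code.)

from tornado import gen
from tornado.iostream import StreamClosedError

@gen.coroutine
def flush(stream):
    # Wait for pending writes to drain before closing.
    if stream.writing():
        yield stream.write(b'')

@gen.coroutine
def close(stream):
    if not stream.closed():
        try:
            yield flush(stream)   # the yield discussed above
        except StreamClosedError:
            pass
    stream.close()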

@mrocklin (Member):

It would be good to develop some tests to stress communication in a few ways. However, I'm not entirely sure how this needs to be stressed. One thing that I've found to be useful in the past is to change 300000 to 1000000 in distributed/tests/test_batched.py::test_sending_traffic_jam.
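(A hypothetical stress loop in the spirit of test_sending_traffic_jam, assuming a BatchedSend-like object with a send() method; the real test lives in distributed/tests/test_batched.py and is not reproduced here.)

from tornado import gen

@gen.coroutine
def traffic_jam(batched, n=1000000):
    # Queue far more small messages than the stream can drain promptly,
    # periodically yielding so the background sender gets a chance to run.
    for i in range(n):
        batched.send({'op': 'ping', 'i': i})
        if i % 10000 == 0:
            yield gen.moment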

@mrocklin (Member):

Thoughts on ignoring the RuntimeError around the h5py test?

@mrocklin (Member):

This all seems fine to me. +1

@pitrou (Member Author) commented Nov 16, 2016:

> Thoughts on ignoring the RuntimeError around the h5py test?

I would hope h5py merges the pull request that would fix the issue.

@pitrou merged commit 3f1cc73 into dask:master on Nov 16, 2016
@mrocklin (Member):

I do not expect h5py to merge or release quickly.
