[copy] finally eliminate all stalls from the copy tool #11778
Conversation
Rebase needed as well.
bucket, name = self._get_bucket_name(url)
range_str = f'bytes={start}-'
if length is not None:
    range_str += str(start + length)
Can we double-check that the range definitions are correct for all clouds? I vaguely recall Google might have different semantics, where the end point is inclusive.
Yeah, you're totally right it's inclusive. I'll fix.
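For reference, HTTP range requests (RFC 7233) use an inclusive end byte, which is what trips up the snippet above: appending `start + length` over-reads by one. A minimal sketch of the corrected header construction, assuming a hypothetical `range_header` helper (not a function from this codebase):

```python
def range_header(start: int, length=None) -> str:
    """Build an HTTP Range header value for a read starting at `start`.

    Per RFC 7233, the end position in 'bytes=start-end' is inclusive,
    so a read of `length` bytes ends at start + length - 1.
    """
    if length is None:
        return f'bytes={start}-'  # open-ended: from start to end of object
    assert length > 0, 'a zero-length range is not expressible'
    return f'bytes={start}-{start + length - 1}'
```

For example, `range_header(2, 2)` yields `'bytes=2-3'`, where the pre-fix code would have produced `'bytes=2-4'` and read three bytes.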
@@ -156,7 +157,7 @@ async def read(self, n: int = -1) -> bytes:
         if n == -1:
             try:
-                downloader = await self._client.download_blob(offset=self._offset)
+                downloader = await self._client.download_blob(offset=self._offset, length=self._length)
I can't remember exactly, but I might have run into issues with Azure when the file is empty and you ask for 0 bytes.
Totally right. I can add a helpful error message for Azure.
HttpResponseError: The range specified is invalid for the current size of the resource.
RequestId:314c0805-301e-001f-64f6-547b9b000000
Time:2022-04-20T20:38:34.1301964Z
ErrorCode:InvalidRange
Hrm, it's annoying to have one cloud give an error, so I'll just have it return an empty readable.
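The short-circuit being described can be sketched as follows. This is a hypothetical simplification, not the actual hail implementation: `EmptyReadableStream` and the `do_ranged_get` callback are stand-ins for the real `ReadableStream` interface and cloud client call.

```python
import asyncio

class EmptyReadableStream:
    """Hypothetical stand-in for the fs ReadableStream interface:
    a stream that is already at end-of-file."""
    async def read(self, n: int = -1) -> bytes:
        return b''

async def open_from(do_ranged_get, url: str, start: int, length=None):
    # Azure rejects a ranged GET of zero bytes with InvalidRange, so
    # short-circuit and return an empty readable instead of issuing a
    # request that is doomed to fail on one cloud.
    if length == 0:
        return EmptyReadableStream()
    return await do_ranged_get(url, start, length)
```

With this guard, a zero-length read never reaches the network, so all clouds behave identically.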
@@ -38,8 +38,8 @@ class ClientResponse:
     def __init__(self, client_response: aiohttp.ClientResponse):
         self.client_response = client_response

-    def release(self) -> None:
+    async def release(self) -> None:
         return self.client_response.release()
What is the reason for this change? I looked at aiohttp.ClientResponse, and release is not async.
release is async: https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.ClientResponse.release
It returns "noop", which is a generator, which makes the call awaitable.
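The confusion here is the distinction between being declared `async def` and returning an awaitable: a plain method can still be awaited if its return value implements `__await__`. A toy demonstration of the pattern (these classes are illustrative, not aiohttp itself):

```python
import asyncio

class _Noop:
    """An awaitable that does nothing, mimicking the 'noop' object
    described above: awaiting it completes immediately with None."""
    def __await__(self):
        return iter(())  # an empty iterator satisfies the await protocol

class FakeResponse:
    def __init__(self):
        self.released = False

    def release(self):  # note: a plain `def`, not `async def`
        self.released = True
        return _Noop()  # ...but the return value is awaitable

async def main():
    resp = FakeResponse()
    await resp.release()  # valid even though release() is not a coroutine
    return resp.released

print(asyncio.run(main()))  # → True
```

This is why type checkers and a quick glance at the source can disagree about whether `release` "is async": the method itself is synchronous, but callers may await its result.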
async with await fs.create(file) as f:
    await f.write(b'abcde')

async with await fs.open_from(file, 2, 2) as f:
Can you add a test with an empty file?
Sure.
Great comments, all addressed. I added some relevant tests.
Actually, I decided to just properly implement truncated files in local fs too. I can't use
There are also CI errors for check_services etc.
await fs.write(file, b'')

assert await fs.read_range(file, 0, 0, end_inclusive=False) == b''
What happens if end_inclusive=True? Error?
That will try to read the zeroth byte from an empty file and will trigger an error. I've added a new test for that case, test_read_range_end_inclusive_empty_file_should_error. In Google, you get:
hailtop.httpx.ClientResponseError: 416, message='Requested range not satisfiable'
In local, you get:
hailtop.aiotools.fs.exceptions.UnexpectedEOFError
Hmm. I suppose we should catch these and convert them all to UnexpectedEOFError.
OK, they all raise UnexpectedEOFError now, and there's a test for that.
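The normalization agreed on above can be sketched as a small translation layer. The exception classes here are local stand-ins for `hailtop.httpx.ClientResponseError` and `hailtop.aiotools.fs.exceptions.UnexpectedEOFError`, and the function name is hypothetical:

```python
class UnexpectedEOFError(Exception):
    """Stand-in for hailtop.aiotools.fs.exceptions.UnexpectedEOFError."""

class ClientResponseError(Exception):
    """Stand-in for hailtop.httpx.ClientResponseError."""
    def __init__(self, status: int, message: str = ''):
        super().__init__(status, message)
        self.status = status

def normalize_range_error(exc: Exception) -> Exception:
    # HTTP 416 (Requested Range Not Satisfiable) from any cloud means the
    # caller read past end-of-file; collapse it into the one exception
    # type the fs layer promises, so callers need not be cloud-specific.
    if isinstance(exc, ClientResponseError) and exc.status == 416:
        return UnexpectedEOFError(str(exc))
    return exc
```

Any other error (for example a 404) passes through unchanged, so only the past-EOF case gets the uniform treatment.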
        return self.bio.isatty()

    def read(self, n: int = -1):
        assert self.n <= self.limit
I find self.n confusing. Can we name it self.offset?
done
            n = self.limit - self.n
        else:
            n = min(self.limit - self.n, n)
        b = self.bio.read(self.limit - self.n)
Shouldn't this be b = self.bio.read(n)?
yes
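Putting the rename (`self.n` to `self.offset`) and the read-length fix together, here is a self-contained sketch of the bounded reader under review. The class name and exact structure are hypothetical; the logic mirrors the reviewed snippet with the bug corrected:

```python
import io

class TruncatedReader:
    """Read at most `limit` bytes total from an underlying binary stream.

    `offset` tracks how many bytes have been handed out so far; once it
    reaches `limit`, further reads return b''.
    """
    def __init__(self, bio: io.BufferedIOBase, limit: int):
        self.bio = bio
        self.offset = 0
        self.limit = limit

    def read(self, n: int = -1) -> bytes:
        assert self.offset <= self.limit
        if n == -1:
            n = self.limit - self.offset
        else:
            n = min(self.limit - self.offset, n)
        b = self.bio.read(n)  # the fix: read n bytes, not limit - offset
        self.offset += len(b)
        return b
```

With the original `self.bio.read(self.limit - self.n)`, a caller asking for 2 bytes could receive the entire remaining budget, silently ignoring `n`.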
Bump.
Let's try to get this merged today. I don't have meetings, so I can respond quickly to changes.
@@ -108,7 +108,8 @@ async def open(self, url: str) -> ReadableStream:

    async def open_from(self, url: str, start: int, *, length: Optional[int] = None) -> ReadableStream:
        if length == 0:
            isfile, isdir = await asyncio.gather(self.isfile(url), self.isdir(url))
        assert not url.endswith('/')
Was this not working correctly before? I'm not sure I like this change. What happens if the file ends in a slash?
Ah I see. We can't distinguish an empty file that's a directory or a file?
This is because isdir fails an assert if the url doesn't end with a slash. I kind of hate that we assert that, but it seems to be a pervasive assumption. If that's the assumption, assuming we never do open_from('gs://bucket/foo/') also seems reasonable.
No changelog because the copy tool is not properly public, though maybe it can be public now. The main problem was that we were saturating our network bandwidth with data we did not need. Why? As far as I can tell, `release` on an `aiohttp.ClientResponse` is insufficient to stop the receipt of more bytes. I realized one proper fix was to simply tell the cloud storage vendors how much data we wanted to receive. That is the change to `open_from` which I have made pervasively to all clouds. I also changed `release` to `close` because that seems more correct. I do not understand why we only `release`d it before.
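The `release` versus `close` distinction the description alludes to can be modeled with a toy example. In aiohttp, releasing a response returns the connection to the pool, which can involve reading out the remaining payload, whereas closing drops the underlying connection. The classes below are an illustrative model of that behavior, not aiohttp code:

```python
class ToyConnection:
    def __init__(self):
        self.closed = False
        self.drained = 0  # bytes read off the wire and thrown away

class ToyResponse:
    """Toy model of the release-vs-close distinction: release() drains
    the remaining payload so the connection can be reused; close() drops
    the connection without pulling any further bytes."""
    def __init__(self, conn: ToyConnection, remaining: int):
        self.conn = conn
        self.remaining = remaining

    def release(self):
        self.conn.drained += self.remaining  # keeps consuming bandwidth
        self.remaining = 0

    def close(self):
        self.conn.closed = True  # stops the transfer outright
```

Under this model, releasing a response with a large unread body wastes exactly the bandwidth the copy tool was stalling on, which is consistent with the stalls described above; either closing or bounding the request with a length avoids it.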
The assertion causes a bunch of tests to fail which expect to get an