-
Notifications
You must be signed in to change notification settings - Fork 457
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[skunkworks] persist: Don't copy into a single Vec for S3 Blob #18692
[skunkworks] persist: Don't copy into a single Vec for S3 Blob #18692
Conversation
1d0e563
to
b4bde29
Compare
6a5b737
to
3760ac7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cool! persist changes LGTM modulo the following comments. I didn't look closely at the impl of SegmentedBytes yet
src/ore/src/bytes.rs
Outdated
} | ||
} | ||
|
||
impl From<&[u8]> for SegmentedBytes { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dunno how others feel, but I dislike From
impls that are potentially this costly. my preferences in order:
- remove this and make the user call
to_vec
- a non-
From
impl function (which has the opportunity to declare the allocation and copy in the rustdoc)
ditto for From<&str>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Totally makes sense! I ended up removing these impls as they weren't actually needed, they were an artifact from an attempt to make testing more seamless
src/ore/src/bytes.rs
Outdated
impl io::Read for SegmentedReader { | ||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { | ||
// Ugh. Map from u64 to usize. | ||
let pointer = usize::try_from(self.pointer).map_err(|_| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
materialize declares in one place that it only runs on 64-bit platforms, so we don't have this sort of thing littered everywhere: mz_ore::cast::CastFrom
:). import that and use usize::cast_from(self.pointer)
here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sweet! Updated :)
|
||
// We can read it back using persist code v2 and v3. | ||
assert_eq!( | ||
UntypedState::<u64>::decode(&v2, &buf) | ||
UntypedState::<u64>::decode(&v2, bytes.clone()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
persist (and I think all of mz now?) has a lint that forces things like Arc::clone to be called as Arc::clone(&foo)
instead of foo.clone()
, which makes it much easier to see at a glance that certain clones are cheap. I wonder if we can opt Bytes
into that lint?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good thought! I don't think we can opt Bytes
into that, but I did file an issue with Clippy to see if we can extend the current lint, rust-lang/rust-clippy#10657
Cargo.toml
Outdated
@@ -115,3 +115,4 @@ postgres-protocol = { git = "https://github.com/MaterializeInc/rust-postgres" } | |||
tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres" } | |||
serde-value = { git = "https://github.com/MaterializeInc/serde-value.git" } | |||
vte = { git = "https://github.com/alacritty/vte" } | |||
aws-smithy-http = { git = "https://github.com/parker-timmerman/smithy-rs.git", branch = "v/0.53.2" } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the story here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I merged an upstream PR, smithy-lang/smithy-rs#2525, into smithy-rs
to add some functionality into their AggregatedBytes
type, but they haven't yet released a new version of aws-smithy-http
, so I needed to make a fork. But before I forked the crate into Materialize, I wanted to get feedback on this PR, so I forked it to my own account
src/persist/src/s3.rs
Outdated
self.min.fetch_min(nanos, atomic::Ordering::SeqCst); | ||
|
||
// Trace if our provided duration was much larger than our minimum. | ||
let new_min = self.min.load(atomic::Ordering::Relaxed); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the same as std::cmp::min(fetch_min_result, nanos)
, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ha, it is! I updated this
src/persist/src/s3.rs
Outdated
|
||
// Trace if our provided duration was much larger than our minimum. | ||
let new_min = self.min.load(atomic::Ordering::Relaxed); | ||
let current_duration = Duration::from_nanos(nanos); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
may as well use x here? you don't get the u64::max truncation, but seems fine
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
.body | ||
.collect() | ||
.map_err(|err| Error::from(format!("s3 get first body err: {}", err))); | ||
let elapsed = body_start.elapsed(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this the same? I think you need to await the collect()
future before calling elapsed()
, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call!
Neat!
Do you think they'd accept a PR that added |
3760ac7
to
c2fc239
Compare
Totally agree, for now I submitted an issue against the repo, vorner/bytes-utils#16, and once I get a free cycle I'll put up a PR, we'll see what they say! |
c2fc239
to
fc09f17
Compare
Perfect, thanks @parker-timmerman! If you could drop a TODO comment at the top of ore/bytes.rs that links to that issue, that'd be ideal. That way the next person who wonders "why did we write all this code?" can go check out that issue. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay looked over the SegementedBytes stuff and that that lgtm, too
once normal CI is passing (do you know what's going on there?), I'll want us to run the nightly feature benchmarks before this goes in, just as a sanity check. lemme know if you don't know how to kick that off
|
||
/// The total number of bytes this [`SegmentedReader`] is mapped over. | ||
pub fn len(&self) -> usize { | ||
self.len |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is self.len redundant with the greatest/last key in segments?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah it is redundant, would you prefer if we got rid of len
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a slight lean toward removing it, but this is well within the bounds of what I'd leave to the discretion of the author :)
self.pointer = n; | ||
return Ok(n); | ||
} | ||
SeekFrom::End(n) => (u64::cast_from(self.len), n), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wow TIL that SeekFrom::End
is kind of confusing. I was completely expecting positive to seek backwards, but I check the docs and you're correct here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I expected the same, that it would be a positive seek backwards, but the existing API also does make some sense
// The contract for io::Seek denotes that seeking beyond the end | ||
// of the stream is allowed. If we're beyond the end of the stream | ||
// then we won't read back any bytes, but it won't be an error. | ||
match base.checked_add_signed(offset) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wow checked_add_signed really does all the work here, nice!
fc09f17
to
fb45941
Compare
…ntedBytes' add more tests to bytes remove From impls for SegementedBytes ignore tests in miri because they're slow add TODO comment to bytes mod fix bug
more fixes for non-S3 fixes for examples update clients fmt
…futures immediately after header futures update S3 persist impl S3 fixes update S3 client update git deps s3 fmt
fb45941
to
eb85cbb
Compare
Done :) |
CI is passing (there was a small bug in |
yep! if a nightlies run is triggered manually, the first step is a selection box that makes you select which sub-jobs you want |
Nice! I kicked off the benchmarks and the maelstrom tests, they both seemed applicable here :) |
Motivation
Today the implementation of
Blob
forS3
will copy all of the multi-part responses into a single contiguous Vec. We need to do this because of the return type onBlob::get
, but practically it's unnecessary because no consumers ofBlob::get
need the bytes to be in a single contiguous Vec. Ideally we don't do this copy, and the API specifies some return type that is an iterable, but possibly non-contiguous collection of bytes. There's a TODO that describes this!This PR adds a new type to
ore
calledSegmentedBytes
, which is a collection of individualByte
segments. This new type implementsbytes::Buf
andstd::io::{Read, Seek}
which are the two APIs used by consumers ofBlob::get
. We then update theBlob::get
method to returnSegmentedBytes
instead ofVec<u8>
. Finally we update the S3 Blob impl to no longer copy bytes from the responses into a single Vec, and as part of this refactor, we also request each part's body immediately after receiving its header, instead of waiting for all the headers to come back first, which fixes another TODO.Note: Ideally we don't have to define our own "segmented bytes" type, but I couldn't find anything else that really gave us what we wanted. I tried the following:
Blob::get
generic withBox<dyn bytes::Buf>
, but it appears part of the returned blobs are encoded with arrow2, and the decoder requires the buffer to implementstd::io::Read
andstd::io::Seek
, whichbytes::Buf
does not. I also couldn't find any other impls of arrow2 that usedbytes::Buf
instead of thestd::io
traits.bytes-utils
has aSegmentedBuf
type, but it similarly doesn't implementRead
andSeek
. So we'd also have to wrap this type.aws-smithy-http
has anAggregatedBytes
type, which is what the S3 client returns. But again, it doesn't implementRead
andSeek
, so we'd need to wrap the type ourselves.So because we'd at least need to add a wrapper type to make any existing types work, I implemented our own "segmented bytes" type since it wasn't much more work, and we'd have total control over the impl.
Currently we depend on a personal fork of Amazon's
smithy-rs
just because they have yet to release smithy-lang/smithy-rs#2525. Also I'm more than happy to make a Materialize fork ofsmithy-rs
, I didn't do that yet though because I wanted to get feedback on this change first.Tips for reviewer
There are multiple stacked changes in this PR, it's best if you view each of the commits independently. The changes are:
SegmentedBytes
in theore
cratetrait Blob
inpersist
to useSegmentedBytes
Vec
I see we have an open loop benchmark for our persist-client, @danhhz would you know how to run this?
Checklist
$T ⇔ Proto$T
mapping (possibly in a backwards-incompatible way) and therefore is tagged with aT-proto
label.