This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

facilitator: Add S3Transport #36

Merged (4 commits, Oct 9, 2020)

Conversation

tgeoghegan (Collaborator) commented:

Adds a Transport implementation which reads objects from and writes
objects to S3. The various subcommands on the facilitator tool can now
take arguments like s3://bucket-name as well as local paths for input
and output storage.

Addresses #19

tgeoghegan requested a review from yuriks on October 1, 2020 at 19:08
facilitator/src/lib.rs (resolved, outdated)
yuriks (Contributor) left a comment:

This is a partial review. I figured I'd submit it now so you can work on these things in the meantime, since you're going to be rebasing on top of my branch after it's merged anyway.

```rust
),
&mut LocalFileTransport::new(
    Path::new(sub_matches.value_of("facilitator-output").unwrap()).to_path_buf(),
&mut *transport_for_output_path(sub_matches.value_of("pha-output").unwrap()),
```
yuriks (Contributor):

nit: This was basically already in the original code, but I think all this stuff would be much easier to read if you first get/unwrap the value of the param into a local variable (maybe even do all of the ones you'll use at once, ahead of time, at the top of the block), and then use it wherever else. The sub_matches.value_of().unwrap() noise inside param lists and expressions is pretty distracting.
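For illustration, a sketch of the suggested restructuring, using the argument names from the snippet above (the surrounding code is assumed):

```rust
// Hypothetical restructuring: unwrap all required arguments at the top of
// the block...
let facilitator_output = sub_matches.value_of("facilitator-output").unwrap();
let pha_output = sub_matches.value_of("pha-output").unwrap();

// ...so the expressions that use them stay readable.
let mut facilitator_transport =
    LocalFileTransport::new(Path::new(facilitator_output).to_path_buf());
let mut pha_transport = transport_for_output_path(pha_output);
```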

facilitator/src/lib.rs (resolved, outdated)
```rust
    })
}

fn new_with_client_provider(
```
yuriks (Contributor):

nit: new_with_client, just to clutter less (the idea being that the crucial difference is that you're providing a client; it doesn't matter as much that it's in the form of a provider)

```rust
/// Convert Path to key suitable for use in S3 API.
fn s3_key(key: &Path) -> Result<String, Error> {
    Ok(key
        .to_str()
```
yuriks (Contributor):

This will fail on my system if we use the Path manipulation methods (like push() or join()) at any point, because of the different separators. While I know I'm not the deployment target, that would be quite annoying. :D I'd already mentioned we should move away from Path because it's not abstracting over the right thing, but for now you could add something like:

```rust
path.to_str().ok_or_else(...)?.replace(path::is_separator, "/")
```
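A fuller sketch of that fix, assuming the s3_key signature quoted above and an anyhow-style error (the error message is illustrative):

```rust
use anyhow::{anyhow, Error};
use std::path::{self, Path};

/// Convert Path to a key suitable for use in the S3 API, normalizing
/// platform-specific separators (e.g. '\' on Windows) to '/'.
fn s3_key(key: &Path) -> Result<String, Error> {
    let key_str = key
        .to_str()
        .ok_or_else(|| anyhow!("path {} is not valid UTF-8", key.display()))?;
    Ok(key_str.replace(path::is_separator, "/"))
}
```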

tgeoghegan (Collaborator, Author):

I wrote #28 to track the inadequacy of Path here. I could incorporate a fix into this change.

```rust
    client_provider: fn(&Region) -> S3Client,
) -> S3Transport {
    S3Transport {
        region: region,
```
yuriks (Contributor):

nit: use the shorthand initialization syntax (foo: foo, -> foo,)
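i.e., something like this (a sketch; only the region field appears in the diff, the others are assumed from S3Transport::new(region, bucket) and the client_provider parameter):

```rust
S3Transport {
    region,          // shorthand for `region: region`
    bucket,          // assumed field, per S3Transport::new(region, bucket)
    client_provider,
}
```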

```rust
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
    if buf.is_empty() {
        return Ok(0);
    }
```
yuriks (Contributor):

Huh, why this check? If you're just trying to avoid an unnecessary dispatch+block if someone passes an empty buffer, this seems like a very rare scenario to bother with.


```rust
fn transport_for_output_path(path: &str) -> Box<dyn Transport> {
    match path.strip_prefix("s3://") {
        Some(bucket) => Box::new(S3Transport::new(Region::UsWest2, bucket.to_string())),
```
yuriks (Contributor):

Oh, we should probably encode the region in the prefix somehow (like s3://{region}/{bucket...})
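A hedged sketch of what parsing such a prefix could look like (the function name and error handling are illustrative, not the code in this PR):

```rust
use rusoto_core::Region;
use std::str::FromStr;

/// Parse "s3://{region}/{bucket}" into a Region and a bucket name,
/// returning None if the path doesn't match that shape.
fn parse_s3_path(path: &str) -> Option<(Region, String)> {
    let rest = path.strip_prefix("s3://")?;
    let mut parts = rest.splitn(2, '/');
    let region = Region::from_str(parts.next()?).ok()?;
    let bucket = parts.next()?;
    Some((region, bucket.to_string()))
}
```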

```rust
    buffer_capacity: usize,
    client_provider: fn(&Region) -> S3Client,
) -> Result<MultipartUploadWriter, Error> {
    let mut runtime = Runtime::new()
```
yuriks (Contributor):

Since this is always a blocking operation, we will never need a thread pool and can execute all async tasks on our own thread. Tokio has a threaded and a basic runtime, and Runtime::new() will auto-select between them depending on which crate features are enabled, but in this case we always want the basic runtime, since it is lighter weight. You can force it by using runtime::Builder:

```rust
use tokio::runtime;
runtime::Builder::new().basic_scheduler().enable_all().build();
```

```toml
serde = { version = "1.0", features = ["derive"] }
tempfile = "3.1.0"
tokio = { version = "0.2", features = ["io-util"] }
```
yuriks (Contributor):

Since we're directly creating tokio runtimes in our code, you should specify the rt-core feature so that at least the basic runtime is included. This works now anyway, since some dependency has already enabled rt-core, but there's no guarantee that'll stay the case (although in practice, for our dependency set, it probably always will).
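i.e., something like (a sketch of the suggested Cargo.toml line):

```toml
tokio = { version = "0.2", features = ["io-util", "rt-core"] }
```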

```rust
    Ok(buf.len())
}

fn flush(&mut self) -> std::io::Result<()> {
```
tgeoghegan (Collaborator, Author):

One consequence of this implementation is that callers must call std::io::Write::flush on the Write they get, or the upload will (1) never be completed and (2) we will be billed indefinitely for any upload parts Amazon is holding on to. We control all the callers in this same codebase, so we can make sure we flush() appropriately (hence the changes I made in batch.rs), but I wonder if MultipartUploadWriter could defend itself against this better. For instance, we could add an std::ops::Drop implementation that panics if self.completed_parts is non-empty, to help programmers detect what I think is a programming error rather than something that could reasonably happen at runtime.
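A minimal sketch of that idea, assuming the completed_parts field described above:

```rust
// Sketch only: panic in Drop to surface the programming error of dropping
// the writer with parts uploaded but the overall upload never completed.
impl Drop for MultipartUploadWriter {
    fn drop(&mut self) {
        if !self.completed_parts.is_empty() {
            panic!("MultipartUploadWriter dropped without completing the upload");
        }
    }
}
```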

yuriks (Contributor):

Rust's type system has no way of "preventing a type from being dropped" and thus forcing you to consume it somehow, so that's not really possible. The closest approximation to that is to have a function accepting a closure, which acquires the resource, runs the closure and then immediately cleans up the resource afterwards. This might be something we can do here but it would require a lot of changes and would probably be awkward to use.

Now of course you're free to impl Drop and then make a blocking call to the network to finalize the chunk, but that will not extend to async network calls, so it feels like a bad solution to have that be the main API and not just a fallback.

In addition, using flush on the writer (as you're doing now) is also a bad option, because it means that you're breaking the contract from the trait that flush can be called multiple times.

So in the end I think a reasonable thing to do is have a dedicated method in MultipartUploadWriter to finish the upload, and another to cancel it. As for the cleanup, panicking in Drop if neither of the aforementioned methods was called is a possibility, but I don't know if it's actually a good idea in production. I suppose there are no cases in which we'd rather execution finish with a forgotten file which was not written (rather than aborting entirely), so this might be valid for our use-case (but probably not others, which is fine). An alternative is logging an error and then, in the background, calling the cancellation method as a last-ditch effort.

As for encouraging users to not forget to call the functions in the first place, a good way of doing this is to make the finalization call return some value you're going to need later in the program. I don't know if there's anything that would fit the bill here, but it's something to do if you can think of something that'd fit.

Regarding the costs of not finalizing an upload, we should set a bucket lifecycle policy which will expire unfinished uploads after a certain amount of time. We'd need to do this either way, because there's always the chance that your machine/process randomly dies at any point during the upload.
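For reference, the direction this landed on (per the final commit message below) was a dedicated trait with explicit completion and cancellation. A hedged sketch, with the Error type assumed:

```rust
use std::io::Write;

/// Sketch: extend Write with methods that make the upload's fate explicit,
/// instead of overloading flush() or relying on Drop.
trait TransportWriter: Write {
    /// Finish the multipart upload so the object becomes visible in the bucket.
    fn complete_upload(&mut self) -> Result<(), Error>;

    /// Abort the upload, discarding any parts already sent to S3.
    fn cancel_upload(&mut self) -> Result<(), Error>;
}
```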

tgeoghegan (Collaborator, Author):

Ooh, good point about the flush contract. The only problem with extending the MultipartUploadWriter method surface as you describe is that Transport::put would no longer return an std::io::Write, but we control all the callers, so that's not so bad.

yuriks (Contributor) left a comment:

Finished looking at the rest of the code I hadn't gotten to before, and also left comments on some new things I noticed.

```toml
prio = "0.2"
rand = "0.7"
ring = { version = "0.16.15", features = ["std"] }
rusoto_core = { version="0.45.0", default_features=false, features=["rustls"] }
rusoto_mock = { version="0.45.0", default_features=false, features=["rustls"] }
rusoto_s3 = { version="0.45.0", default_features=false, features=["rustls"] }
```
yuriks (Contributor):

Add spaces around all the =
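That is, formatted like:

```toml
rusoto_core = { version = "0.45.0", default_features = false, features = ["rustls"] }
```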

Resolved review threads:
facilitator/src/lib.rs (outdated)
facilitator/src/bin/facilitator.rs
facilitator/src/bin/facilitator.rs (outdated, 3 threads)
facilitator/src/transport.rs (outdated, 3 threads)
facilitator/Cargo.toml (outdated)
yuriks (Contributor) left a comment:

Left 2 trivial comments, but LGTM otherwise; feel free to merge after fixing them and rebasing!

```toml
rusoto_mock = { version="0.45.0", default_features=false, features=["rustls"] }
rusoto_s3 = { version="0.45.0", default_features=false, features=["rustls"] }
rusoto_core = { version = "0.45.0", default_features=false, features=["rustls"] }
rusoto_s3 = { version = "0.45.0", default_features=false, features=["rustls"] }
```
yuriks (Contributor):

You still missed spaces around the rest of the =s (for default_features and features, sorry)

facilitator/src/transport.rs (resolved)
```rust
}

impl MemoryWriter for Vec<u8> {}

/// SidecarWriter wraps an std::io::Write, but also writes any buffers passed to
/// it into a MemoryWriter. The reason sidecar isn't simply some other type that
/// implements std::io::Write is that SidecarWriter::write assumes that short
```
yuriks (Contributor):

This comment is outdated now.

Commit messages for the four merged commits:

1. Adds a Transport implementation which reads objects from and writes objects to S3. The various subcommands on the facilitator tool can now take arguments like `s3://bucket-name` as well as local paths for input and output storage. Also removes NullTransport, which was used nowhere. Addresses #19

2. s3://-style paths must now be like s3://{region}/{bucket-name}. facilitator now validates this for various storage arguments.

3. std::path::{Path, PathBuf} are inappropriate for the keys to Transport methods because S3 object keys are not paths and should always use '/' as a separator regardless of what platform the code is run on. This commit replaces their usage with str or String as appropriate. Addresses #28

4. yuriks pointed out that the std::io::Write trait is inadequate for the contract that users of Transport::put must abide by, so this commit introduces a new trait, TransportWriter, which extends Write with explicit complete_upload and cancel_upload methods. This commit also resolves a bug with idle connection timeouts in Rusoto and Hyper.