Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
Introduce TransportWriter
Browse files Browse the repository at this point in the history
yuriks pointed out that the std::io::Write trait is inadequate for the
contract that users of Transport::put must abide by, so this commit
introduces a new trait, TransportWriter, which extends Write with
explicit complete_upload and cancel_upload methods. This commit also
resolves a bug with idle connection timeouts in Rusoto and Hyper.
  • Loading branch information
tgeoghegan committed Oct 9, 2020
1 parent 5fe21f0 commit 645a6ab
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 157 deletions.
107 changes: 98 additions & 9 deletions facilitator/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions facilitator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ base64 = "0.12.3"
chrono = "0.4"
clap = "2.33.3"
derivative = "2.1.1"
hyper = "0.13.8"
hyper-rustls = "0.21.0"
prio = "0.2"
rand = "0.7"
ring = { version = "0.16.15", features = ["std"] }
rusoto_core = { version="0.45.0", default_features=false, features=["rustls"] }
rusoto_mock = { version="0.45.0", default_features=false, features=["rustls"] }
rusoto_s3 = { version="0.45.0", default_features=false, features=["rustls"] }
rusoto_core = { version = "0.45.0", default_features = false, features = ["rustls"] }
rusoto_s3 = { version = "0.45.0", default_features = false, features = ["rustls"] }
serde = { version = "1.0", features = ["derive"] }
tempfile = "3.1.0"
thiserror = "1.0"
Expand All @@ -26,3 +27,6 @@ uuid = { version = "0.8", features = ["serde", "v4"] }

[build-dependencies]
vergen = "3"

[dev-dependencies]
rusoto_mock = { version = "0.45.0", default_features = false, features = ["rustls"] }
43 changes: 29 additions & 14 deletions facilitator/src/batch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
idl::{Header, Packet},
transport::Transport,
transport::{Transport, TransportWriter},
DigestWriter, SidecarWriter, DATE_FORMAT,
};
use anyhow::{anyhow, Context, Result};
Expand Down Expand Up @@ -51,13 +51,10 @@ impl Batch {
is_first: bool,
) -> Batch {
let batch_path = format!(
"{}/{}",
"{}/{}-{}",
aggregation_name,
format!(
"{}-{}",
aggregation_start.format(DATE_FORMAT),
aggregation_end.format(DATE_FORMAT)
)
aggregation_start.format(DATE_FORMAT),
aggregation_end.format(DATE_FORMAT)
);
let filename = format!("sum_{}", if is_first { 0 } else { 1 });

Expand Down Expand Up @@ -258,7 +255,10 @@ impl<'a, H: Header, P: Packet> BatchWriter<'a, H, P> {
let mut sidecar_writer =
SidecarWriter::new(self.transport.put(self.batch.header_key())?, Vec::new());
header.write(&mut sidecar_writer)?;
sidecar_writer.flush().context("failed to flush writer")?;
sidecar_writer
.writer
.complete_upload()
.context("failed to complete batch header upload")?;

let header_signature = key
.sign(&SystemRandom::new(), &sidecar_writer.sidecar)
Expand All @@ -274,7 +274,7 @@ impl<'a, H: Header, P: Packet> BatchWriter<'a, H, P> {
/// content written by the operation.
pub fn packet_file_writer<F>(&mut self, operation: F) -> Result<Digest>
where
F: FnOnce(&mut Writer<SidecarWriter<Box<dyn Write>, DigestWriter>>) -> Result<()>,
F: FnOnce(&mut Writer<SidecarWriter<Box<dyn TransportWriter>, DigestWriter>>) -> Result<()>,
{
let mut writer = Writer::new(
&self.packet_schema,
Expand All @@ -284,10 +284,23 @@ impl<'a, H: Header, P: Packet> BatchWriter<'a, H, P> {
),
);

operation(&mut writer)?;
let result = operation(&mut writer);
let mut sidecar_writer = writer
.into_inner()
.with_context(|| format!("failed to flush Avro writer ({:?})", result))?;

if let Err(e) = result {
sidecar_writer
.writer
.cancel_upload()
.with_context(|| format!("Encountered while handling: {}", e))?;
return Err(e);
}

let mut sidecar_writer = writer.into_inner().context("failed to flush Avro writer")?;
sidecar_writer.flush().context("failed to flush writer")?;
sidecar_writer
.writer
.complete_upload()
.context("failed to complete packet file upload")?;
Ok(sidecar_writer.sidecar.finish())
}

Expand All @@ -298,7 +311,9 @@ impl<'a, H: Header, P: Packet> BatchWriter<'a, H, P> {
writer
.write_all(signature.as_ref())
.context("failed to write signature")?;
writer.flush().context("failed to flush signature")
writer
.complete_upload()
.context("failed to complete signature upload")
}
}

Expand Down Expand Up @@ -391,7 +406,7 @@ mod tests {
// Verify file layout is as expected
for extension in filenames {
transport
.get(format!("{}.{}", base_path, extension).as_ref())
.get(&format!("{}.{}", base_path, extension))
.expect(&format!("could not get batch file {}", extension));
}

Expand Down
Loading

0 comments on commit 645a6ab

Please sign in to comment.