Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
Use string instead of Path for Transport keys
Browse files Browse the repository at this point in the history
std::path::{Path, PathBuf} are inappropriate for the keys to Transport
methods because S3 object keys are not paths and should always use '/'
as a separator regardless of what platform the code is run on. This
commit replaces their usage with str or String as appropriate.

Addresses #28
  • Loading branch information
tgeoghegan committed Oct 2, 2020
1 parent cd14e56 commit 11b4a9f
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 78 deletions.
92 changes: 51 additions & 41 deletions facilitator/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@ use ring::{
};
use std::io::{Cursor, Read, Write};
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use uuid::Uuid;

/// Manages the paths to the different files in a batch
pub struct Batch {
header_path: PathBuf,
signature_path: PathBuf,
packet_file_path: PathBuf,
header_path: String,
signature_path: String,
packet_file_path: String,
}

impl Batch {
Expand Down Expand Up @@ -51,46 +50,52 @@ impl Batch {
aggregation_end: &NaiveDateTime,
is_first: bool,
) -> Batch {
let batch_path = PathBuf::new().join(aggregation_name).join(format!(
"{}-{}",
aggregation_start.format(DATE_FORMAT),
aggregation_end.format(DATE_FORMAT)
));
let batch_path = format!(
"{}/{}",
aggregation_name,
format!(
"{}-{}",
aggregation_start.format(DATE_FORMAT),
aggregation_end.format(DATE_FORMAT)
)
);
let filename = format!("sum_{}", if is_first { 0 } else { 1 });

Batch {
header_path: batch_path.with_extension(&filename),
signature_path: batch_path.with_extension(format!("{}.sig", &filename)),
packet_file_path: batch_path.with_extension(format!(
"invalid_uuid_{}.avro",
header_path: format!("{}.{}", batch_path, filename),
signature_path: format!("{}.{}.sig", batch_path, filename),
packet_file_path: format!(
"{}.invalid_uuid_{}.avro",
batch_path,
if is_first { 0 } else { 1 }
)),
),
}
}

fn new(aggregation_name: &str, batch_id: &Uuid, date: &NaiveDateTime, filename: &str) -> Batch {
let batch_path = PathBuf::new()
.join(aggregation_name)
.join(date.format(DATE_FORMAT).to_string())
.join(batch_id.to_hyphenated().to_string());

let batch_path = format!(
"{}/{}/{}",
aggregation_name,
date.format(DATE_FORMAT),
batch_id.to_hyphenated()
);
Batch {
header_path: batch_path.with_extension(filename),
signature_path: batch_path.with_extension(format!("{}.sig", filename)),
packet_file_path: batch_path.with_extension(format!("{}.avro", filename)),
header_path: format!("{}.{}", batch_path, filename),
signature_path: format!("{}.{}.sig", batch_path, filename),
packet_file_path: format!("{}.{}.avro", batch_path, filename),
}
}

fn header_key(&self) -> &Path {
self.header_path.as_path()
fn header_key(&self) -> &str {
self.header_path.as_ref()
}

fn signature_key(&self) -> &Path {
self.signature_path.as_path()
fn signature_key(&self) -> &str {
self.signature_path.as_ref()
}

fn packet_file_key(&self) -> &Path {
self.packet_file_path.as_path()
fn packet_file_key(&self) -> &str {
self.packet_file_path.as_ref()
}
}

Expand Down Expand Up @@ -313,7 +318,7 @@ mod tests {
fn roundtrip_batch<'a>(
aggregation_name: String,
batch_id: Uuid,
base_path: PathBuf,
base_path: String,
filenames: &[String],
batch_writer: &mut BatchWriter<'a, IngestionHeader, IngestionDataSharePacket>,
batch_reader: &BatchReader<'a, IngestionHeader, IngestionDataSharePacket>,
Expand Down Expand Up @@ -386,7 +391,7 @@ mod tests {
// Verify file layout is as expected
for extension in filenames {
transport
.get(&base_path.with_extension(extension))
.get(format!("{}.{}", base_path, extension).as_ref())
.expect(&format!("could not get batch file {}", extension));
}

Expand Down Expand Up @@ -450,10 +455,12 @@ mod tests {
let batch_reader: BatchReader<'_, IngestionHeader, IngestionDataSharePacket> =
BatchReader::new_ingestion(&aggregation_name, &batch_id, &date, &mut read_transport)
.unwrap();
let base_path = PathBuf::new()
.join(aggregation_name)
.join(date.format(DATE_FORMAT).to_string())
.join(batch_id.to_hyphenated().to_string());
let base_path = format!(
"{}/{}/{}",
aggregation_name,
date.format(DATE_FORMAT),
batch_id.to_hyphenated()
);
let read_key = if keys_match {
default_ingestor_public_key()
} else {
Expand Down Expand Up @@ -525,10 +532,12 @@ mod tests {
&mut read_transport,
)
.unwrap();
let base_path = PathBuf::new()
.join(aggregation_name)
.join(date.format(DATE_FORMAT).to_string())
.join(batch_id.to_hyphenated().to_string());
let base_path = format!(
"{}/{}/{}",
aggregation_name,
date.format(DATE_FORMAT),
batch_id.to_hyphenated()
);
let first_filenames = &[
"validity_0".to_owned(),
"validity_0.avro".to_owned(),
Expand Down Expand Up @@ -611,11 +620,12 @@ mod tests {
&mut read_transport,
)
.unwrap();
let batch_path = PathBuf::new().join(aggregation_name).join(format!(
"{}-{}",
let batch_path = format!(
"{}/{}-{}",
aggregation_name,
start.format(DATE_FORMAT),
end.format(DATE_FORMAT)
));
);
let first_filenames = &[
"sum_0".to_owned(),
"invalid_uuid_0.avro".to_owned(),
Expand Down
5 changes: 1 addition & 4 deletions facilitator/src/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ mod tests {
},
transport::LocalFileTransport,
};
use std::path::PathBuf;

#[test]
#[ignore]
Expand All @@ -187,12 +186,10 @@ mod tests {
100,
);
assert!(res.is_ok(), "error writing sample data {:?}", res.err());
let mut expected_path =
PathBuf::from("fake-aggregation/fake-date").join(batch_uuid.to_string());
let expected_path = format!("fake-aggregation/fake-date/{}.batch", batch_uuid);

let transports = &[pha_transport, facilitator_transport];
for transport in transports {
expected_path.set_extension("batch");
let reader = transport.get(&expected_path);
assert!(res.is_ok(), "error reading header back {:?}", res.err());

Expand Down
58 changes: 25 additions & 33 deletions facilitator/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::boxed::Box;
use std::fs::{create_dir_all, File};
use std::io::{Read, Write};
use std::mem::{replace, take};
use std::path::{Path, PathBuf};
use std::path::{PathBuf, MAIN_SEPARATOR};
use std::pin::Pin;
use tokio::{
io::{AsyncRead, AsyncReadExt},
Expand All @@ -21,10 +21,10 @@ use tokio::{
pub trait Transport {
/// Returns an std::io::Read instance from which the contents of the value
/// of the provided key may be read.
fn get(&self, key: &Path) -> Result<Box<dyn Read>>;
fn get(&self, key: &str) -> Result<Box<dyn Read>>;
/// Returns an std::io::Write instance into which the contents of the value
/// may be written.
fn put(&mut self, key: &Path) -> Result<Box<dyn Write>>;
fn put(&mut self, key: &str) -> Result<Box<dyn Write>>;
}

/// A transport implementation backed by the local filesystem.
Expand All @@ -38,18 +38,25 @@ impl LocalFileTransport {
/// Creates a `LocalFileTransport` rooted at the given local directory;
/// keys handled by this transport are resolved relative to it.
pub fn new(directory: PathBuf) -> LocalFileTransport {
LocalFileTransport { directory }
}
/// Callers will construct keys using "/" as a separator. This function
/// attempts to convert the provided key into a relative path valid for the
/// current platform by substituting the platform's path separator.
///
/// NOTE(review): a key with a leading '/' yields an absolute path, and
/// `Path::join` replaces its base entirely when joined with an absolute
/// path — confirm callers never construct keys starting with '/'.
fn relative_path(key: &str) -> PathBuf {
    // Use a char pattern rather than a one-character &str: equivalent
    // behavior, and faster (clippy `single_char_pattern`).
    PathBuf::from(key.replace('/', &MAIN_SEPARATOR.to_string()))
}
}

impl Transport for LocalFileTransport {
fn get(&self, key: &Path) -> Result<Box<dyn Read>> {
let path = self.directory.join(key);
fn get(&self, key: &str) -> Result<Box<dyn Read>> {
let path = self.directory.join(LocalFileTransport::relative_path(key));
let f =
File::open(path.as_path()).with_context(|| format!("opening {}", path.display()))?;
Ok(Box::new(f))
}

fn put(&mut self, key: &Path) -> Result<Box<dyn Write>> {
let path = self.directory.join(key);
fn put(&mut self, key: &str) -> Result<Box<dyn Write>> {
let path = self.directory.join(LocalFileTransport::relative_path(key));
if let Some(parent) = path.parent() {
create_dir_all(parent)
.with_context(|| format!("creating parent directories {}", parent.display()))?;
Expand Down Expand Up @@ -96,24 +103,16 @@ impl S3Transport {
client_provider,
}
}

/// Convert Path to key suitable for use in S3 API.
fn s3_key(key: &Path) -> Result<String> {
Ok(key
.to_str()
.context("failed to convert path to string")?
.to_string())
}
}

impl Transport for S3Transport {
fn get(&self, key: &Path) -> Result<Box<dyn Read>> {
fn get(&self, key: &str) -> Result<Box<dyn Read>> {
let mut runtime = basic_runtime()?;
let client = (self.client_provider)(&self.region);
let get_output = runtime
.block_on(client.get_object(GetObjectRequest {
bucket: self.bucket.to_owned(),
key: S3Transport::s3_key(key)?,
key: key.to_string(),
..Default::default()
}))
.context("error getting S3 object")?;
Expand All @@ -123,12 +122,11 @@ impl Transport for S3Transport {
Ok(Box::new(StreamingBodyReader::new(body, runtime)))
}

fn put(&mut self, key: &Path) -> Result<Box<dyn Write>> {
println!("doing put for {}", key.display());
fn put(&mut self, key: &str) -> Result<Box<dyn Write>> {
Ok(Box::new(MultipartUploadWriter::new(
self.region.clone(),
self.bucket.to_owned(),
S3Transport::s3_key(key)?,
key.to_string(),
// Set buffer size to 5 MB, which is the minimum required by Amazon
// https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
5_242_880,
Expand Down Expand Up @@ -375,25 +373,22 @@ mod tests {
let tempdir = tempfile::TempDir::new().unwrap();
let mut file_transport = LocalFileTransport::new(tempdir.path().to_path_buf());
let content = vec![1, 2, 3, 4, 5, 6, 7, 8];
let path = Path::new("path");
let path2 = Path::new("path2");
let complex_path = Path::new("path3/with/separators");

{
let ret = file_transport.get(&path2);
let ret = file_transport.get("path2");
assert!(ret.is_err(), "unexpected return value {:?}", ret.err());
}

for path in &[path, complex_path] {
let writer = file_transport.put(&path);
for path in &["path", "path3/with/separators"] {
let writer = file_transport.put(path);
assert!(writer.is_ok(), "unexpected error {:?}", writer.err());

writer
.unwrap()
.write_all(&content)
.expect("failed to write");

let reader = file_transport.get(&path);
let reader = file_transport.get(path);
assert!(reader.is_ok(), "create reader failed: {:?}", reader.err());

let mut content_again = Vec::new();
Expand Down Expand Up @@ -521,9 +516,6 @@ mod tests {
},
)
.expect_err("expected error");
for cause in err.chain() {
println!("error {:?}", cause);
}
assert!(
err.is::<rusoto_core::RusotoError<CreateMultipartUploadError>>(),
"found unexpected error {:?}",
Expand Down Expand Up @@ -672,7 +664,7 @@ mod tests {
)
});

let ret = transport.get(&Path::new(TEST_KEY));
let ret = transport.get(TEST_KEY);
assert!(ret.is_err(), "unexpected return value {:?}", ret.err());

let transport =
Expand All @@ -688,7 +680,7 @@ mod tests {
});

let mut reader = transport
.get(&Path::new(TEST_KEY))
.get(TEST_KEY)
.expect("unexpected error getting reader");
let mut content = Vec::new();
reader.read_to_end(&mut content).expect("failed to read");
Expand Down Expand Up @@ -733,7 +725,7 @@ mod tests {
});

let mut writer = transport
.put(&Path::new(TEST_KEY))
.put(TEST_KEY)
.expect("unexpected error getting writer");
writer.write_all(b"fake-content").expect("failed to write");
}
Expand Down

0 comments on commit 11b4a9f

Please sign in to comment.