Skip to content

Commit

Permalink
feat: add get-all; load hashes at most 2048 at a time
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisdickinson committed Dec 18, 2019
1 parent 60f9a77 commit 4a4920e
Showing 1 changed file with 36 additions and 9 deletions.
45 changes: 36 additions & 9 deletions src/bin/eos.rs
Expand Up @@ -7,7 +7,7 @@ use entropic_object_store::object::Object;
use entropic_object_store::stores::loose::LooseStore;
use entropic_object_store::stores::packed::PackedStore;
use entropic_object_store::stores::{ReadableStore, WritableStore};
use futures::future::join_all;
use futures::future::{ select_all, join_all };
use sha2::Sha256;
use std::path::PathBuf;
use std::str::FromStr;
Expand Down Expand Up @@ -41,6 +41,11 @@ enum Command {
#[structopt(short, long, default_value = "loose")]
backend: Backends,
},
GetAll {
hashfile: PathBuf,
#[structopt(short, long, default_value = "loose")]
backend: Backends,
},
Pack {},
}

Expand Down Expand Up @@ -103,12 +108,12 @@ async fn cmd_add<D: Digest + Send + Sync, S: WritableStore<D>>(
Ok(())
}

async fn cmd_get<S: ReadableStore>(store: S, hashes: &[String]) -> anyhow::Result<()> {
async fn cmd_get<S: ReadableStore, T: AsRef<str>>(store: S, hashes: &[T]) -> anyhow::Result<()> {
let cksize = Sha256::new().result().len();
let valid_hashes: Vec<_> = hashes
.iter()
.filter_map(|xs| {
let decoded = hex::decode(xs).ok()?;
let decoded = hex::decode(xs.as_ref()).ok()?;
if decoded.len() != cksize {
return None;
}
Expand All @@ -117,12 +122,26 @@ async fn cmd_get<S: ReadableStore>(store: S, hashes: &[String]) -> anyhow::Resul
.collect();
let cleaned_hashes: Vec<_> = valid_hashes.iter().map(hex::encode).collect();

let mut results = Vec::new();
let mut pending = Vec::new();
for hash in valid_hashes {
results.push(store.get(hash));
pending.push(store.get(hash));
}

let results = join_all(results).await;
let mut results = Vec::with_capacity(pending.len());
let mut concurrent = pending.split_off(if pending.len() >= 2048 { pending.len() - 2048 } else { 0 });

while pending.len() > 0 {
let (result, idx, rest) = select_all(concurrent).await;
results.push(result);
concurrent = rest;
if let Some(popped) = pending.pop() {
concurrent.push(popped);
}
}

let last_chunk = join_all(concurrent).await;
results.extend(last_chunk);

for (idx, object) in results.iter().enumerate() {
match object {
Ok(opt) => {
Expand All @@ -142,7 +161,7 @@ async fn cmd_get<S: ReadableStore>(store: S, hashes: &[String]) -> anyhow::Resul
}
Err(_e) => {
dbg!(_e);
println!("{} error reading hash", "ERR:".white().on_red());
println!("{} error reading hash {}", "ERR:".white().on_red(), idx);
}
}
}
Expand All @@ -166,8 +185,16 @@ async fn main() -> anyhow::Result<()> {
match &eos.command {
Command::Add { files } => cmd_add(loose, files).await?,
Command::Get { hashes, backend } => match backend {
Backends::Loose => cmd_get(loose, hashes).await?,
Backends::Packed => cmd_get(packfiles, hashes).await?,
Backends::Loose => cmd_get(loose, &hashes[..]).await?,
Backends::Packed => cmd_get(packfiles, &hashes[..]).await?,
},
Command::GetAll { hashfile, backend } => {
let data = fs::read(&hashfile).await?;
let hashes: Vec<_> = std::str::from_utf8(&data)?.trim().split("\n").collect();
match backend {
Backends::Loose => cmd_get(loose, &hashes[..]).await?,
Backends::Packed => cmd_get(packfiles, &hashes[..]).await?,
}
},
Command::Pack {} => loose.to_packed_store().await?,
};
Expand Down

0 comments on commit 4a4920e

Please sign in to comment.