use progress in zarrs_reencode
LDeakin committed Mar 11, 2024
1 parent fe042fe commit ab68372
Showing 5 changed files with 101 additions and 57 deletions.
7 changes: 7 additions & 0 deletions TODO.md
@@ -0,0 +1,7 @@

## TODO
- zarrs_reencode convert data type
- More examples for codecs etc
- zarrs_filter
- GuidedFilter
- DistanceMap
5 changes: 0 additions & 5 deletions docs/zarrs_filter.md
@@ -205,8 +205,3 @@ zarrs_filter <RUNFILE.json>
[00:00:22/00:00:22] summed area table filter/array_sat.zarr rw:198.70/198.81 p:52.61
```
</details>


## TODO
- Filter: GuidedFilter
- Filter: DistanceMap
63 changes: 59 additions & 4 deletions src/bin/zarrs_reencode.rs
@@ -1,8 +1,13 @@
use std::sync::Arc;

use clap::Parser;
use indicatif::{ProgressBar, ProgressStyle};
use zarrs::storage::{ReadableStorageTraits, StorePrefix, WritableStorageTraits};
use zarrs_tools::{do_reencode, get_array_builder_reencode, ZarrReencodingArgs};
use zarrs_tools::{
do_reencode, get_array_builder_reencode,
progress::{ProgressCallback, ProgressStats},
ZarrReencodingArgs,
};

/// Reencode a Zarr V3 array.
#[derive(Parser, Debug)]
@@ -36,7 +41,44 @@ struct Args {
verbose: bool,
}

fn main() {
fn bar_style_run() -> ProgressStyle {
ProgressStyle::with_template(
"[{elapsed_precise}/{duration_precise}] {bar:40.black/bold} {pos}/{len} ({percent}%) {prefix} {msg}",
)
.unwrap_or(ProgressStyle::default_bar())
}

fn bar_style_finish() -> ProgressStyle {
ProgressStyle::with_template("[{elapsed_precise}/{elapsed_precise}] {prefix} {msg}")
.unwrap_or(ProgressStyle::default_bar())
}

fn progress_callback(stats: ProgressStats, bar: &ProgressBar) {
bar.set_length(stats.num_steps as u64);
bar.set_position(stats.step as u64);
if stats.process_steps.is_empty() {
bar.set_message(format!(
"rw:{:.2}/{:.2} p:{:.2}",
stats.read.as_secs_f32(),
stats.write.as_secs_f32(),
stats.process.as_secs_f32(),
));
} else {
bar.set_message(format!(
"rw:{:.2}/{:.2} p:{:.2} {:.2?}",
stats.read.as_secs_f32(),
stats.write.as_secs_f32(),
stats.process.as_secs_f32(),
stats
.process_steps
.iter()
.map(|t| t.as_secs_f32())
.collect::<Vec<_>>(),
));
}
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();

zarrs::config::global_config_mut().set_validate_checksums(!args.ignore_checksums);
@@ -51,15 +93,27 @@ fn main() {
);
}

let bar = ProgressBar::new(0);
bar.set_style(bar_style_run());
let progress_callback = |stats: ProgressStats| progress_callback(stats, &bar);
let progress_callback = ProgressCallback::new(&progress_callback);

let storage_out =
Arc::new(zarrs::storage::store::FilesystemStore::new(args.path_out.clone()).unwrap());
storage_out.erase_prefix(&StorePrefix::root()).unwrap();
let builder = get_array_builder_reencode(&args.encoding, &array_in, None);
let array_out = builder.build(storage_out.clone(), "/").unwrap();
array_out.store_metadata().unwrap();

let (duration, duration_read, duration_write, bytes_decoded) =
do_reencode(&array_in, &array_out, args.validate, args.concurrent_chunks);
let (duration, duration_read, duration_write, bytes_decoded) = do_reencode(
&array_in,
&array_out,
args.validate,
args.concurrent_chunks,
&progress_callback,
)?;
bar.set_style(bar_style_finish());
bar.finish_and_clear();
let bytes_decoded_gb = /* GB */bytes_decoded as f32 * 1e-9;
println!(
"Reencode {} ({:2}MB) to {} ({:2}MB) in {:.2}ms\n\tread in ~{:.2}ms ({:.2}MB decoded @ {:.2}GB/s)\n\twrite in ~{:.2}ms ({:.2}MB encoded @ {:.2}GB/s)",
@@ -75,4 +129,5 @@ fn main() {
bytes_decoded as f32 / 1e6,
bytes_decoded_gb / duration_write,
);
Ok(())
}
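
The new `main` no longer lets the library draw its own bar; it builds an indicatif `ProgressBar`, wraps a closure over it, and hands that closure to `do_reencode` as a `ProgressCallback`. Below is a minimal, self-contained sketch of that wiring, assuming a simplified `Stats` struct in place of zarrs_tools' `ProgressStats` and a dummy `run` loop standing in for the reencode loop:

```rust
use std::time::Duration;

use indicatif::{ProgressBar, ProgressStyle};

// Simplified stand-in for zarrs_tools' `ProgressStats`.
struct Stats {
    step: usize,
    num_steps: usize,
    read: Duration,
    write: Duration,
}

// Stand-in for the reencode loop: does some work per step and reports back
// through the supplied callback.
fn run(num_steps: usize, update: &dyn Fn(Stats)) {
    let (mut read, mut write) = (Duration::ZERO, Duration::ZERO);
    for step in 1..=num_steps {
        read += Duration::from_millis(5); // pretend chunk retrieval
        write += Duration::from_millis(3); // pretend chunk store
        update(Stats { step, num_steps, read, write });
    }
}

fn main() {
    let bar = ProgressBar::new(0);
    bar.set_style(
        ProgressStyle::with_template(
            "[{elapsed_precise}] {bar:40} {pos}/{len} ({percent}%) {msg}",
        )
        .unwrap_or(ProgressStyle::default_bar()),
    );
    // The closure owns the rendering; the worker only supplies numbers.
    let update = |stats: Stats| {
        bar.set_length(stats.num_steps as u64);
        bar.set_position(stats.step as u64);
        bar.set_message(format!(
            "rw:{:.2}/{:.2}",
            stats.read.as_secs_f32(),
            stats.write.as_secs_f32()
        ));
    };
    run(100, &update);
    bar.finish_and_clear();
}
```

The indirection means the library only reports numbers; how they are rendered (bar, log line, nothing at all) is entirely up to the binary.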
66 changes: 24 additions & 42 deletions src/lib.rs
@@ -1,13 +1,10 @@
#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
#![doc(hidden)]

use std::{
sync::Mutex,
time::{Duration, SystemTime},
};
use std::{sync::Mutex, time::SystemTime};

use clap::Parser;
use indicatif::{ProgressBar, ProgressStyle};
use progress::{Progress, ProgressCallback};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon_iter_concurrent_limit::iter_concurrent_limit;
use serde::{Deserialize, Serialize};
@@ -18,7 +15,8 @@ use zarrs::{
CodecOptionsBuilder, Crc32cCodec, ShardingCodec,
},
concurrency::RecommendedConcurrency,
Array, ArrayBuilder, CodecChain, DataType, DimensionName, FillValue, FillValueMetadata,
Array, ArrayBuilder, ArrayError, CodecChain, DataType, DimensionName, FillValue,
FillValueMetadata,
},
array_subset::ArraySubset,
config::global_config,
@@ -568,18 +566,10 @@ pub fn do_reencode<TStorageOut: ReadableWritableStorageTraits + 'static>(
array_out: &Array<TStorageOut>,
validate: bool,
concurrent_chunks: Option<usize>,
) -> (f32, f32, f32, usize) {
progress_callback: &ProgressCallback,
) -> Result<(f32, f32, f32, usize), ArrayError> {
let start = SystemTime::now();
let bytes_decoded = Mutex::new(0);
let duration_read = Mutex::new(Duration::from_secs(0));
let duration_write = Mutex::new(Duration::from_secs(0));
let chunks = ArraySubset::new_with_shape(array_out.chunk_grid_shape().unwrap());
let style =
ProgressStyle::with_template("[{elapsed_precise}] [{bar}] ({pos}/{len}, ETA {eta})")
.unwrap();
let pb = ProgressBar::new(chunks.num_elements());
pb.set_style(style);
pb.set_position(0);

let chunk_representation = array_out
.chunk_array_representation(&vec![0; array_out.chunk_grid().dimensionality()])
@@ -610,64 +600,56 @@ pub fn do_reencode<TStorageOut: ReadableWritableStorageTraits + 'static>(
.concurrent_target(codec_concurrent_target)
.build();

let progress = Progress::new(chunks.num_elements_usize(), progress_callback);
let indices = chunks.indices();
if array_in.data_type() == array_out.data_type() {
iter_concurrent_limit!(
chunks_concurrent_limit,
indices,
for_each,
try_for_each,
|chunk_indices: Vec<u64>| {
let chunk_subset = array_out.chunk_subset(&chunk_indices).unwrap();

let start_read = SystemTime::now();
let bytes = array_in.retrieve_array_subset(&chunk_subset).unwrap(); // NOTE: Max concurrency
*duration_read.lock().unwrap() += start_read.elapsed().unwrap();
let bytes = progress
.read(|| array_in.retrieve_array_subset_opt(&chunk_subset, &codec_options))?;
*bytes_decoded.lock().unwrap() += bytes.len();

if validate {
let bytes_clone = bytes.clone();
let start_write = SystemTime::now();
array_out
.store_chunk_opt(&chunk_indices, bytes_clone, &codec_options)
.unwrap();
*duration_write.lock().unwrap() += start_write.elapsed().unwrap();
progress.write(|| {
array_out.store_chunk_opt(&chunk_indices, bytes_clone, &codec_options)
})?;
let bytes_out = array_out
.retrieve_chunk_opt(&chunk_indices, &codec_options)
.unwrap();
assert!(bytes == bytes_out);
} else {
let start_write = SystemTime::now();
array_out
.store_chunk_opt(&chunk_indices, bytes, &codec_options)
.unwrap();
*duration_write.lock().unwrap() += start_write.elapsed().unwrap();
progress.write(|| {
array_out.store_chunk_opt(&chunk_indices, bytes, &codec_options)
})?;
}
pb.inc(1);
progress.next();
Ok::<_, ArrayError>(())
}
);
)?;
} else {
// FIXME
todo!("zarrs_reencode does not yet support data type conversion!")
}
pb.finish_and_clear();

if validate {
println!("Validation successful");
}

let duration = start.elapsed().unwrap().as_secs_f32();
let duration_read = duration_read.into_inner().unwrap().as_secs_f32();
let duration_write = duration_write.into_inner().unwrap().as_secs_f32();
let stats = progress.stats();
let duration_read = stats.read.as_secs_f32();
let duration_write = stats.write.as_secs_f32();
let duration_read_write = duration_read + duration_write;
let duration_read = duration_read * duration / duration_read_write;
let duration_write = duration_write * duration / duration_read_write;

(
Ok((
duration,
duration_read,
duration_write,
bytes_decoded.into_inner().unwrap(),
)
))
}

/// Convert an arrays fill value to a new data type
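
The rewritten chunk loop drops the inline `SystemTime`/`Mutex<Duration>` bookkeeping and the embedded progress bar in favour of `progress.read(...)`/`progress.write(...)` wrappers, and switches from `for_each` to `try_for_each` so codec errors propagate as `ArrayError` instead of panicking. A rough, standalone sketch of that timed-closure pattern (simplified names, not the actual zarrs_tools implementation) could look like this:

```rust
use std::{
    sync::Mutex,
    time::{Duration, SystemTime},
};

// Accumulates read/write wall-clock time; shareable across worker threads
// behind a `&` reference, like the real `Progress`.
struct Timings {
    duration_read: Mutex<Duration>,
    duration_write: Mutex<Duration>,
}

impl Timings {
    fn new() -> Self {
        Self {
            duration_read: Mutex::new(Duration::ZERO),
            duration_write: Mutex::new(Duration::ZERO),
        }
    }

    /// Run `f`, folding its elapsed time into the read counter.
    fn read<T>(&self, f: impl FnOnce() -> T) -> T {
        let start = SystemTime::now();
        let result = f();
        *self.duration_read.lock().unwrap() += start.elapsed().unwrap_or(Duration::ZERO);
        result
    }

    /// Run `f`, folding its elapsed time into the write counter.
    fn write<T>(&self, f: impl FnOnce() -> T) -> T {
        let start = SystemTime::now();
        let result = f();
        *self.duration_write.lock().unwrap() += start.elapsed().unwrap_or(Duration::ZERO);
        result
    }
}

fn main() {
    let timings = Timings::new();
    // Fallible per-chunk body: errors short-circuit, mirroring the move from
    // `for_each` to `try_for_each` in `do_reencode`.
    let result: Result<(), std::io::Error> = (0..4u64).try_for_each(|_chunk| {
        let bytes = timings.read(|| vec![0u8; 1024]); // stand-in for retrieve
        timings.write(|| drop(bytes)); // stand-in for store
        Ok(())
    });
    result.unwrap();
    println!(
        "read {:?}, write {:?}",
        timings.duration_read.lock().unwrap(),
        timings.duration_write.lock().unwrap()
    );
}
```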
17 changes: 11 additions & 6 deletions src/progress.rs
@@ -35,7 +35,7 @@ impl<'a> Progress<'a> {
duration_process_steps: Mutex::new(vec![]),
duration_write: Mutex::new(Duration::ZERO),
};
progress.update(0);
progress.update();
progress
}

@@ -77,25 +77,30 @@ impl<'a> Progress<'a> {
result
}

fn update(&self, step: usize) {
pub fn stats(&self) -> ProgressStats {
let step = self.step.load(std::sync::atomic::Ordering::SeqCst);
let read = *self.duration_read.lock().unwrap();
let process = *self.duration_process.lock().unwrap();
let process_steps = self.duration_process_steps.lock().unwrap().clone();
let write = *self.duration_write.lock().unwrap();
let stats = ProgressStats {
ProgressStats {
step,
num_steps: self.num_steps,
read,
process,
process_steps,
write,
};
}
}

fn update(&self) {
let stats = self.stats();
self.progress_callback.update(stats);
}

pub fn next(&self) {
let step = 1 + self.step.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
self.update(step);
self.step.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
self.update();
}
}
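
This refactor splits the old `update(step)` into a public `stats()` snapshot and an `update()` that pushes the snapshot to the callback; exposing `stats()` is what lets `do_reencode` read the final read/write totals after the loop instead of keeping separate mutexes. A compact sketch of that split, assuming a cut-down `Stats`/`Tracker` in place of the real `ProgressStats`/`Progress`, might be:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Cut-down stand-ins for the types in src/progress.rs; the real `ProgressStats`
// also carries read/process/write durations.
#[derive(Clone, Copy, Debug)]
struct Stats {
    step: usize,
    num_steps: usize,
}

struct Tracker<'a> {
    step: AtomicUsize,
    num_steps: usize,
    callback: &'a dyn Fn(Stats),
}

impl<'a> Tracker<'a> {
    fn new(num_steps: usize, callback: &'a dyn Fn(Stats)) -> Self {
        let tracker = Self {
            step: AtomicUsize::new(0),
            num_steps,
            callback,
        };
        tracker.update(); // report the initial (step 0) state immediately
        tracker
    }

    /// Snapshot of the current state; also usable by callers after the run.
    fn stats(&self) -> Stats {
        Stats {
            step: self.step.load(Ordering::SeqCst),
            num_steps: self.num_steps,
        }
    }

    /// Push the current snapshot to the callback.
    fn update(&self) {
        (self.callback)(self.stats());
    }

    /// Advance by one step and notify the callback.
    fn next(&self) {
        self.step.fetch_add(1, Ordering::SeqCst);
        self.update();
    }
}

fn main() {
    let print = |stats: Stats| println!("{}/{}", stats.step, stats.num_steps);
    let tracker = Tracker::new(3, &print);
    for _ in 0..3 {
        tracker.next();
    }
    assert_eq!(tracker.stats().step, 3);
}
```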

