-
Notifications
You must be signed in to change notification settings - Fork 0
/
zarrs_benchmark_read_async.rs
190 lines (177 loc) · 8 KB
/
zarrs_benchmark_read_async.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
use std::{cell::UnsafeCell, sync::Arc, time::SystemTime};
use async_scoped::spawner::Spawner;
use clap::Parser;
use futures::{FutureExt, StreamExt};
use zarrs::{
array::{
codec::{ArrayCodecTraits, CodecOptionsBuilder},
concurrency::RecommendedConcurrency,
ArrayView,
},
array_subset::ArraySubset,
config::global_config,
storage::{store::AsyncObjectStore, AsyncReadableStorageTraits},
};
/// Command-line arguments for the async zarr read benchmark.
///
/// Field names map directly to CLI flags via clap's derive API
/// (`path` is positional; the rest are `--long` options).
#[derive(Parser, Debug)]
#[command(
    author,
    version,
    about,
    long_about = "Benchmark zarrs read throughput with the async API."
)]
struct Args {
    /// The zarr array directory.
    path: String,

    /// Number of concurrent chunks.
    #[arg(long)]
    concurrent_chunks: Option<usize>,

    /// Read the entire array in one operation.
    ///
    /// If set, `concurrent_chunks` is ignored.
    #[arg(long, default_value_t = false)]
    read_all: bool,

    /// Ignore checksums.
    ///
    /// If set, checksum validation in codecs (e.g. crc32c) is skipped.
    #[arg(long, default_value_t = false)]
    ignore_checksums: bool,
}
/// Benchmark entry point: opens the zarr array at `args.path` via an
/// object_store-backed async store and measures decode throughput, either
/// chunk-by-chunk (default) or as a single whole-array read (`--read-all`).
///
/// Returns any error from store construction, array opening, retrieval, or
/// clock arithmetic, boxed as `Box<dyn std::error::Error>`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = Args::parse();

    // Globally disable checksum validation when --ignore-checksums is set.
    zarrs::config::global_config_mut().set_validate_checksums(!args.ignore_checksums);

    // Alternative opendal-backed storage, kept for reference:
    // let storage = Arc::new(AsyncOpendalStore::new({
    //     let mut builder = opendal::services::Fs::default();
    //     builder.root(&args.path.clone()); // FIXME: Absolute
    //     Operator::new(builder)?.finish()
    // }));
    let storage = Arc::new(AsyncObjectStore::new(
        object_store::local::LocalFileSystem::new_with_prefix(args.path.clone())?,
    ));
    let array = Arc::new(zarrs::array::Array::async_new(storage.clone(), "/").await?);
    // println!("{:#?}", array.metadata());

    // Subset spanning every chunk in the chunk grid.
    let chunks = ArraySubset::new_with_shape(array.chunk_grid_shape().unwrap());

    // NOTE(review): `std::time::Instant` is the conventional choice for elapsed
    // time; `SystemTime` can jump if the wall clock is adjusted mid-benchmark.
    let start = SystemTime::now();
    let mut bytes_decoded = 0;
    let element_size = array.data_type().size();
    // Materialise chunk indices up front so they can be moved into tasks.
    let chunk_indices = chunks.indices().into_iter().collect::<Vec<_>>();

    if args.read_all {
        let array_shape = array.shape().to_vec();
        let array_subset = ArraySubset::new_with_shape(array_shape.to_vec());
        // -------------------------------------- SLOW --------------------------------------
        // See https://docs.rs/zarrs/latest/zarrs/array/struct.Array.html#async-api
        // let array_data = array.async_retrieve_array_subset(&array_subset).await?;
        // ----------------------------------------------------------------------------------
        // -------------------------------------- FAST --------------------------------------
        // This might get integrated into zarrs itself as
        // Array::async_retrieve_array_subset_tokio in the future.
        let array_data = {
            // Split available parallelism between concurrent chunks (outer)
            // and intra-chunk codec concurrency (inner).
            let chunk_representation =
                array.chunk_array_representation(&vec![0; array.chunk_grid().dimensionality()])?;
            let concurrent_target = std::thread::available_parallelism().unwrap().get();
            let (chunk_concurrent_limit, codec_concurrent_target) =
                zarrs::array::concurrency::calc_concurrency_outer_inner(
                    concurrent_target,
                    {
                        // As many concurrent chunks as cores, capped by chunk count.
                        let concurrent_chunks =
                            std::cmp::min(chunks.num_elements_usize(), concurrent_target);
                        &RecommendedConcurrency::new(concurrent_chunks..concurrent_chunks)
                    },
                    &array
                        .codecs()
                        .recommended_concurrency(&chunk_representation)?,
                );
            let codec_options = CodecOptionsBuilder::new()
                .concurrent_target(codec_concurrent_target)
                .build();

            // Allocate the full output buffer once; each chunk decodes directly
            // into its own region of it.
            let array_data =
                UnsafeCell::new(vec![0u8; array_subset.num_elements_usize() * element_size]);
            {
                // NOTE(review): every task obtains a `&mut` to the WHOLE output
                // buffer through the `UnsafeCell` and narrows to its chunk via
                // the `ArrayView` subset. Soundness relies on chunk subsets
                // being pairwise disjoint; simultaneous aliased `&mut` is UB
                // under Rust's aliasing rules — confirm this is acceptable for
                // a benchmark-only tool (or switch to a split-borrow scheme).
                let decode_chunk_into_array = |chunk_indices: Vec<u64>| {
                    let chunk_subset = array.chunk_subset(&chunk_indices).unwrap();
                    let codec_options = codec_options.clone();
                    let array = array.clone();
                    // See NOTE(review) above — aliased mutable access to the buffer.
                    let data = unsafe { array_data.get().as_mut() }.unwrap().as_mut_slice();
                    async move {
                        let array_shape = array.shape().to_vec();
                        let array_subset = ArraySubset::new_with_shape(array_shape.clone());
                        // View over the whole buffer, then narrowed to this chunk's subset.
                        let array_view = ArrayView::new(data, &array_shape, array_subset).unwrap();
                        array
                            .async_retrieve_array_subset_into_array_view_opt(
                                &chunk_subset,
                                &unsafe { array_view.subset_view(&chunk_subset).unwrap() },
                                &codec_options,
                            )
                            .await
                    }
                };
                // One tokio task per chunk, at most `chunk_concurrent_limit` in flight.
                let spawner = async_scoped::spawner::use_tokio::Tokio;
                let futures = chunk_indices.into_iter().map(decode_chunk_into_array);
                let mut stream = futures::stream::iter(futures)
                    .map(|future| spawner.spawn(future))
                    .buffer_unordered(chunk_concurrent_limit);
                while let Some(item) = stream.next().await {
                    // First `?`: task join error; second `?`: zarrs retrieval error.
                    item??;
                }
            }
            array_data.into_inner()
        };
        // ----------------------------------------------------------------------------------
        bytes_decoded += array_data.len();
    } else {
        // Per-chunk benchmark: retrieve each chunk independently and sum the
        // decoded byte counts (the decoded data itself is dropped).
        // Concurrency split as above, but the chunk limit may be pinned by
        // --concurrent-chunks.
        let chunk_representation =
            array.chunk_array_representation(&vec![0; array.chunk_grid().dimensionality()])?;
        let concurrent_target = std::thread::available_parallelism().unwrap().get();
        let (chunk_concurrent_limit, codec_concurrent_target) =
            zarrs::array::concurrency::calc_concurrency_outer_inner(
                concurrent_target,
                &if let Some(concurrent_chunks) = args.concurrent_chunks {
                    // Explicit cap from the CLI, bounded by the chunk count.
                    let concurrent_chunks =
                        std::cmp::min(chunks.num_elements_usize(), concurrent_chunks);
                    RecommendedConcurrency::new(concurrent_chunks..concurrent_chunks)
                } else {
                    // Default: at least the configured global minimum.
                    let concurrent_chunks = std::cmp::min(
                        chunks.num_elements_usize(),
                        global_config().chunk_concurrent_minimum(),
                    );
                    RecommendedConcurrency::new_minimum(concurrent_chunks)
                },
                &array
                    .codecs()
                    .recommended_concurrency(&chunk_representation)?,
            );
        let codec_options = CodecOptionsBuilder::new()
            .concurrent_target(codec_concurrent_target)
            .build();
        let futures = chunk_indices
            .into_iter()
            .map(|chunk_indices| {
                // println!("Chunk/shard: {:?}", chunk_indices);
                let array = array.clone();
                let codec_options = codec_options.clone();
                async move {
                    // Map the retrieved bytes to their length only.
                    array
                        .async_retrieve_chunk_opt(&chunk_indices, &codec_options)
                        .map(|bytes| bytes.map(|bytes| bytes.len()))
                        .await
                }
            })
            .map(tokio::task::spawn);
        let mut stream = futures::stream::iter(futures).buffer_unordered(chunk_concurrent_limit);
        while let Some(item) = stream.next().await {
            // `unwrap()` propagates task panics; `?` propagates retrieval errors.
            bytes_decoded += item.unwrap()?;
        }
    }

    // Report wall-clock time and throughput.
    let duration = SystemTime::now().duration_since(start)?.as_secs_f32();
    println!(
        "Decoded {} ({:.2}MB) in {:.2}ms ({:.2}MB decoded @ {:.2}GB/s)",
        args.path,
        storage.size().await? as f32 / 1e6,
        duration * 1e3,
        bytes_decoded as f32 / 1e6,
        (/* GB */bytes_decoded as f32 * 1e-9) / duration,
    );
    Ok(())
}