// bucket_map_holder.rs (forked from solana-labs/solana)
use crate::accounts_index::{AccountsIndexConfig, IndexValue};
use crate::bucket_map_holder_stats::BucketMapHolderStats;
use crate::in_mem_accounts_index::{InMemAccountsIndex, SlotT};
use crate::waitable_condvar::WaitableCondvar;
use solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig};
use solana_measure::measure::Measure;
use solana_sdk::clock::SLOT_MS;
use solana_sdk::timing::AtomicInterval;
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub type Age = u8;

pub const AGE_MS: u64 = SLOT_MS; // match one age per slot time
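
// Note: `Age` is a u8, so it wraps at 256. The age arithmetic below
// (e.g. `future_age_to_flush`) deliberately uses wrapping operations for this reason.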

pub struct BucketMapHolder<T: IndexValue> {
    pub disk: Option<BucketMap<SlotT<T>>>,
    pub count_ages_flushed: AtomicUsize,
    pub age: AtomicU8,
    pub stats: BucketMapHolderStats,
    age_timer: AtomicInterval,

    // used by bg processing to know when any bucket has become dirty
    pub wait_dirty_or_aged: WaitableCondvar,
    next_bucket_to_flush: Mutex<usize>,
    bins: usize,
    _threads: usize, // currently unused (hence the leading underscore)

    // how many MB are we allowed to keep in the in-mem index?
    // The rest goes to disk.
    pub mem_budget_mb: Option<usize>,
    ages_to_stay_in_cache: Age,

    /// startup is a special time for flush to focus on moving everything to disk as fast and efficiently as possible
    /// with fewer thread count limitations. LRU and access patterns are not important. Freeing memory
    /// and writing to disk in parallel are.
    /// Note that startup is an optimization and is not required for correctness.
    startup: AtomicBool,
}

impl<T: IndexValue> Debug for BucketMapHolder<T> {
    fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Ok(())
    }
}

#[allow(clippy::mutex_atomic)]
impl<T: IndexValue> BucketMapHolder<T> {
    pub fn increment_age(&self) {
        // since we are about to change the age, there are now 0 buckets that have been flushed at this age
        // this swap must happen before the age.fetch_add
        let previous = self.count_ages_flushed.swap(0, Ordering::Acquire);
        // fetch_add is defined to wrap.
        // That's what we want: 0..=255, then back to 0.
        self.age.fetch_add(1, Ordering::Release);
        assert!(previous >= self.bins); // we should not have advanced the age before the previous age was fully flushed
        self.wait_dirty_or_aged.notify_all(); // notify all because we can age scan in parallel
    }
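
    // The age cycle, roughly: each bucket reports bucket_flushed_at_current_age()
    // once per age; once all `bins` buckets have reported and AGE_MS has elapsed,
    // maybe_advance_age() calls increment_age() and the cycle starts over.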

    pub fn future_age_to_flush(&self) -> Age {
        self.current_age().wrapping_add(self.ages_to_stay_in_cache)
    }

    fn has_age_interval_elapsed(&self) -> bool {
        // note that when this returns true, the state of age_timer is modified
        self.age_timer.should_update(AGE_MS)
    }

    /// used by bg processes to determine the number of active threads and how aggressively to flush
    pub fn get_startup(&self) -> bool {
        self.startup.load(Ordering::Relaxed)
    }

    pub fn set_startup(&self, value: bool) {
        if !value {
            self.wait_for_idle();
        }
        self.startup.store(value, Ordering::Relaxed)
    }

    pub(crate) fn wait_for_idle(&self) {
        // currently just asserts that we are still in the startup state
        assert!(self.get_startup());
    }

    pub fn current_age(&self) -> Age {
        self.age.load(Ordering::Acquire)
    }

    pub fn bucket_flushed_at_current_age(&self) {
        self.count_ages_flushed.fetch_add(1, Ordering::Release);
        self.maybe_advance_age();
    }

    // have all buckets been flushed at the current age?
    pub fn all_buckets_flushed_at_current_age(&self) -> bool {
        self.count_ages_flushed() >= self.bins
    }

    pub fn count_ages_flushed(&self) -> usize {
        self.count_ages_flushed.load(Ordering::Acquire)
    }

    pub fn maybe_advance_age(&self) -> bool {
        // check has_age_interval_elapsed last as calling it modifies state on success
        if self.all_buckets_flushed_at_current_age() && self.has_age_interval_elapsed() {
            self.increment_age();
            true
        } else {
            false
        }
    }
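
    /// Construct a holder with `bins` buckets. A minimal construction sketch
    /// (the config values are illustrative assumptions; see the tests below):
    /// ```ignore
    /// let holder = BucketMapHolder::<u64>::new(4, &Some(AccountsIndexConfig::default()), 1);
    /// assert_eq!(holder.current_age(), 0);
    /// assert!(!holder.all_buckets_flushed_at_current_age());
    /// ```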
    pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, threads: usize) -> Self {
        const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5;
        let ages_to_stay_in_cache = config
            .as_ref()
            .and_then(|config| config.ages_to_stay_in_cache)
            .unwrap_or(DEFAULT_AGE_TO_STAY_IN_CACHE);

        let mut bucket_config = BucketMapConfig::new(bins);
        bucket_config.drives = config.as_ref().and_then(|config| config.drives.clone());
        let mem_budget_mb = config.as_ref().and_then(|config| config.index_limit_mb);
        // only allocate the disk bucket map if mem_budget_mb is Some
        let disk = mem_budget_mb.map(|_| BucketMap::new(bucket_config));
        Self {
            disk,
            ages_to_stay_in_cache,
            count_ages_flushed: AtomicUsize::default(),
            age: AtomicU8::default(),
            stats: BucketMapHolderStats::new(bins),
            wait_dirty_or_aged: WaitableCondvar::default(),
            next_bucket_to_flush: Mutex::new(0),
            age_timer: AtomicInterval::default(),
            bins,
            startup: AtomicBool::default(),
            mem_budget_mb,
            _threads: threads,
        }
    }

    // get the next bucket to flush, with the idea that the previous bucket
    // is perhaps being flushed by another thread already.
    pub fn next_bucket_to_flush(&self) -> usize {
        // could be lock-free as an optimization
        // wrapping is tricky
        let mut lock = self.next_bucket_to_flush.lock().unwrap();
        let result = *lock;
        *lock = (result + 1) % self.bins;
        result
    }
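
    // A sketch of how a caller might drive `background` (the spawning code is an
    // illustrative assumption; the accounts index owns the real thread setup):
    //
    //   let exit = Arc::new(AtomicBool::new(false));
    //   let holder = Arc::clone(&holder);
    //   std::thread::spawn(move || holder.background(exit, in_mem));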

    // intended to execute in a bg thread
    pub fn background(&self, exit: Arc<AtomicBool>, in_mem: Vec<Arc<InMemAccountsIndex<T>>>) {
        let bins = in_mem.len();
        let flush = self.disk.is_some();
        loop {
            if !flush {
                // no disk to flush to, so just wake up periodically to report stats
                self.wait_dirty_or_aged.wait_timeout(Duration::from_millis(
                    self.stats.remaining_until_next_interval(),
                ));
            } else if self.all_buckets_flushed_at_current_age() {
                // nothing left to flush at this age; wait until the age interval
                // or the stats interval elapses, whichever comes first
                let wait = std::cmp::min(
                    self.age_timer.remaining_until_next_interval(AGE_MS),
                    self.stats.remaining_until_next_interval(),
                );

                let mut m = Measure::start("wait");
                self.wait_dirty_or_aged
                    .wait_timeout(Duration::from_millis(wait));
                m.stop();
                self.stats
                    .bg_waiting_us
                    .fetch_add(m.as_us(), Ordering::Relaxed);
                // likely some time has elapsed. May have been waiting for the age time interval to elapse.
                self.maybe_advance_age();
            }

            if exit.load(Ordering::Relaxed) {
                break;
            }

            self.stats.active_threads.fetch_add(1, Ordering::Relaxed);
            for _ in 0..bins {
                if flush {
                    let index = self.next_bucket_to_flush();
                    in_mem[index].flush();
                }
                self.stats.report_stats(self);
                if self.all_buckets_flushed_at_current_age() {
                    break;
                }
            }
            self.stats.active_threads.fetch_sub(1, Ordering::Relaxed);
        }
    }
}

#[cfg(test)]
pub mod tests {
    use super::*;
    use rayon::prelude::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Instant;

    #[test]
    fn test_next_bucket_to_flush() {
        solana_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        let visited = (0..bins)
            .map(|_| AtomicUsize::default())
            .collect::<Vec<_>>();
        let iterations = bins * 30;
        let threads = bins * 4;
        // next_bucket_to_flush round-robins globally, so the total call count
        // (threads * iterations) should be spread evenly across the bins
        let expected = threads * iterations / bins;

        (0..threads).into_par_iter().for_each(|_| {
            (0..iterations).for_each(|_| {
                let bin = test.next_bucket_to_flush();
                visited[bin].fetch_add(1, Ordering::Relaxed);
            });
        });
        visited.iter().enumerate().for_each(|(bin, visited)| {
            assert_eq!(visited.load(Ordering::Relaxed), expected, "bin: {}", bin)
        });
    }

    #[test]
    fn test_age_increment() {
        solana_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        // run past two full wraps of the u8 age
        for age in 0..513 {
            assert_eq!(test.current_age(), (age % 256) as Age);

            // we would like to mark each bucket flushed here, but...
            for _ in 0..bins {
                assert!(!test.all_buckets_flushed_at_current_age());
                // ...we cannot call test.bucket_flushed_at_current_age() because,
                // depending on timing, it may advance the age out from under us
            }

            // this would normally happen once the age interval elapsed and all
            // buckets had been flushed at the previous age
            test.count_ages_flushed.fetch_add(bins, Ordering::Release);
            test.increment_age();
        }
    }

    #[test]
    fn test_age_time() {
        solana_logger::setup();
        let bins = 1;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        let threads = 2;
        let time = AGE_MS * 5 / 2;
        // 2.5 age intervals fit in `time`, so the age should advance exactly twice
        let expected = (time / AGE_MS) as Age;
        let now = Instant::now();
        test.bucket_flushed_at_current_age(); // done with age 0
        (0..threads).into_par_iter().for_each(|_| {
            while now.elapsed().as_millis() < (time as u128) {
                if test.maybe_advance_age() {
                    test.bucket_flushed_at_current_age();
                }
            }
        });
        assert_eq!(test.current_age(), expected);
    }

    #[test]
    fn test_age_broad() {
        solana_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        assert_eq!(test.current_age(), 0);
        for _ in 0..bins {
            assert!(!test.all_buckets_flushed_at_current_age());
            test.bucket_flushed_at_current_age();
        }
        std::thread::sleep(std::time::Duration::from_millis(AGE_MS * 2));
        test.maybe_advance_age();
        assert_eq!(test.current_age(), 1);
        assert!(!test.all_buckets_flushed_at_current_age());
    }
}