Skip to content

Commit

Permalink
fix: Rate limiter TokenBucket::auto_replenish()
Browse files Browse the repository at this point in the history
#### Change

Fixes bug in `TokenBucket::auto_replenish`. This change also yields a
slight performance increase (from ~28ns to ~26ns).

#### Reason

When a TokenBucket gets empty, the implementation sets a timer of 100ms
for replenishing it.

The bucket replenishing function calculates the new tokens to add to
the budget using the following formula:

```
let tokens = (time_delta * self.processed_capacity) / self.processed_refill_time;
```

However, this formula can return 0 depending on the values of
`self.processed_capacity` and `self.processed_refill_time`.

For a TokenBucket with a total capacity of `size` tokens and a refill
period of `complete_refill_time_ns` nanoseconds, the above values are
calculated as follows:

```
// Get the greatest common factor between `size` and `complete_refill_time_ns`.
let common_factor = gcd(size, complete_refill_time_ns);
let processed_capacity: u64 = size / common_factor;
let processed_refill_time: u64 =
    complete_refill_time_ns / common_factor;
```

So, for example, if `size == 1` and
`complete_refill_time_ns == 1_000_000_000` (equivalent to 1 token per
second), the replenished tokens will be:

```
let tokens = (100_000_000 * 1) / 1_000_000_000; // which gives 0 in integer division
```

As a result, the bucket will never get replenished.

Signed-off-by: Jonathan Woollett-Light <jcawl@amazon.co.uk>
  • Loading branch information
JonathanWoollett-Light committed Jan 17, 2023
1 parent 6900349 commit 40bb276
Showing 1 changed file with 9 additions and 66 deletions.
75 changes: 9 additions & 66 deletions src/rate_limiter/src/lib.rs
Expand Up @@ -66,20 +66,6 @@ const REFILL_TIMER_INTERVAL_MS: u64 = 100;
const TIMER_REFILL_STATE: TimerState =
TimerState::Oneshot(Duration::from_millis(REFILL_TIMER_INTERVAL_MS));

const NANOSEC_IN_ONE_MILLISEC: u64 = 1_000_000;

// Euclid's two-thousand-year-old algorithm for finding the greatest common divisor.
/// Computes the greatest common divisor of `x` and `y` using Euclid's
/// algorithm.
///
/// Repeatedly maps `(a, b)` to `(b, a % b)` until the remainder is zero;
/// the surviving value is the GCD. `gcd(x, 0) == x` and `gcd(0, y) == y`.
fn gcd(x: u64, y: u64) -> u64 {
    let (mut a, mut b) = (x, y);
    while b != 0 {
        (a, b) = (b, a % b);
    }
    a
}

/// Enum describing the outcomes of a `reduce()` call on a `TokenBucket`.
#[derive(Clone, Debug, PartialEq)]
pub enum BucketReduction {
Expand All @@ -101,6 +87,7 @@ pub struct TokenBucket {
initial_one_time_burst: u64,
// Complete refill time in milliseconds.
refill_time: u64,
refill_one_ns: u64,

// Internal state descriptors.

Expand All @@ -110,10 +97,6 @@ pub struct TokenBucket {
budget: u64,
// Last time this token bucket saw activity.
last_update: Instant,

// Fields used for pre-processing optimizations.
processed_capacity: u64,
processed_refill_time: u64,
}

impl TokenBucket {
Expand All @@ -130,45 +113,30 @@ impl TokenBucket {
if size == 0 || complete_refill_time_ms == 0 {
return None;
}
// Formula for computing current refill amount:
// refill_token_count = (delta_time * size) / (complete_refill_time_ms * 1_000_000)
// In order to avoid overflows, simplify the fractions by computing greatest common divisor.

let complete_refill_time_ns = complete_refill_time_ms * NANOSEC_IN_ONE_MILLISEC;
// Get the greatest common factor between `size` and `complete_refill_time_ns`.
let common_factor = gcd(size, complete_refill_time_ns);
// The division will be exact since `common_factor` is a factor of `size`.
let processed_capacity: u64 = size / common_factor;
// The division will be exact since `common_factor` is a factor of
// `complete_refill_time_ns`.
let processed_refill_time: u64 = complete_refill_time_ns / common_factor;

Some(TokenBucket {
size,
one_time_burst,
initial_one_time_burst: one_time_burst,
refill_time: complete_refill_time_ms,
refill_one_ns: (1_000_000 * complete_refill_time_ms) / size,
// Start off full.
budget: size,
// Last updated is now.
last_update: Instant::now(),
processed_capacity,
processed_refill_time,
})
}

// Replenishes token bucket based on elapsed time. Should only be called internally by `Self`.
fn auto_replenish(&mut self) {
// Compute time passed since last refill/update.
let time_delta = self.last_update.elapsed().as_nanos() as u64;
self.last_update = Instant::now();

// At each 'time_delta' nanoseconds the bucket should refill with:
// refill_amount = (time_delta * size) / (complete_refill_time_ms * 1_000_000)
// `processed_capacity` and `processed_refill_time` are the result of simplifying above
// fraction formula with their greatest-common-factor.
let tokens = (time_delta * self.processed_capacity) / self.processed_refill_time;
self.budget = std::cmp::min(self.budget + tokens, self.size);
let time_delta = std::mem::replace(&mut self.last_update, Instant::now()).elapsed();

// Compute amount to refill.
let refill_amount = (time_delta.as_nanos() as u64) / self.refill_one_ns;

// Refill.
self.budget = std::cmp::min(self.budget.saturating_add(refill_amount), self.size);
}

/// Attempts to consume `tokens` from the bucket and returns whether the action succeeded.
Expand Down Expand Up @@ -534,14 +502,6 @@ pub(crate) mod tests {
&self.last_update
}

fn get_processed_capacity(&self) -> u64 {
self.processed_capacity
}

fn get_processed_refill_time(&self) -> u64 {
self.processed_refill_time
}

// After a restore, we cannot be certain that the last_update field has the same value.
pub fn partial_eq(&self, other: &TokenBucket) -> bool {
(other.capacity() == self.capacity())
Expand Down Expand Up @@ -569,30 +529,13 @@ pub(crate) mod tests {
assert!(*tb.get_last_update() >= before);
let after = Instant::now();
assert!(*tb.get_last_update() <= after);
assert_eq!(tb.get_processed_capacity(), 1);
assert_eq!(tb.get_processed_refill_time(), 1_000_000);

// Verify invalid bucket configurations result in `None`.
assert!(TokenBucket::new(0, 1234, 1000).is_none());
assert!(TokenBucket::new(100, 1234, 0).is_none());
assert!(TokenBucket::new(0, 1234, 0).is_none());
}

#[test]
fn test_token_bucket_preprocess() {
let tb = TokenBucket::new(1000, 0, 1000).unwrap();
assert_eq!(tb.get_processed_capacity(), 1);
assert_eq!(tb.get_processed_refill_time(), NANOSEC_IN_ONE_MILLISEC);

let thousand = 1000;
let tb = TokenBucket::new(3 * 7 * 11 * 19 * thousand, 0, 7 * 11 * 13 * 17).unwrap();
assert_eq!(tb.get_processed_capacity(), 3 * 19);
assert_eq!(
tb.get_processed_refill_time(),
13 * 17 * (NANOSEC_IN_ONE_MILLISEC / thousand)
);
}

#[test]
fn test_token_bucket_reduce() {
// token bucket with capacity 1000 and refill time of 1000 milliseconds
Expand Down

0 comments on commit 40bb276

Please sign in to comment.