Skip to content

Commit

Permalink
fix(cardinality): Keep sliding window updated with good cardinality i…
Browse files Browse the repository at this point in the history
…tems (#3076)

After checking for membership items were not added to new granules of
the sliding window:
  • Loading branch information
Dav1dde committed Feb 9, 2024
1 parent 71039c8 commit 4e8c00c
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 7 deletions.
20 changes: 16 additions & 4 deletions relay-cardinality/src/redis/cardinality.lua
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,16 @@ local results = {
-- to all sets in `KEYS`.
--
-- Returns the total amount of values added to the 'working set'.
local function sadd(t, offset, max)
local function sadd(t, opts)
local working_set_cardinality = 0
local any_modifications = false

for i = 1, #KEYS do
opts = opts or {}
local key_offset = opts.skip_working_set and 1 or 0
local offset = opts.offset or 0
local max = opts.max

for i = 1 + key_offset, #KEYS do
local is_working_set = i == 1

for from, to in batches(#t, 7000, offset, max) do
Expand Down Expand Up @@ -119,7 +124,7 @@ local budget = math.max(0, max_cardinality - current_cardinality)

-- Fast Path: we have enough budget to fit all elements
if budget >= num_hashes then
local added, any_modifications = sadd(ARGV, HASHES_OFFSET)
local added, any_modifications = sadd(ARGV, { offset = HASHES_OFFSET })
-- New current cardinality is current + amount of keys that have been added to the set
current_cardinality = current_cardinality + added

Expand All @@ -140,7 +145,7 @@ local offset = HASHES_OFFSET
local needs_expiry_bumped = false
while budget > 0 and offset < #ARGV do
local len = math.min(#ARGV - offset, budget)
local added, any_modifications = sadd(ARGV, offset, len)
local added, any_modifications = sadd(ARGV, { offset = offset, max = len })

current_cardinality = current_cardinality + added
needs_expiry_bumped = needs_expiry_bumped or any_modifications
Expand All @@ -159,16 +164,23 @@ results[1] = current_cardinality

-- If we ran out of budget, check the remaining items for membership
if budget <= 0 and offset < #ARGV then
local already_seen = {}

for arg_i = offset + 1, #ARGV do
local value = ARGV[arg_i]

-- Can be optimized with `SMISMEMBER` once we switch to Redis 6.2.
if redis.call('SISMEMBER', working_set, value) == 1 then
table.insert(results, ACCEPTED)
table.insert(already_seen, value)
else
table.insert(results, REJECTED)
end
end

-- Make sure the accepted items are inserted into new granules.
local _, any_modifications = sadd(already_seen, { skip_working_set = true })
needs_expiry_bumped = needs_expiry_bumped or any_modifications
end

if needs_expiry_bumped then
Expand Down
59 changes: 56 additions & 3 deletions relay-cardinality/src/redis/limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ mod tests {
let rejected = limiter.test_limits(scoping, limits, entries1);
assert_eq!(rejected.len(), 0);

for i in 0..window.window_seconds / window.granularity_seconds {
for i in 0..(window.window_seconds / window.granularity_seconds) * 2 {
// Fast forward time.
limiter.time_offset = Duration::from_secs(i * window.granularity_seconds);

Expand All @@ -631,8 +631,8 @@ mod tests {
assert_eq!(rejected.len(), 1);
}

// Fast forward time to where we're in the next window.
limiter.time_offset = Duration::from_secs(window.window_seconds + 1);
// Fast forward time to a fresh window.
limiter.time_offset = Duration::from_secs(1 + window.window_seconds * 3);
// Accept the new element.
let rejected = limiter.test_limits(scoping, limits, entries2);
assert_eq!(rejected.len(), 0);
Expand Down Expand Up @@ -864,4 +864,57 @@ mod tests {
}
}
}

#[test]
fn test_limiter_sliding_window_perfect() {
let mut limiter = build_limiter();
let scoping = new_scoping(&limiter);

let window = SlidingWindow {
window_seconds: 300,
granularity_seconds: 100,
};
let limits = &[CardinalityLimit {
id: "limit".to_owned(),
window,
limit: 100,
scope: CardinalityScope::Organization,
namespace: Some(MetricNamespace::Custom),
}];

macro_rules! test {
($r:expr) => {{
let entries = $r
.map(|i| Entry::new(EntryId(i as usize), MetricNamespace::Custom, i))
.collect::<Vec<_>>();

limiter.test_limits(scoping, limits, entries)
}};
}

// Test Case:
// 1. Fill the window
// 2. Fast forward one granule and fill that with the same values
// 3. Assert that values "confirmed" in the first granule are active for a full window
//
// [ ][ ][ ]{ }{ }{ }[ ][ ][ ]
// [A][A][A]{ }{ }{ }[ ][ ][ ]
// [A][A]{A}{ }{ }[ ][ ][ ]
// | \-- Assert this value.
// \-- Touch this value.

// Fill the first window with values.
assert!(test!(0..100).is_empty());

// Fast forward one granule with the same values.
limiter.time_offset = Duration::from_secs(1 + window.granularity_seconds);
assert!(test!(0..100).is_empty());

// Fast forward to the first granule after a full reset.
limiter.time_offset = Duration::from_secs(1 + window.window_seconds);
// Make sure the window is full and does not accept new values.
assert_eq!(test!(200..300).len(), 100);
// Assert original values.
assert!(test!(0..100).is_empty());
}
}

0 comments on commit 4e8c00c

Please sign in to comment.