
ref(cardinality): Use a Lua script and in-memory cache for the cardinality limiter #2849

Merged (12 commits) on Dec 19, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -29,6 +29,7 @@
- Allow ingestion of metrics summary on spans. ([#2823](https://github.com/getsentry/relay/pull/2823))
- Add metric_bucket data category. ([#2824](https://github.com/getsentry/relay/pull/2824))
- Keep only domain and extension for image resource span grouping. ([#2826](https://github.com/getsentry/relay/pull/2826))
- Use a Lua script for cardinality limiting to reduce load on Redis. ([#2849](https://github.com/getsentry/relay/pull/2849))

## 23.11.2

174 changes: 174 additions & 0 deletions relay-cardinality/src/cardinality.lua
@@ -0,0 +1,174 @@
-- Check the cardinality of a list of hashes/strings and return, for each item,
-- whether it should be accepted or dropped.
--
--
-- ``KEYS``: A list of cardinality sets. The first key, the 'working set', is used to
-- check the cardinality; hashes are added to all passed keys.
--
-- ``ARGV``:
-- * [number] Max cardinality.
-- * [number] Set expiry.
-- * [string...] List of hashes.
--
-- Returns a table: the first element contains the new set cardinality, and for every
-- passed hash (in order) the table contains `0` if the hash was rejected and `1` if it was accepted.
--
--
-- For example, to check a cardinality limit of 3 with the hashes `1, 2, 3, 4, 5`
-- in a sliding window of 1 hour with a granularity of 2 (sets: `0` and `1800`),
-- send the following values:
--
-- KEYS: { "prefix-scope-0", "prefix-scope-1800" }
-- ARGV: {
-- 3, -- Limit
-- 3600, -- Window size / Expiry
-- 1, 2, 3, 4, 5 -- Hashes
-- }
--
-- The script returns:
--
-- {
-- 3, -- new cardinality
-- 1, -- accepted: 1
-- 1, -- accepted: 2
-- 1, -- accepted: 3
-- 0, -- rejected: 4
-- 0, -- rejected: 5
-- }
--
-- Redis state after execution:
--
-- prefix-scope-0: {1, 2, 3} | Expires: now + 3600
-- prefix-scope-1800: {1, 2, 3} | Expires: now + 3600
--
--
-- The script applies the following logic for every hash passed as an argument:
-- * if the cardinality has not been reached yet, the hash is added to all sets
-- and the item is marked as accepted
-- * otherwise the hash is only marked as accepted when it was already seen before,
--    which is determined by checking whether the hash is contained in the 'working set'
--
-- Afterwards, if any item was added to the 'working set', the expiry of all sets is
-- bumped by the passed expiry.
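--
-- As an illustrative sketch (the key names are examples, not produced by this
-- script), the documented example corresponds to this `redis-cli` invocation:
--
--   redis-cli EVAL "$(cat cardinality.lua)" 2 prefix-scope-0 prefix-scope-1800 3 3600 1 2 3 4 5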

local function batches(n, batch_size, offset, max_items)
local i = 0
if max_items then
n = math.min(n, max_items + offset)
end

return function()
local from = i * batch_size + 1 + offset
i = i + 1
if from <= n then
local to = math.min(from + batch_size - 1, n)

return from, to
end
end
end
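
-- Illustration of the iterator above (assumed values, not part of the script):
-- `batches(10, 3, 0)` yields the ranges (1, 3), (4, 6), (7, 9), (10, 10);
-- `batches(10, 3, 2, 5)` caps iteration at `offset + max_items = 7` and yields
-- (3, 5), (6, 7).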


local ACCEPTED = true
local REJECTED = false
local HASHES_OFFSET = 2 -- arg1: max_cardinality, arg2: expiry

local working_set = KEYS[1]
local max_cardinality = tonumber(ARGV[1])
local expire = tonumber(ARGV[2])
local num_hashes = #ARGV - HASHES_OFFSET

local results = {
0, -- total cardinality
}

-- Adds elements from the table `t` to all sets in `KEYS`, skipping the first
-- `offset` elements and adding at most `max` elements.
--
-- Returns the total number of values added to the 'working set'.
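--
-- Note (editorial assumption): values are passed to `SADD` in chunks of 7000
-- because `unpack` in Redis' embedded Lua fails with "too many results to
-- unpack" once the argument count approaches the interpreter's ~8000-slot
-- stack limit; 7000 stays safely below that.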
local function sadd(t, offset, max)
    local added = 0

for i = 1, #KEYS do
local is_working_set = i == 1

for from, to in batches(#t, 7000, offset, max) do

[Review thread on the batching above]

Member: I think we should batch exclusively on the Relay side. That would simplify this script, and we need some sort of batching there anyway.

Member (Author): I like batching in the Lua script: it makes the Relay side easier to read, and we get a guarantee that this never fails in Lua (one less error to handle and one more invariant). The Lua implementation would get slightly simpler, but we would still need a slicing function like `batches` (to apply the offset and max), which is now only a minimal addition to the `batches` function (just a change of the max value and the offset). So overall, IMO, we don't gain much by moving it to Relay.

Member:
> but we would still need a slicing function like batches
That makes sense!
> So overall imo we don't gain much by moving it to relay.
Even if we keep batching on the Lua side, don't we also need batching on the Rust side? I am not sure if we would run into size limits or network timeouts otherwise.

Member (Author): I am not sure how Redis and the network behave here; in my local tests it made no difference, but locally there is also no latency and Redis is not doing anything else (e.g. rate limiting).

Member (Author): If we batched and used a pipeline, I feel like that would make no difference then?

local r = redis.call('SADD', KEYS[i], unpack(t, from, to))
if is_working_set then
added = added + r
end
end
end

return added
end

-- Bumps the expiry of all sets by the passed expiry
local function bump_expiry()
for _, key in ipairs(KEYS) do
redis.call('EXPIRE', key, expire)
end
end


local current_cardinality = redis.call('SCARD', working_set) or 0
local budget = math.max(0, max_cardinality - current_cardinality)
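-- (Illustrative values: with max_cardinality = 3 and a current cardinality of
-- 5, the budget is clamped to 0 and no new hashes can be admitted.)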


-- Fast Path: we have enough budget to fit all elements
if budget >= num_hashes then
local added = sadd(ARGV, HASHES_OFFSET)
    -- New current cardinality is current + number of keys that have been added to the set
current_cardinality = current_cardinality + added

for _ = 1, num_hashes do
table.insert(results, ACCEPTED)
end

if added > 0 then
bump_expiry()
end

results[1] = current_cardinality
return results
end

-- Slow path: we have some budget, but not enough to fit all elements
local offset = HASHES_OFFSET
local needs_expiry_bumped = false
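-- Note: `budget` is decremented by `added`, not by `len`; hashes that were
-- already in the working set do not consume budget, so this loop may take
-- multiple passes before the budget is exhausted or all hashes are processed.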
while budget > 0 and offset < #ARGV do
local len = math.min(#ARGV - offset, budget)
local added = sadd(ARGV, offset, len)

current_cardinality = current_cardinality + added
needs_expiry_bumped = needs_expiry_bumped or added > 0

for _ = 1, len do
table.insert(results, ACCEPTED)
end

offset = offset + len
budget = budget - added
end

-- Update the cardinality; at this point the cardinality cannot change anymore,
-- the maximum number of keys that can be added has been added to the set.
results[1] = current_cardinality

-- If we ran out of budget, check the remaining items for membership
if budget <= 0 and offset < #ARGV then
for arg_i = offset + 1, #ARGV do
local value = ARGV[arg_i]

if redis.call('SISMEMBER', working_set, value) == 1 then
table.insert(results, ACCEPTED)
else
table.insert(results, REJECTED)
end
end
end

if needs_expiry_bumped then
bump_expiry()
end

return results
2 changes: 1 addition & 1 deletion relay-cardinality/src/limiter.rs
@@ -60,7 +60,7 @@ pub struct Entry<T: CardinalityScope> {
/// Opaque data structure used by [`CardinalityLimiter`] to track
/// which buckets have been accepted and rejected.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
pub struct EntryId(pub(crate) usize);
pub struct EntryId(pub usize);

impl<T: CardinalityScope> Entry<T> {
/// Creates a new entry.