Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/sentry/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ defmodule Sentry.Application do
] ++
maybe_http_client_spec ++
maybe_span_storage ++
[Sentry.Transport.SenderPool]
[Sentry.Transport.RateLimiter, Sentry.Transport.SenderPool]

cache_loaded_applications()

Expand Down
5 changes: 5 additions & 0 deletions lib/sentry/client_error.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ defmodule Sentry.ClientError do
"""
@type reason() ::
:too_many_retries
| :rate_limited
| :server_error
| {:invalid_json, Exception.t()}
| {:request_failure, reason :: :inet.posix() | term()}
Expand Down Expand Up @@ -73,6 +74,10 @@ defmodule Sentry.ClientError do
"Sentry responded with status 429 - Too Many Requests and the SDK exhausted the configured retries"
end

defp format(:rate_limited) do
"the event was dropped because the category is currently rate-limited by Sentry"
end

defp format({:invalid_json, reason}) do
formatted =
if is_exception(reason) do
Expand Down
1 change: 1 addition & 0 deletions lib/sentry/client_report/sender.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ defmodule Sentry.ClientReport.Sender do
| Sentry.CheckIn.t()
| ClientReport.t()
| Sentry.Event.t()
| Sentry.Transaction.t()
def record_discarded_events(reason, event_items, genserver)
when is_list(event_items) do
# We silently ignore events whose reasons aren't valid because we have to add it to the allowlist in Snuba
Expand Down
84 changes: 52 additions & 32 deletions lib/sentry/transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Sentry.Transport do
# This module is exclusively responsible for encoding and POSTing envelopes to Sentry.

alias Sentry.{ClientError, ClientReport, Config, Envelope, LoggerUtils}
alias Sentry.Transport.RateLimiter

@default_retries [1000, 2000, 4000, 8000]
@sentry_version 5
Expand Down Expand Up @@ -47,27 +48,13 @@ defmodule Sentry.Transport do
retries_left,
envelope_items
) do
case request(client, endpoint, headers, payload) do
case request(client, endpoint, headers, payload, envelope_items) do
{:ok, id} ->
{:ok, id}

# If Sentry gives us a Retry-After header, we listen to that instead of our
# own retry.
{:retry_after, delay_ms} when retries_left != [] ->
Process.sleep(delay_ms)

post_envelope_with_retries(
client,
endpoint,
headers,
payload,
tl(retries_left),
envelope_items
)

{:retry_after, _delay_ms} ->
{:error, :rate_limited} ->
ClientReport.Sender.record_discarded_events(:ratelimit_backoff, envelope_items)
{:error, ClientError.new(:too_many_retries)}
{:error, ClientError.new(:rate_limited)}

{:error, _reason} when retries_left != [] ->
[sleep_interval | retries_left] = retries_left
Expand All @@ -92,29 +79,32 @@ defmodule Sentry.Transport do
end
end

defp request(client, endpoint, headers, body) do
with {:ok, 200, _headers, body} <-
defp check_rate_limited(envelope_items) do
rate_limited? =
Enum.any?(envelope_items, fn item ->
category = Envelope.get_data_category(item)
RateLimiter.rate_limited?(category)
end)

if rate_limited?, do: {:error, :rate_limited}, else: :ok
end

defp request(client, endpoint, headers, body, envelope_items) do
with :ok <- check_rate_limited(envelope_items),
{:ok, 200, _headers, body} <-
client_post_and_validate_return_value(client, endpoint, headers, body),
{:ok, json} <- Sentry.JSON.decode(body, Config.json_library()) do
{:ok, Map.get(json, "id")}
else
{:ok, 429, headers, _body} ->
delay_ms =
with timeout when is_binary(timeout) <-
:proplists.get_value("Retry-After", headers, nil),
{delay_s, ""} <- Integer.parse(timeout) do
delay_s * 1000
else
_ ->
# https://develop.sentry.dev/sdk/rate-limiting/#stage-1-parse-response-headers
60_000
end

{:retry_after, delay_ms}
{:ok, 429, _headers, _body} ->
{:error, :rate_limited}

{:ok, status, headers, body} ->
{:error, {:http, {status, headers, body}}}

{:error, :rate_limited} ->
{:error, :rate_limited}

{:error, reason} ->
{:error, {:request_failure, reason}}
end
Expand All @@ -127,6 +117,7 @@ defmodule Sentry.Transport do
{:ok, status, resp_headers, resp_body}
when is_integer(status) and status in 200..599 and is_list(resp_headers) and
is_binary(resp_body) ->
update_rate_limits(resp_headers, status)
{:ok, status, resp_headers, resp_body}

{:ok, status, resp_headers, resp_body} ->
Expand All @@ -137,6 +128,35 @@ defmodule Sentry.Transport do
end
end

defp update_rate_limits(headers, status) do
rate_limits_header = :proplists.get_value("X-Sentry-Rate-Limits", headers, nil)

cond do
is_binary(rate_limits_header) ->
# Use categorized rate limits if present
RateLimiter.update_rate_limits(rate_limits_header)

status == 429 ->
# Use global rate limit from Retry-After if no categorized limits are present
delay_seconds = get_global_delay(headers)
RateLimiter.update_global_rate_limit(delay_seconds)

true ->
:ok
end
end

defp get_global_delay(headers) do
with timeout when is_binary(timeout) <- :proplists.get_value("Retry-After", headers, nil),
{delay, ""} <- Integer.parse(timeout) do
delay
else
# Per the spec, if Retry-After is missing or malformed, default to 60 seconds
# https://develop.sentry.dev/sdk/rate-limiting/#stage-1-parse-response-headers
_ -> 60
end
end

defp get_endpoint_and_headers do
%Sentry.DSN{} = dsn = Config.dsn()

Expand Down
174 changes: 174 additions & 0 deletions lib/sentry/transport/rate_limiter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
defmodule Sentry.Transport.RateLimiter do
@moduledoc false
# Tracks rate limits per category from Sentry API responses.
# Uses an ETS table to store expiry timestamps for rate-limited categories.
# When Sentry returns a 429 response with rate limit headers, this module
# stores the expiry time per category, allowing other parts of the SDK to
# check if an event should be dropped before sending.
#
# See https://develop.sentry.dev/sdk/expected-features/rate-limiting/

use GenServer

@table __MODULE__
@sweep_interval_ms 60_000

## Public API

@doc """
Starts the RateLimiter GenServer.
"""
@spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, nil, name: Keyword.get(opts, :name, __MODULE__))
end

## GenServer Callbacks

@impl true
def init(nil) do
_table = :ets.new(@table, [:named_table, :public, :set])
schedule_sweep()
{:ok, :no_state}
end

@impl true
def handle_info(:sweep, state) do
now = System.system_time(:second)

# Match spec: select entries where expiry (position 2) < now
match_spec = [{{:"$1", :"$2"}, [{:<, :"$2", now}], [true]}]

:ets.select_delete(@table, match_spec)

schedule_sweep()
{:noreply, state}
end

## Public Functions

@doc """
Checks if the given category is currently rate-limited.

Returns `true` if the category is rate-limited (either specifically or via
a global rate limit), `false` otherwise.

## Examples

iex> RateLimiter.rate_limited?("error")
false

iex> :ets.insert(RateLimiter, {"error", System.system_time(:second) + 60})
iex> RateLimiter.rate_limited?("error")
true

"""
@spec rate_limited?(String.t()) :: boolean()
def rate_limited?(category) do
now = System.system_time(:second)
check_rate_limited(category, now) or check_rate_limited(:global, now)
end

@doc """
Updates global rate limit from a Retry-After header value.

This is a fallback for when X-Sentry-Rate-Limits is not present.
Stores a global rate limit (:global key) that affects all categories.

## Examples

iex> RateLimiter.update_global_rate_limit(60)
:ok

"""
@spec update_global_rate_limit(pos_integer()) :: :ok
def update_global_rate_limit(retry_after_seconds) when is_integer(retry_after_seconds) do
now = System.system_time(:second)
expiry = now + retry_after_seconds
:ets.insert(@table, {:global, expiry})
:ok
end

@doc """
Updates rate limits from the X-Sentry-Rate-Limits header.

Parses the header value and stores expiry timestamps for each category.
Returns `:ok` regardless of parsing success.

## Examples

iex> RateLimiter.update_rate_limits("60:error;transaction")
:ok

"""
@spec update_rate_limits(String.t() | nil) :: :ok
def update_rate_limits(nil), do: :ok

def update_rate_limits(rate_limits_header) when is_binary(rate_limits_header) do
now = System.system_time(:second)
rate_limits = parse_rate_limits_header(rate_limits_header)

Enum.each(rate_limits, fn {category, retry_after_seconds} ->
expiry = now + retry_after_seconds
:ets.insert(@table, {category, expiry})
end)

:ok
end

## Private Helpers

@spec check_rate_limited(String.t() | :global, integer()) :: boolean()
defp check_rate_limited(category, time) do
case :ets.lookup(@table, category) do
[{^category, expiry}] when expiry > time -> true
_ -> false
end
end

# Parse X-Sentry-Rate-Limits header
# Format: "60:error;transaction:key, 2700:default:organization"
# Returns: [{category, retry_after_seconds}, ...]
@spec parse_rate_limits_header(String.t()) :: [{String.t() | :global, integer()}]
defp parse_rate_limits_header(header_value) do
header_value
|> String.split(",")
|> Enum.map(&String.trim/1)
|> Enum.flat_map(&parse_quota_limit/1)
end

@spec parse_quota_limit(String.t()) :: [{String.t() | :global, integer()}]
defp parse_quota_limit(quota_limit_str) do
{retry_after_str, rest} = quota_limit_str |> String.split(":") |> List.pop_at(0)

case parse_retry_after(retry_after_str) do
{:ok, retry_after} -> parse_categories(rest, retry_after)
:error -> []
end
end

@spec parse_retry_after(String.t() | nil) :: {:ok, integer()} | :error
defp parse_retry_after(nil), do: :error

defp parse_retry_after(retry_after_str) do
case Integer.parse(retry_after_str) do
{retry_after, ""} -> {:ok, retry_after}
_ -> :error
end
end

@spec parse_categories([String.t()], integer()) :: [{String.t() | :global, integer()}]
defp parse_categories([categories_str | _rest], retry_after) do
case String.split(categories_str, ";") do
[""] -> [{:global, retry_after}]
categories -> Enum.map(categories, fn cat -> {cat, retry_after} end)
end
end

defp parse_categories(_, _), do: []

@spec schedule_sweep() :: reference()
defp schedule_sweep do
Process.send_after(self(), :sweep, @sweep_interval_ms)
end
end
Loading
Loading