diff --git a/lib/sentry/application.ex b/lib/sentry/application.ex index 0d8d6058..dbb750f6 100644 --- a/lib/sentry/application.ex +++ b/lib/sentry/application.ex @@ -47,7 +47,7 @@ defmodule Sentry.Application do ] ++ maybe_http_client_spec ++ maybe_span_storage ++ - [Sentry.Transport.SenderPool] + [Sentry.Transport.RateLimiter, Sentry.Transport.SenderPool] cache_loaded_applications() diff --git a/lib/sentry/client_error.ex b/lib/sentry/client_error.ex index 5112bf6d..b3792c37 100644 --- a/lib/sentry/client_error.ex +++ b/lib/sentry/client_error.ex @@ -33,6 +33,7 @@ defmodule Sentry.ClientError do """ @type reason() :: :too_many_retries + | :rate_limited | :server_error | {:invalid_json, Exception.t()} | {:request_failure, reason :: :inet.posix() | term()} @@ -73,6 +74,10 @@ defmodule Sentry.ClientError do "Sentry responded with status 429 - Too Many Requests and the SDK exhausted the configured retries" end + defp format(:rate_limited) do + "the event was dropped because the category is currently rate-limited by Sentry" + end + defp format({:invalid_json, reason}) do formatted = if is_exception(reason) do diff --git a/lib/sentry/client_report/sender.ex b/lib/sentry/client_report/sender.ex index d1c91d64..aa80a943 100644 --- a/lib/sentry/client_report/sender.ex +++ b/lib/sentry/client_report/sender.ex @@ -31,6 +31,7 @@ defmodule Sentry.ClientReport.Sender do | Sentry.CheckIn.t() | ClientReport.t() | Sentry.Event.t() + | Sentry.Transaction.t() def record_discarded_events(reason, event_items, genserver) when is_list(event_items) do # We silently ignore events whose reasons aren't valid because we have to add it to the allowlist in Snuba diff --git a/lib/sentry/transport.ex b/lib/sentry/transport.ex index e6899ff9..6cb967c4 100644 --- a/lib/sentry/transport.ex +++ b/lib/sentry/transport.ex @@ -4,6 +4,7 @@ defmodule Sentry.Transport do # This module is exclusively responsible for encoding and POSTing envelopes to Sentry. alias Sentry.{ClientError, ClientReport, Config, Envelope, LoggerUtils} + alias Sentry.Transport.RateLimiter @default_retries [1000, 2000, 4000, 8000] @sentry_version 5 @@ -47,27 +48,13 @@ defmodule Sentry.Transport do retries_left, envelope_items ) do - case request(client, endpoint, headers, payload) do + case request(client, endpoint, headers, payload, envelope_items) do {:ok, id} -> {:ok, id} - # If Sentry gives us a Retry-After header, we listen to that instead of our - # own retry. - {:retry_after, delay_ms} when retries_left != [] -> - Process.sleep(delay_ms) - - post_envelope_with_retries( - client, - endpoint, - headers, - payload, - tl(retries_left), - envelope_items - ) - - {:retry_after, _delay_ms} -> + {:error, :rate_limited} -> ClientReport.Sender.record_discarded_events(:ratelimit_backoff, envelope_items) - {:error, ClientError.new(:too_many_retries)} + {:error, ClientError.new(:rate_limited)} {:error, _reason} when retries_left != [] -> [sleep_interval | retries_left] = retries_left @@ -92,29 +79,32 @@ defmodule Sentry.Transport do end end - defp request(client, endpoint, headers, body) do - with {:ok, 200, _headers, body} <- + defp check_rate_limited(envelope_items) do + rate_limited? = + Enum.any?(envelope_items, fn item -> + category = Envelope.get_data_category(item) + RateLimiter.rate_limited?(category) + end) + + if rate_limited?, do: {:error, :rate_limited}, else: :ok + end + + defp request(client, endpoint, headers, body, envelope_items) do + with :ok <- check_rate_limited(envelope_items), + {:ok, 200, _headers, body} <- client_post_and_validate_return_value(client, endpoint, headers, body), {:ok, json} <- Sentry.JSON.decode(body, Config.json_library()) do {:ok, Map.get(json, "id")} else - {:ok, 429, headers, _body} -> - delay_ms = - with timeout when is_binary(timeout) <- - :proplists.get_value("Retry-After", headers, nil), - {delay_s, ""} <- Integer.parse(timeout) do - delay_s * 1000 - else - _ -> - # https://develop.sentry.dev/sdk/rate-limiting/#stage-1-parse-response-headers - 60_000 - end - - {:retry_after, delay_ms} + {:ok, 429, _headers, _body} -> + {:error, :rate_limited} {:ok, status, headers, body} -> {:error, {:http, {status, headers, body}}} + {:error, :rate_limited} -> + {:error, :rate_limited} + {:error, reason} -> {:error, {:request_failure, reason}} end @@ -127,6 +117,7 @@ defmodule Sentry.Transport do {:ok, status, resp_headers, resp_body} when is_integer(status) and status in 200..599 and is_list(resp_headers) and is_binary(resp_body) -> + update_rate_limits(resp_headers, status) {:ok, status, resp_headers, resp_body} {:ok, status, resp_headers, resp_body} -> @@ -137,6 +128,35 @@ defmodule Sentry.Transport do end end + defp update_rate_limits(headers, status) do + rate_limits_header = :proplists.get_value("X-Sentry-Rate-Limits", headers, nil) + + cond do + is_binary(rate_limits_header) -> + # Use categorized rate limits if present + RateLimiter.update_rate_limits(rate_limits_header) + + status == 429 -> + # Use global rate limit from Retry-After if no categorized limits are present + delay_seconds = get_global_delay(headers) + RateLimiter.update_global_rate_limit(delay_seconds) + + true -> + :ok + end + end + + defp get_global_delay(headers) do + with timeout when is_binary(timeout) <- :proplists.get_value("Retry-After", headers, nil), + {delay, ""} <- Integer.parse(timeout) do + delay + else + # Per the spec, if Retry-After is missing or malformed, default to 60 seconds + # https://develop.sentry.dev/sdk/rate-limiting/#stage-1-parse-response-headers + _ -> 60 + end + end + defp get_endpoint_and_headers do %Sentry.DSN{} = dsn = Config.dsn() diff --git a/lib/sentry/transport/rate_limiter.ex b/lib/sentry/transport/rate_limiter.ex new file mode 100644 index 00000000..1b0bc03b --- /dev/null +++ b/lib/sentry/transport/rate_limiter.ex @@ -0,0 +1,174 @@ +defmodule Sentry.Transport.RateLimiter do + @moduledoc false + # Tracks rate limits per category from Sentry API responses. + # Uses an ETS table to store expiry timestamps for rate-limited categories. + # When Sentry returns a 429 response with rate limit headers, this module + # stores the expiry time per category, allowing other parts of the SDK to + # check if an event should be dropped before sending. + # + # See https://develop.sentry.dev/sdk/expected-features/rate-limiting/ + + use GenServer + + @table __MODULE__ + @sweep_interval_ms 60_000 + + ## Public API + + @doc """ + Starts the RateLimiter GenServer. + """ + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, nil, name: Keyword.get(opts, :name, __MODULE__)) + end + + ## GenServer Callbacks + + @impl true + def init(nil) do + _table = :ets.new(@table, [:named_table, :public, :set]) + schedule_sweep() + {:ok, :no_state} + end + + @impl true + def handle_info(:sweep, state) do + now = System.system_time(:second) + + # Match spec: select entries where expiry (position 2) < now + match_spec = [{{:"$1", :"$2"}, [{:<, :"$2", now}], [true]}] + + :ets.select_delete(@table, match_spec) + + schedule_sweep() + {:noreply, state} + end + + ## Public Functions + + @doc """ + Checks if the given category is currently rate-limited. + + Returns `true` if the category is rate-limited (either specifically or via + a global rate limit), `false` otherwise. + + ## Examples + + iex> RateLimiter.rate_limited?("error") + false + + iex> :ets.insert(RateLimiter, {"error", System.system_time(:second) + 60}) + iex> RateLimiter.rate_limited?("error") + true + + """ + @spec rate_limited?(String.t()) :: boolean() + def rate_limited?(category) do + now = System.system_time(:second) + check_rate_limited(category, now) or check_rate_limited(:global, now) + end + + @doc """ + Updates global rate limit from a Retry-After header value. + + This is a fallback for when X-Sentry-Rate-Limits is not present. + Stores a global rate limit (:global key) that affects all categories. + + ## Examples + + iex> RateLimiter.update_global_rate_limit(60) + :ok + + """ + @spec update_global_rate_limit(pos_integer()) :: :ok + def update_global_rate_limit(retry_after_seconds) when is_integer(retry_after_seconds) do + now = System.system_time(:second) + expiry = now + retry_after_seconds + :ets.insert(@table, {:global, expiry}) + :ok + end + + @doc """ + Updates rate limits from the X-Sentry-Rate-Limits header. + + Parses the header value and stores expiry timestamps for each category. + Returns `:ok` regardless of parsing success. + + ## Examples + + iex> RateLimiter.update_rate_limits("60:error;transaction") + :ok + + """ + @spec update_rate_limits(String.t() | nil) :: :ok + def update_rate_limits(nil), do: :ok + + def update_rate_limits(rate_limits_header) when is_binary(rate_limits_header) do + now = System.system_time(:second) + rate_limits = parse_rate_limits_header(rate_limits_header) + + Enum.each(rate_limits, fn {category, retry_after_seconds} -> + expiry = now + retry_after_seconds + :ets.insert(@table, {category, expiry}) + end) + + :ok + end + + ## Private Helpers + + @spec check_rate_limited(String.t() | :global, integer()) :: boolean() + defp check_rate_limited(category, time) do + case :ets.lookup(@table, category) do + [{^category, expiry}] when expiry > time -> true + _ -> false + end + end + + # Parse X-Sentry-Rate-Limits header + # Format: "60:error;transaction:key, 2700:default:organization" + # Returns: [{category, retry_after_seconds}, ...] + @spec parse_rate_limits_header(String.t()) :: [{String.t() | :global, integer()}] + defp parse_rate_limits_header(header_value) do + header_value + |> String.split(",") + |> Enum.map(&String.trim/1) + |> Enum.flat_map(&parse_quota_limit/1) + end + + @spec parse_quota_limit(String.t()) :: [{String.t() | :global, integer()}] + defp parse_quota_limit(quota_limit_str) do + {retry_after_str, rest} = quota_limit_str |> String.split(":") |> List.pop_at(0) + + case parse_retry_after(retry_after_str) do + {:ok, retry_after} -> parse_categories(rest, retry_after) + :error -> [] + end + end + + @spec parse_retry_after(String.t() | nil) :: {:ok, integer()} | :error + defp parse_retry_after(nil), do: :error + + defp parse_retry_after(retry_after_str) do + case Integer.parse(retry_after_str) do + {retry_after, ""} -> {:ok, retry_after} + _ -> :error + end + end + + @spec parse_categories([String.t()], integer()) :: [{String.t() | :global, integer()}] + defp parse_categories([categories_str | _rest], retry_after) do + case String.split(categories_str, ";") do + [""] -> [{:global, retry_after}] + categories -> Enum.map(categories, fn cat -> {cat, retry_after} end) + end + end + + defp parse_categories(_, _), do: [] + + @spec schedule_sweep() :: reference() + defp schedule_sweep do + Process.send_after(self(), :sweep, @sweep_interval_ms) + end +end diff --git a/test/sentry/transport/rate_limiter_test.exs b/test/sentry/transport/rate_limiter_test.exs new file mode 100644 index 00000000..71648835 --- /dev/null +++ b/test/sentry/transport/rate_limiter_test.exs @@ -0,0 +1,143 @@ +defmodule Sentry.Transport.RateLimiterTest do + use ExUnit.Case, async: false + + alias Sentry.Transport.RateLimiter + + setup do + # The RateLimiter is started by the application + # Just clear the ETS table before each test + :ets.delete_all_objects(RateLimiter) + :ok + end + + describe "parse_rate_limits_header/1" do + test "parses single category limit" do + # X-Sentry-Rate-Limits: 60:error + RateLimiter.update_rate_limits("60:error") + + assert RateLimiter.rate_limited?("error") == true + assert RateLimiter.rate_limited?("transaction") == false + end + + test "parses multiple categories with same limit" do + # X-Sentry-Rate-Limits: 60:error;transaction + RateLimiter.update_rate_limits("60:error;transaction") + + assert RateLimiter.rate_limited?("error") == true + assert RateLimiter.rate_limited?("transaction") == true + end + + test "parses multiple limits separated by comma" do + # X-Sentry-Rate-Limits: 60:transaction, 2700:default;error + RateLimiter.update_rate_limits("60:transaction, 2700:default;error") + + assert RateLimiter.rate_limited?("transaction") == true + assert RateLimiter.rate_limited?("default") == true + assert RateLimiter.rate_limited?("error") == true + end + + test "parses empty categories as global limit" do + # X-Sentry-Rate-Limits: 60::organization + RateLimiter.update_rate_limits("60::organization") + + # Global limit affects all categories + assert RateLimiter.rate_limited?("error") == true + assert RateLimiter.rate_limited?("transaction") == true + end + + test "ignores unknown dimensions" do + # X-Sentry-Rate-Limits: 60:error:organization:quota_exceeded + RateLimiter.update_rate_limits("60:error:organization:quota_exceeded") + + assert RateLimiter.rate_limited?("error") == true + end + + test "handles malformed entries gracefully" do + # X-Sentry-Rate-Limits: invalid, 60:error, bad_format + RateLimiter.update_rate_limits("invalid, 60:error, bad_format") + + # Only the valid entry should be parsed + assert RateLimiter.rate_limited?("error") == true + end + + test "handles spaces after commas" do + # X-Sentry-Rate-Limits: 60:error, 120:transaction + RateLimiter.update_rate_limits("60:error, 120:transaction") + + assert RateLimiter.rate_limited?("error") == true + assert RateLimiter.rate_limited?("transaction") == true + end + end + + describe "update_rate_limits/1" do + test "stores category-specific rate limits in ETS" do + RateLimiter.update_rate_limits("60:error") + + assert [{_, expiry}] = :ets.lookup(RateLimiter, "error") + assert expiry > System.system_time(:second) + end + + test "stores global rate limit with :global key" do + RateLimiter.update_rate_limits("60::") + + assert [{:global, expiry}] = :ets.lookup(RateLimiter, :global) + assert expiry > System.system_time(:second) + end + + test "overwrites existing rate limits" do + RateLimiter.update_rate_limits("1:error") + first_expiry = :ets.lookup(RateLimiter, "error") |> hd() |> elem(1) + + RateLimiter.update_rate_limits("15:error") + second_expiry = :ets.lookup(RateLimiter, "error") |> hd() |> elem(1) + + assert second_expiry > first_expiry + end + end + + describe "update_global_rate_limit/1" do + test "stores global rate limit from Retry-After" do + RateLimiter.update_global_rate_limit(60) + + assert [{:global, expiry}] = :ets.lookup(RateLimiter, :global) + assert_in_delta expiry, System.system_time(:second) + 60, 1 + end + end + + describe "rate_limited?/1" do + test "returns true for rate-limited category" do + now = System.system_time(:second) + :ets.insert(RateLimiter, {"error", now + 60}) + + assert RateLimiter.rate_limited?("error") == true + end + + test "returns false for non-rate-limited category" do + assert RateLimiter.rate_limited?("error") == false + end + + test "returns false for expired rate limit" do + now = System.system_time(:second) + :ets.insert(RateLimiter, {"error", now - 10}) + + assert RateLimiter.rate_limited?("error") == false + end + + test "returns true when global limit is active" do + now = System.system_time(:second) + :ets.insert(RateLimiter, {:global, now + 60}) + + # Any category should be limited + assert RateLimiter.rate_limited?("error") == true + assert RateLimiter.rate_limited?("transaction") == true + end + + test "returns true if either category or global limit is active" do + now = System.system_time(:second) + :ets.insert(RateLimiter, {"error", now + 30}) + :ets.insert(RateLimiter, {:global, now + 60}) + + assert RateLimiter.rate_limited?("error") == true + end + end +end diff --git a/test/sentry/transport_test.exs b/test/sentry/transport_test.exs index 3e9f12cc..6d97cc24 100644 --- a/test/sentry/transport_test.exs +++ b/test/sentry/transport_test.exs @@ -234,7 +234,7 @@ defmodule Sentry.TransportTest do assert_received {:request, ^ref} end - test "fails when it exhausts retries and Sentry replies with 429", %{bypass: bypass} do + test "fails immediately when Sentry replies with 429 (rate limited)", %{bypass: bypass} do envelope = Envelope.from_event(Event.create_event(message: "Hello")) test_pid = self() ref = make_ref() @@ -247,7 +247,7 @@ defmodule Sentry.TransportTest do |> Plug.Conn.resp(429, ~s<{}>) end) - assert :too_many_retries = + assert :rate_limited = error(fn -> Transport.encode_and_post_envelope(envelope, HackneyClient, _retries = []) end) @@ -260,6 +260,98 @@ defmodule Sentry.TransportTest do assert log =~ "[warning]" assert_received {:request, ^ref} end + + test "updates rate limits from X-Sentry-Rate-Limits header in 200 OK response", %{ + bypass: bypass + } do + envelope = Envelope.from_event(Event.create_event(message: "Hello")) + + # Simulate Sentry sending rate limit in successful response + Bypass.expect(bypass, "POST", "/api/1/envelope/", fn conn -> + conn + |> Plug.Conn.put_resp_header("X-Sentry-Rate-Limits", "60:error:key") + |> Plug.Conn.resp(200, ~s<{"id":"abc123"}>) + end) + + # Request should succeed + assert {:ok, "abc123"} = Transport.encode_and_post_envelope(envelope, HackneyClient) + + # But rate limit should be stored + assert Transport.RateLimiter.rate_limited?("error") + refute Transport.RateLimiter.rate_limited?("transaction") + end + + test "updates rate limits from X-Sentry-Rate-Limits header in error responses", %{ + bypass: bypass + } do + envelope = Envelope.from_event(Event.create_event(message: "Hello")) + + # Simulate Sentry sending rate limit in error response + Bypass.expect(bypass, "POST", "/api/1/envelope/", fn conn -> + conn + |> Plug.Conn.put_resp_header("X-Sentry-Rate-Limits", "120:transaction:organization") + |> Plug.Conn.resp(500, ~s<{"error":"Internal Server Error"}>) + end) + + # Request should fail + assert {:error, %ClientError{reason: :server_error}} = + Transport.encode_and_post_envelope(envelope, HackneyClient, _retries = []) + + # But rate limit should still be stored + assert Transport.RateLimiter.rate_limited?("transaction") + refute Transport.RateLimiter.rate_limited?("error") + end + + test "proactively enforces rate limits from 200 OK before subsequent requests", %{ + bypass: bypass + } do + # First request returns 200 with rate limit header + Bypass.expect(bypass, "POST", "/api/1/envelope/", fn conn -> + conn + |> Plug.Conn.put_resp_header("X-Sentry-Rate-Limits", "60:error:key") + |> Plug.Conn.resp(200, ~s<{"id":"first-event"}>) + end) + + envelope1 = Envelope.from_event(Event.create_event(message: "First error")) + assert {:ok, "first-event"} = Transport.encode_and_post_envelope(envelope1, HackneyClient) + + # Verify rate limit was stored + assert Transport.RateLimiter.rate_limited?("error") + + # Second error event should be dropped BEFORE making HTTP request + # This happens at the higher level (encode_and_post_envelope checks rate limits first) + envelope2 = Envelope.from_event(Event.create_event(message: "Second error")) + + # The bypass will NOT receive a request because it's dropped before sending + assert {:error, %ClientError{reason: :rate_limited}} = + Transport.encode_and_post_envelope(envelope2, HackneyClient, _retries = []) + end + + test "handles multiple categories in single X-Sentry-Rate-Limits header", %{bypass: bypass} do + envelope = Envelope.from_event(Event.create_event(message: "Hello")) + + # Simulate Sentry rate-limiting multiple categories at once + Bypass.expect(bypass, "POST", "/api/1/envelope/", fn conn -> + conn + |> Plug.Conn.put_resp_header( + "X-Sentry-Rate-Limits", + "60:error;transaction:key, 120:attachment:org" + ) + |> Plug.Conn.resp(200, ~s<{"id":"xyz"}>) + end) + + assert {:ok, "xyz"} = Transport.encode_and_post_envelope(envelope, HackneyClient) + + # Both error and transaction should be rate-limited for 60 seconds + assert Transport.RateLimiter.rate_limited?("error") + assert Transport.RateLimiter.rate_limited?("transaction") + + # Attachment should be rate-limited for 120 seconds + assert Transport.RateLimiter.rate_limited?("attachment") + + # Other categories should not be rate-limited + refute Transport.RateLimiter.rate_limited?("session") + end end defp error(fun) do diff --git a/test/support/case.ex b/test/support/case.ex index 85103114..6b254a80 100644 --- a/test/support/case.ex +++ b/test/support/case.ex @@ -10,6 +10,11 @@ defmodule Sentry.Case do setup context do config_before = all_config() + # Clear rate limiter state before each test + if Process.whereis(Sentry.Transport.RateLimiter) do + :ets.delete_all_objects(Sentry.Transport.RateLimiter) + end + on_exit(fn -> assert config_before == all_config() end)