Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion src/sentry/testutils/pytest/sentry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import collections
import collections.abc
import os
import random
import shutil
Expand Down Expand Up @@ -218,15 +219,23 @@ def pytest_configure(config: pytest.Config) -> None:

if snuba_url := xdist.get_snuba_url():
settings.SENTRY_SNUBA = snuba_url
# Also set the env var that _SnubaPool._get() prefers; this is immune
# to Django override_settings resets that can happen mid-session.
os.environ["_SENTRY_SNUBA_POOL_URL"] = snuba_url

settings.SENTRY_ISSUE_PLATFORM_FUTURES_MAX_LIMIT = 1

if not hasattr(settings, "SENTRY_OPTIONS"):
settings.SENTRY_OPTIONS = {}

redis_db = xdist.get_redis_db()
settings.SENTRY_OPTIONS.update(
{
"redis.clusters": {"default": {"hosts": {0: {"db": xdist.get_redis_db()}}}},
# Each xdist worker gets its own Redis DB so that teardown
# flushdb() only affects that worker's data. Isolation is
# intra-shard (within a single pytest process); each CI shard
# job runs on its own runner with its own Redis service.
"redis.clusters": {"default": {"hosts": {0: {"db": redis_db}}}},
"mail.backend": "django.core.mail.backends.locmem.EmailBackend",
"system.url-prefix": "http://testserver",
"system.base-hostname": "testserver",
Expand Down Expand Up @@ -267,6 +276,21 @@ def pytest_configure(config: pytest.Config) -> None:
settings.SENTRY_OPTIONS_COMPLAIN_ON_ERRORS = True
settings.VALIDATE_SUPERUSER_ACCESS_CATEGORY_AND_REASON = False

# Flush the options store's local cache so that any redis.clusters value
# that may have been cached before this configure hook (e.g. during early
# Django import) is evicted. The next access will re-read from settings
# and pick up the worker-specific DB number.
from sentry import options as sentry_options

sentry_options.default_manager.store.flush_local_cache()

# Also bust the redis_clusters manager's cluster cache so it re-creates
# the connection with the correct DB on first use.
from sentry.utils.redis import redis_clusters

redis_clusters._clusters_bytes.clear()
redis_clusters._clusters_str.clear()

_configure_test_env_cells()

# ID controls
Expand Down Expand Up @@ -378,8 +402,41 @@ def pytest_sessionfinish(session: pytest.Session, exitstatus: int) -> None:
TaskNamespace.send_task = TaskNamespace._original_send_task # type: ignore[method-assign]
del TaskNamespace._original_send_task

# When running shuffled tests with --exitfirst, record the first failing
# test ID so the CI workflow can pass it to detect-test-pollution.
failing_testid = getattr(session, "_shuffle_failing_testid", None)
if failing_testid and exitstatus != 0:
outdir = os.environ.get("GITHUB_WORKSPACE", "/tmp")
with open(f"{outdir}/failing-testid", "w") as f:
f.write(failing_testid + "\n")
longrepr = getattr(session, "_shuffle_failing_longrepr", None)
if longrepr:
with open(f"{outdir}/failing-testid-longrepr", "w") as f:
f.write(longrepr)


def pytest_runtest_setup(item: pytest.Item) -> None:
# Ensure the _SnubaPool proxy uses the correct per-worker Snuba URL before
# each test. The proxy caches settings.SENTRY_SNUBA on first use; if that
# first use happened before configure_for_worker updated the setting (or if
# the setting was never updated because xdist.get_snuba_url() returned None
# due to a stale module-level env read), the pool may point at 1218.
#
# Re-read the target URL from env here (call-time, not import-time) and
# force the proxy to rebuild the pool if the cached URL doesn't match.
snuba_url = xdist.get_snuba_url()
if snuba_url:
# Set an env var that _SnubaPool._get() prefers over settings.SENTRY_SNUBA.
# Environment variables are immune to Django's override_settings and
# TestCase isolation mechanisms that can reset the Django setting mid-session.
os.environ["_SENTRY_SNUBA_POOL_URL"] = snuba_url
settings.SENTRY_SNUBA = snuba_url
if "sentry.utils.snuba" in sys.modules:
snuba_mod = sys.modules["sentry.utils.snuba"]
pool_proxy = getattr(snuba_mod, "_snuba_pool", None)
if pool_proxy is not None and pool_proxy._url != snuba_url:
pool_proxy._url = None # force _get() to rebuild pool at correct URL

if item.config.getvalue("nomigrations") and any(
mark for mark in item.iter_markers(name="migrations")
):
Expand Down Expand Up @@ -522,11 +579,29 @@ def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item
if shuffle_enabled:
_shuffle(items, random.Random(seed))

# Write the final ordered test IDs to a file for detect-test-pollution
outdir = os.environ.get("GITHUB_WORKSPACE", "/tmp")
with open(f"{outdir}/testids-full", "w") as f:
f.write("\n".join(item.nodeid for item in items) + "\n")

# This only needs to be done if there are items to be de-selected
if len(discard) > 0:
config.hook.pytest_deselected(items=discard)


@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(
    item: pytest.Item, call: pytest.CallInfo[None]
) -> collections.abc.Generator[None]:
    """Remember the most recent test that failed during its call phase.

    When shuffle mode is active (``SENTRY_SHUFFLE_TESTS`` env var is set),
    the failing test's node ID and its long representation are stashed on the
    session object. ``pytest_sessionfinish`` writes them to disk for
    detect-test-pollution only if the whole session ends with a failure.
    """
    outcome = yield
    report = outcome.get_result()
    shuffling = bool(os.environ.get("SENTRY_SHUFFLE_TESTS"))
    if shuffling and report.when == "call" and report.failed:
        session = item.session
        session._shuffle_failing_testid = item.nodeid  # type: ignore[attr-defined]
        session._shuffle_failing_longrepr = str(report.longrepr)  # type: ignore[attr-defined]


def pytest_xdist_setupnodes() -> None:
    """Drop DJANGO_SETTINGS_MODULE before xdist workers spawn.

    Removing the variable prevents out-of-order Django initialization in the
    worker processes; absence of the variable is tolerated.
    """
    if "DJANGO_SETTINGS_MODULE" in os.environ:
        del os.environ["DJANGO_SETTINGS_MODULE"]
47 changes: 33 additions & 14 deletions src/sentry/testutils/pytest/xdist.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,31 @@

import os

_TEST_REDIS_DB = 9
# Redis defaults to 16 databases (0-15). We reserve DBs 0-8 for non-test
# use and assign test workers to DBs 9-15. With a base of 9 that gives 7
# usable slots (gw0-gw6). When there are more workers than slots we wrap
# around using modulo so we never crash; adjacent workers may share a DB,
# but flushdb() still provides per-test isolation within each worker.
_TEST_REDIS_DB_BASE = 9
_REDIS_DB_COUNT = 16 # Redis default; configure "databases N" to increase
_REDIS_TEST_SLOTS = _REDIS_DB_COUNT - _TEST_REDIS_DB_BASE # 7 slots (9-15)

_SNUBA_BASE_PORT = 1230
# Redis defaults to 16 DBs (0-15). With base DB 9, max 7 workers (gw0-gw6).
_MAX_WORKERS = 7

_worker_id: str | None = os.environ.get("PYTEST_XDIST_WORKER")
_worker_num: int | None = int(_worker_id.replace("gw", "")) if _worker_id else None

if _worker_num is not None and _worker_num >= _MAX_WORKERS:
raise RuntimeError(
f"xdist worker {_worker_id} exceeds max supported workers ({_MAX_WORKERS}). "
f"Redis only has DBs 0-15 and base DB is {_TEST_REDIS_DB}."
)


def get_redis_db() -> int:
"""Return the Redis DB number for this xdist worker.

Each worker gets its own DB so that ``flushdb()`` in teardown only
affects that worker's data. Workers are mapped round-robin into the
available test DB slots when there are more workers than slots.
"""
if _worker_num is not None:
return _TEST_REDIS_DB + _worker_num
return _TEST_REDIS_DB
return _TEST_REDIS_DB_BASE + (_worker_num % _REDIS_TEST_SLOTS)
return _TEST_REDIS_DB_BASE


def get_kafka_topic(base_name: str) -> str:
Expand All @@ -30,6 +36,19 @@ def get_kafka_topic(base_name: str) -> str:


def get_snuba_url() -> str | None:
if _worker_num is not None and os.environ.get("XDIST_PER_WORKER_SNUBA"):
return f"http://127.0.0.1:{_SNUBA_BASE_PORT + _worker_num}"
return None
"""Return the per-worker Snuba URL, or None if not in per-worker Snuba mode.

Reads both env vars at call time (not just at module import time) to handle
any edge case where xdist.py was imported before the subprocess environment
was fully initialised.
"""
if not os.environ.get("XDIST_PER_WORKER_SNUBA"):
return None
worker_id = os.environ.get("PYTEST_XDIST_WORKER") or ""
if not worker_id.startswith("gw"):
return None
try:
worker_num = int(worker_id[2:])
except ValueError:
return None
return f"http://127.0.0.1:{_SNUBA_BASE_PORT + worker_num}"
56 changes: 43 additions & 13 deletions src/sentry/utils/snuba.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,19 +562,49 @@ def increment(
)


_snuba_pool = connection_from_url(
settings.SENTRY_SNUBA,
retries=RetrySkipTimeout(
total=5,
# Our calls to snuba frequently fail due to network issues. We want to
# automatically retry most requests. Some of our POSTs and all of our DELETEs
# do cause mutations, but we have other things in place to handle duplicate
# mutations.
allowed_methods={"GET", "POST", "DELETE"},
),
timeout=settings.SENTRY_SNUBA_TIMEOUT,
maxsize=10,
)
class _SnubaPool:
    """Lazily-built proxy around the Snuba HTTP connection pool.

    The underlying ``urllib3`` pool is created on first request rather than
    at module import. That way xdist workers which repoint
    ``settings.SENTRY_SNUBA`` in ``pytest_configure`` (towards a per-worker
    snuba-gw container) still get a pool aimed at the right URL, even when
    this module was imported before the override ran.

    This stays a plain object so existing ``patch.object(_snuba_pool, ...)``
    calls in tests keep working without modification.
    """

    def __init__(self) -> None:
        self._pool: urllib3.HTTPConnectionPool | None = None
        self._url: str | None = None

    def _get(self) -> urllib3.HTTPConnectionPool:
        # The env var written by pytest_runtest_setup takes precedence over
        # the Django setting: environment variables survive override_settings
        # and TestCase isolation, either of which can quietly reset
        # settings.SENTRY_SNUBA back to its default mid-session.
        target = os.environ.get("_SENTRY_SNUBA_POOL_URL") or settings.SENTRY_SNUBA
        if self._pool is not None and self._url == target:
            return self._pool
        # (Re)build whenever the pool is missing or points at a stale URL.
        self._pool = connection_from_url(
            target,
            retries=RetrySkipTimeout(
                total=5,
                # Snuba calls frequently fail on transient network issues, so
                # retry broadly. Some POSTs and all DELETEs do mutate state,
                # but duplicate mutations are handled elsewhere.
                allowed_methods={"GET", "POST", "DELETE"},
            ),
            timeout=settings.SENTRY_SNUBA_TIMEOUT,
            maxsize=10,
        )
        self._url = target
        return self._pool

    def urlopen(self, *args: Any, **kwargs: Any) -> urllib3.response.HTTPResponse:
        return self._get().urlopen(*args, **kwargs)


_snuba_pool = _SnubaPool()


epoch_naive = datetime(1970, 1, 1, tzinfo=None)
Expand Down
Loading