-
-
Notifications
You must be signed in to change notification settings - Fork 4k
/
cases.py
2785 lines (2387 loc) · 102 KB
/
cases.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import annotations
import hashlib
import inspect
import os.path
import re
import time
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from io import BytesIO
from typing import Any, Dict, List, Literal, Mapping, Optional, Sequence, Union
from unittest import mock
from urllib.parse import urlencode
from uuid import uuid4
from zlib import compress
import pytest
import requests
import responses
import sentry_kafka_schemas
from click.testing import CliRunner
from django.conf import settings
from django.contrib.auth import login
from django.contrib.auth.models import AnonymousUser
from django.core import signing
from django.core.cache import cache
from django.db import DEFAULT_DB_ALIAS, connection, connections
from django.db.migrations.executor import MigrationExecutor
from django.http import HttpRequest
from django.test import TestCase as DjangoTestCase
from django.test import TransactionTestCase as DjangoTransactionTestCase
from django.test import override_settings
from django.test.utils import CaptureQueriesContext
from django.urls import resolve, reverse
from django.utils import timezone as django_timezone
from django.utils.functional import cached_property
from pkg_resources import iter_entry_points
from requests.utils import CaseInsensitiveDict, get_encoding_from_headers
from rest_framework import status
from rest_framework.test import APITestCase as BaseAPITestCase
from sentry_relay.consts import SPAN_STATUS_NAME_TO_CODE
from snuba_sdk import Granularity, Limit, Offset
from snuba_sdk.conditions import BooleanCondition, Condition, ConditionGroup
from sentry import auth, eventstore
from sentry.auth.authenticators.totp import TotpInterface
from sentry.auth.provider import Provider
from sentry.auth.providers.dummy import DummyProvider
from sentry.auth.providers.saml2.activedirectory.apps import ACTIVE_DIRECTORY_PROVIDER_NAME
from sentry.auth.superuser import COOKIE_DOMAIN as SU_COOKIE_DOMAIN
from sentry.auth.superuser import COOKIE_NAME as SU_COOKIE_NAME
from sentry.auth.superuser import COOKIE_PATH as SU_COOKIE_PATH
from sentry.auth.superuser import COOKIE_SALT as SU_COOKIE_SALT
from sentry.auth.superuser import COOKIE_SECURE as SU_COOKIE_SECURE
from sentry.auth.superuser import ORG_ID as SU_ORG_ID
from sentry.auth.superuser import Superuser
from sentry.event_manager import EventManager
from sentry.eventstore.models import Event
from sentry.eventstream.snuba import SnubaEventStream
from sentry.issues.grouptype import NoiseConfig, PerformanceNPlusOneGroupType
from sentry.issues.ingest import send_issue_occurrence_to_eventstream
from sentry.mail import mail_adapter
from sentry.mediators.project_rules.creator import Creator
from sentry.models.apitoken import ApiToken
from sentry.models.authprovider import AuthProvider as AuthProviderModel
from sentry.models.commit import Commit
from sentry.models.commitauthor import CommitAuthor
from sentry.models.dashboard import Dashboard
from sentry.models.dashboard_widget import (
DashboardWidget,
DashboardWidgetDisplayTypes,
DashboardWidgetQuery,
)
from sentry.models.deletedorganization import DeletedOrganization
from sentry.models.deploy import Deploy
from sentry.models.environment import Environment
from sentry.models.files.file import File
from sentry.models.groupmeta import GroupMeta
from sentry.models.identity import Identity, IdentityProvider, IdentityStatus
from sentry.models.notificationsetting import NotificationSetting
from sentry.models.options.project_option import ProjectOption
from sentry.models.options.user_option import UserOption
from sentry.models.organization import Organization
from sentry.models.organizationmember import OrganizationMember
from sentry.models.project import Project
from sentry.models.release import Release
from sentry.models.releasecommit import ReleaseCommit
from sentry.models.repository import Repository
from sentry.models.rule import RuleSource
from sentry.models.user import User
from sentry.models.useremail import UserEmail
from sentry.monitors.models import Monitor, MonitorEnvironment, MonitorType, ScheduleType
from sentry.notifications.types import NotificationSettingOptionValues, NotificationSettingTypes
from sentry.plugins.base import plugins
from sentry.replays.lib.event_linking import transform_event_for_linking_payload
from sentry.replays.models import ReplayRecordingSegment
from sentry.rules.base import RuleBase
from sentry.search.events.constants import (
METRIC_FRUSTRATED_TAG_VALUE,
METRIC_SATISFACTION_TAG_KEY,
METRIC_SATISFIED_TAG_VALUE,
METRIC_TOLERATED_TAG_VALUE,
METRICS_MAP,
SPAN_METRICS_MAP,
)
from sentry.sentry_metrics import indexer
from sentry.sentry_metrics.aggregation_option_registry import AggregationOption
from sentry.sentry_metrics.configuration import UseCaseKey
from sentry.sentry_metrics.use_case_id_registry import METRIC_PATH_MAPPING, UseCaseID
from sentry.silo import SiloMode
from sentry.snuba.metrics.datasource import get_series
from sentry.tagstore.snuba.backend import SnubaTagStorage
from sentry.testutils.factories import get_fixture_path
from sentry.testutils.helpers.datetime import before_now, iso_format
from sentry.testutils.helpers.notifications import TEST_ISSUE_OCCURRENCE
from sentry.testutils.helpers.slack import install_slack
from sentry.testutils.pytest.selenium import Browser
from sentry.types.condition_activity import ConditionActivity, ConditionActivityType
from sentry.types.integrations import ExternalProviders
from sentry.utils import json
from sentry.utils.auth import SsoSession
from sentry.utils.dates import to_timestamp
from sentry.utils.json import dumps_htmlsafe
from sentry.utils.performance_issues.performance_detection import detect_performance_problems
from sentry.utils.retries import TimedRetryPolicy
from sentry.utils.samples import load_data
from sentry.utils.snuba import _snuba_pool
from ..services.hybrid_cloud.organization.serial import serialize_rpc_organization
from ..shared_integrations.client.proxy import IntegrationProxyClient
from ..snuba.metrics import (
MetricConditionField,
MetricField,
MetricGroupByField,
MetricOrderByField,
MetricsQuery,
get_date_range,
)
from ..snuba.metrics.naming_layer.mri import SessionMRI, TransactionMRI, parse_mri
from .asserts import assert_status_code
from .factories import Factories
from .fixtures import Fixtures
from .helpers import AuthProvider, Feature, TaskRunner, override_options, parse_queries
from .silo import assume_test_silo_mode
from .skips import requires_snuba
__all__ = (
"TestCase",
"TransactionTestCase",
"APITestCase",
"TwoFactorAPITestCase",
"AuthProviderTestCase",
"RuleTestCase",
"PermissionTestCase",
"PluginTestCase",
"CliTestCase",
"AcceptanceTestCase",
"IntegrationTestCase",
"SnubaTestCase",
"BaseMetricsTestCase",
"BaseMetricsLayerTestCase",
"BaseIncidentsTest",
"IntegrationRepositoryTestCase",
"ReleaseCommitPatchTest",
"SetRefsTestCase",
"OrganizationDashboardWidgetTestCase",
"SCIMTestCase",
"SCIMAzureTestCase",
"MetricsEnhancedPerformanceTestCase",
"MetricsAPIBaseTestCase",
"OrganizationMetricMetaIntegrationTestCase",
"ProfilesSnubaTestCase",
"ReplaysAcceptanceTestCase",
"ReplaysSnubaTestCase",
"MonitorTestCase",
"MonitorIngestTestCase",
)
from ..types.region import get_region_by_name
DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36"
DETECT_TESTCASE_MISUSE = os.environ.get("SENTRY_DETECT_TESTCASE_MISUSE") == "1"
SILENCE_MIXED_TESTCASE_MISUSE = os.environ.get("SENTRY_SILENCE_MIXED_TESTCASE_MISUSE") == "1"
SessionOrTransactionMRI = Union[SessionMRI, TransactionMRI]
class BaseTestCase(Fixtures):
def assertRequiresAuthentication(self, path, method="GET"):
resp = getattr(self.client, method.lower())(path)
assert resp.status_code == 302
assert resp["Location"].startswith("http://testserver" + reverse("sentry-login"))
@pytest.fixture(autouse=True)
def setup_dummy_auth_provider(self):
auth.register("dummy", DummyProvider)
self.addCleanup(auth.unregister, "dummy", DummyProvider)
def tasks(self):
return TaskRunner()
@pytest.fixture(autouse=True)
def polyfill_capture_on_commit_callbacks(self, django_capture_on_commit_callbacks):
"""
https://pytest-django.readthedocs.io/en/latest/helpers.html#django_capture_on_commit_callbacks
pytest-django comes with its own polyfill of this Django helper for
older Django versions, so we're using that.
"""
self.capture_on_commit_callbacks = django_capture_on_commit_callbacks
@pytest.fixture(autouse=True)
def expose_stale_database_reads(self, stale_database_reads):
self.stale_database_reads = stale_database_reads
def feature(self, names):
"""
>>> with self.feature({'feature:name': True})
>>> # ...
"""
return Feature(names)
def auth_provider(self, name, cls):
"""
>>> with self.auth_provider('name', Provider)
>>> # ...
"""
return AuthProvider(name, cls)
def save_session(self):
self.session.save()
self.save_cookie(
name=settings.SESSION_COOKIE_NAME,
value=self.session.session_key,
max_age=None,
path="/",
domain=settings.SESSION_COOKIE_DOMAIN,
secure=settings.SESSION_COOKIE_SECURE or None,
expires=None,
)
def save_cookie(self, name, value, **params):
self.client.cookies[name] = value
self.client.cookies[name].update({k.replace("_", "-"): v for k, v in params.items()})
def make_request(
self,
user=None,
auth=None,
method=None,
is_superuser=False,
path="/",
secure_scheme=False,
subdomain=None,
*,
GET: dict[str, str] | None = None,
) -> HttpRequest:
request = HttpRequest()
if subdomain:
setattr(request, "subdomain", subdomain)
if method:
request.method = method
request.path = path
request.META["REMOTE_ADDR"] = "127.0.0.1"
request.META["SERVER_NAME"] = "testserver"
request.META["SERVER_PORT"] = 80
if GET is not None:
for k, v in GET.items():
request.GET[k] = v
if secure_scheme:
secure_header = settings.SECURE_PROXY_SSL_HEADER
request.META[secure_header[0]] = secure_header[1]
# order matters here, session -> user -> other things
request.session = self.session
request.auth = auth
request.user = user or AnonymousUser()
# must happen after request.user/request.session is populated
request.superuser = Superuser(request)
if is_superuser:
# XXX: this is gross, but it's a one-off and apis change only once in a great while
request.superuser.set_logged_in(user)
request.is_superuser = lambda: request.superuser.is_active
request.successful_authenticator = None
return request
# TODO(dcramer): ideally superuser_sso would be False by default, but that would require
# a lot of tests changing
@TimedRetryPolicy.wrap(timeout=5)
def login_as(
self, user, organization_id=None, organization_ids=None, superuser=False, superuser_sso=True
):
if isinstance(user, OrganizationMember):
with assume_test_silo_mode(SiloMode.CONTROL):
user = User.objects.get(id=user.user_id)
user.backend = settings.AUTHENTICATION_BACKENDS[0]
request = self.make_request()
with assume_test_silo_mode(SiloMode.CONTROL):
login(request, user)
request.user = user
if organization_ids is None:
organization_ids = set()
else:
organization_ids = set(organization_ids)
if superuser and superuser_sso is not False:
if SU_ORG_ID:
organization_ids.add(SU_ORG_ID)
if organization_id:
organization_ids.add(organization_id)
# TODO(dcramer): ideally this would get abstracted
if organization_ids:
for o in organization_ids:
sso_session = SsoSession.create(o)
self.session[sso_session.session_key] = sso_session.to_dict()
# logging in implicitly binds superuser, but for test cases we
# want that action to be explicit to avoid accidentally testing
# superuser-only code
if not superuser:
# XXX(dcramer): we're calling the internal method to avoid logging
request.superuser._set_logged_out()
elif request.user.is_superuser and superuser:
request.superuser.set_logged_in(request.user)
# XXX(dcramer): awful hack to ensure future attempts to instantiate
# the Superuser object are successful
self.save_cookie(
name=SU_COOKIE_NAME,
value=signing.get_cookie_signer(salt=SU_COOKIE_NAME + SU_COOKIE_SALT).sign(
request.superuser.token
),
max_age=None,
path=SU_COOKIE_PATH,
domain=SU_COOKIE_DOMAIN,
secure=SU_COOKIE_SECURE or None,
expires=None,
)
# Save the session values.
self.save_session()
def load_fixture(self, filepath):
with open(get_fixture_path(filepath), "rb") as fp:
return fp.read()
def _pre_setup(self):
super()._pre_setup()
cache.clear()
ProjectOption.objects.clear_local_cache()
GroupMeta.objects.clear_local_cache()
def _post_teardown(self):
super()._post_teardown()
def options(self, options):
"""
A context manager that temporarily sets a global option and reverts
back to the original value when exiting the context.
"""
return override_options(options)
def assert_valid_deleted_log(self, deleted_log, original_object):
assert deleted_log is not None
assert original_object.name == deleted_log.name
assert deleted_log.name == original_object.name
assert deleted_log.slug == original_object.slug
if not isinstance(deleted_log, DeletedOrganization):
assert deleted_log.organization_id == original_object.organization.id
assert deleted_log.organization_name == original_object.organization.name
assert deleted_log.organization_slug == original_object.organization.slug
assert deleted_log.date_created == original_object.date_added
assert deleted_log.date_deleted >= deleted_log.date_created
def assertWriteQueries(self, queries, debug=False, *args, **kwargs):
func = kwargs.pop("func", None)
using = kwargs.pop("using", DEFAULT_DB_ALIAS)
conn = connections[using]
context = _AssertQueriesContext(self, queries, debug, conn)
if func is None:
return context
with context:
func(*args, **kwargs)
def get_mock_uuid(self):
class uuid:
hex = "abc123"
bytes = b"\x00\x01\x02"
return uuid
class _AssertQueriesContext(CaptureQueriesContext):
def __init__(self, test_case, queries, debug, connection):
self.test_case = test_case
self.queries = queries
self.debug = debug
super().__init__(connection)
def __exit__(self, exc_type, exc_value, traceback):
super().__exit__(exc_type, exc_value, traceback)
if exc_type is not None:
return
parsed_queries = parse_queries(self.captured_queries)
if self.debug:
import pprint
pprint.pprint("====================== Raw Queries ======================")
pprint.pprint(self.captured_queries)
pprint.pprint("====================== Table writes ======================")
pprint.pprint(parsed_queries)
for table, num in parsed_queries.items():
expected = self.queries.get(table, 0)
if expected == 0:
import pprint
pprint.pprint(
"WARNING: no query against %s emitted, add debug=True to see all the queries"
% (table)
)
else:
self.test_case.assertTrue(
num == expected,
"%d write queries expected on `%s`, got %d, add debug=True to see all the queries"
% (expected, table, num),
)
for table, num in self.queries.items():
executed = parsed_queries.get(table, None)
self.test_case.assertFalse(
executed is None,
"no query against %s emitted, add debug=True to see all the queries" % (table),
)
@override_settings(ROOT_URLCONF="sentry.web.urls")
class TestCase(BaseTestCase, DjangoTestCase):
# We need Django to flush all databases.
databases: set[str] | str = "__all__"
@contextmanager
def auto_select_silo_mode_on_redirects(self):
"""
Tests that utilize follow=True may follow redirects between silo modes. This isn't ideal but convenient for
testing certain work flows. Using this context manager, the silo mode in the test will swap automatically
for each view's decorator in order to prevent otherwise unavoidable SiloAvailability errors.
"""
old_request = self.client.request
def request(**request: Any) -> Any:
resolved = resolve(request["PATH_INFO"])
view_class = getattr(resolved.func, "view_class", None)
if view_class is not None:
endpoint_silo_limit = getattr(view_class, "silo_limit", None)
if endpoint_silo_limit:
for mode in endpoint_silo_limit.modes:
if mode is SiloMode.MONOLITH or mode is SiloMode.get_current_mode():
continue
region = None
if mode is SiloMode.REGION:
# TODO: Can we infer the correct region here? would need to package up the
# the request dictionary into a higher level object, which also involves invoking
# _base_environ and maybe other logic buried in Client.....
region = get_region_by_name(settings.SENTRY_MONOLITH_REGION)
with SiloMode.exit_single_process_silo_context(), SiloMode.enter_single_process_silo_context(
mode, region
):
return old_request(**request)
return old_request(**request)
with mock.patch.object(self.client, "request", new=request):
yield
# Ensure that testcases that ask for DB setup actually make use of the
# DB. If they don't, they're wasting CI time.
if DETECT_TESTCASE_MISUSE:
@pytest.fixture(autouse=True, scope="class")
def _require_db_usage(self, request):
class State:
used_db = {}
base = request.cls
state = State()
yield state
did_not_use = set()
did_use = set()
for name, used in state.used_db.items():
if used:
did_use.add(name)
else:
did_not_use.add(name)
if did_not_use and not did_use:
pytest.fail(
f"none of the test functions in {state.base} used the DB! Use `unittest.TestCase` "
f"instead of `sentry.testutils.TestCase` for those kinds of tests."
)
elif did_not_use and did_use and not SILENCE_MIXED_TESTCASE_MISUSE:
pytest.fail(
f"Some of the test functions in {state.base} used the DB and some did not! "
f"test functions using the db: {did_use}\n"
f"Use `unittest.TestCase` instead of `sentry.testutils.TestCase` for the tests not using the db."
)
@pytest.fixture(autouse=True, scope="function")
def _check_function_for_db(self, request, monkeypatch, _require_db_usage):
from django.db.backends.base.base import BaseDatabaseWrapper
real_ensure_connection = BaseDatabaseWrapper.ensure_connection
state = _require_db_usage
def ensure_connection(*args, **kwargs):
for info in inspect.stack():
frame = info.frame
try:
first_arg_name = frame.f_code.co_varnames[0]
first_arg = frame.f_locals[first_arg_name]
except LookupError:
continue
# make an exact check here for two reasons. One is that this is
# good enough as we do not expect subclasses, secondly however because
# it turns out doing an isinstance check on untrusted input can cause
# bad things to happen because it's hookable. In particular this
# blows through max recursion limits here if it encounters certain types
# of broken lazy proxy objects.
if type(first_arg) is state.base and info.function in state.used_db:
state.used_db[info.function] = True
break
return real_ensure_connection(*args, **kwargs)
monkeypatch.setattr(BaseDatabaseWrapper, "ensure_connection", ensure_connection)
state.used_db[request.function.__name__] = False
yield
class TransactionTestCase(BaseTestCase, DjangoTransactionTestCase):
# We need Django to flush all databases.
databases: set[str] | str = "__all__"
pass
class PerformanceIssueTestCase(BaseTestCase):
# We need Django to flush all databases.
databases: set[str] | str = "__all__"
def create_performance_issue(
self,
tags=None,
contexts=None,
fingerprint=None,
transaction=None,
event_data=None,
issue_type=None,
noise_limit=0,
project_id=None,
detector_option="performance.issues.n_plus_one_db.problem-creation",
user_data=None,
):
if issue_type is None:
issue_type = PerformanceNPlusOneGroupType
if event_data is None:
event_data = load_data(
"transaction-n-plus-one",
timestamp=before_now(minutes=10),
)
if tags is not None:
event_data["tags"] = tags
if contexts is not None:
event_data["contexts"] = contexts
if transaction:
event_data["transaction"] = transaction
if project_id is None:
project_id = self.project.id
if user_data:
event_data["user"] = user_data
perf_event_manager = EventManager(event_data)
perf_event_manager.normalize()
def detect_performance_problems_interceptor(data: Event, project: Project):
perf_problems = detect_performance_problems(data, project)
if fingerprint:
for perf_problem in perf_problems:
perf_problem.fingerprint = fingerprint
return perf_problems
with mock.patch(
"sentry.issues.ingest.send_issue_occurrence_to_eventstream",
side_effect=send_issue_occurrence_to_eventstream,
) as mock_eventstream, mock.patch(
"sentry.event_manager.detect_performance_problems",
side_effect=detect_performance_problems_interceptor,
), mock.patch.object(
issue_type, "noise_config", new=NoiseConfig(noise_limit, timedelta(minutes=1))
), override_options(
{"performance.issues.all.problem-detection": 1.0, detector_option: 1.0}
):
event = perf_event_manager.save(project_id)
if mock_eventstream.call_args:
event = event.for_group(mock_eventstream.call_args[0][2].group)
event.occurrence = mock_eventstream.call_args[0][1]
return event
class APITestCase(BaseTestCase, BaseAPITestCase):
"""
Extend APITestCase to inherit access to `client`, an object with methods
that simulate API calls to Sentry, and the helper `get_response`, which
combines and simplifies a lot of tedious parts of making API calls in tests.
When creating API tests, use a new class per endpoint-method pair. The class
must set the string `endpoint`.
"""
# We need Django to flush all databases.
databases: set[str] | str = "__all__"
method = "get"
@property
def endpoint(self):
raise NotImplementedError(f"implement for {type(self).__module__}.{type(self).__name__}")
def get_response(self, *args, **params):
"""
Simulate an API call to the test case's URI and method.
:param params:
Note: These names are intentionally a little funny to prevent name
collisions with real API arguments.
* extra_headers: (Optional) Dict mapping keys to values that will be
passed as request headers.
* qs_params: (Optional) Dict mapping keys to values that will be
url-encoded into a API call's query string.
* raw_data: (Optional) Sometimes we want to precompute the JSON body.
:returns Response object
"""
url = reverse(self.endpoint, args=args)
# In some cases we want to pass querystring params to put/post, handle
# this here.
if "qs_params" in params:
query_string = urlencode(params.pop("qs_params"), doseq=True)
url = f"{url}?{query_string}"
headers = params.pop("extra_headers", {})
raw_data = params.pop("raw_data", None)
if raw_data and isinstance(raw_data, bytes):
raw_data = raw_data.decode("utf-8")
if raw_data and isinstance(raw_data, str):
raw_data = json.loads(raw_data)
data = raw_data or params
method = params.pop("method", self.method).lower()
return getattr(self.client, method)(url, format="json", data=data, **headers)
def get_success_response(self, *args, **params):
"""
Call `get_response` (see above) and assert the response's status code.
:param params:
* status_code: (Optional) Assert that the response's status code is
a specific code. Omit to assert any successful status_code.
:returns Response object
"""
status_code = params.pop("status_code", None)
if status_code and status_code >= 400:
raise Exception("status_code must be < 400")
method = params.pop("method", self.method).lower()
response = self.get_response(*args, method=method, **params)
if status_code:
assert_status_code(response, status_code)
elif method == "get":
assert_status_code(response, status.HTTP_200_OK)
# TODO(mgaeta): Add the other methods.
# elif method == "post":
# assert_status_code(response, status.HTTP_201_CREATED)
elif method == "put":
assert_status_code(response, status.HTTP_200_OK)
elif method == "delete":
assert_status_code(response, status.HTTP_204_NO_CONTENT)
else:
# TODO(mgaeta): Add other methods.
assert_status_code(response, 200, 300)
return response
def get_error_response(self, *args, **params):
"""
Call `get_response` (see above) and assert that the response's status
code is an error code. Basically it's syntactic sugar.
:param params:
* status_code: (Optional) Assert that the response's status code is
a specific error code. Omit to assert any error status_code.
:returns Response object
"""
status_code = params.pop("status_code", None)
if status_code and status_code < 400:
raise Exception("status_code must be >= 400 (an error status code)")
response = self.get_response(*args, **params)
if status_code:
assert_status_code(response, status_code)
else:
assert_status_code(response, 400, 600)
return response
def get_cursor_headers(self, response):
return [
link["cursor"]
for link in requests.utils.parse_header_links(
response.get("link").rstrip(">").replace(">,<", ",<")
)
]
# The analytics event `name` was called with `kwargs` being a subset of its properties
def analytics_called_with_args(self, fn, name, **kwargs):
for call_args, call_kwargs in fn.call_args_list:
event_name = call_args[0]
if event_name == name:
assert all(call_kwargs.get(key, None) == val for key, val in kwargs.items())
return True
return False
@contextmanager
def api_gateway_proxy_stubbed(self):
"""Mocks a fake api gateway proxy that redirects via Client objects"""
def proxy_raw_request(
method: str,
url: str,
headers: Mapping[str, str],
params: Mapping[str, str] | None,
data: Any,
**kwds: Any,
) -> requests.Response:
from django.test.client import Client
client = Client()
extra: Mapping[str, Any] = {
f"HTTP_{k.replace('-', '_').upper()}": v for k, v in headers.items()
}
if params:
url += "?" + urlencode(params)
with assume_test_silo_mode(SiloMode.REGION):
resp = getattr(client, method.lower())(
url, b"".join(data), headers["Content-Type"], **extra
)
response = requests.Response()
response.status_code = resp.status_code
response.headers = CaseInsensitiveDict(resp.headers)
response.encoding = get_encoding_from_headers(response.headers)
response.raw = BytesIO(resp.content)
return response
with mock.patch("sentry.api_gateway.proxy.external_request", new=proxy_raw_request):
yield
class TwoFactorAPITestCase(APITestCase):
@cached_property
def path_2fa(self):
return reverse("sentry-account-settings-security")
def enable_org_2fa(self, organization):
organization.flags.require_2fa = True
organization.save()
def api_enable_org_2fa(self, organization, user):
self.login_as(user)
url = reverse(
"sentry-api-0-organization-details", kwargs={"organization_slug": organization.slug}
)
return self.client.put(url, data={"require2FA": True})
def api_disable_org_2fa(self, organization, user):
url = reverse(
"sentry-api-0-organization-details", kwargs={"organization_slug": organization.slug}
)
return self.client.put(url, data={"require2FA": False})
def assert_can_enable_org_2fa(self, organization, user, status_code=200):
self.__helper_enable_organization_2fa(organization, user, status_code)
def assert_cannot_enable_org_2fa(self, organization, user, status_code, err_msg=None):
self.__helper_enable_organization_2fa(organization, user, status_code, err_msg)
def __helper_enable_organization_2fa(self, organization, user, status_code, err_msg=None):
response = self.api_enable_org_2fa(organization, user)
assert response.status_code == status_code
if err_msg:
assert err_msg.encode("utf-8") in response.content
organization = Organization.objects.get(id=organization.id)
if 200 <= status_code < 300:
assert organization.flags.require_2fa
else:
assert not organization.flags.require_2fa
def add_2fa_users_to_org(self, organization, num_of_users=10, num_with_2fa=5):
non_compliant_members = []
for num in range(0, num_of_users):
user = self.create_user("foo_%s@example.com" % num)
self.create_member(organization=organization, user=user)
if num_with_2fa:
TotpInterface().enroll(user)
num_with_2fa -= 1
else:
non_compliant_members.append(user.email)
return non_compliant_members
class AuthProviderTestCase(TestCase):
provider: type[Provider] = DummyProvider
provider_name = "dummy"
def setUp(self):
super().setUp()
# TestCase automatically sets up dummy provider
if self.provider_name != "dummy" or self.provider != DummyProvider:
auth.register(self.provider_name, self.provider)
self.addCleanup(auth.unregister, self.provider_name, self.provider)
class RuleTestCase(TestCase):
@property
def rule_cls(self):
raise NotImplementedError(f"implement for {type(self).__module__}.{type(self).__name__}")
def get_event(self):
return self.event
def get_rule(self, **kwargs):
kwargs.setdefault("project", self.project)
kwargs.setdefault("data", {})
return self.rule_cls(**kwargs)
def get_state(self, **kwargs):
from sentry.rules import EventState
kwargs.setdefault("is_new", True)
kwargs.setdefault("is_regression", True)
kwargs.setdefault("is_new_group_environment", True)
kwargs.setdefault("has_reappeared", True)
return EventState(**kwargs)
def get_condition_activity(self, **kwargs) -> ConditionActivity:
kwargs.setdefault("group_id", self.event.group.id)
kwargs.setdefault("type", ConditionActivityType.CREATE_ISSUE)
kwargs.setdefault("timestamp", self.event.datetime)
return ConditionActivity(**kwargs)
def passes_activity(
self,
rule: RuleBase,
condition_activity: Optional[ConditionActivity] = None,
event_map: Optional[Dict[str, Any]] = None,
):
if condition_activity is None:
condition_activity = self.get_condition_activity()
if event_map is None:
event_map = {}
return rule.passes_activity(condition_activity, event_map)
def assertPasses(self, rule, event=None, **kwargs):
if event is None:
event = self.event
state = self.get_state(**kwargs)
assert rule.passes(event, state) is True
def assertDoesNotPass(self, rule, event=None, **kwargs):
if event is None:
event = self.event
state = self.get_state(**kwargs)
assert rule.passes(event, state) is False
class PermissionTestCase(TestCase):
def setUp(self):
super().setUp()
self.owner = self.create_user(is_superuser=False)
self.organization = self.create_organization(
owner=self.owner, flags=0 # disable default allow_joinleave access
)
self.team = self.create_team(organization=self.organization)
def assert_can_access(self, user, path, method="GET", **kwargs):
self.login_as(user, superuser=user.is_superuser)
resp = getattr(self.client, method.lower())(path, **kwargs)
assert resp.status_code >= 200 and resp.status_code < 300
return resp
def assert_cannot_access(self, user, path, method="GET", **kwargs):
self.login_as(user, superuser=user.is_superuser)
resp = getattr(self.client, method.lower())(path, **kwargs)
assert resp.status_code >= 300
def assert_member_can_access(self, path, **kwargs):
return self.assert_role_can_access(path, "member", **kwargs)
def assert_manager_can_access(self, path, **kwargs):
return self.assert_role_can_access(path, "manager", **kwargs)
def assert_teamless_member_can_access(self, path, **kwargs):
user = self.create_user(is_superuser=False)
self.create_member(user=user, organization=self.organization, role="member", teams=[])
self.assert_can_access(user, path, **kwargs)
def assert_member_cannot_access(self, path, **kwargs):
return self.assert_role_cannot_access(path, "member", **kwargs)
def assert_manager_cannot_access(self, path, **kwargs):
return self.assert_role_cannot_access(path, "manager", **kwargs)
def assert_teamless_member_cannot_access(self, path, **kwargs):
user = self.create_user(is_superuser=False)
self.create_member(user=user, organization=self.organization, role="member", teams=[])
self.assert_cannot_access(user, path, **kwargs)
def assert_team_admin_can_access(self, path, **kwargs):
return self.assert_role_can_access(path, "admin", **kwargs)
def assert_teamless_admin_can_access(self, path, **kwargs):
user = self.create_user(is_superuser=False)
self.create_member(user=user, organization=self.organization, role="admin", teams=[])
self.assert_can_access(user, path, **kwargs)
def assert_team_admin_cannot_access(self, path, **kwargs):
return self.assert_role_cannot_access(path, "admin", **kwargs)
def assert_teamless_admin_cannot_access(self, path, **kwargs):
user = self.create_user(is_superuser=False)
self.create_member(user=user, organization=self.organization, role="admin", teams=[])
self.assert_cannot_access(user, path, **kwargs)
def assert_team_owner_can_access(self, path, **kwargs):
return self.assert_role_can_access(path, "owner", **kwargs)
def assert_owner_can_access(self, path, **kwargs):
return self.assert_role_can_access(path, "owner", **kwargs)
def assert_owner_cannot_access(self, path, **kwargs):
return self.assert_role_cannot_access(path, "owner", **kwargs)
def assert_non_member_cannot_access(self, path, **kwargs):
user = self.create_user(is_superuser=False)
self.assert_cannot_access(user, path, **kwargs)
def assert_role_can_access(self, path, role, **kwargs):
user = self.create_user(is_superuser=False)
self.create_member(user=user, organization=self.organization, role=role, teams=[self.team])
return self.assert_can_access(user, path, **kwargs)
def assert_role_cannot_access(self, path, role, **kwargs):
user = self.create_user(is_superuser=False)
self.create_member(user=user, organization=self.organization, role=role, teams=[self.team])
self.assert_cannot_access(user, path, **kwargs)
@requires_snuba
class PluginTestCase(TestCase):
@property
def plugin(self):
raise NotImplementedError(f"implement for {type(self).__module__}.{type(self).__name__}")
def setUp(self):
super().setUp()
# Old plugins, plugin is a class, new plugins, it's an instance
# New plugins don't need to be registered