-
-
Notifications
You must be signed in to change notification settings - Fork 4k
/
event_manager.py
2689 lines (2261 loc) · 102 KB
/
event_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import annotations
import copy
import ipaddress
import logging
import random
import re
import time
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from io import BytesIO
from typing import (
TYPE_CHECKING,
Any,
Dict,
Mapping,
MutableMapping,
Optional,
Sequence,
Tuple,
TypedDict,
Union,
cast,
)
import sentry_sdk
from django.conf import settings
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.db import IntegrityError, OperationalError, connection, router, transaction
from django.db.models import Func
from django.db.models.signals import post_save
from django.utils.encoding import force_str
from urllib3 import Retry
from urllib3.exceptions import MaxRetryError
from sentry import (
eventstore,
eventstream,
eventtypes,
features,
options,
quotas,
reprocessing2,
tsdb,
)
from sentry.attachments import CachedAttachment, MissingAttachmentChunks, attachment_cache
from sentry.conf.server import SEVERITY_DETECTION_RETRIES
from sentry.constants import (
DEFAULT_STORE_NORMALIZER_ARGS,
LOG_LEVELS_MAP,
MAX_TAG_VALUE_LENGTH,
DataCategory,
)
from sentry.culprit import generate_culprit
from sentry.dynamic_sampling import LatestReleaseBias, LatestReleaseParams
from sentry.eventstore.processing import event_processing_store
from sentry.eventtypes import EventType
from sentry.eventtypes.transaction import TransactionEvent
from sentry.grouping.api import (
BackgroundGroupingConfigLoader,
GroupingConfig,
GroupingConfigNotFound,
SecondaryGroupingConfigLoader,
apply_server_fingerprinting,
detect_synthetic_exception,
get_fingerprinting_config_for_project,
get_grouping_config_dict_for_event_data,
get_grouping_config_dict_for_project,
load_grouping_config,
)
from sentry.grouping.result import CalculatedHashes
from sentry.ingest.inbound_filters import FilterStatKeys
from sentry.issues.grouptype import GroupCategory
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka
from sentry.killswitches import killswitch_matches_context
from sentry.lang.native.utils import STORE_CRASH_REPORTS_ALL, convert_crashreport_count
from sentry.locks import locks
from sentry.models.activity import Activity
from sentry.models.environment import Environment
from sentry.models.event import EventDict
from sentry.models.eventattachment import CRASH_REPORT_TYPES, EventAttachment, get_crashreport_key
from sentry.models.eventuser import EventUser
from sentry.models.files.file import File
from sentry.models.group import Group, GroupStatus
from sentry.models.groupenvironment import GroupEnvironment
from sentry.models.grouphash import GroupHash
from sentry.models.grouphistory import GroupHistoryStatus, record_group_history
from sentry.models.grouplink import GroupLink
from sentry.models.grouprelease import GroupRelease
from sentry.models.groupresolution import GroupResolution
from sentry.models.integrations.repository_project_path_config import RepositoryProjectPathConfig
from sentry.models.organization import Organization
from sentry.models.project import Project
from sentry.models.projectkey import ProjectKey
from sentry.models.pullrequest import PullRequest
from sentry.models.release import Release, ReleaseProject, follows_semver_versioning_scheme
from sentry.models.releasecommit import ReleaseCommit
from sentry.models.releaseenvironment import ReleaseEnvironment
from sentry.models.releaseprojectenvironment import ReleaseProjectEnvironment
from sentry.models.userreport import UserReport
from sentry.net.http import connection_from_url
from sentry.plugins.base import plugins
from sentry.projectoptions.defaults import BETA_GROUPING_CONFIG, DEFAULT_GROUPING_CONFIG
from sentry.quotas.base import index_data_category
from sentry.reprocessing2 import is_reprocessed_event, save_unprocessed_event
from sentry.services.hybrid_cloud.integration import integration_service
from sentry.shared_integrations.exceptions import ApiError
from sentry.signals import (
first_event_received,
first_event_with_minified_stack_trace_received,
first_transaction_received,
issue_unresolved,
)
from sentry.tasks.commits import fetch_commits
from sentry.tasks.integrations import kick_off_status_syncs
from sentry.tasks.process_buffer import buffer_incr
from sentry.tasks.relay import schedule_invalidate_project_config
from sentry.tsdb.base import TSDBModel
from sentry.types.activity import ActivityType
from sentry.types.group import GroupSubStatus
from sentry.utils import json, metrics
from sentry.utils.cache import cache_key_for_event
from sentry.utils.canonical import CanonicalKeyDict
from sentry.utils.dates import to_datetime, to_timestamp
from sentry.utils.event import has_event_minified_stack_trace, has_stacktrace, is_handled
from sentry.utils.metrics import MutableTags
from sentry.utils.outcomes import Outcome, track_outcome
from sentry.utils.performance_issues.performance_detection import detect_performance_problems
from sentry.utils.performance_issues.performance_problem import PerformanceProblem
from sentry.utils.safe import get_path, safe_execute, setdefault_path, trim
if TYPE_CHECKING:
from sentry.eventstore.models import BaseEvent, Event
logger = logging.getLogger("sentry.events")
SECURITY_REPORT_INTERFACES = ("csp", "hpkp", "expectct", "expectstaple")
# Timeout for cached group crash report counts
CRASH_REPORT_TIMEOUT = 24 * 3600 # one day
NON_TITLE_EVENT_TITLES = ["<untitled>", "<unknown>", "<unlabeled event>"]
@dataclass
class GroupInfo:
    # Result bundle returned by the group-save path (see `_save_aggregate` usage
    # in `EventManager.save_error_events`): the group an event was assigned to
    # plus flags about how that assignment came about.
    group: Group
    # presumably True when this event created the group — TODO confirm against _save_aggregate
    is_new: bool
    # presumably True when the event re-triggered a previously resolved group — TODO confirm
    is_regression: bool
    group_release: Optional[GroupRelease] = None
    is_new_group_environment: bool = False
def pop_tag(data: dict[str, Any], key: str) -> None:
    """Remove every tag pair named *key* from ``data["tags"]`` (no-op if absent)."""
    if "tags" not in data:
        return
    kept = []
    for pair in data["tags"]:
        # None placeholders are preserved; only matching keys are dropped.
        if pair is None or pair[0] != key:
            kept.append(pair)
    data["tags"] = kept
def set_tag(data: dict[str, Any], key: str, value: Any) -> None:
    """Replace any existing *key* tag with a trimmed *value*; None only removes."""
    pop_tag(data, key)
    if value is None:
        return
    tags = data.setdefault("tags", [])
    tags.append((key, trim(value, MAX_TAG_VALUE_LENGTH)))
def get_tag(data: dict[str, Any], key: str) -> Optional[Any]:
    """Return the value of the first tag named *key*, or None when not present."""
    tag_pairs = get_path(data, "tags", filter=True) or ()
    return next((value for name, value in tag_pairs if name == key), None)
def is_sample_event(job):
    """Check whether the job's event payload carries the "sample_event" tag."""
    sample_flag = get_tag(job["data"], "sample_event")
    return sample_flag == "yes"
def plugin_is_regression(group: Group, event: Event) -> bool:
    """Ask each project plugin for a regression verdict.

    The first plugin that returns a non-None answer wins; when no plugin
    answers, the event is treated as a regression (True).
    """
    for plugin in plugins.for_project(event.project):
        verdict = safe_execute(
            plugin.is_regression, group, event, version=1, _with_transaction=False
        )
        if verdict is not None:
            return bool(verdict)
    return True
def has_pending_commit_resolution(group: Group) -> bool:
    """
    Checks that the most recent commit that fixes a group has had a chance to release
    """
    newest_resolution = (
        GroupLink.objects.filter(
            group_id=group.id,
            linked_type=GroupLink.LinkedType.commit,
            relationship=GroupLink.Relationship.resolves,
        )
        .order_by("-datetime")
        .first()
    )
    if newest_resolution is None:
        # no commit is linked as resolving this group, so nothing is pending
        return False

    # commit has been released and is not in pending commit state
    if ReleaseCommit.objects.filter(commit__id=newest_resolution.linked_id).exists():
        return False

    # the commit itself is unreleased; check if it is part of a PR and assume
    # it has been released if any commit in that PR has been released
    pr_ids = PullRequest.objects.filter(
        pullrequestcommit__commit=newest_resolution.linked_id
    ).values_list("id", flat=True)
    if ReleaseCommit.objects.filter(commit__pullrequestcommit__pull_request__in=pr_ids).exists():
        return False

    return True
def get_max_crashreports(
    model: Union[Project, Organization], allow_none: bool = False
) -> Optional[int]:
    """Resolve the configured crash-report retention limit for a project or org."""
    raw_option = model.get_option("sentry:store_crash_reports")
    return convert_crashreport_count(raw_option, allow_none=allow_none)
def crashreports_exceeded(current_count: int, max_count: int) -> bool:
    """True once *current_count* has reached a bounded *max_count* limit.

    An unlimited limit (STORE_CRASH_REPORTS_ALL) can never be exceeded.
    """
    return max_count != STORE_CRASH_REPORTS_ALL and current_count >= max_count
def get_stored_crashreports(cache_key: Optional[str], event: Event, max_crashreports: int) -> int:
    """Return the number of crash reports already stored for the event's group.

    The result is capped at ``max_crashreports`` — callers only need to know
    whether the limit has been reached, not the exact total.
    """
    # There are two common cases: Storing crash reports is disabled, or is
    # unbounded. In both cases, there is no need in caching values or querying
    # the database.
    if max_crashreports in (0, STORE_CRASH_REPORTS_ALL):
        return max_crashreports
    # Cached count is only trusted when it already proves the limit is hit.
    cached_reports = cache.get(cache_key, None)
    if cached_reports is not None and cached_reports >= max_crashreports:
        return cached_reports
    # Fall-through if max_crashreports was bumped to get a more accurate number.
    # We don't need the actual number, but just whether it's more or equal to
    # the currently allowed maximum.
    query = EventAttachment.objects.filter(group_id=event.group_id, type__in=CRASH_REPORT_TYPES)
    return query[:max_crashreports].count()
class HashDiscarded(Exception):
    """Raised when an event's grouping hash matches a discard/tombstone rule.

    Carries the discard reason and, if applicable, the id of the tombstone
    that triggered the discard so callers can log/track the drop.
    """

    def __init__(
        self, message: str = "", reason: Optional[str] = None, tombstone_id: Optional[int] = None
    ):
        super().__init__(message)
        self.reason = reason
        self.tombstone_id = tombstone_id
class ScoreClause(Func):
    """Django SQL expression that computes a group's sort score in the database.

    The score combines ``times_seen`` and ``last_seen``; when both values are
    known Python-side they are inlined into the SQL, otherwise the column
    values are used directly.
    """

    def __init__(self, group=None, last_seen=None, times_seen=None, *args, **kwargs):
        self.group = group
        self.last_seen = last_seen
        self.times_seen = times_seen
        # times_seen is likely an F-object that needs the value extracted
        if hasattr(self.times_seen, "rhs"):
            self.times_seen = self.times_seen.rhs.value
        super().__init__(*args, **kwargs)

    def __int__(self):
        # Calculate the score manually when coercing to an int.
        # This is used within create_or_update and friends
        return self.group.get_score() if self.group else 0

    def as_sql(self, compiler, connection, function=None, template=None):
        has_values = self.last_seen is not None and self.times_seen is not None
        if has_values:
            # Inline the known values as literals.
            sql = "log(times_seen + %d) * 600 + %d" % (
                self.times_seen,
                to_timestamp(self.last_seen),
            )
        else:
            # Fall back to column references (PostgreSQL-specific cast).
            sql = "log(times_seen) * 600 + last_seen::abstime::int"
        return (sql, [])
# Mapping of project id -> hydrated Project, shared by the batch save helpers.
ProjectsMapping = Mapping[int, Project]

# Mutable per-event scratch dict threaded through every save step; each helper
# reads and writes keys on it (e.g. "event", "data", "release", "groups").
Job = MutableMapping[str, Any]
class EventManager:
    """
    Handles normalization in both the store endpoint and the save task. The
    intention is to swap this class out with a reimplementation in Rust.
    """

    def __init__(
        self,
        data: dict[str, Any],
        version: str = "5",
        project: Optional[Project] = None,
        grouping_config: Optional[GroupingConfig] = None,
        client_ip: Optional[str] = None,
        user_agent: Optional[str] = None,
        auth: Optional[Any] = None,
        key: Optional[Any] = None,
        content_encoding: Optional[str] = None,
        is_renormalize: bool = False,
        remove_other: Optional[bool] = None,
        project_config: Optional[Any] = None,
        sent_at: Optional[datetime] = None,
    ):
        self._data = CanonicalKeyDict(data)
        self.version = version
        self._project = project
        # if not explicitly specified try to get the grouping from project_config
        if grouping_config is None and project_config is not None:
            config = project_config.config
            grouping_config = config.get("grouping_config")
        # if we still don't have a grouping also try the project
        if grouping_config is None and project is not None:
            grouping_config = get_grouping_config_dict_for_project(self._project)
        self._grouping_config = grouping_config
        self._client_ip = client_ip
        self._user_agent = user_agent
        self._auth = auth
        self._key = key
        self._is_renormalize = is_renormalize
        self._remove_other = remove_other
        self._normalized = False
        self.project_config = project_config
        self.sent_at = sent_at

    def normalize(self, project_id: Optional[int] = None) -> None:
        """Run (timed) Relay normalization over the raw event payload."""
        with metrics.timer("events.store.normalize.duration"):
            self._normalize_impl(project_id=project_id)

    def _normalize_impl(self, project_id: Optional[int] = None) -> None:
        """Normalize ``self._data`` in place via Relay's StoreNormalizer.

        Raises RuntimeError if already normalized or if *project_id* conflicts
        with the project given at construction time.
        """
        if self._project and project_id and project_id != self._project.id:
            raise RuntimeError(
                "Initialized EventManager with one project ID and called save() with another one"
            )

        if self._normalized:
            raise RuntimeError("Already normalized")

        self._normalized = True

        from sentry_relay.processing import StoreNormalizer

        rust_normalizer = StoreNormalizer(
            project_id=self._project.id if self._project else project_id,
            client_ip=self._client_ip,
            client=self._auth.client if self._auth else None,
            key_id=str(self._key.id) if self._key else None,
            grouping_config=self._grouping_config,
            protocol_version=str(self.version) if self.version is not None else None,
            is_renormalize=self._is_renormalize,
            remove_other=self._remove_other,
            normalize_user_agent=True,
            sent_at=self.sent_at.isoformat() if self.sent_at is not None else None,
            **DEFAULT_STORE_NORMALIZER_ARGS,
        )

        pre_normalize_type = self._data.get("type")
        self._data = CanonicalKeyDict(rust_normalizer.normalize_event(dict(self._data)))
        # XXX: This is a hack to make generic events work (for now?). I'm not sure whether we should
        # include this in the rust normalizer, since we don't want people sending us these via the
        # sdk.
        if pre_normalize_type == "generic":
            self._data["type"] = pre_normalize_type

    def get_data(self) -> CanonicalKeyDict:
        """Return the (possibly normalized) event payload."""
        return self._data

    @metrics.wraps("event_manager.save")
    def save(
        self,
        project_id: Optional[int],
        raw: bool = False,
        assume_normalized: bool = False,
        start_time: Optional[int] = None,
        cache_key: Optional[str] = None,
        skip_send_first_transaction: bool = False,
    ) -> Event:
        """
        After normalizing and processing an event, save adjacent models such as
        releases and environments to postgres and write the event into
        eventstream. From there it will be picked up by Snuba and
        post-processing.

        We re-insert events with duplicate IDs into Snuba, which is responsible
        for deduplicating events. Since deduplication in Snuba is on the primary
        key (based on event ID, project ID and day), events with same IDs are only
        deduplicated if their timestamps fall on the same day. The latest event
        always wins and overwrites the value of events received earlier in that day.

        Since we increment counters and frequencies here before events get inserted
        to eventstream these numbers may be larger than the total number of
        events if we receive duplicate event IDs that fall on the same day
        (that do not hit cache first).
        """
        # Normalize if needed
        if not self._normalized:
            if not assume_normalized:
                self.normalize(project_id=project_id)
            self._normalized = True

        with metrics.timer("event_manager.save.project.get_from_cache"):
            project = Project.objects.get_from_cache(id=project_id)

        with metrics.timer("event_manager.save.organization.get_from_cache"):
            project.set_cached_field_value(
                "organization", Organization.objects.get_from_cache(id=project.organization_id)
            )

        projects = {project.id: project}

        job = {"data": self._data, "project_id": project.id, "raw": raw, "start_time": start_time}

        # After calling _pull_out_data we get some keys in the job like the platform
        with sentry_sdk.start_span(op="event_manager.save.pull_out_data"):
            _pull_out_data([job], projects)

        # Dispatch on event type: transactions and generic (issue-platform)
        # events take dedicated save paths; everything else is an error event.
        event_type = self._data.get("type")
        if event_type == "transaction":
            job["data"]["project"] = project.id
            jobs = save_transaction_events([job], projects)

            if not project.flags.has_transactions and not skip_send_first_transaction:
                first_transaction_received.send_robust(
                    project=project, event=jobs[0]["event"], sender=Project
                )

            return jobs[0]["event"]
        elif event_type == "generic":
            job["data"]["project"] = project.id
            jobs = save_generic_events([job], projects)

            return jobs[0]["event"]
        else:
            metric_tags = {"platform": job["event"].platform or "unknown"}
            # This metric allows differentiating from all calls to the `event_manager.save` metric
            # and adds support for differentiating based on platforms
            with metrics.timer("event_manager.save_error_events", tags=metric_tags):
                return self.save_error_events(project, job, projects, metric_tags, raw, cache_key)

    def save_error_events(
        self,
        project: Project,
        job: Job,
        projects: ProjectsMapping,
        metric_tags: MutableTags,
        raw: bool = False,
        cache_key: Optional[str] = None,
    ) -> Event:
        """Save an error event: grouping, adjacent models, nodestore, eventstream.

        The ordering of steps below is deliberate — attachments are loaded
        early but persisted last, and the event payload must not be mutated
        after metrics are materialized.
        """
        jobs = [job]

        if is_sample_event(job):
            logger.info(
                "save_error_events: processing sample event",
                extra={
                    "event.id": job["event"].event_id,
                    "project_id": project.id,
                    "sample_event": True,
                },
            )

        is_reprocessed = is_reprocessed_event(job["data"])

        with sentry_sdk.start_span(op="event_manager.save.get_or_create_release_many"):
            _get_or_create_release_many(jobs, projects)

        with sentry_sdk.start_span(op="event_manager.save.get_event_user_many"):
            _get_event_user_many(jobs, projects)

        job["project_key"] = None
        if job["key_id"] is not None:
            with metrics.timer("event_manager.load_project_key"):
                try:
                    job["project_key"] = ProjectKey.objects.get_from_cache(id=job["key_id"])
                except ProjectKey.DoesNotExist:
                    pass

        _derive_plugin_tags_many(jobs, projects)
        _derive_interface_tags_many(jobs)

        # Background grouping may run before or after the real grouping,
        # controlled by an option, to measure its performance impact.
        do_background_grouping_before = options.get("store.background-grouping-before")
        if do_background_grouping_before:
            _run_background_grouping(project, job)

        secondary_hashes = None
        migrate_off_hierarchical = False

        if _check_to_run_secondary_grouping(project):
            with metrics.timer("event_manager.secondary_grouping", tags=metric_tags):
                secondary_hashes = calculate_secondary_hash_if_needed(project, job)

        with metrics.timer("event_manager.load_grouping_config"):
            # At this point we want to normalize the in_app values in case the
            # clients did not set this appropriately so far.
            if is_reprocessed:
                # The customer might have changed grouping enhancements since
                # the event was ingested -> make sure we get the fresh one for reprocessing.
                grouping_config = get_grouping_config_dict_for_project(project)
                # Write back grouping config because it might have changed since the
                # event was ingested.
                # NOTE: We could do this unconditionally (regardless of `is_processed`).
                job["data"]["grouping_config"] = grouping_config
            else:
                grouping_config = get_grouping_config_dict_for_event_data(
                    job["event"].data.data, project
                )

        with sentry_sdk.start_span(
            op="event_manager",
            description="event_manager.save.calculate_event_grouping",
        ), metrics.timer("event_manager.calculate_event_grouping", tags=metric_tags):
            hashes = _calculate_event_grouping(project, job["event"], grouping_config)

        # Because this logic is not complex enough we want to special case the situation where we
        # migrate from a hierarchical hash to a non hierarchical hash. The reason being that
        # `_save_aggregate` needs special logic to not create orphaned hashes in migration cases
        # but it wants a different logic to implement splitting of hierarchical hashes.
        migrate_off_hierarchical = bool(
            secondary_hashes
            and secondary_hashes.hierarchical_hashes
            and not hashes.hierarchical_hashes
        )

        # Merge primary and secondary hashes so events keep grouping into
        # existing groups during a grouping-config transition window.
        hashes = CalculatedHashes(
            hashes=list(hashes.hashes) + list(secondary_hashes and secondary_hashes.hashes or []),
            hierarchical_hashes=(
                list(hashes.hierarchical_hashes)
                + list(secondary_hashes and secondary_hashes.hierarchical_hashes or [])
            ),
            tree_labels=(
                hashes.tree_labels or (secondary_hashes and secondary_hashes.tree_labels) or []
            ),
        )

        if not do_background_grouping_before:
            _run_background_grouping(project, job)

        if hashes.tree_labels:
            job["finest_tree_label"] = hashes.finest_tree_label

        _materialize_metadata_many(jobs)

        group_creation_kwargs = _get_group_creation_kwargs(job)

        group_creation_kwargs["culprit"] = job["culprit"]

        # Load attachments first, but persist them at the very last after
        # posting to eventstream to make sure all counters and eventstream are
        # incremented for sure. Also wait for grouping to remove attachments
        # based on the group counter.
        with metrics.timer("event_manager.get_attachments"):
            with sentry_sdk.start_span(op="event_manager.save.get_attachments"):
                attachments = get_attachments(cache_key, job)

        try:
            with sentry_sdk.start_span(op="event_manager.save.save_aggregate_fn"):
                group_info = _save_aggregate(
                    event=job["event"],
                    hashes=hashes,
                    release=job["release"],
                    metadata=dict(job["event_metadata"]),
                    received_timestamp=job["received_timestamp"],
                    migrate_off_hierarchical=migrate_off_hierarchical,
                    **group_creation_kwargs,
                )
            job["groups"] = [group_info]
        except HashDiscarded as err:
            # The hash matched a tombstone/discard rule: record the outcome,
            # drop the event (and its attachments), and propagate.
            logger.info(
                "event_manager.save.discard",
                extra={
                    "reason": err.reason,
                    "tombstone_id": err.tombstone_id,
                },
            )
            discard_event(job, attachments)
            raise

        if not group_info:
            if is_sample_event(job):
                logger.info(
                    "save_error_events: no groupinfo found, returning event",
                    extra={
                        "event.id": job["event"].event_id,
                        "project_id": project.id,
                        "sample_event": True,
                    },
                )
            return job["event"]

        job["event"].group = group_info.group

        # store a reference to the group id to guarantee validation of isolation
        # XXX(markus): No clue what this does
        job["event"].data.bind_ref(job["event"])

        _get_or_create_environment_many(jobs, projects)
        _get_or_create_group_environment_many(jobs, projects)
        _get_or_create_release_associated_models(jobs, projects)
        _increment_release_associated_counts_many(jobs, projects)
        _get_or_create_group_release_many(jobs, projects)
        _tsdb_record_all_metrics(jobs)

        # Attach any user feedback that arrived before the event itself.
        UserReport.objects.filter(project_id=project.id, event_id=job["event"].event_id).update(
            group_id=group_info.group.id, environment_id=job["environment"].id
        )

        with metrics.timer("event_manager.filter_attachments_for_group"):
            attachments = filter_attachments_for_group(attachments, job)

        # XXX: DO NOT MUTATE THE EVENT PAYLOAD AFTER THIS POINT
        _materialize_event_metrics(jobs)

        for attachment in attachments:
            key = f"bytes.stored.{attachment.type}"
            old_bytes = job["event_metrics"].get(key) or 0
            job["event_metrics"][key] = old_bytes + attachment.size

        _nodestore_save_many(jobs)
        save_unprocessed_event(project, job["event"].event_id)

        if not raw:
            # First-event / first-minified-stack-trace signals drive onboarding UI.
            if not project.first_event:
                project.update(first_event=job["event"].datetime)
                first_event_received.send_robust(
                    project=project, event=job["event"], sender=Project
                )

            if (
                has_event_minified_stack_trace(job["event"])
                and not project.flags.has_minified_stack_trace
            ):
                first_event_with_minified_stack_trace_received.send_robust(
                    project=project, event=job["event"], sender=Project
                )

        if is_reprocessed:
            safe_execute(
                reprocessing2.buffered_delete_old_primary_hash,
                project_id=job["event"].project_id,
                group_id=reprocessing2.get_original_group_id(job["event"]),
                event_id=job["event"].event_id,
                datetime=job["event"].datetime,
                old_primary_hash=reprocessing2.get_original_primary_hash(job["event"]),
                current_primary_hash=job["event"].get_primary_hash(),
                _with_transaction=False,
            )

        _eventstream_insert_many(jobs)

        # Do this last to ensure signals get emitted even if connection to the
        # file store breaks temporarily.
        #
        # We do not need this for reprocessed events as for those we update the
        # group_id on existing models in post_process_group, which already does
        # this because of indiv. attachments.
        if not is_reprocessed:
            with metrics.timer("event_manager.save_attachments"):
                save_attachments(cache_key, attachments, job)

        metric_tags = {"from_relay": str("_relay_processed" in job["data"])}

        metrics.timing(
            "events.latency",
            job["received_timestamp"] - job["recorded_timestamp"],
            tags=metric_tags,
        )
        metrics.timing("events.size.data.post_save", job["event"].size, tags=metric_tags)
        metrics.incr(
            "events.post_save.normalize.errors",
            amount=len(job["data"].get("errors") or ()),
            tags=metric_tags,
        )

        _track_outcome_accepted_many(jobs)

        self._data = job["event"].data.data

        # Check if the project is configured for auto upgrading and we need to upgrade
        # to the latest grouping config.
        if _project_should_update_grouping(project):
            _auto_update_grouping(project)

        return job["event"]
def _check_to_run_secondary_grouping(project: Project) -> bool:
    """Secondary grouping runs while a fallback config is set and unexpired."""
    # These two values are basically always set
    config = project.get_option("sentry:secondary_grouping_config")
    expiry = project.get_option("sentry:secondary_grouping_expiry")
    return bool(config) and (expiry or 0) >= time.time()
def calculate_secondary_hash_if_needed(project: Project, job: Job) -> None | CalculatedHashes:
    """Calculate secondary hash for event using a fallback grouping config for a period of time.

    This happens when we upgrade all projects that have not opted-out to automatic upgrades plus
    when the customer changes the grouping config.
    This causes extra load in save_event processing.
    """
    secondary_hashes = None
    try:
        with sentry_sdk.start_span(
            op="event_manager",
            description="event_manager.save.secondary_calculate_event_grouping",
        ):
            # Deep-copy so the secondary grouping pass cannot mutate the event
            # that the primary grouping pass will later see.
            secondary_event = copy.deepcopy(job["event"])
            loader = SecondaryGroupingConfigLoader()
            secondary_grouping_config = loader.get_config_dict(project)
            secondary_hashes = _calculate_event_grouping(
                project, secondary_event, secondary_grouping_config
            )
    except Exception:
        # Best-effort: report the failure and fall back to no secondary hashes.
        sentry_sdk.capture_exception()

    return secondary_hashes
def _project_should_update_grouping(project: Project) -> bool:
    """Opted-in projects whose org falls in the rollout cohort get auto-upgraded."""
    rollout_rate = float(settings.SENTRY_GROUPING_AUTO_UPDATE_ENABLED)
    in_rollout_cohort = project.organization_id % 1000 < rollout_rate * 1000
    opted_in = bool(project.get_option("sentry:grouping_auto_update"))
    return opted_in and in_rollout_cohort
def _auto_update_grouping(project: Project) -> None:
    """Upgrade the project to the default grouping config, keeping the old one
    as a secondary config for a migration window."""
    old_grouping = project.get_option("sentry:grouping_config")
    new_grouping = DEFAULT_GROUPING_CONFIG

    # update to latest grouping config but not if a user is already on
    # beta.
    if old_grouping == new_grouping or old_grouping == BETA_GROUPING_CONFIG:
        return

    # Because the way the auto grouping upgrading happening is racy, we want to
    # try to write the audit log entry only and project option change just once.
    # For this a cache key is used. That's not perfect, but should reduce the
    # risk significantly.
    cache_key = f"grouping-config-update:{project.id}:{old_grouping}"
    lock = f"grouping-update-lock:{project.id}"
    if cache.get(cache_key) is not None:
        return

    with locks.get(lock, duration=60, name="grouping-update-lock").acquire():
        # Re-check under the lock; another worker may have won the race.
        if cache.get(cache_key) is None:
            cache.set(cache_key, "1", 60 * 5)
        else:
            return

        from sentry import audit_log
        from sentry.utils.audit import create_system_audit_entry

        # Keep the old config active as secondary grouping until the expiry,
        # so events keep matching their existing groups during the migration.
        expiry = int(time.time()) + settings.SENTRY_GROUPING_UPDATE_MIGRATION_PHASE
        changes = {
            "sentry:secondary_grouping_config": old_grouping,
            "sentry:secondary_grouping_expiry": expiry,
            "sentry:grouping_config": new_grouping,
        }
        for (key, value) in changes.items():
            project.update_option(key, value)

        create_system_audit_entry(
            organization=project.organization,
            target_object=project.id,
            event=audit_log.get_event_id("PROJECT_EDIT"),
            data={**changes, **project.get_audit_log_data()},
        )
@metrics.wraps("event_manager.background_grouping")
def _calculate_background_grouping(
    project: Project, event: Event, config: GroupingConfig
) -> CalculatedHashes:
    # Thin wrapper so background-grouping time is reported under its own metric.
    return _calculate_event_grouping(project, event, config)
def _run_background_grouping(project: Project, job: Job) -> None:
    """Optionally run a fraction of events with a third grouping config.

    This can be helpful to measure its performance impact.
    This does not affect actual grouping.
    """
    try:
        sample_rate = options.get("store.background-grouping-sample-rate")
        # Guard clauses: bail when sampling is off or this event missed the draw.
        if not sample_rate or random.random() > sample_rate:
            return
        config = BackgroundGroupingConfigLoader().get_config_dict(project)
        if not config["id"]:
            return
        # Grouping may mutate the event, so work on a deep copy.
        _calculate_background_grouping(project, copy.deepcopy(job["event"]), config)
    except Exception:
        sentry_sdk.capture_exception()
@metrics.wraps("save_event.pull_out_data")
def _pull_out_data(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
    """
    Update every job in the list with required information and store it in the nodestore.

    A bunch of (probably) CPU bound stuff.

    Populates job keys: transaction, key_id, logger_name, level, release, dist,
    environment, recorded_timestamp, event, data, category, platform,
    received_timestamp, groups — and duplicates several of these into the
    event's tags.
    """
    for job in jobs:
        job["project_id"] = int(job["project_id"])

        data = job["data"]

        # Pull the toplevel data we're interested in
        transaction_name = data.get("transaction")
        if transaction_name:
            transaction_name = force_str(transaction_name)
        job["transaction"] = transaction_name

        key_id = None if data is None else data.get("key_id")
        if key_id is not None:
            key_id = int(key_id)
        job["key_id"] = key_id

        job["logger_name"] = logger_name = data.get("logger")
        job["level"] = level = data.get("level")
        job["release"] = data.get("release")
        job["dist"] = data.get("dist")
        job["environment"] = environment = data.get("environment")
        job["recorded_timestamp"] = data.get("timestamp")
        # Stores the event in the nodestore
        job["event"] = event = _get_event_instance(job["data"], project_id=job["project_id"])
        # Overwrite the data key with the event's updated data
        job["data"] = data = event.data.data

        event._project_cache = project = projects[job["project_id"]]
        job["category"] = index_data_category(data.get("type"), project.organization)
        job["platform"] = event.platform

        # Some of the data that are toplevel attributes are duplicated
        # into tags (logger, level, environment, transaction). These are
        # different from legacy attributes which are normalized into tags
        # ahead of time (site, server_name).
        setdefault_path(data, "tags", value=[])
        set_tag(data, "level", level)
        if logger_name:
            set_tag(data, "logger", logger_name)
        if environment:
            set_tag(data, "environment", environment)
        if transaction_name:
            set_tag(data, "transaction", transaction_name)

        # Prefer the explicit "received" timestamp; fall back to the event's
        # own datetime converted to a unix timestamp.
        job["received_timestamp"] = job["event"].data.get("received") or float(
            job["event"].datetime.strftime("%s")
        )
        job["groups"] = []
def _is_commit_sha(version: str) -> bool:
return re.match(r"[0-9a-f]{40}", version) is not None
def _associate_commits_with_release(release: Release, project: Project) -> None:
    """
    Try to find the GitHub repository that contains ``release.version`` as a
    commit and, if found, schedule an async ``fetch_commits`` task to associate
    that repo's commits with the release.

    Probes each GitHub-backed code mapping for the project in turn; a repo
    "matches" when its API returns the commit for ``release.version``
    (a 404 means "not in this repo" and is skipped; other API errors propagate).
    """
    previous_release = release.get_previous_release(project)
    possible_repos = (
        RepositoryProjectPathConfig.objects.select_related("repository")
        .filter(project=project, repository__provider="integrations:github")
        .all()
    )
    if possible_repos:
        # If it does exist, kick off a task to look if the commit exists in the repository
        target_repo = None
        for repo_proj_path_model in possible_repos:
            ois = integration_service.get_organization_integrations(
                org_integration_ids=[repo_proj_path_model.organization_integration_id]
            )
            # Guard the empty result: indexing `ois[0]` unconditionally would
            # raise IndexError when no org integration exists for this mapping.
            oi = ois[0] if ois else None
            if not oi:
                continue
            integration = integration_service.get_integration(integration_id=oi.integration_id)
            if not integration:
                continue
            integration_installation = integration.get_installation(
                organization_id=oi.organization_id
            )
            if not integration_installation:
                continue
            repo_client = integration_installation.get_client()
            try:
                repo_client.get_commit(
                    repo=repo_proj_path_model.repository.name, sha=release.version
                )
                target_repo = repo_proj_path_model.repository
                break
            except ApiError as exc:
                # 404 just means the commit isn't in this repo — try the next one.
                if exc.code != 404:
                    raise
        if target_repo is not None:
            # If it does exist, fetch the commits for that repo
            fetch_commits.apply_async(
                kwargs={
                    "release_id": release.id,
                    "user_id": None,
                    "refs": [{"repository": target_repo.name, "commit": release.version}],
                    "prev_release_id": previous_release.id
                    if previous_release is not None
                    else None,
                }
            )
@metrics.wraps("save_event.get_or_create_release_many")
def _get_or_create_release_many(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
jobs_with_releases: dict[tuple[int, Release], list[Job]] = {}
release_date_added: dict[tuple[int, Release], datetime] = {}
for job in jobs:
if not job["release"]:
continue
release_key = (job["project_id"], job["release"])
jobs_with_releases.setdefault(release_key, []).append(job)
new_datetime = job["event"].datetime
old_datetime = release_date_added.get(release_key)
if old_datetime is None or new_datetime > old_datetime:
release_date_added[release_key] = new_datetime
for (project_id, version), jobs_to_update in jobs_with_releases.items():
try:
release = Release.get_or_create(
project=projects[project_id],
version=version,
date_added=release_date_added[(project_id, version)],
)
except ValidationError:
release = None
logger.exception(
"Failed creating Release due to ValidationError",
extra={
"project": projects[project_id],
"version": version,
},
)
if release:
if features.has(
"projects:auto-associate-commits-to-release", projects[project_id]
) and _is_commit_sha(release.version):
safe_execute(_associate_commits_with_release, release, projects[project_id])
for job in jobs_to_update:
# Don't allow a conflicting 'release' tag
data = job["data"]
pop_tag(data, "release")
set_tag(data, "sentry:release", release.version)
job["release"] = release
if job["dist"]:
job["dist"] = job["release"].add_dist(job["dist"], job["event"].datetime)
# don't allow a conflicting 'dist' tag
pop_tag(job["data"], "dist")
set_tag(job["data"], "sentry:dist", job["dist"].name)
# Dynamic Sampling - Boosting latest release functionality
if (
features.has(
"organizations:dynamic-sampling", projects[project_id].organization
)
and data.get("type") == "transaction"
):
with sentry_sdk.start_span(
op="event_manager.dynamic_sampling_observe_latest_release"
) as span:
try:
latest_release_params = LatestReleaseParams(
release=release,
project=projects[project_id],
environment=_get_environment_from_transaction(data),
)
def on_release_boosted() -> None:
span.set_tag(
"dynamic_sampling.observe_release_status",
"(release, environment) pair observed and boosted",
)
span.set_data("release", latest_release_params.release.id)
span.set_data("environment", latest_release_params.environment)
schedule_invalidate_project_config(
project_id=project_id,