diff --git a/src/sentry/eventstream/kafka/postprocessworker.py b/src/sentry/eventstream/kafka/postprocessworker.py
index e666ae280cd982..fa0c715231c6b0 100644
--- a/src/sentry/eventstream/kafka/postprocessworker.py
+++ b/src/sentry/eventstream/kafka/postprocessworker.py
@@ -6,6 +6,7 @@
 from sentry import options
 from sentry.eventstream.kafka.protocol import (
+    decode_bool,
     get_task_kwargs_for_message,
     get_task_kwargs_for_message_from_headers,
 )
@@ -21,6 +22,7 @@
 _CONCURRENCY_METRIC = "eventstream.concurrency"
 _MESSAGES_METRIC = "eventstream.messages"
 _CONCURRENCY_OPTION = "post-process-forwarder:concurrency"
+_TRANSACTION_FORWARDER_HEADER = "transaction_forwarder"


 @contextmanager
@@ -114,7 +116,7 @@ def process_message(self, message: Message) -> Optional[Future]:
         """
         return self.__executor.submit(_get_task_kwargs_and_dispatch, message)

-    def flush_batch(self, batch: Sequence[Future]) -> None:
+    def flush_batch(self, batch: Optional[Sequence[Future]]) -> None:
         """
         For all work which was submitted to the thread pool executor, we need to ensure that if an
         exception was raised, then we raise it in the main thread. This is needed so that processing can be stopped in such
@@ -140,3 +142,44 @@ def flush_batch(self, batch: Sequence[Future]) -> None:

     def shutdown(self) -> None:
         self.__executor.shutdown()
+
+
+class ErrorsPostProcessForwarderWorker(PostProcessForwarderWorker):
+    """
+    ErrorsPostProcessForwarderWorker processes messages only in the following scenarios:
+    1. _TRANSACTION_FORWARDER_HEADER is missing from the kafka headers. This is a backwards-compatibility
+       case: messages already in the queue may not have this header, and those messages should be
+       handled by the errors post process forwarder.
+    2. _TRANSACTION_FORWARDER_HEADER is False in the kafka headers.
+    """
+
+    def process_message(self, message: Message) -> Optional[Future]:
+        headers = {header: value for header, value in message.headers()}
+
+        # Backwards-compatibility case for messages missing the header.
+        if _TRANSACTION_FORWARDER_HEADER not in headers:
+            return super().process_message(message)
+
+        if decode_bool(headers.get(_TRANSACTION_FORWARDER_HEADER)) is False:
+            return super().process_message(message)
+
+        return None
+
+
+class TransactionsPostProcessForwarderWorker(PostProcessForwarderWorker):
+    """
+    TransactionsPostProcessForwarderWorker processes messages only when
+    _TRANSACTION_FORWARDER_HEADER is True in the kafka headers.
+    """
+
+    def process_message(self, message: Message) -> Optional[Future]:
+        headers = {header: value for header, value in message.headers()}
+
+        # Messages without the header are routed to the errors forwarder.
+        if _TRANSACTION_FORWARDER_HEADER not in headers:
+            return None
+
+        if decode_bool(headers.get(_TRANSACTION_FORWARDER_HEADER)) is True:
+            return super().process_message(message)
+
+        return None
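A note on the routing rule the two new workers implement: every message is handled by exactly one forwarder, and the decision depends only on the transaction_forwarder header. The sketch below restates that rule outside the classes, purely for illustration; it is not part of the patch, and the header name and the b"0"/b"1" encoding are taken from _TRANSACTION_FORWARDER_HEADER and decode_bool above.

```python
# Illustrative restatement of the routing rule; not part of the patch.
from typing import Dict


def handled_by_errors_forwarder(headers: Dict[str, bytes]) -> bool:
    # Missing header -> backwards compatibility, the errors forwarder picks it up.
    if "transaction_forwarder" not in headers:
        return True
    # b"0" decodes to False (see decode_bool), so the errors forwarder handles it.
    return not bool(int(headers["transaction_forwarder"]))


def handled_by_transactions_forwarder(headers: Dict[str, bytes]) -> bool:
    # Only an explicit truthy header routes a message to the transactions forwarder.
    return "transaction_forwarder" in headers and bool(int(headers["transaction_forwarder"]))


assert handled_by_errors_forwarder({}) and not handled_by_transactions_forwarder({})
assert handled_by_errors_forwarder({"transaction_forwarder": b"0"})
assert handled_by_transactions_forwarder({"transaction_forwarder": b"1"})
```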
- """ +def decode_str(value: Optional[bytes]) -> str: + assert isinstance(value, bytes) + return value.decode("utf-8") - def decode_str(value: Optional[bytes]) -> str: - assert isinstance(value, bytes) - return value.decode("utf-8") - def decode_optional_str(value: Optional[bytes]) -> Optional[str]: - if value is None: - return None - return decode_str(value) +def decode_optional_str(value: Optional[bytes]) -> Optional[str]: + if value is None: + return None + return decode_str(value) - def decode_int(value: Optional[bytes]) -> int: - assert isinstance(value, bytes) - return int(value) - def decode_optional_int(value: Optional[bytes]) -> Optional[int]: - if value is None: - return None - return decode_int(value) +def decode_int(value: Optional[bytes]) -> int: + assert isinstance(value, bytes) + return int(value) + - def decode_bool(value: bytes) -> bool: - return bool(int(decode_str(value))) +def decode_optional_int(value: Optional[bytes]) -> Optional[int]: + if value is None: + return None + return decode_int(value) + +def decode_bool(value: bytes) -> bool: + return bool(int(decode_str(value))) + + +def get_task_kwargs_for_message_from_headers(headers: Sequence[Tuple[str, Optional[bytes]]]): + """ + Same as get_task_kwargs_for_message but gets the required information from + the kafka message headers. + """ try: header_data = {k: v for k, v in headers} version = decode_int(header_data["version"]) diff --git a/tests/sentry/eventstream/kafka/test_postprocessworker.py b/tests/sentry/eventstream/kafka/test_postprocessworker.py index 501a38b7770e83..6c16b4aaebc2e9 100644 --- a/tests/sentry/eventstream/kafka/test_postprocessworker.py +++ b/tests/sentry/eventstream/kafka/test_postprocessworker.py @@ -5,7 +5,9 @@ from sentry import options from sentry.eventstream.kafka.postprocessworker import ( _CONCURRENCY_OPTION, + ErrorsPostProcessForwarderWorker, PostProcessForwarderWorker, + TransactionsPostProcessForwarderWorker, ) from sentry.eventstream.kafka.protocol import InvalidVersion from sentry.utils import json @@ -32,22 +34,50 @@ def kafka_message_payload(): ] -@patch("sentry.eventstream.kafka.postprocessworker.dispatch_post_process_group_task") -def test_post_process_forwarder(dispatch_post_process_group_task, kafka_message_payload): - """ - Test that the post process forwarder calls dispatch_post_process_group_task with the correct arguments - """ - forwarder = PostProcessForwarderWorker(concurrency=1) +@pytest.fixture +def kafka_message_without_transaction_header(kafka_message_payload): + mock_message = Mock() + mock_message.headers = MagicMock(return_value=[("timestamp", b"12345")]) + mock_message.value = MagicMock(return_value=json.dumps(kafka_message_payload)) + mock_message.partition = MagicMock("1") + return mock_message + +@pytest.fixture +def kafka_message_with_transaction_header_false(kafka_message_payload): mock_message = Mock() + mock_message.headers = MagicMock( + return_value=[("timestamp", b"12345"), ("transaction_forwarder", b"0")] + ) mock_message.value = MagicMock(return_value=json.dumps(kafka_message_payload)) mock_message.partition = MagicMock("1") + return mock_message - future = forwarder.process_message(mock_message) + +@pytest.fixture +def kafka_message_with_transaction_header_true(kafka_message_payload): + mock_message = Mock() + mock_message.headers = MagicMock( + return_value=[("timestamp", b"12345"), ("transaction_forwarder", b"1")] + ) + mock_message.value = MagicMock(return_value=json.dumps(kafka_message_payload)) + mock_message.partition = MagicMock("1") + return 
diff --git a/tests/sentry/eventstream/kafka/test_postprocessworker.py b/tests/sentry/eventstream/kafka/test_postprocessworker.py
index 501a38b7770e83..6c16b4aaebc2e9 100644
--- a/tests/sentry/eventstream/kafka/test_postprocessworker.py
+++ b/tests/sentry/eventstream/kafka/test_postprocessworker.py
@@ -5,7 +5,9 @@
 from sentry import options
 from sentry.eventstream.kafka.postprocessworker import (
     _CONCURRENCY_OPTION,
+    ErrorsPostProcessForwarderWorker,
     PostProcessForwarderWorker,
+    TransactionsPostProcessForwarderWorker,
 )
 from sentry.eventstream.kafka.protocol import InvalidVersion
 from sentry.utils import json
@@ -32,22 +34,50 @@ def kafka_message_payload():
     ]


-@patch("sentry.eventstream.kafka.postprocessworker.dispatch_post_process_group_task")
-def test_post_process_forwarder(dispatch_post_process_group_task, kafka_message_payload):
-    """
-    Test that the post process forwarder calls dispatch_post_process_group_task with the correct arguments
-    """
-    forwarder = PostProcessForwarderWorker(concurrency=1)
+@pytest.fixture
+def kafka_message_without_transaction_header(kafka_message_payload):
+    mock_message = Mock()
+    mock_message.headers = MagicMock(return_value=[("timestamp", b"12345")])
+    mock_message.value = MagicMock(return_value=json.dumps(kafka_message_payload))
+    mock_message.partition = MagicMock("1")
+    return mock_message
+

+@pytest.fixture
+def kafka_message_with_transaction_header_false(kafka_message_payload):
     mock_message = Mock()
+    mock_message.headers = MagicMock(
+        return_value=[("timestamp", b"12345"), ("transaction_forwarder", b"0")]
+    )
     mock_message.value = MagicMock(return_value=json.dumps(kafka_message_payload))
     mock_message.partition = MagicMock("1")
+    return mock_message

-    future = forwarder.process_message(mock_message)
+
+@pytest.fixture
+def kafka_message_with_transaction_header_true(kafka_message_payload):
+    mock_message = Mock()
+    mock_message.headers = MagicMock(
+        return_value=[("timestamp", b"12345"), ("transaction_forwarder", b"1")]
+    )
+    mock_message.value = MagicMock(return_value=json.dumps(kafka_message_payload))
+    mock_message.partition = MagicMock("1")
+    return mock_message
+
+
+@patch("sentry.eventstream.kafka.postprocessworker.dispatch_post_process_group_task")
+def test_post_process_forwarder(
+    dispatch_post_process_group_task, kafka_message_without_transaction_header
+):
+    """
+    Tests that the post process forwarder calls dispatch_post_process_group_task with the correct arguments.
+    """
+    forwarder = PostProcessForwarderWorker(concurrency=1)
+    future = forwarder.process_message(kafka_message_without_transaction_header)

     forwarder.flush_batch([future])

-    dispatch_post_process_group_task.assert_called_with(
+    dispatch_post_process_group_task.assert_called_once_with(
         event_id="fe0ee9a2bc3b415497bad68aaf70dc7f",
         project_id=1,
         group_id=43,
@@ -66,7 +96,7 @@ def test_post_process_forwarder_bad_message_headers(
     dispatch_post_process_group_task, kafka_message_payload
 ):
     """
-    Test that when bad message headers are received, post process forwarder still works if the payload is valid.
+    Tests that when bad message headers are received, the post process forwarder still works if the payload is valid.
     """
     forwarder = PostProcessForwarderWorker(concurrency=1)
@@ -80,7 +110,7 @@ def test_post_process_forwarder_bad_message_headers(

     forwarder.flush_batch([future])

-    dispatch_post_process_group_task.assert_called_with(
+    dispatch_post_process_group_task.assert_called_once_with(
         event_id="fe0ee9a2bc3b415497bad68aaf70dc7f",
         project_id=1,
         group_id=43,
@@ -95,7 +125,7 @@ def test_post_process_forwarder_bad_message_headers(

 def test_post_process_forwarder_bad_message(kafka_message_payload):
     """
-    Test that exception is thrown during flush_batch calls when a bad message is received.
+    Tests that an exception is thrown during flush_batch calls when a bad message is received.
     """
     forwarder = PostProcessForwarderWorker(concurrency=1)
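The bad-message test relies on flush_batch re-raising, in the main thread, exceptions that were raised on the executor's worker threads (see the flush_batch docstring in the first file). That behaviour comes straight from concurrent.futures, where Future.result() re-raises the stored exception; a minimal standard-library sketch, independent of Sentry:

```python
# Standard-library illustration of how worker-thread exceptions surface in the caller.
from concurrent.futures import ThreadPoolExecutor


def parse(payload: bytes) -> dict:
    raise ValueError("bad message")


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(parse, b"not-json")
    try:
        future.result()  # re-raises ValueError here, in the submitting thread
    except ValueError as exc:
        print(f"re-raised in main thread: {exc}")
```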
+ """ + forwarder = ErrorsPostProcessForwarderWorker(concurrency=1) + future = forwarder.process_message(kafka_message_without_transaction_header) + assert future is not None + + forwarder.flush_batch([future]) + + dispatch_post_process_group_task.assert_called_once_with( + event_id="fe0ee9a2bc3b415497bad68aaf70dc7f", + project_id=1, + group_id=43, + primary_hash="311ee66a5b8e697929804ceb1c456ffe", + is_new=False, + is_regression=None, + is_new_group_environment=False, + ) + + forwarder.shutdown() + + +@pytest.mark.django_db +@patch("sentry.eventstream.kafka.postprocessworker.dispatch_post_process_group_task") +def test_errors_post_process_forwarder_false_headers( + dispatch_post_process_group_task, kafka_message_with_transaction_header_false +): + """ + Test that the errors post process forwarder calls dispatch_post_process_group_task + when the header "transaction_forwarder" is set to False. + """ + forwarder = ErrorsPostProcessForwarderWorker(concurrency=1) + future = forwarder.process_message(kafka_message_with_transaction_header_false) + assert future is not None + + forwarder.flush_batch([future]) + + dispatch_post_process_group_task.assert_called_once_with( + event_id="fe0ee9a2bc3b415497bad68aaf70dc7f", + project_id=1, + group_id=43, + primary_hash="311ee66a5b8e697929804ceb1c456ffe", + is_new=False, + is_regression=None, + is_new_group_environment=False, + ) + + forwarder.shutdown() + + +@pytest.mark.django_db +def test_errors_post_process_forwarder_true_headers(kafka_message_with_transaction_header_true): + """ + Tests that the errors post process forwarder's process_message returns None + when the header "transaction_forwarder" is set to True. + """ + forwarder = ErrorsPostProcessForwarderWorker(concurrency=1) + future = forwarder.process_message(kafka_message_with_transaction_header_true) + + assert future is None + + forwarder.shutdown() + + +@pytest.mark.django_db +def test_transactions_post_process_forwarder_missing_headers( + kafka_message_without_transaction_header, +): + """ + Tests that the transactions post process forwarder's process_message returns None + when the header "transaction_forwarder" is missing. + """ + forwarder = TransactionsPostProcessForwarderWorker(concurrency=1) + future = forwarder.process_message(kafka_message_without_transaction_header) + assert future is None + + forwarder.shutdown() + + +@pytest.mark.django_db +def test_transactions_post_process_forwarder_false_headers( + kafka_message_with_transaction_header_false, +): + """ + Tests that the transactions post process forwarder's process_message returns None + when the header "transaction_forwarder" is set to False. + """ + forwarder = TransactionsPostProcessForwarderWorker(concurrency=1) + future = forwarder.process_message(kafka_message_with_transaction_header_false) + assert future is None + + forwarder.shutdown() + + +@pytest.mark.django_db +@patch("sentry.eventstream.kafka.postprocessworker.dispatch_post_process_group_task") +def test_transactions_post_process_forwarder_true_headers( + dispatch_post_process_group_task, kafka_message_with_transaction_header_true +): + """ + Tests that the transactions post process forwarder calls dispatch_post_process_group_task + when the header "transaction_forwarder" is set to True. 
+ """ + forwarder = TransactionsPostProcessForwarderWorker(concurrency=1) + future = forwarder.process_message(kafka_message_with_transaction_header_true) + + assert future is not None + forwarder.flush_batch([future]) + + dispatch_post_process_group_task.assert_called_with( + event_id="fe0ee9a2bc3b415497bad68aaf70dc7f", + project_id=1, + group_id=43, + primary_hash="311ee66a5b8e697929804ceb1c456ffe", + is_new=False, + is_regression=None, + is_new_group_environment=False, + ) + + forwarder.shutdown()