Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion src/sentry/eventstream/kafka/postprocessworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from sentry import options
from sentry.eventstream.kafka.protocol import (
decode_bool,
get_task_kwargs_for_message,
get_task_kwargs_for_message_from_headers,
)
Expand All @@ -21,6 +22,7 @@
_CONCURRENCY_METRIC = "eventstream.concurrency"
_MESSAGES_METRIC = "eventstream.messages"
_CONCURRENCY_OPTION = "post-process-forwarder:concurrency"
_TRANSACTION_FORWARDER_HEADER = "transaction_forwarder"


@contextmanager
Expand Down Expand Up @@ -114,7 +116,7 @@ def process_message(self, message: Message) -> Optional[Future]:
"""
return self.__executor.submit(_get_task_kwargs_and_dispatch, message)

def flush_batch(self, batch: Sequence[Future]) -> None:
def flush_batch(self, batch: Optional[Sequence[Future]]) -> None:
"""
For all work which was submitted to the thread pool executor, we need to ensure that if an exception was
raised, then we raise it in the main thread. This is needed so that processing can be stopped in such
Expand All @@ -140,3 +142,44 @@ def flush_batch(self, batch: Sequence[Future]) -> None:

def shutdown(self) -> None:
self.__executor.shutdown()


class ErrorsPostProcessForwarderWorker(PostProcessForwarderWorker):
"""
ErrorsPostProcessForwarderWorker will processes messages only in the following scenarios:
1. _TRANSACTION_FORWARDER_HEADER is missing from the kafka headers. This is a backward compatibility
use case. There can be messages in the queue which do not have this header. Those messages should be
handled by the errors post process forwarder
2. _TRANSACTION_FORWARDER_HEADER is False in the kafka headers.
"""

def process_message(self, message: Message) -> Optional[Future]:
headers = {header: value for header, value in message.headers()}

# Backwards-compatibility case for messages missing header.
if _TRANSACTION_FORWARDER_HEADER not in headers:
return super().process_message(message)

if decode_bool(headers.get(_TRANSACTION_FORWARDER_HEADER)) is False:
return super().process_message(message)

return None


class TransactionsPostProcessForwarderWorker(PostProcessForwarderWorker):
"""
TransactionsPostProcessForwarderWorker will processes messages only in the following scenarios:
1. _TRANSACTION_FORWARDER_HEADER is True in the kafka headers.
"""

def process_message(self, message: Message) -> Optional[Future]:
headers = {header: value for header, value in message.headers()}

# Backwards-compatibility for messages missing headers.
if _TRANSACTION_FORWARDER_HEADER not in headers:
return None

if decode_bool(headers.get(_TRANSACTION_FORWARDER_HEADER)) is True:
return super().process_message(message)

return None
46 changes: 25 additions & 21 deletions src/sentry/eventstream/kafka/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,33 +99,37 @@ def get_task_kwargs_for_message(value):
return handler(*payload[1:])


def get_task_kwargs_for_message_from_headers(headers: Sequence[Tuple[str, Optional[bytes]]]):
"""
Same as get_task_kwargs_for_message but gets the required information from
the kafka message headers.
"""
def decode_str(value: Optional[bytes]) -> str:
assert isinstance(value, bytes)
return value.decode("utf-8")

def decode_str(value: Optional[bytes]) -> str:
assert isinstance(value, bytes)
return value.decode("utf-8")

def decode_optional_str(value: Optional[bytes]) -> Optional[str]:
if value is None:
return None
return decode_str(value)
def decode_optional_str(value: Optional[bytes]) -> Optional[str]:
if value is None:
return None
return decode_str(value)

def decode_int(value: Optional[bytes]) -> int:
assert isinstance(value, bytes)
return int(value)

def decode_optional_int(value: Optional[bytes]) -> Optional[int]:
if value is None:
return None
return decode_int(value)
def decode_int(value: Optional[bytes]) -> int:
assert isinstance(value, bytes)
return int(value)


def decode_bool(value: bytes) -> bool:
return bool(int(decode_str(value)))
def decode_optional_int(value: Optional[bytes]) -> Optional[int]:
if value is None:
return None
return decode_int(value)


def decode_bool(value: bytes) -> bool:
return bool(int(decode_str(value)))


def get_task_kwargs_for_message_from_headers(headers: Sequence[Tuple[str, Optional[bytes]]]):
"""
Same as get_task_kwargs_for_message but gets the required information from
the kafka message headers.
"""
try:
header_data = {k: v for k, v in headers}
version = decode_int(header_data["version"])
Expand Down
182 changes: 170 additions & 12 deletions tests/sentry/eventstream/kafka/test_postprocessworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from sentry import options
from sentry.eventstream.kafka.postprocessworker import (
_CONCURRENCY_OPTION,
ErrorsPostProcessForwarderWorker,
PostProcessForwarderWorker,
TransactionsPostProcessForwarderWorker,
)
from sentry.eventstream.kafka.protocol import InvalidVersion
from sentry.utils import json
Expand All @@ -32,22 +34,50 @@ def kafka_message_payload():
]


@patch("sentry.eventstream.kafka.postprocessworker.dispatch_post_process_group_task")
def test_post_process_forwarder(dispatch_post_process_group_task, kafka_message_payload):
"""
Test that the post process forwarder calls dispatch_post_process_group_task with the correct arguments
"""
forwarder = PostProcessForwarderWorker(concurrency=1)
@pytest.fixture
def kafka_message_without_transaction_header(kafka_message_payload):
mock_message = Mock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why using a mock for the message? Can't we just create a real Message object ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried using actual confluent_kafka.Message but it does not work. For example the set_headers expect Message objects rather than the actual headers. When I create a Message and pass it to set_headers, it barfs saying "cannot create 'cimpl.Message' instances".

I feel its fine to use mocks here, since they return values which would have been returned by the actual message when the api's like headers(), values() are called.

mock_message.headers = MagicMock(return_value=[("timestamp", b"12345")])
mock_message.value = MagicMock(return_value=json.dumps(kafka_message_payload))
mock_message.partition = MagicMock("1")
return mock_message


@pytest.fixture
def kafka_message_with_transaction_header_false(kafka_message_payload):
mock_message = Mock()
mock_message.headers = MagicMock(
return_value=[("timestamp", b"12345"), ("transaction_forwarder", b"0")]
)
mock_message.value = MagicMock(return_value=json.dumps(kafka_message_payload))
mock_message.partition = MagicMock("1")
return mock_message

future = forwarder.process_message(mock_message)

@pytest.fixture
def kafka_message_with_transaction_header_true(kafka_message_payload):
mock_message = Mock()
mock_message.headers = MagicMock(
return_value=[("timestamp", b"12345"), ("transaction_forwarder", b"1")]
)
mock_message.value = MagicMock(return_value=json.dumps(kafka_message_payload))
mock_message.partition = MagicMock("1")
return mock_message


@patch("sentry.eventstream.kafka.postprocessworker.dispatch_post_process_group_task")
def test_post_process_forwarder(
dispatch_post_process_group_task, kafka_message_without_transaction_header
):
"""
Tests that the post process forwarder calls dispatch_post_process_group_task with the correct arguments
"""
forwarder = PostProcessForwarderWorker(concurrency=1)
future = forwarder.process_message(kafka_message_without_transaction_header)

forwarder.flush_batch([future])

dispatch_post_process_group_task.assert_called_with(
dispatch_post_process_group_task.assert_called_once_with(
event_id="fe0ee9a2bc3b415497bad68aaf70dc7f",
project_id=1,
group_id=43,
Expand All @@ -66,7 +96,7 @@ def test_post_process_forwarder_bad_message_headers(
dispatch_post_process_group_task, kafka_message_payload
):
"""
Test that when bad message headers are received, post process forwarder still works if the payload is valid.
Tests that when bad message headers are received, post process forwarder still works if the payload is valid.
"""
forwarder = PostProcessForwarderWorker(concurrency=1)

Expand All @@ -80,7 +110,7 @@ def test_post_process_forwarder_bad_message_headers(

forwarder.flush_batch([future])

dispatch_post_process_group_task.assert_called_with(
dispatch_post_process_group_task.assert_called_once_with(
event_id="fe0ee9a2bc3b415497bad68aaf70dc7f",
project_id=1,
group_id=43,
Expand All @@ -95,7 +125,7 @@ def test_post_process_forwarder_bad_message_headers(

def test_post_process_forwarder_bad_message(kafka_message_payload):
"""
Test that exception is thrown during flush_batch calls when a bad message is received.
Tests that exception is thrown during flush_batch calls when a bad message is received.
"""
forwarder = PostProcessForwarderWorker(concurrency=1)

Expand All @@ -116,7 +146,7 @@ def test_post_process_forwarder_bad_message(kafka_message_payload):
@pytest.mark.django_db
def test_post_process_forwarder_concurrency(kafka_message_payload):
"""
Test that the number of threads change when the option is changed.
Tests that the number of threads change when the option is changed.
"""
forwarder = PostProcessForwarderWorker(concurrency=1)

Expand All @@ -128,3 +158,131 @@ def test_post_process_forwarder_concurrency(kafka_message_payload):
assert forwarder._PostProcessForwarderWorker__current_concurrency == 5

forwarder.shutdown()


@pytest.mark.django_db
@patch("sentry.eventstream.kafka.postprocessworker.dispatch_post_process_group_task")
def test_errors_post_process_forwarder_missing_headers(
dispatch_post_process_group_task, kafka_message_without_transaction_header
):
"""
Tests that the errors post process forwarder calls dispatch_post_process_group_task
when the header "transaction_forwarder" is missing.
"""
forwarder = ErrorsPostProcessForwarderWorker(concurrency=1)
future = forwarder.process_message(kafka_message_without_transaction_header)
assert future is not None

forwarder.flush_batch([future])

dispatch_post_process_group_task.assert_called_once_with(
event_id="fe0ee9a2bc3b415497bad68aaf70dc7f",
project_id=1,
group_id=43,
primary_hash="311ee66a5b8e697929804ceb1c456ffe",
is_new=False,
is_regression=None,
is_new_group_environment=False,
)

forwarder.shutdown()


@pytest.mark.django_db
@patch("sentry.eventstream.kafka.postprocessworker.dispatch_post_process_group_task")
def test_errors_post_process_forwarder_false_headers(
dispatch_post_process_group_task, kafka_message_with_transaction_header_false
):
"""
Test that the errors post process forwarder calls dispatch_post_process_group_task
when the header "transaction_forwarder" is set to False.
"""
forwarder = ErrorsPostProcessForwarderWorker(concurrency=1)
future = forwarder.process_message(kafka_message_with_transaction_header_false)
assert future is not None

forwarder.flush_batch([future])

dispatch_post_process_group_task.assert_called_once_with(
event_id="fe0ee9a2bc3b415497bad68aaf70dc7f",
project_id=1,
group_id=43,
primary_hash="311ee66a5b8e697929804ceb1c456ffe",
is_new=False,
is_regression=None,
is_new_group_environment=False,
)

forwarder.shutdown()


@pytest.mark.django_db
def test_errors_post_process_forwarder_true_headers(kafka_message_with_transaction_header_true):
"""
Tests that the errors post process forwarder's process_message returns None
when the header "transaction_forwarder" is set to True.
"""
forwarder = ErrorsPostProcessForwarderWorker(concurrency=1)
future = forwarder.process_message(kafka_message_with_transaction_header_true)

assert future is None

forwarder.shutdown()


@pytest.mark.django_db
def test_transactions_post_process_forwarder_missing_headers(
kafka_message_without_transaction_header,
):
"""
Tests that the transactions post process forwarder's process_message returns None
when the header "transaction_forwarder" is missing.
"""
forwarder = TransactionsPostProcessForwarderWorker(concurrency=1)
future = forwarder.process_message(kafka_message_without_transaction_header)
assert future is None

forwarder.shutdown()


@pytest.mark.django_db
def test_transactions_post_process_forwarder_false_headers(
kafka_message_with_transaction_header_false,
):
"""
Tests that the transactions post process forwarder's process_message returns None
when the header "transaction_forwarder" is set to False.
"""
forwarder = TransactionsPostProcessForwarderWorker(concurrency=1)
future = forwarder.process_message(kafka_message_with_transaction_header_false)
assert future is None

forwarder.shutdown()


@pytest.mark.django_db
@patch("sentry.eventstream.kafka.postprocessworker.dispatch_post_process_group_task")
def test_transactions_post_process_forwarder_true_headers(
dispatch_post_process_group_task, kafka_message_with_transaction_header_true
):
"""
Tests that the transactions post process forwarder calls dispatch_post_process_group_task
when the header "transaction_forwarder" is set to True.
"""
forwarder = TransactionsPostProcessForwarderWorker(concurrency=1)
future = forwarder.process_message(kafka_message_with_transaction_header_true)

assert future is not None
forwarder.flush_batch([future])

dispatch_post_process_group_task.assert_called_with(
event_id="fe0ee9a2bc3b415497bad68aaf70dc7f",
project_id=1,
group_id=43,
primary_hash="311ee66a5b8e697929804ceb1c456ffe",
is_new=False,
is_regression=None,
is_new_group_environment=False,
)

forwarder.shutdown()