Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ dependencies = [
"rfc3339-validator>=0.1.2",
"rfc3986-validator>=0.1.1",
# [end] jsonschema format validators
"sentry-arroyo>=2.32.5",
"sentry-arroyo>=2.33.1",
"sentry-forked-email-reply-parser>=0.5.12.post1",
"sentry-kafka-schemas>=2.1.13",
"sentry-ophio>=1.1.3",
Expand Down
13 changes: 2 additions & 11 deletions src/sentry/consumers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,12 +465,8 @@ def get_stream_processor(
group_instance_id: str | None = None,
max_dlq_buffer_length: int | None = None,
kafka_slice_id: int | None = None,
shutdown_strategy_before_consumer: bool = False,
add_global_tags: bool = False,
profile_consumer_join: bool = False,
enable_autocommit: bool = False,
retry_handle_destroyed: bool = False,
handle_poll_while_paused: bool = False,
) -> StreamProcessor:
from sentry.utils import kafka_config

Expand Down Expand Up @@ -528,8 +524,6 @@ def build_consumer_config(group_id: str):
group_id=group_id,
auto_offset_reset=auto_offset_reset,
strict_offset_reset=strict_offset_reset,
enable_auto_commit=enable_autocommit,
retry_handle_destroyed=retry_handle_destroyed,
)

if max_poll_interval_ms is not None:
Expand All @@ -542,9 +536,8 @@ def build_consumer_config(group_id: str):
if group_instance_id is not None:
consumer_config["group.instance.id"] = group_instance_id

if enable_autocommit:
# Set commit interval to 1 second (1000ms)
consumer_config["auto.commit.interval.ms"] = 1000
# Set commit interval to 1 second (1000ms)
consumer_config["auto.commit.interval.ms"] = 1000

return consumer_config

Expand Down Expand Up @@ -637,8 +630,6 @@ def build_consumer_config(group_id: str):
commit_policy=ONCE_PER_SECOND,
join_timeout=join_timeout,
dlq_policy=dlq_policy,
shutdown_strategy_before_consumer=shutdown_strategy_before_consumer,
handle_poll_while_paused=handle_poll_while_paused,
)


Expand Down
4 changes: 3 additions & 1 deletion src/sentry/metrics/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from sentry.metrics.base import MetricsBackend, MutableTags, Tags, TagValue

_BAD_TAGS = frozenset(["event", "project", "group"])
_NOT_BAD_TAGS = frozenset(["use_case_id", "consumer_member_id", "broker_id", "kafka_slice_id"])
_NOT_BAD_TAGS = frozenset(
["use_case_id", "consumer_member_id", "broker_id", "kafka_slice_id", "group_id"]
)
_METRICS_THAT_CAN_HAVE_BAD_TAGS = frozenset(
[
# snuba related tags
Expand Down
27 changes: 0 additions & 27 deletions src/sentry/runner/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,36 +538,12 @@ def taskbroker_send_tasks(
default=None,
help="Quantized rebalancing means that during deploys, rebalancing is triggered across all pods within a consumer group at the same time. The value is used by the pods to align their group join/leave activity to some multiple of the delay",
)
@click.option(
"--shutdown-strategy-before-consumer",
is_flag=True,
default=False,
help="A potential workaround for Broker Handle Destroyed during shutdown (see arroyo option).",
)
@click.option(
"--profile-consumer-join",
is_flag=True,
default=False,
help="Adds a ProcessingStrategy to the start of a consumer that records a transaction of the consumer's join() method.",
)
@click.option(
"--enable-autocommit",
is_flag=True,
default=False,
help="Enable Kafka autocommit mode with 1s commit interval. Offsets are stored via store_offsets and rdkafka commits them automatically.",
)
@click.option(
"--retry-handle-destroyed",
is_flag=True,
default=False,
help="Enable retrying on `KafkaError._DESTROY` during commit.",
)
@click.option(
"--handle-poll-while-paused",
is_flag=True,
default=False,
help="Enable polling while the consumer is paused to detect rebalancing. Useful for detecting consumer state changes during backpressure.",
)
@configuration
def basic_consumer(
consumer_name: str,
Expand Down Expand Up @@ -607,9 +583,6 @@ def basic_consumer(
kafka_topic=topic, consumer_group=options["group_id"], kafka_slice_id=kafka_slice_id
)

options["shutdown_strategy_before_consumer"] = True
options["enable_autocommit"] = True

processor = get_stream_processor(
consumer_name,
consumer_args,
Expand Down
6 changes: 3 additions & 3 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading