diff --git a/pyproject.toml b/pyproject.toml
index 3d4ce919fd9fb1..6062fd6bb27795 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -79,7 +79,7 @@ dependencies = [
     "rfc3339-validator>=0.1.2",
     "rfc3986-validator>=0.1.1",
     # [end] jsonschema format validators
-    "sentry-arroyo>=2.32.5",
+    "sentry-arroyo>=2.33.1",
     "sentry-forked-email-reply-parser>=0.5.12.post1",
     "sentry-kafka-schemas>=2.1.13",
     "sentry-ophio>=1.1.3",
diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py
index 1737258d125e51..ae0c25c6be46b2 100644
--- a/src/sentry/consumers/__init__.py
+++ b/src/sentry/consumers/__init__.py
@@ -465,12 +465,8 @@ def get_stream_processor(
     group_instance_id: str | None = None,
     max_dlq_buffer_length: int | None = None,
     kafka_slice_id: int | None = None,
-    shutdown_strategy_before_consumer: bool = False,
     add_global_tags: bool = False,
     profile_consumer_join: bool = False,
-    enable_autocommit: bool = False,
-    retry_handle_destroyed: bool = False,
-    handle_poll_while_paused: bool = False,
 ) -> StreamProcessor:
     from sentry.utils import kafka_config
 
@@ -528,8 +524,6 @@ def build_consumer_config(group_id: str):
             group_id=group_id,
             auto_offset_reset=auto_offset_reset,
             strict_offset_reset=strict_offset_reset,
-            enable_auto_commit=enable_autocommit,
-            retry_handle_destroyed=retry_handle_destroyed,
         )
 
         if max_poll_interval_ms is not None:
@@ -542,9 +536,8 @@ def build_consumer_config(group_id: str):
         if group_instance_id is not None:
             consumer_config["group.instance.id"] = group_instance_id
 
-        if enable_autocommit:
-            # Set commit interval to 1 second (1000ms)
-            consumer_config["auto.commit.interval.ms"] = 1000
+        # Set commit interval to 1 second (1000ms)
+        consumer_config["auto.commit.interval.ms"] = 1000
 
         return consumer_config
 
@@ -637,8 +630,6 @@ def build_consumer_config(group_id: str):
         commit_policy=ONCE_PER_SECOND,
         join_timeout=join_timeout,
         dlq_policy=dlq_policy,
-        shutdown_strategy_before_consumer=shutdown_strategy_before_consumer,
-        handle_poll_while_paused=handle_poll_while_paused,
     )
 
 
diff --git a/src/sentry/metrics/middleware.py b/src/sentry/metrics/middleware.py
index c3116a6f74c4b8..1af8d174c1068d 100644
--- a/src/sentry/metrics/middleware.py
+++ b/src/sentry/metrics/middleware.py
@@ -7,7 +7,9 @@
 from sentry.metrics.base import MetricsBackend, MutableTags, Tags, TagValue
 
 _BAD_TAGS = frozenset(["event", "project", "group"])
-_NOT_BAD_TAGS = frozenset(["use_case_id", "consumer_member_id", "broker_id", "kafka_slice_id"])
+_NOT_BAD_TAGS = frozenset(
+    ["use_case_id", "consumer_member_id", "broker_id", "kafka_slice_id", "group_id"]
+)
 _METRICS_THAT_CAN_HAVE_BAD_TAGS = frozenset(
     [
         # snuba related tags
diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py
index e45be6af8f8c11..6c9c02ae5496ce 100644
--- a/src/sentry/runner/commands/run.py
+++ b/src/sentry/runner/commands/run.py
@@ -538,36 +538,12 @@ def taskbroker_send_tasks(
     default=None,
     help="Quantized rebalancing means that during deploys, rebalancing is triggered across all pods within a consumer group at the same time. The value is used by the pods to align their group join/leave activity to some multiple of the delay",
 )
-@click.option(
-    "--shutdown-strategy-before-consumer",
-    is_flag=True,
-    default=False,
-    help="A potential workaround for Broker Handle Destroyed during shutdown (see arroyo option).",
-)
 @click.option(
     "--profile-consumer-join",
     is_flag=True,
     default=False,
     help="Adds a ProcessingStrategy to the start of a consumer that records a transaction of the consumer's join() method.",
 )
-@click.option(
-    "--enable-autocommit",
-    is_flag=True,
-    default=False,
-    help="Enable Kafka autocommit mode with 1s commit interval. Offsets are stored via store_offsets and rdkafka commits them automatically.",
-)
-@click.option(
-    "--retry-handle-destroyed",
-    is_flag=True,
-    default=False,
-    help="Enable retrying on `KafkaError._DESTROY` during commit.",
-)
-@click.option(
-    "--handle-poll-while-paused",
-    is_flag=True,
-    default=False,
-    help="Enable polling while the consumer is paused to detect rebalancing. Useful for detecting consumer state changes during backpressure.",
-)
 @configuration
 def basic_consumer(
     consumer_name: str,
@@ -607,9 +583,6 @@ def basic_consumer(
         kafka_topic=topic, consumer_group=options["group_id"], kafka_slice_id=kafka_slice_id
     )
 
-    options["shutdown_strategy_before_consumer"] = True
-    options["enable_autocommit"] = True
-
     processor = get_stream_processor(
         consumer_name,
         consumer_args,
diff --git a/uv.lock b/uv.lock
index 14fce46805c436..fdda926fc5e520 100644
--- a/uv.lock
+++ b/uv.lock
@@ -2165,7 +2165,7 @@ requires-dist = [
     { name = "requests-oauthlib", specifier = ">=1.2.0" },
     { name = "rfc3339-validator", specifier = ">=0.1.2" },
     { name = "rfc3986-validator", specifier = ">=0.1.1" },
-    { name = "sentry-arroyo", specifier = ">=2.32.5" },
+    { name = "sentry-arroyo", specifier = ">=2.33.1" },
     { name = "sentry-forked-email-reply-parser", specifier = ">=0.5.12.post1" },
     { name = "sentry-kafka-schemas", specifier = ">=2.1.13" },
     { name = "sentry-ophio", specifier = ">=1.1.3" },
@@ -2257,13 +2257,13 @@ dev = [
 
 [[package]]
 name = "sentry-arroyo"
-version = "2.32.5"
+version = "2.33.1"
 source = { registry = "https://pypi.devinfra.sentry.io/simple" }
 dependencies = [
     { name = "confluent-kafka", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
 ]
 wheels = [
-    { url = "https://pypi.devinfra.sentry.io/wheels/sentry_arroyo-2.32.5-py3-none-any.whl", hash = "sha256:80aadbbc6d5f98036bc74080cf5260bc479a2ba6ff7bfaf663f43d9ed4ade8af" },
+    { url = "https://pypi.devinfra.sentry.io/wheels/sentry_arroyo-2.33.1-py3-none-any.whl", hash = "sha256:10d05f81a06bd7f9ee28fe7d7a628c868c3ccbdb5987bece6d9860930e1654af" },
 ]
 
 [[package]]