From 044899d731b6b344d86cd11cc3ace56166a3b61a Mon Sep 17 00:00:00 2001 From: getsentry-bot <10587625+getsentry-bot@users.noreply.github.com> Date: Wed, 12 Nov 2025 15:25:52 +0000 Subject: [PATCH 1/6] ref: bump sentry-arroyo to 2.33.0 Co-Authored-By: untitaker <837573+untitaker@users.noreply.github.com> --- pyproject.toml | 2 +- uv.lock | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 3d4ce919fd9fb1..fad5eb75f91926 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,7 +79,7 @@ dependencies = [ "rfc3339-validator>=0.1.2", "rfc3986-validator>=0.1.1", # [end] jsonschema format validators - "sentry-arroyo>=2.32.5", + "sentry-arroyo>=2.33.0", "sentry-forked-email-reply-parser>=0.5.12.post1", "sentry-kafka-schemas>=2.1.13", "sentry-ophio>=1.1.3", diff --git a/uv.lock b/uv.lock index 14fce46805c436..ba0d64e168360b 100644 --- a/uv.lock +++ b/uv.lock @@ -2165,7 +2165,7 @@ requires-dist = [ { name = "requests-oauthlib", specifier = ">=1.2.0" }, { name = "rfc3339-validator", specifier = ">=0.1.2" }, { name = "rfc3986-validator", specifier = ">=0.1.1" }, - { name = "sentry-arroyo", specifier = ">=2.32.5" }, + { name = "sentry-arroyo", specifier = ">=2.33.0" }, { name = "sentry-forked-email-reply-parser", specifier = ">=0.5.12.post1" }, { name = "sentry-kafka-schemas", specifier = ">=2.1.13" }, { name = "sentry-ophio", specifier = ">=1.1.3" }, @@ -2257,13 +2257,13 @@ dev = [ [[package]] name = "sentry-arroyo" -version = "2.32.5" +version = "2.33.0" source = { registry = "https://pypi.devinfra.sentry.io/simple" } dependencies = [ { name = "confluent-kafka", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/sentry_arroyo-2.32.5-py3-none-any.whl", hash = "sha256:80aadbbc6d5f98036bc74080cf5260bc479a2ba6ff7bfaf663f43d9ed4ade8af" }, + { url = "https://pypi.devinfra.sentry.io/wheels/sentry_arroyo-2.33.0-py3-none-any.whl", hash = "sha256:b9f4efcbf2da61c2dce2969456cde6e0b5905a80005cea50aacbecae93dfcf79" }, ] [[package]] From 797ec1685838c7d94af0ce1abf7824fabbe4fc03 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 12 Nov 2025 16:39:23 +0100 Subject: [PATCH 2/6] remove outdated options --- src/sentry/consumers/__init__.py | 11 ++--------- src/sentry/runner/commands/run.py | 27 --------------------------- 2 files changed, 2 insertions(+), 36 deletions(-) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 1737258d125e51..50fdd06efd5686 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -468,9 +468,6 @@ def get_stream_processor( shutdown_strategy_before_consumer: bool = False, add_global_tags: bool = False, profile_consumer_join: bool = False, - enable_autocommit: bool = False, - retry_handle_destroyed: bool = False, - handle_poll_while_paused: bool = False, ) -> StreamProcessor: from sentry.utils import kafka_config @@ -528,8 +525,6 @@ def build_consumer_config(group_id: str): group_id=group_id, auto_offset_reset=auto_offset_reset, strict_offset_reset=strict_offset_reset, - enable_auto_commit=enable_autocommit, - retry_handle_destroyed=retry_handle_destroyed, ) if max_poll_interval_ms is not None: @@ -542,9 +537,8 @@ def build_consumer_config(group_id: str): if group_instance_id is not None: consumer_config["group.instance.id"] = group_instance_id - if enable_autocommit: - # Set commit interval to 1 second (1000ms) - consumer_config["auto.commit.interval.ms"] = 1000 + # Set commit interval to 1 second (1000ms) + consumer_config["auto.commit.interval.ms"] = 1000 return consumer_config @@ -638,7 +632,6 @@ def build_consumer_config(group_id: str): join_timeout=join_timeout, dlq_policy=dlq_policy, shutdown_strategy_before_consumer=shutdown_strategy_before_consumer, - handle_poll_while_paused=handle_poll_while_paused, ) diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index e45be6af8f8c11..6c9c02ae5496ce 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -538,36 +538,12 @@ def taskbroker_send_tasks( default=None, help="Quantized rebalancing means that during deploys, rebalancing is triggered across all pods within a consumer group at the same time. The value is used by the pods to align their group join/leave activity to some multiple of the delay", ) -@click.option( - "--shutdown-strategy-before-consumer", - is_flag=True, - default=False, - help="A potential workaround for Broker Handle Destroyed during shutdown (see arroyo option).", -) @click.option( "--profile-consumer-join", is_flag=True, default=False, help="Adds a ProcessingStrategy to the start of a consumer that records a transaction of the consumer's join() method.", ) -@click.option( - "--enable-autocommit", - is_flag=True, - default=False, - help="Enable Kafka autocommit mode with 1s commit interval. Offsets are stored via store_offsets and rdkafka commits them automatically.", -) -@click.option( - "--retry-handle-destroyed", - is_flag=True, - default=False, - help="Enable retrying on `KafkaError._DESTROY` during commit.", -) -@click.option( - "--handle-poll-while-paused", - is_flag=True, - default=False, - help="Enable polling while the consumer is paused to detect rebalancing. Useful for detecting consumer state changes during backpressure.", -) @configuration def basic_consumer( consumer_name: str, @@ -607,9 +583,6 @@ def basic_consumer( kafka_topic=topic, consumer_group=options["group_id"], kafka_slice_id=kafka_slice_id ) - options["shutdown_strategy_before_consumer"] = True - options["enable_autocommit"] = True - processor = get_stream_processor( consumer_name, consumer_args, From 9b0445bc84b80ed2ba7a22ac4cc3f8b5626fc1b7 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 12 Nov 2025 16:55:04 +0100 Subject: [PATCH 3/6] fix mypy --- src/sentry/consumers/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 50fdd06efd5686..ae0c25c6be46b2 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -465,7 +465,6 @@ def get_stream_processor( group_instance_id: str | None = None, max_dlq_buffer_length: int | None = None, kafka_slice_id: int | None = None, - shutdown_strategy_before_consumer: bool = False, add_global_tags: bool = False, profile_consumer_join: bool = False, ) -> StreamProcessor: @@ -631,7 +630,6 @@ def build_consumer_config(group_id: str): commit_policy=ONCE_PER_SECOND, join_timeout=join_timeout, dlq_policy=dlq_policy, - shutdown_strategy_before_consumer=shutdown_strategy_before_consumer, ) From 8c9ba067a9b28052a88b3791b0691555c4dea047 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 12 Nov 2025 17:47:01 +0100 Subject: [PATCH 4/6] bump arroyo again for inc resolution --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index fad5eb75f91926..6062fd6bb27795 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,7 +79,7 @@ dependencies = [ "rfc3339-validator>=0.1.2", "rfc3986-validator>=0.1.1", # [end] jsonschema format validators - "sentry-arroyo>=2.33.0", + "sentry-arroyo>=2.33.1", "sentry-forked-email-reply-parser>=0.5.12.post1", "sentry-kafka-schemas>=2.1.13", "sentry-ophio>=1.1.3", From 3342fce969cace2c67a4c49378c18f9b51939c2e Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 12 Nov 2025 17:48:41 +0100 Subject: [PATCH 5/6] bs --- src/sentry/metrics/middleware.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/sentry/metrics/middleware.py b/src/sentry/metrics/middleware.py index c3116a6f74c4b8..1af8d174c1068d 100644 --- a/src/sentry/metrics/middleware.py +++ b/src/sentry/metrics/middleware.py @@ -7,7 +7,9 @@ from sentry.metrics.base import MetricsBackend, MutableTags, Tags, TagValue _BAD_TAGS = frozenset(["event", "project", "group"]) -_NOT_BAD_TAGS = frozenset(["use_case_id", "consumer_member_id", "broker_id", "kafka_slice_id"]) +_NOT_BAD_TAGS = frozenset( + ["use_case_id", "consumer_member_id", "broker_id", "kafka_slice_id", "group_id"] +) _METRICS_THAT_CAN_HAVE_BAD_TAGS = frozenset( [ # snuba related tags From 07225211c2f9e2a2536d4d8e160099fb41fc9042 Mon Sep 17 00:00:00 2001 From: "getsantry[bot]" <66042841+getsantry[bot]@users.noreply.github.com> Date: Wed, 12 Nov 2025 17:09:13 +0000 Subject: [PATCH 6/6] :snowflake: re-freeze requirements --- uv.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/uv.lock b/uv.lock index ba0d64e168360b..fdda926fc5e520 100644 --- a/uv.lock +++ b/uv.lock @@ -2165,7 +2165,7 @@ requires-dist = [ { name = "requests-oauthlib", specifier = ">=1.2.0" }, { name = "rfc3339-validator", specifier = ">=0.1.2" }, { name = "rfc3986-validator", specifier = ">=0.1.1" }, - { name = "sentry-arroyo", specifier = ">=2.33.0" }, + { name = "sentry-arroyo", specifier = ">=2.33.1" }, { name = "sentry-forked-email-reply-parser", specifier = ">=0.5.12.post1" }, { name = "sentry-kafka-schemas", specifier = ">=2.1.13" }, { name = "sentry-ophio", specifier = ">=1.1.3" }, @@ -2257,13 +2257,13 @@ dev = [ [[package]] name = "sentry-arroyo" -version = "2.33.0" +version = "2.33.1" source = { registry = "https://pypi.devinfra.sentry.io/simple" } dependencies = [ { name = "confluent-kafka", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/sentry_arroyo-2.33.0-py3-none-any.whl", hash = "sha256:b9f4efcbf2da61c2dce2969456cde6e0b5905a80005cea50aacbecae93dfcf79" }, + { url = "https://pypi.devinfra.sentry.io/wheels/sentry_arroyo-2.33.1-py3-none-any.whl", hash = "sha256:10d05f81a06bd7f9ee28fe7d7a628c868c3ccbdb5987bece6d9860930e1654af" }, ] [[package]]