Skip to content

feat: blq feature flag can be updated without restart#7871

Merged
kylemumma merged 2 commits intomasterfrom
krm/blq-ff
Apr 14, 2026
Merged

feat: blq feature flag can be updated without restart#7871
kylemumma merged 2 commits intomasterfrom
krm/blq-ff

Conversation

@kylemumma
Copy link
Copy Markdown
Member

I moved the feature flag from factory_v2 into the BLQ. This will allow us to update the feature flag without restarting the consumer.

@kylemumma kylemumma requested a review from a team as a code owner April 13, 2026 17:38
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Producer not polled when flag disabled mid-routing
    • When BLQ is disabled during RoutingStale, poll() now flushes the producer, resets router state to Idle, and prevents stale state from affecting later re-enablement.

Create PR

Or push these changes by commenting:

@cursor push b55d0c9e96
Preview (b55d0c9e96)
diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs
--- a/rust_snuba/src/strategies/blq_router.rs
+++ b/rust_snuba/src/strategies/blq_router.rs
@@ -163,6 +163,22 @@ where
 {
     fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
         if !self.is_enabled() {
+            if self.state == State::RoutingStale {
+                let flush_results = self.producer.join(Some(Duration::from_secs(5)))?;
+                self.state = State::Idle;
+                if flush_results.is_some() {
+                    return Ok(flush_results);
+                }
+            }
+
+            if let State::Flushing(commits) = &mut self.state {
+                let commits = commits.take();
+                self.state = State::Idle;
+                if commits.is_some() {
+                    return Ok(commits);
+                }
+            }
+
             return self.next_step.poll();
         }
 

@@ -163,6 +163,22 @@ where
 {
     fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
         if !self.is_enabled() {
+            if self.state == State::RoutingStale {
+                let flush_results = self.producer.join(Some(Duration::from_secs(5)))?;
+                self.state = State::Idle;
+                if flush_results.is_some() {
+                    return Ok(flush_results);
+                }
+            }
+
+            if let State::Flushing(commits) = &mut self.state {
+                let commits = commits.take();
+                self.state = State::Idle;
+                if commits.is_some() {
+                    return Ok(commits);
+                }
+            }
+
             return self.next_step.poll();
         }
 
@@ -452,4 +468,37 @@ mod tests {
         assert_eq!(router.producer.submitted.len(), 0);
         assert_eq!(router.state, State::Idle);
     }
+
+    #[test]
+    fn test_disable_flag_while_routing_stale_flushes_and_resets_state() {
+        init_config();
+        let guard = override_options(&[("snuba", "consumer.blq_enabled", json!(true))]).unwrap();
+        let mut router = BLQRouter::new_with_strategy(
+            MockStrategy::new(),
+            MockStrategy::new(),
+            TimeDelta::seconds(10),
+            None,
+        )
+        .unwrap();
+
+        router
+            .submit(make_message(Utc::now() - TimeDelta::minutes(1)))
+            .unwrap();
+        assert_eq!(router.state, State::RoutingStale);
+        assert!(!router.producer.join_called);
+
+        drop(guard);
+        let _guard = override_options(&[("snuba", "consumer.blq_enabled", json!(false))]).unwrap();
+
+        _ = router.poll().unwrap();
+        assert!(router.producer.join_called);
+        assert_eq!(router.state, State::Idle);
+
+        drop(_guard);
+        let _guard = override_options(&[("snuba", "consumer.blq_enabled", json!(true))]).unwrap();
+
+        router.submit(make_message(Utc::now())).unwrap();
+        assert_eq!(router.state, State::Forwarding);
+        assert_eq!(router.next_step.submitted.len(), 1);
+    }
 }

@@ -452,4 +468,37 @@ mod tests {
         assert_eq!(router.producer.submitted.len(), 0);
         assert_eq!(router.state, State::Idle);
     }
+
+    #[test]
+    fn test_disable_flag_while_routing_stale_flushes_and_resets_state() {
+        init_config();
+        let guard = override_options(&[("snuba", "consumer.blq_enabled", json!(true))]).unwrap();
+        let mut router = BLQRouter::new_with_strategy(
+            MockStrategy::new(),
+            MockStrategy::new(),
+            TimeDelta::seconds(10),
+            None,
+        )
+        .unwrap();
+
+        router
+            .submit(make_message(Utc::now() - TimeDelta::minutes(1)))
+            .unwrap();
+        assert_eq!(router.state, State::RoutingStale);
+        assert!(!router.producer.join_called);
+
+        drop(guard);
+        let _guard = override_options(&[("snuba", "consumer.blq_enabled", json!(false))]).unwrap();
+
+        _ = router.poll().unwrap();
+        assert!(router.producer.join_called);
+        assert_eq!(router.state, State::Idle);
+
+        drop(_guard);
+        let _guard = override_options(&[("snuba", "consumer.blq_enabled", json!(true))]).unwrap();
+
+        router.submit(make_message(Utc::now())).unwrap();
+        assert_eq!(router.state, State::Forwarding);
+        assert_eq!(router.next_step.submitted.len(), 1);
+    }
 }

This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

Reviewed by Cursor Bugbot for commit 9a48154. Configure here.

Comment thread rust_snuba/src/strategies/blq_router.rs Outdated
Comment thread rust_snuba/src/strategies/blq_router.rs Outdated
Comment thread rust_snuba/src/factory_v2.rs
Copy link
Copy Markdown
Contributor

@onewland onewland left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should either make this CLI-controlled or always create the BLQ-router step (so it's totally flag-controlled)

Comment on lines +430 to +432
if let (Some(blq_producer_config), Some(blq_topic)) =
(&self.blq_producer_config, self.blq_topic)
{
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we actually get rid of this branching logic here? I think you've made a change that lets you turn off BLQ behavior if the config is enabled during startup, but I don't think you can turn on the BLQ if the config is disabled during startup (because the ProcessingStrategy gets created once on startup)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this branching logic is actually unrelated to the feature flag, the flag will have no effect on it.

We pass in the DLQ topic and producer, but if theres no dlq it will be none so we are unable to produce there. Thats all this is. it will always be set as long as theres a dlq.

@kylemumma kylemumma merged commit dd7ca91 into master Apr 14, 2026
46 checks passed
@kylemumma kylemumma deleted the krm/blq-ff branch April 14, 2026 20:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants