Merged
2 changes: 2 additions & 0 deletions pg_migrations/0001_create_inflight_activations.sql
@@ -18,3 +18,5 @@ CREATE TABLE IF NOT EXISTS inflight_taskactivations (
taskname TEXT NOT NULL,
on_attempts_exceeded INTEGER NOT NULL DEFAULT 1
);

CREATE INDEX idx_activation_partition ON inflight_taskactivations (partition);
4 changes: 4 additions & 0 deletions src/fetch/tests.rs
@@ -50,6 +50,10 @@ impl MockStore {

#[async_trait]
impl InflightActivationStore for MockStore {
fn assign_partitions(&self, _partitions: Vec<i32>) -> Result<(), Error> {
Ok(())
}

async fn vacuum_db(&self) -> Result<(), Error> {
unimplemented!()
}
12 changes: 12 additions & 0 deletions src/kafka/consumer.rs
@@ -1,3 +1,4 @@
use crate::store::inflight_activation::InflightActivationStore;
use anyhow::{Error, anyhow};
use futures::{
Stream, StreamExt,
@@ -44,6 +45,7 @@ use tracing::{debug, error, info, instrument, warn};
pub async fn start_consumer(
topics: &[&str],
kafka_client_config: &ClientConfig,
activation_store: Arc<dyn InflightActivationStore>,
spawn_actors: impl FnMut(
Arc<StreamConsumer<KafkaContext>>,
&BTreeSet<(String, i32)>,
@@ -68,6 +70,7 @@ pub async fn start_consumer(
handle_events(
consumer,
event_receiver,
activation_store,
client_shutdown_sender,
spawn_actors,
)
@@ -340,6 +343,7 @@ enum ConsumerState {
pub async fn handle_events(
consumer: Arc<StreamConsumer<KafkaContext>>,
events: UnboundedReceiver<(Event, SyncSender<()>)>,
activation_store: Arc<dyn InflightActivationStore>,
shutdown_client: oneshot::Sender<()>,
mut spawn_actors: impl FnMut(
Arc<StreamConsumer<KafkaContext>>,
@@ -372,6 +376,12 @@ pub async fn handle_events(
state = match (state, event) {
(ConsumerState::Ready, Event::Assign(tpl)) => {
metrics::gauge!("arroyo.consumer.current_partitions").set(tpl.len() as f64);
// Note: This assumes we only process one topic per consumer.
let mut partitions = Vec::<i32>::new();
for (_, partition) in tpl.iter() {
partitions.push(*partition);
}
activation_store.assign_partitions(partitions).unwrap();
cursor[bot] marked this conversation as resolved.
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
}
(ConsumerState::Ready, Event::Revoke(_)) => {
@@ -386,11 +396,13 @@
tpl == revoked,
"Revoked TPL should be equal to the subset of TPL we're consuming from"
);
activation_store.assign_partitions(vec![]).unwrap();
Partitions cleared before actors shutdown causes unscoped queries

Medium Severity

assign_partitions(vec![]) is called before handles.shutdown() in both the Revoke and Shutdown handlers. Since add_partition_condition is a no-op when the partition list is empty, any upkeep queries still running during the up-to-4-second shutdown window will execute without partition filtering, operating on ALL rows in the shared database. This defeats partition isolation and could cause issues like duplicate deadletter messages from handle_failed_tasks if another broker is concurrently assigned those partitions. Swapping the order — shutting down actors first, then clearing partitions — would eliminate this race window.
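The no-op the comment describes can be sketched as follows; `add_partition_condition` here is a hypothetical stand-in for the store's real query builder, shown only to illustrate how an empty partition list leaves a query unscoped:

```rust
// Hypothetical sketch (not the crate's actual API): when the assigned
// partition list is empty, no WHERE clause is appended, so the query
// operates on every row in the shared database.
fn add_partition_condition(base: &str, partitions: &[i32]) -> String {
    if partitions.is_empty() {
        // No-op case: the query stays unscoped during the shutdown window.
        return base.to_string();
    }
    let list: Vec<String> = partitions.iter().map(|p| p.to_string()).collect();
    format!("{base} WHERE partition IN ({})", list.join(", "))
}

fn main() {
    // With partitions assigned, upkeep is scoped to this broker's rows.
    assert_eq!(
        add_partition_condition("DELETE FROM inflight_taskactivations", &[0, 2]),
        "DELETE FROM inflight_taskactivations WHERE partition IN (0, 2)"
    );
    // After assign_partitions(vec![]), the same upkeep query is unscoped.
    assert_eq!(
        add_partition_condition("DELETE FROM inflight_taskactivations", &[]),
        "DELETE FROM inflight_taskactivations"
    );
}
```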

Additional Locations (1)

Reviewed by Cursor Bugbot for commit 51c1889.

Member Author:
Evan thinks this is fine.

handles.shutdown(CALLBACK_DURATION).await;
metrics::gauge!("arroyo.consumer.current_partitions").set(0);
Comment on lines 396 to 401
Bug: Clearing the partition filter via assign_partitions(vec![]) before handles.shutdown() creates a race condition where upkeep tasks can operate on all partitions, not just assigned ones.
Severity: HIGH

Suggested Fix

The partition filter should be cleared after the actor handles have been successfully shut down. Move the activation_store.assign_partitions(vec![]).unwrap(); call to after the handles.shutdown(CALLBACK_DURATION).await; call. This ensures that no upkeep operations can run in the intermediate state where the broker has no assigned partitions but its tasks are still active.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: src/kafka/consumer.rs#L396-L401

Potential issue: During partition revocation, the code calls
`activation_store.assign_partitions(vec![])` to clear the partition list before waiting
for actor handles to shut down via `handles.shutdown(CALLBACK_DURATION)`. An independent
upkeep task runs periodically. If this task executes during the shutdown window, it will
operate with an empty partition list. Because the `add_partition_condition()` method
omits the partition filter when the list is empty, upkeep queries like
`handle_claim_expiration` will run against all partitions in the database. This can
cause a broker to incorrectly modify tasks that have been reassigned to another broker,
violating partition isolation.
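The fix both comments suggest is purely an ordering change: shut the actors down first, then clear the partition filter. A minimal sketch with toy stand-ins (the crate's real `Handles` and store types are async and hold more state):

```rust
use std::sync::Mutex;

// Toy stand-ins for the actor handles and the activation store.
struct Handles {
    running: bool,
}
impl Handles {
    fn shutdown(&mut self) {
        self.running = false;
    }
}

struct Store {
    partitions: Mutex<Vec<i32>>,
}
impl Store {
    fn assign_partitions(&self, p: Vec<i32>) {
        *self.partitions.lock().unwrap() = p;
    }
}

// Suggested revoke order: stop the actors first, then clear the filter,
// so no upkeep tick can observe an empty (unscoped) partition list while
// tasks are still active.
fn on_revoke(store: &Store, handles: &mut Handles) {
    handles.shutdown();
    store.assign_partitions(vec![]);
}

fn main() {
    let store = Store { partitions: Mutex::new(vec![0, 1]) };
    let mut handles = Handles { running: true };
    on_revoke(&store, &mut handles);
    assert!(!handles.running);
    assert!(store.partitions.lock().unwrap().is_empty());
}
```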

Member Author:
Evan thinks this is fine.

ConsumerState::Ready
}
(ConsumerState::Consuming(handles, _), Event::Shutdown) => {
activation_store.assign_partitions(vec![]).unwrap();
handles.shutdown(CALLBACK_DURATION).await;
debug!("Signaling shutdown to client...");
shutdown_client.take();
1 change: 1 addition & 0 deletions src/main.rs
@@ -164,6 +164,7 @@ async fn main() -> Result<(), Error> {
start_consumer(
&[&consumer_config.kafka_topic],
&consumer_config.kafka_consumer_config(),
consumer_store.clone(),
processing_strategy!({
err:
OsStreamWriter::new(
57 changes: 35 additions & 22 deletions src/store/inflight_activation.rs
@@ -375,21 +375,12 @@ pub struct DepthCounts {

#[async_trait]
pub trait InflightActivationStore: Send + Sync {
/// Trigger incremental vacuum to reclaim free pages in the database
async fn vacuum_db(&self) -> Result<(), Error>;

/// Perform a full vacuum on the database
async fn full_vacuum_db(&self) -> Result<(), Error>;

/// Get the size of the database in bytes
async fn db_size(&self) -> Result<u64, Error>;

/// Get an activation by id
async fn get_by_id(&self, id: &str) -> Result<Option<InflightActivation>, Error>;

/// CONSUMER OPERATIONS
/// Store a batch of activations
async fn store(&self, batch: Vec<InflightActivation>) -> Result<QueryResult, Error>;

fn assign_partitions(&self, partitions: Vec<i32>) -> Result<(), Error>;

/// Get `limit` pending activations, optionally filtered by namespaces and bucket subrange.
/// If `mark_processing` is true, sets status to `Processing` and `processing_deadline`; otherwise `Claimed` and `claim_expires_at`.
/// If no limit is provided, all matching activations will be returned.
@@ -458,6 +449,14 @@ pub trait InflightActivationStore: Send + Sync {
/// Record successful push.
async fn mark_activation_processing(&self, id: &str) -> Result<(), Error>;

/// Update the status of a specific activation
async fn set_status(
&self,
id: &str,
status: InflightActivationStatus,
) -> Result<Option<InflightActivation>, Error>;

/// COUNT OPERATIONS
/// Get the age of the oldest pending activation in seconds
async fn pending_activation_max_lag(&self, now: &DateTime<Utc>) -> f64;

@@ -473,6 +472,10 @@
/// Count all activations
async fn count(&self) -> Result<usize, Error>;

/// ACTIVATION OPERATIONS
/// Get an activation by id
async fn get_by_id(&self, id: &str) -> Result<Option<InflightActivation>, Error>;

/// Queue depths for pending, delay, and processing (writer backpressure and upkeep gauges).
/// Default implementation uses separate calls, but stores may override with a single query.
async fn count_depths(&self) -> Result<DepthCounts, Error> {
@@ -491,13 +494,6 @@
})
}

/// Update the status of a specific activation
async fn set_status(
&self,
id: &str,
status: InflightActivationStatus,
) -> Result<Option<InflightActivation>, Error>;

/// Set the processing deadline for a specific activation
async fn set_processing_deadline(
&self,
@@ -508,12 +504,20 @@
/// Delete an activation by id
async fn delete_activation(&self, id: &str) -> Result<(), Error>;

/// DATABASE OPERATIONS
/// Trigger incremental vacuum to reclaim free pages in the database
async fn vacuum_db(&self) -> Result<(), Error>;

/// Perform a full vacuum on the database
async fn full_vacuum_db(&self) -> Result<(), Error>;

/// Get the size of the database in bytes
async fn db_size(&self) -> Result<u64, Error>;

/// UPKEEP OPERATIONS
/// Get all activations with status Retry
async fn get_retry_activations(&self) -> Result<Vec<InflightActivation>, Error>;

/// Clear all activations from the store
async fn clear(&self) -> Result<(), Error>;

/// Revert expired push claims back to pending status.
async fn handle_claim_expiration(&self) -> Result<u64, Error>;

@@ -541,6 +545,10 @@
/// Remove killswitched tasks
async fn remove_killswitched(&self, killswitched_tasks: Vec<String>) -> Result<u64, Error>;

/// TEST OPERATIONS
/// Clear all activations from the store
async fn clear(&self) -> Result<(), Error>;

/// Remove the database, used only in tests
async fn remove_db(&self) -> Result<(), Error> {
Ok(())
@@ -810,6 +818,11 @@ impl InflightActivationStore for SqliteActivationStore {
Ok(Some(row.into()))
}

fn assign_partitions(&self, partitions: Vec<i32>) -> Result<(), Error> {
warn!("assign_partitions: {:?}", partitions);
Ok(())
}

#[instrument(skip_all)]
async fn store(&self, batch: Vec<InflightActivation>) -> Result<QueryResult, Error> {
if batch.is_empty() {
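The `SqliteActivationStore` implementation above only logs the assignment. A store that actually scopes its upkeep might retain the partitions roughly like this (a sketch; the struct, field, and method names are assumptions, not the crate's real Postgres implementation):

```rust
use std::sync::Mutex;

// Hypothetical partition-aware store: assignments arriving from the Kafka
// consumer are kept in memory and consulted when building upkeep queries.
struct PartitionScopedStore {
    assigned: Mutex<Vec<i32>>,
}

impl PartitionScopedStore {
    fn assign_partitions(&self, partitions: Vec<i32>) {
        *self.assigned.lock().unwrap() = partitions;
    }

    // Returns the filter fragment, or None when no partitions are
    // assigned; the None case is exactly what the review comments
    // flag as producing unscoped queries.
    fn partition_filter(&self) -> Option<String> {
        let assigned = self.assigned.lock().unwrap();
        if assigned.is_empty() {
            return None;
        }
        let list: Vec<String> = assigned.iter().map(|p| p.to_string()).collect();
        Some(format!("partition IN ({})", list.join(", ")))
    }
}

fn main() {
    let store = PartitionScopedStore { assigned: Mutex::new(Vec::new()) };
    store.assign_partitions(vec![4, 7]);
    assert_eq!(store.partition_filter().as_deref(), Some("partition IN (4, 7)"));
    store.assign_partitions(vec![]);
    assert!(store.partition_filter().is_none());
}
```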
99 changes: 99 additions & 0 deletions src/store/inflight_activation_tests.rs
@@ -815,6 +815,105 @@ async fn test_set_activation_status(#[case] adapter: &str) {
store.remove_db().await.unwrap();
}

#[tokio::test]
#[rstest]
#[case::postgres("postgres")]
async fn test_set_activation_status_with_partitions(#[case] adapter: &str) {
let store = create_test_store(adapter).await;

let mut batch = make_activations(2);
batch[1].partition = 1;
assert!(store.store(batch).await.is_ok());
assert_counts(
StatusCount {
pending: 1,
..StatusCount::default()
},
store.as_ref(),
)
.await;

assert!(
store
.set_status("id_0", InflightActivationStatus::Failure)
.await
.is_ok()
);
assert_counts(
StatusCount {
failure: 1,
..StatusCount::default()
},
store.as_ref(),
)
.await;

assert!(
store
.set_status("id_0", InflightActivationStatus::Pending)
.await
.is_ok()
);
assert_counts(
StatusCount {
pending: 1,
..StatusCount::default()
},
store.as_ref(),
)
.await;
assert!(
store
.set_status("id_0", InflightActivationStatus::Failure)
.await
.is_ok()
);
assert!(
store
.set_status("id_1", InflightActivationStatus::Failure)
.await
.is_ok()
);
// The broker can update the status of an activation in a different partition, but
// it still should not be counted in its upkeep.
assert_counts(
StatusCount {
pending: 0,
failure: 1,
..StatusCount::default()
},
store.as_ref(),
)
.await;
assert!(
store
.claim_activations(None, None, Some(1), None, true)
.await
.unwrap()
.is_empty()
);

let result = store
.set_status("not_there", InflightActivationStatus::Complete)
.await;
assert!(result.is_ok(), "no query error");

let activation = result.unwrap();
assert!(activation.is_none(), "no activation found");

let result = store
.set_status("id_0", InflightActivationStatus::Complete)
.await;
assert!(result.is_ok(), "no query error");

let result_opt = result.unwrap();
assert!(result_opt.is_some(), "activation should be returned");
let inflight = result_opt.unwrap();
assert_eq!(inflight.id, "id_0");
assert_eq!(inflight.status, InflightActivationStatus::Complete);
store.remove_db().await.unwrap();
}

#[tokio::test]
#[rstest]
#[case::sqlite("sqlite")]