feat(postgres): Change the Postgres Adapter to be Partition Aware#591
Conversation
This adds a Postgres storage adapter for the taskbroker, as well as a way to choose between the adapters in the configuration. The adapter also works with AlloyDB. In Postgres, the keyword `offset` is reserved, so that column is named `kafka_offset` in the PG tables and mapped back to `offset` in application code. The tests were updated to run against both the SQLite and Postgres adapters using the rstest crate. The `create_test_store` function was updated to be the standard for all tests and to allow choosing between a SQLite and a Postgres DB. A `remove_db` function was added to the trait and the existing adapters, since the tests create a unique PG database on every run that should be cleaned up.
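The reserved-word workaround can be sketched like this (a minimal sketch: the table, column, and function names here are assumptions for illustration, not taskbroker's actual schema):

```rust
// Hedged sketch of the reserved-word workaround described above. `offset` is
// reserved in Postgres, so the stored column is `kafka_offset` and is aliased
// back to `offset` when reading rows, keeping the application-side name stable
// across the SQLite and Postgres adapters.
fn fetch_activation_sql() -> &'static str {
    r#"SELECT id, partition, kafka_offset AS "offset"
       FROM inflight_taskactivations
       WHERE partition = $1"#
}
```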
Have the Postgres adapter fetch and do upkeep only on activations that belong to the partitions the consumer is assigned. The broker can still update tasks outside its partitions, in case a worker is connected to a broker that is then rebalanced. The consumer now passes the partitions to the store whenever partitions are assigned. This was originally prototyped with PARTITION BY, but that requires manually keeping track of the partition tables, which isn't desired behaviour.
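The consumer-to-store handoff described above can be sketched roughly as follows (trait, struct, and field names are illustrative; taskbroker's real store trait differs):

```rust
// Minimal sketch of the consumer passing assigned partitions to the store.
trait ActivationStore {
    /// Called by the consumer whenever Kafka assigns or revokes partitions.
    fn assign_partitions(&mut self, partitions: Vec<i32>);
}

struct PgActivationStore {
    /// Partitions this broker is currently responsible for; partition-scoped
    /// fetch and upkeep queries filter on this list.
    partitions: Vec<i32>,
}

impl ActivationStore for PgActivationStore {
    fn assign_partitions(&mut self, partitions: Vec<i32>) {
        self.partitions = partitions;
    }
}
```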
force-pushed the …eorge/push-taskbroker/partition-store-by-partition branch from 896acfe to 895e119
}
query_builder.push(")");
}
}
Empty partition list causes queries to match all rows
High Severity
add_partition_condition silently becomes a no-op when partitions is empty, causing all partition-scoped queries (upkeep, counts, fetches) to operate on ALL rows instead of NO rows. Since the upkeep task in upkeep.rs runs independently of the consumer lifecycle and shares the same store, after a partition revocation (which calls assign_partitions(vec![])) the upkeep loop continues running and will modify/delete/count tasks belonging to other brokers' partitions. This directly undermines the PR's goal of partition isolation.
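One possible fix for this finding can be sketched as follows (a hedged sketch: a plain `String` stands in for the real query builder, and the column name is assumed):

```rust
// Emit a clause that matches NO rows when no partitions are assigned, instead
// of silently dropping the filter, so unscoped upkeep queries cannot touch
// other brokers' partitions.
fn add_partition_condition(query: &mut String, partitions: &[i32]) {
    if partitions.is_empty() {
        // No assigned partitions should mean no matching rows.
        query.push_str(" AND FALSE");
        return;
    }
    query.push_str(" AND partition IN (");
    for (i, p) in partitions.iter().enumerate() {
        if i > 0 {
            query.push_str(", ");
        }
        query.push_str(&p.to_string());
    }
    query.push(')');
}
```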
Additional Locations (1)
Reviewed by Cursor Bugbot for commit f97d2ad. Configure here.
We get the same behavior before partitions are assigned, since the list will be empty in that case too. Is this desired?
I don't think this will break anything, just slow the queries down briefly. I'm inclined to say it's OK. Another option would be to do checks for the partitions and not run the queries if there are no partitions assigned but I don't know if that would be better.
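The alternative floated in this comment, skipping the queries entirely while no partitions are assigned, could look roughly like this (the guard function is hypothetical, not code from the PR):

```rust
// Sketch of the alternative: gate each upkeep pass on having at least one
// assigned partition, rather than issuing a query with no partition filter.
fn should_run_upkeep(partitions: &[i32]) -> bool {
    !partitions.is_empty()
}
```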
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 2 total unresolved issues (including 1 from previous review).
Reviewed by Cursor Bugbot for commit 51c1889.
tpl == revoked,
"Revoked TPL should be equal to the subset of TPL we're consuming from"
);
activation_store.assign_partitions(vec![]).unwrap();
Partitions cleared before actors shutdown causes unscoped queries
Medium Severity
assign_partitions(vec![]) is called before handles.shutdown() in both the Revoke and Shutdown handlers. Since add_partition_condition is a no-op when the partition list is empty, any upkeep queries still running during the up-to-4-second shutdown window will execute without partition filtering, operating on ALL rows in the shared database. This defeats partition isolation and could cause issues like duplicate deadletter messages from handle_failed_tasks if another broker is concurrently assigned those partitions. Swapping the order — shutting down actors first, then clearing partitions — would eliminate this race window.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 51c1889.
Evan thinks this is fine.
tpl == revoked,
"Revoked TPL should be equal to the subset of TPL we're consuming from"
);
activation_store.assign_partitions(vec![]).unwrap();
handles.shutdown(CALLBACK_DURATION).await;
metrics::gauge!("arroyo.consumer.current_partitions").set(0);
Bug: Clearing the partition filter via assign_partitions(vec![]) before handles.shutdown() creates a race condition where upkeep tasks can operate on all partitions, not just assigned ones.
Severity: HIGH
Suggested Fix
The partition filter should be cleared after the actor handles have been successfully shut down. Move the activation_store.assign_partitions(vec![]).unwrap(); call to after the handles.shutdown(CALLBACK_DURATION).await; call. This ensures that no upkeep operations can run in the intermediate state where the broker has no assigned partitions but its tasks are still active.
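The suggested ordering can be sketched as below (a hedged sketch: `Handles`, `Store`, and `on_revoke` are stand-in types, and the real `shutdown` is an async call taking `CALLBACK_DURATION`):

```rust
// Drain the actor handles first, then clear the partition scope, so no
// upkeep query can run with an empty (and therefore unscoped) partition list.
struct Handles {
    drained: bool,
}

impl Handles {
    fn shutdown(&mut self) {
        // Stand-in for `handles.shutdown(CALLBACK_DURATION).await`.
        self.drained = true;
    }
}

struct Store {
    partitions: Vec<i32>,
}

impl Store {
    fn assign_partitions(&mut self, partitions: Vec<i32>) {
        self.partitions = partitions;
    }
}

fn on_revoke(handles: &mut Handles, store: &mut Store) {
    handles.shutdown();              // 1. stop in-flight upkeep first
    store.assign_partitions(vec![]); // 2. then drop the partition filter
}
```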
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.
Location: src/kafka/consumer.rs#L396-L401
Potential issue: During partition revocation, the code calls `activation_store.assign_partitions(vec![])` to clear the partition list before waiting for actor handles to shut down via `handles.shutdown(CALLBACK_DURATION)`. An independent upkeep task runs periodically. If this task executes during the shutdown window, it will operate with an empty partition list. Because the `add_partition_condition()` method omits the partition filter when the list is empty, upkeep queries like `handle_claim_expiration` will run against all partitions in the database. This can cause a broker to incorrectly modify tasks that have been reassigned to another broker, violating partition isolation.
Evan thinks this is fine.


Linear
Completes STREAM-868
Description
Currently, taskworkers pull tasks from taskbrokers via RPC. This approach works, but has some drawbacks. Therefore, we want taskbrokers to push tasks to taskworkers instead. Read this page on Notion for more information.
We are also moving from SQLite to Postgres. With Postgres, each broker will share a single store with multiple other brokers. To prevent contention between brokers and unexpected results from conflicting upkeep tasks, we will make each broker only touch tasks that came from the Kafka partitions it's currently responsible for.