Read from archive with arbitrary number of workers #3

Closed
jvansanten opened this issue Nov 18, 2018 · 0 comments
Currently ZTF alerts are inserted into the archive database along with the index of the Kafka partition they were read from. When reading back from the archive, each client gets an index, and reads only alerts with that partition id. Since the UW Kafka mirror has 16 partitions per topic, this means that you must use exactly 16 AlertProcessors to consume alerts from the archive. Using more will result in idle workers, and using fewer will result in seeing only e.g. 7/16 of the alerts.

Instead, it should be possible to split alerts from the archive across an arbitrary number of AlertProcessors. The most straightforward way to do this requires implementing a shared queue in PostgreSQL. Here's a sketch of the design:

  1. Similar to Kafka, consumers will identify themselves by a shared group name. Unlike Kafka, though, the group name is associated with a query, so different groups can consume different sets of alerts.
  2. Upon start, each consumer will attempt to insert a new row into the read_queue_groups table, which has a unique constraint on the group name. At most one consumer will succeed; this one is the alert query executor and will proceed to step 3. All others will proceed to step 4. If the queue for this group already existed, all consumers proceed to step 4.
  3. The alert query executor runs the query (e.g. an observation date range and/or cone search) and inserts rows of (group_id, array(alert_id)) into the read_queue table, where the size of the array is a predetermined block size, e.g. 5000. The block size should be large enough that each row is much larger than Postgres' fixed per-tuple overhead, but not so large that hours of work would be lost if a consumer were shut down before it finished its block (see the first sketch after this list).
  4. Each consumer queries read_queue for a block of alert_ids using FOR UPDATE SKIP LOCKED, which atomically acquires a lock on an unclaimed item and releases it automatically if the consumer dies before it can process the block (see the second sketch after this list).
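A minimal sketch of steps 2 and 3, assuming an open psycopg2 connection `conn` and illustrative table/column names (`read_queue_groups.group_name`, `read_queue.alert_ids`); the actual archive schema and connection handling may differ:

```python
# Assumed (illustrative) schema, not the real archive schema:
#   CREATE TABLE read_queue_groups (
#       group_id serial PRIMARY KEY,
#       group_name text UNIQUE NOT NULL
#   );
#   CREATE TABLE read_queue (
#       item_id bigserial PRIMARY KEY,
#       group_id integer REFERENCES read_queue_groups,
#       alert_ids bigint[] NOT NULL
#   );

def register_group(conn, group_name):
    """Try to create the consumer group; return (group_id, is_executor)."""
    with conn.cursor() as cur:
        # The unique constraint on group_name guarantees that at most one
        # consumer wins the race and becomes the alert query executor.
        cur.execute(
            "INSERT INTO read_queue_groups (group_name) VALUES (%s)"
            " ON CONFLICT (group_name) DO NOTHING RETURNING group_id",
            (group_name,),
        )
        row = cur.fetchone()
        executor = row is not None
        if row is None:
            # The group already existed: look up its id and skip to step 4.
            cur.execute(
                "SELECT group_id FROM read_queue_groups WHERE group_name = %s",
                (group_name,),
            )
            row = cur.fetchone()
    conn.commit()
    return row[0], executor

def fill_queue(conn, group_id, alert_ids, block_size=5000):
    """Executor only: enqueue the query result in blocks of block_size ids."""
    with conn.cursor() as cur:
        for i in range(0, len(alert_ids), block_size):
            # psycopg2 adapts the Python list to a Postgres array column.
            cur.execute(
                "INSERT INTO read_queue (group_id, alert_ids) VALUES (%s, %s)",
                (group_id, list(alert_ids[i:i + block_size])),
            )
    conn.commit()
```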
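And a sketch of the consumer side (step 4); `item_id` and the `process_alert` callback are placeholders for this example. Because FOR UPDATE row locks are held until commit or rollback, a block that was claimed by a consumer that subsequently died becomes visible to other consumers again:

```python
def consume(conn, group_id, process_alert):
    """Claim and process blocks until no unclaimed block remains."""
    while True:
        with conn.cursor() as cur:
            # SKIP LOCKED lets each consumer atomically claim a different
            # unclaimed block; the row lock (and hence the claim) is released
            # automatically if the connection dies mid-block.
            cur.execute(
                "SELECT item_id, alert_ids FROM read_queue"
                " WHERE group_id = %s"
                " LIMIT 1 FOR UPDATE SKIP LOCKED",
                (group_id,),
            )
            row = cur.fetchone()
            if row is None:
                conn.rollback()
                return  # queue drained (or all remaining blocks claimed)
            item_id, alert_ids = row
            for alert_id in alert_ids:
                process_alert(alert_id)
            # Delete the finished block; the commit also releases the lock.
            cur.execute("DELETE FROM read_queue WHERE item_id = %s", (item_id,))
        conn.commit()
```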
@jvansanten jvansanten self-assigned this Nov 18, 2018
jvansanten added a commit that referenced this issue Nov 19, 2018