Scene trending #327
a2eb100 to 0de61ed
Yeah this is looking great. Don't let my comments come off as negativity, this is feeling very clean and getting solid value for the amount of effort.
export interface MessageQueue {
  id: Generated<number>
  message: string
  read: 0 | 1
This is small and completely fine w/you ignoring it, but a separate "MessageQueueCursors" table might be better for two reasons -- it allows multiple separate processors (may not need it but who knows) and it would eliminate N numbers for N events, saving ... some ... space on disk
this should still allow for multiple separate processors. we lock rows with `SELECT FOR UPDATE`, which keeps separate processors from stepping on each other's toes
I'm thinking of 2 processors that aren't coordinated with each other, e.g. each maintaining its own cursor. But it's not a real concern tbh
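To make the "separate cursors" idea concrete, here is a minimal in-memory sketch, assuming one cursor per named consumer. `CursorTable`, `processNext` and the consumer names are hypothetical and do not appear in the PR; a real version would be a database table updated inside a transaction.

```typescript
// Hypothetical in-memory model of a per-consumer cursor table.
type QueuedMessage = { id: number; message: string }

class CursorTable {
  // consumer name -> id of the next message that consumer should read
  private cursors = new Map<string, number>()

  get(consumer: string): number {
    return this.cursors.get(consumer) ?? 1
  }

  set(consumer: string, cursor: number): void {
    this.cursors.set(consumer, cursor)
  }
}

// Read the next message for one consumer and advance only that consumer's cursor.
function processNext(
  queue: QueuedMessage[],
  cursors: CursorTable,
  consumer: string,
): QueuedMessage | undefined {
  const cursor = cursors.get(consumer)
  const next = queue
    .filter((m) => m.id >= cursor)
    .sort((a, b) => a.id - b.id)[0]
  if (!next) return undefined
  cursors.set(consumer, next.id + 1)
  return next
}

const queue: QueuedMessage[] = [
  { id: 1, message: 'add_upvote' },
  { id: 2, message: 'add_member' },
]
const cursors = new CursorTable()
const firstForScenes = processNext(queue, cursors, 'scenes')
const secondForScenes = processNext(queue, cursors, 'scenes')
const firstForNotifs = processNext(queue, cursors, 'notifications')
```

Each consumer sees every message once, and the messages themselves carry no per-message `read` flag.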
packages/pds/tests/seeds/basic.ts
Outdated
@@ -42,6 +84,9 @@ export default async (sc: SeedClient) => {
  await sc.vote('down', carol, sc.posts[alice][1].ref)
  await sc.vote('up', carol, sc.posts[alice][2].ref)
  await sc.vote('up', dan, sc.posts[alice][1].ref)
  await sc.vote('up', alice, sc.posts[carol][0].ref)
  await sc.vote('up', bob, sc.posts[carol][0].ref)
  mq && (await mq.process())
here, we want to wait till the queue is done processing before moving on
this gives us a predictable order for our snapshots
Had a few thoughts, but so many things I'm digging about this. Real impressive
Approving since I think it's essentially all set, but this comment could probably use attention: https://github.com/bluesky-social/atproto/pull/327/files/908c1e829a64f6a9e7372d4aed8da6eab22b1c6a#diff-4c1f0ad0cea9ddfe9a8ab78478a07d53a598be698252db06be359db1c77eda21
const messageQueue = new SqlMessageQueue('pds', db, (did: string) => {
  return auth.verifier.loadAuthStore(keypair, [], did)
})
db.setMessageQueue(messageQueue)
I know you weren't pumped about this. The two ideas that come to mind are that
- perhaps `indexRecord()`, `deleteRecord()` don't need to live on `db`.
- perhaps the message queue deserves a separate `db` of its own.
yeah both of these crossed my mind as well 🤔
we may want something like `RecordIndexer`. This kinda has to do with the fact that the db has a lot of functionality jammed into it. probably needs to be rethought & broken up a bit
if (this.db.dialect !== 'sqlite') {
  builder = builder.forUpdate()
}
You could add a `skipLocked()` here so that if there's contention for the cursor, one consumer gets it and the other just bails rather than waiting for it to open up.
i think we actually want to wait for it to open up. we basically ping the consumer on every event push to try & process exactly 1 event. So it should wait until the cursor is freed up & then give it a go
Ahh, okay got it. How does `keepGoing` play into that?
ah not sure if you saw, but `keepGoing` actually got ripped out.
Yeah beforehand, i was throwing way too many handlers at this & they'd all just hang out at the lock with their hands open until the DB got back to them & said "sorry buddy no cursor for you"
Ahhh right on!
await this.handleMessage(dbTxn, message)
await dbTxn.db
  .updateTable('message_queue_cursor')
  .set({ cursor: sql`cursor + 1` })
Only question I have about this is if we can assume the incrementing id is guaranteed to not leave any gaps. Otherwise I think the cursor can get stuck.
oh that's a good point. i think so? worth a look 🧐
it doesn't guarantee it :notlikethis:
kk this should fix it: e446481
(also added an actual SQL LIMIT in the next commit)
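For readers following the gap concern, a small sketch of why an exact-match lookup can stall and how a gap-tolerant lookup avoids it. This is an illustration under assumed names (`nextExact`, `nextAtOrAfter`), not the contents of e446481; the second function mirrors roughly `WHERE id >= cursor ORDER BY id LIMIT 1`.

```typescript
type Row = { id: number; message: string }

// Exact-match lookup: finds nothing if the id at the cursor was never committed.
function nextExact(rows: Row[], cursor: number): Row | undefined {
  return rows.find((r) => r.id === cursor)
}

// Gap-tolerant lookup: the smallest id at or past the cursor.
function nextAtOrAfter(rows: Row[], cursor: number): Row | undefined {
  return rows
    .filter((r) => r.id >= cursor)
    .sort((a, b) => a.id - b.id)[0]
}

const rows: Row[] = [
  { id: 1, message: 'a' },
  { id: 2, message: 'b' },
  { id: 4, message: 'c' }, // id 3 is missing, e.g. after a rolled-back insert
]

const stuck = nextExact(rows, 3) // undefined: the cursor never moves past 3
const found = nextAtOrAfter(rows, 3) // the row with id 4
```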
private async handleMessage(db: Database, message: Message) {
  switch (message.type) {
    case 'add_member':
      return this.handleAddMember(db, message)
    case 'remove_member':
      return this.handleRemoveMember(db, message)
    case 'add_upvote':
      return this.handleAddUpvote(db, message)
    case 'remove_upvote':
      return this.handleRemoveUpvote(db, message)
    case 'create_notification':
      return this.handleCreateNotification(db, message)
    case 'delete_notifications':
      return this.handleDeleteNotifications(db, message)
  }
}
Eventually I think it might be nice to colocate these handlers with related code (e.g. `handleAddMember()` living closer to member methods) and allow other areas of the app to hook into the message queue. Thinking of the message queue as more of a pull-based transport, rather than a vertical slice of the app. Zero problem with it as-is today, though, just thinkin!
yup makes sense 👍
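One possible shape for the colocation idea, sketched as a handler registry instead of a central `switch`. `MessageRouter`, `on` and `dispatch` are hypothetical names, and the handlers are synchronous here for brevity; the real ones would be async and receive the transaction.

```typescript
type Message = { type: string }
type Handler = (message: Message) => void

// Hypothetical registry: each area of the app registers its own handlers.
class MessageRouter {
  private handlers = new Map<string, Handler[]>()

  on(type: string, handler: Handler): void {
    const existing = this.handlers.get(type) ?? []
    this.handlers.set(type, [...existing, handler])
  }

  // Runs every handler registered for the message type; returns how many ran.
  dispatch(message: Message): number {
    const handlers = this.handlers.get(message.type) ?? []
    for (const handler of handlers) {
      handler(message)
    }
    return handlers.length
  }
}

const router = new MessageRouter()
const seen: string[] = []

// These registrations could live next to the member and vote code respectively.
router.on('add_member', (m) => {
  seen.push(`members:${m.type}`)
})
router.on('add_upvote', (m) => {
  seen.push(`votes:${m.type}`)
})

const ranForMember = router.dispatch({ type: 'add_member' })
const ranForUnknown = router.dispatch({ type: 'not_registered' })
```

The queue then only transports messages, and unknown types fall through without a handler.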
await this.handleMessage(dbTxn, message)
await dbTxn.db
  .updateTable('message_queue_cursor')
  .set({ cursor: sql`cursor + 1` })
I think by incrementing by one here we can cause messages to get processed multiple times if there's a gap in the ids. Pretty sure we'll want to catch the cursor up to whichever message was just processed (plus 1).
a true hero you are 🙏
💯
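A small simulation of the double-processing concern, assuming the next message is looked up as the smallest id at or past the cursor. `drain` and the two `advance` strategies are illustrative names only; the point is the difference between `cursor + 1` and "id just processed, plus 1".

```typescript
type Row = { id: number }

// Smallest id at or past the cursor (assumed lookup).
function nextAtOrAfter(rows: Row[], cursor: number): Row | undefined {
  return rows
    .filter((r) => r.id >= cursor)
    .sort((a, b) => a.id - b.id)[0]
}

// Process until no message is left, recording each handled id.
function drain(
  rows: Row[],
  advance: (cursor: number, processed: Row) => number,
): number[] {
  const handled: number[] = []
  let cursor = 1
  let next = nextAtOrAfter(rows, cursor)
  while (next) {
    handled.push(next.id)
    cursor = advance(cursor, next)
    next = nextAtOrAfter(rows, cursor)
  }
  return handled
}

const rows: Row[] = [{ id: 1 }, { id: 2 }, { id: 5 }] // ids 3 and 4 are gaps

// Incrementing by one revisits id 5 while the cursor walks through the gap.
const plusOne = drain(rows, (cursor) => cursor + 1)

// Catching up to the processed id handles each message once.
const catchUp = drain(rows, (_cursor, processed) => processed.id + 1)
```

With these rows, `plusOne` is `[1, 2, 5, 5, 5]` and `catchUp` is `[1, 2, 5]`.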
* wip
* views
* trending schema
* starting message queue
* scene processor
* wip
* send mq messages from db
* db events
* undo screwing up codegen lol
* setup queue
* db migrations
* fixing up message processing
* div by 0 check
* tx issue
* queue use cursor
* update not insert
* sql bugfix + tests
* trying to linearize tests
* correclty serializing txs
* attempt update before insert
* log errors
* handle gaps in cursor
* cleanup
* oops reenable test
* correctly incr cursor
This adds a message queue abstraction to the database for handling side effects of created records.
It is currently used for
For scene processing, we maintain two tables: scene_member_count & scene_votes_on_post. We publish a new trending record when a post has at least 2 upvotes from a scene and the ratio of upvotes to scene members is >= 20%.
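The trending rule can be sketched as a single predicate. This assumes the ratio is upvotes from the scene divided by the scene's member count, which is an interpretation of the description; `shouldTrend` and the constant names are hypothetical.

```typescript
const MIN_UPVOTES = 2 // threshold from the description
const MIN_RATIO = 0.2 // 20% of the scene

// Assumed interpretation: ratio = upvotes from the scene / scene member count.
function shouldTrend(upvotesFromScene: number, sceneMemberCount: number): boolean {
  if (sceneMemberCount <= 0) return false // avoid dividing by zero
  if (upvotesFromScene < MIN_UPVOTES) return false
  return upvotesFromScene / sceneMemberCount >= MIN_RATIO
}

const smallScene = shouldTrend(2, 10) // 2 of 10 members
const largeScene = shouldTrend(2, 11) // 2 of 11 members
const singleVote = shouldTrend(1, 2) // ratio is high but only one upvote
const emptyScene = shouldTrend(3, 0) // no members recorded
```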