
[PoC] Postgres implementation on transaction ID #22

Closed · wants to merge 7 commits

Conversation

roblaszczak (Member):

No description provided.

@@ -73,15 +72,15 @@ func (s DefaultMySQLSchema) SelectQuery(topic string, consumerGroup string, offs
selectQuery := `
SELECT offset, uuid, payload, metadata FROM ` + s.MessagesTable(topic) + `
WHERE
-			offset > (` + nextOffsetQuery + `)
+			offset > (` + nextOffsetQuery + `) AND transaction_id < pg_snapshot_xmin(pg_current_snapshot())
Contributor:
Hmm, this looks unsuitable for MySQL (there are no snapshots there)

roblaszczak (Member, Author):
Stupid copy-paste mistake 😅 Removed in this PR: #23

(transaction_id > (SELECT last_processed_transaction_id FROM last_processed))
)
AND
transaction_id < pg_snapshot_xmin(pg_current_snapshot())
ORDER BY
"offset" ASC
Contributor:

I suppose it should be:

Suggested change
"offset" ASC
transaction_id ASC,
"offset" ASC

Contributor:

Also, this type of ordering should be covered by an index like (transaction_id ASC, "offset" ASC). It's not mentioned in the original article, but I ran into performance issues when load testing my own implementation of the adapter :)

roblaszczak (Member, Author):

Good catch and point about the index!

I applied them in #23

return msg.UUID, false, errors.Wrap(err, "cannot send consumed query")
msgCtx := setTxToContext(ctx, tx)

acked := s.sendMessage(msgCtx, msg, out, logger)
Contributor:

What do you think about sending the messages concurrently here? As I see it, the main bottleneck is not querying a batch of messages but processing the batch serially.

roblaszczak (Member, Author):

If I'm not missing anything, it would not be compatible with the current acking strategy 🤔 in other words, the current implementation would not handle out-of-order acks properly.

Like:

  • Msg 1 is acked,
  • Msg 2 is acked,
  • Msg 3 is not acked,
  • Msg 4 is acked.

In theory, we could just ack the last message before the nack 🤔 but that's a pretty specific scenario, so I think it may be out of scope for this PR. Batch querying already gives a big boost compared to querying message by message.

If you have some spare time and need such an acking model, feel free to open a PR for it! (Assuming I understood your question properly.)
