Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix potential message loss with small PollInterval and allow batch querying of messages #23

Merged
merged 8 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,7 @@ update_watermill:
go mod edit -fmt

mycli:
@mycli -h 127.0.0.1 -u root -p secret
@mycli -h 127.0.0.1 -u root watermill

pgcli:
@pgcli postgres://watermill:password@localhost:5432/watermill?sslmode=disable
26 changes: 24 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/ThreeDotsLabs/watermill-sql
module github.com/ThreeDotsLabs/watermill-sql/v2

go 1.15
go 1.20

require (
github.com/ThreeDotsLabs/watermill v1.2.0
Expand All @@ -11,3 +11,25 @@ require (
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.6.4 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.0.2 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.4.2 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect
golang.org/x/text v0.3.3 // indirect
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
475 changes: 7 additions & 468 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/sql/offsets_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package sql

type OffsetsAdapter interface {
// AckMessageQuery the SQL query and arguments that will mark a message as read for a given consumer group.
AckMessageQuery(topic string, offset int, consumerGroup string) (string, []interface{})
AckMessageQuery(topic string, row Row, consumerGroup string) (string, []interface{})

// ConsumedMessageQuery will return the SQL query and arguments which be executed after consuming message,
// but before ack.
//
// ConsumedMessageQuery is optional, and will be not executed if query is empty.
ConsumedMessageQuery(topic string, offset int, consumerGroup string, consumerULID []byte) (string, []interface{})
ConsumedMessageQuery(topic string, row Row, consumerGroup string, consumerULID []byte) (string, []interface{})

// NextOffsetQuery returns the SQL query and arguments which should return offset of next message to consume.
NextOffsetQuery(topic, consumerGroup string) (string, []interface{})
Expand Down
16 changes: 6 additions & 10 deletions pkg/sql/offsets_adapter_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ func (a DefaultMySQLOffsetsAdapter) SchemaInitializingQueries(topic string) []st
)`}
}

func (a DefaultMySQLOffsetsAdapter) AckMessageQuery(topic string, offset int, consumerGroup string) (string, []interface{}) {
ackQuery := `UPDATE ` + a.MessagesOffsetsTable(topic) + ` SET offset_acked = ? WHERE consumer_group = ?`
return ackQuery, []interface{}{offset, consumerGroup}
func (a DefaultMySQLOffsetsAdapter) AckMessageQuery(topic string, row Row, consumerGroup string) (string, []interface{}) {
ackQuery := `INSERT INTO ` + a.MessagesOffsetsTable(topic) + ` (offset_consumed, offset_acked, consumer_group)
VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE offset_consumed=VALUES(offset_consumed), offset_acked=VALUES(offset_acked)`
return ackQuery, []interface{}{row.Offset, row.Offset, consumerGroup}
}

func (a DefaultMySQLOffsetsAdapter) NextOffsetQuery(topic, consumerGroup string) (string, []interface{}) {
Expand All @@ -49,14 +50,9 @@ func (a DefaultMySQLOffsetsAdapter) MessagesOffsetsTable(topic string) string {
return fmt.Sprintf("`watermill_offsets_%s`", topic)
}

func (a DefaultMySQLOffsetsAdapter) ConsumedMessageQuery(
topic string,
offset int,
consumerGroup string,
consumerULID []byte,
) (string, []interface{}) {
func (a DefaultMySQLOffsetsAdapter) ConsumedMessageQuery(topic string, row Row, consumerGroup string, consumerULID []byte) (string, []interface{}) {
// offset_consumed is not queried anywhere, it's used only to detect race conditions with NextOffsetQuery.
ackQuery := `INSERT INTO ` + a.MessagesOffsetsTable(topic) + ` (offset_consumed, consumer_group)
VALUES (?, ?) ON DUPLICATE KEY UPDATE offset_consumed=VALUES(offset_consumed)`
return ackQuery, []interface{}{offset, consumerGroup}
return ackQuery, []interface{}{row.Offset, consumerGroup}
}
37 changes: 18 additions & 19 deletions pkg/sql/offsets_adapter_postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,30 @@ func (a DefaultPostgreSQLOffsetsAdapter) SchemaInitializingQueries(topic string)
CREATE TABLE IF NOT EXISTS ` + a.MessagesOffsetsTable(topic) + ` (
consumer_group VARCHAR(255) NOT NULL,
offset_acked BIGINT,
offset_consumed BIGINT NOT NULL,
last_processed_transaction_id xid8 NOT NULL,
PRIMARY KEY(consumer_group)
)`}
}

func (a DefaultPostgreSQLOffsetsAdapter) NextOffsetQuery(topic, consumerGroup string) (string, []interface{}) {
return `SELECT COALESCE(
(SELECT offset_acked
FROM ` + a.MessagesOffsetsTable(topic) + `
WHERE consumer_group=$1 FOR UPDATE
), 0)`,
return `SELECT
coalesce(MAX(offset_acked),0) AS offset_acked,
coalesce(MAX(last_processed_transaction_id::text), '0')::xid8 AS last_processed_transaction_id
FROM ` + a.MessagesOffsetsTable(topic) + `
WHERE consumer_group=$1`,
[]interface{}{consumerGroup}
}

func (a DefaultPostgreSQLOffsetsAdapter) AckMessageQuery(topic string, offset int, consumerGroup string) (string, []interface{}) {
ackQuery := `UPDATE ` + a.MessagesOffsetsTable(topic) + ` SET offset_acked = $1 WHERE consumer_group = $2`
return ackQuery, []interface{}{offset, consumerGroup}
func (a DefaultPostgreSQLOffsetsAdapter) AckMessageQuery(topic string, row Row, consumerGroup string) (string, []interface{}) {
ackQuery := `INSERT INTO ` + a.MessagesOffsetsTable(topic) + `(offset_acked, last_processed_transaction_id, consumer_group)
VALUES
($1, $2, $3)
ON CONFLICT
(consumer_group)
DO UPDATE SET
offset_acked = excluded.offset_acked,
last_processed_transaction_id = excluded.last_processed_transaction_id`
return ackQuery, []interface{}{row.Offset, row.ExtraData["transaction_id"], consumerGroup}
}

func (a DefaultPostgreSQLOffsetsAdapter) MessagesOffsetsTable(topic string) string {
Expand All @@ -49,14 +56,6 @@ func (a DefaultPostgreSQLOffsetsAdapter) MessagesOffsetsTable(topic string) stri
return fmt.Sprintf(`"watermill_offsets_%s"`, topic)
}

func (a DefaultPostgreSQLOffsetsAdapter) ConsumedMessageQuery(
topic string,
offset int,
consumerGroup string,
consumerULID []byte,
) (string, []interface{}) {
// offset_consumed is not queried anywhere, it's used only to detect race conditions with NextOffsetQuery.
ackQuery := `INSERT INTO ` + a.MessagesOffsetsTable(topic) + ` (offset_consumed, consumer_group)
VALUES ($1, $2) ON CONFLICT("consumer_group") DO UPDATE SET offset_consumed=excluded.offset_consumed`
return ackQuery, []interface{}{offset, consumerGroup}
func (a DefaultPostgreSQLOffsetsAdapter) ConsumedMessageQuery(topic string, row Row, consumerGroup string, consumerULID []byte) (string, []interface{}) {
return "", nil
}
Loading