[PoC] Postgres implementation on transaction ID #22

Closed
wants to merge 7 commits into from
3 changes: 3 additions & 0 deletions Makefile
@@ -39,3 +39,6 @@ update_watermill:
mycli:
@mycli -h 127.0.0.1 -u root -p secret

pgcli:
@pgcli postgres://watermill:password@localhost:5432/watermill?sslmode=disable

3 changes: 2 additions & 1 deletion docker-compose.yml
@@ -13,8 +13,9 @@ services:
MYSQL_ALLOW_EMPTY_PASSWORD: "yes"

postgres:
image: postgres:11
image: postgres:14.4
restart: unless-stopped
command: postgres -c 'max_connections=500'
ports:
- 5432:5432
environment:
22 changes: 21 additions & 1 deletion go.mod
@@ -1,6 +1,6 @@
module github.com/ThreeDotsLabs/watermill-sql

go 1.15
go 1.19

require (
github.com/ThreeDotsLabs/watermill v1.0.2
@@ -10,5 +10,25 @@ require (
github.com/oklog/ulid v1.3.1
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.5.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.1.1 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.0.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.6.4 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.0.2 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.4.2 // indirect
github.com/lithammer/shortuuid/v3 v3.0.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect
golang.org/x/text v0.3.3 // indirect
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect
google.golang.org/appengine v1.6.0 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
)
4 changes: 0 additions & 4 deletions go.sum
@@ -33,7 +33,6 @@ github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/U
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0=
github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo=
github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8=
@@ -52,7 +51,6 @@ github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2 h1:JVX6jT/XfzNqIjye47
github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A=
github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78=
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA=
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg=
@@ -99,7 +97,6 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU=
github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
@@ -146,7 +143,6 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
2 changes: 1 addition & 1 deletion pkg/sql/offsets_adapter.go
@@ -2,7 +2,7 @@ package sql

type OffsetsAdapter interface {
// AckMessageQuery returns the SQL query and arguments that will mark a message as read for a given consumer group.
AckMessageQuery(topic string, offset int, consumerGroup string) (string, []interface{})
AckMessageQuery(topic string, offset int64, transactionID int64, consumerGroup string) (string, []interface{})

// ConsumedMessageQuery will return the SQL query and arguments which will be executed after consuming a message,
// but before ack.
7 changes: 6 additions & 1 deletion pkg/sql/offsets_adapter_mysql.go
@@ -28,7 +28,12 @@ func (a DefaultMySQLOffsetsAdapter) SchemaInitializingQueries(topic string) []st
)`}
}

func (a DefaultMySQLOffsetsAdapter) AckMessageQuery(topic string, offset int, consumerGroup string) (string, []interface{}) {
func (a DefaultMySQLOffsetsAdapter) AckMessageQuery(
topic string,
offset int64,
transactionID int64,
consumerGroup string,
) (string, []interface{}) {
ackQuery := `UPDATE ` + a.MessagesOffsetsTable(topic) + ` SET offset_acked = ? WHERE consumer_group = ?`
return ackQuery, []interface{}{offset, consumerGroup}
}
30 changes: 21 additions & 9 deletions pkg/sql/offsets_adapter_postgresql.go
@@ -23,23 +23,35 @@ func (a DefaultPostgreSQLOffsetsAdapter) SchemaInitializingQueries(topic string)
CREATE TABLE IF NOT EXISTS ` + a.MessagesOffsetsTable(topic) + ` (
consumer_group VARCHAR(255) NOT NULL,
offset_acked BIGINT,
offset_consumed BIGINT NOT NULL,
last_processed_transaction_id xid8 NOT NULL,
PRIMARY KEY(consumer_group)
)`}
}

func (a DefaultPostgreSQLOffsetsAdapter) NextOffsetQuery(topic, consumerGroup string) (string, []interface{}) {
return `SELECT COALESCE(
(SELECT offset_acked
FROM ` + a.MessagesOffsetsTable(topic) + `
WHERE consumer_group=$1 FOR UPDATE
), 0)`,
return `SELECT
coalesce(MAX(offset_acked),0) AS offset_acked,
coalesce(MAX(last_processed_transaction_id::text), '0')::xid8 AS last_processed_transaction_id
FROM ` + a.MessagesOffsetsTable(topic) + `
WHERE consumer_group=$1`,
[]interface{}{consumerGroup}
}

func (a DefaultPostgreSQLOffsetsAdapter) AckMessageQuery(topic string, offset int, consumerGroup string) (string, []interface{}) {
ackQuery := `UPDATE ` + a.MessagesOffsetsTable(topic) + ` SET offset_acked = $1 WHERE consumer_group = $2`
return ackQuery, []interface{}{offset, consumerGroup}
func (a DefaultPostgreSQLOffsetsAdapter) AckMessageQuery(
topic string,
offset int64,
transactionID int64,
consumerGroup string,
) (string, []interface{}) {
ackQuery := `INSERT INTO ` + a.MessagesOffsetsTable(topic) + `(offset_acked, last_processed_transaction_id, consumer_group)
VALUES
($1, $2, $3)
ON CONFLICT
(consumer_group)
DO UPDATE SET
offset_acked = excluded.offset_acked,
last_processed_transaction_id = excluded.last_processed_transaction_id`
return ackQuery, []interface{}{offset, transactionID, consumerGroup}
}

func (a DefaultPostgreSQLOffsetsAdapter) MessagesOffsetsTable(topic string) string {
4 changes: 2 additions & 2 deletions pkg/sql/pubsub_test.go
@@ -21,7 +21,7 @@ import (
)

var (
logger = watermill.NewStdLogger(true, false)
logger = watermill.NewStdLogger(true, true)
)

func newPubSub(t *testing.T, db *stdSQL.DB, consumerGroup string, schemaAdapter sql.SchemaAdapter, offsetsAdapter sql.OffsetsAdapter) (message.Publisher, message.Subscriber) {
@@ -191,7 +191,7 @@ func TestMySQLPublishSubscribe(t *testing.T) {
func TestPostgreSQLPublishSubscribe(t *testing.T) {
features := tests.Features{
ConsumerGroups: true,
ExactlyOnceDelivery: true,
ExactlyOnceDelivery: false,
GuaranteedOrder: true,
Persistent: true,
}
24 changes: 12 additions & 12 deletions pkg/sql/schema_adapter.go
@@ -1,7 +1,6 @@
package sql

import (
"database/sql"
"encoding/json"

"github.com/ThreeDotsLabs/watermill/message"
@@ -14,13 +13,13 @@ type SchemaAdapter interface {
// InsertQuery returns the SQL query and arguments that will insert the Watermill message into the SQL storage.
InsertQuery(topic string, msgs message.Messages) (string, []interface{}, error)

// SelectQuery returns the the SQL query and arguments
// SelectQuery returns the SQL query and arguments
// that returns the next unread message for a given consumer group.
SelectQuery(topic string, consumerGroup string, offsetsAdapter OffsetsAdapter) (string, []interface{})

// UnmarshalMessage transforms the row obtained from SelectQuery into a Watermill message.
// It also returns the offset of the last read message, for the purpose of acking.
UnmarshalMessage(row *sql.Row) (offset int, msg *message.Message, err error)
UnmarshalMessage(row Scanner) (offset int64, transactionID int64, msg *message.Message, err error)

// SchemaInitializingQueries returns SQL queries which will make sure (CREATE IF NOT EXISTS)
// that the appropriate tables exist to write messages to the given topic.
@@ -31,10 +30,11 @@ type SchemaAdapter interface {
type DefaultSchema = DefaultMySQLSchema

type defaultSchemaRow struct {
Offset int64
UUID []byte
Payload []byte
Metadata []byte
Offset int64
TransactionID int64
UUID []byte
Payload []byte
Metadata []byte
}

func defaultInsertArgs(msgs message.Messages) ([]interface{}, error) {
@@ -51,21 +51,21 @@ func defaultInsertArgs(msgs message.Messages) ([]interface{}, error) {
return args, nil
}

func unmarshalDefaultMessage(row *sql.Row) (offset int, msg *message.Message, err error) {
func unmarshalDefaultMessage(row Scanner) (offset int64, transactionID int64, msg *message.Message, err error) {
r := defaultSchemaRow{}
err = row.Scan(&r.Offset, &r.UUID, &r.Payload, &r.Metadata)
err = row.Scan(&r.Offset, &r.TransactionID, &r.UUID, &r.Payload, &r.Metadata)
if err != nil {
return 0, nil, errors.Wrap(err, "could not scan message row")
return 0, 0, nil, errors.Wrap(err, "could not scan message row")
}

msg = message.NewMessage(string(r.UUID), r.Payload)

if r.Metadata != nil {
err = json.Unmarshal(r.Metadata, &msg.Metadata)
if err != nil {
return 0, nil, errors.Wrap(err, "could not unmarshal metadata as JSON")
return 0, 0, nil, errors.Wrap(err, "could not unmarshal metadata as JSON")
}
}

return int(r.Offset), msg, nil
return r.Offset, r.TransactionID, msg, nil
}
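
The new signatures take a `Scanner` instead of `*sql.Row`, but its definition is not part of the hunks shown here. A plausible shape, offered as an assumption rather than the PR's actual code, is a one-method interface that both `*sql.Row` and `*sql.Rows` satisfy, which would let the same unmarshalling code serve single-row and multi-row queries. `fakeRow` and `readPosition` below are hypothetical names used only for illustration:

```go
package main

import (
	"database/sql"
	"fmt"
)

// Scanner is an assumed definition; the PR's own declaration is not shown in the diff.
type Scanner interface {
	Scan(dest ...interface{}) error
}

// Compile-time checks that both standard library row types fit the interface.
var (
	_ Scanner = (*sql.Row)(nil)
	_ Scanner = (*sql.Rows)(nil)
)

// fakeRow is a hypothetical in-memory Scanner, handy for tests without a database.
type fakeRow struct {
	offset        int64
	transactionID int64
}

func (f fakeRow) Scan(dest ...interface{}) error {
	if len(dest) != 2 {
		return fmt.Errorf("expected 2 destinations, got %d", len(dest))
	}
	offset, ok1 := dest[0].(*int64)
	transactionID, ok2 := dest[1].(*int64)
	if !ok1 || !ok2 {
		return fmt.Errorf("destinations must be *int64")
	}
	*offset, *transactionID = f.offset, f.transactionID
	return nil
}

// readPosition scans only the two position columns, in the same order as the diff.
func readPosition(row Scanner) (offset int64, transactionID int64, err error) {
	err = row.Scan(&offset, &transactionID)
	return offset, transactionID, err
}

func main() {
	offset, transactionID, err := readPosition(fakeRow{offset: 7, transactionID: 742})
	fmt.Println(offset, transactionID, err) // prints: 7 742 <nil>
}
```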
7 changes: 3 additions & 4 deletions pkg/sql/schema_adapter_mysql.go
@@ -1,7 +1,6 @@
package sql

import (
"database/sql"
"fmt"
"strings"

@@ -73,15 +72,15 @@ func (s DefaultMySQLSchema) SelectQuery(topic string, consumerGroup string, offs
selectQuery := `
SELECT offset, uuid, payload, metadata FROM ` + s.MessagesTable(topic) + `
WHERE
offset > (` + nextOffsetQuery + `)
offset > (` + nextOffsetQuery + `) AND transaction_id < pg_snapshot_xmin(pg_current_snapshot())
Contributor

Hmm, this looks unsuitable for MySQL (there are no snapshots).

Member Author

Stupid copy-paste 😅 Removed in this PR: #23

ORDER BY
offset ASC
transaction_id ASC, offset ASC
LIMIT 1`

return selectQuery, nextOffsetArgs
}

func (s DefaultMySQLSchema) UnmarshalMessage(row *sql.Row) (offset int, msg *message.Message, err error) {
func (s DefaultMySQLSchema) UnmarshalMessage(row Scanner) (offset int64, transactionID int64, msg *message.Message, err error) {
return unmarshalDefaultMessage(row)
}

49 changes: 32 additions & 17 deletions pkg/sql/schema_adapter_postgresql.go
@@ -1,7 +1,6 @@
package sql

import (
"database/sql"
"fmt"
"strings"

@@ -15,22 +14,23 @@ type DefaultPostgreSQLSchema struct {
}

func (s DefaultPostgreSQLSchema) SchemaInitializingQueries(topic string) []string {
createMessagesTable := strings.Join([]string{
`CREATE TABLE IF NOT EXISTS ` + s.MessagesTable(topic) + ` (`,
`"offset" SERIAL,`,
`"uuid" VARCHAR(36) NOT NULL,`,
`"created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,`,
`"payload" JSON DEFAULT NULL,`,
`"metadata" JSON DEFAULT NULL`,
`);`,
}, "\n")
createMessagesTable := `
CREATE TABLE IF NOT EXISTS ` + s.MessagesTable(topic) + ` (
"offset" SERIAL,
"uuid" VARCHAR(36) NOT NULL,
"created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
"payload" JSON DEFAULT NULL,
"metadata" JSON DEFAULT NULL,
"transaction_id" xid8 NOT NULL
);
`

return []string{createMessagesTable}
}

func (s DefaultPostgreSQLSchema) InsertQuery(topic string, msgs message.Messages) (string, []interface{}, error) {
insertQuery := fmt.Sprintf(
`INSERT INTO %s (uuid, payload, metadata) VALUES %s`,
`INSERT INTO %s (uuid, payload, metadata, transaction_id) VALUES %s`,
s.MessagesTable(topic),
defaultInsertMarkers(len(msgs)),
)
@@ -48,7 +48,7 @@ func defaultInsertMarkers(count int) string {

index := 1
for i := 0; i < count; i++ {
result.WriteString(fmt.Sprintf("($%d,$%d,$%d),", index, index+1, index+2))
result.WriteString(fmt.Sprintf("($%d,$%d,$%d,pg_current_xact_id()),", index, index+1, index+2))
index += 3
}

@@ -58,17 +58,32 @@ func defaultInsertMarkers(count int) string {
func (s DefaultPostgreSQLSchema) SelectQuery(topic string, consumerGroup string, offsetsAdapter OffsetsAdapter) (string, []interface{}) {
nextOffsetQuery, nextOffsetArgs := offsetsAdapter.NextOffsetQuery(topic, consumerGroup)
selectQuery := `
SELECT "offset", uuid, payload, metadata FROM ` + s.MessagesTable(topic) + `
WHERE
"offset" > (` + nextOffsetQuery + `)
WITH last_processed AS (
` + nextOffsetQuery + `
)

SELECT "offset", transaction_id, uuid, payload, metadata FROM ` + s.MessagesTable(topic) + `

WHERE
(
(
transaction_id = (SELECT last_processed_transaction_id FROM last_processed)
AND
"offset" > (SELECT offset_acked FROM last_processed)
)
OR
(transaction_id > (SELECT last_processed_transaction_id FROM last_processed))
)
AND
transaction_id < pg_snapshot_xmin(pg_current_snapshot())
ORDER BY
"offset" ASC
Contributor

I suppose there should be

Suggested change
"offset" ASC
transaction_id ASC,
"offset" ASC

Contributor

Also, this type of ordering should be covered by an index like (transaction_id ASC, "offset" ASC). It's not mentioned in the original article, but I ran into performance issues when load testing my own implementation of the adapter :)

Member Author

Good catch and point about the index!

I applied them in #23

LIMIT 1`
LIMIT 100`

return selectQuery, nextOffsetArgs
}

func (s DefaultPostgreSQLSchema) UnmarshalMessage(row *sql.Row) (offset int, msg *message.Message, err error) {
func (s DefaultPostgreSQLSchema) UnmarshalMessage(row Scanner) (offset int64, transactionID int64, msg *message.Message, err error) {
return unmarshalDefaultMessage(row)
}
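
The `WHERE` clause in `SelectQuery` can be read as a predicate over three values: the last processed transaction ID, the last acked offset, and the snapshot's `xmin` (the oldest transaction still in progress). The following in-memory sketch mirrors that predicate and applies the `transaction_id ASC, "offset" ASC` ordering suggested in the review thread, not the `"offset" ASC` ordering shown in this diff. The types `storedMessage` and `position` and the sample values are assumptions for illustration only:

```go
package main

import (
	"fmt"
	"sort"
)

// storedMessage holds the two columns that decide visibility and ordering.
type storedMessage struct {
	Offset        int64
	TransactionID int64
}

// position is what the offsets table remembers per consumer group.
type position struct {
	LastTransactionID int64
	OffsetAcked       int64
}

// selectable mirrors the WHERE clause: the row must be past the stored position
// and must belong to a transaction older than every in-progress transaction.
func selectable(m storedMessage, last position, snapshotXmin int64) bool {
	sameTxLaterOffset := m.TransactionID == last.LastTransactionID && m.Offset > last.OffsetAcked
	laterTx := m.TransactionID > last.LastTransactionID
	return (sameTxLaterOffset || laterTx) && m.TransactionID < snapshotXmin
}

// nextBatch filters, orders by (transaction_id, offset), and applies the limit.
func nextBatch(msgs []storedMessage, last position, snapshotXmin int64, limit int) []storedMessage {
	var out []storedMessage
	for _, m := range msgs {
		if selectable(m, last, snapshotXmin) {
			out = append(out, m)
		}
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].TransactionID != out[j].TransactionID {
			return out[i].TransactionID < out[j].TransactionID
		}
		return out[i].Offset < out[j].Offset
	})
	if len(out) > limit {
		out = out[:limit]
	}
	return out
}

func main() {
	msgs := []storedMessage{
		{Offset: 1, TransactionID: 100}, // already acked
		{Offset: 2, TransactionID: 102},
		{Offset: 3, TransactionID: 101}, // lower transaction ID, higher offset
		{Offset: 4, TransactionID: 103}, // transaction not yet below xmin
	}
	last := position{LastTransactionID: 100, OffsetAcked: 1}
	fmt.Println(nextBatch(msgs, last, 103, 100)) // prints: [{3 101} {2 102}]
}
```

In this sample, ordering by offset alone would hand out offset 2 (transaction 102) first. Acking it would move the stored transaction ID to 102, and offset 3 (transaction 101) would then never match the predicate again.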

12 changes: 8 additions & 4 deletions pkg/sql/schema_adapter_postgresql_test.go
@@ -18,15 +18,19 @@ func TestDefaultInsertMarkers(t *testing.T) {
},
{
Count: 1,
ExpectedOutput: "($1,$2,$3)",
ExpectedOutput: "($1,$2,$3,pg_current_xact_id())",
},
{
Count: 2,
ExpectedOutput: "($1,$2,$3),($4,$5,$6)",
ExpectedOutput: "($1,$2,$3,pg_current_xact_id()),($4,$5,$6,pg_current_xact_id())",
},
{
Count: 5,
ExpectedOutput: "($1,$2,$3),($4,$5,$6),($7,$8,$9),($10,$11,$12),($13,$14,$15)",
Count: 5,
ExpectedOutput: "($1,$2,$3,pg_current_xact_id())," +
"($4,$5,$6,pg_current_xact_id())," +
"($7,$8,$9,pg_current_xact_id())," +
"($10,$11,$12,pg_current_xact_id())," +
"($13,$14,$15,pg_current_xact_id())",
},
}
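
The expected outputs above show the pattern: each message still binds three parameters, and the fourth column is filled by `pg_current_xact_id()` on the server. A self-contained sketch that reproduces those strings, using the hypothetical name `insertMarkers` and assuming the trailing comma is trimmed at the end (the full body of `defaultInsertMarkers` is not shown in the diff):

```go
package main

import (
	"fmt"
	"strings"
)

// insertMarkers is a hypothetical stand-in for defaultInsertMarkers.
// It emits one VALUES group per message with three bound parameters each.
func insertMarkers(count int) string {
	var result strings.Builder

	index := 1
	for i := 0; i < count; i++ {
		// uuid, payload and metadata are bound by the client; the transaction ID
		// is evaluated by PostgreSQL inside the inserting transaction.
		fmt.Fprintf(&result, "($%d,$%d,$%d,pg_current_xact_id()),", index, index+1, index+2)
		index += 3
	}

	// Drop the trailing comma left by the loop (assumed, not shown in the diff).
	return strings.TrimRight(result.String(), ",")
}

func main() {
	fmt.Println(insertMarkers(2))
	// prints: ($1,$2,$3,pg_current_xact_id()),($4,$5,$6,pg_current_xact_id())
}
```

Since the function call is part of the SQL text rather than a bound argument, the parameter index still advances by three per message.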
