-
Notifications
You must be signed in to change notification settings - Fork 30
/
schema_adapter_postgresql.go
145 lines (118 loc) · 3.87 KB
/
schema_adapter_postgresql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package sql
import (
"database/sql"
"encoding/json"
"fmt"
"strings"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
)
// DefaultPostgreSQLSchema is a default implementation of SchemaAdapter based on PostgreSQL.
type DefaultPostgreSQLSchema struct {
// GenerateMessagesTableName may be used to override how the messages table name is generated.
GenerateMessagesTableName func(topic string) string
// SubscribeBatchSize is the number of messages to be queried at once.
//
// Higher value, increases a chance of message re-delivery in case of crash or networking issues.
// 1 is the safest value, but it may have a negative impact on performance when consuming a lot of messages.
//
// Default value is 100.
SubscribeBatchSize int
}
func (s DefaultPostgreSQLSchema) SchemaInitializingQueries(topic string) []Query {
createMessagesTable := `
CREATE TABLE IF NOT EXISTS ` + s.MessagesTable(topic) + ` (
"offset" SERIAL,
"uuid" VARCHAR(36) NOT NULL,
"created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
"payload" JSON DEFAULT NULL,
"metadata" JSON DEFAULT NULL,
"transaction_id" xid8 NOT NULL,
PRIMARY KEY ("transaction_id", "offset")
);
`
return []Query{{Query: createMessagesTable}}
}
func (s DefaultPostgreSQLSchema) InsertQuery(topic string, msgs message.Messages) (Query, error) {
insertQuery := fmt.Sprintf(
`INSERT INTO %s (uuid, payload, metadata, transaction_id) VALUES %s`,
s.MessagesTable(topic),
defaultInsertMarkers(len(msgs)),
)
args, err := defaultInsertArgs(msgs)
if err != nil {
return Query{}, err
}
return Query{insertQuery, args}, nil
}
func defaultInsertMarkers(count int) string {
result := strings.Builder{}
index := 1
for i := 0; i < count; i++ {
result.WriteString(fmt.Sprintf("($%d,$%d,$%d,pg_current_xact_id()),", index, index+1, index+2))
index += 3
}
return strings.TrimRight(result.String(), ",")
}
func (s DefaultPostgreSQLSchema) batchSize() int {
if s.SubscribeBatchSize == 0 {
return 100
}
return s.SubscribeBatchSize
}
func (s DefaultPostgreSQLSchema) SelectQuery(topic string, consumerGroup string, offsetsAdapter OffsetsAdapter) Query {
// Query inspired by https://event-driven.io/en/ordering_in_postgres_outbox/
nextOffsetQuery := offsetsAdapter.NextOffsetQuery(topic, consumerGroup)
selectQuery := `
WITH last_processed AS (
` + nextOffsetQuery.Query + `
)
SELECT "offset", transaction_id, uuid, payload, metadata FROM ` + s.MessagesTable(topic) + `
WHERE
(
(
transaction_id = (SELECT last_processed_transaction_id FROM last_processed)
AND
"offset" > (SELECT offset_acked FROM last_processed)
)
OR
(transaction_id > (SELECT last_processed_transaction_id FROM last_processed))
)
AND
transaction_id < pg_snapshot_xmin(pg_current_snapshot())
ORDER BY
transaction_id ASC,
"offset" ASC
LIMIT ` + fmt.Sprintf("%d", s.batchSize())
return Query{selectQuery, nextOffsetQuery.Args}
}
func (s DefaultPostgreSQLSchema) UnmarshalMessage(row Scanner) (Row, error) {
r := Row{}
var transactionID int64
err := row.Scan(&r.Offset, &transactionID, &r.UUID, &r.Payload, &r.Metadata)
if err != nil {
return Row{}, errors.Wrap(err, "could not scan message row")
}
msg := message.NewMessage(string(r.UUID), r.Payload)
if r.Metadata != nil {
err = json.Unmarshal(r.Metadata, &msg.Metadata)
if err != nil {
return Row{}, errors.Wrap(err, "could not unmarshal metadata as JSON")
}
}
r.Msg = msg
r.ExtraData = map[string]any{
"transaction_id": transactionID,
}
return r, nil
}
func (s DefaultPostgreSQLSchema) MessagesTable(topic string) string {
if s.GenerateMessagesTableName != nil {
return s.GenerateMessagesTableName(topic)
}
return fmt.Sprintf(`"watermill_%s"`, topic)
}
func (s DefaultPostgreSQLSchema) SubscribeIsolationLevel() sql.IsolationLevel {
// For Postgres Repeatable Read is enough.
return sql.LevelRepeatableRead
}