/
writer.go
108 lines (93 loc) · 3.56 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package sql
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/alexandria-oss/streams"
"github.com/alexandria-oss/streams/codec"
"github.com/alexandria-oss/streams/persistence"
"github.com/alexandria-oss/streams/proxy/egress"
)
// Writer is a streams.Writer implementation backed by a SQL database.
//
// It is meant for systems applying the transactional outbox messaging pattern: instead of publishing straight to
// a message stream, every batch of messages generated by the system is written transactionally into an
// <<egress table>>, so it may be published later by an <<egress proxy agent>> (a.k.a. log tailing).
// The main reason to apply this pattern is to obtain write atomicity between a database and an external
// message stream (e.g. message broker, event bus).
//
// Writer instances MUST be used along with the transaction context functions
// (persistence.SetTransactionContext, persistence.GetTransactionContext), because Writer.Write obtains the
// sql.Tx instance from the context. If no transaction context is found, Writer.Write fails.
//
// Transactional outbox pattern reference: https://microservices.io/patterns/data/transactional-outbox.html
type Writer struct {
	cfg WriterConfig
}

// Compile-time check that Writer satisfies streams.Writer.
var _ streams.Writer = Writer{}
// WriterConfig holds the settings of a Writer.
type WriterConfig struct {
	// Codec encodes message batches so they can be stored on the database (default codec.ProtocolBuffers).
	Codec codec.Codec
	// WriterEgressTable is the table message batches are written to, so they can be published later.
	WriterEgressTable string
}
// newWriterDefaults builds the baseline WriterConfig: the Protocol Buffers codec and the default egress table name.
func newWriterDefaults() WriterConfig {
	var cfg WriterConfig
	cfg.Codec = codec.ProtocolBuffers{}
	cfg.WriterEgressTable = egress.DefaultEgressTableName
	return cfg
}
// NewWriter allocates a Writer that starts from the default configuration and then applies each of the given
// WriterOption(s), in the order they were passed, on top of it.
func NewWriter(opts ...WriterOption) Writer {
	cfg := newWriterDefaults()
	for i := range opts {
		opts[i].apply(&cfg)
	}
	return Writer{cfg: cfg}
}
// NewWriterWithConfig allocates a new Writer instance with the passed configuration.
//
// Fields left at their zero value fall back to the same defaults NewWriter starts from (see newWriterDefaults).
// Without the fallback, a nil Codec makes Writer.Write panic when it calls the codec, and an empty
// WriterEgressTable produces a malformed INSERT statement.
func NewWriterWithConfig(cfg WriterConfig) Writer {
	defaults := newWriterDefaults()
	if cfg.Codec == nil {
		cfg.Codec = defaults.Codec
	}
	if cfg.WriterEgressTable == "" {
		cfg.WriterEgressTable = defaults.WriterEgressTable
	}
	return Writer{cfg: cfg}
}
// Write appends a new batch of messages into the egress table.
//
// A transaction context (persistence.SetTransactionContext) MUST be set before calling this routine, because
// the sql.Tx instance is obtained from ctx. If no transaction context is found, Write returns the error
// reported by persistence.GetTransactionContext.
//
// The batch identifier is taken from TransactionContext.TransactionID.
//
// Write returns streams.ErrEmptyMessage when msgBatch is empty, any error produced while encoding the batch or
// preparing/executing the statement, and ErrUnableToWriteRows when the insert did not affect any row (or the
// affected row count could not be determined).
func (w Writer) Write(ctx context.Context, msgBatch []streams.Message) error {
	if len(msgBatch) == 0 {
		return streams.ErrEmptyMessage
	}

	txCtx, err := persistence.GetTransactionContext[*sql.Tx](ctx)
	if err != nil {
		return err
	}

	// When the Protocol Buffers codec is configured, the batch is converted to its transport representation
	// before being encoded; any other codec receives the message slice as is.
	var payload any = msgBatch
	if w.cfg.Codec.ApplicationType() == codec.ProtocolBuffersApplicationType {
		payload = persistence.NewTransportMessageBatch(msgBatch)
	}
	encodedData, err := w.cfg.Codec.Encode(payload)
	if err != nil {
		return err
	}

	// The table name is interpolated into the statement text; it comes from the Writer configuration.
	// Row values are always passed as bound parameters.
	query := fmt.Sprintf("INSERT INTO %s(batch_id,message_count,raw_data,insert_time) VALUES ($1,$2,$3,$4)", w.cfg.WriterEgressTable)
	stmt, err := txCtx.Tx.PrepareContext(ctx, query)
	if err != nil {
		return err
	}
	defer stmt.Close()

	// ExecContext (instead of Exec) so cancellation and deadlines carried by ctx also apply to the insert.
	res, err := stmt.ExecContext(ctx, txCtx.TransactionID, len(msgBatch), encodedData, time.Now().UTC())
	if err != nil {
		return err
	}

	affected, err := res.RowsAffected()
	if err != nil {
		// Keep ErrUnableToWriteRows matchable through errors.Is while surfacing the underlying cause.
		return fmt.Errorf("%w: %v", ErrUnableToWriteRows, err)
	}
	if affected <= 0 {
		return ErrUnableToWriteRows
	}
	return nil
}