-
Notifications
You must be signed in to change notification settings - Fork 51
/
interfaces.go
258 lines (241 loc) · 11.6 KB
/
interfaces.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
// Package message is a client-only library which implements exactly-once message
// semantics atop raw, at-least-once Journal byte-streams.
//
// It specifies a common Message interface type which must be implemented by
// consumer applications, and an RFC 4122 v1 UUID type which enables de-duplication
// and atomic commits of multiple messages.
//
// MappingKeyFunc extracts a stable mapping identifier from a custom message type.
// To map messages on a session ID:
//
// var mapOnSessionFn MappingKeyFunc = func(m Mappable, w io.Writer) {
// w.Write([]byte(m.(*MyMsgType).SessionID))
// }
//
// MappingFunc then defines the means of mapping messages to a journal. Several
// routines, like ModuloMapping, help in the construction of MappingFuncs and
// can be used to implement "data shuffles" which stably map messages having a
// shared mapping key to a common journal.
//
// Combine with client.PolledList to build MappingFuncs that publish to a dynamic,
// automatically updating "topic" of selected journal partitions:
//
// var myClient pb.AsyncJournalClient = ...
//
// var partitions, _ = pb.ParseLabelSelector("logs=pageviews, source=mobile")
// var pl, _ = client.NewPolledList(ctx, myClient, time.Minute, pb.ListRequest{
// Selector: partitions,
// })
// // Use RendezvousMapping to minimally shuffle the mapping of
// // SessionID <=> journal when the topic partitioning is updated.
// var mapFn = RendezvousMapping(mapOnSessionFn, pl.List)
//
// Then, use a Publisher to publish messages:
//
// var pub = NewPublisher(myClient, nil)
// for _, msg := range messages {
// // Each message is mapped on its SessionID to a current topic
// // partition (i.e., journal), sequenced with a UUID, marshalled,
// // and queued for append to the mapped journal.
// pub.PublishCommitted(mapFn, msg)
// }
// for op := range myClient.PendingExcept("") {
// <-op.Done() // Wait for all async appends to complete.
// }
//
// When reading, NewMessageFunc provides the package with a means of constructing
// new messages of the user's type.
//
// var newMsgFn NewMessageFunc = func(*pb.JournalSpec) (Message, error) {
// return new(MyMsgType), nil
// }
//
// ReadUncommittedIter reads "uncommitted" messages from a journal. Uncommitted
// messages may include duplicates, or messages which are never acknowledged or
// are later explicitly rolled back.
//
// var rr = client.NewRetryReader(ctx, rjc, pb.ReadRequest{
// Journal: "my/journal",
// Block: true,
// })
// var it = NewReadUncommittedIter(rr, newMsgFn)
// for {
// var env, err = it.Next()
//
// // Handle |env| and |err|.
// }
//
// Use a Sequencer to sequence read-uncommitted messages into read-committed ones,
// and a ReadCommittedIter to read only committed messages from the journal.
// ReadCommittedIter is nothing more than the composition of a ReadUncommittedIter
// with a Sequencer.
//
// var seq = NewSequencer(nil, 4096)
// var it = NewReadCommittedIter(rr, newMsgFn, seq)
// for {
// var env, err = it.Next()
//
// // Handle |env| and |err|. We're assured the message has been
// // acknowledged and is not a duplicate.
// }
//
// Journals must declare their associated message Framing via the "content-type"
// label. The journal Framing is used to encode and decode Message instances
// written to the journal. Use RegisterFraming, typically from a package init()
// function, to register new Framing instances and make them available for use
// in applications. This package registers a Framing for the following
// content-types on its import:
//
// * text/csv: Uses "encoding/csv". See CSVFrameable.
// * application/x-ndjson: Uses "encoding/json".
// * application/x-protobuf-fixed: Encodes ProtoFrameable messages with a preamble
// of [4]byte{0x66, 0x33, 0x93, 0x36}, followed by a 4-byte little endian unsigned
// length, followed by a marshalled protobuf message.
//
// See the "labels" package for definitions of well-known label names and values
// such as content-types.
package message
import (
"bufio"
"fmt"
"io"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
pb "go.gazette.dev/core/broker/protocol"
)
// Message is a user-defined type which can be serialized to, and
// de-serialized from, a journal. Plain Go structs marshalled via reflection
// and types generated by the gogo/protobuf compiler are both examples.
//
// How a Message is serialized to a journal (its Framing) is largely
// independent of the Message implementation itself. The same Message could
// be written as JSON or as Protobuf, for example. A journal's "content-type"
// label selects the Framing used. Some Framings impose additional run-time
// interface requirements on Messages, such as ProtoFrameable or CSVFrameable.
//
// Journals hold only raw Message serializations. Gazette therefore asks
// Messages to help with representation by accepting, persisting, and (when
// asked) returning the UUIDs which Gazette generates. UUIDs can be useful to
// users as well: they're universally unique and encode a precise publishing
// timestamp.
//
// A user type which cannot represent a UUID may implement this interface with
// no-ops. Doing so opts the type out of exactly-once processing, and it falls
// back to at-least-once semantics.
type Message interface {
	// GetUUID returns the UUID previously set on the Message. A Message which
	// cannot track UUIDs returns a zero-valued UUID, opting out of
	// exactly-once processing semantics; its SetUUID is then a no-op.
	GetUUID() UUID
	// SetUUID assigns the UUID of the Message.
	SetUUID(UUID)
	// NewAcknowledgement returns a new Message of this same type which
	// represents an acknowledgement of this (and future) Messages published
	// to the given Journal within the context of a transaction.
	NewAcknowledgement(pb.Journal) Message
}
// Validator is an optional interface which a Message may implement in order
// to Validate itself. Publishing a Message which fails to Validate results
// in an error.
type Validator = pb.Validator
// Envelope wraps a Message together with metadata locating it within a
// journal.
type Envelope struct {
	// Journal is the JournalSpec of the journal holding the Message.
	Journal *pb.JournalSpec
	// Begin is the (inclusive) byte offset of the Message within the Journal.
	Begin pb.Offset
	// End is the (exclusive) byte offset of the Message within the Journal.
	End pb.Offset
	// Message is the wrapped Message, embedded so its methods are promoted.
	Message
}
// JournalProducer pairs a Journal with a ProducerID.
type JournalProducer struct {
	Journal  pb.Journal
	Producer ProducerID
}
// ProducerState captures a point-in-time snapshot of a Producer's state
// within a Journal. ProducerStates are marshalled into consumer checkpoints,
// which allows a Sequencer to recover producer sequence states after a
// consumer process fault.
type ProducerState struct {
	JournalProducer

	// LastAck is the Clock of the Producer's most recent ACK_TXN or OUTSIDE_TXN.
	LastAck Clock
	// Begin is the offset of the first message byte having CONTINUE_TXN which
	// is larger than LastAck; that is, the offset which opens the Producer's
	// next transaction. Begin is -1 when no such message exists.
	Begin pb.Offset
}
// Frameable is a value which a Framing is able to serialize. The interfaces
// a Frameable must implement depend on the Framing in use, and are asserted
// at run-time. A Frameable is usually also a Message, although the Framing
// interface does not demand this.
type Frameable interface{}
// Framing defines how Messages are marshalled to, and unmarshalled from, a
// Journal.
type Framing interface {
	// ContentType returns the content-type of this Framing.
	ContentType() string
	// Marshal writes the Frameable to the bufio.Writer. If the message type
	// implements validation, Marshal may assume the message has already
	// passed it. Marshal is free to ignore errors returned by the Writer.
	Marshal(Frameable, *bufio.Writer) error
	// NewUnmarshalFunc returns an UnmarshalFunc which unmarshals Frameable
	// instances from the given Reader.
	NewUnmarshalFunc(*bufio.Reader) UnmarshalFunc
}
// UnmarshalFunc is produced by Framing.NewUnmarshalFunc, and unpacks and
// decodes Frameable instances from its underlying bufio.Reader. It must never
// read past the exact byte boundary of each message frame. For example, it
// must not internally buffer reads which extend beyond the end of a frame.
type UnmarshalFunc func(Frameable) error
// Mappable is a value which a MappingFunc is able to map to a journal.
// A MappingKeyFunc will usually cast and assert the exact type of a Mappable
// at run-time. A Mappable is generally a Message, although the MappingFunc
// interface does not require this.
type Mappable interface{}
// MappingFunc maps a Mappable message to the journal responsible for it.
// Gazette places no formal requirement on how the mapping is performed, nor
// on the nature of the journal which is selected.
//
// A common pattern spreads a collection of like messages across a number of
// journals, each of which plays the role of a topic partition. Partitions can
// be told apart by a JournalSpec Label such as
// "app.gazette.dev/message-type: MyMessage". "Partition" and "topic" are
// useful terminology, but they play no formal role and have no explicit
// implementation within Gazette beyond their expression through Labels and
// LabelSelectors. See the `labels` package documentation for naming
// conventions.
//
// A MappingFunc implementation would typically:
//  1. Apply domain knowledge to introspect the Mappable and determine a
//     "topic", expressed as a LabelSelector.
//  2. Query the broker List RPC to determine the topic's current partitions,
//     caching and refreshing List results as needed (see client.PolledList).
//  3. Use a ModuloMapping or RendezvousMapping to select among partitions.
//
// Alongside the journal, a MappingFunc returns the content-type of that
// journal's messages, which must have a registered Framing.
type MappingFunc func(Mappable) (_ pb.Journal, contentType string, _ error)
// MappingKeyFunc derives a suitable mapping key from the Mappable, and writes
// the key's value into the provided io.Writer. Write calls on that io.Writer
// are guaranteed never to return an error.
type MappingKeyFunc func(Mappable, io.Writer)
// PartitionsFunc returns a ListResponse of the journal partitions among which
// a MappingFunc may choose. The returned pointer may differ from one call to
// the next, but a returned ListResponse must not be modified. When nothing
// substantive has changed, a PartitionsFunc should try to keep returning the
// same pointer. See also: client.PolledList.
type PartitionsFunc func() *pb.ListResponse
// NewMessageFunc returns a Message instance of a type appropriate for reading
// the given JournalSpec. Implementations may want to introspect the
// JournalSpec, for example by examining application-specific labels it
// carries. An error is returned if no appropriate Message type can be
// determined.
type NewMessageFunc func(*pb.JournalSpec) (Message, error)
// ErrEmptyListResponse is the error returned by a MappingFunc whose
// PartitionsFunc produced an empty ListResponse.
var ErrEmptyListResponse error = fmt.Errorf("empty ListResponse")
// sequencerQueuedTotal counts read-uncommitted messages which were sequenced,
// labeled by "journal", "flag", and "outcome".
//
// NOTE(review): Prometheus naming conventions suggest a "_total" suffix for
// counters. The existing metric name is preserved because renaming it would
// break any queries which depend on it.
var sequencerQueuedTotal = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "gazette_sequencer_queued",
		Help: "Cumulative number of read-uncommitted messages which were sequenced.",
	},
	[]string{"journal", "flag", "outcome"},
)

// sequencerReplayTotal counts messages re-read from a source journal because
// the Sequencer's ring buffer was too small, labeled by "journal".
var sequencerReplayTotal = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "gazette_sequencer_replay",
		Help: "Cumulative number of messages re-read from source journal due to insufficient Sequencer ring-buffer size.",
	},
	[]string{"journal"},
)