This repository has been archived by the owner on Jan 28, 2022. It is now read-only.
/
push_server.go
156 lines (137 loc) · 4.24 KB
/
push_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package capture
import (
"context"
fmt "fmt"
pf "github.com/estuary/protocols/flow"
"go.gazette.dev/core/broker/client"
)
// PushServer is a server which aids implementations of the Runtime.Push RPC.
// It embeds a coordinator, receives pushes from Push over pushCh, and keeps
// the acknowledgement channels which the Serve loop signals after a commit.
type PushServer struct {
coordinator
ctx context.Context // Context of Serve's lifetime.
pushCh chan readyPush // Sent to from Push.
// priorAck holds the channels signaled by sendAck, while nextAck collects
// the channels of pushes received by Serve. Serve swaps the two each time
// it starts a commit.
priorAck, nextAck []chan<- struct{} // Notifications for awaiting RPCs.
}
// NewPushServer builds a new *PushServer using the provided CaptureSpec.
//
// The embedded coordinator is built by newCoordinator from newCombinerFn,
// range_, spec and version; an error from newCoordinator is returned as-is.
// The given ctx is retained and is the Context whose cancellation Serve
// observes. The returned server accepts pushes over an unbuffered channel.
//
// newCombinerFn is only forwarded to newCoordinator: this constructor builds
// no combiners of its own. (A previous loop here created two combiners per
// binding which were never stored, returned, or otherwise used.)
func NewPushServer(
	ctx context.Context,
	newCombinerFn func(*pf.CaptureSpec_Binding) (pf.Combiner, error),
	range_ pf.RangeSpec,
	spec *pf.CaptureSpec,
	version string,
) (*PushServer, error) {
	var coordinator, err = newCoordinator(newCombinerFn, range_, spec, version)
	if err != nil {
		return nil, err
	}

	return &PushServer{
		coordinator: coordinator,
		ctx:         ctx,
		pushCh:      make(chan readyPush),
	}, nil
}
// Push hands Documents and their accompanying DriverCheckpoint to the
// Serve loop.
//
// Push blocks until either the Serve loop receives the push, in which case
// it returns nil, or the loop's future has resolved, in which case it
// returns that future's error. After a nil return, |ackCh| is signaled one
// time once the push has fully committed.
//
// The caller must also monitor ServeOp to determine if the Serve loop
// has exited, in which case |ackCh| will never be notified.
func (c *PushServer) Push(
	docs []Documents,
	checkpoint pf.DriverCheckpoint,
	ackCh chan<- struct{},
) error {
	// Bundle the arguments up front so the select below reads as a plain
	// "deliver or give up" choice.
	var pending = readyPush{docs: docs, checkpoint: checkpoint, ackCh: ackCh}

	select {
	case c.pushCh <- pending:
		return nil
	case <-c.loopOp.Done():
		return c.loopOp.Err()
	}
}
// ServeOp exposes the future of this PushServer's Serve loop.
// The future resolves with the loop's terminal error once the loop has
// stopped running. An error of io.EOF is expected upon a graceful
// cancellation.
func (c *PushServer) ServeOp() client.OpFuture {
	return c.loopOp
}
// readyPush is a Push that traverses PushServer.pushCh.
// It carries the arguments of a single Push call to the Serve loop.
type readyPush struct {
docs []Documents // Documents given to Push.
checkpoint pf.DriverCheckpoint // DriverCheckpoint given to Push.
ackCh chan<- struct{} // Channel given to Push, for notification of its commit.
}
// Serve is a long-lived routine which processes Push transactions.
// When captured documents are ready to commit, it invokes the startCommitFn
// callback.
//
// On callback, the caller must drain documents from Combiners() and track
// the associated DriverCheckpoint(), and then notify the PushServer of a
// pending commit via SetLogCommittedOp().
//
// While this drain and commit is ongoing, Serve() will accumulate further
// pushed documents and checkpoints. It will then notify the caller of
// the next transaction only after the resolution of the prior transaction's
// commit.
//
// Serve will call into startCommitFn with a non-nil error exactly once,
// as its very last invocation.
func (c *PushServer) Serve(startCommitFn func(error)) {
	var cancelled = c.ctx.Done()

	// commitStarting wraps startCommitFn. On a nil error it first rotates the
	// ack lists: channels gathered so far become priorAck (signaled later by
	// sendAck), and nextAck restarts empty on the old priorAck storage.
	var commitStarting = func(err error) {
		if err == nil {
			c.priorAck, c.nextAck = c.nextAck, c.priorAck[:0]
		}
		startCommitFn(err)
	}

	// poll blocks for a single event and handles it. Its boolean result is
	// true once the Serve context has been cancelled.
	var poll = func(full bool) (bool, error) {
		// A nil channel is never ready within a select, so leaving |pushes|
		// nil while |full| is set stops the loop from accepting new pushes.
		var pushes <-chan readyPush
		if !full {
			pushes = c.pushCh
		}

		select {
		case <-cancelled:
			cancelled = nil // Don't select again.
			return true, nil

		case <-c.maybeLogCommittedOp():
			if err := c.onLogCommitted(); err != nil {
				return false, fmt.Errorf("onLogCommitted: %w", err)
			}
			// The prior commit is done: notify the pushes which awaited it.
			c.sendAck()

		case committedOp := <-c.logCommittedOpCh:
			if err := c.onLogCommittedOpCh(committedOp); err != nil {
				return false, fmt.Errorf("onLogCommittedOpCh: %w", err)
			}

		case push := <-pushes:
			for i := range push.docs {
				if err := c.onDocuments(push.docs[i]); err != nil {
					return false, fmt.Errorf("onDocuments: %w", err)
				}
			}
			if err := c.onCheckpoint(push.checkpoint); err != nil {
				return false, fmt.Errorf("onCheckpoint: %w", err)
			}
			// Remember whom to notify when this push's commit completes.
			c.nextAck = append(c.nextAck, push.ackCh)
		}

		return cancelled == nil, nil
	}

	c.loop(commitStarting, poll)
}
// sendAck signals every channel in priorAck once, in order, and then
// truncates priorAck to zero length while keeping its backing storage.
// Each send blocks until the channel accepts the value.
func (c *PushServer) sendAck() {
	var waiting = c.priorAck
	for i := range waiting {
		waiting[i] <- struct{}{}
	}
	c.priorAck = waiting[:0]
}