forked from gazette/core
-
Notifications
You must be signed in to change notification settings - Fork 2
/
persister.go
116 lines (101 loc) · 2.56 KB
/
persister.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package fragment
import (
"context"
"sync"
"time"
"github.com/LiveRamp/gazette/v2/pkg/allocator"
"github.com/LiveRamp/gazette/v2/pkg/keyspace"
pb "github.com/LiveRamp/gazette/v2/pkg/protocol"
log "github.com/sirupsen/logrus"
)
const (
persistInterval = time.Minute
)
type Persister struct {
qA, qB, qC []Spool
mu sync.Mutex
doneCh chan struct{}
ks *keyspace.KeySpace
ticker *time.Ticker
persistFn func(ctx context.Context, spool Spool) error
}
// NewPersister returns an empty, initialized Persister.
func NewPersister(ks *keyspace.KeySpace) *Persister {
return &Persister{
doneCh: make(chan struct{}),
ks: ks,
persistFn: Persist,
ticker: time.NewTicker(persistInterval),
}
}
func (p *Persister) SpoolComplete(spool Spool, primary bool) {
if primary {
// Attempt to immediately persist the Spool.
go p.attemptPersist(spool)
} else if spool.ContentLength() != 0 {
p.queue(spool)
}
return
}
func (p *Persister) Finish() {
p.doneCh <- struct{}{}
<-p.doneCh
}
func (p *Persister) queue(spool Spool) {
defer p.mu.Unlock()
p.mu.Lock()
p.qC = append(p.qC, spool)
}
func (p *Persister) Serve() {
for done, exiting := false, false; !done; {
if !exiting {
select {
case <-p.ticker.C:
case <-p.doneCh:
exiting = true
p.ticker.Stop()
}
}
for _, spool := range p.qA {
p.attemptPersist(spool)
}
// Rotate queues.
p.mu.Lock()
p.qA, p.qB, p.qC = p.qB, p.qC, p.qA[:0]
if exiting && len(p.qA) == 0 && len(p.qB) == 0 {
done = true
}
p.mu.Unlock()
}
close(p.doneCh)
}
// attemptPersist persist valid spools, dropping spools with no content or no configured
// backing store. If persistFn fails the spool will be requeued and be attmepted again.
func (p *Persister) attemptPersist(spool Spool) {
if spool.ContentLength() == 0 {
// Persisting an empty Spool is a no-op.
return
}
// Attach the current BackingStore of the Fragment's JournalSpec.
if item, ok := allocator.LookupItem(p.ks, spool.Journal.String()); ok {
var spec = item.ItemValue.(*pb.JournalSpec)
// Journal spec has no configured store, drop this fragment.
if len(spec.Fragment.Stores) == 0 {
return
}
spool.BackingStore = spec.Fragment.Stores[0]
} else {
log.WithFields(log.Fields{
"journal": spool.Journal,
"contentName": spool.ContentName,
}).Warn("journal spec has been removed")
return
}
if err := p.persistFn(context.Background(), spool); err != nil {
log.WithFields(log.Fields{
"journal": spool.Journal,
"err": err,
}).Warn("failed to persist Spool")
p.queue(spool)
}
}