forked from asonawalla/gazette
/
persister.go
123 lines (107 loc) · 2.67 KB
/
persister.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package fragment
import (
"context"
"sync"
"time"
"github.com/LiveRamp/gazette/v2/pkg/allocator"
"github.com/LiveRamp/gazette/v2/pkg/keyspace"
pb "github.com/LiveRamp/gazette/v2/pkg/protocol"
log "github.com/sirupsen/logrus"
)
const (
persistInterval = time.Minute
)
type Persister struct {
qA, qB, qC []Spool
mu sync.Mutex
doneCh chan struct{}
ks *keyspace.KeySpace
ticker *time.Ticker
persistFn func(ctx context.Context, spool Spool) error
}
// NewPersister returns an empty, initialized Persister.
func NewPersister(ks *keyspace.KeySpace) *Persister {
return &Persister{
doneCh: make(chan struct{}),
ks: ks,
persistFn: Persist,
}
}
func (p *Persister) SpoolComplete(spool Spool, primary bool) {
if primary {
// Attempt to immediately persist the Spool.
go p.attemptPersist(spool)
} else if spool.ContentLength() != 0 {
p.queue(spool)
}
return
}
func (p *Persister) Finish() {
p.doneCh <- struct{}{}
<-p.doneCh
}
func (p *Persister) queue(spool Spool) {
defer p.mu.Unlock()
p.mu.Lock()
p.qC = append(p.qC, spool)
}
func (p *Persister) Serve() {
if p.ticker == nil {
p.ticker = time.NewTicker(persistInterval)
}
for done, exiting := false, false; !done; {
if !exiting {
select {
case <-p.ticker.C:
case <-p.doneCh:
exiting = true
p.ticker.Stop()
}
}
for _, spool := range p.qA {
p.attemptPersist(spool)
}
// Rotate queues.
p.mu.Lock()
p.qA, p.qB, p.qC = p.qB, p.qC, p.qA[:0]
if exiting && len(p.qA) == 0 && len(p.qB) == 0 {
done = true
}
p.mu.Unlock()
}
close(p.doneCh)
}
// attemptPersist persist valid spools, dropping spools with no content or no configured
// backing store. If persistFn fails the spool will be requeued and be attmepted again.
func (p *Persister) attemptPersist(spool Spool) {
if spool.ContentLength() == 0 {
// Persisting an empty Spool is a no-op.
return
}
// Attach the current BackingStore of the Fragment's JournalSpec.
p.ks.Mu.RLock()
var item, ok = allocator.LookupItem(p.ks, spool.Journal.String())
p.ks.Mu.RUnlock()
if ok {
var spec = item.ItemValue.(*pb.JournalSpec)
// Journal spec has no configured store, drop this fragment.
if len(spec.Fragment.Stores) == 0 {
return
}
spool.BackingStore = spec.Fragment.Stores[0]
} else {
log.WithFields(log.Fields{
"journal": spool.Journal,
"name": spool.ContentName(),
}).Warn("dropping Spool (JournalSpec was removed)")
return
}
if err := p.persistFn(context.Background(), spool); err != nil {
log.WithFields(log.Fields{
"journal": spool.Journal,
"name": spool.ContentName(),
"err": err,
}).Warn("failed to persist Spool (will retry)")
p.queue(spool)
}
}