-
Notifications
You must be signed in to change notification settings - Fork 13
/
poller.go
154 lines (129 loc) · 3.42 KB
/
poller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package poller
import (
"context"
"fmt"
"sync"
"time"
"github.com/Axway/agent-sdk/pkg/agent/events"
"github.com/Axway/agent-sdk/pkg/harvester"
"github.com/Axway/agent-sdk/pkg/util/log"
"github.com/Axway/agent-sdk/pkg/watchmanager/proto"
)
type pollExecutor struct {
harvester harvester.Harvest
sequence events.SequenceProvider
topicSelfLink string
logger log.FieldLogger
timer *time.Timer
ctx context.Context
cancel context.CancelFunc
interval time.Duration
onStop onClientStopCb
isReady bool
lock sync.RWMutex
}
type newPollExecutorFunc func(interval time.Duration, options ...executorOpt) *pollExecutor
func newPollExecutor(interval time.Duration, options ...executorOpt) *pollExecutor {
logger := log.NewFieldLogger().
WithComponent("pollExecutor").
WithPackage("sdk.agent.poller")
ctx, cancel := context.WithCancel(context.Background())
pm := &pollExecutor{
logger: logger,
timer: time.NewTimer(interval),
ctx: ctx,
cancel: cancel,
interval: interval,
}
for _, opt := range options {
opt(pm)
}
return pm
}
// RegisterWatch registers a watch topic for polling events and publishing events on a channel
func (m *pollExecutor) RegisterWatch(eventChan chan *proto.Event, errChan chan error) {
m.logger.Trace("register watch topic for polling and publishing events")
if m.harvester == nil {
go func() {
m.Stop()
errChan <- fmt.Errorf("harvester is not configured for the polling client")
}()
return
}
if m.sequence.GetSequence() < 0 {
m.onHarvesterErr()
go func() {
m.Stop()
errChan <- fmt.Errorf("do not have a sequence id, stopping poller")
}()
return
}
go func() {
err := m.sync(m.topicSelfLink, eventChan)
m.Stop()
errChan <- err
}()
}
func (m *pollExecutor) sync(topicSelfLink string, eventChan chan *proto.Event) error {
m.logger.Trace("sync events")
if err := m.harvester.EventCatchUp(topicSelfLink, eventChan); err != nil {
m.logger.WithError(err).Error("harvester returned an error when syncing events")
m.onHarvesterErr()
return err
}
m.lock.Lock()
m.isReady = true
m.lock.Unlock()
for {
select {
case <-m.ctx.Done():
m.logger.Info("harvester polling has been stopped")
return nil
case <-m.timer.C:
if err := m.tick(topicSelfLink, eventChan); err != nil {
return err
}
}
}
}
func (m *pollExecutor) tick(topicSelfLink string, eventChan chan *proto.Event) (ret error) {
sequence := m.sequence.GetSequence()
logger := m.logger.WithField("sequence-id", sequence)
logger.Debug("retrieving harvester events")
defer func() {
if ret == nil {
m.timer.Reset(m.interval)
}
}()
if lastSeqID, err := m.harvester.ReceiveSyncEvents(topicSelfLink, sequence, eventChan); err != nil {
if _, ok := err.(*harvester.ErrSeqGone); ok {
m.sequence.SetSequence(lastSeqID)
return
}
logger.WithError(err).Error("harvester returned an error when syncing events")
m.onHarvesterErr()
ret = err
}
return
}
func (m *pollExecutor) onHarvesterErr() {
if m.onStop == nil {
return
}
m.onStop()
}
// Stop stops the poller
func (m *pollExecutor) Stop() {
m.timer.Stop()
m.cancel()
m.lock.Lock()
defer m.lock.Unlock()
m.isReady = false
m.logger.Debug("poller has been stopped")
}
// Status returns a bool indicating the status of the poller
func (m *pollExecutor) Status() bool {
m.lock.RLock()
defer m.lock.RUnlock()
return m.ctx.Err() == nil && m.isReady
}