forked from aliyun/aliyun-log-go-sdk
-
Notifications
You must be signed in to change notification settings - Fork 1
/
checkpoint_tracker.go
61 lines (52 loc) · 2.1 KB
/
checkpoint_tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package consumerLibrary
import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"time"
)
// ConsumerCheckPointTracker holds the checkpoint cursors for the shard
// identified by trackerShardId: the latest cursor set in memory and the cursor
// most recently recorded as persisted. Persisting goes through client.
type ConsumerCheckPointTracker struct {
	// client is used by flushCheckPoint to call updateCheckPoint.
	client *ConsumerClient

	// logger receives the warning emitted by flushCheck when a flush fails.
	logger log.Logger

	// trackerShardId is the shard id passed to client.updateCheckPoint.
	trackerShardId int

	// tempCheckPoint is the cursor last stored by setMemoryCheckPoint; it is
	// what getCheckPoint returns and what flushCheckPoint sends.
	tempCheckPoint string

	// lastPersistentCheckPoint is the cursor last stored by
	// setPersistentCheckPoint or by a successful flushCheckPoint.
	lastPersistentCheckPoint string

	// defaultFlushCheckPointIntervalSec is the number of seconds that must
	// have elapsed since lastCheckTime before flushCheck flushes again.
	defaultFlushCheckPointIntervalSec int64

	// lastCheckTime is the Unix time, in seconds, recorded by flushCheck the
	// last time flushCheckPoint returned without an error.
	lastCheckTime int64
}
// initConsumerCheckpointTracker returns a new tracker for shardId that uses
// consumerClient to update checkpoints and logger to report flush failures.
// The flush interval is set to 60 seconds; both cursors start empty and
// lastCheckTime starts at zero.
func initConsumerCheckpointTracker(shardId int, consumerClient *ConsumerClient, logger log.Logger) *ConsumerCheckPointTracker {
	tracker := new(ConsumerCheckPointTracker)
	tracker.trackerShardId = shardId
	tracker.client = consumerClient
	tracker.logger = logger
	tracker.defaultFlushCheckPointIntervalSec = 60
	return tracker
}
// setMemoryCheckPoint stores cursor as the in-memory checkpoint. Nothing is
// persisted here; the stored value is what flushCheckPoint later sends.
func (ct *ConsumerCheckPointTracker) setMemoryCheckPoint(cursor string) {
	ct.tempCheckPoint = cursor
}
// setPersistentCheckPoint records cursor as the last persisted checkpoint
// without contacting the client. flushCheckPoint skips its update while the
// in-memory checkpoint equals this value.
func (ct *ConsumerCheckPointTracker) setPersistentCheckPoint(cursor string) {
	ct.lastPersistentCheckPoint = cursor
}
// flushCheckPoint sends the in-memory checkpoint to the client when it is
// non-empty and differs from the last persisted one.
//
// It returns the error from client.updateCheckPoint unchanged; in that case
// lastPersistentCheckPoint is left as it was. When there is nothing to send it
// returns nil without calling the client.
//
// The cursor is read once up front so that the value recorded as persisted is
// exactly the one handed to updateCheckPoint, even if tempCheckPoint is
// reassigned while the call is in progress. Re-reading the field afterwards
// would mark a cursor that was never sent as persisted, and the next flush
// would then skip it. This does not by itself make the tracker safe for
// concurrent use.
func (ct *ConsumerCheckPointTracker) flushCheckPoint() error {
	pending := ct.tempCheckPoint
	if pending == "" || pending == ct.lastPersistentCheckPoint {
		return nil
	}
	if err := ct.client.updateCheckPoint(ct.trackerShardId, pending, true); err != nil {
		return err
	}
	ct.lastPersistentCheckPoint = pending
	return nil
}
// flushCheck calls flushCheckPoint once more than
// defaultFlushCheckPointIntervalSec seconds have passed since lastCheckTime;
// otherwise it does nothing.
//
// When the flush returns no error, lastCheckTime is set to the current Unix
// time. When it fails, a warning is logged and lastCheckTime is left
// unchanged, so the next call attempts the flush again.
func (ct *ConsumerCheckPointTracker) flushCheck() {
	now := time.Now().Unix()
	nextDue := ct.lastCheckTime + ct.defaultFlushCheckPointIntervalSec
	if now <= nextDue {
		return
	}

	err := ct.flushCheckPoint()
	if err == nil {
		ct.lastCheckTime = now
		return
	}
	level.Warn(ct.logger).Log("msg", "update checkpoint get error", "error", err)
}
// getCheckPoint returns the in-memory checkpoint, i.e. the cursor most
// recently stored by setMemoryCheckPoint. It may not have been persisted yet.
func (ct *ConsumerCheckPointTracker) getCheckPoint() string {
	return ct.tempCheckPoint
}