forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.go
131 lines (110 loc) · 2.72 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package hh
import (
"fmt"
"io"
"log"
"os"
"sync"
"time"
"github.com/influxdb/influxdb/tsdb"
)
var ErrHintedHandoffDisabled = fmt.Errorf("hinted handoff disabled")
type Service struct {
mu sync.RWMutex
wg sync.WaitGroup
closing chan struct{}
Logger *log.Logger
cfg Config
ShardWriter shardWriter
HintedHandoff interface {
WriteShard(shardID, ownerID uint64, points []tsdb.Point) error
Process() error
PurgeOlderThan(when time.Duration) error
}
}
type shardWriter interface {
WriteShard(shardID, ownerID uint64, points []tsdb.Point) error
}
// NewService returns a new instance of Service.
func NewService(c Config, w shardWriter) *Service {
s := &Service{
cfg: c,
Logger: log.New(os.Stderr, "[handoff] ", log.LstdFlags),
}
processor, err := NewProcessor(c.Dir, w, ProcessorOptions{
MaxSize: c.MaxSize,
RetryRateLimit: c.RetryRateLimit,
})
if err != nil {
s.Logger.Fatalf("Failed to start hinted handoff processor: %v", err)
}
processor.Logger = s.Logger
s.HintedHandoff = processor
return s
}
func (s *Service) Open() error {
s.mu.Lock()
defer s.mu.Unlock()
s.closing = make(chan struct{})
s.wg.Add(2)
go s.retryWrites()
go s.expireWrites()
return nil
}
func (s *Service) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.closing != nil {
close(s.closing)
}
s.wg.Wait()
return nil
}
// SetLogger sets the internal logger to the logger passed in.
func (s *Service) SetLogger(l *log.Logger) {
s.Logger = l
}
// WriteShard queues the points write for shardID to node ownerID to handoff queue
func (s *Service) WriteShard(shardID, ownerID uint64, points []tsdb.Point) error {
if !s.cfg.Enabled {
return ErrHintedHandoffDisabled
}
return s.HintedHandoff.WriteShard(shardID, ownerID, points)
}
func (s *Service) retryWrites() {
defer s.wg.Done()
ticker := time.NewTicker(time.Duration(s.cfg.RetryInterval))
defer ticker.Stop()
for {
select {
case <-s.closing:
return
case <-ticker.C:
if err := s.HintedHandoff.Process(); err != nil && err != io.EOF {
s.Logger.Printf("retried write failed: %v", err)
}
}
}
}
// expireWrites will cause the handoff queues to remove writes that are older
// than the configured threshold
func (s *Service) expireWrites() {
defer s.wg.Done()
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
for {
select {
case <-s.closing:
return
case <-ticker.C:
if err := s.HintedHandoff.PurgeOlderThan(time.Duration(s.cfg.MaxAge)); err != nil {
s.Logger.Printf("purge write failed: %v", err)
}
}
}
}
// purgeWrites will cause the handoff queues to remove writes that are no longer
// valid. e.g. queued writes for a node that has been removed
func (s *Service) purgeWrites() {
panic("not implemented")
}