-
Notifications
You must be signed in to change notification settings - Fork 282
/
segment.go
192 lines (170 loc) · 5.23 KB
/
segment.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
package tsdb
import (
"fmt"
"path/filepath"
"sync"
"time"
"github.com/eleme/lindb/kv"
"github.com/eleme/lindb/pkg/fileutil"
"github.com/eleme/lindb/pkg/interval"
"github.com/eleme/lindb/pkg/logger"
"github.com/eleme/lindb/pkg/timeutil"
)
//go:generate mockgen -source=./segment.go -destination=./segment_mock.go -package=tsdb -self_package=github.com/eleme/lindb/tsdb
// IntervalSegment represents an interval segment; a shard holds several of them.
// It owns a set of segments that are looked up by name or by time range.
type IntervalSegment interface {
// GetOrCreateSegment returns the segment with the given name, creating it
// first if it does not exist yet.
GetOrCreateSegment(segmentName string) (Segment, error)
// GetSegments returns the segments matching the given time range, or nil if
// none match.
GetSegments(timeRange timeutil.TimeRange) []Segment
// Close closes the interval segment and releases its resources.
Close()
}
// intervalSegment implements the IntervalSegment interface.
type intervalSegment struct {
// path is the directory of this interval segment; each segment is opened at
// filepath.Join(path, segmentName).
path string
// interval is the duration supplied at construction time.
interval time.Duration
// intervalType selects the interval calculator and is passed on to every
// segment created here.
intervalType interval.Type
// segments maps segment name to its Segment.
segments sync.Map
// mutex is held while GetOrCreateSegment creates a missing segment.
mutex sync.Mutex
}
// newIntervalSegment creates an interval segment rooted at path and loads every
// segment already present under it.
//
// The directory is created when it is missing. Each entry returned by
// fileutil.ListDir is opened as a segment named after that entry. If any
// segment fails to open, the segments opened so far are closed before the
// error is returned, so a failed load does not leave kv stores open.
func newIntervalSegment(interval time.Duration, intervalType interval.Type, path string) (IntervalSegment, error) {
	if err := fileutil.MkDirIfNotExist(path); err != nil {
		return nil, err
	}
	loaded := &intervalSegment{
		path:         path,
		interval:     interval,
		intervalType: intervalType,
	}
	// load segments if exist
	//TODO too many kv store load???
	segmentNames, err := fileutil.ListDir(path)
	if err != nil {
		return nil, err
	}
	for _, segmentName := range segmentNames {
		seg, err := newSegment(segmentName, intervalType, filepath.Join(path, segmentName))
		if err != nil {
			// Release the segments opened by earlier iterations; nobody else
			// holds a reference to them once we return the error.
			loaded.Close()
			return nil, fmt.Errorf("create segmenet error:%s", err)
		}
		loaded.segments.Store(segmentName, seg)
	}
	return loaded, nil
}
// GetOrCreateSegment returns the segment registered under segmentName and
// creates it when it is missing.
//
// The lookup is tried first without the mutex; on a miss the mutex is taken and
// the lookup repeated, so the segment is created only once.
func (s *intervalSegment) GetOrCreateSegment(segmentName string) (Segment, error) {
	// common case: the segment is already registered
	if existing := s.getSegment(segmentName); existing != nil {
		return existing, nil
	}

	s.mutex.Lock()
	defer s.mutex.Unlock()

	// another caller may have created it while we waited for the mutex
	if existing := s.getSegment(segmentName); existing != nil {
		return existing, nil
	}

	created, err := newSegment(segmentName, s.intervalType, filepath.Join(s.path, segmentName))
	if err != nil {
		return nil, fmt.Errorf("create segmenet error:%s", err)
	}
	s.segments.Store(segmentName, created)
	return created, nil
}
// GetSegments collects the segments whose base time lies between the segment
// times calculated for timeRange.Start and timeRange.End, both inclusive.
// It returns nil when nothing matches or when no calculator is available for
// the interval type.
func (s *intervalSegment) GetSegments(timeRange timeutil.TimeRange) []Segment {
	calc, err := interval.GetCalculator(s.intervalType)
	if err != nil {
		return nil
	}

	lower, upper := calc.CalSegmentTime(timeRange.Start), calc.CalSegmentTime(timeRange.End)

	var matched []Segment
	s.segments.Range(func(_, value interface{}) bool {
		seg, isSegment := value.(Segment)
		if !isSegment {
			// skip unexpected entries and keep iterating
			return true
		}
		if t := seg.BaseTime(); t >= lower && t <= upper {
			matched = append(matched, seg)
		}
		return true
	})
	return matched
}
// Close walks every stored segment and closes it, releasing the resources held
// by this interval segment.
func (s *intervalSegment) Close() {
	s.segments.Range(func(_, value interface{}) bool {
		if seg, isSegment := value.(Segment); isSegment {
			seg.Close()
		}
		// always continue so every segment gets visited
		return true
	})
}
// getSegment looks up the segment stored under segmentName and returns nil
// when there is no such entry or the entry is not a Segment.
func (s *intervalSegment) getSegment(segmentName string) Segment {
	value, found := s.segments.Load(segmentName)
	if !found {
		return nil
	}
	if seg, isSegment := value.(Segment); isSegment {
		return seg
	}
	return nil
}
// Segment represents a time-based segment; an interval segment holds several
// of them. A segment uses a k/v store for storing time series data.
type Segment interface {
// BaseTime returns the segment's base time.
BaseTime() int64
// GetDataFamilies returns the data families matching the given time range,
// or nil if none match.
GetDataFamilies(timeRange timeutil.TimeRange) []DataFamily
// Close closes the segment, including its kv store.
Close()
}
// segment implements the Segment interface.
type segment struct {
// baseTime is parsed from the segment name when the segment is created.
baseTime int64
// kvStore is the underlying store; it is closed by Close.
kvStore kv.Store
// intervalType is the interval type the segment was created with.
intervalType interval.Type
//TODO
// families map[int64]kv.Family
// logger reports failures when closing the kv store.
logger *logger.Logger
}
// newSegment returns a segment wrapping the kv store located at path.
//
// The base time is parsed from segmentName, using the calculator for
// intervalType, before the kv store is opened. An unsupported interval type or
// a malformed segment name therefore fails without leaving an open store
// behind. The parse error message includes the underlying cause.
func newSegment(segmentName string, intervalType interval.Type, path string) (Segment, error) {
	// validate the inputs first; nothing has been opened yet, so the early
	// returns below have nothing to clean up
	calc, err := interval.GetCalculator(intervalType)
	if err != nil {
		return nil, err
	}
	// parse base time from segment name
	baseTime, err := calc.ParseSegmentTime(segmentName)
	if err != nil {
		return nil, fmt.Errorf("parse segment[%s] base time error:%s", path, err)
	}
	kvStore, err := kv.NewStore(segmentName, kv.StoreOption{Path: path})
	if err != nil {
		return nil, fmt.Errorf("create kv store for segment error:%s", err)
	}
	return &segment{
		baseTime:     baseTime,
		kvStore:      kvStore,
		intervalType: intervalType,
		logger:       logger.GetLogger("tsdb/segment"),
	}, nil
}
// BaseTime returns the segment's base time, which was parsed from the segment
// name when the segment was created.
func (s *segment) BaseTime() int64 {
return s.baseTime
}
// GetDataFamilies returns the data families matching the given time range.
// It is not implemented yet and currently always returns nil.
func (s *segment) GetDataFamilies(timeRange timeutil.TimeRange) []DataFamily {
//TODO need impl
return nil
}
// Close shuts down the segment's kv store. A failure to close is logged, not
// returned to the caller.
func (s *segment) Close() {
	err := s.kvStore.Close()
	if err == nil {
		return
	}
	s.logger.Error("close kv store error", logger.Error(err))
}