forked from nakabonne/tstorage
/
storage.go
746 lines (662 loc) · 20.9 KB
/
storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
package tstorage
import (
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"regexp"
"sort"
"sync"
"time"
"github.com/Error404UsernameNotFound/tstorage/internal/cgroup"
"github.com/Error404UsernameNotFound/tstorage/internal/timerpool"
)
var (
	// ErrNoDataPoints is returned by Select when no data point matches the query.
	ErrNoDataPoints = errors.New("no data points found")
	// Limit the concurrency for data ingestion to GOMAXPROCS, since this operation
	// is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent
	// goroutines on data ingestion path.
	defaultWorkersLimit = cgroup.AvailableCPUs()
	// partitionDirRegex matches on-disk partition directory names ("p-<min>-<max>").
	partitionDirRegex = regexp.MustCompile(`^p-.+`)
	// partitionFilesRegex matches files inside partition directories
	// ("/" or "\" separated, so it works on both Unix and Windows paths).
	partitionFilesRegex = regexp.MustCompile(`p-.+[/\\\\].+`)
)
// TimestampPrecision represents precision of timestamps. See WithTimestampPrecision
type TimestampPrecision string

const (
	Nanoseconds  TimestampPrecision = "ns"
	Microseconds TimestampPrecision = "us"
	Milliseconds TimestampPrecision = "ms"
	Seconds      TimestampPrecision = "s"

	// Defaults applied by NewStorage unless overridden via Option functions.
	defaultPartitionDuration  = 1 * time.Hour
	defaultRetention          = 336 * time.Hour
	defaultTimestampPrecision = Nanoseconds
	defaultWriteTimeout       = 30 * time.Second
	defaultWALBufferedSize    = 4096
	defaultPartitionMaxSize   = 1024 * 50        // 50KiB per partition
	defaultDatabaseMaxSize    = 1024 * 1024 * 10 // 10MiB per database
	defaultMaxPartitions      = 200              // 200 partitions per database

	// Number of partitions at the head of the list that still accept writes.
	writablePartitionsNum = 2
	// How often the background goroutine checks for expired partitions.
	checkExpiredInterval = time.Hour
	// Subdirectory of dataPath that holds WAL segment files.
	walDirName = "wal"
)
// Storage provides goroutine safe capabilities of insertion into and retrieval from the time-series storage.
type Storage interface {
	Reader
	// InsertRows ingests the given rows to the time-series storage.
	// If the timestamp is empty, it uses the machine's local timestamp in UTC.
	// The precision of timestamps is nanoseconds by default. It can be changed using WithTimestampPrecision.
	InsertRows(rows []Row) error
	// Close gracefully shutdowns by flushing any unwritten data to the underlying disk partition.
	// No writes should be issued after Close is called.
	Close() error
}
// Reader provides reading access to time series data.
type Reader interface {
	// Select gives back a list of data points that matches a set of the given metric and
	// labels within the given start-end range. Keep in mind that start is inclusive, end is exclusive,
	// and both must be Unix timestamp. ErrNoDataPoints will be returned if no data points found.
	Select(metric string, labels []Label, start, end int64) (points []*DataPoint, err error)
}
// Row includes a data point along with properties to identify a kind of metrics.
type Row struct {
	// The unique name of metric.
	// This field must be set.
	Metric string
	// An optional key-value properties to further detailed identification.
	Labels []Label
	// The embedded data point carrying the value and timestamp.
	// This field must be set.
	DataPoint
}
// DataPoint represents a data point, the smallest unit of time series data.
type DataPoint struct {
	// The actual value. This field must be set.
	Value float64
	// Unix timestamp. Zero means "use the current time" on insertion.
	Timestamp int64
}
// Option is an optional setting for NewStorage. Options mutate the storage
// before any partitions are created or read from disk.
type Option func(*storage)
// WithPartitionMaxSize specifies the maximum size, in bytes, a partition may
// reach before being substituted by a new one.
//
// Defaults to 50KiB.
func WithPartitionMaxSize(partitionMaxSize int64) Option {
	return func(s *storage) { s.partitionMaxSize = partitionMaxSize }
}
// WithDatabaseMaxSize specifies the maximum size the database can have.
// Once exceeded, the oldest partitions are deleted until the database
// fits within the established maximum size again.
//
// Defaults to 10MiB.
func WithDatabaseMaxSize(databaseMaxSize int64) Option {
	return func(s *storage) { s.databaseMaxSize = databaseMaxSize }
}
// WithMaxPartitions specifies the maximum number of partitions the database
// may hold. When exceeded, the oldest partitions get deleted until the count
// fits within the maximum. In-memory partitions are not counted.
//
// Defaults to 200 partitions. A value of 0 implies the same behaviour as the
// in-memory mode, which means no data will be persisted.
func WithMaxPartitions(maxPartitions int64) Option {
	return func(s *storage) { s.maxPartitions = maxPartitions }
}
// WithDataPath specifies the directory where time-series data is stored.
// Provide it to make the data persistent on disk.
//
// Defaults to the empty string, meaning nothing gets persisted.
func WithDataPath(dataPath string) Option {
	return func(s *storage) { s.dataPath = dataPath }
}
// WithPartitionDuration specifies the timestamp range covered by a partition.
// Once the range is exceeded, a new partition gets inserted.
//
// A partition is a chunk of time-series data holding every data point for its
// time range; it acts as a fully independent database.
//
// Defaults to 1h.
func WithPartitionDuration(duration time.Duration) Option {
	return func(s *storage) { s.partitionDuration = duration }
}
// WithRetention specifies how long data is kept. Data points are removed
// from disk once the given period has elapsed since their disk partition
// was created.
//
// Defaults to 14d.
func WithRetention(retention time.Duration) Option {
	return func(s *storage) { s.retention = retention }
}
// WithTimestampPrecision specifies the timestamp precision used by every
// operation on the storage.
//
// Defaults to Nanoseconds.
func WithTimestampPrecision(precision TimestampPrecision) Option {
	return func(s *storage) { s.timestampPrecision = precision }
}
// WithWriteTimeout specifies how long a write waits when all workers are busy.
//
// The storage caps the number of concurrent writer goroutines to prevent
// out-of-memory errors and CPU thrashing under heavy write load.
//
// Defaults to 30s.
func WithWriteTimeout(timeout time.Duration) Option {
	return func(s *storage) { s.writeTimeout = timeout }
}
// WithLogger specifies the logger used for verbose output.
//
// Defaults to a no-op logger implementation.
func WithLogger(logger Logger) Option {
	return func(s *storage) { s.logger = logger }
}
// WithWALBufferedSize specifies how many bytes get buffered before a WAL file
// is flushed. Larger sizes mean fewer writes and better write throughput at
// the expense of durability.
// A size of 0 writes to the file for every incoming data point.
// A size of -1 disables the WAL entirely.
//
// Defaults to 4096.
func WithWALBufferedSize(size int) Option {
	return func(s *storage) { s.walBufferedSize = size }
}
// NewStorage gives back a new storage, which stores time-series data in the process memory by default.
//
// Give the WithDataPath option for running as a on-disk storage. Specify a directory with data already exists,
// then it will be read as the initial data.
func NewStorage(opts ...Option) (Storage, error) {
	s := &storage{
		partitionList:      newPartitionList(),
		workersLimitCh:     make(chan struct{}, defaultWorkersLimit),
		partitionDuration:  defaultPartitionDuration,
		retention:          defaultRetention,
		timestampPrecision: defaultTimestampPrecision,
		writeTimeout:       defaultWriteTimeout,
		walBufferedSize:    defaultWALBufferedSize,
		wal:                &nopWAL{},
		logger:             &nopLogger{},
		doneCh:             make(chan struct{}),
		partitionMaxSize:   defaultPartitionMaxSize,
		databaseMaxSize:    defaultDatabaseMaxSize,
		maxPartitions:      defaultMaxPartitions,
	}
	for _, opt := range opts {
		opt(s)
	}
	if s.inMemoryMode() {
		// Note: errors from newPartition used to be silently dropped here.
		if err := s.newPartition(nil, false); err != nil {
			return nil, err
		}
		return s, nil
	}
	// TODO: Possible point for multiple database implementation (for T8 integration)
	if err := os.MkdirAll(s.dataPath, fs.ModePerm); err != nil {
		return nil, fmt.Errorf("failed to make data directory %s: %w", s.dataPath, err)
	}
	walDir := filepath.Join(s.dataPath, walDirName)
	if s.walBufferedSize >= 0 {
		wal, err := newDiskWAL(walDir, s.walBufferedSize)
		if err != nil {
			return nil, err
		}
		s.wal = wal
	}
	// Read existent partitions from the disk.
	dirs, err := os.ReadDir(s.dataPath)
	if err != nil {
		return nil, fmt.Errorf("failed to open data directory: %w", err)
	}
	if len(dirs) == 0 {
		if err := s.newPartition(nil, false); err != nil {
			return nil, err
		}
		return s, nil
	}
	isPartitionDir := func(f fs.DirEntry) bool {
		return f.IsDir() && partitionDirRegex.MatchString(f.Name())
	}
	partitions := make([]partition, 0, len(dirs))
	for _, e := range dirs {
		if !isPartitionDir(e) {
			continue
		}
		path := filepath.Join(s.dataPath, e.Name())
		part, err := openDiskPartition(path, s.retention)
		if errors.Is(err, ErrNoDataPoints) {
			continue
		}
		if errors.Is(err, errInvalidPartition) {
			// It should be recovered by WAL
			continue
		}
		if err != nil {
			return nil, fmt.Errorf("failed to open disk partition for %s: %w", path, err)
		}
		partitions = append(partitions, part)
	}
	// Insert disk partitions oldest-first so the list stays ordered newest at head.
	sort.Slice(partitions, func(i, j int) bool {
		return partitions[i].minTimestamp() < partitions[j].minTimestamp()
	})
	for _, p := range partitions {
		if err := s.newPartition(p, false); err != nil {
			return nil, err
		}
	}
	// Start WAL recovery if there is.
	if err := s.recoverWAL(walDir); err != nil {
		return nil, fmt.Errorf("failed to recover WAL: %w", err)
	}
	if err := s.newPartition(nil, false); err != nil {
		return nil, err
	}
	// periodically check and permanently remove expired partitions.
	go func() {
		ticker := time.NewTicker(checkExpiredInterval)
		defer ticker.Stop()
		for {
			select {
			case <-s.doneCh:
				return
			case <-ticker.C:
				err := s.removeExpiredPartitions()
				if err != nil {
					s.logger.Printf("%v\n", err)
				}
			}
		}
	}()
	return s, nil
}
// storage implements the Storage interface on top of a list of partitions
// ordered from newest (head) to oldest (tail). The first writablePartitionsNum
// partitions are in-memory and writable; older ones are read-only disk partitions.
type storage struct {
	partitionList partitionList

	// walBufferedSize < 0 disables the WAL (see WithWALBufferedSize).
	walBufferedSize    int
	wal                wal
	partitionDuration  time.Duration
	retention          time.Duration
	timestampPrecision TimestampPrecision
	// dataPath == "" means in-memory mode (see inMemoryMode).
	dataPath        string
	writeTimeout    time.Duration
	partitionMaxSize int64
	databaseMaxSize  int64
	maxPartitions    int64

	logger Logger
	// Semaphore bounding the number of concurrent writer goroutines.
	workersLimitCh chan struct{}
	// wg must be incremented to guarantee all writes are done gracefully.
	wg sync.WaitGroup
	// doneCh is closed by Close to stop the expired-partition checker goroutine.
	doneCh chan struct{}
}
// InsertRows ingests the given rows into the storage.
// Concurrency is capped by workersLimitCh; when all worker slots are busy the
// call blocks for up to writeTimeout before giving up with an error.
func (s *storage) InsertRows(rows []Row) error {
	// Registering with wg lets Close wait for in-flight writes.
	s.wg.Add(1)
	defer s.wg.Done()

	insert := func() error {
		// Release the worker slot acquired by the caller of insert().
		defer func() { <-s.workersLimitCh }()
		if err := s.ensureActiveHead(); err != nil {
			return err
		}
		if err := s.ensureHeadInSize(); err != nil {
			return err
		}
		iterator := s.partitionList.newIterator()
		n := s.partitionList.size()
		rowsToInsert := rows
		// Starting at the head partition, try to insert rows, and loop to insert outdated rows
		// into older partitions. Any rows more than `writablePartitionsNum` partitions out
		// of date are dropped.
		for i := 0; i < n && i < writablePartitionsNum; i++ {
			if len(rowsToInsert) == 0 {
				break
			}
			if !iterator.next() {
				break
			}
			// insertRows gives back the rows too old for this partition.
			outdatedRows, err := iterator.value().insertRows(rowsToInsert)
			if err != nil {
				return fmt.Errorf("failed to insert rows: %w", err)
			}
			rowsToInsert = outdatedRows
		}
		return nil
	}

	// Limit the number of concurrent goroutines to prevent from out of memory
	// errors and CPU thrashing even if too many goroutines attempt to write.
	select {
	case s.workersLimitCh <- struct{}{}:
		return insert()
	default:
	}

	// Seems like all workers are busy; wait for up to writeTimeout
	t := timerpool.Get(s.writeTimeout)
	select {
	case s.workersLimitCh <- struct{}{}:
		timerpool.Put(t)
		return insert()
	case <-t.C:
		timerpool.Put(t)
		return fmt.Errorf("failed to write a data point in %s, since it is overloaded with %d concurrent writers",
			s.writeTimeout, defaultWorkersLimit)
	}
}
// ensureActiveHead guarantees that the head of the partition list is a
// partition still accepting writes; when the head is missing or inactive,
// a fresh in-memory partition is pushed onto the list.
func (s *storage) ensureActiveHead() error {
	if head := s.partitionList.getHead(); head != nil && head.active() {
		return nil
	}
	// Every partition appears to be inactive — push a brand-new one.
	if err := s.newPartition(nil, true); err != nil {
		return err
	}
	// Persist partitions that just became flushable, off the write path.
	go func() {
		if err := s.flushPartitions(); err != nil {
			s.logger.Printf("failed to flush in-memory partitions: %v", err)
		}
	}()
	return nil
}
// ensureHeadInSize guarantees that the head of the partition list is still
// under its maximum configured size; when the head is missing or over-size,
// a fresh in-memory partition is pushed onto the list.
func (s *storage) ensureHeadInSize() error {
	if head := s.partitionList.getHead(); head != nil && head.underMaxSize() {
		return nil
	}
	// The head partition either does not exist or exceeds its size limit.
	if err := s.newPartition(nil, true); err != nil {
		return err
	}
	// Persist partitions that just became flushable, off the write path.
	go func() {
		if err := s.flushPartitions(); err != nil {
			s.logger.Printf("failed to flush in-memory partitions: %v", err)
		}
	}()
	return nil
}
// Select gives back all data points for the given metric and label set within
// the [start, end) range, sorted by ascending timestamp.
// It returns ErrNoDataPoints when nothing matches.
func (s *storage) Select(metric string, labels []Label, start, end int64) ([]*DataPoint, error) {
	if metric == "" {
		return nil, fmt.Errorf("metric must be set")
	}
	if start >= end {
		// Fixed: the old message claimed "greater than", which was wrong for start == end.
		return nil, fmt.Errorf("the given start must be smaller than end")
	}
	points := make([]*DataPoint, 0)
	// Iterate over all partitions from the newest one.
	iterator := s.partitionList.newIterator()
	for iterator.next() {
		part := iterator.value()
		if part == nil {
			return nil, fmt.Errorf("unexpected empty partition found")
		}
		if part.minTimestamp() == 0 {
			// Skip the partition that has no points.
			continue
		}
		if part.maxTimestamp() < start {
			// Partitions are ordered newest-first, so every remaining
			// partition is older still; no need to keep going anymore.
			break
		}
		if part.minTimestamp() > end {
			continue
		}
		ps, err := part.selectDataPoints(metric, labels, start, end)
		if errors.Is(err, ErrNoDataPoints) {
			continue
		}
		if err != nil {
			return nil, fmt.Errorf("failed to select data points: %w", err)
		}
		// Prepend, in order to keep the order in ascending.
		points = append(ps, points...)
	}
	if len(points) == 0 {
		return nil, ErrNoDataPoints
	}
	return points, nil
}
// Close gracefully shuts the storage down: it waits for in-flight writes,
// stops the background expiry goroutine, flushes every partition to disk
// (in disk mode), and removes the now-redundant WAL.
func (s *storage) Close() error {
	// Wait until every in-flight InsertRows call has finished.
	s.wg.Wait()
	// Signal the expired-partition checker goroutine to stop.
	close(s.doneCh)

	if err := s.wal.flush(); err != nil {
		return fmt.Errorf("failed to flush buffered WAL: %w", err)
	}

	// TODO: Prevent from new goroutines calling InsertRows(), for graceful shutdown.

	// Make all writable partitions read-only by inserting as same number of those.
	for i := 0; i < writablePartitionsNum; i++ {
		if err := s.newPartition(nil, true); err != nil {
			return err
		}
		if err := s.flushPartitions(); err != nil {
			return fmt.Errorf("failed to close storage: %w", err)
		}
	}
	if err := s.removeExpiredPartitions(); err != nil {
		return fmt.Errorf("failed to remove expired partitions: %w", err)
	}
	// All partitions have been flushed, so WAL isn't needed anymore.
	if err := s.wal.removeAll(); err != nil {
		return fmt.Errorf("failed to remove WAL: %w", err)
	}
	return nil
}
// newPartition inserts the given partition at the head of the list; a nil p
// means "create a fresh in-memory partition". Before creating one it drains
// the oldest partitions while the database exceeds its maximum size, and after
// insertion it enforces the maximum partition count. When punctuateWal is
// true, the WAL gets punctuated so the new partition starts a new segment.
func (s *storage) newPartition(p partition, punctuateWal bool) error {
	if p == nil {
		// Check for max database size.
		// Only when there are disk partitions, hence more than writablePartitionsNum (2).
		if s.partitionList.size() > writablePartitionsNum {
			overSize, err := s.checkDBOverMaxSize()
			if err != nil {
				return fmt.Errorf("failed to check database maximum size: %w", err)
			}
			for overSize {
				tail := s.partitionList.getTail()
				// Fixed: the removal error used to be ignored; a persistent
				// failure would have made this loop spin forever.
				if err := s.partitionList.remove(tail); err != nil {
					return fmt.Errorf("failed to remove oldest partition: %w", err)
				}
				overSize, err = s.checkDBOverMaxSize()
				if err != nil {
					return fmt.Errorf("failed to check database maximum size: %w", err)
				}
			}
		}
		p = newMemoryPartition(s.wal, s.partitionDuration, s.timestampPrecision, s.partitionMaxSize)
	}
	s.partitionList.insert(p)

	// Check for max partitions.
	// Only when there are disk partitions, hence more than writablePartitionsNum (2).
	// Unnecessary in in-memory mode.
	if !s.inMemoryMode() && s.partitionList.size() > writablePartitionsNum {
		for numPart := s.partitionList.size() - writablePartitionsNum; numPart > int(s.maxPartitions); numPart-- {
			tail := s.partitionList.getTail()
			if err := s.partitionList.remove(tail); err != nil {
				return fmt.Errorf("failed to remove oldest partition: %w", err)
			}
		}
	}
	if punctuateWal {
		return s.wal.punctuate()
	}
	return nil
}
// checkDBOverMaxSize reports whether the on-disk database has exceeded its
// configured maximum size, by summing the sizes of all partition files
// under dataPath.
func (s *storage) checkDBOverMaxSize() (bool, error) {
	var size int64
	err := filepath.WalkDir(s.dataPath, func(path string, d os.DirEntry, err error) error {
		if err != nil {
			return err
		}
		// Only count regular files that live inside a partition directory.
		if !partitionFilesRegex.MatchString(path) || d.IsDir() {
			return nil
		}
		fInfo, err := d.Info()
		if err != nil {
			return err
		}
		size += fInfo.Size()
		return nil
	})
	if err != nil {
		return false, fmt.Errorf("failed to walk database directory: %w", err)
	}
	// Fixed: previously returned the stale `err` variable (always nil here);
	// return a literal nil for clarity.
	return size > s.databaseMaxSize, nil
}
// flushPartitions persists all in-memory partitions ready to persisted.
// For the in-memory mode, just removes it from the partition list.
func (s *storage) flushPartitions() error {
	// Keep the first two partitions as is even if they are inactive,
	// to accept out-of-order data points.
	i := 0
	iterator := s.partitionList.newIterator()
	for iterator.next() {
		if i < writablePartitionsNum {
			i++
			continue
		}
		part := iterator.value()
		if part == nil {
			return fmt.Errorf("unexpected empty partition found")
		}
		// Disk partitions are already persisted; only memory partitions need flushing.
		memPart, ok := part.(*memoryPartition)
		if !ok {
			continue
		}
		if s.inMemoryMode() {
			if err := s.partitionList.remove(part); err != nil {
				return fmt.Errorf("failed to remove partition: %w", err)
			}
			continue
		}
		// Start swapping in-memory partition for disk one.
		// The disk partition will place at where in-memory one existed.
		dir := filepath.Join(s.dataPath, fmt.Sprintf("p-%d-%d", memPart.minTimestamp(), memPart.maxTimestamp()))
		if err := s.flush(dir, memPart); err != nil {
			return fmt.Errorf("failed to compact memory partition into %s: %w", dir, err)
		}
		newPart, err := openDiskPartition(dir, s.retention)
		if errors.Is(err, ErrNoDataPoints) {
			// Nothing was written; drop the empty partition from the list.
			if err := s.partitionList.remove(part); err != nil {
				return fmt.Errorf("failed to remove partition: %w", err)
			}
			continue
		}
		if err != nil {
			return fmt.Errorf("failed to generate disk partition for %s: %w", dir, err)
		}
		if err := s.partitionList.swap(part, newPart); err != nil {
			return fmt.Errorf("failed to swap partitions: %w", err)
		}
		// The flushed partition's WAL segment is no longer needed.
		if err := s.wal.removeOldest(); err != nil {
			return fmt.Errorf("failed to remove oldest WAL segment: %w", err)
		}
	}
	return nil
}
// flush compacts the data points in the given partition and flushes them to the given directory.
// It writes the data file first and the meta file last, because the presence
// of a valid meta file is what marks a disk partition as valid.
func (s *storage) flush(dirPath string, m *memoryPartition) error {
	if dirPath == "" {
		return fmt.Errorf("dir path is required")
	}

	if err := os.MkdirAll(dirPath, fs.ModePerm); err != nil {
		return fmt.Errorf("failed to make directory %q: %w", dirPath, err)
	}

	f, err := os.Create(filepath.Join(dirPath, dataFileName))
	if err != nil {
		return fmt.Errorf("failed to create file %q: %w", dirPath, err)
	}
	defer f.Close()
	encoder := newSeriesEncoder(f)

	metrics := map[string]diskMetric{}
	// Fixed: errors inside Range were previously only logged, after which the
	// meta file was still written — silently marking a half-written partition
	// as valid. Capture the first error and propagate it instead.
	var encodeErr error
	m.metrics.Range(func(key, value interface{}) bool {
		mt, ok := value.(*memoryMetric)
		if !ok {
			encodeErr = fmt.Errorf("unknown value found in metrics map")
			return false
		}
		// Remember where this metric's points start in the data file.
		offset, err := f.Seek(0, io.SeekCurrent)
		if err != nil {
			encodeErr = fmt.Errorf("failed to set file offset of metric %q: %w", mt.name, err)
			return false
		}

		if err := mt.encodeAllPoints(encoder); err != nil {
			encodeErr = fmt.Errorf("failed to encode data points of metric %q: %w", mt.name, err)
			return false
		}

		if err := encoder.flush(); err != nil {
			encodeErr = fmt.Errorf("failed to flush data points of metric %q: %w", mt.name, err)
			return false
		}

		totalNumPoints := mt.size + int64(len(mt.outOfOrderPoints))
		metrics[mt.name] = diskMetric{
			Name:          mt.name,
			Offset:        offset,
			MinTimestamp:  mt.minTimestamp,
			MaxTimestamp:  mt.maxTimestamp,
			NumDataPoints: totalNumPoints,
		}
		return true
	})
	if encodeErr != nil {
		return encodeErr
	}

	b, err := json.Marshal(&meta{
		MinTimestamp:  m.minTimestamp(),
		MaxTimestamp:  m.maxTimestamp(),
		NumDataPoints: m.size(),
		Metrics:       metrics,
		CreatedAt:     time.Now(),
	})
	if err != nil {
		return fmt.Errorf("failed to encode metadata: %w", err)
	}

	// It should write the meta file at last because what valid meta file exists proves the disk partition is valid.
	metaPath := filepath.Join(dirPath, metaFileName)
	if err := os.WriteFile(metaPath, b, fs.ModePerm); err != nil {
		return fmt.Errorf("failed to write metadata to %s: %w", metaPath, err)
	}
	return nil
}
// removeExpiredPartitions scans the partition list and permanently removes
// every partition whose retention period has elapsed.
func (s *storage) removeExpiredPartitions() error {
	// Collect first, then remove, so we never mutate the list while iterating it.
	expiredList := make([]partition, 0)
	iterator := s.partitionList.newIterator()
	for iterator.next() {
		part := iterator.value()
		if part == nil {
			return fmt.Errorf("unexpected nil partition found")
		}
		if part.expired() {
			expiredList = append(expiredList, part)
		}
	}

	for i := range expiredList {
		// Fixed: the underlying error used to be dropped; wrap it with %w.
		if err := s.partitionList.remove(expiredList[i]); err != nil {
			return fmt.Errorf("failed to remove expired partition: %w", err)
		}
	}
	return nil
}
// recoverWAL replays every record found under walDir into the storage and
// then refreshes the WAL. A missing WAL directory is not an error — there is
// simply nothing to recover.
func (s *storage) recoverWAL(walDir string) error {
	r, err := newDiskWALReader(walDir)
	switch {
	case errors.Is(err, os.ErrNotExist):
		return nil
	case err != nil:
		return err
	}

	if err := r.readAll(); err != nil {
		return fmt.Errorf("failed to read WAL: %w", err)
	}

	rows := r.rowsToInsert
	if len(rows) == 0 {
		return nil
	}
	if err := s.InsertRows(rows); err != nil {
		return fmt.Errorf("failed to insert rows recovered from WAL: %w", err)
	}
	return s.wal.refresh()
}
// inMemoryMode reports whether the storage keeps data only in process memory:
// no data path configured, or persistence effectively disabled via a zero
// partition limit or a zero database size limit.
func (s *storage) inMemoryMode() bool {
	switch {
	case s.dataPath == "":
		return true
	case s.maxPartitions == 0:
		return true
	case s.databaseMaxSize == 0:
		return true
	default:
		return false
	}
}