persistence.go (forked from prometheus/prometheus), 1676 lines (1554 loc), 53.6 KB
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import (
"bufio"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/local/codable"
"github.com/prometheus/prometheus/storage/local/index"
"github.com/prometheus/prometheus/util/flock"
)
const (
// Version of the storage as it can be found in the version file.
// Increment to protect against incompatible changes.
Version = 1
versionFileName = "VERSION"
seriesFileSuffix = ".db"
seriesTempFileSuffix = ".db.tmp"
seriesDirNameLen = 2 // How many bytes of the fingerprint in dir name.
hintFileSuffix = ".hint"
mappingsFileName = "mappings.db"
mappingsTempFileName = "mappings.db.tmp"
mappingsFormatVersion = 1
mappingsMagicString = "PrometheusMappings"
dirtyFileName = "DIRTY"
fileBufSize = 1 << 16 // 64kiB.
chunkHeaderLen = 17
chunkHeaderTypeOffset = 0
chunkHeaderFirstTimeOffset = 1
chunkHeaderLastTimeOffset = 9
chunkLenWithHeader = chunk.ChunkLen + chunkHeaderLen
chunkMaxBatchSize = 62 // Max no. of chunks to load or write in
// one batch. Note that 62 is the largest number of chunks that fit
// into 64kiB on disk because chunkHeaderLen is added to each 1k chunk.
indexingMaxBatchSize = 1024 * 1024
indexingBatchTimeout = 500 * time.Millisecond // Commit batch when idle for that long.
indexingQueueCapacity = 1024 * 256
)
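// A worked example of the chunkMaxBatchSize figure above, assuming
// chunk.ChunkLen is 1024 bytes as the "1k chunk" note implies:
//
//	chunkLenWithHeader = 1024 + 17 = 1041 bytes
//	62 * 1041 = 64,542 bytes, which still fits into fileBufSize (64kiB = 65,536 bytes)
//	63 * 1041 = 65,583 bytes, which no longer does
//
// Hence at most 62 chunks are loaded or written per batch.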
var fpLen = len(model.Fingerprint(0).String()) // Length of a fingerprint as string.
const (
flagHeadChunkPersisted byte = 1 << iota
// Add more flags here like:
// flagFoo
// flagBar
)
type indexingOpType byte
const (
add indexingOpType = iota
remove
)
type indexingOp struct {
fingerprint model.Fingerprint
metric model.Metric
opType indexingOpType
}
// A Persistence is used by a Storage implementation to store samples
// persistently across restarts. The methods are only goroutine-safe if
// explicitly marked as such below. The chunk-related methods persistChunks,
// dropChunks, loadChunks, and loadChunkDescs can be called concurrently with
// each other if each call refers to a different fingerprint.
type persistence struct {
basePath string
archivedFingerprintToMetrics *index.FingerprintMetricIndex
archivedFingerprintToTimeRange *index.FingerprintTimeRangeIndex
labelPairToFingerprints *index.LabelPairFingerprintIndex
labelNameToLabelValues *index.LabelNameLabelValuesIndex
indexingQueue chan indexingOp
indexingStopped chan struct{}
indexingFlush chan chan int
indexingQueueLength prometheus.Gauge
indexingQueueCapacity prometheus.Metric
indexingBatchSizes prometheus.Summary
indexingBatchDuration prometheus.Summary
checkpointDuration prometheus.Summary
checkpointLastDuration prometheus.Gauge
checkpointLastSize prometheus.Gauge
checkpointChunksWritten prometheus.Summary
dirtyCounter prometheus.Counter
startedDirty prometheus.Gauge
checkpointing prometheus.Gauge
seriesChunksPersisted prometheus.Histogram
dirtyMtx sync.Mutex // Protects dirty and becameDirty.
dirty bool // true if persistence was started in dirty state.
becameDirty bool // true if an inconsistency came up during runtime.
pedanticChecks bool // true if crash recovery should check each series.
dirtyFileName string // The file used for locking and to mark dirty state.
fLock flock.Releaser // The file lock to protect against concurrent usage.
shouldSync syncStrategy
minShrinkRatio float64 // How much a series file has to shrink to justify dropping chunks.
bufPool sync.Pool
}
// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
func newPersistence(
basePath string,
dirty, pedanticChecks bool,
shouldSync syncStrategy,
minShrinkRatio float64,
) (*persistence, error) {
dirtyPath := filepath.Join(basePath, dirtyFileName)
versionPath := filepath.Join(basePath, versionFileName)
if versionData, err := ioutil.ReadFile(versionPath); err == nil {
if persistedVersion, err := strconv.Atoi(strings.TrimSpace(string(versionData))); err != nil {
return nil, fmt.Errorf("cannot parse content of %s: %s", versionPath, versionData)
} else if persistedVersion != Version {
return nil, fmt.Errorf("found storage version %d on disk, need version %d - please wipe storage or run a version of Prometheus compatible with storage version %d", persistedVersion, Version, persistedVersion)
}
} else if os.IsNotExist(err) {
// No version file found. Let's create the directory (in case
// it's not there yet) and then check if it is actually
// empty. If not, we have found an old storage directory without
// version file, so we have to bail out.
if err := os.MkdirAll(basePath, 0700); err != nil {
if abspath, e := filepath.Abs(basePath); e == nil {
return nil, fmt.Errorf("cannot create persistent directory %s: %s", abspath, err)
}
return nil, fmt.Errorf("cannot create persistent directory %s: %s", basePath, err)
}
fis, err := ioutil.ReadDir(basePath)
if err != nil {
return nil, err
}
filesPresent := len(fis)
for i := range fis {
switch {
case fis[i].Name() == "lost+found" && fis[i].IsDir():
filesPresent--
case strings.HasPrefix(fis[i].Name(), "."):
filesPresent--
}
}
if filesPresent > 0 {
return nil, fmt.Errorf("found existing files in storage path that do not look like storage files compatible with this version of Prometheus; please delete the files in the storage path or choose a different storage path")
}
// Finally we can write our own version into a new version file.
file, err := os.Create(versionPath)
if err != nil {
return nil, err
}
defer file.Close()
if _, err := fmt.Fprintf(file, "%d\n", Version); err != nil {
return nil, err
}
} else {
return nil, err
}
fLock, dirtyfileExisted, err := flock.New(dirtyPath)
if err != nil {
log.Errorf("Could not lock %s, Prometheus already running?", dirtyPath)
return nil, err
}
if dirtyfileExisted {
dirty = true
}
archivedFingerprintToMetrics, err := index.NewFingerprintMetricIndex(basePath)
if err != nil {
return nil, err
}
archivedFingerprintToTimeRange, err := index.NewFingerprintTimeRangeIndex(basePath)
if err != nil {
return nil, err
}
p := &persistence{
basePath: basePath,
archivedFingerprintToMetrics: archivedFingerprintToMetrics,
archivedFingerprintToTimeRange: archivedFingerprintToTimeRange,
indexingQueue: make(chan indexingOp, indexingQueueCapacity),
indexingStopped: make(chan struct{}),
indexingFlush: make(chan chan int),
indexingQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "indexing_queue_length",
Help: "The number of metrics waiting to be indexed.",
}),
indexingQueueCapacity: prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "indexing_queue_capacity"),
"The capacity of the indexing queue.",
nil, nil,
),
prometheus.GaugeValue,
float64(indexingQueueCapacity),
),
indexingBatchSizes: prometheus.NewSummary(
prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "indexing_batch_sizes",
Help: "Quantiles for indexing batch sizes (number of metrics per batch).",
},
),
indexingBatchDuration: prometheus.NewSummary(
prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "indexing_batch_duration_seconds",
Help: "Quantiles for batch indexing duration in seconds.",
},
),
checkpointLastDuration: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "checkpoint_last_duration_seconds",
Help: "The duration in seconds it took to last checkpoint open chunks and chunks yet to be persisted.",
}),
checkpointDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Objectives: map[float64]float64{},
Name: "checkpoint_duration_seconds",
Help: "The duration in seconds taken for checkpointing open chunks and chunks yet to be persisted",
}),
checkpointLastSize: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "checkpoint_last_size_bytes",
Help: "The size of the last checkpoint of open chunks and chunks yet to be persisted",
}),
checkpointChunksWritten: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Objectives: map[float64]float64{},
Name: "checkpoint_series_chunks_written",
Help: "The number of chunk written per series while checkpointing open chunks and chunks yet to be persisted.",
}),
dirtyCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "inconsistencies_total",
Help: "A counter incremented each time an inconsistency in the local storage is detected. If this is greater zero, restart the server as soon as possible.",
}),
startedDirty: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "started_dirty",
Help: "Whether the local storage was found to be dirty (and crash recovery occurred) during Prometheus startup.",
}),
checkpointing: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "checkpointing",
Help: "1 if the storage is checkpointing, 0 otherwise.",
}),
seriesChunksPersisted: prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "series_chunks_persisted",
Help: "The number of chunks persisted per series.",
// Even with 4 bytes per sample, you're not going to get more than 85
// chunks in 6 hours for a time series with 1s resolution.
Buckets: []float64{1, 2, 4, 8, 16, 32, 64, 128},
}),
dirty: dirty,
pedanticChecks: pedanticChecks,
dirtyFileName: dirtyPath,
fLock: fLock,
shouldSync: shouldSync,
minShrinkRatio: minShrinkRatio,
// Create buffers of length 3*chunkLenWithHeader by default because that is still reasonably small
// and at the same time enough for many uses. The contract is to never return buffer smaller than
// that to the pool so that callers can rely on a minimum buffer size.
bufPool: sync.Pool{New: func() interface{} { return make([]byte, 0, 3*chunkLenWithHeader) }},
}
if p.dirty {
// Blow away the label indexes. We'll rebuild them later.
if err := index.DeleteLabelPairFingerprintIndex(basePath); err != nil {
return nil, err
}
if err := index.DeleteLabelNameLabelValuesIndex(basePath); err != nil {
return nil, err
}
}
labelPairToFingerprints, err := index.NewLabelPairFingerprintIndex(basePath)
if err != nil {
return nil, err
}
labelNameToLabelValues, err := index.NewLabelNameLabelValuesIndex(basePath)
if err != nil {
return nil, err
}
p.labelPairToFingerprints = labelPairToFingerprints
p.labelNameToLabelValues = labelNameToLabelValues
return p, nil
}
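// A minimal usage sketch (illustrative only; it assumes syncStrategy is the
// func() bool used elsewhere in this package, and the path and ratio below
// are made up):
//
//	p, err := newPersistence(
//		"/var/lib/prometheus",       // basePath
//		false,                       // dirty
//		false,                       // pedanticChecks
//		func() bool { return true }, // shouldSync: always fsync series files
//		0.1,                         // minShrinkRatio: drop chunks only if >=10% of the file goes away
//	)
//	if err != nil {
//		// handle the error
//	}
//	go p.run() // start the background indexing-queue processor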
func (p *persistence) run() {
p.processIndexingQueue()
}
// Describe implements prometheus.Collector.
func (p *persistence) Describe(ch chan<- *prometheus.Desc) {
ch <- p.indexingQueueLength.Desc()
ch <- p.indexingQueueCapacity.Desc()
p.indexingBatchSizes.Describe(ch)
p.indexingBatchDuration.Describe(ch)
ch <- p.checkpointDuration.Desc()
ch <- p.checkpointLastDuration.Desc()
ch <- p.checkpointLastSize.Desc()
ch <- p.checkpointChunksWritten.Desc()
ch <- p.checkpointing.Desc()
ch <- p.dirtyCounter.Desc()
ch <- p.startedDirty.Desc()
ch <- p.seriesChunksPersisted.Desc()
}
// Collect implements prometheus.Collector.
func (p *persistence) Collect(ch chan<- prometheus.Metric) {
p.indexingQueueLength.Set(float64(len(p.indexingQueue)))
ch <- p.indexingQueueLength
ch <- p.indexingQueueCapacity
p.indexingBatchSizes.Collect(ch)
p.indexingBatchDuration.Collect(ch)
ch <- p.checkpointDuration
ch <- p.checkpointLastDuration
ch <- p.checkpointLastSize
ch <- p.checkpointChunksWritten
ch <- p.checkpointing
ch <- p.dirtyCounter
ch <- p.startedDirty
ch <- p.seriesChunksPersisted
}
// isDirty returns the dirty flag in a goroutine-safe way.
func (p *persistence) isDirty() bool {
p.dirtyMtx.Lock()
defer p.dirtyMtx.Unlock()
return p.dirty
}
// setDirty flags the storage as dirty in a goroutine-safe way. The provided
// error will be logged as a reason the first time the storage is flagged as dirty.
func (p *persistence) setDirty(err error) {
p.dirtyCounter.Inc()
p.dirtyMtx.Lock()
defer p.dirtyMtx.Unlock()
if p.becameDirty {
return
}
p.dirty = true
p.becameDirty = true
log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.")
}
// fingerprintsForLabelPair returns the fingerprints for the given label
// pair. This method is goroutine-safe, but note that metrics queued for
// indexing with IndexMetric might not have made it into the index yet. (The
// same applies correspondingly to UnindexMetric.)
func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) model.Fingerprints {
fps, _, err := p.labelPairToFingerprints.Lookup(lp)
if err != nil {
p.setDirty(fmt.Errorf("error in method fingerprintsForLabelPair(%v): %s", lp, err))
return nil
}
return fps
}
// labelValuesForLabelName returns the label values for the given label
// name. This method is goroutine-safe, but note that metrics queued for
// indexing with IndexMetric might not have made it into the index yet. (The
// same applies correspondingly to UnindexMetric.)
func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelValues, error) {
lvs, _, err := p.labelNameToLabelValues.Lookup(ln)
if err != nil {
p.setDirty(fmt.Errorf("error in method labelValuesForLabelName(%v): %s", ln, err))
return nil, err
}
return lvs, nil
}
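// Illustrative lookups (the label name and value are hypothetical):
//
//	fps := p.fingerprintsForLabelPair(model.LabelPair{
//		Name:  "job",
//		Value: "api-server",
//	})
//	values, err := p.labelValuesForLabelName("job")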
// persistChunks persists a number of consecutive chunks of a series. It is the
// caller's responsibility to not modify the chunks concurrently and to not
// persist or drop anything for the same fingerprint concurrently. It returns
// the (zero-based) index of the first persisted chunk within the series
// file. In case of an error, the returned index is -1 (to avoid the
// misconception that the chunk was written at position 0).
//
// Returning an error signals problems with the series file. In this case, the
// caller should quarantine the series.
func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk.Chunk) (index int, err error) {
f, err := p.openChunkFileForWriting(fp)
if err != nil {
return -1, err
}
defer p.closeChunkFile(f)
if err := p.writeChunks(f, chunks); err != nil {
return -1, err
}
// Determine index within the file.
offset, err := f.Seek(0, os.SEEK_CUR)
if err != nil {
return -1, err
}
index, err = chunkIndexForOffset(offset)
if err != nil {
return -1, err
}
return index - len(chunks), err
}
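// Example of the index arithmetic above (assuming chunkIndexForOffset simply
// divides a byte offset by chunkLenWithHeader): if the series file already
// held 5 chunks and 2 more are appended, the post-write offset is
// 7*chunkLenWithHeader, chunkIndexForOffset returns 7, and 7-len(chunks) = 5
// is the zero-based index of the first newly persisted chunk.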
// loadChunks loads a group of chunks of a timeseries by their index. The chunk
// with the earliest time will have index 0, the following ones will have
// incrementally larger indexes. The indexOffset denotes the offset to be added to
// each index in indexes. It is the caller's responsibility to not persist or
// drop anything for the same fingerprint concurrently.
func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk.Chunk, error) {
f, err := p.openChunkFileForReading(fp)
if err != nil {
return nil, err
}
defer f.Close()
chunks := make([]chunk.Chunk, 0, len(indexes))
buf := p.bufPool.Get().([]byte)
defer func() {
// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
// would only put back the original buf.
p.bufPool.Put(buf)
}()
for i := 0; i < len(indexes); i++ {
// This loads chunks in batches. A batch is a streak of
// consecutive chunks, read from disk in one go.
batchSize := 1
if _, err := f.Seek(offsetForChunkIndex(indexes[i]+indexOffset), os.SEEK_SET); err != nil {
return nil, err
}
for ; batchSize < chunkMaxBatchSize &&
i+1 < len(indexes) &&
indexes[i]+1 == indexes[i+1]; i, batchSize = i+1, batchSize+1 {
}
readSize := batchSize * chunkLenWithHeader
if cap(buf) < readSize {
buf = make([]byte, readSize)
}
buf = buf[:readSize]
if _, err := io.ReadFull(f, buf); err != nil {
return nil, err
}
for c := 0; c < batchSize; c++ {
chunk, err := chunk.NewForEncoding(chunk.Encoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
if err != nil {
return nil, err
}
if err := chunk.UnmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
return nil, err
}
chunks = append(chunks, chunk)
}
}
chunk.Ops.WithLabelValues(chunk.Load).Add(float64(len(chunks)))
atomic.AddInt64(&chunk.NumMemChunks, int64(len(chunks)))
return chunks, nil
}
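// Illustration of the batching above (assuming offsetForChunkIndex(i) is
// i*chunkLenWithHeader): for indexes {3, 4, 5, 9} and indexOffset 0, the loop
// performs two reads instead of four: one of 3*chunkLenWithHeader bytes
// starting at chunk 3, and one of chunkLenWithHeader bytes starting at chunk 9.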
// loadChunkDescs loads the chunk.Descs for a series from disk. offsetFromEnd is
// the number of chunk.Descs to skip from the end of the series file. It is the
// caller's responsibility to not persist or drop anything for the same
// fingerprint concurrently.
func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunk.Desc, error) {
f, err := p.openChunkFileForReading(fp)
if os.IsNotExist(err) {
return nil, nil
}
if err != nil {
return nil, err
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return nil, err
}
if fi.Size()%int64(chunkLenWithHeader) != 0 {
// The returned error will bubble up and lead to quarantining of the whole series.
return nil, fmt.Errorf(
"size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d",
fp, fi.Size(), chunkLenWithHeader,
)
}
numChunks := int(fi.Size())/chunkLenWithHeader - offsetFromEnd
cds := make([]*chunk.Desc, numChunks)
chunkTimesBuf := make([]byte, 16)
for i := 0; i < numChunks; i++ {
_, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET)
if err != nil {
return nil, err
}
_, err = io.ReadAtLeast(f, chunkTimesBuf, 16)
if err != nil {
return nil, err
}
cds[i] = &chunk.Desc{
ChunkFirstTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf)),
ChunkLastTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf[8:])),
}
}
chunk.DescOps.WithLabelValues(chunk.Load).Add(float64(len(cds)))
chunk.NumMemDescs.Add(float64(len(cds)))
return cds, nil
}
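// For reference, the 17-byte chunk header read above has the following
// on-disk layout (per the chunkHeader* offsets in the const block):
//
//	byte  0      chunk encoding/type
//	bytes 1-8    first sample time, little-endian uint64
//	bytes 9-16   last sample time, little-endian uint64
//
// loadChunkDescs therefore seeks to chunkHeaderFirstTimeOffset and reads 16
// bytes to recover both timestamps without unmarshaling the chunk data itself.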
// checkpointSeriesMapAndHeads persists the fingerprint to memory-series mapping
// and all non-persisted chunks. Do not call concurrently with
// loadSeriesMapAndHeads. This method will only write heads format v2, but
// loadSeriesMapAndHeads can also understand v1.
//
// Description of the file format (for both v1 and v2):
//
// (1) Magic string (const headsMagicString).
//
// (2) Varint-encoded format version (const headsFormatVersion).
//
// (3) Number of series in checkpoint as big-endian uint64.
//
// (4) Repeated once per series:
//
// (4.1) A flag byte, see flag constants above. (Present but unused in v2.)
//
// (4.2) The fingerprint as big-endian uint64.
//
// (4.3) The metric as defined by codable.Metric.
//
// (4.4) The varint-encoded persistWatermark. (Missing in v1.)
//
// (4.5) The modification time of the series file as nanoseconds elapsed since
// January 1, 1970 UTC. -1 if the modification time is unknown or no series file
// exists yet. (Missing in v1.)
//
// (4.6) The varint-encoded chunkDescsOffset.
//
// (4.7) The varint-encoded savedFirstTime.
//
// (4.8) The varint-encoded number of chunk descriptors.
//
// (4.9) Repeated once per chunk descriptor, oldest to most recent, either
// variant 4.9.1 (if index < persistWatermark) or variant 4.9.2 (if index >=
// persistWatermark). In v1, everything is variant 4.9.1 except for a
// non-persisted head-chunk (determined by the flags).
//
// (4.9.1.1) The varint-encoded first time.
// (4.9.1.2) The varint-encoded last time.
//
// (4.9.2.1) A byte defining the chunk type.
// (4.9.2.2) The chunk itself, marshaled with the Marshal() method.
//
func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) {
log.Info("Checkpointing in-memory metrics and chunks...")
p.checkpointing.Set(1)
defer p.checkpointing.Set(0)
begin := time.Now()
f, err := os.OpenFile(p.headsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
if err != nil {
return err
}
defer func() {
syncErr := f.Sync()
closeErr := f.Close()
if err != nil {
return
}
err = syncErr
if err != nil {
return
}
err = closeErr
if err != nil {
return
}
err = os.Rename(p.headsTempFileName(), p.headsFileName())
duration := time.Since(begin)
p.checkpointDuration.Observe(duration.Seconds())
p.checkpointLastDuration.Set(duration.Seconds())
log.Infof("Done checkpointing in-memory metrics and chunks in %v.", duration)
}()
w := bufio.NewWriterSize(f, fileBufSize)
if _, err = w.WriteString(headsMagicString); err != nil {
return err
}
var numberOfSeriesOffset int
if numberOfSeriesOffset, err = codable.EncodeVarint(w, headsFormatVersion); err != nil {
return err
}
numberOfSeriesOffset += len(headsMagicString)
numberOfSeriesInHeader := uint64(fingerprintToSeries.length())
// We have to write the number of series as uint64 because we might need
// to overwrite it later, and a varint might change byte width then.
if err = codable.EncodeUint64(w, numberOfSeriesInHeader); err != nil {
return err
}
iter := fingerprintToSeries.iter()
defer func() {
// Consume the iterator in any case to not leak goroutines.
for range iter {
}
}()
var realNumberOfSeries uint64
for m := range iter {
func() { // Wrapped in function to use defer for unlocking the fp.
fpLocker.Lock(m.fp)
defer fpLocker.Unlock(m.fp)
chunksToPersist := len(m.series.chunkDescs) - m.series.persistWatermark
if len(m.series.chunkDescs) == 0 {
// This series was completely purged or archived
// in the meantime. Ignore.
return
}
realNumberOfSeries++
// Sanity checks.
if m.series.chunkDescsOffset < 0 && m.series.persistWatermark > 0 {
panic("encountered unknown chunk desc offset in combination with positive persist watermark")
}
// These are the values to save in the normal case.
var (
// persistWatermark is zero as we only checkpoint non-persisted chunks.
persistWatermark int64
// chunkDescsOffset is shifted by the original persistWatermark for the same reason.
chunkDescsOffset = int64(m.series.chunkDescsOffset + m.series.persistWatermark)
numChunkDescs = int64(chunksToPersist)
)
// However, in the special case of a series being fully
// persisted but still in memory (i.e. not archived), we
// need to save a "placeholder", for which we use just
// the chunk desc of the last chunk. Values have to be
// adjusted accordingly. (The reason for doing it in
// this weird way is to keep the checkpoint format
// compatible with older versions.)
if chunksToPersist == 0 {
persistWatermark = 1
chunkDescsOffset-- // Save one chunk desc after all.
numChunkDescs = 1
}
// seriesFlags left empty in v2.
if err = w.WriteByte(0); err != nil {
return
}
if err = codable.EncodeUint64(w, uint64(m.fp)); err != nil {
return
}
var buf []byte
buf, err = codable.Metric(m.series.metric).MarshalBinary()
if err != nil {
return
}
if _, err = w.Write(buf); err != nil {
return
}
if _, err = codable.EncodeVarint(w, persistWatermark); err != nil {
return
}
if m.series.modTime.IsZero() {
if _, err = codable.EncodeVarint(w, -1); err != nil {
return
}
} else {
if _, err = codable.EncodeVarint(w, m.series.modTime.UnixNano()); err != nil {
return
}
}
if _, err = codable.EncodeVarint(w, chunkDescsOffset); err != nil {
return
}
if _, err = codable.EncodeVarint(w, int64(m.series.savedFirstTime)); err != nil {
return
}
if _, err = codable.EncodeVarint(w, numChunkDescs); err != nil {
return
}
if chunksToPersist == 0 {
// Save the one placeholder chunk desc for a fully persisted series.
chunkDesc := m.series.chunkDescs[len(m.series.chunkDescs)-1]
if _, err = codable.EncodeVarint(w, int64(chunkDesc.FirstTime())); err != nil {
return
}
// Assign to the outer err (rather than shadowing it) so that a
// failing LastTime actually aborts the checkpoint.
var lt model.Time
if lt, err = chunkDesc.LastTime(); err != nil {
return
}
if _, err = codable.EncodeVarint(w, int64(lt)); err != nil {
return
}
} else {
// Save (only) the non-persisted chunks.
for _, chunkDesc := range m.series.chunkDescs[m.series.persistWatermark:] {
if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil {
return
}
if err = chunkDesc.C.Marshal(w); err != nil {
return
}
}
// Observe once per series how many chunks were checkpointed for it.
p.checkpointChunksWritten.Observe(float64(chunksToPersist))
}
// Series is checkpointed now, so declare it clean. In case the entire
// checkpoint fails later on, this is fine, as the storage's series
// maintenance will mark these series newly dirty again, continuously
// increasing the total number of dirty series as seen by the storage.
// This has the effect of triggering a new checkpoint attempt even
// earlier than if we hadn't incorrectly set "dirty" to "false" here
// already.
m.series.dirty = false
}()
if err != nil {
return err
}
}
if err = w.Flush(); err != nil {
return err
}
if realNumberOfSeries != numberOfSeriesInHeader {
// The number of series has changed in the meantime.
// Rewrite it in the header.
if _, err = f.Seek(int64(numberOfSeriesOffset), os.SEEK_SET); err != nil {
return err
}
if err = codable.EncodeUint64(f, realNumberOfSeries); err != nil {
return err
}
}
info, err := f.Stat()
if err != nil {
return err
}
p.checkpointLastSize.Set(float64(info.Size()))
return err
}
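// Note on durability: the deferred block above implements the usual
// write-to-temp-then-rename pattern. The checkpoint is written to
// headsTempFileName(), synced and closed, and only then renamed over
// headsFileName(), so a crash mid-checkpoint leaves the previous heads file
// intact rather than a truncated one.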
// loadSeriesMapAndHeads loads the fingerprint to memory-series mapping and all
// the chunks contained in the checkpoint (and thus not yet persisted to series
// files). The method is capable of loading the checkpoint format v1 and v2. If
// recoverable corruption is detected, or if the dirty flag was set from the
// beginning, crash recovery is run, which might take a while. If an
// unrecoverable error is encountered, it is returned. Call this method during
// start-up while nothing else is running in storage land. This method is
// utterly goroutine-unsafe.
func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist int64, err error) {
fingerprintToSeries := make(map[model.Fingerprint]*memorySeries)
sm = &seriesMap{m: fingerprintToSeries}
defer func() {
if p.dirty {
log.Warn("Persistence layer appears dirty.")
p.startedDirty.Set(1)
err = p.recoverFromCrash(fingerprintToSeries)
if err != nil {
sm = nil
}
} else {
p.startedDirty.Set(0)
}
}()
hs := newHeadsScanner(p.headsFileName())
defer hs.close()
for hs.scan() {
fingerprintToSeries[hs.fp] = hs.series
}
if os.IsNotExist(hs.err) {
return sm, 0, nil
}
if hs.err != nil {
p.dirty = true
log.
With("file", p.headsFileName()).
With("error", hs.err).
Error("Error reading heads file.")
return sm, 0, hs.err
}
return sm, hs.chunksToPersistTotal, nil
}
// dropAndPersistChunks deletes all chunks from a series file whose last sample
// time is before beforeTime, and then appends the provided chunks, leaving out
// those whose last sample time is before beforeTime. It returns the timestamp
// of the first sample in the oldest chunk _not_ dropped, the chunk offset
// within the series file of the first chunk persisted (out of the provided
// chunks, or - if no chunks were provided - the chunk offset where chunks would
// have been persisted, i.e. the end of the file), the number of deleted chunks,
// and true if all chunks of the series have been deleted (in which case the
// returned timestamp will be 0 and must be ignored). It is the caller's
// responsibility to make sure nothing is persisted or loaded for the same
// fingerprint concurrently.
//
// Returning an error signals problems with the series file. In this case, the
// caller should quarantine the series.
func (p *persistence) dropAndPersistChunks(
fp model.Fingerprint, beforeTime model.Time, chunks []chunk.Chunk,
) (
firstTimeNotDropped model.Time,
offset int,
numDropped int,
allDropped bool,
err error,
) {
// Style note: With the many return values, it was decided to use naked
// returns in this method. They make the method more readable, but
// please handle with care!
if len(chunks) > 0 {
// We have chunks to persist. First check if those are already
// too old. If that's the case, the chunks in the series file
// are all too old, too.
i := 0
for ; i < len(chunks); i++ {
var lt model.Time
lt, err = chunks[i].NewIterator().LastTimestamp()
if err != nil {
return
}
if !lt.Before(beforeTime) {
break
}
}
if i < len(chunks) {
firstTimeNotDropped = chunks[i].FirstTime()
}
if i > 0 || firstTimeNotDropped.Before(beforeTime) {
// Series file has to go.
if numDropped, err = p.deleteSeriesFile(fp); err != nil {
return
}
numDropped += i
if i == len(chunks) {
allDropped = true
return
}
// Now simply persist what has to be persisted to a new file.
_, err = p.persistChunks(fp, chunks[i:])
return
}
}
// If we are here, we have to check the series file itself.
f, err := p.openChunkFileForReading(fp)
if os.IsNotExist(err) {
// No series file. Only need to create new file with chunks to
// persist, if there are any.
if len(chunks) == 0 {
allDropped = true
err = nil // Do not report not-exist err.
return
}
offset, err = p.persistChunks(fp, chunks)
return
}
if err != nil {
return
}
defer f.Close()
headerBuf := make([]byte, chunkHeaderLen)
var firstTimeInFile model.Time
// Find the first chunk in the file that should be kept.
for ; ; numDropped++ {
_, err = f.Seek(offsetForChunkIndex(numDropped), os.SEEK_SET)
if err != nil {
return
}
_, err = io.ReadFull(f, headerBuf)
if err == io.EOF {
// Close the file before trying to delete it. This is necessary on Windows.
// (It makes the deferred f.Close fail, but that error is silently ignored.)
f.Close()
// We ran into the end of the file without finding any chunks that should
// be kept. Remove the whole file.
if numDropped, err = p.deleteSeriesFile(fp); err != nil {
return
}
if len(chunks) == 0 {
allDropped = true
return
}
offset, err = p.persistChunks(fp, chunks)
return
}
if err != nil {
return
}
if numDropped == 0 {
firstTimeInFile = model.Time(
binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
)
}
lastTime := model.Time(
binary.LittleEndian.Uint64(headerBuf[chunkHeaderLastTimeOffset:]),
)
if !lastTime.Before(beforeTime) {
break
}
}
// We've found the first chunk that should be kept.
// First check whether the shrink ratio is large enough to justify the actual
// drop; if not, leave it for next time.
fi, err := f.Stat()
if err != nil {
return
}
chunksInFile := int(fi.Size()) / chunkLenWithHeader
totalChunks := chunksInFile + len(chunks)
if numDropped == 0 || float64(numDropped)/float64(totalChunks) < p.minShrinkRatio {
// Nothing to drop. Just adjust the return values and append the chunks (if any).
numDropped = 0
firstTimeNotDropped = firstTimeInFile
if len(chunks) > 0 {
offset, err = p.persistChunks(fp, chunks)
} else {
offset = chunksInFile
}
return
}
// If we are here, we have to drop some chunks for real. So we need to
// record firstTimeNotDropped from the last read header, seek backwards
// to the beginning of its header, and start copying everything from
// there into a new file. Then append the chunks to the new file.
firstTimeNotDropped = model.Time(
binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
)
chunk.Ops.WithLabelValues(chunk.Drop).Add(float64(numDropped))
_, err = f.Seek(-chunkHeaderLen, os.SEEK_CUR)
if err != nil {
return
}
temp, err := os.OpenFile(p.tempFileNameForFingerprint(fp), os.O_WRONLY|os.O_CREATE, 0640)
if err != nil {
return
}