-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
dedup.go
93 lines (83 loc) · 2.47 KB
/
dedup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package storage
import (
"time"
)
// minScrapeInterval is the de-duplication interval in milliseconds.
//
// It is written by SetMinScrapeIntervalForDeduplication. The zero value means
// that de-duplication is disabled.
var minScrapeInterval int64

// SetMinScrapeIntervalForDeduplication configures the minimum interval between
// data points that survive de-duplication.
//
// The interval is stored with millisecond precision; any sub-millisecond
// remainder is truncated. Passing 0 disables de-duplication.
//
// This function must be called before initializing the storage.
func SetMinScrapeIntervalForDeduplication(interval time.Duration) {
	minScrapeInterval = int64(interval / time.Millisecond)
}
// dedupWindowEnd returns the smallest multiple of interval that is strictly
// greater than ts, i.e. the exclusive end of the interval-aligned window
// containing ts.
//
// Go's % operator truncates toward zero, so ts%interval is negative for
// negative ts. The remainder is normalized into [0, interval) in order to keep
// the windows floor-aligned on both sides of zero. Without this, a negative ts
// would get a boundary which is one interval too late, and samples from the
// next window would be dropped.
//
// interval must be positive.
func dedupWindowEnd(ts, interval int64) int64 {
	rem := ts % interval
	if rem < 0 {
		rem += interval
	}
	return ts - rem + interval
}

// DeduplicateSamples leaves only the first sample in every
// minScrapeInterval-aligned time window.
//
// The src* slices are returned unchanged if de-duplication is disabled
// (minScrapeInterval <= 0) or if needsDedup finds no adjacent timestamps
// closer to each other than minScrapeInterval.
//
// The filtering is performed in place: the returned slices share the backing
// arrays with srcTimestamps and srcValues, so the contents of src* are
// overwritten.
//
// NOTE(review): srcTimestamps is presumably sorted in ascending order and has
// the same length as srcValues - confirm against callers.
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []float64) {
	if minScrapeInterval <= 0 {
		return srcTimestamps, srcValues
	}
	if !needsDedup(srcTimestamps, minScrapeInterval) {
		// Fast path - nothing to deduplicate
		return srcTimestamps, srcValues
	}

	// Slow path - dedup data points.
	// The first sample is always kept; needsDedup guarantees it exists here.
	tsNext := dedupWindowEnd(srcTimestamps[0], minScrapeInterval)
	dstTimestamps := srcTimestamps[:1]
	dstValues := srcValues[:1]
	for i := 1; i < len(srcTimestamps); i++ {
		ts := srcTimestamps[i]
		if ts < tsNext {
			// The sample belongs to the window of the previously kept sample.
			continue
		}
		dstTimestamps = append(dstTimestamps, ts)
		dstValues = append(dstValues, srcValues[i])

		// Update tsNext. The common case is that ts falls into the adjacent
		// window, so try the cheap increment first.
		tsNext += minScrapeInterval
		if ts >= tsNext {
			// Slow path - ts skipped one or more windows.
			tsNext = dedupWindowEnd(ts, minScrapeInterval)
		}
	}
	return dstTimestamps, dstValues
}

// deduplicateSamplesDuringMerge is the same as DeduplicateSamples, but works
// with int64 values instead of float64 values.
//
// It leaves only the first sample in every minScrapeInterval-aligned time
// window. The src* slices are returned unchanged if de-duplication is disabled
// or isn't needed. Otherwise the filtering is performed in place, so the
// contents of src* are overwritten.
func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64) ([]int64, []int64) {
	if minScrapeInterval <= 0 {
		return srcTimestamps, srcValues
	}
	if !needsDedup(srcTimestamps, minScrapeInterval) {
		// Fast path - nothing to deduplicate
		return srcTimestamps, srcValues
	}

	// Slow path - dedup data points.
	// The first sample is always kept; needsDedup guarantees it exists here.
	tsNext := dedupWindowEnd(srcTimestamps[0], minScrapeInterval)
	dstTimestamps := srcTimestamps[:1]
	dstValues := srcValues[:1]
	for i := 1; i < len(srcTimestamps); i++ {
		ts := srcTimestamps[i]
		if ts < tsNext {
			// The sample belongs to the window of the previously kept sample.
			continue
		}
		dstTimestamps = append(dstTimestamps, ts)
		dstValues = append(dstValues, srcValues[i])

		// Update tsNext. The common case is that ts falls into the adjacent
		// window, so try the cheap increment first.
		tsNext += minScrapeInterval
		if ts >= tsNext {
			// Slow path - ts skipped one or more windows.
			tsNext = dedupWindowEnd(ts, minScrapeInterval)
		}
	}
	return dstTimestamps, dstValues
}
// needsDedup reports whether timestamps contain at least one pair of adjacent
// items with the difference smaller than minDelta.
//
// It returns false for slices with fewer than two items.
func needsDedup(timestamps []int64, minDelta int64) bool {
	for i := 1; i < len(timestamps); i++ {
		if timestamps[i]-timestamps[i-1] < minDelta {
			return true
		}
	}
	return false
}