-
Notifications
You must be signed in to change notification settings - Fork 462
/
stats.go
210 lines (172 loc) · 6.95 KB
/
stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/thanos-io/thanos/blob/main/pkg/store/bucket.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Thanos Authors.
package storegateway
import (
"sync"
"time"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
)
// queryStats holds query statistics. This data structure is NOT concurrency safe.
// All size sums are in bytes; all "count" fields are plain event counters.
type queryStats struct {
	// Number of blocks queried, plus a breakdown by block metadata (source, compaction level, out-of-order flag).
	blocksQueried            int
	blocksQueriedByBlockMeta map[blockQueriedMeta]int

	// Postings: touched = used from any source, fetched = actually read from the bucket.
	postingsTouched        int
	postingsTouchedSizeSum int
	postingsToFetch        int
	postingsFetched        int
	postingsFetchedSizeSum int
	postingsFetchCount     int
	postingsFetchDurationSum time.Duration

	// Postings cache (de)compression statistics.
	cachedPostingsCompressions         int
	cachedPostingsCompressionErrors    int
	cachedPostingsOriginalSizeSum      int
	cachedPostingsCompressedSizeSum    int
	cachedPostingsCompressionTimeSum   time.Duration
	cachedPostingsDecompressions       int
	cachedPostingsDecompressionErrors  int
	cachedPostingsDecompressionTimeSum time.Duration

	// Series: processed vs omitted, and how many required a bucket fetch (including refetches).
	seriesProcessed        int
	seriesProcessedSizeSum int
	seriesOmitted          int
	seriesFetched          int
	seriesFetchedSizeSum   int
	seriesFetchDurationSum time.Duration
	seriesRefetches        int
	seriesHashCacheRequests int
	seriesHashCacheHits     int

	// Chunks: touched/fetched/refetched/processed/returned counts and byte sums.
	chunksTouched          int
	chunksTouchedSizeSum   int
	chunksFetched          int
	chunksFetchedSizeSum   int
	chunksRefetched        int
	chunksRefetchedSizeSum int
	chunksProcessed        int
	chunksProcessedSizeSum int
	chunksReturned         int
	chunksReturnedSizeSum  int

	// Results after merging across blocks.
	mergedSeriesCount int
	mergedChunksCount int

	// The number of batches the Series() request has been split into.
	streamingSeriesBatchCount int
	// The total time spent loading batches.
	streamingSeriesBatchLoadDuration time.Duration
	// The total time spent waiting until the next batch is loaded, once the store-gateway was
	// ready to send it to the client.
	streamingSeriesWaitBatchLoadedDuration time.Duration
	// The Series() request timing breakdown.
	streamingSeriesExpandPostingsDuration       time.Duration
	streamingSeriesEncodeResponseDuration       time.Duration
	streamingSeriesSendResponseDuration         time.Duration
	streamingSeriesIndexHeaderLoadDuration      time.Duration
	streamingSeriesConcurrencyLimitWaitDuration time.Duration
	// streamingSeriesAmbientTime is the total wall clock time spent serving the request. It includes all other durations.
	streamingSeriesAmbientTime time.Duration
}
// newQueryStats returns an empty queryStats with the per-block-meta counter map
// initialized, so callers can increment entries without a nil-map check.
func newQueryStats() *queryStats {
	stats := &queryStats{}
	stats.blocksQueriedByBlockMeta = map[blockQueriedMeta]int{}
	return stats
}
// blockQueriedMeta encapsulates a block's thanos source, compaction level, and whether it
// was created from out-of-order samples. It is used as a map key to break down
// the count of queried blocks by these properties.
type blockQueriedMeta struct {
	// source is the component that produced the block (e.g. ingester, compactor).
	source block.SourceType
	// level is the block's compaction level.
	level int
	// outOfOrder is true when the block was compacted from out-of-order samples.
	outOfOrder bool
}
// newBlockQueriedMeta builds the blockQueriedMeta key for a block from its metadata.
func newBlockQueriedMeta(meta *block.Meta) blockQueriedMeta {
	var key blockQueriedMeta
	key.source = meta.Thanos.Source
	key.level = meta.Compaction.Level
	key.outOfOrder = meta.Compaction.FromOutOfOrder()
	return key
}
// merge returns a new queryStats holding the sum of the receiver's and o's statistics.
// The receiver is taken by value so its scalar fields are left untouched; the
// blocksQueriedByBlockMeta map is cloned before merging because a shallow struct
// copy would otherwise share (and mutate) the receiver's map, which also leaks
// into any snapshot previously returned by safeQueryStats.export().
func (s queryStats) merge(o *queryStats) *queryStats {
	// Clone the map so neither the receiver's nor o's map is mutated.
	mergedByMeta := make(map[blockQueriedMeta]int, len(s.blocksQueriedByBlockMeta)+len(o.blocksQueriedByBlockMeta))
	for m, count := range s.blocksQueriedByBlockMeta {
		mergedByMeta[m] = count
	}
	for m, count := range o.blocksQueriedByBlockMeta {
		mergedByMeta[m] += count
	}
	s.blocksQueriedByBlockMeta = mergedByMeta
	s.blocksQueried += o.blocksQueried

	s.postingsTouched += o.postingsTouched
	s.postingsTouchedSizeSum += o.postingsTouchedSizeSum
	s.postingsToFetch += o.postingsToFetch
	s.postingsFetched += o.postingsFetched
	s.postingsFetchedSizeSum += o.postingsFetchedSizeSum
	s.postingsFetchCount += o.postingsFetchCount
	s.postingsFetchDurationSum += o.postingsFetchDurationSum

	s.cachedPostingsCompressions += o.cachedPostingsCompressions
	s.cachedPostingsCompressionErrors += o.cachedPostingsCompressionErrors
	s.cachedPostingsOriginalSizeSum += o.cachedPostingsOriginalSizeSum
	s.cachedPostingsCompressedSizeSum += o.cachedPostingsCompressedSizeSum
	s.cachedPostingsCompressionTimeSum += o.cachedPostingsCompressionTimeSum
	s.cachedPostingsDecompressions += o.cachedPostingsDecompressions
	s.cachedPostingsDecompressionErrors += o.cachedPostingsDecompressionErrors
	s.cachedPostingsDecompressionTimeSum += o.cachedPostingsDecompressionTimeSum

	s.seriesProcessed += o.seriesProcessed
	s.seriesProcessedSizeSum += o.seriesProcessedSizeSum
	s.seriesOmitted += o.seriesOmitted
	s.seriesFetched += o.seriesFetched
	s.seriesFetchedSizeSum += o.seriesFetchedSizeSum
	s.seriesFetchDurationSum += o.seriesFetchDurationSum
	s.seriesRefetches += o.seriesRefetches
	s.seriesHashCacheRequests += o.seriesHashCacheRequests
	s.seriesHashCacheHits += o.seriesHashCacheHits

	s.chunksTouched += o.chunksTouched
	s.chunksTouchedSizeSum += o.chunksTouchedSizeSum
	s.chunksFetched += o.chunksFetched
	s.chunksFetchedSizeSum += o.chunksFetchedSizeSum
	s.chunksRefetched += o.chunksRefetched
	s.chunksRefetchedSizeSum += o.chunksRefetchedSizeSum
	s.chunksProcessed += o.chunksProcessed
	s.chunksProcessedSizeSum += o.chunksProcessedSizeSum
	s.chunksReturned += o.chunksReturned
	s.chunksReturnedSizeSum += o.chunksReturnedSizeSum

	s.mergedSeriesCount += o.mergedSeriesCount
	s.mergedChunksCount += o.mergedChunksCount

	s.streamingSeriesBatchCount += o.streamingSeriesBatchCount
	s.streamingSeriesBatchLoadDuration += o.streamingSeriesBatchLoadDuration
	s.streamingSeriesWaitBatchLoadedDuration += o.streamingSeriesWaitBatchLoadedDuration
	s.streamingSeriesExpandPostingsDuration += o.streamingSeriesExpandPostingsDuration
	s.streamingSeriesEncodeResponseDuration += o.streamingSeriesEncodeResponseDuration
	s.streamingSeriesSendResponseDuration += o.streamingSeriesSendResponseDuration
	s.streamingSeriesAmbientTime += o.streamingSeriesAmbientTime
	s.streamingSeriesIndexHeaderLoadDuration += o.streamingSeriesIndexHeaderLoadDuration
	s.streamingSeriesConcurrencyLimitWaitDuration += o.streamingSeriesConcurrencyLimitWaitDuration
	return &s
}
// safeQueryStats wraps queryStats, adding functions to manipulate the statistics while holding a lock.
type safeQueryStats struct {
	// unsafeStatsMx guards all access to unsafeStats.
	unsafeStatsMx sync.Mutex
	// unsafeStats must only be accessed while holding unsafeStatsMx (hence "unsafe").
	unsafeStats *queryStats
}
// newSafeQueryStats returns a safeQueryStats wrapping freshly initialized statistics.
func newSafeQueryStats() *safeQueryStats {
	safe := &safeQueryStats{}
	safe.unsafeStats = newQueryStats()
	return safe
}
// update invokes mutate on the wrapped statistics while holding the lock.
// mutate must not retain a reference to the stats beyond the call.
func (s *safeQueryStats) update(mutate func(stats *queryStats)) {
	s.unsafeStatsMx.Lock()
	defer s.unsafeStatsMx.Unlock()

	mutate(s.unsafeStats)
}
// merge adds o's statistics into the wrapped statistics while holding the lock.
func (s *safeQueryStats) merge(o *queryStats) {
	s.unsafeStatsMx.Lock()
	defer s.unsafeStatsMx.Unlock()

	merged := s.unsafeStats.merge(o)
	s.unsafeStats = merged
}
// export returns a copy of the internal statistics. The blocksQueriedByBlockMeta
// map is deep-copied under the lock: a plain struct copy would share the map with
// the internal stats, so the returned "snapshot" would alias live state and race
// with concurrent update()/merge() calls.
func (s *safeQueryStats) export() *queryStats {
	s.unsafeStatsMx.Lock()
	defer s.unsafeStatsMx.Unlock()

	copied := *s.unsafeStats
	copied.blocksQueriedByBlockMeta = make(map[blockQueriedMeta]int, len(s.unsafeStats.blocksQueriedByBlockMeta))
	for m, count := range s.unsafeStats.blocksQueriedByBlockMeta {
		copied.blocksQueriedByBlockMeta[m] = count
	}
	return &copied
}
// seriesAndChunksCount returns the values of the mergedSeriesCount and
// mergedChunksCount fields, read while holding the lock.
func (s *safeQueryStats) seriesAndChunksCount() (seriesCount, chunksCount int) {
	s.unsafeStatsMx.Lock()
	defer s.unsafeStatsMx.Unlock()

	seriesCount = s.unsafeStats.mergedSeriesCount
	chunksCount = s.unsafeStats.mergedChunksCount
	return seriesCount, chunksCount
}