This repository has been archived by the owner on Jul 31, 2023. It is now read-only.
/
view.go
175 lines (150 loc) · 4.71 KB
/
view.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
// Copyright 2017, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package stats
import (
"bytes"
"fmt"
"reflect"
"sort"
"sync/atomic"
"time"
"go.opencensus.io/tag"
)
// View allows users to filter and aggregate the recorded events
// over a time window. Each view has to be registered to enable
// data retrieval. Use NewView to create new views.
// Unregister views once you no longer want to collect events.
type View struct {
name string // name of the View; must be unique.
// description is the free-form text returned by Description.
description string
// tagKeys are the keys to perform the aggregation on. NewView stores
// a private copy of the caller's keys, sorted by key name.
tagKeys []tag.Key
// m is the measure this view collects measurements for.
// Examples of measures are cpu:tickCount, diskio:time...
m Measure
subscribed uint32 // 1 if someone is subscribed and data needs to be exported; access only through sync/atomic.
// collector carries the view's aggregation (a) and window (w) and
// receives every sample passed to addSample.
collector *collector
}
// NewView returns a new view with the given name and description.
// View names need to be unique globally in the entire system. The name is
// validated with checkViewName; on failure a nil view and the validation
// error are returned.
//
// keys lists the tag keys the view aggregates on. The keys are copied and
// the copy is sorted by key name, so the caller's slice is neither modified
// nor retained. Measurements of measure are processed by the aggregation
// agg for the time window window.
//
// Views need to be subscribed to in order to retrieve collected data.
//
// Once the view is no longer required, it can be unregistered.
func NewView(name, description string, keys []tag.Key, measure Measure, agg Aggregation, window Window) (*View, error) {
	err := checkViewName(name)
	if err != nil {
		return nil, err
	}

	// sortedKeys deliberately stays nil when no keys are supplied.
	var sortedKeys []tag.Key
	if n := len(keys); n > 0 {
		sortedKeys = make([]tag.Key, n)
		copy(sortedKeys, keys)
		sort.Slice(sortedKeys, func(a, b int) bool {
			return sortedKeys[a].Name() < sortedKeys[b].Name()
		})
	}

	v := &View{
		name:        name,
		description: description,
		tagKeys:     sortedKeys,
		m:           measure,
		collector:   &collector{make(map[string]aggregator), agg, window},
	}
	return v, nil
}
// Name returns the name of the view, as passed to NewView.
func (v *View) Name() string {
return v.name
}
// Description returns the description of the view, as passed to NewView.
func (v *View) Description() string {
return v.description
}
// subscribe marks the view as subscribed by atomically setting the
// subscribed flag to 1. addSample only records samples while the flag is set.
func (v *View) subscribe() {
atomic.StoreUint32(&v.subscribed, 1)
}
// unsubscribe marks the view as no longer subscribed by atomically
// resetting the subscribed flag to 0. It does not touch the collector.
func (v *View) unsubscribe() {
atomic.StoreUint32(&v.subscribed, 0)
}
// isSubscribed reports whether the view is currently exporting data by
// subscription. The subscribed flag is read atomically.
func (v *View) isSubscribed() bool {
	state := atomic.LoadUint32(&v.subscribed)
	return state == 1
}
// clearRows delegates to the collector's clearRows.
// NOTE(review): presumably this discards the rows aggregated so far;
// confirm against the collector implementation.
func (v *View) clearRows() {
v.collector.clearRows()
}
// TagKeys returns the list of tag keys associated with this view.
// The returned slice is the view's own slice, not a copy, so callers
// must not modify it.
func (v *View) TagKeys() []tag.Key {
return v.tagKeys
}
// Window returns the timing window being used to collect
// metrics from this view. It is the window stored in the view's collector.
func (v *View) Window() Window {
return v.collector.w
}
// Aggregation returns the data aggregation method used to aggregate
// the measurements collected by this view. It is the aggregation stored
// in the view's collector.
func (v *View) Aggregation() Aggregation {
return v.collector.a
}
// Measure returns the measure the view is collecting measurements for,
// as passed to NewView.
func (v *View) Measure() Measure {
return v.m
}
// collectedRows asks the collector for the rows collected for the view's
// tag keys, evaluated at the time now.
// NOTE(review): how now interacts with the window is decided by the
// collector and is not visible here.
func (v *View) collectedRows(now time.Time) []*Row {
return v.collector.collectedRows(v.tagKeys, now)
}
// addSample hands the sample val, observed at time now, to the view's
// collector. Samples are silently dropped while the view is not subscribed.
func (v *View) addSample(m *tag.Map, val interface{}, now time.Time) {
	if !v.isSubscribed() {
		return
	}

	// The tags of m, encoded for the view's keys, form the signature under
	// which the collector receives the sample.
	signature := encodeWithKeys(m, v.tagKeys)
	v.collector.addSample(string(signature), val, now)
}
// A ViewData is a set of rows about usage of the single measure associated
// with the given view during a particular window. Each row is specific to a
// unique set of tags.
type ViewData struct {
// View is the view the rows belong to.
View *View
// NOTE(review): Start and End presumably bound the time span covered by
// Rows; confirm against the code that populates ViewData.
Start, End time.Time
// Rows holds one entry per unique set of tags.
Rows []*Row
}
// Row is the collected value for a specific set of key value pairs a.k.a tags.
type Row struct {
// Tags are the key value pairs identifying the row. Equal compares them
// order-sensitively and expects them to be ordered by key name.
Tags []tag.Tag
// Data is the aggregated value collected for Tags.
Data AggregationData
}
// String returns a human-readable representation of the row in the form
//
//	{ { {key value}{key value}... }data }
//
// Every tag is rendered as "{key value}" with no separator between tags,
// followed by the default (%v) formatting of Data.
func (r *Row) String() string {
	var buf bytes.Buffer
	buf.WriteString("{ { ")
	for _, t := range r.Tags {
		// Format straight into the buffer rather than building a temporary
		// string with Sprintf first. Writes to a bytes.Buffer never fail, so
		// the Fprintf error is safe to ignore.
		fmt.Fprintf(&buf, "{%v %v}", t.Key.Name(), t.Value)
	}
	fmt.Fprintf(&buf, " }%v }", r.Data)
	return buf.String()
}
// Equal reports whether r and other hold the same tags and the same
// aggregation data. Tags are expected to be ordered by the key name: even
// if both rows have the same tags, Equal returns false when the tags appear
// in different orders.
//
// Two nil rows are equal; a nil row is never equal to a non-nil row.
func (r *Row) Equal(other *Row) bool {
	if r == other {
		return true
	}
	// The pointers differ, so at most one of them is nil. Bail out before
	// dereferencing it.
	if r == nil || other == nil {
		return false
	}
	return reflect.DeepEqual(r.Tags, other.Tags) && r.Data.equal(other.Data)
}