-
Notifications
You must be signed in to change notification settings - Fork 25
/
differentiator.go
144 lines (120 loc) · 4.4 KB
/
differentiator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package components
import (
"fmt"
"math"
"time"
"go.uber.org/fx"
policylangv1 "github.com/fluxninja/aperture/v2/api/gen/proto/go/aperture/policy/language/v1"
"github.com/fluxninja/aperture/v2/pkg/config"
"github.com/fluxninja/aperture/v2/pkg/notifiers"
"github.com/fluxninja/aperture/v2/pkg/policies/controlplane/iface"
"github.com/fluxninja/aperture/v2/pkg/policies/controlplane/runtime"
"github.com/fluxninja/aperture/v2/pkg/utils"
)
// Differentiator is a component that calculates rate of change per tick.
type Differentiator struct {
// readings are saved in ring buffer
readings []runtime.Reading
window time.Duration
oldestIdx int
newestIdx int
// capacity is calculated from window duration divided by tick interval
capacity int
initialized bool
}
// Name implements runtime.Component.
func (*Differentiator) Name() string { return "Differentiator" }
// Type implements runtime.Component.
func (*Differentiator) Type() runtime.ComponentType { return runtime.ComponentTypeSignalProcessor }
// ShortDescription implements runtime.Component.
func (d *Differentiator) ShortDescription() string { return fmt.Sprintf("win: %s", d.window) }
// IsActuator implements runtime.Component.
func (*Differentiator) IsActuator() bool { return false }
// NewDifferentiator creates a differentiator component.
func NewDifferentiator(diffProto *policylangv1.Differentiator) runtime.Component {
diff := &Differentiator{
window: diffProto.Window.AsDuration(),
oldestIdx: 0,
newestIdx: 0,
capacity: 0,
initialized: false,
}
return diff
}
// NewDifferentiatorAndOptions creates a differentiator component and its fx options.
func NewDifferentiatorAndOptions(diffProto *policylangv1.Differentiator, _ runtime.ComponentID, _ iface.Policy) (runtime.Component, fx.Option, error) {
return NewDifferentiator(diffProto), fx.Options(), nil
}
// Execute implements runtime.Component.Execute.
func (d *Differentiator) Execute(inPortReadings runtime.PortToReading, tickInfo runtime.TickInfo) (runtime.PortToReading, error) {
if !d.initialized {
d.init(tickInfo)
}
inputVal := inPortReadings.ReadSingleReadingPort("input")
outputVal := runtime.InvalidReading()
// add input to readings array
d.readings[d.newestIdx] = inputVal
if d.oldestIdx != d.newestIdx {
oldest := d.readings[d.oldestIdx]
newest := d.readings[d.newestIdx]
oldestIdx := d.oldestIdx
newestIdx := d.newestIdx
if !oldest.Valid() {
oldest, oldestIdx = d.firstValid(d.oldestIdx, true)
}
// find the newest valid reading for extrapolation
if !newest.Valid() {
found, foundIdx := d.firstValid(d.newestIdx, false)
if found.Valid() && oldest.Valid() && foundIdx != oldestIdx {
extrapolatedValue := d.extrapolate(oldest, found, oldestIdx, foundIdx)
newest = runtime.NewReading(extrapolatedValue)
newestIdx = foundIdx
}
}
// calculate the derivative
if oldest.Valid() && newest.Valid() && newestIdx != oldestIdx {
diff := (newest.Value() - oldest.Value()) / float64(d.capacity)
outputVal = runtime.NewReading(diff)
}
}
// shift pointers for newest and oldest
d.newestIdx = utils.Mod((d.newestIdx + 1), d.capacity)
if d.newestIdx == d.oldestIdx {
d.oldestIdx = utils.Mod((d.oldestIdx + 1), d.capacity)
}
return runtime.PortToReading{
"output": []runtime.Reading{outputVal},
}, nil
}
// DynamicConfigUpdate is a no-op for Differentiator.
func (d *Differentiator) DynamicConfigUpdate(event notifiers.Event, unmarshaller config.Unmarshaller) {
}
func (d *Differentiator) extrapolate(firstVal, secondVal runtime.Reading, firstIdx, secondIdx int) float64 {
extrapolatedIdx := utils.Mod(secondIdx+1, d.capacity)
extValue := firstVal.Value() + float64(extrapolatedIdx-firstIdx)/
float64(secondIdx-firstIdx)*
(secondVal.Value()-firstVal.Value())
return extValue
}
func (d *Differentiator) init(tickInfo runtime.TickInfo) {
d.capacity = int(math.Ceil(float64(d.window) / float64(tickInfo.Interval())))
d.readings = make([]runtime.Reading, d.capacity)
for i := 0; i < d.capacity; i++ {
d.readings[i] = runtime.InvalidReading()
}
d.initialized = true
}
func (d *Differentiator) firstValid(fromIdx int, addition bool) (runtime.Reading, int) {
step := 1
if !addition {
step = -1
}
idx := utils.Mod((fromIdx + step), d.capacity)
for i := 0; i < d.capacity; i++ {
if d.readings[idx].Valid() {
return d.readings[idx], idx
}
idx = utils.Mod((idx + step), d.capacity)
}
return runtime.InvalidReading(), idx
}