Skip to content

Commit

Permalink
Merge pull request #783 from go-graphite/cedwards/fix-moving-funcs
Browse files Browse the repository at this point in the history
Fix various issues with moving* functions
  • Loading branch information
Civil committed Jul 12, 2023
2 parents 3713428 + bef90b1 commit c4a66e0
Show file tree
Hide file tree
Showing 3 changed files with 302 additions and 137 deletions.
110 changes: 69 additions & 41 deletions expr/functions/moving/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ func (f *moving) Do(ctx context.Context, e parser.Expr, from, until int64, value
var n int
var err error

var scaleByStep bool

var argstr string
var cons string

Expand All @@ -80,55 +78,82 @@ func (f *moving) Do(ctx context.Context, e parser.Expr, from, until int64, value
return nil, parser.ErrMissingArgument
}

adjustedStart := from
var refetch bool
var windowPoints int
var preview int64

switch e.Arg(1).Type() {
case parser.EtConst:
// In this case, zipper does not request additional retrospective points,
// and leading `n` values, that used to calculate window, become NaN
n, err = e.GetIntArg(1)
argstr = strconv.Itoa(n)

arg, err := helper.GetSeriesArg(ctx, e.Arg(0), from, until, values)
if err != nil {
return nil, err
}
if len(arg) == 0 {
return arg, nil
}

// Find the maximum step to use for determining the altered start time
var maxStep int64
for _, a := range arg {
if a.StepTime > maxStep {
maxStep = a.StepTime
}
}
preview = maxStep * int64(n)
adjustedStart -= maxStep * int64(n)
windowPoints = n
refetch = true
case parser.EtString:
var n32 int32
n32, err = e.GetIntervalArg(1, 1)
argstr = "'" + e.Arg(1).StringValue() + "'"
n = int(n32)
scaleByStep = true
preview = int64(math.Abs(float64(n32))) // Absolute is used in order to handle negative string intervals
adjustedStart -= preview
default:
err = parser.ErrBadType
}
if err != nil {
return nil, err
}

windowSize := n

start := from
if scaleByStep {
start -= int64(n)
var targetValues map[parser.MetricRequest][]*types.MetricData
if refetch {
targetValues, err = f.GetEvaluator().Fetch(ctx, []parser.Expr{e.Arg(0)}, adjustedStart, until, values)
if err != nil {
return nil, err
}
} else {
targetValues = values
}

arg, err := helper.GetSeriesArg(ctx, e.Arg(0), start, until, values)
adjustedArgs, err := helper.GetSeriesArg(ctx, e.Arg(0), adjustedStart, until, targetValues)
if err != nil {
return nil, err
}
if len(arg) == 0 {
return arg, nil

if len(adjustedArgs) == 0 {
return adjustedArgs, nil
}

if e.ArgsLen() >= 3 && e.Target() == "movingWindow" {
if e.ArgsLen() >= 2 && e.Target() == "movingWindow" {
cons, err = e.GetStringArgDefault(2, "average")
if err != nil {
return nil, err
}

if e.ArgsLen() == 4 {
xFilesFactor, err = e.GetFloatArgDefault(3, float64(arg[0].XFilesFactor))
xFilesFactor, err = e.GetFloatArgDefault(3, float64(adjustedArgs[0].XFilesFactor))

if err != nil {
return nil, err
}
}
} else if e.ArgsLen() == 3 {
xFilesFactor, err = e.GetFloatArgDefault(2, float64(arg[0].XFilesFactor))
xFilesFactor, err = e.GetFloatArgDefault(2, float64(adjustedArgs[0].XFilesFactor))

if err != nil {
return nil, err
Expand All @@ -144,43 +169,47 @@ func (f *moving) Do(ctx context.Context, e parser.Expr, from, until int64, value
cons = "min"
case "movingMax":
cons = "max"
case "movingMedian":
cons = "median"
}

if len(arg) == 0 {
return nil, nil
}

var offset int

if scaleByStep {
windowSize /= int(arg[0].StepTime)
offset = windowSize
}
result := make([]*types.MetricData, len(adjustedArgs))

result := make([]*types.MetricData, len(arg))

for n, a := range arg {
for j, a := range adjustedArgs {
r := a.CopyName(e.Target() + "(" + a.Name + "," + argstr + ")")
r.Tags[e.Target()] = argstr

if windowSize == 0 {
if e.Arg(1).Type() == parser.EtString {
windowPoints = int(preview / a.StepTime)
}

if windowPoints == 0 {
if *f.config.ReturnNaNsIfStepMismatch {
r.Values = make([]float64, len(a.Values))
for i := range a.Values {
r.Values[i] = math.NaN()
}
}
result[n] = r
r.StartTime += preview
r.StopTime += preview
result[j] = r
continue
}
r.Values = make([]float64, len(a.Values)-offset)
r.StartTime = (from + r.StepTime - 1) / r.StepTime * r.StepTime // align StartTime to closest >= StepTime

size := len(a.Values) - windowPoints
if size < 0 {
size = 0
}
r.Values = make([]float64, size)
r.StartTime = a.StartTime + preview
r.StopTime = r.StartTime + int64(len(r.Values))*r.StepTime

w := &types.Windowed{Data: make([]float64, windowSize)}
for i, v := range a.Values {
if ridx := i - offset; ridx >= 0 {
if helper.XFilesFactorValues(w.Data, xFilesFactor) {
w := &types.Windowed{Data: make([]float64, windowPoints)}
for i := 1; i < len(a.Values); i++ { // ignoring the first value in the series to avoid shifting of results one step in the future
w.Push(a.Values[i])

if ridx := i - windowPoints; ridx >= 0 {
if w.IsNonNull() && helper.XFilesFactorValues(w.Data, xFilesFactor) {
switch cons {
case "average":
r.Values[ridx] = w.Mean()
Expand Down Expand Up @@ -209,16 +238,15 @@ func (f *moving) Do(ctx context.Context, e parser.Expr, from, until int64, value
case "median":
r.Values[ridx] = w.Median()
}
if i < windowSize || math.IsNaN(r.Values[ridx]) {
if i < windowPoints || math.IsNaN(r.Values[ridx]) {
r.Values[ridx] = math.NaN()
}
} else {
r.Values[ridx] = math.NaN()
}
}
w.Push(v)
}
result[n] = r
result[j] = r
}
return result, nil
}
Expand Down

0 comments on commit c4a66e0

Please sign in to comment.