[processor/deltatocumulative]: Sums (open-telemetry#30707)
**Description:**
Implements major component functionality:

- [x] metrics identification (`metrics.Ident`) and stream identification
(`streams.Ident`)
- [x] abstract data layer `data.Point` that keeps processing code agnostic of
the data type (sum, histogram, exp histogram)
- [x] `delta.Accumulator` stream processor for accumulating any `data.Point`
(see the usage sketch below)
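
For a sense of how these pieces could fit together, here is a minimal usage sketch. It is not committed code: it assumes it lives inside this module (the packages are `internal`), that `streams.Aggregator` exposes the `Aggregate` method implemented by `delta.Accumulator`, and that the `identify` callback is a stand-in for the stream identification above.

```go
package sketch // hypothetical file inside the processor module

import (
	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
)

// aggregateSums feeds each delta sum data point into the accumulator and
// rewrites it in place with the cumulative value seen so far for its stream.
func aggregateSums(
	acc streams.Aggregator[data.Number],
	identify func(pmetric.NumberDataPoint) streams.Ident,
	dps pmetric.NumberDataPointSlice,
) {
	for i := 0; i < dps.Len(); i++ {
		dp := data.Number{NumberDataPoint: dps.At(i)}
		cumul, err := acc.Aggregate(identify(dp.NumberDataPoint), dp)
		if err != nil {
			continue // sample dropped per the delta.Accumulator rules; a real processor would log this
		}
		cumul.CopyTo(dp) // overwrite the point with its cumulative value
	}
}
```

The aggregator instance itself would come from `delta.Numbers()`; histograms would get their own via `delta.Histograms()` once `Add` is implemented for them.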

**Link to tracking Issue:**
open-telemetry#30705

**Testing:** Done

**Documentation:** <kbd>TODO</kbd>
sh0rez authored and djaglowski committed Feb 19, 2024
1 parent 5a6fa08 commit 64a6f88
Showing 21 changed files with 952 additions and 3 deletions.
27 changes: 27 additions & 0 deletions .chloggen/deltatocumulative-sums.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: deltatocumulative

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: adds processor to convert sums (initially) from delta to cumulative temporality

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30705]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
@@ -152,7 +152,7 @@ pkg/winperfcounters/ @open-telemetry/collect

processor/attributesprocessor/ @open-telemetry/collector-contrib-approvers @boostchicken
processor/cumulativetodeltaprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth
processor/deltatocumulativeprocessor/ @open-telemetry/collector-contrib-approvers @sh0rez
processor/deltatocumulativeprocessor/ @open-telemetry/collector-contrib-approvers @sh0rez @RichieSams
processor/deltatorateprocessor/ @open-telemetry/collector-contrib-approvers @Aneurysm9
processor/filterprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @boostchicken
processor/groupbyattrsprocessor/ @open-telemetry/collector-contrib-approvers @rnishtala-sumo
2 changes: 1 addition & 1 deletion processor/deltatocumulativeprocessor/README.md
@@ -7,7 +7,7 @@
| Distributions | [] |
| Warnings | [Statefulness](#warnings) |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fdeltatocumulative%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fdeltatocumulative) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fdeltatocumulative%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fdeltatocumulative) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@sh0rez](https://www.github.com/sh0rez) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@sh0rez](https://www.github.com/sh0rez), [@RichieSams](https://www.github.com/RichieSams) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
<!-- end autogenerated section -->
8 changes: 8 additions & 0 deletions processor/deltatocumulativeprocessor/go.mod
@@ -3,6 +3,8 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/delta
go 1.21

require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.94.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/component v0.94.1
go.opentelemetry.io/collector/consumer v0.94.1
go.opentelemetry.io/collector/pdata v1.1.0
@@ -13,6 +15,8 @@ require (
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
@@ -25,6 +29,7 @@
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.94.1 // indirect
go.opentelemetry.io/collector/confmap v0.94.1 // indirect
go.opentelemetry.io/otel v1.23.1 // indirect
@@ -35,4 +40,7 @@
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/grpc v1.61.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil
9 changes: 9 additions & 0 deletions processor/deltatocumulativeprocessor/go.sum

Some generated files are not rendered by default.

29 changes: 29 additions & 0 deletions processor/deltatocumulativeprocessor/internal/data/add.go
@@ -0,0 +1,29 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package data // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"

import "go.opentelemetry.io/collector/pdata/pmetric"

func (dp Number) Add(in Number) Number {
switch in.ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
v := dp.DoubleValue() + in.DoubleValue()
dp.SetDoubleValue(v)
case pmetric.NumberDataPointValueTypeInt:
v := dp.IntValue() + in.IntValue()
dp.SetIntValue(v)
}
dp.SetTimestamp(in.Timestamp())
return dp
}

// nolint
func (dp Histogram) Add(in Histogram) Histogram {
panic("todo")
}

// nolint
func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram {
panic("todo")
}
76 changes: 76 additions & 0 deletions processor/deltatocumulativeprocessor/internal/data/data.go
@@ -0,0 +1,76 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package data // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

type Point[Self any] interface {
StartTimestamp() pcommon.Timestamp
Timestamp() pcommon.Timestamp
Attributes() pcommon.Map

Clone() Self
CopyTo(Self)

Add(Self) Self
}

type Number struct {
pmetric.NumberDataPoint
}

func (dp Number) Clone() Number {
clone := Number{NumberDataPoint: pmetric.NewNumberDataPoint()}
if dp.NumberDataPoint != (pmetric.NumberDataPoint{}) {
dp.CopyTo(clone)
}
return clone
}

func (dp Number) CopyTo(dst Number) {
dp.NumberDataPoint.CopyTo(dst.NumberDataPoint)
}

type Histogram struct {
pmetric.HistogramDataPoint
}

func (dp Histogram) Clone() Histogram {
clone := Histogram{HistogramDataPoint: pmetric.NewHistogramDataPoint()}
if dp.HistogramDataPoint != (pmetric.HistogramDataPoint{}) {
dp.CopyTo(clone)
}
return clone
}

func (dp Histogram) CopyTo(dst Histogram) {
dp.HistogramDataPoint.CopyTo(dst.HistogramDataPoint)
}

type ExpHistogram struct {
pmetric.ExponentialHistogramDataPoint
}

func (dp ExpHistogram) Clone() ExpHistogram {
clone := ExpHistogram{ExponentialHistogramDataPoint: pmetric.NewExponentialHistogramDataPoint()}
if dp.ExponentialHistogramDataPoint != (pmetric.ExponentialHistogramDataPoint{}) {
dp.CopyTo(clone)
}
return clone
}

func (dp ExpHistogram) CopyTo(dst ExpHistogram) {
dp.ExponentialHistogramDataPoint.CopyTo(dst.ExponentialHistogramDataPoint)
}

type mustPoint[D Point[D]] struct{ _ D }

var (
_ = mustPoint[Number]{}
_ = mustPoint[Histogram]{}
_ = mustPoint[ExpHistogram]{}
)
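
As an illustrative aside (not part of this diff): the `Point[Self]` constraint exists so that processing code can be written once for all three wrappers. A hypothetical helper, written as if it were another file in this `data` package:

```go
package data

import "time"

// Elapsed is a hypothetical example of data-type-agnostic code: it compiles
// for Number, Histogram and ExpHistogram alike, since each satisfies Point[Self].
func Elapsed[D Point[D]](dp D) time.Duration {
	return dp.Timestamp().AsTime().Sub(dp.StartTimestamp().AsTime())
}
```

The `mustPoint` declarations above provide the matching compile-time guarantee that each wrapper actually satisfies the constraint.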
83 changes: 83 additions & 0 deletions processor/deltatocumulativeprocessor/internal/delta/delta.go
@@ -0,0 +1,83 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package delta // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta"

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
)

func construct[D data.Point[D]]() streams.Aggregator[D] {
acc := &Accumulator[D]{dps: make(map[streams.Ident]D)}
return &Lock[D]{next: acc}
}

func Numbers() streams.Aggregator[data.Number] {
return construct[data.Number]()
}

func Histograms() streams.Aggregator[data.Histogram] {
return construct[data.Histogram]()
}

var _ streams.Aggregator[data.Number] = (*Accumulator[data.Number])(nil)

type Accumulator[D data.Point[D]] struct {
dps map[streams.Ident]D
}

// Aggregate implements delta-to-cumulative aggregation as per spec:
// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#sums-delta-to-cumulative
func (a *Accumulator[D]) Aggregate(id streams.Ident, dp D) (D, error) {
// reset makes the accumulator start over with the current sample, discarding
// any earlier data, and returns the newly stored value
reset := func() (D, error) {
a.dps[id] = dp.Clone()
return a.dps[id], nil
}

aggr, ok := a.dps[id]

// new series: reset
if !ok {
return reset()
}
// belongs to older series: drop
if dp.StartTimestamp() < aggr.StartTimestamp() {
return aggr, ErrOlderStart{Start: aggr.StartTimestamp(), Sample: dp.StartTimestamp()}
}
// belongs to later series: reset
if dp.StartTimestamp() > aggr.StartTimestamp() {
return reset()
}
// out of order: drop
if dp.Timestamp() <= aggr.Timestamp() {
return aggr, ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()}
}

a.dps[id] = aggr.Add(dp)
return a.dps[id], nil
}

type ErrOlderStart struct {
Start pcommon.Timestamp
Sample pcommon.Timestamp
}

func (e ErrOlderStart) Error() string {
return fmt.Sprintf("dropped sample with start_time=%s, because series only starts at start_time=%s. consider checking for multiple processes sending the exact same series", e.Sample, e.Start)
}

type ErrOutOfOrder struct {
Last pcommon.Timestamp
Sample pcommon.Timestamp
}

func (e ErrOutOfOrder) Error() string {
return fmt.Sprintf("out of order: dropped sample from time=%s, because series is already at time=%s", e.Sample, e.Last)
}
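
To make the rules above concrete, a sketch (not committed code) of the expected outcomes for a single stream. Timestamps are arbitrary nanosecond values, and the `streams.Ident` is assumed to come from the identification helpers added in this PR:

```go
package delta_test // hypothetical file inside the processor module

import (
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta"
	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
)

// expectedBehavior walks one stream through the delta-to-cumulative rules.
func expectedBehavior(id streams.Ident) {
	acc := delta.Numbers()

	mk := func(start, ts pcommon.Timestamp, v int64) data.Number {
		dp := data.Number{NumberDataPoint: pmetric.NewNumberDataPoint()}
		dp.SetStartTimestamp(start)
		dp.SetTimestamp(ts)
		dp.SetIntValue(v)
		return dp
	}

	acc.Aggregate(id, mk(100, 110, 3)) // unknown stream: stored as-is   -> 3, nil
	acc.Aggregate(id, mk(100, 120, 4)) // same stream, later ts: added   -> 7, nil
	acc.Aggregate(id, mk(100, 115, 1)) // ts <= last seen: dropped       -> 7, ErrOutOfOrder
	acc.Aggregate(id, mk(90, 130, 2))  // older start_time: dropped      -> 7, ErrOlderStart
	acc.Aggregate(id, mk(200, 140, 5)) // later start_time: series reset -> 5, nil
}
```

Note that `Aggregate` returns the previously stored value alongside the error, so a caller still has the last good cumulative value when a sample is dropped.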
