From bf63a87025a5f97ea76154a2d9c22c2f4ea07a0d Mon Sep 17 00:00:00 2001 From: sh0rez Date: Wed, 6 Mar 2024 18:46:38 +0100 Subject: [PATCH] [processor/deltatocumulative]: timer-based expiry (#31625) **Description:** Moves from complex preemptive expiry to a plain 1 minute timer **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31615#issuecomment-1980758429 Resolves https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31615 --- .chloggen/deltatocumulative-timer.yaml | 28 +++++ .../internal/clock/clock.go | 94 ---------------- .../internal/clock/clock_test.go | 32 ------ .../internal/streams/expiry.go | 60 ----------- .../internal/streams/expiry_test.go | 100 ------------------ .../deltatocumulativeprocessor/processor.go | 39 +++---- 6 files changed, 49 insertions(+), 304 deletions(-) create mode 100644 .chloggen/deltatocumulative-timer.yaml delete mode 100644 processor/deltatocumulativeprocessor/internal/clock/clock.go delete mode 100644 processor/deltatocumulativeprocessor/internal/clock/clock_test.go delete mode 100644 processor/deltatocumulativeprocessor/internal/streams/expiry.go delete mode 100644 processor/deltatocumulativeprocessor/internal/streams/expiry_test.go diff --git a/.chloggen/deltatocumulative-timer.yaml b/.chloggen/deltatocumulative-timer.yaml new file mode 100644 index 0000000000000..2d0809a3cc576 --- /dev/null +++ b/.chloggen/deltatocumulative-timer.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "bug_fix" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: "deltatocumulativeprocessor" + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: timer-based expiry + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31615] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + converts expiry to 1m timer, eliminating a race condition and failing test + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/deltatocumulativeprocessor/internal/clock/clock.go b/processor/deltatocumulativeprocessor/internal/clock/clock.go deleted file mode 100644 index 3c010136e97f6..0000000000000 --- a/processor/deltatocumulativeprocessor/internal/clock/clock.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package clock // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/clock" - -import ( - "sync" - "time" -) - -var clock Clock = Real{} - -func Change(c Clock) { - clock = c -} - -type Clock interface { - Now() time.Time - After(d time.Duration) <-chan time.Time -} - -func Now() time.Time { - return clock.Now() -} - -func After(d time.Duration) <-chan time.Time { - return clock.After(d) -} - -func Until(t time.Time) time.Duration { - return t.Sub(Now()) -} - -func Sleep(d time.Duration) { - <-After(d) -} - -type Real struct{} - -func (r Real) Now() time.Time { - return time.Now() -} - -func (r Real) After(d time.Duration) <-chan time.Time { - return time.After(d) -} - -type Settable interface { - Clock - Set(now time.Time) -} - -func Fake() Settable { - clock := &fake{} - clock.Set(time.Time{}) - return clock -} - -type fake struct { - mtx sync.RWMutex - ts time.Time -} - -func (f *fake) Set(now time.Time) { - f.mtx.Lock() - f.ts = now - f.mtx.Unlock() -} - -func (f *fake) Now() time.Time { - f.mtx.RLock() - defer f.mtx.RUnlock() - return f.ts -} - -func (f *fake) After(d time.Duration) <-chan time.Time { - var ( - end = f.Now().Add(d) - done = make(chan time.Time) - wait = make(chan struct{}) - ) - - go func() { - close(wait) - for f.Now().Before(end) { - time.Sleep(time.Millisecond / 10) - } - done <- f.Now() - close(done) - }() - <-wait - - return done -} diff --git a/processor/deltatocumulativeprocessor/internal/clock/clock_test.go b/processor/deltatocumulativeprocessor/internal/clock/clock_test.go deleted file mode 100644 index d1671a683ff30..0000000000000 --- a/processor/deltatocumulativeprocessor/internal/clock/clock_test.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package clock - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestFakeAfter(t *testing.T) { - fake := Fake() - - ch := fake.After(10 * time.Second) - now := fake.Now() - - fake.Set(now.Add(10 * time.Second)) - done := <-ch - - require.Equal(t, 10*time.Second, done.Sub(now)) -} - -func TestFakeSet(t *testing.T) { - fake := Fake() - require.Equal(t, time.Time{}, fake.Now()) - - ts := time.Time{}.Add(10 * time.Minute) - fake.Set(ts) - require.Equal(t, ts, fake.Now()) -} diff --git a/processor/deltatocumulativeprocessor/internal/streams/expiry.go b/processor/deltatocumulativeprocessor/internal/streams/expiry.go deleted file mode 100644 index 37d2c97ede286..0000000000000 --- a/processor/deltatocumulativeprocessor/internal/streams/expiry.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" - -import ( - "time" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/clock" -) - -func ExpireAfter[T any](items streams.Map[T], ttl time.Duration) Expiry[T] { - exp := Expiry[T]{ - Staleness: *staleness.NewStaleness[T](ttl, items), - sig: make(chan struct{}), - } - return exp -} - -var _ streams.Map[any] = (*Expiry[any])(nil) - -type Expiry[T any] struct { - staleness.Staleness[T] - sig chan struct{} -} - -func (e Expiry[T]) ExpireOldEntries() <-chan struct{} { - e.Staleness.ExpireOldEntries() - - n := e.Staleness.Len() - sig := make(chan struct{}) - - go func() { - switch { - case n == 0: - <-e.sig - case n > 0: - expires := e.Staleness.Next().Add(e.Max) - until := clock.Until(expires) - clock.Sleep(until) - } - close(sig) - }() - return sig -} - -func (e Expiry[T]) Store(id identity.Stream, v T) error { - err := e.Staleness.Store(id, v) - - // "try-send" to notify possibly sleeping expiry routine - select { - case e.sig <- struct{}{}: - default: - } - - return err -} diff --git a/processor/deltatocumulativeprocessor/internal/streams/expiry_test.go b/processor/deltatocumulativeprocessor/internal/streams/expiry_test.go deleted file mode 100644 index c415f6baf2df9..0000000000000 --- a/processor/deltatocumulativeprocessor/internal/streams/expiry_test.go +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package streams_test - -import ( - "math/rand" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" - exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/clock" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random" -) - -func TestExpiry(t *testing.T) { - fake := clock.Fake() - clock.Change(fake) - staleness.NowFunc = func() time.Time { return fake.Now() } - - tm := TimeMap[data.Number]{ - Map: make(exp.HashMap[data.Number]), - add: make(map[identity.Stream]time.Time), - del: make(map[identity.Stream]time.Time), - } - const maxStale = time.Minute - exp := streams.ExpireAfter(tm, maxStale) - - var mtx sync.Mutex - go func() { - for { - mtx.Lock() - next := exp.ExpireOldEntries() - mtx.Unlock() - <-next - } - }() - - sum := random.Sum() - mtx.Lock() - now := fake.Now() - for i := 0; i < 10; i++ { - r := rand.Intn(10) - now = now.Add(time.Duration(r) * time.Second) - fake.Set(now) - - id, dp := sum.Stream() - err := exp.Store(id, dp) - require.NoError(t, err) - } - mtx.Unlock() - - go func() { - for { - now = now.Add(time.Second) - fake.Set(now) - time.Sleep(2 * time.Millisecond) - } - }() - - for { - mtx.Lock() - n := tm.Len() - mtx.Unlock() - if n == 0 { - break - } - time.Sleep(10 * time.Millisecond) - } - - for id := range tm.add { - add := tm.add[id] - del := tm.del[id] - require.Equal(t, maxStale, del.Sub(add)) - } -} - -type TimeMap[T any] struct { - streams.Map[T] - - add map[streams.Ident]time.Time - del map[streams.Ident]time.Time -} - -func (t TimeMap[T]) Store(id streams.Ident, v T) error { - t.add[id] = clock.Now() - return t.Map.Store(id, v) -} - -func (t TimeMap[T]) Delete(id streams.Ident) { - t.del[id] = clock.Now() - t.Map.Delete(id) -} diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index 8f5941ce84b1f..909e0c7fbf140 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -7,6 +7,7 @@ import ( "context" "errors" "sync" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -14,6 +15,7 @@ import ( "go.opentelemetry.io/collector/processor" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" @@ -29,8 +31,8 @@ type Processor struct { ctx context.Context cancel context.CancelFunc - aggr streams.Aggregator[data.Number] - exp *streams.Expiry[data.Number] + aggr streams.Aggregator[data.Number] + stale *staleness.Staleness[data.Number] mtx sync.Mutex } @@ -49,9 +51,9 @@ func newProcessor(cfg *Config, log *zap.Logger, next consumer.Metrics) *Processo dps = delta.New[data.Number]() if cfg.MaxStale > 0 { - exp := streams.ExpireAfter(dps, cfg.MaxStale) - proc.exp = &exp - dps = &exp + stale := staleness.NewStaleness(cfg.MaxStale, dps) + proc.stale = stale + dps = stale } proc.aggr = streams.IntoAggregator(dps) @@ -59,22 +61,23 @@ func newProcessor(cfg *Config, log *zap.Logger, next consumer.Metrics) *Processo } func (p *Processor) Start(_ context.Context, _ component.Host) error { - if p.exp != nil { - go func() { - for { + if p.stale == nil { + return nil + } + + go func() { + tick := time.NewTicker(time.Minute) + for { + select { + case <-p.ctx.Done(): + return + case <-tick.C: p.mtx.Lock() - next := p.exp.ExpireOldEntries() + p.stale.ExpireOldEntries() p.mtx.Unlock() - - select { - case <-next: - case <-p.ctx.Done(): - return - } } - }() - } - + } + }() return nil }