/
stream.go
80 lines (71 loc) · 2.84 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
// Package stream implements streamers that publish AWS events periodically.
// A streamer fetches AWS events periodically and notifies subscribed channels of them.
package stream
import (
"context"
"time"
)
const (
streamerFetchIntervalDurationMs = 4000 // How long to wait in milliseconds until Fetch is called again for a Streamer.
streamerMaxFetchIntervalDurationMs = 32000 // The maximum duration that a client should wait until Fetch is called again.
streamerMinFetchIntervalDurationMs = 1000 // The minimum duration that a client should wait until Fetch is called again.
)
// Streamer is the interface that groups methods to periodically retrieve events,
// publish them to subscribers, and stop publishing once there are no more events left.
type Streamer interface {
// Fetch fetches events, updates the internal state of the Streamer with new events and returns the next time
// the Fetch call should be attempted. On failure, Fetch returns an error.
Fetch() (next time.Time, done bool, err error)
// Notify publishes all new event updates to subscribers.
Notify()
// Close notifies all subscribers that no more events will be sent.
Close()
}
// Stream streams event updates by calling Fetch followed with Notify until there are no more events left.
// If the context is canceled or Fetch errors, then Stream short-circuits and returns the error.
func Stream(ctx context.Context, streamer Streamer) error {
defer streamer.Close()
var fetchDelay time.Duration // By default there is no delay.
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(fetchDelay):
next, done, err := streamer.Fetch()
if err != nil {
return err
}
streamer.Notify()
if done {
return nil
}
fetchDelay = time.Until(next)
}
}
}
// nextFetchDate returns a time to wait using random jitter and exponential backoff.
func nextFetchDate(clock clock, rand func(int) int, retries int) time.Time {
// waitMs := rand.Intn( // Get a random integer between streamerMinFetchIntervalDurationMs and ...
// min( // the minimum of ...
// streamerMaxFetchIntervalDuration, // the max fetch interval and ...
// streamerFetchIntervalDuration*(1<<retries), // d*2^r, where r=retries and d= the normal
// )-streamerMinFetchIntervalDurationMs
// ) + streamerMinFetchIntervalDurationMs
// See https://www.educative.io/answers/how-to-generate-random-numbers-in-a-given-range-in-go
waitMs :=
rand(
min(
streamerMaxFetchIntervalDurationMs,
streamerFetchIntervalDurationMs*(1<<retries),
)-streamerMinFetchIntervalDurationMs,
) + streamerMinFetchIntervalDurationMs
return clock.now().Add(time.Duration(waitMs) * time.Millisecond)
}
func min(x, y int) int {
if x < y {
return x
}
return y
}