-
Notifications
You must be signed in to change notification settings - Fork 7
/
coalesce.go
54 lines (49 loc) · 1.96 KB
/
coalesce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package ctok
import (
"context"
"time"
)
// coalesce will wait until the given function blocks for at least `quiet`
// time or is returning for at least `max` time. This enables easy and safe
// data coalescing.
//
// `quiet` is the time waited between `f` returns, and is reset on each return.
// If quiet is reached, then Coalesce returns and `f` is not called anymore.
// `max` is the maximum time to wait for `f` to return. This does not reset.
//
// The callback f must be well-behaved with the context passed in. The context
// will be cancelled prior to Coalesce returning, and Coalesce will block until
// the function call returns. This ensures that there are no data races once
// Coalesce returns.
//
// If the given ctx is cancelled, this function also cancels. It follows the
// same behavior as if the timeout were reached.
//
// Real world example: imagine you have a function processing input data,
// and you'd like to accumulate as much input data as possible to batch process
// it. The logic you'd say is: keep receiving data until I don't receive any
// within Q time or at most M time passes. Q is usually much shorter than M.
// This means if the data input is "quiet" enough, continue, otherwise wait
// until some maximum amount of time and still continue. This is what this
// function does generally.
func coalesce(ctx context.Context, quiet, max time.Duration, f func(context.Context)) {
// Setup a max duration timeout
ctx, maxCloser := context.WithTimeout(ctx, max)
defer maxCloser()
for {
err := doCoalesce(ctx, quiet, f)
if err != nil {
return
}
}
}
func doCoalesce(ctx context.Context, quiet time.Duration, f func(context.Context)) error {
// Create a context with our quiet period
curCtx, curCancel := context.WithTimeout(ctx, quiet)
defer curCancel()
// Call the function
f(curCtx)
// If the context ended, then we're also done. If the context didn't
// end, then the function processed successfully and we continue.
return curCtx.Err()
}