-
Notifications
You must be signed in to change notification settings - Fork 2.8k
/
promise.go
143 lines (124 loc) · 3.46 KB
/
promise.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package promise
import (
"context"
"sync"
"github.com/cilium/cilium/pkg/lock"
)
// Promise is a handle to a value of type T that becomes available at some
// point in the future.
type Promise[T any] interface {
	// Await blocks until the value is resolved or rejected. It returns the
	// value on resolution and a non-nil error on rejection. The context
	// bounds the wait: the implementation in this file returns the context's
	// error once the context is done.
	Await(context.Context) (T, error)
}
// Resolver can resolve or reject a promise.
// These methods are kept separate from Promise to make it clear where the
// promise is resolved from.
type Resolver[T any] interface {
	// Resolve a promise. Unblocks all Await()s. Future calls of Await()
	// return the resolved value immediately.
	//
	// Only the first call to resolve (or reject) has an effect and
	// further calls are ignored.
	Resolve(T)

	// Reject a promise with an error. Unblocks all Await()s, which then
	// return that error.
	//
	// As with Resolve, only the first call to resolve or reject has an
	// effect and further calls are ignored.
	Reject(error)
}
// New allocates an unresolved promise for a value of type T.
//
// Both return values are backed by the same object: the Resolver is the
// write side used to settle the promise, the Promise is the read side handed
// to consumers that want to Await the outcome.
func New[T any]() (Resolver[T], Promise[T]) {
	p := new(promise[T])
	// The promise's own mutex is the Locker of its condition variable.
	p.cond = sync.NewCond(p)
	return p, p
}
// Lifecycle states of a promise, stored in the state field of promise.
// promiseUnresolved is 0, so a zero-valued promise starts out unresolved.
const (
	// promiseUnresolved: neither Resolve nor Reject has taken effect yet.
	promiseUnresolved = iota
	// promiseResolved: Resolve took effect and the value is available.
	promiseResolved
	// promiseRejected: Reject took effect and the error is available.
	promiseRejected
)
// promise is the implementation returned by New; it serves as both the
// Resolver and the Promise.
type promise[T any] struct {
	// Mutex guards state, value and err, and is the Locker of cond.
	lock.Mutex
	// cond is broadcast when the promise is resolved or rejected, and when
	// the context of an Await call is done.
	cond *sync.Cond
	// state is one of promiseUnresolved, promiseResolved or promiseRejected.
	state int
	// value is the result stored by Resolve.
	value T
	// err is the reason stored by Reject.
	err error
}
// Resolve settles the promise successfully with value and wakes every
// goroutine currently blocked in Await().
//
// Only the first settlement counts: if the promise has already been resolved
// or rejected, the call leaves the stored outcome untouched and does nothing.
func (p *promise[T]) Resolve(value T) {
	p.Lock()
	defer p.Unlock()

	if p.state == promiseUnresolved {
		p.value = value
		p.state = promiseResolved
		// Wake all waiters; they re-check the state under the lock.
		p.cond.Broadcast()
	}
}
// Reject settles the promise as failed with err and wakes every goroutine
// currently blocked in Await(), which then reports err to its caller.
//
// Only the first settlement counts: if the promise has already been resolved
// or rejected, the call leaves the stored outcome untouched and does nothing.
func (p *promise[T]) Reject(err error) {
	p.Lock()
	defer p.Unlock()

	if p.state == promiseUnresolved {
		p.err = err
		p.state = promiseRejected
		// Wake all waiters; they re-check the state under the lock.
		p.cond.Broadcast()
	}
}
// Await blocks until the promise has been resolved or rejected, or until ctx
// is done, whichever happens first.
//
// It returns the resolved value and a nil error on resolution, and the zero
// value of T together with the rejection error on rejection. If ctx is done
// by the time the wait ends, the zero value and ctx.Err() are returned, even
// if the promise has been settled in the meantime.
func (p *promise[T]) Await(ctx context.Context) (value T, err error) {
	// Derive a context that is cancelled when Await returns, so the watcher
	// goroutine below always terminates with this call.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// A sync.Cond cannot be waited on together with a channel, so a watcher
	// goroutine turns "context done" into a wake-up of the condition variable.
	go func() {
		<-ctx.Done()
		// Broadcast with the lock held. The waiter checks ctx.Err() under the
		// lock and only gives the lock up inside cond.Wait(), after it has
		// registered itself. Broadcasting without the lock could land between
		// the check and the registration; that wake-up would be lost and
		// Await would stay blocked despite the cancelled context.
		p.Lock()
		p.cond.Broadcast()
		p.Unlock()
	}()

	p.Lock()
	defer p.Unlock()

	// Wait until the promise is settled or the context is done. The derived
	// context is never nil, so no nil check is needed here.
	for p.state == promiseUnresolved && ctx.Err() == nil {
		p.cond.Wait()
	}

	if ctxErr := ctx.Err(); ctxErr != nil {
		return value, ctxErr
	}
	if p.state == promiseResolved {
		return p.value, nil
	}
	return value, p.err
}
// wrappedPromise adapts a plain await function so that it can be used
// wherever a Promise is expected.
type wrappedPromise[T any] func(context.Context) (T, error)

// Await calls the wrapped function with ctx and hands back its results
// unchanged.
func (fn wrappedPromise[T]) Await(ctx context.Context) (T, error) {
	result, err := fn(ctx)
	return result, err
}
// Map derives a Promise[B] from a Promise[A].
//
// Nothing happens until the returned promise is awaited: each Await call
// awaits p with the same context and, on success, passes the value through
// transform. If p fails, the error is returned as is together with the zero
// value of B, and transform is not called.
func Map[A, B any](p Promise[A], transform func(A) B) Promise[B] {
	var mapped wrappedPromise[B] = func(ctx context.Context) (B, error) {
		input, err := p.Await(ctx)
		if err == nil {
			return transform(input), nil
		}
		var zero B
		return zero, err
	}
	return mapped
}
// MapError derives a promise that rewrites the error of p.
//
// Each Await call on the returned promise awaits p with the same context. A
// nil error passes through untouched; a non-nil error is replaced by
// transform(err). The value returned by p is handed back unchanged in both
// cases.
func MapError[A any](p Promise[A], transform func(error) error) Promise[A] {
	await := func(ctx context.Context) (A, error) {
		value, err := p.Await(ctx)
		if err == nil {
			return value, nil
		}
		return value, transform(err)
	}
	return wrappedPromise[A](await)
}