/
funcs.go
142 lines (136 loc) · 2.87 KB
/
funcs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package futures
import (
"context"
"sync"
"time"
"github.com/bradfitz/iter"
"github.com/anacrolix/missinggo/slices"
)
// Sends each future as it completes on the returned chan, closing it when
// everything has been sent.
func AsCompleted(fs ...*F) <-chan *F {
ret := make(chan *F, len(fs))
var wg sync.WaitGroup
for _, f := range fs {
wg.Add(1)
go func(f *F) {
defer wg.Done()
<-f.Done()
ret <- f
}(f)
}
go func() {
wg.Wait()
close(ret)
}()
return ret
}
// Additional state maintained for each delayed element.
type delayedState struct {
timeout *F
added bool
}
// Returns futures as they complete. Delayed futures are not released until
// their timeout has passed, or all prior delayed futures, and the initial set
// have completed. One use case is to prefer the value in some futures over
// others, such as hitting several origin servers where some are better
// informed than others.
func AsCompletedDelayed(ctx context.Context, initial []*F, delayed []Delayed) <-chan *F {
ret := make(chan *F, func() int {
l := len(initial)
for _, d := range delayed {
l += len(d.Fs)
}
return l
}())
go func() {
defer close(ret)
var (
dss []delayedState
timeouts = map[*F]struct{}{} // Pending timeouts
)
for i := range delayed {
func(i int) {
f := Start(func() (interface{}, error) {
select {
case <-time.After(delayed[i].Delay):
return i, nil
case <-ctx.Done():
return nil, ctx.Err()
}
})
timeouts[f] = struct{}{}
dss = append(dss, delayedState{timeout: f})
}(i)
}
// Number of pending sends for a future.
results := map[*F]int{}
for _, f := range initial {
results[f]++
}
start:
// A slice of futures we want to send when they complete.
resultsSlice := func() (ret []*F) {
for f, left := range results {
for range iter.N(left) {
ret = append(ret, f)
}
}
return
}()
if len(resultsSlice) == 0 {
for i, ds := range dss {
if ds.added {
continue
}
// Add this delayed block prematurely.
delete(timeouts, ds.timeout)
for _, f := range delayed[i].Fs {
results[f]++
}
dss[i].added = true
// We need to recompute the results slice.
goto start
}
}
as := AsCompleted(append(
resultsSlice,
slices.FromMapKeys(timeouts).([]*F)...,
)...)
for {
select {
case <-ctx.Done():
return
case f, ok := <-as:
if !ok {
return
}
if _, ok := timeouts[f]; ok {
if ctx.Err() != nil {
break
}
i := f.MustResult().(int)
for _, f := range delayed[i].Fs {
results[f]++
}
delete(timeouts, f)
dss[i].added = true
goto start
}
select {
case ret <- f:
results[f]--
if results[f] == 0 {
delete(results, f)
}
if len(results) == 0 {
goto start
}
case <-ctx.Done():
return
}
}
}
}()
return ret
}