-
Notifications
You must be signed in to change notification settings - Fork 1
/
utils.go
128 lines (117 loc) · 2.5 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package backends
import (
"context"
"sync"
"time"
"github.com/distronode/receptor/pkg/logger"
"github.com/distronode/receptor/pkg/netceptor"
"github.com/distronode/receptor/pkg/utils"
)
// maxRedialDelay is the value dialerSession hands to
// utils.NewIncrementalDuration as its second argument; presumably the ceiling
// for the growing redial delay (confirm against that helper).
const maxRedialDelay = 20 * time.Second
// dialerFunc makes one attempt to establish a backend session and returns it,
// or an error if the attempt failed. dialerSession passes in a fresh channel
// for every attempt and, once the session has been handed off, waits for a
// receive on that channel before considering a redial; presumably the
// implementation closes it when the session ends (confirm against callers).
type dialerFunc func(chan struct{}) (netceptor.BackendSession, error)
// dialerSession is a convenience function for backends that use dial/retry logic.
//
// It starts a goroutine, registered on wg, that calls df to establish a
// session. Each successfully established session is sent on the returned
// channel; the goroutine then waits until the per-attempt close channel given
// to df fires or ctx is done.
//
// When an attempt fails or a session ends:
//   - if redial is true and ctx is not done, a warning is logged and the
//     goroutine waits for the next incremental delay (seeded with redialDelay
//     and maxRedialDelay; reset after every successful dial) before calling
//     df again;
//   - otherwise the goroutine logs as appropriate and exits.
//
// On exit the returned channel is closed and then wg is released. The returned
// error is always nil.
func dialerSession(
	ctx context.Context,
	wg *sync.WaitGroup,
	redial bool,
	redialDelay time.Duration,
	logger *logger.ReceptorLogger,
	df dialerFunc,
) (chan netceptor.BackendSession, error) {
	sessChan := make(chan netceptor.BackendSession)
	wg.Add(1)
	go func() {
		// Deferred calls run last-in-first-out: the channel is closed first
		// and the WaitGroup is released only afterwards, so wg.Wait() cannot
		// return while this goroutine still has cleanup outstanding.
		defer wg.Done()
		defer close(sessChan)

		redialDelayInc := utils.NewIncrementalDuration(redialDelay, maxRedialDelay, 1.5)
		for {
			// A fresh close channel per attempt, so a stale signal from an
			// earlier session can never be observed.
			closeChan := make(chan struct{})
			sess, err := df(closeChan)
			if err == nil {
				redialDelayInc.Reset()
				// Hand the session to the consumer, unless we are shutting down.
				select {
				case sessChan <- sess:
				case <-ctx.Done():
					return
				}
				// Block until the session signals that it has ended.
				select {
				case <-closeChan:
				case <-ctx.Done():
					return
				}
			}

			if !redial || ctx.Err() != nil {
				if err != nil {
					logger.Error("Backend connection failed: %s\n", err)
				} else if ctx.Err() != nil {
					// NOTE(review): this only logs when ctx is already done; a
					// non-redial session that ends on its own exits silently.
					// Confirm that this is the intended condition.
					logger.Error("Backend connection exited\n")
				}

				return
			}

			if err != nil {
				logger.Warning("Backend connection failed (will retry): %s\n", err)
			} else {
				logger.Warning("Backend connection exited (will retry)\n")
			}
			// Wait out the redial delay, but stay responsive to cancellation.
			select {
			case <-redialDelayInc.NextTimeout():
			case <-ctx.Done():
				return
			}
		}
	}()

	return sessChan, nil
}
type (
// listenFunc is called once by listenerSession before its accept goroutine
// is started; a non-nil error aborts listenerSession and is returned to the
// caller.
listenFunc func() error
// acceptFunc is called repeatedly by listenerSession's accept loop and
// returns the next backend session, or an error that ends the loop.
acceptFunc func() (netceptor.BackendSession, error)
// listenerCancelFunc is called by listenerSession when its accept goroutine
// exits; presumably it releases whatever listenFunc set up.
listenerCancelFunc func()
)
// listenerSession is a convenience function for backends that use listen/accept logic.
//
// It first calls lf; if that fails, the error is returned and nothing else is
// started. Otherwise it starts a goroutine, registered on wg, that calls af in
// a loop and sends every accepted session on the returned channel.
//
// The goroutine exits when ctx is done (checked after each af call and while
// waiting to hand off a session) or when af returns an error, which is logged.
// On exit it calls lcf, closes the returned channel, and only then releases
// wg, so a caller blocked in wg.Wait() resumes after that cleanup has
// completed.
func listenerSession(
	ctx context.Context,
	wg *sync.WaitGroup,
	logger *logger.ReceptorLogger,
	lf listenFunc,
	af acceptFunc,
	lcf listenerCancelFunc,
) (chan netceptor.BackendSession, error) {
	if err := lf(); err != nil {
		return nil, err
	}

	sessChan := make(chan netceptor.BackendSession)
	wg.Add(1)
	go func() {
		// Deferred calls run last-in-first-out: lcf, then close(sessChan),
		// then wg.Done. Releasing the WaitGroup last keeps wg.Wait() from
		// returning before the listener cleanup has run.
		defer wg.Done()
		defer close(sessChan)
		defer lcf()

		for {
			c, err := af()
			// Cancellation takes priority over whatever af returned, so a
			// shutdown is not reported as an accept error.
			select {
			case <-ctx.Done():
				return
			default:
			}
			if err != nil {
				logger.Error("Error accepting connection: %s\n", err)

				return
			}
			// Hand the session to the consumer, unless we are shutting down.
			select {
			case sessChan <- c:
			case <-ctx.Done():
				return
			}
		}
	}()

	return sessChan, nil
}