-
Notifications
You must be signed in to change notification settings - Fork 0
/
dialer.go
96 lines (71 loc) · 1.95 KB
/
dialer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package mesh
import (
	"context"
	"fmt"
	"net"
	"sync"
	"time"

	"github.com/dynamicgo/slf4go"
	"google.golang.org/grpc"
)
// Dialer creates gRPC client connections to services identified by name.
type Dialer interface {
	// Dial returns a gRPC client connection for serviceName. Any extra
	// options are forwarded to gRPC together with whatever options the
	// implementation adds itself.
	Dial(ctx context.Context, serviceName string, options ...grpc.DialOption) (*grpc.ClientConn, error)
	// Network returns the Network associated with this dialer.
	Network() Network
}
// dialerWithBalancer is the Dialer implementation returned by NewDialer. For
// every connection attempt it asks balancer for a peer and opens the
// connection to that peer through network.
type dialerWithBalancer struct {
	// Logger is embedded so the dialer can call logging methods such as
	// DebugF directly on itself.
	slf4go.Logger
	// balancer selects the peer for each connection attempt.
	balancer DialerBalancer
	// network performs the actual dial to the selected peer.
	network Network
}
// DialerBalancer chooses which peer a connection attempt should use.
type DialerBalancer interface {
	// NextPeer returns the peer to dial for serviceName. Dial reports a
	// "can't find valid peer" error when the returned peer is nil, so an
	// implementation may return (nil, nil) to signal that no peer is
	// available.
	NextPeer(serviceName string) (*Peer, error)
}
// defaultBalancer is a round-robin DialerBalancer over a fixed list of peers.
type defaultBalancer struct {
	mu    sync.Mutex // guards index
	peers []*Peer    // candidates, handed out in slice order
	index int        // position in peers of the next peer to return
}

// DefaultBalancer returns a DialerBalancer that hands out peers in slice
// order and wraps around to the first peer after the last one.
//
// The slice is retained rather than copied, so the caller should not modify
// it after passing it in.
func DefaultBalancer(peers []*Peer) DialerBalancer {
	return &defaultBalancer{
		peers: peers,
	}
}

// NextPeer returns the next peer in round-robin order. serviceName is
// ignored: every service draws from the same peer list.
//
// When the balancer has no peers it returns (nil, nil); the error result is
// always nil.
//
// NextPeer is safe for concurrent use. The cursor is advanced under a mutex
// because a single balancer is shared by every connection its Dialer creates,
// and unsynchronized updates could skip peers or index past the end of the
// list.
func (b *defaultBalancer) NextPeer(serviceName string) (*Peer, error) {
	// peers is never reassigned after construction, so its length can be
	// read without holding the lock.
	if len(b.peers) == 0 {
		return nil, nil
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	peer := b.peers[b.index]
	b.index = (b.index + 1) % len(b.peers)

	return peer, nil
}
// NewDialer builds a Dialer that picks a peer from balancer for each
// connection attempt and dials it through network. The dialer logs through
// the slf4go logger named "random dialer".
func NewDialer(network Network, balancer DialerBalancer) Dialer {
	d := new(dialerWithBalancer)
	d.Logger = slf4go.Get("random dialer")
	d.network = network
	d.balancer = balancer

	return d
}
// Dial creates a gRPC client connection for serviceName whose transport is
// opened through the dialer's Network instead of by connecting to an address.
//
// A custom dialer option is added after the caller-supplied options. Each
// time gRPC invokes it, the callback asks the balancer for the next peer and
// connects to it with network.Dial, passing along the timeout gRPC supplies.
// The addr argument gRPC passes to the callback is ignored. The callback
// fails if the balancer returns an error or a nil peer.
//
// The caller's options slice is never written to.
func (d *dialerWithBalancer) Dial(ctx context.Context, serviceName string, options ...grpc.DialOption) (*grpc.ClientConn, error) {
	dialerOption := grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
		d.DebugF("[%s] dialer[%s] try get next peer", d.network.ID(), serviceName)

		peer, err := d.balancer.NextPeer(serviceName)
		if err != nil {
			return nil, err
		}

		if peer == nil {
			return nil, fmt.Errorf("can't find valid peer for service %s", serviceName)
		}

		d.DebugF("[%s] dialer[%s] try get next peer -- success", d.network.ID(), serviceName)

		return d.network.Dial(peer, serviceName, timeout)
	})

	// Copy into a fresh slice rather than appending to options: when the
	// caller passes an existing slice with spare capacity (opts...), append
	// would write into the caller's backing array, which is a data race if
	// the same slice is used for concurrent Dial calls.
	opts := make([]grpc.DialOption, 0, len(options)+1)
	opts = append(opts, options...)
	opts = append(opts, dialerOption)

	return grpc.DialContext(ctx, serviceName, opts...)
}
// Network returns the Network this dialer was constructed with.
func (d *dialerWithBalancer) Network() Network {
	return d.network
}