forked from cgorenflo/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.go
140 lines (120 loc) · 3.76 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package comm
import (
"fmt"
"math/rand"
"sync"
"time"
"github.com/hyperledger/fabric/common/flogging"
"google.golang.org/grpc"
)
var (
	// logger is the package logger, registered under the "ConnProducer" name.
	logger = flogging.MustGetLogger("ConnProducer")

	// EndpointDisableInterval is how long an endpoint disabled through
	// DisableEndpoint stays excluded before NewConnection considers it again.
	EndpointDisableInterval = 10 * time.Second
)
// ConnectionFactory creates a gRPC client connection to the given endpoint.
// It returns the connection, or an error when the connection attempt fails.
type ConnectionFactory func(endpoint string) (*grpc.ClientConn, error)
// ConnectionProducer produces connections out of a set of predefined
// endpoints.
type ConnectionProducer interface {
// NewConnection creates a new connection.
// On success it returns the connection, the endpoint selected, and a nil error.
// On failure it returns nil, "", and a non-nil error.
NewConnection() (*grpc.ClientConn, string, error)
// UpdateEndpoints replaces the endpoints of the ConnectionProducer
// with the given endpoints.
UpdateEndpoints(endpoints []string)
// DisableEndpoint temporarily removes the given endpoint from the set of
// endpoints used for new connections.
DisableEndpoint(endpoint string)
// GetEndpoints returns the ordering service endpoints.
GetEndpoints() []string
}
// connProducer is the ConnectionProducer implementation returned by
// NewConnectionProducer. Its methods take the embedded RWMutex before
// touching the endpoints slice or the disabledEndpoints map.
type connProducer struct {
sync.RWMutex
// endpoints holds the candidate endpoints that connections are made to.
endpoints []string
// disabledEndpoints maps a disabled endpoint to the time it was disabled;
// NewConnection purges entries older than EndpointDisableInterval.
disabledEndpoints map[string]time.Time
// connect is the factory used to open a connection to an endpoint.
connect ConnectionFactory
}
// NewConnectionProducer builds a ConnectionProducer that connects to the given
// endpoints through factory. The endpoints slice is stored as passed (it is
// not copied). When endpoints is empty, nil is returned instead of a producer.
func NewConnectionProducer(factory ConnectionFactory, endpoints []string) ConnectionProducer {
	if len(endpoints) == 0 {
		return nil
	}

	producer := new(connProducer)
	producer.connect = factory
	producer.endpoints = endpoints
	producer.disabledEndpoints = map[string]time.Time{}
	return producer
}
// NewConnection tries the configured endpoints in random order, skipping the
// ones that are currently disabled, and returns the first connection that is
// established.
// On success it returns the connection, the endpoint selected, and a nil error.
// On failure it returns nil, "", and an error listing the endpoints attempted.
// The producer's lock is held for the whole call, including the connection
// attempts.
func (cp *connProducer) NewConnection() (*grpc.ClientConn, string, error) {
	cp.Lock()
	defer cp.Unlock()

	// Re-enable every endpoint whose disable interval has elapsed.
	for ep, disabledAt := range cp.disabledEndpoints {
		if time.Since(disabledAt) < EndpointDisableInterval {
			continue
		}
		delete(cp.disabledEndpoints, ep)
	}

	attempted := []string{}
	for _, candidate := range shuffle(cp.endpoints) {
		if _, disabled := cp.disabledEndpoints[candidate]; disabled {
			continue
		}
		attempted = append(attempted, candidate)

		conn, err := cp.connect(candidate)
		if err == nil {
			return conn, candidate, nil
		}
		// Log the failure and fall through to the next candidate.
		logger.Error("Failed connecting to", candidate, ", error:", err)
	}

	return nil, "", fmt.Errorf("Could not connect to any of the endpoints: %v", attempted)
}
// UpdateEndpoints replaces the producer's endpoints with the given ones.
// An empty update is ignored. Disabled endpoints that are still present in
// the new list keep their original disable time; all others are dropped from
// the disabled set.
func (cp *connProducer) UpdateEndpoints(endpoints []string) {
	if len(endpoints) == 0 {
		// Nothing to switch to: keep the current endpoints.
		return
	}

	cp.Lock()
	defer cp.Unlock()

	stillDisabled := map[string]time.Time{}
	for _, ep := range endpoints {
		disabledAt, found := cp.disabledEndpoints[ep]
		if !found {
			continue
		}
		stillDisabled[ep] = disabledAt
	}

	cp.disabledEndpoints = stillDisabled
	cp.endpoints = endpoints
}
// DisableEndpoint records the current time as the disable time of the given
// endpoint, provided it is one of the producer's endpoints. The request is
// refused, with a warning, when the number of endpoints minus the number of
// disabled entries is exactly one, so that the last endpoint stays usable.
func (cp *connProducer) DisableEndpoint(endpoint string) {
	cp.Lock()
	defer cp.Unlock()

	remaining := len(cp.endpoints) - len(cp.disabledEndpoints)
	if remaining == 1 {
		logger.Warning("Only 1 endpoint remained, will not black-list it")
		return
	}

	for i := range cp.endpoints {
		if cp.endpoints[i] != endpoint {
			continue
		}
		cp.disabledEndpoints[endpoint] = time.Now()
		return
	}
}
// shuffleSeedOnce guards the one-time seeding of the shared math/rand source
// used by shuffle.
var shuffleSeedOnce sync.Once

// shuffle returns a new slice containing the elements of a in random order.
// The input slice is left untouched. An empty or nil input yields an empty,
// non-nil slice.
func shuffle(a []string) []string {
	// Seed the global generator a single time. Reseeding on every call
	// needlessly perturbs the process-wide random source and can repeat a
	// permutation when two calls observe the same clock value.
	shuffleSeedOnce.Do(func() {
		rand.Seed(time.Now().UnixNano())
	})

	shuffled := make([]string, len(a))
	for i, idx := range rand.Perm(len(a)) {
		shuffled[i] = a[idx]
	}
	return shuffled
}
// GetEndpoints returns the endpoints currently configured for the ordering
// service. The slice is read under the read lock and returned as is (it is
// not copied).
func (cp *connProducer) GetEndpoints() []string {
	cp.RLock()
	current := cp.endpoints
	cp.RUnlock()
	return current
}