forked from hyperledger/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.go
110 lines (92 loc) · 2.99 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package comm
import (
"fmt"
"math/rand"
"sync"
"time"
"github.com/hyperledger/fabric/common/flogging"
"google.golang.org/grpc"
)
// logger is the package-level logger used by the connection producer; it is
// obtained from flogging under the name "ConnProducer".
var logger = flogging.MustGetLogger("ConnProducer")
// ConnectionFactory creates a connection to a certain endpoint.
// It is given the endpoint as a string and returns the gRPC client
// connection for it, or a non-nil error when the attempt failed.
type ConnectionFactory func(endpoint string) (*grpc.ClientConn, error)
// ConnectionProducer produces connections out of a set of predefined
// endpoints.
type ConnectionProducer interface {
// NewConnection creates a new connection.
// On success it returns the connection, the endpoint that was selected,
// and a nil error.
// On failure it returns nil, "", and a non-nil error.
NewConnection() (*grpc.ClientConn, string, error)
// UpdateEndpoints updates the endpoints of the ConnectionProducer
// to be the given endpoints.
UpdateEndpoints(endpoints []string)
// GetEndpoints returns the ordering service endpoints currently held
// by the ConnectionProducer.
GetEndpoints() []string
}
// connProducer is the ConnectionProducer implementation provided by this
// file. Its methods take the embedded lock before touching the fields below.
type connProducer struct {
// RWMutex is locked by the methods of connProducer while they read or
// write endpoints and nextEndpointIndex.
sync.RWMutex
// endpoints is the list of endpoints connections are attempted against.
endpoints []string
// connect is the factory invoked to open a connection to one endpoint.
connect ConnectionFactory
// nextEndpointIndex is the position in endpoints of the endpoint that the
// next connection attempt will use.
nextEndpointIndex int
}
// NewConnectionProducer builds a ConnectionProducer that opens connections
// with the given factory against the given endpoints.
//
// The producer works on a shuffled copy of endpoints, so the order in which
// endpoints are attempted differs from the order they were passed in.
// When endpoints is empty, nil is returned instead of a producer.
func NewConnectionProducer(factory ConnectionFactory, endpoints []string) ConnectionProducer {
	if len(endpoints) == 0 {
		return nil
	}

	producer := new(connProducer)
	producer.connect = factory
	producer.endpoints = shuffle(endpoints)
	return producer
}
// NewConnection tries the configured endpoints one after another until the
// connection factory succeeds for one of them.
//
// Attempts start at the producer's current position in the endpoint list and
// wrap around, so each endpoint is tried at most once per call.
// On success it returns the connection, the endpoint it was opened against,
// and a nil error. If every attempt fails it returns nil, "", and an error.
// The producer's lock is held for the whole call.
func (cp *connProducer) NewConnection() (*grpc.ClientConn, string, error) {
	cp.Lock()
	defer cp.Unlock()

	logger.Debugf("Creating a new connection")

	total := len(cp.endpoints)
	for attempt := 0; attempt < total; attempt++ {
		target := cp.endpoints[cp.nextEndpointIndex]
		clientConn, dialErr := cp.connect(target)
		// Advance the position whatever the outcome, so the following attempt
		// (in this call or a later one) moves on to the next endpoint.
		cp.nextEndpointIndex = (cp.nextEndpointIndex + 1) % total
		if dialErr == nil {
			logger.Debugf("Connected to %s", target)
			return clientConn, target, nil
		}
		logger.Error("Failed connecting to", target, ", error:", dialErr)
	}

	logger.Errorf("Could not connect to any of the endpoints: %v", cp.endpoints)
	return nil, "", fmt.Errorf("could not connect to any of the endpoints: %v", cp.endpoints)
}
// UpdateEndpoints replaces the endpoints of the ConnectionProducer with the
// given endpoints and restarts the rotation at the first of them.
// An empty (or nil) slice is ignored and the current endpoints are kept.
//
// The producer stores its own copy of endpoints, so the caller remains free
// to modify the slice it passed in after this method returns.
func (cp *connProducer) UpdateEndpoints(endpoints []string) {
	if len(endpoints) == 0 {
		// Ignore updates with empty endpoints
		return
	}

	// Copy outside the critical section: storing the caller's slice directly
	// would let later writes by the caller change lock-guarded state without
	// holding the lock.
	fresh := append([]string(nil), endpoints...)

	cp.Lock()
	defer cp.Unlock()
	cp.nextEndpointIndex = 0
	cp.endpoints = fresh
}
// shuffle returns a new slice holding the elements of a in random order.
// The input slice is not modified, and the result always has len(a) elements.
//
// The permutation comes from a generator that is private to the call, so the
// process-wide math/rand source is never re-seeded as a side effect.
func shuffle(a []string) []string {
	// Seeding from the clock gives a different order on each call, and a local
	// generator avoids disturbing other users of the global source.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	shuffled := make([]string, len(a))
	for dst, src := range rng.Perm(len(a)) {
		shuffled[dst] = a[src]
	}
	return shuffled
}
// GetEndpoints returns the endpoints currently configured for the ordering
// service, in the order the producer holds them.
//
// The result is a copy taken under the read lock: callers may keep or modify
// it freely without affecting the producer. A producer that holds no
// endpoints yields nil.
func (cp *connProducer) GetEndpoints() []string {
	cp.RLock()
	defer cp.RUnlock()

	// Handing out the internal slice would let callers write to lock-guarded
	// state after the lock has been released.
	return append([]string(nil), cp.endpoints...)
}