-
Notifications
You must be signed in to change notification settings - Fork 1
/
service.go
140 lines (118 loc) · 4.26 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*
Copyright IBM Corp. 2017 All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package cluster
import (
"context"
"io"
"time"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/protos/orderer"
"go.uber.org/zap"
"google.golang.org/grpc"
)
//go:generate mockery -dir . -name Dispatcher -case underscore -output ./mocks/
// Dispatcher dispatches requests
type Dispatcher interface {
DispatchSubmit(ctx context.Context, request *orderer.SubmitRequest) error
DispatchConsensus(ctx context.Context, request *orderer.ConsensusRequest) error
}
//go:generate mockery -dir . -name StepStream -case underscore -output ./mocks/
// StepStream defines the gRPC stream for sending
// transactions, and receiving corresponding responses
type StepStream interface {
Send(response *orderer.StepResponse) error
Recv() (*orderer.StepRequest, error)
grpc.ServerStream
}
// Service defines the raft Service
type Service struct {
StreamCountReporter *StreamCountReporter
Dispatcher Dispatcher
Logger *flogging.FabricLogger
StepLogger *flogging.FabricLogger
MinimumExpirationWarningInterval time.Duration
CertExpWarningThreshold time.Duration
}
// Step passes an implementation-specific message to another cluster member.
func (s *Service) Step(stream orderer.Cluster_StepServer) error {
s.StreamCountReporter.Increment()
defer s.StreamCountReporter.Decrement()
addr := util.ExtractRemoteAddress(stream.Context())
commonName := commonNameFromContext(stream.Context())
exp := s.initializeExpirationCheck(stream, addr, commonName)
s.Logger.Debugf("Connection from %s(%s)", commonName, addr)
defer s.Logger.Debugf("Closing connection from %s(%s)", commonName, addr)
for {
err := s.handleMessage(stream, addr, exp)
if err == io.EOF {
s.Logger.Debugf("%s(%s) disconnected", commonName, addr)
return nil
}
if err != nil {
return err
}
// Else, no error occurred, so we continue to the next iteration
}
}
func (s *Service) handleMessage(stream StepStream, addr string, exp *certificateExpirationCheck) error {
request, err := stream.Recv()
if err == io.EOF {
return err
}
if err != nil {
s.Logger.Warningf("Stream read from %s failed: %v", addr, err)
return err
}
exp.checkExpiration(time.Now(), extractChannel(request))
if s.StepLogger.IsEnabledFor(zap.DebugLevel) {
nodeName := commonNameFromContext(stream.Context())
s.StepLogger.Debugf("Received message from %s(%s): %v", nodeName, addr, requestAsString(request))
}
if submitReq := request.GetSubmitRequest(); submitReq != nil {
nodeName := commonNameFromContext(stream.Context())
s.Logger.Debugf("Received message from %s(%s): %v", nodeName, addr, requestAsString(request))
return s.handleSubmit(submitReq, stream, addr)
}
// Else, it's a consensus message.
return s.Dispatcher.DispatchConsensus(stream.Context(), request.GetConsensusRequest())
}
func (s *Service) handleSubmit(request *orderer.SubmitRequest, stream StepStream, addr string) error {
err := s.Dispatcher.DispatchSubmit(stream.Context(), request)
if err != nil {
s.Logger.Warningf("Handling of Submit() from %s failed: %v", addr, err)
return err
}
return err
}
func (s *Service) initializeExpirationCheck(stream orderer.Cluster_StepServer, endpoint, nodeName string) *certificateExpirationCheck {
return &certificateExpirationCheck{
minimumExpirationWarningInterval: s.MinimumExpirationWarningInterval,
expirationWarningThreshold: s.CertExpWarningThreshold,
expiresAt: expiresAt(stream),
endpoint: endpoint,
nodeName: nodeName,
alert: func(template string, args ...interface{}) {
s.Logger.Warningf(template, args...)
},
}
}
func expiresAt(stream orderer.Cluster_StepServer) time.Time {
cert := comm.ExtractCertificateFromContext(stream.Context())
if cert == nil {
return time.Time{}
}
return cert.NotAfter
}
func extractChannel(msg *orderer.StepRequest) string {
if consReq := msg.GetConsensusRequest(); consReq != nil {
return consReq.Channel
}
if submitReq := msg.GetSubmitRequest(); submitReq != nil {
return submitReq.Channel
}
return ""
}