Skip to content

Commit

Permalink
[FAB-18484] Return transaction forwarding result back to the client s…
Browse files Browse the repository at this point in the history
…ynchronously

This commit makes a Raft follower wait for the transaction forwarded to the leader to be sent into
the gRPC stream, and returns the result (success or failure) back to the client accordingly.

Before this commmit, the behavior was that it returns success after enqueueing it into the message queue,
which might have resulted in the transaction being dropped but a success being returned to the client.

Change-Id: I0cd45540be4988845663eb0c68f76fed2ff25b94
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm authored and C0rWin committed Jun 14, 2021
1 parent 7e61944 commit ccecf10
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 45 deletions.
52 changes: 33 additions & 19 deletions orderer/common/cluster/comm.go
Expand Up @@ -461,8 +461,11 @@ type RemoteContext struct {

// Stream is used to send/receive messages to/from the remote cluster member.
type Stream struct {
abortChan <-chan struct{}
sendBuff chan *orderer.StepRequest
abortChan <-chan struct{}
sendBuff chan struct {
request *orderer.StepRequest
report func(error)
}
commShutdown chan struct{}
abortReason *atomic.Value
metrics *Metrics
Expand All @@ -488,6 +491,11 @@ func (stream *Stream) Canceled() bool {

// Send sends the given request to the remote cluster member.
func (stream *Stream) Send(request *orderer.StepRequest) error {
return stream.SendWithReport(request, func(_ error) {})
}

// SendWithReport sends the given request to the remote cluster member and invokes report on the send result.
func (stream *Stream) SendWithReport(request *orderer.StepRequest, report func(error)) error {
if stream.Canceled() {
return errors.New(stream.abortReason.Load().(string))
}
Expand All @@ -498,12 +506,12 @@ func (stream *Stream) Send(request *orderer.StepRequest) error {
allowDrop = true
}

return stream.sendOrDrop(request, allowDrop)
return stream.sendOrDrop(request, allowDrop, report)
}

// sendOrDrop sends the given request to the remote cluster member, or drops it
// if it is a consensus request and the queue is full.
func (stream *Stream) sendOrDrop(request *orderer.StepRequest, allowDrop bool) error {
func (stream *Stream) sendOrDrop(request *orderer.StepRequest, allowDrop bool, report func(error)) error {
msgType := "transaction"
if allowDrop {
msgType = "consensus"
Expand All @@ -520,24 +528,25 @@ func (stream *Stream) sendOrDrop(request *orderer.StepRequest, allowDrop bool) e
select {
case <-stream.abortChan:
return errors.Errorf("stream %d aborted", stream.ID)
case stream.sendBuff <- request:
// Note - async send, errors are not returned back
case stream.sendBuff <- struct {
request *orderer.StepRequest
report func(error)
}{request: request, report: report}:
return nil
case <-stream.commShutdown:
return nil
}
}

// sendMessage sends the request down the stream
// Note - any errors are swallowed and not returned back - TODO Is this intentional? Shouldn't SubmitRequest errors get returned to client?
func (stream *Stream) sendMessage(request *orderer.StepRequest) {
func (stream *Stream) sendMessage(request *orderer.StepRequest, report func(error)) {
start := time.Now()
var err error
defer func() {
message := fmt.Sprintf("Send of %s to %s(%s) took %v",
requestAsString(request), stream.NodeName, stream.Endpoint, time.Since(start))
if err != nil {
stream.Logger.Errorf("%s but failed due to %s", message, err.Error())
stream.Logger.Warnf("%s but failed due to %s", message, err.Error())
} else {
stream.Logger.Debug(message)
}
Expand All @@ -551,7 +560,7 @@ func (stream *Stream) sendMessage(request *orderer.StepRequest) {
return nil, err
}

_, err = stream.operateWithTimeout(f)
_, err = stream.operateWithTimeout(f, report)
}

func (stream *Stream) serviceStream() {
Expand All @@ -564,8 +573,8 @@ func (stream *Stream) serviceStream() {

for {
select {
case msg := <-stream.sendBuff:
stream.sendMessage(msg)
case reqReport := <-stream.sendBuff:
stream.sendMessage(reqReport.request, reqReport.report)
case <-stream.abortChan:
return
case <-stream.commShutdown:
Expand All @@ -588,11 +597,11 @@ func (stream *Stream) Recv() (*orderer.StepResponse, error) {
return stream.Cluster_StepClient.Recv()
}

return stream.operateWithTimeout(f)
return stream.operateWithTimeout(f, func(_ error) {})
}

// operateWithTimeout performs the given operation on the stream, and blocks until the timeout expires.
func (stream *Stream) operateWithTimeout(invoke StreamOperation) (*orderer.StepResponse, error) {
func (stream *Stream) operateWithTimeout(invoke StreamOperation, report func(error)) (*orderer.StepResponse, error) {
timer := time.NewTimer(stream.Timeout)
defer timer.Stop()

Expand All @@ -615,11 +624,13 @@ func (stream *Stream) operateWithTimeout(invoke StreamOperation) (*orderer.StepR

select {
case r := <-responseChan:
report(r.err)
if r.err != nil {
stream.Cancel(r.err)
}
return r.res, r.err
case <-timer.C:
report(errTimeout)
stream.Logger.Warningf("Stream %d to %s(%s) was forcibly terminated because timeout (%v) expired",
stream.ID, stream.NodeName, stream.Endpoint, stream.Timeout)
stream.Cancel(errTimeout)
Expand Down Expand Up @@ -685,11 +696,14 @@ func (rc *RemoteContext) NewStream(timeout time.Duration) (*Stream, error) {
stepLogger := logger.WithOptions(zap.AddCallerSkip(1))

s := &Stream{
Channel: rc.Channel,
metrics: rc.Metrics,
abortReason: abortReason,
abortChan: abortChan,
sendBuff: make(chan *orderer.StepRequest, rc.SendBuffSize),
Channel: rc.Channel,
metrics: rc.Metrics,
abortReason: abortReason,
abortChan: abortChan,
sendBuff: make(chan struct {
request *orderer.StepRequest
report func(error)
}, rc.SendBuffSize),
commShutdown: rc.shutdownSignal,
NodeName: nodeName,
Logger: stepLogger,
Expand Down
41 changes: 35 additions & 6 deletions orderer/common/cluster/rpc.go
Expand Up @@ -8,6 +8,7 @@ package cluster

import (
"context"
"io"
"sync"
"time"

Expand Down Expand Up @@ -64,6 +65,14 @@ const (
SubmitOperation
)

func (ot OperationType) String() string {
if ot == SubmitOperation {
return "transaction"
}

return "consensus"
}

// SendConsensus passes the given ConsensusRequest message to the raft.Node instance.
func (s *RPC) SendConsensus(destination uint64, msg *orderer.ConsensusRequest) error {
if s.Logger.IsEnabledFor(zapcore.DebugLevel) {
Expand All @@ -86,14 +95,14 @@ func (s *RPC) SendConsensus(destination uint64, msg *orderer.ConsensusRequest) e

err = stream.Send(req)
if err != nil {
s.unMapStream(destination, ConsensusOperation)
s.unMapStream(destination, ConsensusOperation, stream.ID)
}

return err
}

// SendSubmit sends a SubmitRequest to the given destination node.
func (s *RPC) SendSubmit(destination uint64, request *orderer.SubmitRequest) error {
func (s *RPC) SendSubmit(destination uint64, request *orderer.SubmitRequest, report func(error)) error {
if s.Logger.IsEnabledFor(zapcore.DebugLevel) {
defer s.submitSent(time.Now(), destination, request)
}
Expand All @@ -109,12 +118,20 @@ func (s *RPC) SendSubmit(destination uint64, request *orderer.SubmitRequest) err
},
}

unmapOnFailure := func(err error) {
if err != nil && err.Error() == io.EOF.Error() {
s.Logger.Infof("Un-mapping transaction stream to %d because encountered a stale stream", destination)
s.unMapStream(destination, SubmitOperation, stream.ID)
}
report(err)
}

s.submitLock.Lock()
defer s.submitLock.Unlock()

err = stream.Send(req)
err = stream.SendWithReport(req, unmapOnFailure)
if err != nil {
s.unMapStream(destination, SubmitOperation)
s.unMapStream(destination, SubmitOperation, stream.ID)
}
return err
}
Expand All @@ -128,7 +145,7 @@ func (s *RPC) consensusSent(start time.Time, to uint64, msg *orderer.ConsensusRe
}

// getOrCreateStream obtains a Submit stream for the given destination node
func (s *RPC) getOrCreateStream(destination uint64, operationType OperationType) (orderer.Cluster_StepClient, error) {
func (s *RPC) getOrCreateStream(destination uint64, operationType OperationType) (*Stream, error) {
stream := s.getStream(destination, operationType)
if stream != nil {
return stream, nil
Expand Down Expand Up @@ -158,9 +175,21 @@ func (s *RPC) mapStream(destination uint64, stream *Stream, operationType Operat
s.cleanCanceledStreams(operationType)
}

func (s *RPC) unMapStream(destination uint64, operationType OperationType) {
func (s *RPC) unMapStream(destination uint64, operationType OperationType, streamIDToUnmap uint64) {
s.lock.Lock()
defer s.lock.Unlock()

stream, exists := s.StreamsByType[operationType][destination]
if !exists {
s.Logger.Debugf("No %s stream to %d found, nothing to unmap", operationType, destination)
return
}

if stream.ID != streamIDToUnmap {
s.Logger.Debugf("Stream for %s to %d has an ID of %d, not %d", operationType, destination, stream.ID, streamIDToUnmap)
return
}

delete(s.StreamsByType[operationType], destination)
}

Expand Down
76 changes: 71 additions & 5 deletions orderer/common/cluster/rpc_test.go
Expand Up @@ -26,6 +26,72 @@ import (
"google.golang.org/grpc"
)

func noopReport(_ error) {
}

func TestSendSubmitWithReport(t *testing.T) {
t.Parallel()
node1 := newTestNode(t)
node2 := newTestNode(t)

var receptionWaitGroup sync.WaitGroup
receptionWaitGroup.Add(1)
node2.handler.On("OnSubmit", testChannel, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
receptionWaitGroup.Done()
})

defer node1.stop()
defer node2.stop()

config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
node1.c.Configure(testChannel, config)
node2.c.Configure(testChannel, config)

node1RPC := &cluster.RPC{
Logger: flogging.MustGetLogger("test"),
Timeout: time.Hour,
StreamsByType: cluster.NewStreamsByType(),
Channel: testChannel,
Comm: node1.c,
}

// Wait for connections to be established
time.Sleep(time.Second * 5)

err := node1RPC.SendSubmit(node2.nodeInfo.ID, &orderer.SubmitRequest{Channel: testChannel, Payload: &common.Envelope{Payload: []byte("1")}}, noopReport)
assert.NoError(t, err)
receptionWaitGroup.Wait() // Wait for message to be received

// Restart the node
node2.stop()
node2.resurrect()

var wg2 sync.WaitGroup
wg2.Add(1)

reportSubmitFailed := func(err error) {
assert.EqualError(t, err, io.EOF.Error())
defer wg2.Done()
}

err = node1RPC.SendSubmit(node2.nodeInfo.ID, &orderer.SubmitRequest{Channel: testChannel, Payload: &common.Envelope{Payload: []byte("2")}}, reportSubmitFailed)
assert.NoError(t, err)

wg2.Wait()

// Ensure stale stream is cleaned up and removed from the mapping
assert.Len(t, node1RPC.StreamsByType[cluster.SubmitOperation], 0)

// Wait for connection to be re-established
time.Sleep(time.Second * 5)

// Send again, this time it should be received
receptionWaitGroup.Add(1)
err = node1RPC.SendSubmit(node2.nodeInfo.ID, &orderer.SubmitRequest{Channel: testChannel, Payload: &common.Envelope{Payload: []byte("3")}}, noopReport)
assert.NoError(t, err)
receptionWaitGroup.Wait()
}

func TestRPCChangeDestination(t *testing.T) {
// We send a Submit() to 2 different nodes - 1 and 2.
// The first invocation of Submit() establishes a stream with node 1
Expand Down Expand Up @@ -82,8 +148,8 @@ func TestRPCChangeDestination(t *testing.T) {
streamToNode1.On("Recv").Return(nil, io.EOF)
streamToNode2.On("Recv").Return(nil, io.EOF)

rpc.SendSubmit(1, &orderer.SubmitRequest{Channel: "mychannel"})
rpc.SendSubmit(2, &orderer.SubmitRequest{Channel: "mychannel"})
rpc.SendSubmit(1, &orderer.SubmitRequest{Channel: "mychannel"}, noopReport)
rpc.SendSubmit(2, &orderer.SubmitRequest{Channel: "mychannel"}, noopReport)

sent.Wait()
streamToNode1.AssertNumberOfCalls(t, "Send", 1)
Expand Down Expand Up @@ -111,7 +177,7 @@ func TestSend(t *testing.T) {
}

submit := func(rpc *cluster.RPC) error {
err := rpc.SendSubmit(1, submitRequest)
err := rpc.SendSubmit(1, submitRequest, noopReport)
return err
}

Expand Down Expand Up @@ -291,7 +357,7 @@ func TestRPCGarbageCollection(t *testing.T) {

defineMocks(1)

rpc.SendSubmit(1, &orderer.SubmitRequest{Channel: "mychannel"})
rpc.SendSubmit(1, &orderer.SubmitRequest{Channel: "mychannel"}, noopReport)
// Wait for the message to arrive
sent.Wait()
// Ensure the stream is initialized in the mapping
Expand All @@ -311,7 +377,7 @@ func TestRPCGarbageCollection(t *testing.T) {
defineMocks(2)

// Send a message to a different node.
rpc.SendSubmit(2, &orderer.SubmitRequest{Channel: "mychannel"})
rpc.SendSubmit(2, &orderer.SubmitRequest{Channel: "mychannel"}, noopReport)
// The mapping should be now cleaned from the previous stream.
assert.Len(t, mapping[cluster.SubmitOperation], 1)
assert.Equal(t, uint64(2), mapping[cluster.SubmitOperation][2].ID)
Expand Down

0 comments on commit ccecf10

Please sign in to comment.