Skip to content

Commit

Permalink
Report correct reason of stream abort in orderer cluster
Browse files Browse the repository at this point in the history
This commit fixes a bug that makes the cluster communication infrastructure
always report an "aborted" reason after a stream terminates.

The reason for the bug is that the serviceStream() method was always
overriding the real reason the stream was terminated, with the same "aborted" reason.

Moving the stream termination reason to reside inside the sync.Once block along with
the rest of the termination logic solved this bug.

Change-Id: I7060a3c5630c6d28c73f025de8b85061077638d6
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
(cherry picked from commit f0584c6)
  • Loading branch information
yacovm authored and sykesm committed Feb 19, 2021
1 parent 4db2091 commit b6822cb
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 15 deletions.
28 changes: 13 additions & 15 deletions orderer/common/cluster/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,10 +557,9 @@ func (stream *Stream) sendMessage(request *orderer.StepRequest) {
func (stream *Stream) serviceStream() {
streamStartTime := time.Now()
defer func() {
stream.Logger.Debugf("Stream %d to (%s) terminating at total lifetime of %s",
stream.ID, stream.Endpoint, time.Since(streamStartTime))

stream.Cancel(errAborted)
stream.Logger.Debugf("Stream %d to (%s) terminated with total lifetime of %s",
stream.ID, stream.Endpoint, time.Since(streamStartTime))
}()

for {
Expand Down Expand Up @@ -666,21 +665,20 @@ func (rc *RemoteContext) NewStream(timeout time.Duration) (*Stream, error) {
var canceled uint32

abortChan := make(chan struct{})

abort := func() {
cancel()
rc.streamsByID.Delete(streamID)
rc.Metrics.reportEgressStreamCount(rc.Channel, atomic.LoadUint32(&rc.streamsByID.size))
rc.Logger.Debugf("Stream %d to %s(%s) is aborted", streamID, nodeName, rc.endpoint)
atomic.StoreUint32(&canceled, 1)
close(abortChan)
}
abortReason := &atomic.Value{}

once := &sync.Once{}
abortReason := &atomic.Value{}

cancelWithReason := func(err error) {
abortReason.Store(err.Error())
once.Do(abort)
once.Do(func() {
abortReason.Store(err.Error())
cancel()
rc.streamsByID.Delete(streamID)
rc.Metrics.reportEgressStreamCount(rc.Channel, atomic.LoadUint32(&rc.streamsByID.size))
rc.Logger.Debugf("Stream %d to %s(%s) is aborted", streamID, nodeName, rc.endpoint)
atomic.StoreUint32(&canceled, 1)
close(abortChan)
})
}

logger := flogging.MustGetLogger("orderer.common.cluster.step")
Expand Down
49 changes: 49 additions & 0 deletions orderer/common/cluster/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,55 @@ func TestUnavailableHosts(t *testing.T) {
assert.Contains(t, err.Error(), "connection")
}

func TestStreamAbortReportCorrectError(t *testing.T) {
// Scenario: node 1 acquires a stream to node 2 and then the stream
// encounters an error and as a result, the stream is aborted.
// We ensure the error reported is the first error, even after
// multiple attempts of using it.

node1 := newTestNode(t)
defer node1.stop()

node2 := newTestNode(t)
defer node2.stop()

node1.c.Configure(testChannel, []cluster.RemoteNode{node2.nodeInfo})
node2.c.Configure(testChannel, []cluster.RemoteNode{node1.nodeInfo})

node2.handler.On("OnSubmit", testChannel, node1.nodeInfo.ID, mock.Anything).Return(errors.Errorf("whoops")).Once()

rm1, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
assert.NoError(t, err)
var streamTerminated sync.WaitGroup
streamTerminated.Add(1)

stream := assertEventualEstablishStream(t, rm1)

l, err := zap.NewDevelopment()
assert.NoError(t, err)
stream.Logger = flogging.NewFabricLogger(l, zap.Hooks(func(entry zapcore.Entry) error {
if strings.Contains(entry.Message, "Stream 1 to") && strings.Contains(entry.Message, "terminated") {
streamTerminated.Done()
}
return nil
}))

// Probe the stream for the first time
err = stream.Send(wrapSubmitReq(testReq))
assert.NoError(t, err)

// We should receive back the crafted error
_, err = stream.Recv()
assert.Contains(t, err.Error(), "whoops")

// Wait for the stream to be terminated from within the communication infrastructure
streamTerminated.Wait()

// We should still receive the original crafted error despite the stream being terminated
err = stream.Send(wrapSubmitReq(testReq))
assert.Contains(t, err.Error(), "whoops")
}

func TestStreamAbort(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit b6822cb

Please sign in to comment.