Skip to content

Commit

Permalink
[FABG-962] Fix deadlock in event service (#72)
Browse files Browse the repository at this point in the history
Specify a buffer on various Go channels in order to ensure the dispatcher doesn't deadlock when responding on a channel that has no listener.

Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Apr 22, 2020
1 parent c3b3ab6 commit 5fe41b9
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/fab/events/client/client.go
Expand Up @@ -210,7 +210,7 @@ func (c *Client) connect() error {

logger.Debug("Submitting connection request...")

errch := make(chan error)
errch := make(chan error, 1)
err1 := c.Submit(dispatcher.NewConnectEvent(errch))
if err1 != nil {
return errors.Errorf("Submit failed %s", err1)
Expand Down
17 changes: 13 additions & 4 deletions pkg/fab/events/client/mocks/mockconnection.go
Expand Up @@ -9,6 +9,7 @@ package mocks
import (
"sync"
"sync/atomic"
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
Expand Down Expand Up @@ -98,10 +99,11 @@ type MockConnection struct {

// Opts contains mock connection options
type Opts struct {
Ledger servicemocks.Ledger
Operations OperationMap
Factory ConnectionFactory
SourceURL string
Ledger servicemocks.Ledger
Operations OperationMap
Factory ConnectionFactory
SourceURL string
ResponseDelay time.Duration
}

// NewMockConnection returns a new MockConnection using the given options
Expand Down Expand Up @@ -319,3 +321,10 @@ func WithResults(funcResults ...*OperationResult) Opt {
}
}
}

// WithResponseDelay sets the amount of time to wait before returning a response
func WithResponseDelay(delay time.Duration) Opt {
return func(opts *Opts) {
opts.ResponseDelay = delay
}
}
2 changes: 1 addition & 1 deletion pkg/fab/events/deliverclient/deliverclient.go
Expand Up @@ -103,7 +103,7 @@ func (c *Client) seek() error {
return err
}

errch := make(chan error)
errch := make(chan error, 1)
err1 := c.Submit(dispatcher.NewSeekEvent(seekInfo, errch))
if err1 != nil {
return err1
Expand Down
44 changes: 42 additions & 2 deletions pkg/fab/events/deliverclient/deliverclient_test.go
Expand Up @@ -12,6 +12,8 @@ import (

"github.com/stretchr/testify/require"

cb "github.com/hyperledger/fabric-protos-go/common"
pb "github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/api"
Expand All @@ -26,8 +28,6 @@ import (
fabclientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp"
cb "github.com/hyperledger/fabric-protos-go/common"
pb "github.com/hyperledger/fabric-protos-go/peer"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -94,6 +94,46 @@ func TestClientConnect(t *testing.T) {
time.Sleep(2 * time.Second)
}

func TestClientConnect_ImmediateTimeout(t *testing.T) {
// Ensures that the dispatcher doesn't deadlock sending to a channel with no listener.
// Set the response timeout to 0 so that the client times out immendiately and no longer listens
// to the error channel. Since the error channel has a buffer, the dispatcher replies to the error channel
// without deadlocking.
channelID := "mychannel"
eventClient, err := New(
newMockContext(),
fabmocks.NewMockChannelCfg(channelID),
clientmocks.NewDiscoveryService(peer1, peer2),
client.WithBlockEvents(),
withConnectionProvider(
clientmocks.NewProviderFactory().Provider(
delivermocks.NewConnection(
clientmocks.WithLedger(servicemocks.NewMockLedger(delivermocks.BlockEventFactory, sourceURL)),
clientmocks.WithResponseDelay(200*time.Millisecond),
),
),
),
WithSeekType(seek.FromBlock),
WithBlockNum(0),
client.WithResponseTimeout(0*time.Second),
)
if err != nil {
t.Fatalf("error creating channel event client: %s", err)
}
if eventClient.ConnectionState() != client.Disconnected {
t.Fatalf("expecting connection state %s but got %s", client.Disconnected, eventClient.ConnectionState())
}

err = eventClient.Connect()
require.Error(t, err)
require.Contains(t, err.Error(), "timeout waiting for deliver status response")

eventClient.respTimeout = 3 * time.Second
err = eventClient.Connect()
require.Error(t, err)
require.Contains(t, err.Error(), "connection is closed")
}

// TestReconnect tests the ability of the Channel Event Client to retry multiple
// times to connect, and reconnect after it has disconnected.
func TestReconnect(t *testing.T) {
Expand Down
18 changes: 17 additions & 1 deletion pkg/fab/events/deliverclient/mocks/mockconnection.go
Expand Up @@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0
package mocks

import (
"time"

cb "github.com/hyperledger/fabric-protos-go/common"
ab "github.com/hyperledger/fabric-protos-go/orderer"
pb "github.com/hyperledger/fabric-protos-go/peer"
Expand Down Expand Up @@ -34,13 +36,23 @@ var ConnFactory = func(opts ...clientmocks.Opt) clientmocks.Connection {
// MockConnection is a fake connection used for unit testing
type MockConnection struct {
clientmocks.MockConnection
responseDelay time.Duration
}

// NewConnection returns a new MockConnection using the given options
func NewConnection(opts ...clientmocks.Opt) *MockConnection {
return &MockConnection{
c := &MockConnection{
MockConnection: *clientmocks.NewMockConnection(opts...),
}

copts := &clientmocks.Opts{}
for _, opt := range opts {
opt(copts)
}

c.responseDelay = copts.ResponseDelay

return c
}

// Receive implements the MockConnection interface
Expand All @@ -65,6 +77,10 @@ func (c *MockConnection) Send(sinfo *ab.SeekInfo) error {
return errors.New("mock connection is closed")
}

if c.responseDelay > 0 {
time.Sleep(c.responseDelay)
}

switch seek := sinfo.Start.Type.(type) {
case *ab.SeekPosition_Specified:
// Deliver all blocks from the given block number
Expand Down
10 changes: 5 additions & 5 deletions pkg/fab/events/service/service.go
Expand Up @@ -76,7 +76,7 @@ func (s *Service) Stop() {
return
}

errch := make(chan error)
errch := make(chan error, 1)
eventch <- dispatcher.NewStopEvent(errch)

select {
Expand All @@ -97,8 +97,8 @@ func (s *Service) StopAndTransfer() (fab.EventSnapshot, error) {
return nil, err
}

snapshotch := make(chan fab.EventSnapshot)
errch := make(chan error)
snapshotch := make(chan fab.EventSnapshot, 1)
errch := make(chan error, 1)
eventch <- dispatcher.NewStopAndTransferEvent(snapshotch, errch)

select {
Expand All @@ -121,8 +121,8 @@ func (s *Service) Transfer() (fab.EventSnapshot, error) {
return nil, err
}

snapshotch := make(chan fab.EventSnapshot)
errch := make(chan error)
snapshotch := make(chan fab.EventSnapshot, 1)
errch := make(chan error, 1)
eventch <- dispatcher.NewTransferEvent(snapshotch, errch)

select {
Expand Down

0 comments on commit 5fe41b9

Please sign in to comment.