Skip to content

Commit

Permalink
Static leader should not give up on retrieving blocks (#507)
Browse files Browse the repository at this point in the history
When a peer is configured as a static org leader for gossip, it should
not give up on retrieving blocks in the event that the ordering
service goes down for an extended period of time (i.e. longer than the
peer.deliveryclient.reconnectTotalTimeThreshold). Otherwise, the peer
must be restarted before it will act as the leader again.

FAB-17327

Signed-off-by: Brett Logan <brett.t.logan@ibm.com>
Signed-off-by: Will Lahti <wtlahti@us.ibm.com>

Co-authored-by: Brett Logan <brett.t.logan@ibm.com>
  • Loading branch information
2 people authored and yacovm committed Jan 16, 2020
1 parent 2d6becc commit 470f133
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 14 deletions.
3 changes: 3 additions & 0 deletions core/deliverservice/deliveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type deliverServiceImpl struct {
// how it verifies messages received from it,
// and how it disseminates the messages to other peers
type Config struct {
IsStaticLeader bool
// CryptoSvc performs cryptographic actions like message verification and signing
// and identity validation.
CryptoSvc blocksprovider.BlockVerifier
Expand Down Expand Up @@ -118,6 +119,7 @@ func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo b
return errors.New(errMsg)
}
logger.Info("This peer will retrieve blocks from ordering service and disseminate to other peers in the organization for channel", chainID)

dc := &blocksprovider.Deliverer{
ChannelID: chainID,
Gossip: d.conf.Gossip,
Expand All @@ -134,6 +136,7 @@ func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo b
MaxRetryDelay: time.Duration(d.conf.DeliverServiceConfig.ReConnectBackoffThreshold),
MaxRetryDuration: d.conf.DeliverServiceConfig.ReconnectTotalTimeThreshold,
InitialRetryDelay: 100 * time.Millisecond,
YieldLeadership: !d.conf.IsStaticLeader,
}

if d.conf.DeliverGRPCClient.MutualTLSRequired() {
Expand Down
7 changes: 4 additions & 3 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type GossipServiceAdapter interface {
// DeliveryServiceFactory factory to create and initialize delivery service instance
type DeliveryServiceFactory interface {
// Returns an instance of delivery client
Service(g GossipServiceAdapter, ordererSource *orderers.ConnectionSource, msc api.MessageCryptoService) deliverservice.DeliverService
Service(g GossipServiceAdapter, ordererSource *orderers.ConnectionSource, msc api.MessageCryptoService, isStaticLead bool) deliverservice.DeliverService
}

type deliveryFactoryImpl struct {
Expand All @@ -134,8 +134,9 @@ type deliveryFactoryImpl struct {
}

// Returns an instance of delivery client
func (df *deliveryFactoryImpl) Service(g GossipServiceAdapter, ordererSource *orderers.ConnectionSource, mcs api.MessageCryptoService) deliverservice.DeliverService {
func (df *deliveryFactoryImpl) Service(g GossipServiceAdapter, ordererSource *orderers.ConnectionSource, mcs api.MessageCryptoService, isStaticLeader bool) deliverservice.DeliverService {
return deliverservice.NewDeliverService(&deliverservice.Config{
IsStaticLeader: isStaticLeader,
CryptoSvc: mcs,
Gossip: g,
Signer: df.signer,
Expand Down Expand Up @@ -345,7 +346,7 @@ func (g *GossipService) InitializeChannel(channelID string, ordererSource *order
blockingMode,
stateConfig)
if g.deliveryService[channelID] == nil {
g.deliveryService[channelID] = g.deliveryFactory.Service(g, ordererSource, g.mcs)
g.deliveryService[channelID] = g.deliveryFactory.Service(g, ordererSource, g.mcs, g.serviceConfig.OrgLeader)
}

// Delivery service might be nil only if it was not able to get connected
Expand Down
4 changes: 2 additions & 2 deletions gossip/service/gossip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ type mockDeliverServiceFactory struct {
service *mockDeliverService
}

func (mf *mockDeliverServiceFactory) Service(GossipServiceAdapter, *orderers.ConnectionSource, api.MessageCryptoService) deliverservice.DeliverService {
func (mf *mockDeliverServiceFactory) Service(GossipServiceAdapter, *orderers.ConnectionSource, api.MessageCryptoService, bool) deliverservice.DeliverService {
return mf.service
}

Expand Down Expand Up @@ -943,7 +943,7 @@ func TestInvalidInitialization(t *testing.T) {
go grpcServer.Serve(socket)
defer grpcServer.Stop()

dc := gService.deliveryFactory.Service(gService, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), &naiveCryptoService{})
dc := gService.deliveryFactory.Service(gService, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), &naiveCryptoService{}, false)
assert.NotNil(t, dc)
}

Expand Down
4 changes: 2 additions & 2 deletions gossip/service/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ type embeddingDeliveryServiceFactory struct {
DeliveryServiceFactory
}

func (edsf *embeddingDeliveryServiceFactory) Service(g GossipServiceAdapter, endpoints *orderers.ConnectionSource, mcs api.MessageCryptoService) deliverservice.DeliverService {
ds := edsf.DeliveryServiceFactory.Service(g, endpoints, mcs)
func (edsf *embeddingDeliveryServiceFactory) Service(g GossipServiceAdapter, endpoints *orderers.ConnectionSource, mcs api.MessageCryptoService, isStaticLeader bool) deliverservice.DeliverService {
ds := edsf.DeliveryServiceFactory.Service(g, endpoints, mcs, false)
return newEmbeddingDeliveryService(ds)
}

Expand Down
47 changes: 44 additions & 3 deletions integration/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/onsi/gomega/gbytes"
"github.com/onsi/gomega/gexec"
"github.com/tedsuo/ifrit"
"github.com/tedsuo/ifrit/ginkgomon"
)

var _ = Describe("EndToEnd", func() {
Expand Down Expand Up @@ -218,14 +219,48 @@ var _ = Describe("EndToEnd", func() {
})

Describe("basic single node etcdraft network", func() {
var (
peerRunners []*ginkgomon.Runner
processes map[string]ifrit.Process
ordererProcess ifrit.Process
)

BeforeEach(func() {
	network = nwo.New(nwo.MultiChannelEtcdRaft(), testDir, client, StartPort(), components)
	network.GenerateConfigTree()
	// Configure every peer as a static org leader with a very short
	// reconnect threshold, so the test can verify the threshold is
	// ignored for static leaders.
	for _, peer := range network.Peers {
		core := network.ReadPeerConfig(peer)
		core.Peer.Gossip.UseLeaderElection = false
		core.Peer.Gossip.OrgLeader = true
		// time.Second is already a time.Duration; no conversion needed.
		core.Peer.Deliveryclient.ReconnectTotalTimeThreshold = time.Second
		network.WritePeerConfig(peer, core)
	}
	network.Bootstrap()

	ordererRunner := network.OrdererGroupRunner()
	ordererProcess = ifrit.Invoke(ordererRunner)
	Eventually(ordererProcess.Ready(), network.EventuallyTimeout).Should(BeClosed())

	// Assign with "=" (not ":=") so the outer peerRunners declared in the
	// Describe block is populated; a ":=" here would shadow it, leaving
	// the outer slice nil and silently skipping the log assertions that
	// range over it in the test body.
	peerRunners = make([]*ginkgomon.Runner, len(network.Peers))
	processes = map[string]ifrit.Process{}
	for i, peer := range network.Peers {
		pr := network.PeerRunner(peer)
		peerRunners[i] = pr
		p := ifrit.Invoke(pr)
		processes[peer.ID()] = p
		Eventually(p.Ready(), network.EventuallyTimeout).Should(BeClosed())
	}
})

// Tear down all processes started in BeforeEach: the orderer (guarded,
// since a test body may already have terminated it) and then each peer.
AfterEach(func() {
	if ordererProcess != nil {
		ordererProcess.Signal(syscall.SIGTERM)
		Eventually(ordererProcess.Wait(), network.EventuallyTimeout).Should(Receive())
	}
	// NOTE(review): signaling a process a test already stopped appears to
	// be tolerated here (Wait still receives) — confirm against the ifrit
	// runner semantics.
	for _, p := range processes {
		p.Signal(syscall.SIGTERM)
		Eventually(p.Wait(), network.EventuallyTimeout).Should(Receive())
	}
})

It("creates two channels with two orgs trying to reconfigure and update metadata", func() {
Expand Down Expand Up @@ -274,6 +309,12 @@ var _ = Describe("EndToEnd", func() {
Expect(err).NotTo(HaveOccurred())
Expect(len(files)).To(Equal(numOfSnaps))

By("ensuring that static leaders do not give up on retrieving blocks after the orderer goes down")
ordererProcess.Signal(syscall.SIGTERM)
Eventually(ordererProcess.Wait(), network.EventuallyTimeout).Should(Receive())
for _, peerRunner := range peerRunners {
Eventually(peerRunner.Err(), network.EventuallyTimeout).Should(gbytes.Say("peer is a static leader, ignoring peer.deliveryclient.reconnectTotalTimeThreshold"))
}
})
})

Expand Down
8 changes: 6 additions & 2 deletions internal/pkg/peer/blocksprovider/blocksprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type Deliverer struct {
Signer identity.SignerSerializer
DeliverStreamer DeliverStreamer
Logger *flogging.FabricLogger
YieldLeadership bool

MaxRetryDelay time.Duration
InitialRetryDelay time.Duration
Expand Down Expand Up @@ -134,8 +135,11 @@ func (d *Deliverer) DeliverBlocks() {
}
totalDuration += sleepDuration
if totalDuration > d.MaxRetryDuration {
d.Logger.Warningf("attempted to retry block delivery for more than %v, giving up", d.MaxRetryDuration)
return
if d.YieldLeadership {
d.Logger.Warningf("attempted to retry block delivery for more than %v, giving up", d.MaxRetryDuration)
return
}
d.Logger.Warningf("peer is a static leader, ignoring peer.deliveryclient.reconnectTotalTimeThreshold")
}
d.sleeper.Sleep(sleepDuration, d.DoneC)
}
Expand Down
21 changes: 19 additions & 2 deletions internal/pkg/peer/blocksprovider/blocksprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,14 @@ var _ = Describe("Blocksprovider", func() {
})
})

When("the consecutive errors are unbounded", func() {
When("the consecutive errors are unbounded and the peer is not a static leader", func() {
BeforeEach(func() {
fakeDeliverStreamer.DeliverReturns(nil, fmt.Errorf("deliver-error"))
fakeDeliverStreamer.DeliverReturnsOnCall(500, fakeDeliverClient, nil)
})

It("the sleep time hits the maximum value in an exponential fashion and retries until exceeding the max retry duration", func() {
It("hits the maximum sleep time value in an exponential fashion and retries until exceeding the max retry duration", func() {
d.YieldLeadership = true
Eventually(fakeSleeper.SleepCallCount).Should(Equal(380))
Expect(fakeSleeper.SleepArgsForCall(25)).To(Equal(9539 * time.Millisecond))
Expect(fakeSleeper.SleepArgsForCall(26)).To(Equal(10 * time.Second))
Expand All @@ -283,6 +284,22 @@ var _ = Describe("Blocksprovider", func() {
})
})

When("the consecutive errors are unbounded and the peer is static leader", func() {
	BeforeEach(func() {
		// Set YieldLeadership before the deliverer is started by the
		// suite's setup, so the flag cannot race with (or arrive after)
		// the DeliverBlocks goroutine reading it.
		d.YieldLeadership = false
		fakeDeliverStreamer.DeliverReturns(nil, fmt.Errorf("deliver-error"))
		fakeDeliverStreamer.DeliverReturnsOnCall(500, fakeDeliverClient, nil)
	})

	It("hits the maximum sleep time value in an exponential fashion and retries indefinitely", func() {
		// 500 sleeps — well past the 380 at which a yielding peer would
		// have given up — shows the static leader keeps retrying.
		Eventually(fakeSleeper.SleepCallCount).Should(Equal(500))
		Expect(fakeSleeper.SleepArgsForCall(25)).To(Equal(9539 * time.Millisecond))
		Expect(fakeSleeper.SleepArgsForCall(26)).To(Equal(10 * time.Second))
		Expect(fakeSleeper.SleepArgsForCall(27)).To(Equal(10 * time.Second))
		Expect(fakeSleeper.SleepArgsForCall(379)).To(Equal(10 * time.Second))
	})
})

When("an error occurs, then a block is successfully delivered", func() {
BeforeEach(func() {
fakeDeliverStreamer.DeliverReturnsOnCall(0, nil, fmt.Errorf("deliver-error"))
Expand Down

0 comments on commit 470f133

Please sign in to comment.