Skip to content

Commit

Permalink
Merge "[FAB-2828] Add resilient delivery client to peer"
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborh-da authored and Gerrit Code Review committed Mar 29, 2017
2 parents d3c4daf + f9fa8d6 commit 51e6306
Show file tree
Hide file tree
Showing 3 changed files with 776 additions and 0 deletions.
170 changes: 170 additions & 0 deletions core/deliverservice/client.go
@@ -0,0 +1,170 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deliverclient

import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

// broadcastSetup is a function that is called by the broadcastClient immediately after each
// successful connection to the ordering service
type broadcastSetup func(blocksprovider.BlocksDeliverer) error

// retryPolicy receives as parameters the number of times the attempt has failed
// and a duration that specifies the total elapsed time passed since the first attempt.
// If further attempts should be made, it returns:
// - a time duration after which the next attempt would be made, true
// Else, a zero duration, false
type retryPolicy func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool)

// clientFactory creates a gRPC broadcast client out of a ClientConn
type clientFactory func(*grpc.ClientConn) orderer.AtomicBroadcastClient

type broadcastClient struct {
stopFlag int32
sync.RWMutex
stopChan chan struct{}
createClient clientFactory
shouldRetry retryPolicy
onConnect broadcastSetup
prod comm.ConnectionProducer
blocksprovider.BlocksDeliverer
conn *grpc.ClientConn
}

// NewBroadcastClient returns a broadcastClient with the given params
func NewBroadcastClient(prod comm.ConnectionProducer, clFactory clientFactory, onConnect broadcastSetup, bos retryPolicy) *broadcastClient {
return &broadcastClient{prod: prod, onConnect: onConnect, shouldRetry: bos, createClient: clFactory, stopChan: make(chan struct{}, 1)}
}

// Recv receives a message from the ordering service
func (bc *broadcastClient) Recv() (*orderer.DeliverResponse, error) {
o, err := bc.try(func() (interface{}, error) {
return bc.BlocksDeliverer.Recv()
})
if err != nil {
return nil, err
}
return o.(*orderer.DeliverResponse), nil
}

// Send sends a message to the ordering service
func (bc *broadcastClient) Send(msg *common.Envelope) error {
_, err := bc.try(func() (interface{}, error) {
return nil, bc.BlocksDeliverer.Send(msg)
})
return err
}

func (bc *broadcastClient) try(action func() (interface{}, error)) (interface{}, error) {
attempt := 0
start := time.Now()
var backoffDuration time.Duration
retry := true
for retry && !bc.shouldStop() {
attempt++
resp, err := bc.doAction(action)
if err != nil {
backoffDuration, retry = bc.shouldRetry(attempt, time.Since(start))
if !retry {
break
}
bc.sleep(backoffDuration)
continue
}
return resp, nil
}
if bc.shouldStop() {
return nil, errors.New("Client is closing")
}
return nil, fmt.Errorf("Attempts (%d) or elapsed time (%v) exhausted", attempt, time.Since(start))
}

func (bc *broadcastClient) doAction(action func() (interface{}, error)) (interface{}, error) {
if bc.BlocksDeliverer == nil {
err := bc.connect()
if err != nil {
return nil, err
}
}
resp, err := action()
if err != nil {
bc.conn.Close()
bc.BlocksDeliverer = nil
bc.conn = nil
return nil, err
}
return resp, nil
}

func (bc *broadcastClient) sleep(duration time.Duration) {
select {
case <-time.After(duration):
case <-bc.stopChan:
}
}

func (bc *broadcastClient) connect() error {
conn, endpoint, err := bc.prod.NewConnection()
if err != nil {
logger.Error("Failed obtaining connection:", err)
return err
}
abc, err := bc.createClient(conn).Deliver(context.Background())
if err != nil {
logger.Error("Connection to ", endpoint, "established but was unable to create gRPC stream:", err)
conn.Close()
return err
}
err = bc.onConnect(bc)
if err == nil {
bc.Lock()
bc.conn = conn
bc.Unlock()
bc.BlocksDeliverer = abc
return nil
}
logger.Error("Failed setting up broadcast:", err)
conn.Close()
return err
}

func (bc *broadcastClient) shouldStop() bool {
return atomic.LoadInt32(&bc.stopFlag) == int32(1)
}

func (bc *broadcastClient) Close() {
atomic.StoreInt32(&bc.stopFlag, int32(1))
bc.stopChan <- struct{}{}
bc.RLock()
defer bc.RUnlock()
if bc.conn == nil {
return
}
bc.conn.Close()
}

0 comments on commit 51e6306

Please sign in to comment.