Skip to content

Commit

Permalink
[FAB-798] Abstract out the solo deliver handler
Browse files Browse the repository at this point in the history
As the next step of consolidating the common logic of the
atomicbroadcast api between components, this changeset pulls out the
logic which is not solo specific and moves it into the common/deliver
package.

SBFT was already dependent on this functionality, so this is a natural
move.

This continues, but does not satisfy FAB-798.

Change-Id: I02c5ef5b03f9e1a17fd188e3df7b6fb42aa126b5
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Nov 22, 2016
1 parent 73c501c commit 1b5d378
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 25 deletions.
18 changes: 15 additions & 3 deletions orderer/solo/deliver.go → orderer/common/deliver/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,39 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package solo
package deliver

import (
"github.com/hyperledger/fabric/orderer/rawledger"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"

"github.com/op/go-logging"
)

var logger = logging.MustGetLogger("orderer/common/deliver")

func init() {
logging.SetLevel(logging.DEBUG, "")
}

type Handler interface {
Handle(srv ab.AtomicBroadcast_DeliverServer) error
}

type DeliverServer struct {
rl rawledger.Reader
maxWindow int
}

func NewDeliverServer(rl rawledger.Reader, maxWindow int) *DeliverServer {
func NewHandlerImpl(rl rawledger.Reader, maxWindow int) Handler {
return &DeliverServer{
rl: rl,
maxWindow: maxWindow,
}
}

func (ds *DeliverServer) HandleDeliver(srv ab.AtomicBroadcast_DeliverServer) error {
func (ds *DeliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
logger.Debugf("Starting new Deliver loop")
d := newDeliverer(ds, srv)
return d.recv()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,32 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package solo
package deliver

import (
"fmt"
"testing"
"time"

"google.golang.org/grpc"

"github.com/hyperledger/fabric/orderer/common/bootstrap/static"
"github.com/hyperledger/fabric/orderer/rawledger/ramledger"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"

"google.golang.org/grpc"
)

var genesisBlock *cb.Block

func init() {
bootstrapper := static.New()
var err error
genesisBlock, err = bootstrapper.GenesisBlock()
if err != nil {
panic("Error intializing static bootstrap genesis block")
}
}

// MagicLargestWindow is used as the default max window size for initializing the deliver service
const MagicLargestWindow int = 1000

Expand Down Expand Up @@ -66,9 +78,9 @@ func TestOldestSeek(t *testing.T) {

m := newMockD()
defer close(m.recvChan)
ds := NewDeliverServer(rl, MagicLargestWindow)
ds := NewHandlerImpl(rl, MagicLargestWindow)

go ds.HandleDeliver(m)
go ds.Handle(m)

m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_OLDEST}}}

Expand Down Expand Up @@ -98,9 +110,9 @@ func TestNewestSeek(t *testing.T) {

m := newMockD()
defer close(m.recvChan)
ds := NewDeliverServer(rl, MagicLargestWindow)
ds := NewHandlerImpl(rl, MagicLargestWindow)

go ds.HandleDeliver(m)
go ds.Handle(m)

m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_NEWEST}}}

Expand All @@ -126,9 +138,9 @@ func TestSpecificSeek(t *testing.T) {
}

m := newMockD()
ds := NewDeliverServer(rl, MagicLargestWindow)
ds := NewHandlerImpl(rl, MagicLargestWindow)

go ds.HandleDeliver(m)
go ds.Handle(m)

m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1)}}}

Expand All @@ -155,9 +167,9 @@ func TestBadSeek(t *testing.T) {

m := newMockD()
defer close(m.recvChan)
ds := NewDeliverServer(rl, MagicLargestWindow)
ds := NewHandlerImpl(rl, MagicLargestWindow)

go ds.HandleDeliver(m)
go ds.Handle(m)

m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1)}}}

Expand Down Expand Up @@ -188,9 +200,9 @@ func TestBadWindow(t *testing.T) {

m := newMockD()
defer close(m.recvChan)
ds := NewDeliverServer(rl, MagicLargestWindow)
ds := NewHandlerImpl(rl, MagicLargestWindow)

go ds.HandleDeliver(m)
go ds.Handle(m)

m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow) * 2, Start: ab.SeekInfo_OLDEST}}}

Expand All @@ -214,9 +226,9 @@ func TestAck(t *testing.T) {

m := newMockD()
defer close(m.recvChan)
ds := NewDeliverServer(rl, MagicLargestWindow)
ds := NewHandlerImpl(rl, MagicLargestWindow)

go ds.HandleDeliver(m)
go ds.Handle(m)

m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: windowSize, Start: ab.SeekInfo_OLDEST}}}

Expand Down
8 changes: 4 additions & 4 deletions orderer/sbft/backend/backendab.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ package backend

import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/orderer/solo"
"github.com/hyperledger/fabric/orderer/common/deliver"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
)

type BackendAB struct {
backend *Backend
deliverserver *solo.DeliverServer
deliverserver deliver.Handler
}

func NewBackendAB(backend *Backend) *BackendAB {
bab := &BackendAB{
backend: backend,
deliverserver: solo.NewDeliverServer(backend.ledger, 1000),
deliverserver: deliver.NewHandlerImpl(backend.ledger, 1000),
}
return bab
}
Expand Down Expand Up @@ -64,5 +64,5 @@ func (b *BackendAB) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {

// Deliver sends a stream of blocks to a client after ordering
func (b *BackendAB) Deliver(srv ab.AtomicBroadcast_DeliverServer) error {
return b.deliverserver.HandleDeliver(srv)
return b.deliverserver.Handle(srv)
}
7 changes: 4 additions & 3 deletions orderer/solo/solo.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/hyperledger/fabric/orderer/common/broadcast"
"github.com/hyperledger/fabric/orderer/common/broadcastfilter"
"github.com/hyperledger/fabric/orderer/common/configtx"
"github.com/hyperledger/fabric/orderer/common/deliver"
"github.com/hyperledger/fabric/orderer/rawledger"
ab "github.com/hyperledger/fabric/protos/orderer"

Expand All @@ -38,14 +39,14 @@ func init() {
type server struct {
bh broadcast.Handler
bs *broadcastServer
ds *DeliverServer
ds deliver.Handler
}

// New creates a ab.AtomicBroadcastServer based on the solo orderer implementation
func New(queueSize, batchSize, maxWindowSize int, batchTimeout time.Duration, rl rawledger.ReadWriter, grpcServer *grpc.Server, filters *broadcastfilter.RuleSet, configManager configtx.Manager) ab.AtomicBroadcastServer {
logger.Infof("Starting solo with queueSize=%d, batchSize=%d batchTimeout=%v and ledger=%T", queueSize, batchSize, batchTimeout, rl)
bs := newBroadcastServer(batchSize, batchTimeout, rl, filters, configManager)
ds := NewDeliverServer(rl, maxWindowSize)
ds := deliver.NewHandlerImpl(rl, maxWindowSize)
bh := broadcast.NewHandlerImpl(queueSize, bs, filters, configManager)

s := &server{
Expand All @@ -65,5 +66,5 @@ func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
// Deliver sends a stream of blocks to a client after ordering
func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error {
logger.Debugf("Starting new Deliver loop")
return s.ds.HandleDeliver(srv)
return s.ds.Handle(srv)
}

0 comments on commit 1b5d378

Please sign in to comment.