Skip to content

Commit

Permalink
[FAB-2669] use fs ledger's blockstore iterator
Browse files Browse the repository at this point in the history
- Updated the orderer's file ledger to use it's underlying
  blockstore ledger impl's iterator APIs.
- Blockstore iterators must be explicitly closed to avoid
  leaking resources, so now orderer/ledger.Iterator must
  also be explicitly closed. Added Close() to the
  orderer/ledger.Iterator interface.

Change-Id: Id838a661a11bf5b64a0cbb57d75a27d69d251269
Signed-off-by: Luis Sanchez <sanchezl@us.ibm.com>
  • Loading branch information
Luis Sanchez committed Jul 12, 2017
1 parent f56a82e commit e87c815
Show file tree
Hide file tree
Showing 11 changed files with 210 additions and 123 deletions.
209 changes: 111 additions & 98 deletions orderer/common/deliver/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,129 +84,142 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
return err
}

payload, err := utils.UnmarshalPayload(envelope.Payload)
if err != nil {
logger.Warningf("Received an envelope with no payload: %s", err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

if payload.Header == nil {
logger.Warningf("Malformed envelope received with bad header")
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
if err := ds.deliverBlocks(srv, envelope); err != nil {
return err
}

chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Warningf("Failed to unmarshal channel header: %s", err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
logger.Debugf("Waiting for new SeekInfo")
}
}

chain, ok := ds.sm.GetChain(chdr.ChannelId)
if !ok {
// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
// So we would expect our log to be somewhat flooded with these
logger.Debugf("Rejecting deliver because channel %s not found", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}
func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, envelope *cb.Envelope) error {

erroredChan := chain.Errored()
select {
case <-erroredChan:
logger.Warningf("[channel: %s] Rejecting deliver request because of consenter error", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
default:
payload, err := utils.UnmarshalPayload(envelope.Payload)
if err != nil {
logger.Warningf("Received an envelope with no payload: %s", err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

}
if payload.Header == nil {
logger.Warningf("Malformed envelope received with bad header")
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

lastConfigSequence := chain.Sequence()
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Warningf("Failed to unmarshal channel header: %s", err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
if result != filter.Forward {
logger.Warningf("[channel: %s] Received unauthorized deliver request", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}
chain, ok := ds.sm.GetChain(chdr.ChannelId)
if !ok {
// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
// So we would expect our log to be somewhat flooded with these
logger.Debugf("Rejecting deliver because channel %s not found", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}

seekInfo := &ab.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
logger.Warningf("[channel: %s] Received a signed deliver request with malformed seekInfo payload: %s", chdr.ChannelId, err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
erroredChan := chain.Errored()
select {
case <-erroredChan:
logger.Warningf("[channel: %s] Rejecting deliver request because of consenter error", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
default:

if seekInfo.Start == nil || seekInfo.Stop == nil {
logger.Warningf("[channel: %s] Received seekInfo message with missing start or stop %v, %v", chdr.ChannelId, seekInfo.Start, seekInfo.Stop)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
}

logger.Debugf("[channel: %s] Received seekInfo (%p) %v", chdr.ChannelId, seekInfo, seekInfo)

cursor, number := chain.Reader().Iterator(seekInfo.Start)
var stopNum uint64
switch stop := seekInfo.Stop.Type.(type) {
case *ab.SeekPosition_Oldest:
stopNum = number
case *ab.SeekPosition_Newest:
stopNum = chain.Reader().Height() - 1
case *ab.SeekPosition_Specified:
stopNum = stop.Specified.Number
if stopNum < number {
logger.Warningf("[channel: %s] Received invalid seekInfo message: start number %d greater than stop number %d", chdr.ChannelId, number, stopNum)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
}
lastConfigSequence := chain.Sequence()

for {
if seekInfo.Behavior == ab.SeekInfo_BLOCK_UNTIL_READY {
select {
case <-erroredChan:
logger.Warningf("[channel: %s] Aborting deliver request because of consenter error", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
case <-cursor.ReadyChan():
}
} else {
select {
case <-cursor.ReadyChan():
default:
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}
}
sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
if result != filter.Forward {
logger.Warningf("[channel: %s] Received unauthorized deliver request", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}

currentConfigSequence := chain.Sequence()
if currentConfigSequence > lastConfigSequence {
lastConfigSequence = currentConfigSequence
sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
if result != filter.Forward {
logger.Warningf("[channel: %s] Client authorization revoked for deliver request", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}
}
seekInfo := &ab.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
logger.Warningf("[channel: %s] Received a signed deliver request with malformed seekInfo payload: %s", chdr.ChannelId, err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

block, status := cursor.Next()
if status != cb.Status_SUCCESS {
logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
return sendStatusReply(srv, status)
}
if seekInfo.Start == nil || seekInfo.Stop == nil {
logger.Warningf("[channel: %s] Received seekInfo message with missing start or stop %v, %v", chdr.ChannelId, seekInfo.Start, seekInfo.Stop)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}

logger.Debugf("[channel: %s] Delivering block for (%p)", chdr.ChannelId, seekInfo)
logger.Debugf("[channel: %s] Received seekInfo (%p) %v", chdr.ChannelId, seekInfo, seekInfo)

cursor, number := chain.Reader().Iterator(seekInfo.Start)
defer cursor.Close()
var stopNum uint64
switch stop := seekInfo.Stop.Type.(type) {
case *ab.SeekPosition_Oldest:
stopNum = number
case *ab.SeekPosition_Newest:
stopNum = chain.Reader().Height() - 1
case *ab.SeekPosition_Specified:
stopNum = stop.Specified.Number
if stopNum < number {
logger.Warningf("[channel: %s] Received invalid seekInfo message: start number %d greater than stop number %d", chdr.ChannelId, number, stopNum)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
}

if err := sendBlockReply(srv, block); err != nil {
logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err)
return err
for {
if seekInfo.Behavior == ab.SeekInfo_BLOCK_UNTIL_READY {
select {
case <-erroredChan:
logger.Warningf("[channel: %s] Aborting deliver request because of consenter error", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
case <-cursor.ReadyChan():
}
} else {
select {
case <-cursor.ReadyChan():
default:
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}
}

if stopNum == block.Header.Number {
break
currentConfigSequence := chain.Sequence()
if currentConfigSequence > lastConfigSequence {
lastConfigSequence = currentConfigSequence
sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
if result != filter.Forward {
logger.Warningf("[channel: %s] Client authorization revoked for deliver request", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}
}

if err := sendStatusReply(srv, cb.Status_SUCCESS); err != nil {
block, status := cursor.Next()
if status != cb.Status_SUCCESS {
logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
return sendStatusReply(srv, status)
}

logger.Debugf("[channel: %s] Delivering block for (%p)", chdr.ChannelId, seekInfo)

if err := sendBlockReply(srv, block); err != nil {
logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err)
return err
}

logger.Debugf("[channel: %s] Done delivering for (%p), waiting for new SeekInfo", chdr.ChannelId, seekInfo)
if stopNum == block.Header.Number {
break
}
}

if err := sendStatusReply(srv, cb.Status_SUCCESS); err != nil {
logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err)
return err
}

logger.Debugf("[channel: %s] Done delivering for (%p)", chdr.ChannelId, seekInfo)

return nil

}

func sendStatusReply(srv ab.AtomicBroadcast_DeliverServer, status cb.Status) error {
Expand Down
2 changes: 2 additions & 0 deletions orderer/ledger/blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func testRetrieval(lf ledgerTestFactory, t *testing.T) {
_, li := lf.New()
li.Append(CreateNextBlock(li, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}))
it, num := li.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}})
defer it.Close()
if num != 0 {
t.Fatalf("Expected genesis block iterator, but got %d", num)
}
Expand Down Expand Up @@ -177,6 +178,7 @@ func TestBlockedRetrieval(t *testing.T) {
func testBlockedRetrieval(lf ledgerTestFactory, t *testing.T) {
_, li := lf.New()
it, num := li.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
defer it.Close()
if num != 1 {
t.Fatalf("Expected block iterator at 1, but got %d", num)
}
Expand Down
31 changes: 23 additions & 8 deletions orderer/ledger/file/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package fileledger

import (
cl "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
ledger "github.com/hyperledger/fabric/orderer/ledger"
cb "github.com/hyperledger/fabric/protos/common"
Expand All @@ -38,21 +39,22 @@ type fileLedger struct {
}

type fileLedgerIterator struct {
ledger *fileLedger
blockNumber uint64
ledger *fileLedger
blockNumber uint64
commonIterator cl.ResultsIterator
}

// Next blocks until there is a new block available, or returns an error if the
// next block is no longer retrievable
func (i *fileLedgerIterator) Next() (*cb.Block, cb.Status) {
for {
if i.blockNumber < i.ledger.Height() {
block, err := i.ledger.blockStore.RetrieveBlockByNumber(i.blockNumber)
result, err := i.commonIterator.Next()
if err != nil {
return nil, cb.Status_SERVICE_UNAVAILABLE
}
i.blockNumber++
return block, cb.Status_SUCCESS
return result.(*cb.Block), cb.Status_SUCCESS
}
<-i.ledger.signal
}
Expand All @@ -67,28 +69,41 @@ func (i *fileLedgerIterator) ReadyChan() <-chan struct{} {
return closedChan
}

// Close releases resources acquired by the Iterator
func (i *fileLedgerIterator) Close() {
i.commonIterator.Close()
}

// Iterator returns an Iterator, as specified by a cb.SeekInfo message, and its
// starting block number
func (fl *fileLedger) Iterator(startPosition *ab.SeekPosition) (ledger.Iterator, uint64) {
var startingBlockNumber uint64
switch start := startPosition.Type.(type) {
case *ab.SeekPosition_Oldest:
return &fileLedgerIterator{ledger: fl, blockNumber: 0}, 0
startingBlockNumber = 0
case *ab.SeekPosition_Newest:
info, err := fl.blockStore.GetBlockchainInfo()
if err != nil {
logger.Panic(err)
}
newestBlockNumber := info.Height - 1
return &fileLedgerIterator{ledger: fl, blockNumber: newestBlockNumber}, newestBlockNumber
startingBlockNumber = newestBlockNumber
case *ab.SeekPosition_Specified:
startingBlockNumber = start.Specified.Number
height := fl.Height()
if start.Specified.Number > height {
if startingBlockNumber > height {
return &ledger.NotFoundErrorIterator{}, 0
}
return &fileLedgerIterator{ledger: fl, blockNumber: start.Specified.Number}, start.Specified.Number
default:
return &ledger.NotFoundErrorIterator{}, 0
}

iterator, err := fl.blockStore.RetrieveBlocks(startingBlockNumber)
if err != nil {
return &ledger.NotFoundErrorIterator{}, 0
}

return &fileLedgerIterator{ledger: fl, blockNumber: startingBlockNumber, commonIterator: iterator}, startingBlockNumber
}

// Height returns the number of blocks on the ledger
Expand Down
Loading

0 comments on commit e87c815

Please sign in to comment.