Skip to content

Commit

Permalink
[FAB-1521] Fix rawledger to support restart
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-1521

The rawledger interface was originally implemented as a single chain
initialized with a genesis block. This causes problems on restart
because a newer genesis block may already exist and passing the genesis
block becomes nonsensical.

This changeset changes this behavior to intialize no ledgers on factory
creation, and to allow the use of Append for blocks, rather than
transaction sets.  This satisfies a secondary goal of allowing block
creation without block commit, as is required by the SBFT flow of
consenting on blocks, rather than on transactions.

Change-Id: Icbb0bb1a80f9e04861f4302966709f1752068961
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Jan 10, 2017
1 parent 4896a04 commit f806802
Show file tree
Hide file tree
Showing 17 changed files with 295 additions and 162 deletions.
30 changes: 22 additions & 8 deletions orderer/common/deliver/deliver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,15 @@ func (mcs *mockSupport) Reader() rawledger.Reader {
return mcs.ledger
}

func NewRAMLedger() rawledger.ReadWriter {
rlf := ramledger.New(ledgerSize + 1)
rl, _ := rlf.GetOrCreate(provisional.TestChainID)
rl.Append(genesisBlock)
return rl
}

func newMockMultichainManager() *mockSupportManager {
_, rl := ramledger.New(ledgerSize+1, genesisBlock)
rl := NewRAMLedger()
mm := &mockSupportManager{
chains: make(map[string]*mockSupport),
}
Expand All @@ -112,7 +119,8 @@ func seekSpecified(number uint64) *ab.SeekPosition {
func TestOldestSeek(t *testing.T) {
mm := newMockMultichainManager()
for i := 1; i < ledgerSize; i++ {
mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil)
ledger := mm.chains[string(systemChainID)].ledger
ledger.Append(rawledger.CreateNextBlock(ledger, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil))
}

m := newMockD()
Expand Down Expand Up @@ -150,7 +158,8 @@ func TestOldestSeek(t *testing.T) {
func TestNewestSeek(t *testing.T) {
mm := newMockMultichainManager()
for i := 1; i < ledgerSize; i++ {
mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil)
ledger := mm.chains[string(systemChainID)].ledger
ledger.Append(rawledger.CreateNextBlock(ledger, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil))
}

m := newMockD()
Expand Down Expand Up @@ -181,7 +190,8 @@ func TestNewestSeek(t *testing.T) {
func TestSpecificSeek(t *testing.T) {
mm := newMockMultichainManager()
for i := 1; i < ledgerSize; i++ {
mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil)
ledger := mm.chains[string(systemChainID)].ledger
ledger.Append(rawledger.CreateNextBlock(ledger, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil))
}

m := newMockD()
Expand Down Expand Up @@ -218,7 +228,8 @@ func TestSpecificSeek(t *testing.T) {
func TestBadSeek(t *testing.T) {
mm := newMockMultichainManager()
for i := 1; i < ledgerSize; i++ {
mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil)
ledger := mm.chains[string(systemChainID)].ledger
ledger.Append(rawledger.CreateNextBlock(ledger, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil))
}

m := newMockD()
Expand All @@ -242,7 +253,8 @@ func TestBadSeek(t *testing.T) {
func TestFailFastSeek(t *testing.T) {
mm := newMockMultichainManager()
for i := 1; i < ledgerSize; i++ {
mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil)
ledger := mm.chains[string(systemChainID)].ledger
ledger.Append(rawledger.CreateNextBlock(ledger, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil))
}

m := newMockD()
Expand Down Expand Up @@ -275,7 +287,8 @@ func TestFailFastSeek(t *testing.T) {
func TestBlockingSeek(t *testing.T) {
mm := newMockMultichainManager()
for i := 1; i < ledgerSize; i++ {
mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil)
ledger := mm.chains[string(systemChainID)].ledger
ledger.Append(rawledger.CreateNextBlock(ledger, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil))
}

m := newMockD()
Expand All @@ -301,7 +314,8 @@ func TestBlockingSeek(t *testing.T) {
case <-time.After(50 * time.Millisecond):
}

mm.chains[string(systemChainID)].ledger.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", ledgerSize+1))}}, nil)
ledger := mm.chains[string(systemChainID)].ledger
ledger.Append(rawledger.CreateNextBlock(ledger, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", ledgerSize+1))}}, nil))

select {
case deliverReply := <-m.sendChan:
Expand Down
56 changes: 40 additions & 16 deletions orderer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import (
"github.com/hyperledger/fabric/orderer/solo"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"

"github.com/Shopify/sarama"
"github.com/op/go-logging"

logging "github.com/op/go-logging"
"google.golang.org/grpc"
)

Expand All @@ -67,18 +67,6 @@ func main() {
return
}

var genesisBlock *cb.Block

// Select the bootstrapping mechanism
switch conf.General.GenesisMethod {
case "provisional":
genesisBlock = provisional.New(conf).GenesisBlock()
case "file":
genesisBlock = file.New(conf.General.GenesisFile).GenesisBlock()
default:
panic(fmt.Errorf("Unknown genesis method %s", conf.General.GenesisMethod))
}

var lf rawledger.Factory
switch conf.General.LedgerType {
case "file":
Expand All @@ -90,11 +78,47 @@ func main() {
panic(fmt.Errorf("Error creating temp dir: %s", err))
}
}
lf, _ = fileledger.New(location, genesisBlock)
lf = fileledger.New(location)
case "ram":
fallthrough
default:
lf, _ = ramledger.New(int(conf.RAMLedger.HistorySize), genesisBlock)
lf = ramledger.New(int(conf.RAMLedger.HistorySize))
}

// Are we bootstrapping
if len(lf.ChainIDs()) == 0 {
var genesisBlock *cb.Block

// Select the bootstrapping mechanism
switch conf.General.GenesisMethod {
case "provisional":
genesisBlock = provisional.New(conf).GenesisBlock()
case "file":
genesisBlock = file.New(conf.General.GenesisFile).GenesisBlock()
default:
panic(fmt.Errorf("Unknown genesis method %s", conf.General.GenesisMethod))
}

chainID, err := utils.GetChainIDFromBlock(genesisBlock)
if err != nil {
logger.Errorf("Failed to parse chain ID from genesis block: %s", err)
return
}
gl, err := lf.GetOrCreate(chainID)
if err != nil {
logger.Errorf("Failed to create the genesis chain: %s", err)
return
}

err = gl.Append(genesisBlock)
if err != nil {
logger.Errorf("Could not write genesis block to ledger: %s", err)
return
}
} else {
logger.Infof("Not bootstrapping because of existing chains")
logger.Warningf("XXXXXXX RESTART IS NOT CURRENTLY SUPPORTED XXXXXXXXX")
// XXX Remove this once restart is supported
}

if conf.Kafka.Verbose {
Expand Down
2 changes: 1 addition & 1 deletion orderer/multichain/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,5 +178,5 @@ func (cs *chainSupport) WriteBlock(data []*cb.Envelope, metadata [][]byte, commi
committer.Commit()
}

cs.ledger.Append(data, metadata)
cs.ledger.Append(rawledger.CreateNextBlock(cs.ledger, data, metadata))
}
20 changes: 14 additions & 6 deletions orderer/multichain/chainsupport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,19 @@ import (
"github.com/hyperledger/fabric/orderer/rawledger"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
)

type mockLedgerReadWriter struct {
data []*cb.Envelope
data [][]byte
metadata [][]byte
height uint64
}

func (mlw *mockLedgerReadWriter) Append(data []*cb.Envelope, metadata [][]byte) *cb.Block {
mlw.data = data
mlw.metadata = metadata
func (mlw *mockLedgerReadWriter) Append(block *cb.Block) error {
mlw.data = block.Data.Data
mlw.metadata = block.Metadata.Metadata
mlw.height++
return nil
}

Expand All @@ -42,7 +45,7 @@ func (mlw *mockLedgerReadWriter) Iterator(startType *ab.SeekPosition) (rawledger
}

func (mlw *mockLedgerReadWriter) Height() uint64 {
panic("Unimplemented")
return mlw.height
}

type mockCommitter struct {
Expand All @@ -65,7 +68,12 @@ func TestCommitConfig(t *testing.T) {
committers := []filter.Committer{&mockCommitter{}, &mockCommitter{}}
cs.WriteBlock(txs, md, committers)

if !reflect.DeepEqual(ml.data, txs) {
blockTXs := make([]*cb.Envelope, len(ml.data))
for i := range ml.data {
blockTXs[i] = utils.UnmarshalEnvelopeOrPanic(ml.data[i])
}

if !reflect.DeepEqual(blockTXs, txs) {
t.Errorf("Should have written input data to ledger but did not")
}

Expand Down
2 changes: 1 addition & 1 deletion orderer/multichain/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (ml *multiLedger) systemChain() *systemChain {

func (ml *multiLedger) newChain(configtx *cb.Envelope) {
configManager, policyManager, backingLedger, sharedConfig := ml.newResources(configtx)
backingLedger.Append([]*cb.Envelope{configtx}, nil)
backingLedger.Append(rawledger.CreateNextBlock(backingLedger, []*cb.Envelope{configtx}, nil))

// Copy the map to allow concurrent reads from broadcast/deliver while the new chainSupport is
newChains := make(map[string]*chainSupport)
Expand Down
41 changes: 30 additions & 11 deletions orderer/multichain/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/hyperledger/fabric/orderer/common/bootstrap/provisional"
"github.com/hyperledger/fabric/orderer/localconfig"
"github.com/hyperledger/fabric/orderer/rawledger"
"github.com/hyperledger/fabric/orderer/rawledger/ramledger"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
Expand Down Expand Up @@ -57,16 +58,34 @@ func makeNormalTx(chainID string, i int) *cb.Envelope {
}
}

func NewRAMLedgerAndFactory(maxSize int) (rawledger.Factory, rawledger.ReadWriter) {
rlf := ramledger.New(10)
rl, err := rlf.GetOrCreate(provisional.TestChainID)
if err != nil {
panic(err)
}
err = rl.Append(genesisBlock)
if err != nil {
panic(err)
}
return rlf, rl
}

func NewRAMLedger(maxSize int) rawledger.ReadWriter {
_, rl := NewRAMLedgerAndFactory(maxSize)
return rl
}

// Tests for a normal chain which contains 3 config transactions and other normal transactions to make sure the right one returned
func TestGetConfigTx(t *testing.T) {
_, rl := ramledger.New(10, genesisBlock)
rl := NewRAMLedger(10)
for i := 0; i < 5; i++ {
rl.Append([]*cb.Envelope{makeNormalTx(provisional.TestChainID, i)}, nil)
rl.Append(rawledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(provisional.TestChainID, i)}, nil))
}
rl.Append([]*cb.Envelope{makeConfigTx(provisional.TestChainID, 5)}, nil)
rl.Append(rawledger.CreateNextBlock(rl, []*cb.Envelope{makeConfigTx(provisional.TestChainID, 5)}, nil))
ctx := makeConfigTx(provisional.TestChainID, 6)
rl.Append([]*cb.Envelope{ctx}, nil)
rl.Append([]*cb.Envelope{makeNormalTx(provisional.TestChainID, 7)}, nil)
rl.Append(rawledger.CreateNextBlock(rl, []*cb.Envelope{ctx}, nil))
rl.Append(rawledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(provisional.TestChainID, 7)}, nil))

pctx := getConfigTx(rl)

Expand All @@ -77,14 +96,14 @@ func TestGetConfigTx(t *testing.T) {

// Tests a chain which contains blocks with multi-transactions mixed with config txs, and a single tx which is not a config tx, none count as config blocks so nil should return
func TestGetConfigTxFailure(t *testing.T) {
_, rl := ramledger.New(10, genesisBlock)
rl := NewRAMLedger(10)
for i := 0; i < 10; i++ {
rl.Append([]*cb.Envelope{
rl.Append(rawledger.CreateNextBlock(rl, []*cb.Envelope{
makeNormalTx(provisional.TestChainID, i),
makeConfigTx(provisional.TestChainID, i),
}, nil)
}, nil))
}
rl.Append([]*cb.Envelope{makeNormalTx(provisional.TestChainID, 11)}, nil)
rl.Append(rawledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(provisional.TestChainID, 11)}, nil))
pctx := getConfigTx(rl)

if pctx != nil {
Expand All @@ -94,7 +113,7 @@ func TestGetConfigTxFailure(t *testing.T) {

// This test essentially brings the entire system up and is ultimately what main.go will replicate
func TestManagerImpl(t *testing.T) {
lf, rl := ramledger.New(10, genesisBlock)
lf, rl := NewRAMLedgerAndFactory(10)

consenters := make(map[string]Consenter)
consenters[conf.General.OrdererType] = &mockConsenter{}
Expand Down Expand Up @@ -141,7 +160,7 @@ func TestManagerImpl(t *testing.T) {
// This test brings up the entire system, with the mock consenter, including the broadcasters etc. and creates a new chain
func TestNewChain(t *testing.T) {
conf := config.Load()
lf, rl := ramledger.New(10, genesisBlock)
lf, rl := NewRAMLedgerAndFactory(10)

consenters := make(map[string]Consenter)
consenters[conf.General.OrdererType] = &mockConsenter{}
Expand Down
20 changes: 13 additions & 7 deletions orderer/rawledger/blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ func testReinitialization(lf ledgerTestFactory, t *testing.T) {
return
}
_, oli := lf.New()
aBlock := oli.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}, nil)
aBlock := CreateNextBlock(oli, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}, nil)
err := oli.Append(aBlock)
if err != nil {
t.Fatalf("Error appending block: %s", err)
}

_, li := lf.New()
if li.Height() != 2 {
t.Fatalf("Block height should be 2")
Expand All @@ -125,7 +130,7 @@ func testAddition(lf ledgerTestFactory, t *testing.T) {
}
prevHash := genesis.Header.Hash()

li.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}, nil)
li.Append(CreateNextBlock(li, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}, nil))
if li.Height() != 2 {
t.Fatalf("Block height should be 2")
}
Expand All @@ -144,7 +149,7 @@ func TestRetrieval(t *testing.T) {

func testRetrieval(lf ledgerTestFactory, t *testing.T) {
_, li := lf.New()
li.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}, nil)
li.Append(CreateNextBlock(li, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}, nil))
it, num := li.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}})
if num != 0 {
t.Fatalf("Expected genesis block iterator, but got %d", num)
Expand Down Expand Up @@ -193,7 +198,7 @@ func testBlockedRetrieval(lf ledgerTestFactory, t *testing.T) {
t.Fatalf("Should not be ready for block read")
default:
}
li.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}, nil)
li.Append(CreateNextBlock(li, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}, nil))
select {
case <-signal:
default:
Expand Down Expand Up @@ -227,8 +232,9 @@ func testMultichain(lf ledgerTestFactory, t *testing.T) {
t.Fatalf("Error creating chain1: %s", err)
}

c1.Append([]*cb.Envelope{&cb.Envelope{Payload: c1p1}}, nil)
c1b1 := c1.Append([]*cb.Envelope{&cb.Envelope{Payload: c1p2}}, nil)
c1.Append(CreateNextBlock(c1, []*cb.Envelope{&cb.Envelope{Payload: c1p1}}, nil))
c1b1 := CreateNextBlock(c1, []*cb.Envelope{&cb.Envelope{Payload: c1p2}}, nil)
c1.Append(c1b1)

if c1.Height() != 2 {
t.Fatalf("Block height for c1 should be 2")
Expand All @@ -238,7 +244,7 @@ func testMultichain(lf ledgerTestFactory, t *testing.T) {
if err != nil {
t.Fatalf("Error creating chain2: %s", err)
}
c2b0 := c2.Append([]*cb.Envelope{&cb.Envelope{Payload: c2p1}}, nil)
c2b0 := c2.Append(CreateNextBlock(c2, []*cb.Envelope{&cb.Envelope{Payload: c2p1}}, nil))

if c2.Height() != 1 {
t.Fatalf("Block height for c2 should be 1")
Expand Down

0 comments on commit f806802

Please sign in to comment.