Skip to content

Commit

Permalink
[FAB-421] Add multi-chain support to rawledger
Browse files Browse the repository at this point in the history
The original rawledger interface only supports a single chain.  However,
to support multiple chains, this interface must be extended to support
multiple chains.  This changeset creates a Factory interface for the
rawledger which supports the method GetOrCreate which will get the
ledger if it already exists, or create a new one if not.

In order to preserve some backwards compatability, and to support the
notion of multi-chain more generally in the future with an ordering
system chain, the factories accept a genesis block to seed the ordering
system chain with.

Change-Id: I99844ff1b9a11f383d08c23f60d95f848632876e
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Nov 22, 2016
1 parent a2b9b2e commit 6b58537
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 58 deletions.
12 changes: 6 additions & 6 deletions orderer/common/deliver/deliver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (m *mockD) Recv() (*ab.DeliverUpdate, error) {

func TestOldestSeek(t *testing.T) {
ledgerSize := 5
rl := ramledger.New(ledgerSize, genesisBlock)
_, rl := ramledger.New(ledgerSize, genesisBlock)
for i := 1; i < ledgerSize; i++ {
rl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil)
}
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestOldestSeek(t *testing.T) {

func TestNewestSeek(t *testing.T) {
ledgerSize := 5
rl := ramledger.New(ledgerSize, genesisBlock)
_, rl := ramledger.New(ledgerSize, genesisBlock)
for i := 1; i < ledgerSize; i++ {
rl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil)
}
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestNewestSeek(t *testing.T) {

func TestSpecificSeek(t *testing.T) {
ledgerSize := 5
rl := ramledger.New(ledgerSize, genesisBlock)
_, rl := ramledger.New(ledgerSize, genesisBlock)
for i := 1; i < ledgerSize; i++ {
rl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil)
}
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestSpecificSeek(t *testing.T) {

func TestBadSeek(t *testing.T) {
ledgerSize := 5
rl := ramledger.New(ledgerSize, genesisBlock)
_, rl := ramledger.New(ledgerSize, genesisBlock)
for i := 1; i < 2*ledgerSize; i++ {
rl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil)
}
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestBadSeek(t *testing.T) {

func TestBadWindow(t *testing.T) {
ledgerSize := 5
rl := ramledger.New(ledgerSize, genesisBlock)
_, rl := ramledger.New(ledgerSize, genesisBlock)

m := newMockD()
defer close(m.recvChan)
Expand All @@ -219,7 +219,7 @@ func TestBadWindow(t *testing.T) {
func TestAck(t *testing.T) {
ledgerSize := 10
windowSize := uint64(2)
rl := ramledger.New(ledgerSize, genesisBlock)
_, rl := ramledger.New(ledgerSize, genesisBlock)
for i := 1; i < ledgerSize; i++ {
rl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}, nil)
}
Expand Down
4 changes: 2 additions & 2 deletions orderer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,11 @@ func launchSolo(conf *config.TopLevel) {
}
}

rawledger = fileledger.New(location, genesisBlock)
_, rawledger = fileledger.New(location, genesisBlock)
case "ram":
fallthrough
default:
rawledger = ramledger.New(int(conf.RAMLedger.HistorySize), genesisBlock)
_, rawledger = ramledger.New(int(conf.RAMLedger.HistorySize), genesisBlock)
}

lastConfigTx := retrieveConfiguration(rawledger)
Expand Down
86 changes: 71 additions & 15 deletions orderer/rawledger/blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package rawledger_test

import (
"bytes"
"reflect"
"testing"

. "github.com/hyperledger/fabric/orderer/rawledger"
Expand All @@ -26,12 +27,12 @@ import (
)

type ledgerTestable interface {
Initialize() (ledgerFactory, error)
Initialize() (ledgerTestFactory, error)
Name() string
}

type ledgerFactory interface {
New() ReadWriter
type ledgerTestFactory interface {
New() (Factory, ReadWriter)
Destroy() error
Persistent() bool
}
Expand All @@ -52,7 +53,7 @@ func getBlock(number uint64, li ReadWriter) *cb.Block {
}
}

func allTest(t *testing.T, test func(ledgerFactory, *testing.T)) {
func allTest(t *testing.T, test func(ledgerTestFactory, *testing.T)) {
for _, lt := range testables {

t.Log("Running test for", lt.Name())
Expand All @@ -77,8 +78,8 @@ func TestInitialization(t *testing.T) {
allTest(t, testInitialization)
}

func testInitialization(lf ledgerFactory, t *testing.T) {
li := lf.New()
func testInitialization(lf ledgerTestFactory, t *testing.T) {
_, li := lf.New()
if li.Height() != 1 {
t.Fatalf("Block height should be 1")
}
Expand All @@ -92,14 +93,14 @@ func TestReinitialization(t *testing.T) {
allTest(t, testReinitialization)
}

func testReinitialization(lf ledgerFactory, t *testing.T) {
func testReinitialization(lf ledgerTestFactory, t *testing.T) {
if !lf.Persistent() {
t.Log("Skipping test as persistence is not available for this ledger type")
return
}
oli := lf.New()
_, oli := lf.New()
aBlock := oli.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}, nil)
li := lf.New()
_, li := lf.New()
if li.Height() != 2 {
t.Fatalf("Block height should be 2")
}
Expand All @@ -116,8 +117,8 @@ func TestAddition(t *testing.T) {
allTest(t, testAddition)
}

func testAddition(lf ledgerFactory, t *testing.T) {
li := lf.New()
func testAddition(lf ledgerTestFactory, t *testing.T) {
_, li := lf.New()
genesis := getBlock(0, li)
if genesis == nil {
t.Fatalf("Could not retrieve genesis block")
Expand All @@ -141,8 +142,8 @@ func TestRetrieval(t *testing.T) {
allTest(t, testRetrieval)
}

func testRetrieval(lf ledgerFactory, t *testing.T) {
li := lf.New()
func testRetrieval(lf ledgerTestFactory, t *testing.T) {
_, li := lf.New()
li.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}, nil)
it, num := li.Iterator(ab.SeekInfo_OLDEST, 99)
if num != 0 {
Expand Down Expand Up @@ -180,8 +181,8 @@ func TestBlockedRetrieval(t *testing.T) {
allTest(t, testBlockedRetrieval)
}

func testBlockedRetrieval(lf ledgerFactory, t *testing.T) {
li := lf.New()
func testBlockedRetrieval(lf ledgerTestFactory, t *testing.T) {
_, li := lf.New()
it, num := li.Iterator(ab.SeekInfo_SPECIFIED, 1)
if num != 1 {
t.Fatalf("Expected block iterator at 1, but got %d", num)
Expand All @@ -206,3 +207,58 @@ func testBlockedRetrieval(lf ledgerFactory, t *testing.T) {
t.Fatalf("Expected to successfully retrieve the second block")
}
}

func TestMultichain(t *testing.T) {
allTest(t, testMultichain)
}

func testMultichain(lf ledgerTestFactory, t *testing.T) {
f, _ := lf.New()
chain1 := []byte("chain1")
chain2 := []byte("chain2")

c1p1 := []byte("c1 payload1")
c1p2 := []byte("c1 payload2")

c2p1 := []byte("c2 payload1")

c1, err := f.GetOrCreate(chain1)
if err != nil {
t.Fatalf("Error creating chain1: %s", err)
}

c1.Append([]*cb.Envelope{&cb.Envelope{Payload: c1p1}}, nil)
c1b1 := c1.Append([]*cb.Envelope{&cb.Envelope{Payload: c1p2}}, nil)

if c1.Height() != 2 {
t.Fatalf("Block height for c1 should be 2")
}

c2, err := f.GetOrCreate(chain2)
if err != nil {
t.Fatalf("Error creating chain2: %s", err)
}
c2b0 := c2.Append([]*cb.Envelope{&cb.Envelope{Payload: c2p1}}, nil)

if c2.Height() != 1 {
t.Fatalf("Block height for c2 should be 1")
}

c1, err = f.GetOrCreate(chain1)
if err != nil {
t.Fatalf("Error retrieving chain1: %s", err)
}

if b := getBlock(1, c1); !reflect.DeepEqual(c1b1, b) {
t.Fatalf("Did not properly store block 1 on chain 1:")
}

c2, err = f.GetOrCreate(chain1)
if err != nil {
t.Fatalf("Error retrieving chain2: %s", err)
}

if b := getBlock(0, c2); reflect.DeepEqual(c2b0, b) {
t.Fatalf("Did not properly store block 1 on chain 1")
}
}
88 changes: 82 additions & 6 deletions orderer/rawledger/fileledger/fileledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"io/ioutil"
"os"
"reflect"
"sync"

"github.com/hyperledger/fabric/orderer/rawledger"
cb "github.com/hyperledger/fabric/protos/common"
Expand Down Expand Up @@ -55,21 +57,92 @@ type fileLedger struct {
marshaler *jsonpb.Marshaler
}

// New creates a new instance of the file ledger
func New(directory string, genesisBlock *cb.Block) rawledger.ReadWriter {
type fileLedgerFactory struct {
directory string
ledgers map[string]rawledger.ReadWriter
mutex sync.Mutex
}

func New(directory string, systemGenesis *cb.Block) (rawledger.Factory, rawledger.ReadWriter) {
env := &cb.Envelope{}
err := proto.Unmarshal(systemGenesis.Data.Data[0], env)
if err != nil {
logger.Fatalf("Bad envelope in genesis block: %s", err)
}

payload := &cb.Payload{}
err = proto.Unmarshal(env.Payload, payload)
if err != nil {
logger.Fatalf("Bad payload in genesis block: %s", err)
}

logger.Debugf("Initializing fileLedger at '%s'", directory)
if err := os.MkdirAll(directory, 0700); err != nil {
panic(err)
logger.Fatalf("Could not create directory %s: %s", directory, err)
}

flf := &fileLedgerFactory{
directory: directory,
ledgers: make(map[string]rawledger.ReadWriter),
}

flt, err := flf.GetOrCreate(payload.Header.ChainHeader.ChainID)
if err != nil {
logger.Fatalf("Error getting orderer system chain dir: %s", err)
}

fl := flt.(*fileLedger)

if fl.height > 0 {
block, ok := fl.readBlock(0)
if !ok {
logger.Fatalf("Error reading genesis block for chain of height %d", fl.height)
}
if !reflect.DeepEqual(block, systemGenesis) {
logger.Fatalf("Attempted to reconfigure an existing ordering system chain with new genesis block")
}
} else {
fl.writeBlock(systemGenesis)
fl.height = 1
fl.lastHash = systemGenesis.Header.Hash()
}

return flf, fl
}

func (flf *fileLedgerFactory) GetOrCreate(chainID []byte) (rawledger.ReadWriter, error) {
flf.mutex.Lock()
defer flf.mutex.Unlock()

key := string(chainID)

// Check a second time with the lock held
l, ok := flf.ledgers[key]
if ok {
return l, nil
}

directory := fmt.Sprintf("%s/%x", flf.directory, chainID)

logger.Debugf("Initializing chain at '%s'", directory)

if err := os.MkdirAll(directory, 0700); err != nil {
return nil, err
}

ch := newChain(directory)
flf.ledgers[key] = ch
return ch, nil
}

// newChain creates a new chain backed by a file ledger
func newChain(directory string) rawledger.ReadWriter {
fl := &fileLedger{
directory: directory,
fqFormatString: directory + "/" + blockFileFormatString,
signal: make(chan struct{}),
marshaler: &jsonpb.Marshaler{Indent: " "},
}
if _, err := os.Stat(fl.blockFilename(genesisBlock.Header.Number)); os.IsNotExist(err) {
fl.writeBlock(genesisBlock)
}
fl.initializeBlockHeight()
logger.Debugf("Initialized to block height %d with hash %x", fl.height-1, fl.lastHash)
return fl
Expand Down Expand Up @@ -97,6 +170,9 @@ func (fl *fileLedger) initializeBlockHeight() {
nextNumber++
}
fl.height = nextNumber
if fl.height == 0 {
return
}
block, found := fl.readBlock(fl.height - 1)
if !found {
panic(fmt.Errorf("Block %d was in directory listing but error reading", fl.height-1))
Expand Down
8 changes: 5 additions & 3 deletions orderer/rawledger/fileledger/fileledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ type testEnv struct {
}

func initialize(t *testing.T) (*testEnv, *fileLedger) {
name, err := ioutil.TempDir("", "hyperledger")
name, err := ioutil.TempDir("", "hyperledger_fabric")
if err != nil {
t.Fatalf("Error creating temp dir: %s", err)
}
return &testEnv{location: name, t: t}, New(name, genesisBlock).(*fileLedger)
_, fl := New(name, genesisBlock)
return &testEnv{location: name, t: t}, fl.(*fileLedger)
}

func (tev *testEnv) tearDown() {
Expand Down Expand Up @@ -77,7 +78,8 @@ func TestReinitialization(t *testing.T) {
tev, ofl := initialize(t)
defer tev.tearDown()
ofl.Append([]*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}, nil)
fl := New(tev.location, genesisBlock).(*fileLedger)
_, flt := New(tev.location, genesisBlock)
fl := flt.(*fileLedger)
if fl.height != 2 {
t.Fatalf("Block height should be 2")
}
Expand Down

0 comments on commit 6b58537

Please sign in to comment.