Skip to content

Commit

Permalink
Add a fileledger implementation in rawledger.
Browse files Browse the repository at this point in the history
This addition supplements the existing ramledger rawledger
implementation.  Neither this, nor the ramledger is intended to be
performant, but they are both intended to be extremely simple.

In the case of this added fileledger, it is intended to allow data
persistence between stopping and starting of an orderer.

Ultimately, the expectation would be that for the performant case,
a high performance database of some sort, such as RocksDB or other
would likely be used for persisting data.

https://jira.hyperledger.org/browse/FAB-326

Change-Id: Ic75bee7cd27b311b512b2c3e0e9741b40baffbf1
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Sep 14, 2016
1 parent 45bd645 commit fe54d04
Show file tree
Hide file tree
Showing 2 changed files with 369 additions and 0 deletions.
210 changes: 210 additions & 0 deletions orderer/rawledger/fileledger/fileledger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fileledger

import (
"fmt"
"io/ioutil"
"os"

ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
"github.com/hyperledger/fabric/orderer/rawledger"

"github.com/golang/protobuf/jsonpb"
"github.com/op/go-logging"
)

var logger = logging.MustGetLogger("rawledger/fileledger")
var closedChan chan struct{}

func init() {
logging.SetLevel(logging.DEBUG, "")
closedChan = make(chan struct{})
close(closedChan)
}

const blockFileFormatString string = "block_%020d.json"

type cursor struct {
fl *fileLedger
blockNumber uint64
}

type fileLedger struct {
directory string
fqFormatString string
height uint64
signal chan struct{}
lastHash []byte
marshaler *jsonpb.Marshaler
}

// New creates a new instance of the file ledger
func New(directory string) rawledger.ReadWriter {
logger.Debugf("Initializing fileLedger at '%s'", directory)
if err := os.MkdirAll(directory, 0700); err != nil {
panic(err)
}
fl := &fileLedger{
directory: directory,
fqFormatString: directory + "/" + blockFileFormatString,
signal: make(chan struct{}),
marshaler: &jsonpb.Marshaler{Indent: " "},
}
genesisBlock := &ab.Block{
Number: 0,
PrevHash: []byte("GENESIS"),
}
if _, err := os.Stat(fl.blockFilename(genesisBlock.Number)); os.IsNotExist(err) {
fl.writeBlock(genesisBlock)
}
fl.initializeBlockHeight()
logger.Debugf("Initialized to block height %d with hash %x", fl.height-1, fl.lastHash)
return fl
}

// initializeBlockHeight verifies all blocks exist between 0 and the block height, and populates the lastHash
func (fl *fileLedger) initializeBlockHeight() {
infos, err := ioutil.ReadDir(fl.directory)
if err != nil {
panic(err)
}
nextNumber := uint64(0)
for _, info := range infos {
if info.IsDir() {
continue
}
var number uint64
_, err := fmt.Sscanf(info.Name(), blockFileFormatString, &number)
if err != nil {
continue
}
if number != nextNumber {
panic(fmt.Errorf("Missing block %d in the chain", nextNumber))
}
nextNumber++
}
fl.height = nextNumber
block, found := fl.readBlock(fl.height - 1)
if !found {
panic(fmt.Errorf("Block %d was in directory listing but error reading", fl.height-1))
}
if block == nil {
panic(fmt.Errorf("Error reading block %d", fl.height-1))
}
fl.lastHash = block.Hash()
}

// blockFilename returns the fully qualified path to where a block of a given number should be stored on disk
func (fl *fileLedger) blockFilename(number uint64) string {
return fmt.Sprintf(fl.fqFormatString, number)
}

// writeBlock commits a block to disk
func (fl *fileLedger) writeBlock(block *ab.Block) {
file, err := os.Create(fl.blockFilename(block.Number))
if err != nil {
panic(err)
}
defer file.Close()
err = fl.marshaler.Marshal(file, block)
logger.Debugf("Wrote block %d", block.Number)
if err != nil {
panic(err)
}

}

// readBlock returns the block or nil, and whether the block was found or not, (nil,true) generally indicates an irrecoverable problem
func (fl *fileLedger) readBlock(number uint64) (*ab.Block, bool) {
file, err := os.Open(fl.blockFilename(number))
if err == nil {
defer file.Close()
block := &ab.Block{}
err = jsonpb.Unmarshal(file, block)
if err != nil {
return nil, true
}
logger.Debugf("Read block %d", block.Number)
return block, true
}
return nil, false
}

// Height returns the highest block number in the chain, plus one
func (fl *fileLedger) Height() uint64 {
return fl.height
}

// Append creates a new block and appends it to the ledger
func (fl *fileLedger) Append(messages []*ab.BroadcastMessage, proof []byte) *ab.Block {
block := &ab.Block{
Number: fl.height,
PrevHash: fl.lastHash,
Messages: messages,
Proof: proof,
}
fl.writeBlock(block)
fl.height++
close(fl.signal)
fl.signal = make(chan struct{})
return block
}

// Iterator implements the rawledger.Reader definition
func (fl *fileLedger) Iterator(startType ab.SeekInfo_StartType, specified uint64) (rawledger.Iterator, uint64) {
switch startType {
case ab.SeekInfo_OLDEST:
return &cursor{fl: fl, blockNumber: 0}, 0
case ab.SeekInfo_NEWEST:
high := fl.height - 1
return &cursor{fl: fl, blockNumber: high}, high
case ab.SeekInfo_SPECIFIED:
if specified > fl.height {
return &rawledger.NotFoundErrorIterator{}, 0
}
return &cursor{fl: fl, blockNumber: specified}, specified
}

// This line should be unreachable, but the compiler requires it
return &rawledger.NotFoundErrorIterator{}, 0
}

// Next blocks until there is a new block available, or returns an error if the next block is no longer retrievable
func (cu *cursor) Next() (*ab.Block, ab.Status) {
// This only loops once, as signal reading indicates the new block has been written
for {
block, found := cu.fl.readBlock(cu.blockNumber)
if found {
if block == nil {
return nil, ab.Status_SERVICE_UNAVAILABLE
}
cu.blockNumber++
return block, ab.Status_SUCCESS
}
<-cu.fl.signal
}
}

// ReadyChan returns a channel that will close when Next is ready to be called without blocking
func (cu *cursor) ReadyChan() <-chan struct{} {
signal := cu.fl.signal
if _, err := os.Stat(cu.fl.blockFilename(cu.blockNumber)); os.IsNotExist(err) {
return signal
}
return closedChan
}
159 changes: 159 additions & 0 deletions orderer/rawledger/fileledger/fileledger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fileledger

import (
"bytes"
"io/ioutil"
"os"
"testing"

ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
)

type testEnv struct {
t *testing.T
location string
}

func initialize(t *testing.T) (*testEnv, *fileLedger) {
name, err := ioutil.TempDir("", "hyperledger")
if err != nil {
t.Fatalf("Error creating temp dir: %s", err)
}
return &testEnv{location: name, t: t}, New(name).(*fileLedger)
}

func (tev *testEnv) tearDown() {
err := os.RemoveAll(tev.location)
if err != nil {
tev.t.Fatalf("Error tearing down env: %s", err)
}
}

func TestInitialization(t *testing.T) {
tev, fl := initialize(t)
defer tev.tearDown()
if fl.height != 1 {
t.Fatalf("Block height should be 1")
}
block, found := fl.readBlock(0)
if block == nil || !found {
t.Fatalf("Error retrieving genesis block")
}
if !bytes.Equal(block.Hash(), fl.lastHash) {
t.Fatalf("Block hashes did no match")
}
}

func TestReinitialization(t *testing.T) {
tev, ofl := initialize(t)
defer tev.tearDown()
ofl.Append([]*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte("My Data")}}, nil)
fl := New(tev.location).(*fileLedger)
if fl.height != 2 {
t.Fatalf("Block height should be 2")
}
block, found := fl.readBlock(1)
if block == nil || !found {
t.Fatalf("Error retrieving block 1")
}
if !bytes.Equal(block.Hash(), fl.lastHash) {
t.Fatalf("Block hashes did no match")
}
}

func TestAddition(t *testing.T) {
tev, fl := initialize(t)
defer tev.tearDown()
prevHash := fl.lastHash
fl.Append([]*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte("My Data")}}, nil)
if fl.height != 2 {
t.Fatalf("Block height should be 2")
}
block, found := fl.readBlock(1)
if block == nil || !found {
t.Fatalf("Error retrieving genesis block")
}
if !bytes.Equal(block.PrevHash, prevHash) {
t.Fatalf("Block hashes did no match")
}
}

func TestRetrieval(t *testing.T) {
tev, fl := initialize(t)
defer tev.tearDown()
fl.Append([]*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte("My Data")}}, nil)
it, num := fl.Iterator(ab.SeekInfo_OLDEST, 99)
if num != 0 {
t.Fatalf("Expected genesis block iterator, but got %d", num)
}
signal := it.ReadyChan()
select {
case <-signal:
default:
t.Fatalf("Should be ready for block read")
}
block, status := it.Next()
if status != ab.Status_SUCCESS {
t.Fatalf("Expected to successfully read the genesis block")
}
if block.Number != 0 {
t.Fatalf("Expected to successfully retrieve the genesis block")
}
signal = it.ReadyChan()
select {
case <-signal:
default:
t.Fatalf("Should still be ready for block read")
}
block, status = it.Next()
if status != ab.Status_SUCCESS {
t.Fatalf("Expected to successfully read the second block")
}
if block.Number != 1 {
t.Fatalf("Expected to successfully retrieve the second block but got block number %d", block.Number)
}
}

func TestBlockedRetrieval(t *testing.T) {
tev, fl := initialize(t)
defer tev.tearDown()
it, num := fl.Iterator(ab.SeekInfo_SPECIFIED, 1)
if num != 1 {
t.Fatalf("Expected block iterator at 1, but got %d", num)
}
signal := it.ReadyChan()
select {
case <-signal:
t.Fatalf("Should not be ready for block read")
default:
}
fl.Append([]*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte("My Data")}}, nil)
select {
case <-signal:
default:
t.Fatalf("Should now be ready for block read")
}
block, status := it.Next()
if status != ab.Status_SUCCESS {
t.Fatalf("Expected to successfully read the second block")
}
if block.Number != 1 {
t.Fatalf("Expected to successfully retrieve the second block")
}
}

0 comments on commit fe54d04

Please sign in to comment.