From d18aa9850edf2c13b737be70ec23cf01fe094cf0 Mon Sep 17 00:00:00 2001 From: Mari Wade Date: Wed, 7 Dec 2016 13:30:27 -0500 Subject: [PATCH] FAB-1140 Ledger History Database framework This is an initial check-in that will 1) create the history database in couchDB if it does not exist. 2) add the history commit to the LEDGER Commit if history is enabled 3) stores the history in the history database The History functionality is not enabled unless both couchDB and History are enabled in the config. Note that tests will be added to validate the writes and the actual data written to the database in future changes. The tests to validate history is not possible until the query APIs are put in place in future changes. Change-Id: Id207007ab5faae957c1e05234e441566a116ea33 Signed-off-by: Mari Wade --- core/ledger/history/couchdb_histmgr.go | 105 ++++++++++++++++-- core/ledger/history/couchdb_histmgr_test.go | 60 ++++------ core/ledger/history/histmgmt.go | 4 +- core/ledger/history/pkg_test.go | 54 +++++++++ core/ledger/kvledger/kv_ledger.go | 31 +++++- core/ledger/kvledger/kv_ledger_test.go | 71 ++++++++++++ .../couchdbtxmgmt/couchdb_txmgmt_test.go | 3 + 7 files changed, 277 insertions(+), 51 deletions(-) create mode 100644 core/ledger/history/pkg_test.go diff --git a/core/ledger/history/couchdb_histmgr.go b/core/ledger/history/couchdb_histmgr.go index 1ae0f03b95d..d03f74227d4 100644 --- a/core/ledger/history/couchdb_histmgr.go +++ b/core/ledger/history/couchdb_histmgr.go @@ -17,36 +17,125 @@ limitations under the License. package history import ( + "bytes" + "strconv" + + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt" "github.com/hyperledger/fabric/core/ledger/util/couchdb" "github.com/hyperledger/fabric/protos/common" + putils "github.com/hyperledger/fabric/protos/utils" logging "github.com/op/go-logging" ) -var logger = logging.MustGetLogger("txhistorymgmt") +var logger = logging.MustGetLogger("history") -// CouchDBHistMgr a simple implementation of interface `histmgmt.TxHistMgr'. -// TODO This implementation does not currently use a lock but may need one to insure query's are consistent +// CouchDBHistMgr a simple implementation of interface `histmgmt.HistMgr'. +// TODO This implementation does not currently use a lock but may need one to ensure query's are consistent type CouchDBHistMgr struct { couchDB *couchdb.CouchDBConnectionDef // COUCHDB new properties for CouchDB } -// NewCouchDBHistMgr constructs a new `CouchDBTxHistMgr` +// NewCouchDBHistMgr constructs a new `CouchDB HistMgr` func NewCouchDBHistMgr(couchDBConnectURL string, dbName string, id string, pw string) *CouchDBHistMgr { //TODO locking has not been implemented but may need some sort of locking to insure queries are valid data. couchDB, err := couchdb.CreateCouchDBConnectionAndDB(couchDBConnectURL, dbName, id, pw) if err != nil { - logger.Errorf("Error during NewCouchDBHistMgr(): %s\n", err.Error()) + logger.Errorf("===HISTORYDB=== Error during NewCouchDBHistMgr(): %s\n", err.Error()) return nil } - // db and stateIndexCF will not be used for CouchDB. TODO to cleanup return &CouchDBHistMgr{couchDB: couchDB} } -// Commit implements method in interface `txhistorymgmt.TxMgr` +// Commit implements method in interface `histmgmt.HistMgr` +// This writes to a separate history database. func (histmgr *CouchDBHistMgr) Commit(block *common.Block) error { - logger.Debugf("===HISTORYDB=== Entering CouchDBTxHistMgr.Commit()") + logger.Debugf("===HISTORYDB=== Entering CouchDBHistMgr.Commit()") + + //Get the blocknumber off of the header + blockNo := block.Header.Number + //Set the starting tranNo to 0 + var tranNo uint64 + + logger.Debugf("===HISTORYDB=== Updating history for blockNo: %v with [%d] transactions", + blockNo, len(block.Data.Data)) + for _, envBytes := range block.Data.Data { + tranNo++ + logger.Debugf("===HISTORYDB=== Updating history for tranNo: %v", tranNo) + + // extract actions from the envelope message + respPayload, err := putils.GetActionFromEnvelope(envBytes) + if err != nil { + return err + } + + //preparation for extracting RWSet from transaction + txRWSet := &txmgmt.TxReadWriteSet{} + + // Get the Result from the Action and then Unmarshal + // it into a TxReadWriteSet using custom unmarshalling + if err = txRWSet.Unmarshal(respPayload.Results); err != nil { + return err + } + + //Transactions that have data that is not JSON such as binary data, + // the write value will not write to history database. + //These types of transactions will have the key written to the history + // database to support history key scans. We do not write the binary + // value to CouchDB since the purpose of the history database value is + // for query andbinary data can not be queried. + for _, nsRWSet := range txRWSet.NsRWs { + ns := nsRWSet.NameSpace + + for _, kvWrite := range nsRWSet.Writes { + writeKey := kvWrite.Key + writeValue := kvWrite.Value + compositeKey := constructCompositeKey(ns, writeKey, blockNo, tranNo) + var bytesDoc []byte + + logger.Debugf("===HISTORYDB=== ns (namespace or cc id) = %v, writeKey: %v, compositeKey: %v, writeValue = %v", + ns, writeKey, compositeKey, writeValue) + + if couchdb.IsJSON(string(writeValue)) { + //logger.Debugf("===HISTORYDB=== yes JSON store writeValue = %v", string(writeValue)) + bytesDoc = writeValue + } else { + //For data that is not in JSON format only store the key + //logger.Debugf("===HISTORYDB=== not JSON only store key") + bytesDoc = []byte(`{}`) + } + + // SaveDoc using couchdb client and use JSON format + rev, err := histmgr.couchDB.SaveDoc(compositeKey, "", bytesDoc, nil) + if err != nil { + logger.Errorf("===HISTORYDB=== Error during Commit(): %s\n", err.Error()) + return err + } + if rev != "" { + logger.Debugf("===HISTORYDB=== Saved document revision number: %s\n", rev) + } + + } + } + + } return nil } + +func constructCompositeKey(ns string, key string, blocknum uint64, trannum uint64) string { + //History Key is: "namespace key blocknum trannum"", with namespace being the chaincode id + + // TODO - We will likely want sortable varint encoding, rather then a simple number, in order to support sorted key scans + var buffer bytes.Buffer + buffer.WriteString(ns) + buffer.WriteByte(0) + buffer.WriteString(key) + buffer.WriteByte(0) + buffer.WriteString(strconv.Itoa(int(blocknum))) + buffer.WriteByte(0) + buffer.WriteString(strconv.Itoa(int(trannum))) + + return buffer.String() +} diff --git a/core/ledger/history/couchdb_histmgr_test.go b/core/ledger/history/couchdb_histmgr_test.go index 9237abe08ca..e12d1f5b7ae 100644 --- a/core/ledger/history/couchdb_histmgr_test.go +++ b/core/ledger/history/couchdb_histmgr_test.go @@ -18,61 +18,35 @@ package history import ( "fmt" - "os" "testing" "github.com/hyperledger/fabric/core/ledger/ledgerconfig" "github.com/hyperledger/fabric/core/ledger/testutil" - "github.com/hyperledger/fabric/core/ledger/util/couchdb" ) -//Complex setup to test the use of couch in ledger -type testEnvCouch struct { - couchDBPath string - couchDBAddress string - couchDatabaseName string - couchUsername string - couchPassword string -} - -func newTestEnvCouch(t testing.TB, dbPath string, dbName string) *testEnvCouch { - - couchDBDef := ledgerconfig.GetCouchDBDefinition() - os.RemoveAll(dbPath) - - return &testEnvCouch{ - couchDBPath: dbPath, - couchDBAddress: couchDBDef.URL, - couchDatabaseName: dbName, - couchUsername: couchDBDef.Username, - couchPassword: couchDBDef.Password, - } -} - -func (env *testEnvCouch) cleanup() { - os.RemoveAll(env.couchDBPath) - //create a new connection - couchDB, _ := couchdb.CreateConnectionDefinition(env.couchDBAddress, env.couchDatabaseName, env.couchUsername, env.couchPassword) - //drop the test database if it already existed - couchDB.DropDatabase() -} +/* +Note that these test are only run if HistoryDB is explitily enabled +otherwise HistoryDB may not be installed and all the tests would fail +*/ -// couchdb_test.go tests couchdb functions already. This test just tests that a CouchDB history database is auto-created -// upon creating a new history transaction manager +// Note that the couchdb_test.go tests couchdb functions already. This test just tests that a +// CouchDB history database is auto-created upon creating a new history manager func TestHistoryDatabaseAutoCreate(t *testing.T) { //call a helper method to load the core.yaml testutil.SetupCoreYAMLConfig("./../../../peer") + logger.Debugf("===HISTORYDB=== TestHistoryDatabaseAutoCreate IsCouchDBEnabled()value: %v , IsHistoryDBEnabled()value: %v\n", + ledgerconfig.IsCouchDBEnabled(), ledgerconfig.IsHistoryDBEnabled()) - //Only run the tests if CouchDB is explitily enabled in the code, - //otherwise CouchDB may not be installed and all the tests would fail - //TODO replace this with external config property rather than config within the code - if ledgerconfig.IsCouchDBEnabled() == true { + if ledgerconfig.IsHistoryDBEnabled() == true { - env := newTestEnvCouch(t, "/tmp/tests/ledger/history", "history-test") + env := newTestEnvHistoryCouchDB(t, "history-test") env.cleanup() //cleanup at the beginning to ensure the database doesn't exist already defer env.cleanup() //and cleanup at the end + logger.Debugf("===HISTORYDB=== env.couchDBAddress: %v , env.couchDatabaseName: %v env.couchUsername: %v env.couchPassword: %v\n", + env.couchDBAddress, env.couchDatabaseName, env.couchUsername, env.couchPassword) + histMgr := NewCouchDBHistMgr( env.couchDBAddress, //couchDB Address env.couchDatabaseName, //couchDB db name @@ -98,5 +72,13 @@ func TestHistoryDatabaseAutoCreate(t *testing.T) { testutil.AssertEquals(t, dbResp2.DbName, env.couchDatabaseName) } +} + +func TestConstructCompositeKey(t *testing.T) { + compositeKey := constructCompositeKey("ns1", "key1", 1, 1) + + var compositeKeySep = []byte{0x00} + var strKeySep = string(compositeKeySep) + testutil.AssertEquals(t, compositeKey, "ns1"+strKeySep+"key1"+strKeySep+"1"+strKeySep+"1") } diff --git a/core/ledger/history/histmgmt.go b/core/ledger/history/histmgmt.go index b6c47bda0c9..4935c1b764e 100644 --- a/core/ledger/history/histmgmt.go +++ b/core/ledger/history/histmgmt.go @@ -18,7 +18,7 @@ package history import "github.com/hyperledger/fabric/protos/common" -// TxHistMgr - an interface that a transaction history manager should implement -type TxHistMgr interface { +// HistMgr - an interface that a history manager should implement +type HistMgr interface { Commit(block *common.Block) error } diff --git a/core/ledger/history/pkg_test.go b/core/ledger/history/pkg_test.go new file mode 100644 index 00000000000..7265cece4d3 --- /dev/null +++ b/core/ledger/history/pkg_test.go @@ -0,0 +1,54 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package history + +import ( + "testing" + + "github.com/hyperledger/fabric/core/ledger/ledgerconfig" + "github.com/hyperledger/fabric/core/ledger/util/couchdb" +) + +//Complex setup to test the use of couch in ledger +type testEnvHistoryCouchDB struct { + couchDBAddress string + couchDatabaseName string + couchUsername string + couchPassword string +} + +func newTestEnvHistoryCouchDB(t testing.TB, dbName string) *testEnvHistoryCouchDB { + + couchDBDef := ledgerconfig.GetCouchDBDefinition() + + return &testEnvHistoryCouchDB{ + couchDBAddress: couchDBDef.URL, + couchDatabaseName: dbName, + couchUsername: couchDBDef.Username, + couchPassword: couchDBDef.Password, + } +} + +func (env *testEnvHistoryCouchDB) cleanup() { + + //create a new connection + couchDB, err := couchdb.CreateConnectionDefinition(env.couchDBAddress, env.couchDatabaseName, env.couchUsername, env.couchPassword) + if err == nil { + //drop the test database if it already existed + couchDB.DropDatabase() + } +} diff --git a/core/ledger/kvledger/kv_ledger.go b/core/ledger/kvledger/kv_ledger.go index 09c76de8358..a169a02da18 100644 --- a/core/ledger/kvledger/kv_ledger.go +++ b/core/ledger/kvledger/kv_ledger.go @@ -24,6 +24,7 @@ import ( "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/blkstorage" "github.com/hyperledger/fabric/core/ledger/blkstorage/fsblkstorage" + "github.com/hyperledger/fabric/core/ledger/history" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/couchdbtxmgmt" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/lockbasedtxmgmt" @@ -60,6 +61,7 @@ func NewConf(filesystemPath string, maxBlockfileSize int) *Conf { type KVLedger struct { blockStore blkstorage.BlockStore txtmgmt txmgmt.TxMgr + historymgmt history.HistMgr pendingBlockToCommit *common.Block } @@ -77,7 +79,10 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) { blockStorageConf := fsblkstorage.NewConf(conf.blockStorageDir, conf.maxBlockfileSize) blockStore := fsblkstorage.NewFsBlockStore(blockStorageConf, indexConfig) + //State and History database managers var txmgmt txmgmt.TxMgr + var historymgmt history.HistMgr + if ledgerconfig.IsCouchDBEnabled() == true { //By default we can talk to CouchDB with empty id and pw (""), or you can add your own id and password to talk to a secured CouchDB logger.Debugf("===COUCHDB=== NewKVLedger() Using CouchDB instead of RocksDB...hardcoding and passing connection config for now") @@ -94,14 +99,26 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) { // Fall back to using RocksDB lockbased transaction manager txmgmt = lockbasedtxmgmt.NewLockBasedTxMgr(&lockbasedtxmgmt.Conf{DBPath: conf.txMgrDBPath}) } - l := &KVLedger{blockStore, txmgmt, nil} + + if ledgerconfig.IsHistoryDBEnabled() == true { + logger.Debugf("===HISTORYDB=== NewKVLedger() Using CouchDB for transaction history database") + + couchDBDef := ledgerconfig.GetCouchDBDefinition() + + historymgmt = history.NewCouchDBHistMgr( + couchDBDef.URL, //couchDB connection URL + "system_history", //couchDB db name matches ledger name, TODO for now use system_history ledger, eventually allow passing in subledger name + couchDBDef.Username, //enter couchDB id here + couchDBDef.Password) //enter couchDB pw here + } + + l := &KVLedger{blockStore, txmgmt, historymgmt, nil} if err := recoverStateDB(l); err != nil { panic(fmt.Errorf(`Error during state DB recovery:%s`, err)) } return l, nil - } //Recover the state database by recommitting last valid blocks @@ -220,6 +237,16 @@ func (l *KVLedger) Commit() error { if err := l.txtmgmt.Commit(); err != nil { panic(fmt.Errorf(`Error during commit to txmgr:%s`, err)) } + + //TODO future will want to run async with state db writes. History needs to wait for chain (FSBlock) to write but not the state db + logger.Debugf("===HISTORYDB=== Commit() will write to hisotry if enabled else will be by-passed if not enabled: vledgerconfig.IsHistoryDBEnabled(): %v\n", ledgerconfig.IsHistoryDBEnabled()) + if ledgerconfig.IsHistoryDBEnabled() == true { + logger.Debugf("Committing transactions to history database") + if err := l.historymgmt.Commit(l.pendingBlockToCommit); err != nil { + panic(fmt.Errorf(`Error during commit to txthistory:%s`, err)) + } + } + l.pendingBlockToCommit = nil return nil } diff --git a/core/ledger/kvledger/kv_ledger_test.go b/core/ledger/kvledger/kv_ledger_test.go index 057166baba3..23dd3a17e7d 100644 --- a/core/ledger/kvledger/kv_ledger_test.go +++ b/core/ledger/kvledger/kv_ledger_test.go @@ -19,6 +19,7 @@ package kvledger import ( "testing" + "github.com/hyperledger/fabric/core/ledger/ledgerconfig" "github.com/hyperledger/fabric/core/ledger/testutil" pb "github.com/hyperledger/fabric/protos/peer" ) @@ -157,3 +158,73 @@ func TestKVLedgerStateDBRecovery(t *testing.T) { simulator.Done() ledger.Close() } + +func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) { + + //call a helper method to load the core.yaml + testutil.SetupCoreYAMLConfig("./../../../peer") + + logger.Debugf("TestLedgerWithCouchDbEnabledWithBinaryAndJSONData IsCouchDBEnabled()value: %v , IsHistoryDBEnabled()value: %v\n", + ledgerconfig.IsCouchDBEnabled(), ledgerconfig.IsHistoryDBEnabled()) + + env := newTestEnv(t) + defer env.cleanup() + ledger, _ := NewKVLedger(env.conf) + defer ledger.Close() + + bcInfo, _ := ledger.GetBlockchainInfo() + testutil.AssertEquals(t, bcInfo, &pb.BlockchainInfo{ + Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil}) + + simulator, _ := ledger.NewTxSimulator() + simulator.SetState("ns1", "key4", []byte("value1")) + simulator.SetState("ns1", "key5", []byte("value2")) + simulator.SetState("ns1", "key6", []byte("{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")) + simulator.SetState("ns1", "key7", []byte("{\"shipmentID\":\"161003PKC7600\",\"customsInvoice\":{\"methodOfTransport\":\"AIR MAYBE\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")) + simulator.Done() + simRes, _ := simulator.GetTxSimulationResults() + bg := testutil.NewBlockGenerator(t) + block1 := bg.NextBlock([][]byte{simRes}, false) + + ledger.RemoveInvalidTransactionsAndPrepare(block1) + ledger.Commit() + + bcInfo, _ = ledger.GetBlockchainInfo() + block1Hash := block1.Header.Hash() + testutil.AssertEquals(t, bcInfo, &pb.BlockchainInfo{ + Height: 1, CurrentBlockHash: block1Hash, PreviousBlockHash: []byte{}}) + + //Note key 4 and 6 are updates but key 7 is new. I.E. should see history for key 4 and 6 if history is enabled + simulator, _ = ledger.NewTxSimulator() + simulator.SetState("ns1", "key4", []byte("value3")) + simulator.SetState("ns1", "key5", []byte("{\"shipmentID\":\"161003PKC7500\",\"customsInvoice\":{\"methodOfTransport\":\"AIR FREIGHT\",\"invoiceNumber\":\"00091623\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")) + simulator.SetState("ns1", "key6", []byte("value4")) + simulator.SetState("ns1", "key7", []byte("{\"shipmentID\":\"161003PKC7600\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")) + simulator.SetState("ns1", "key8", []byte("{\"shipmentID\":\"161003PKC7700\",\"customsInvoice\":{\"methodOfTransport\":\"SHIP\",\"invoiceNumber\":\"00091625\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")) + simulator.Done() + simRes, _ = simulator.GetTxSimulationResults() + block2 := bg.NextBlock([][]byte{simRes}, false) + ledger.RemoveInvalidTransactionsAndPrepare(block2) + ledger.Commit() + + bcInfo, _ = ledger.GetBlockchainInfo() + block2Hash := block2.Header.Hash() + testutil.AssertEquals(t, bcInfo, &pb.BlockchainInfo{ + Height: 2, CurrentBlockHash: block2Hash, PreviousBlockHash: block1.Header.Hash()}) + + b1, _ := ledger.GetBlockByHash(block1Hash) + testutil.AssertEquals(t, b1, block1) + + b2, _ := ledger.GetBlockByHash(block2Hash) + testutil.AssertEquals(t, b2, block2) + + b1, _ = ledger.GetBlockByNumber(1) + testutil.AssertEquals(t, b1, block1) + + b2, _ = ledger.GetBlockByNumber(2) + testutil.AssertEquals(t, b2, block2) + + if ledgerconfig.IsHistoryDBEnabled() == true { + //TODO history specific test + } +} diff --git a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgmt_test.go b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgmt_test.go index 49166f649e1..cd70bd84647 100644 --- a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgmt_test.go +++ b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgmt_test.go @@ -66,6 +66,9 @@ func (env *testEnv) Cleanup() { // upon creating a new ledger transaction manager func TestDatabaseAutoCreate(t *testing.T) { + //call a helper method to load the core.yaml + testutil.SetupCoreYAMLConfig("./../../../../../peer") + //Only run the tests if CouchDB is explitily enabled in the code, //otherwise CouchDB may not be installed and all the tests would fail //TODO replace this with external config property rather than config within the code