diff --git a/core/ledger/history/couchdb_histmgr.go b/core/ledger/history/couchdb_histmgr.go index 1ae0f03b95d..d03f74227d4 100644 --- a/core/ledger/history/couchdb_histmgr.go +++ b/core/ledger/history/couchdb_histmgr.go @@ -17,36 +17,125 @@ limitations under the License. package history import ( + "bytes" + "strconv" + + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt" "github.com/hyperledger/fabric/core/ledger/util/couchdb" "github.com/hyperledger/fabric/protos/common" + putils "github.com/hyperledger/fabric/protos/utils" logging "github.com/op/go-logging" ) -var logger = logging.MustGetLogger("txhistorymgmt") +var logger = logging.MustGetLogger("history") -// CouchDBHistMgr a simple implementation of interface `histmgmt.TxHistMgr'. -// TODO This implementation does not currently use a lock but may need one to insure query's are consistent +// CouchDBHistMgr a simple implementation of interface `histmgmt.HistMgr'. +// TODO This implementation does not currently use a lock but may need one to ensure query's are consistent type CouchDBHistMgr struct { couchDB *couchdb.CouchDBConnectionDef // COUCHDB new properties for CouchDB } -// NewCouchDBHistMgr constructs a new `CouchDBTxHistMgr` +// NewCouchDBHistMgr constructs a new `CouchDB HistMgr` func NewCouchDBHistMgr(couchDBConnectURL string, dbName string, id string, pw string) *CouchDBHistMgr { //TODO locking has not been implemented but may need some sort of locking to insure queries are valid data. couchDB, err := couchdb.CreateCouchDBConnectionAndDB(couchDBConnectURL, dbName, id, pw) if err != nil { - logger.Errorf("Error during NewCouchDBHistMgr(): %s\n", err.Error()) + logger.Errorf("===HISTORYDB=== Error during NewCouchDBHistMgr(): %s\n", err.Error()) return nil } - // db and stateIndexCF will not be used for CouchDB. TODO to cleanup return &CouchDBHistMgr{couchDB: couchDB} } -// Commit implements method in interface `txhistorymgmt.TxMgr` +// Commit implements method in interface `histmgmt.HistMgr` +// This writes to a separate history database. func (histmgr *CouchDBHistMgr) Commit(block *common.Block) error { - logger.Debugf("===HISTORYDB=== Entering CouchDBTxHistMgr.Commit()") + logger.Debugf("===HISTORYDB=== Entering CouchDBHistMgr.Commit()") + + //Get the blocknumber off of the header + blockNo := block.Header.Number + //Set the starting tranNo to 0 + var tranNo uint64 + + logger.Debugf("===HISTORYDB=== Updating history for blockNo: %v with [%d] transactions", + blockNo, len(block.Data.Data)) + for _, envBytes := range block.Data.Data { + tranNo++ + logger.Debugf("===HISTORYDB=== Updating history for tranNo: %v", tranNo) + + // extract actions from the envelope message + respPayload, err := putils.GetActionFromEnvelope(envBytes) + if err != nil { + return err + } + + //preparation for extracting RWSet from transaction + txRWSet := &txmgmt.TxReadWriteSet{} + + // Get the Result from the Action and then Unmarshal + // it into a TxReadWriteSet using custom unmarshalling + if err = txRWSet.Unmarshal(respPayload.Results); err != nil { + return err + } + + //Transactions that have data that is not JSON such as binary data, + // the write value will not write to history database. + //These types of transactions will have the key written to the history + // database to support history key scans. We do not write the binary + // value to CouchDB since the purpose of the history database value is + // for query andbinary data can not be queried. + for _, nsRWSet := range txRWSet.NsRWs { + ns := nsRWSet.NameSpace + + for _, kvWrite := range nsRWSet.Writes { + writeKey := kvWrite.Key + writeValue := kvWrite.Value + compositeKey := constructCompositeKey(ns, writeKey, blockNo, tranNo) + var bytesDoc []byte + + logger.Debugf("===HISTORYDB=== ns (namespace or cc id) = %v, writeKey: %v, compositeKey: %v, writeValue = %v", + ns, writeKey, compositeKey, writeValue) + + if couchdb.IsJSON(string(writeValue)) { + //logger.Debugf("===HISTORYDB=== yes JSON store writeValue = %v", string(writeValue)) + bytesDoc = writeValue + } else { + //For data that is not in JSON format only store the key + //logger.Debugf("===HISTORYDB=== not JSON only store key") + bytesDoc = []byte(`{}`) + } + + // SaveDoc using couchdb client and use JSON format + rev, err := histmgr.couchDB.SaveDoc(compositeKey, "", bytesDoc, nil) + if err != nil { + logger.Errorf("===HISTORYDB=== Error during Commit(): %s\n", err.Error()) + return err + } + if rev != "" { + logger.Debugf("===HISTORYDB=== Saved document revision number: %s\n", rev) + } + + } + } + + } return nil } + +func constructCompositeKey(ns string, key string, blocknum uint64, trannum uint64) string { + //History Key is: "namespace key blocknum trannum"", with namespace being the chaincode id + + // TODO - We will likely want sortable varint encoding, rather then a simple number, in order to support sorted key scans + var buffer bytes.Buffer + buffer.WriteString(ns) + buffer.WriteByte(0) + buffer.WriteString(key) + buffer.WriteByte(0) + buffer.WriteString(strconv.Itoa(int(blocknum))) + buffer.WriteByte(0) + buffer.WriteString(strconv.Itoa(int(trannum))) + + return buffer.String() +} diff --git a/core/ledger/history/couchdb_histmgr_test.go b/core/ledger/history/couchdb_histmgr_test.go index 9237abe08ca..e12d1f5b7ae 100644 --- a/core/ledger/history/couchdb_histmgr_test.go +++ b/core/ledger/history/couchdb_histmgr_test.go @@ -18,61 +18,35 @@ package history import ( "fmt" - "os" "testing" "github.com/hyperledger/fabric/core/ledger/ledgerconfig" "github.com/hyperledger/fabric/core/ledger/testutil" - "github.com/hyperledger/fabric/core/ledger/util/couchdb" ) -//Complex setup to test the use of couch in ledger -type testEnvCouch struct { - couchDBPath string - couchDBAddress string - couchDatabaseName string - couchUsername string - couchPassword string -} - -func newTestEnvCouch(t testing.TB, dbPath string, dbName string) *testEnvCouch { - - couchDBDef := ledgerconfig.GetCouchDBDefinition() - os.RemoveAll(dbPath) - - return &testEnvCouch{ - couchDBPath: dbPath, - couchDBAddress: couchDBDef.URL, - couchDatabaseName: dbName, - couchUsername: couchDBDef.Username, - couchPassword: couchDBDef.Password, - } -} - -func (env *testEnvCouch) cleanup() { - os.RemoveAll(env.couchDBPath) - //create a new connection - couchDB, _ := couchdb.CreateConnectionDefinition(env.couchDBAddress, env.couchDatabaseName, env.couchUsername, env.couchPassword) - //drop the test database if it already existed - couchDB.DropDatabase() -} +/* +Note that these test are only run if HistoryDB is explitily enabled +otherwise HistoryDB may not be installed and all the tests would fail +*/ -// couchdb_test.go tests couchdb functions already. This test just tests that a CouchDB history database is auto-created -// upon creating a new history transaction manager +// Note that the couchdb_test.go tests couchdb functions already. This test just tests that a +// CouchDB history database is auto-created upon creating a new history manager func TestHistoryDatabaseAutoCreate(t *testing.T) { //call a helper method to load the core.yaml testutil.SetupCoreYAMLConfig("./../../../peer") + logger.Debugf("===HISTORYDB=== TestHistoryDatabaseAutoCreate IsCouchDBEnabled()value: %v , IsHistoryDBEnabled()value: %v\n", + ledgerconfig.IsCouchDBEnabled(), ledgerconfig.IsHistoryDBEnabled()) - //Only run the tests if CouchDB is explitily enabled in the code, - //otherwise CouchDB may not be installed and all the tests would fail - //TODO replace this with external config property rather than config within the code - if ledgerconfig.IsCouchDBEnabled() == true { + if ledgerconfig.IsHistoryDBEnabled() == true { - env := newTestEnvCouch(t, "/tmp/tests/ledger/history", "history-test") + env := newTestEnvHistoryCouchDB(t, "history-test") env.cleanup() //cleanup at the beginning to ensure the database doesn't exist already defer env.cleanup() //and cleanup at the end + logger.Debugf("===HISTORYDB=== env.couchDBAddress: %v , env.couchDatabaseName: %v env.couchUsername: %v env.couchPassword: %v\n", + env.couchDBAddress, env.couchDatabaseName, env.couchUsername, env.couchPassword) + histMgr := NewCouchDBHistMgr( env.couchDBAddress, //couchDB Address env.couchDatabaseName, //couchDB db name @@ -98,5 +72,13 @@ func TestHistoryDatabaseAutoCreate(t *testing.T) { testutil.AssertEquals(t, dbResp2.DbName, env.couchDatabaseName) } +} + +func TestConstructCompositeKey(t *testing.T) { + compositeKey := constructCompositeKey("ns1", "key1", 1, 1) + + var compositeKeySep = []byte{0x00} + var strKeySep = string(compositeKeySep) + testutil.AssertEquals(t, compositeKey, "ns1"+strKeySep+"key1"+strKeySep+"1"+strKeySep+"1") } diff --git a/core/ledger/history/histmgmt.go b/core/ledger/history/histmgmt.go index b6c47bda0c9..4935c1b764e 100644 --- a/core/ledger/history/histmgmt.go +++ b/core/ledger/history/histmgmt.go @@ -18,7 +18,7 @@ package history import "github.com/hyperledger/fabric/protos/common" -// TxHistMgr - an interface that a transaction history manager should implement -type TxHistMgr interface { +// HistMgr - an interface that a history manager should implement +type HistMgr interface { Commit(block *common.Block) error } diff --git a/core/ledger/history/pkg_test.go b/core/ledger/history/pkg_test.go new file mode 100644 index 00000000000..7265cece4d3 --- /dev/null +++ b/core/ledger/history/pkg_test.go @@ -0,0 +1,54 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package history + +import ( + "testing" + + "github.com/hyperledger/fabric/core/ledger/ledgerconfig" + "github.com/hyperledger/fabric/core/ledger/util/couchdb" +) + +//Complex setup to test the use of couch in ledger +type testEnvHistoryCouchDB struct { + couchDBAddress string + couchDatabaseName string + couchUsername string + couchPassword string +} + +func newTestEnvHistoryCouchDB(t testing.TB, dbName string) *testEnvHistoryCouchDB { + + couchDBDef := ledgerconfig.GetCouchDBDefinition() + + return &testEnvHistoryCouchDB{ + couchDBAddress: couchDBDef.URL, + couchDatabaseName: dbName, + couchUsername: couchDBDef.Username, + couchPassword: couchDBDef.Password, + } +} + +func (env *testEnvHistoryCouchDB) cleanup() { + + //create a new connection + couchDB, err := couchdb.CreateConnectionDefinition(env.couchDBAddress, env.couchDatabaseName, env.couchUsername, env.couchPassword) + if err == nil { + //drop the test database if it already existed + couchDB.DropDatabase() + } +} diff --git a/core/ledger/kvledger/kv_ledger.go b/core/ledger/kvledger/kv_ledger.go index 09c76de8358..a169a02da18 100644 --- a/core/ledger/kvledger/kv_ledger.go +++ b/core/ledger/kvledger/kv_ledger.go @@ -24,6 +24,7 @@ import ( "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/blkstorage" "github.com/hyperledger/fabric/core/ledger/blkstorage/fsblkstorage" + "github.com/hyperledger/fabric/core/ledger/history" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/couchdbtxmgmt" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/lockbasedtxmgmt" @@ -60,6 +61,7 @@ func NewConf(filesystemPath string, maxBlockfileSize int) *Conf { type KVLedger struct { blockStore blkstorage.BlockStore txtmgmt txmgmt.TxMgr + historymgmt history.HistMgr pendingBlockToCommit *common.Block } @@ -77,7 +79,10 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) { blockStorageConf := fsblkstorage.NewConf(conf.blockStorageDir, conf.maxBlockfileSize) blockStore := fsblkstorage.NewFsBlockStore(blockStorageConf, indexConfig) + //State and History database managers var txmgmt txmgmt.TxMgr + var historymgmt history.HistMgr + if ledgerconfig.IsCouchDBEnabled() == true { //By default we can talk to CouchDB with empty id and pw (""), or you can add your own id and password to talk to a secured CouchDB logger.Debugf("===COUCHDB=== NewKVLedger() Using CouchDB instead of RocksDB...hardcoding and passing connection config for now") @@ -94,14 +99,26 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) { // Fall back to using RocksDB lockbased transaction manager txmgmt = lockbasedtxmgmt.NewLockBasedTxMgr(&lockbasedtxmgmt.Conf{DBPath: conf.txMgrDBPath}) } - l := &KVLedger{blockStore, txmgmt, nil} + + if ledgerconfig.IsHistoryDBEnabled() == true { + logger.Debugf("===HISTORYDB=== NewKVLedger() Using CouchDB for transaction history database") + + couchDBDef := ledgerconfig.GetCouchDBDefinition() + + historymgmt = history.NewCouchDBHistMgr( + couchDBDef.URL, //couchDB connection URL + "system_history", //couchDB db name matches ledger name, TODO for now use system_history ledger, eventually allow passing in subledger name + couchDBDef.Username, //enter couchDB id here + couchDBDef.Password) //enter couchDB pw here + } + + l := &KVLedger{blockStore, txmgmt, historymgmt, nil} if err := recoverStateDB(l); err != nil { panic(fmt.Errorf(`Error during state DB recovery:%s`, err)) } return l, nil - } //Recover the state database by recommitting last valid blocks @@ -220,6 +237,16 @@ func (l *KVLedger) Commit() error { if err := l.txtmgmt.Commit(); err != nil { panic(fmt.Errorf(`Error during commit to txmgr:%s`, err)) } + + //TODO future will want to run async with state db writes. History needs to wait for chain (FSBlock) to write but not the state db + logger.Debugf("===HISTORYDB=== Commit() will write to hisotry if enabled else will be by-passed if not enabled: vledgerconfig.IsHistoryDBEnabled(): %v\n", ledgerconfig.IsHistoryDBEnabled()) + if ledgerconfig.IsHistoryDBEnabled() == true { + logger.Debugf("Committing transactions to history database") + if err := l.historymgmt.Commit(l.pendingBlockToCommit); err != nil { + panic(fmt.Errorf(`Error during commit to txthistory:%s`, err)) + } + } + l.pendingBlockToCommit = nil return nil } diff --git a/core/ledger/kvledger/kv_ledger_test.go b/core/ledger/kvledger/kv_ledger_test.go index 057166baba3..23dd3a17e7d 100644 --- a/core/ledger/kvledger/kv_ledger_test.go +++ b/core/ledger/kvledger/kv_ledger_test.go @@ -19,6 +19,7 @@ package kvledger import ( "testing" + "github.com/hyperledger/fabric/core/ledger/ledgerconfig" "github.com/hyperledger/fabric/core/ledger/testutil" pb "github.com/hyperledger/fabric/protos/peer" ) @@ -157,3 +158,73 @@ func TestKVLedgerStateDBRecovery(t *testing.T) { simulator.Done() ledger.Close() } + +func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) { + + //call a helper method to load the core.yaml + testutil.SetupCoreYAMLConfig("./../../../peer") + + logger.Debugf("TestLedgerWithCouchDbEnabledWithBinaryAndJSONData IsCouchDBEnabled()value: %v , IsHistoryDBEnabled()value: %v\n", + ledgerconfig.IsCouchDBEnabled(), ledgerconfig.IsHistoryDBEnabled()) + + env := newTestEnv(t) + defer env.cleanup() + ledger, _ := NewKVLedger(env.conf) + defer ledger.Close() + + bcInfo, _ := ledger.GetBlockchainInfo() + testutil.AssertEquals(t, bcInfo, &pb.BlockchainInfo{ + Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil}) + + simulator, _ := ledger.NewTxSimulator() + simulator.SetState("ns1", "key4", []byte("value1")) + simulator.SetState("ns1", "key5", []byte("value2")) + simulator.SetState("ns1", "key6", []byte("{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")) + simulator.SetState("ns1", "key7", []byte("{\"shipmentID\":\"161003PKC7600\",\"customsInvoice\":{\"methodOfTransport\":\"AIR MAYBE\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")) + simulator.Done() + simRes, _ := simulator.GetTxSimulationResults() + bg := testutil.NewBlockGenerator(t) + block1 := bg.NextBlock([][]byte{simRes}, false) + + ledger.RemoveInvalidTransactionsAndPrepare(block1) + ledger.Commit() + + bcInfo, _ = ledger.GetBlockchainInfo() + block1Hash := block1.Header.Hash() + testutil.AssertEquals(t, bcInfo, &pb.BlockchainInfo{ + Height: 1, CurrentBlockHash: block1Hash, PreviousBlockHash: []byte{}}) + + //Note key 4 and 6 are updates but key 7 is new. I.E. should see history for key 4 and 6 if history is enabled + simulator, _ = ledger.NewTxSimulator() + simulator.SetState("ns1", "key4", []byte("value3")) + simulator.SetState("ns1", "key5", []byte("{\"shipmentID\":\"161003PKC7500\",\"customsInvoice\":{\"methodOfTransport\":\"AIR FREIGHT\",\"invoiceNumber\":\"00091623\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")) + simulator.SetState("ns1", "key6", []byte("value4")) + simulator.SetState("ns1", "key7", []byte("{\"shipmentID\":\"161003PKC7600\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")) + simulator.SetState("ns1", "key8", []byte("{\"shipmentID\":\"161003PKC7700\",\"customsInvoice\":{\"methodOfTransport\":\"SHIP\",\"invoiceNumber\":\"00091625\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}")) + simulator.Done() + simRes, _ = simulator.GetTxSimulationResults() + block2 := bg.NextBlock([][]byte{simRes}, false) + ledger.RemoveInvalidTransactionsAndPrepare(block2) + ledger.Commit() + + bcInfo, _ = ledger.GetBlockchainInfo() + block2Hash := block2.Header.Hash() + testutil.AssertEquals(t, bcInfo, &pb.BlockchainInfo{ + Height: 2, CurrentBlockHash: block2Hash, PreviousBlockHash: block1.Header.Hash()}) + + b1, _ := ledger.GetBlockByHash(block1Hash) + testutil.AssertEquals(t, b1, block1) + + b2, _ := ledger.GetBlockByHash(block2Hash) + testutil.AssertEquals(t, b2, block2) + + b1, _ = ledger.GetBlockByNumber(1) + testutil.AssertEquals(t, b1, block1) + + b2, _ = ledger.GetBlockByNumber(2) + testutil.AssertEquals(t, b2, block2) + + if ledgerconfig.IsHistoryDBEnabled() == true { + //TODO history specific test + } +} diff --git a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgmt_test.go b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgmt_test.go index 49166f649e1..cd70bd84647 100644 --- a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgmt_test.go +++ b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgmt_test.go @@ -66,6 +66,9 @@ func (env *testEnv) Cleanup() { // upon creating a new ledger transaction manager func TestDatabaseAutoCreate(t *testing.T) { + //call a helper method to load the core.yaml + testutil.SetupCoreYAMLConfig("./../../../../../peer") + //Only run the tests if CouchDB is explitily enabled in the code, //otherwise CouchDB may not be installed and all the tests would fail //TODO replace this with external config property rather than config within the code