From 445fbdb5df9d0dac8e1664056086a7bd9e69790e Mon Sep 17 00:00:00 2001 From: manish Date: Thu, 17 Nov 2016 18:03:39 +0530 Subject: [PATCH] Added support for advance KV-queries https://jira.hyperledger.org/browse/FAB-192 Change-Id: I04bf6a03247b38fdc49b7084182c4f70e09c7cdd Signed-off-by: manish --- .../couchdbtxmgmt/couchdb_query_executer.go | 5 + .../lockbased_query_executer.go | 59 ++++- .../lockbasedtxmgmt/lockbased_tx_simulator.go | 68 +++-- .../lockbasedtxmgmt/lockbased_txmgmt_test.go | 242 ++++++++++++++++++ .../txmgmt/lockbasedtxmgmt/lockbased_txmgr.go | 63 ++++- core/ledger/ledger_interface.go | 8 +- core/ledger/util/db/db.go | 7 + core/ledger/util/db/db_test.go | 56 +++- 8 files changed, 474 insertions(+), 34 deletions(-) diff --git a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_query_executer.go b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_query_executer.go index 704a1210d8d..8e83380f9a0 100644 --- a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_query_executer.go +++ b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_query_executer.go @@ -56,3 +56,8 @@ func (q *CouchDBQueryExecutor) GetTransactionsForKey(namespace string, key strin func (q *CouchDBQueryExecutor) ExecuteQuery(query string) (ledger.ResultsIterator, error) { return nil, errors.New("Not supported by KV data model") } + +// Done implements method in interface `ledger.QueryExecutor` +func (q *CouchDBQueryExecutor) Done() { + //TODO - acquire lock when constructing and release the lock here +} diff --git a/core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_query_executer.go b/core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_query_executer.go index ddc91b3a0b0..fbed472890a 100644 --- a/core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_query_executer.go +++ b/core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_query_executer.go @@ -25,10 +25,12 @@ import ( // RWLockQueryExecutor is a query executor used in `LockBasedTxMgr` type RWLockQueryExecutor struct { txmgr *LockBasedTxMgr + done bool } // GetState implements method in interface `ledger.QueryExecutor` func (q *RWLockQueryExecutor) GetState(ns string, key string) ([]byte, error) { + q.checkDone() var value []byte var err error if value, _, err = q.txmgr.getCommittedValueAndVersion(ns, key); err != nil { @@ -39,12 +41,29 @@ func (q *RWLockQueryExecutor) GetState(ns string, key string) ([]byte, error) { // GetStateMultipleKeys implements method in interface `ledger.QueryExecutor` func (q *RWLockQueryExecutor) GetStateMultipleKeys(namespace string, keys []string) ([][]byte, error) { - return nil, errors.New("Not yet implemented") + var results [][]byte + var value []byte + var err error + for _, key := range keys { + if value, err = q.GetState(namespace, key); err != nil { + return nil, err + } + results = append(results, value) + } + return results, nil } // GetStateRangeScanIterator implements method in interface `ledger.QueryExecutor` +// startKey is included in the results and endKey is excluded. An empty startKey refers to the first available key +// and an empty endKey refers to the last available key. For scanning all the keys, both the startKey and the endKey +// can be supplied as empty strings. However, a full scan shuold be used judiciously for performance reasons. func (q *RWLockQueryExecutor) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (ledger.ResultsIterator, error) { - return nil, errors.New("Not yet implemented") + q.checkDone() + scanner, err := q.txmgr.getCommittedRangeScanner(namespace, startKey, endKey) + if err != nil { + return nil, err + } + return &qKVItr{scanner}, nil } // GetTransactionsForKey - implements method in interface `ledger.QueryExecutor` @@ -56,3 +75,39 @@ func (q *RWLockQueryExecutor) GetTransactionsForKey(namespace string, key string func (q *RWLockQueryExecutor) ExecuteQuery(query string) (ledger.ResultsIterator, error) { return nil, errors.New("Not supported by KV data model") } + +// Done implements method in interface `ledger.TxSimulator` +func (q *RWLockQueryExecutor) Done() { + q.done = true + q.txmgr.commitRWLock.RUnlock() +} + +func (q *RWLockQueryExecutor) checkDone() { + if q.done { + panic("This method should not be called after calling Done()") + } +} + +type qKVItr struct { + s *kvScanner +} + +// Next implements Next() method in ledger.ResultsIterator +func (itr *qKVItr) Next() (ledger.QueryResult, error) { + committedKV, err := itr.s.next() + if err != nil { + return nil, err + } + if committedKV == nil { + return nil, nil + } + if committedKV.isDelete() { + return itr.Next() + } + return &ledger.KV{Key: committedKV.key, Value: committedKV.value}, nil +} + +// Close implements Close() method in ledger.ResultsIterator +func (itr *qKVItr) Close() { + itr.s.close() +} diff --git a/core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_tx_simulator.go b/core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_tx_simulator.go index 7ee6f5d9477..c447bb941cb 100644 --- a/core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_tx_simulator.go +++ b/core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_tx_simulator.go @@ -20,6 +20,7 @@ import ( "errors" "reflect" + "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt" logging "github.com/op/go-logging" ) @@ -42,7 +43,6 @@ func newNsRWs() *nsRWs { type LockBasedTxSimulator struct { RWLockQueryExecutor rwMap map[string]*nsRWs - done bool } func (s *LockBasedTxSimulator) getOrCreateNsRWHolder(ns string) *nsRWs { @@ -57,6 +57,7 @@ func (s *LockBasedTxSimulator) getOrCreateNsRWHolder(ns string) *nsRWs { // GetState implements method in interface `ledger.TxSimulator` func (s *LockBasedTxSimulator) GetState(ns string, key string) ([]byte, error) { + s.checkDone() logger.Debugf("Get state [%s:%s]", ns, key) nsRWs := s.getOrCreateNsRWHolder(ns) // check if it was written @@ -106,11 +107,24 @@ func (s *LockBasedTxSimulator) GetState(ns string, key string) ([]byte, error) { return value, nil } +// GetStateRangeScanIterator implements method in interface `ledger.QueryExecutor` +// startKey is included in the results and endKey is excluded. An empty startKey refers to the first available key +// and an empty endKey refers to the last available key. For scanning all the keys, both the startKey and the endKey +// can be supplied as empty strings. However, a full scan shuold be used judiciously for performance reasons. +// TODO: The range scan queries still do not support Read-Your_Write (RYW) +// semantics as it is still not agreed upon whether we want RYW model or not. +func (s *LockBasedTxSimulator) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (ledger.ResultsIterator, error) { + s.checkDone() + scanner, err := s.txmgr.getCommittedRangeScanner(namespace, startKey, endKey) + if err != nil { + return nil, err + } + return &sKVItr{scanner, s}, nil +} + // SetState implements method in interface `ledger.TxSimulator` func (s *LockBasedTxSimulator) SetState(ns string, key string, value []byte) error { - if s.done { - panic("This method should not be called after calling Done()") - } + s.checkDone() nsRWs := s.getOrCreateNsRWHolder(ns) kvWrite, ok := nsRWs.writeMap[key] if ok { @@ -126,12 +140,6 @@ func (s *LockBasedTxSimulator) DeleteState(ns string, key string) error { return s.SetState(ns, key, nil) } -// Done implements method in interface `ledger.TxSimulator` -func (s *LockBasedTxSimulator) Done() { - s.done = true - s.txmgr.commitRWLock.RUnlock() -} - func (s *LockBasedTxSimulator) getTxReadWriteSet() *txmgmt.TxReadWriteSet { txRWSet := &txmgmt.TxReadWriteSet{} sortedNamespaces := getSortedKeys(s.rwMap) @@ -186,15 +194,43 @@ func (s *LockBasedTxSimulator) GetTxSimulationResults() ([]byte, error) { // SetStateMultipleKeys implements method in interface `ledger.TxSimulator` func (s *LockBasedTxSimulator) SetStateMultipleKeys(namespace string, kvs map[string][]byte) error { - return errors.New("Not yet implemented") -} - -// CopyState implements method in interface `ledger.TxSimulator` -func (s *LockBasedTxSimulator) CopyState(sourceNamespace string, targetNamespace string) error { - return errors.New("Not yet implemented") + for k, v := range kvs { + if err := s.SetState(namespace, k, v); err != nil { + return err + } + } + return nil } // ExecuteUpdate implements method in interface `ledger.TxSimulator` func (s *LockBasedTxSimulator) ExecuteUpdate(query string) error { return errors.New("Not supported by KV data model") } + +type sKVItr struct { + scanner *kvScanner + simulator *LockBasedTxSimulator +} + +// Next implements Next() method in ledger.ResultsIterator +func (itr *sKVItr) Next() (ledger.QueryResult, error) { + committedKV, err := itr.scanner.next() + if err != nil { + return nil, err + } + if committedKV == nil { + return nil, nil + } + if committedKV.isDelete() { + return itr.Next() + } + nsRWs := itr.simulator.getOrCreateNsRWHolder(itr.scanner.namespace) + nsRWs.readMap[committedKV.key] = &kvReadCache{ + &txmgmt.KVRead{Key: committedKV.key, Version: committedKV.version}, committedKV.value} + return &ledger.KV{Key: committedKV.key, Value: committedKV.value}, nil +} + +// Close implements Close() method in ledger.ResultsIterator +func (itr *sKVItr) Close() { + itr.scanner.close() +} diff --git a/core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgmt_test.go b/core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgmt_test.go index 8f997b0c0ba..c93212429b2 100644 --- a/core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgmt_test.go +++ b/core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgmt_test.go @@ -20,6 +20,7 @@ import ( "fmt" "testing" + "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/testutil" ) @@ -192,3 +193,244 @@ func testValueAndVersionEncodeing(t *testing.T, value []byte, version uint64) { testutil.AssertEquals(t, val, value) testutil.AssertEquals(t, ver, version) } + +func TestIterator(t *testing.T) { + testIterator(t, 10, 2, 7) + testIterator(t, 10, 1, 11) + testIterator(t, 10, 0, 0) + testIterator(t, 10, 5, 0) + testIterator(t, 10, 0, 5) +} + +func testIterator(t *testing.T, numKeys int, startKeyNum int, endKeyNum int) { + cID := "cID" + env := newTestEnv(t) + defer env.Cleanup() + txMgr := NewLockBasedTxMgr(env.conf) + defer txMgr.Shutdown() + s, _ := txMgr.NewTxSimulator() + for i := 1; i <= numKeys; i++ { + k := createTestKey(i) + v := createTestValue(i) + t.Logf("Adding k=[%s], v=[%s]", k, v) + s.SetState(cID, k, v) + } + s.Done() + // validate and commit RWset + txRWSet := s.(*LockBasedTxSimulator).getTxReadWriteSet() + isValid, err := txMgr.validateTx(txRWSet) + testutil.AssertNoError(t, err, "") + testutil.AssertSame(t, isValid, true) + txMgr.addWriteSetToBatch(txRWSet) + err = txMgr.Commit() + testutil.AssertNoError(t, err, "") + + var startKey string + var endKey string + var begin int + var end int + + if startKeyNum != 0 { + begin = startKeyNum + startKey = createTestKey(startKeyNum) + } else { + begin = 1 //first key in the db + startKey = "" + } + + if endKeyNum != 0 { + endKey = createTestKey(endKeyNum) + end = endKeyNum + } else { + endKey = "" + end = numKeys + 1 //last key in the db + } + + expectedCount := end - begin + + queryExecuter, _ := txMgr.NewQueryExecutor() + itr, _ := queryExecuter.GetStateRangeScanIterator(cID, startKey, endKey) + count := 0 + for { + kv, _ := itr.Next() + if kv == nil { + break + } + keyNum := begin + count + k := kv.(*ledger.KV).Key + v := kv.(*ledger.KV).Value + t.Logf("Retrieved k=%s, v=%s", k, v) + testutil.AssertEquals(t, k, createTestKey(keyNum)) + testutil.AssertEquals(t, v, createTestValue(keyNum)) + count++ + } + testutil.AssertEquals(t, count, expectedCount) +} + +func TestIteratorWithDeletes(t *testing.T) { + cID := "cID" + env := newTestEnv(t) + defer env.Cleanup() + txMgr := NewLockBasedTxMgr(env.conf) + defer txMgr.Shutdown() + s, _ := txMgr.NewTxSimulator() + for i := 1; i <= 10; i++ { + k := createTestKey(i) + v := createTestValue(i) + t.Logf("Adding k=[%s], v=[%s]", k, v) + s.SetState(cID, k, v) + } + s.Done() + // validate and commit RWset + txRWSet := s.(*LockBasedTxSimulator).getTxReadWriteSet() + isValid, err := txMgr.validateTx(txRWSet) + testutil.AssertNoError(t, err, "") + testutil.AssertSame(t, isValid, true) + txMgr.addWriteSetToBatch(txRWSet) + err = txMgr.Commit() + testutil.AssertNoError(t, err, "") + + s, _ = txMgr.NewTxSimulator() + s.DeleteState(cID, createTestKey(4)) + s.Done() + // validate and commit RWset + txRWSet = s.(*LockBasedTxSimulator).getTxReadWriteSet() + isValid, err = txMgr.validateTx(txRWSet) + testutil.AssertNoError(t, err, "") + testutil.AssertSame(t, isValid, true) + txMgr.addWriteSetToBatch(txRWSet) + err = txMgr.Commit() + testutil.AssertNoError(t, err, "") + + queryExecuter, _ := txMgr.NewQueryExecutor() + itr, _ := queryExecuter.GetStateRangeScanIterator(cID, createTestKey(3), createTestKey(6)) + defer itr.Close() + kv, _ := itr.Next() + testutil.AssertEquals(t, kv.(*ledger.KV).Key, createTestKey(3)) + kv, _ = itr.Next() + testutil.AssertEquals(t, kv.(*ledger.KV).Key, createTestKey(5)) +} + +func TestTxValidationWithItr(t *testing.T) { + cID := "cID" + env := newTestEnv(t) + defer env.Cleanup() + txMgr := NewLockBasedTxMgr(env.conf) + defer txMgr.Shutdown() + + // simulate tx1 + s1, _ := txMgr.NewTxSimulator() + for i := 1; i <= 10; i++ { + k := createTestKey(i) + v := createTestValue(i) + t.Logf("Adding k=[%s], v=[%s]", k, v) + s1.SetState(cID, k, v) + } + s1.Done() + // validate and commit RWset + txRWSet := s1.(*LockBasedTxSimulator).getTxReadWriteSet() + isValid, err := txMgr.validateTx(txRWSet) + testutil.AssertNoError(t, err, "") + testutil.AssertSame(t, isValid, true) + txMgr.addWriteSetToBatch(txRWSet) + err = txMgr.Commit() + testutil.AssertNoError(t, err, "") + + // simulate tx2 that reads key_001 and key_002 + s2, _ := txMgr.NewTxSimulator() + itr, _ := s2.GetStateRangeScanIterator(cID, createTestKey(1), createTestKey(5)) + // read key_001 and key_002 + itr.Next() + itr.Next() + itr.Close() + s2.Done() + + // simulate tx3 that reads key_004 and key_005 + s3, _ := txMgr.NewTxSimulator() + itr, _ = s3.GetStateRangeScanIterator(cID, createTestKey(4), createTestKey(6)) + // read key_001 and key_002 + itr.Next() + itr.Next() + itr.Close() + s3.Done() + + // simulate tx4 before committing tx2 and tx3. Modifies a key read by tx3 + s4, _ := txMgr.NewTxSimulator() + s4.DeleteState(cID, createTestKey(5)) + s4.Done() + + // validate and commit RWset for tx4 + txRWSet = s4.(*LockBasedTxSimulator).getTxReadWriteSet() + isValid, err = txMgr.validateTx(txRWSet) + testutil.AssertNoError(t, err, fmt.Sprintf("Error in validateTx(): %s", err)) + testutil.AssertSame(t, isValid, true) + txMgr.addWriteSetToBatch(txRWSet) + txMgr.Commit() + + //RWSet tx3 should not be invalid now + isValid, err = txMgr.validateTx(s3.(*LockBasedTxSimulator).getTxReadWriteSet()) + testutil.AssertNoError(t, err, fmt.Sprintf("Error in validateTx(): %s", err)) + testutil.AssertSame(t, isValid, false) + + // tx2 should still be valid + isValid, _ = txMgr.validateTx(s2.(*LockBasedTxSimulator).getTxReadWriteSet()) + testutil.AssertSame(t, isValid, true) +} + +func TestGetSetMultipeKeys(t *testing.T) { + cID := "cID" + env := newTestEnv(t) + defer env.Cleanup() + txMgr := NewLockBasedTxMgr(env.conf) + defer txMgr.Shutdown() + + // simulate tx1 + s1, _ := txMgr.NewTxSimulator() + multipleKeyMap := make(map[string][]byte) + for i := 1; i <= 10; i++ { + k := createTestKey(i) + v := createTestValue(i) + multipleKeyMap[k] = v + } + s1.SetStateMultipleKeys(cID, multipleKeyMap) + s1.Done() + // validate and commit RWset + txRWSet := s1.(*LockBasedTxSimulator).getTxReadWriteSet() + isValid, err := txMgr.validateTx(txRWSet) + testutil.AssertNoError(t, err, "") + testutil.AssertSame(t, isValid, true) + txMgr.addWriteSetToBatch(txRWSet) + err = txMgr.Commit() + testutil.AssertNoError(t, err, "") + + qe, _ := txMgr.NewQueryExecutor() + defer qe.Done() + multipleKeys := []string{} + for k := range multipleKeyMap { + multipleKeys = append(multipleKeys, k) + } + values, _ := qe.GetStateMultipleKeys(cID, multipleKeys) + testutil.AssertEquals(t, len(values), 10) + for i, v := range values { + testutil.AssertEquals(t, v, multipleKeyMap[multipleKeys[i]]) + } + + s2, _ := txMgr.NewTxSimulator() + defer s2.Done() + values, _ = s2.GetStateMultipleKeys(cID, multipleKeys[5:7]) + testutil.AssertEquals(t, len(values), 2) + for i, v := range values { + testutil.AssertEquals(t, v, multipleKeyMap[multipleKeys[i+5]]) + } +} + +func createTestKey(i int) string { + if i == 0 { + return "" + } + return fmt.Sprintf("key_%03d", i) +} + +func createTestValue(i int) []byte { + return []byte(fmt.Sprintf("value_%03d", i)) +} diff --git a/core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgr.go b/core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgr.go index f47e29e60ca..1eaeeffbae2 100644 --- a/core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgr.go +++ b/core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgr.go @@ -17,6 +17,7 @@ limitations under the License. package lockbasedtxmgmt import ( + "bytes" "fmt" "sync" @@ -28,10 +29,13 @@ import ( putils "github.com/hyperledger/fabric/protos/utils" "github.com/op/go-logging" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" ) var logger = logging.MustGetLogger("lockbasedtxmgmt") +var compositeKeySep = []byte{0x00} + // Conf - configuration for `LockBasedTxMgr` type Conf struct { DBPath string @@ -80,12 +84,14 @@ func NewLockBasedTxMgr(conf *Conf) *LockBasedTxMgr { // NewQueryExecutor implements method in interface `txmgmt.TxMgr` func (txmgr *LockBasedTxMgr) NewQueryExecutor() (ledger.QueryExecutor, error) { - return &RWLockQueryExecutor{txmgr}, nil + qe := &RWLockQueryExecutor{txmgr, false} + qe.txmgr.commitRWLock.RLock() + return qe, nil } // NewTxSimulator implements method in interface `txmgmt.TxMgr` func (txmgr *LockBasedTxMgr) NewTxSimulator() (ledger.TxSimulator, error) { - s := &LockBasedTxSimulator{RWLockQueryExecutor{txmgr}, make(map[string]*nsRWs), false} + s := &LockBasedTxSimulator{RWLockQueryExecutor{txmgr, false}, make(map[string]*nsRWs)} s.txmgr.commitRWLock.RLock() return s, nil } @@ -259,6 +265,20 @@ func (txmgr *LockBasedTxMgr) getCommittedValueAndVersion(ns string, key string) return value, version, nil } +func (txmgr *LockBasedTxMgr) getCommittedRangeScanner(namespace string, startKey string, endKey string) (*kvScanner, error) { + var compositeStartKey []byte + var compositeEndKey []byte + if startKey != "" { + compositeStartKey = constructCompositeKey(namespace, startKey) + } + if endKey != "" { + compositeEndKey = constructCompositeKey(namespace, endKey) + } + + dbItr := txmgr.db.GetIterator(compositeStartKey, compositeEndKey) + return newKVScanner(namespace, dbItr), nil +} + func encodeValue(value []byte, version uint64) []byte { versionBytes := proto.EncodeVarint(version) deleteMarker := 0 @@ -285,7 +305,44 @@ func decodeValue(encodedValue []byte) ([]byte, uint64) { func constructCompositeKey(ns string, key string) []byte { compositeKey := []byte(ns) - compositeKey = append(compositeKey, byte(0)) + compositeKey = append(compositeKey, compositeKeySep...) compositeKey = append(compositeKey, []byte(key)...) return compositeKey } + +func splitCompositeKey(compositeKey []byte) (string, string) { + split := bytes.SplitN(compositeKey, compositeKeySep, 2) + return string(split[0]), string(split[1]) +} + +type kvScanner struct { + namespace string + dbItr iterator.Iterator +} + +type committedKV struct { + key string + version uint64 + value []byte +} + +func (cKV *committedKV) isDelete() bool { + return cKV.value == nil +} + +func newKVScanner(namespace string, dbItr iterator.Iterator) *kvScanner { + return &kvScanner{namespace, dbItr} +} + +func (scanner *kvScanner) next() (*committedKV, error) { + if !scanner.dbItr.Next() { + return nil, nil + } + _, key := splitCompositeKey(scanner.dbItr.Key()) + value, version := decodeValue(scanner.dbItr.Value()) + return &committedKV{key, version, value}, nil +} + +func (scanner *kvScanner) close() { + scanner.dbItr.Release() +} diff --git a/core/ledger/ledger_interface.go b/core/ledger/ledger_interface.go index c512f3bfdba..93e8f6a5d97 100644 --- a/core/ledger/ledger_interface.go +++ b/core/ledger/ledger_interface.go @@ -79,7 +79,7 @@ type QueryExecutor interface { GetState(namespace string, key string) ([]byte, error) // GetStateMultipleKeys gets the values for multiple keys in a single call GetStateMultipleKeys(namespace string, keys []string) ([][]byte, error) - // GetStateRangeScanIterator returns an iterator that contains all the key-values beteen given key ranges. + // GetStateRangeScanIterator returns an iterator that contains all the key-values between given key ranges. // The returned ResultsIterator contains results of type *KV GetStateRangeScanIterator(namespace string, startKey string, endKey string) (ResultsIterator, error) // GetTransactionsForKey returns an iterator that contains all the transactions that modified the given key. @@ -87,6 +87,8 @@ type QueryExecutor interface { GetTransactionsForKey(namespace string, key string) (ResultsIterator, error) // ExecuteQuery executes the given query and returns an iterator that contains results of type specific to the underlying data store. ExecuteQuery(query string) (ResultsIterator, error) + // Done releases resources occupied by the QueryExecutor + Done() } // TxSimulator simulates a transaction on a consistent snapshot of the 'as recent state as possible' @@ -101,10 +103,6 @@ type TxSimulator interface { SetStateMultipleKeys(namespace string, kvs map[string][]byte) error // ExecuteUpdate for supporting rich data model (see comments on QueryExecutor above) ExecuteUpdate(query string) error - // CopyState copies the entire state in the sourceNamespace to the targetNamespace. This can be a large payload - CopyState(sourceNamespace string, targetNamespace string) error - // Done releases resources occupied by the TxSimulator - Done() // GetTxSimulationResults encapsulates the results of the transaction simulation. // This should contain enough detail for // - The update in the state that would be caused if the transaction is to be committed diff --git a/core/ledger/util/db/db.go b/core/ledger/util/db/db.go index 1fd610460eb..a5fde1b9ff1 100644 --- a/core/ledger/util/db/db.go +++ b/core/ledger/util/db/db.go @@ -23,7 +23,9 @@ import ( "github.com/hyperledger/fabric/core/ledger/util" "github.com/op/go-logging" "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" + goleveldbutil "github.com/syndtr/goleveldb/leveldb/util" ) var logger = logging.MustGetLogger("kvledger.db") @@ -146,6 +148,11 @@ func (dbInst *DB) Delete(key []byte, sync bool) error { return nil } +// GetIterator returns an iterator over key-value store. The iterator should be released after the use +func (dbInst *DB) GetIterator(startKey []byte, endKey []byte) iterator.Iterator { + return dbInst.db.NewIterator(&goleveldbutil.Range{Start: startKey, Limit: endKey}, dbInst.readOpts) +} + // WriteBatch writes a batch func (dbInst *DB) WriteBatch(batch *leveldb.Batch, sync bool) error { wo := dbInst.writeOptsNoSync diff --git a/core/ledger/util/db/db_test.go b/core/ledger/util/db/db_test.go index bb6e167ac66..8b9201917ea 100644 --- a/core/ledger/util/db/db_test.go +++ b/core/ledger/util/db/db_test.go @@ -20,18 +20,15 @@ import ( "os" "testing" + "fmt" + "github.com/hyperledger/fabric/core/ledger/testutil" ) +const testDBPath = "/tmp/test/hyperledger/fabric/core/ledger/util/db" + func TestDBBasicWriteAndReads(t *testing.T) { - testDBPath := "/tmp/test/hyperledger/fabric/core/ledger/util/db" - if err := os.RemoveAll(testDBPath); err != nil { - t.Fatalf("Error:%s", err) - } - dbConf := &Conf{testDBPath} - defer func() { os.RemoveAll(testDBPath) }() - db := CreateDB(dbConf) - db.Open() + db := createTestDB(t) defer db.Close() db.Put([]byte("key1"), []byte("value1"), false) db.Put([]byte("key2"), []byte("value2"), false) @@ -48,3 +45,46 @@ func TestDBBasicWriteAndReads(t *testing.T) { testutil.AssertNoError(t, err, "") testutil.AssertEquals(t, val, []byte("value3")) } + +func TestIterator(t *testing.T) { + db := createTestDB(t) + defer db.Close() + for i := 0; i < 10; i++ { + db.Put(createTestKey(i), createTestValue(i), false) + } + + startKey := 2 + endKey := 7 + itr := db.GetIterator(createTestKey(startKey), createTestKey(endKey)) + defer itr.Release() + var count = 0 + itr.Next() + for i := startKey; itr.Valid(); itr.Next() { + k := itr.Key() + v := itr.Value() + t.Logf("Key=%s, value=%s", string(k), string(v)) + testutil.AssertEquals(t, k, createTestKey(i)) + testutil.AssertEquals(t, v, createTestValue(i)) + i++ + count++ + } + testutil.AssertEquals(t, count, endKey-startKey) +} + +func createTestKey(i int) []byte { + return []byte(fmt.Sprintf("key_%d", i)) +} + +func createTestValue(i int) []byte { + return []byte(fmt.Sprintf("value_%d", i)) +} + +func createTestDB(t *testing.T) *DB { + if err := os.RemoveAll(testDBPath); err != nil { + t.Fatalf("Error:%s", err) + } + dbConf := &Conf{testDBPath} + db := CreateDB(dbConf) + db.Open() + return db +}