Skip to content

Commit

Permalink
Added support for advance KV-queries
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-192

Change-Id: I04bf6a03247b38fdc49b7084182c4f70e09c7cdd
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi committed Nov 18, 2016
1 parent efdab32 commit 445fbdb
Show file tree
Hide file tree
Showing 8 changed files with 474 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,8 @@ func (q *CouchDBQueryExecutor) GetTransactionsForKey(namespace string, key strin
func (q *CouchDBQueryExecutor) ExecuteQuery(query string) (ledger.ResultsIterator, error) {
return nil, errors.New("Not supported by KV data model")
}

// Done implements method in interface `ledger.QueryExecutor`
func (q *CouchDBQueryExecutor) Done() {
//TODO - acquire lock when constructing and release the lock here
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
// RWLockQueryExecutor is a query executor used in `LockBasedTxMgr`
type RWLockQueryExecutor struct {
txmgr *LockBasedTxMgr
done bool
}

// GetState implements method in interface `ledger.QueryExecutor`
func (q *RWLockQueryExecutor) GetState(ns string, key string) ([]byte, error) {
q.checkDone()
var value []byte
var err error
if value, _, err = q.txmgr.getCommittedValueAndVersion(ns, key); err != nil {
Expand All @@ -39,12 +41,29 @@ func (q *RWLockQueryExecutor) GetState(ns string, key string) ([]byte, error) {

// GetStateMultipleKeys implements method in interface `ledger.QueryExecutor`
func (q *RWLockQueryExecutor) GetStateMultipleKeys(namespace string, keys []string) ([][]byte, error) {
return nil, errors.New("Not yet implemented")
var results [][]byte
var value []byte
var err error
for _, key := range keys {
if value, err = q.GetState(namespace, key); err != nil {
return nil, err
}
results = append(results, value)
}
return results, nil
}

// GetStateRangeScanIterator implements method in interface `ledger.QueryExecutor`
// startKey is included in the results and endKey is excluded. An empty startKey refers to the first available key
// and an empty endKey refers to the last available key. For scanning all the keys, both the startKey and the endKey
// can be supplied as empty strings. However, a full scan shuold be used judiciously for performance reasons.
func (q *RWLockQueryExecutor) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (ledger.ResultsIterator, error) {
return nil, errors.New("Not yet implemented")
q.checkDone()
scanner, err := q.txmgr.getCommittedRangeScanner(namespace, startKey, endKey)
if err != nil {
return nil, err
}
return &qKVItr{scanner}, nil
}

// GetTransactionsForKey - implements method in interface `ledger.QueryExecutor`
Expand All @@ -56,3 +75,39 @@ func (q *RWLockQueryExecutor) GetTransactionsForKey(namespace string, key string
func (q *RWLockQueryExecutor) ExecuteQuery(query string) (ledger.ResultsIterator, error) {
return nil, errors.New("Not supported by KV data model")
}

// Done implements method in interface `ledger.TxSimulator`
func (q *RWLockQueryExecutor) Done() {
q.done = true
q.txmgr.commitRWLock.RUnlock()
}

func (q *RWLockQueryExecutor) checkDone() {
if q.done {
panic("This method should not be called after calling Done()")
}
}

type qKVItr struct {
s *kvScanner
}

// Next implements Next() method in ledger.ResultsIterator
func (itr *qKVItr) Next() (ledger.QueryResult, error) {
committedKV, err := itr.s.next()
if err != nil {
return nil, err
}
if committedKV == nil {
return nil, nil
}
if committedKV.isDelete() {
return itr.Next()
}
return &ledger.KV{Key: committedKV.key, Value: committedKV.value}, nil
}

// Close implements Close() method in ledger.ResultsIterator
func (itr *qKVItr) Close() {
itr.s.close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"reflect"

"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt"
logging "github.com/op/go-logging"
)
Expand All @@ -42,7 +43,6 @@ func newNsRWs() *nsRWs {
type LockBasedTxSimulator struct {
RWLockQueryExecutor
rwMap map[string]*nsRWs
done bool
}

func (s *LockBasedTxSimulator) getOrCreateNsRWHolder(ns string) *nsRWs {
Expand All @@ -57,6 +57,7 @@ func (s *LockBasedTxSimulator) getOrCreateNsRWHolder(ns string) *nsRWs {

// GetState implements method in interface `ledger.TxSimulator`
func (s *LockBasedTxSimulator) GetState(ns string, key string) ([]byte, error) {
s.checkDone()
logger.Debugf("Get state [%s:%s]", ns, key)
nsRWs := s.getOrCreateNsRWHolder(ns)
// check if it was written
Expand Down Expand Up @@ -106,11 +107,24 @@ func (s *LockBasedTxSimulator) GetState(ns string, key string) ([]byte, error) {
return value, nil
}

// GetStateRangeScanIterator implements method in interface `ledger.QueryExecutor`
// startKey is included in the results and endKey is excluded. An empty startKey refers to the first available key
// and an empty endKey refers to the last available key. For scanning all the keys, both the startKey and the endKey
// can be supplied as empty strings. However, a full scan shuold be used judiciously for performance reasons.
// TODO: The range scan queries still do not support Read-Your_Write (RYW)
// semantics as it is still not agreed upon whether we want RYW model or not.
func (s *LockBasedTxSimulator) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (ledger.ResultsIterator, error) {
s.checkDone()
scanner, err := s.txmgr.getCommittedRangeScanner(namespace, startKey, endKey)
if err != nil {
return nil, err
}
return &sKVItr{scanner, s}, nil
}

// SetState implements method in interface `ledger.TxSimulator`
func (s *LockBasedTxSimulator) SetState(ns string, key string, value []byte) error {
if s.done {
panic("This method should not be called after calling Done()")
}
s.checkDone()
nsRWs := s.getOrCreateNsRWHolder(ns)
kvWrite, ok := nsRWs.writeMap[key]
if ok {
Expand All @@ -126,12 +140,6 @@ func (s *LockBasedTxSimulator) DeleteState(ns string, key string) error {
return s.SetState(ns, key, nil)
}

// Done implements method in interface `ledger.TxSimulator`
func (s *LockBasedTxSimulator) Done() {
s.done = true
s.txmgr.commitRWLock.RUnlock()
}

func (s *LockBasedTxSimulator) getTxReadWriteSet() *txmgmt.TxReadWriteSet {
txRWSet := &txmgmt.TxReadWriteSet{}
sortedNamespaces := getSortedKeys(s.rwMap)
Expand Down Expand Up @@ -186,15 +194,43 @@ func (s *LockBasedTxSimulator) GetTxSimulationResults() ([]byte, error) {

// SetStateMultipleKeys implements method in interface `ledger.TxSimulator`
func (s *LockBasedTxSimulator) SetStateMultipleKeys(namespace string, kvs map[string][]byte) error {
return errors.New("Not yet implemented")
}

// CopyState implements method in interface `ledger.TxSimulator`
func (s *LockBasedTxSimulator) CopyState(sourceNamespace string, targetNamespace string) error {
return errors.New("Not yet implemented")
for k, v := range kvs {
if err := s.SetState(namespace, k, v); err != nil {
return err
}
}
return nil
}

// ExecuteUpdate implements method in interface `ledger.TxSimulator`
func (s *LockBasedTxSimulator) ExecuteUpdate(query string) error {
return errors.New("Not supported by KV data model")
}

type sKVItr struct {
scanner *kvScanner
simulator *LockBasedTxSimulator
}

// Next implements Next() method in ledger.ResultsIterator
func (itr *sKVItr) Next() (ledger.QueryResult, error) {
committedKV, err := itr.scanner.next()
if err != nil {
return nil, err
}
if committedKV == nil {
return nil, nil
}
if committedKV.isDelete() {
return itr.Next()
}
nsRWs := itr.simulator.getOrCreateNsRWHolder(itr.scanner.namespace)
nsRWs.readMap[committedKV.key] = &kvReadCache{
&txmgmt.KVRead{Key: committedKV.key, Version: committedKV.version}, committedKV.value}
return &ledger.KV{Key: committedKV.key, Value: committedKV.value}, nil
}

// Close implements Close() method in ledger.ResultsIterator
func (itr *sKVItr) Close() {
itr.scanner.close()
}

0 comments on commit 445fbdb

Please sign in to comment.