diff --git a/core/ledger/kvledger/txmgmt/rwset/rwset.go b/core/ledger/kvledger/txmgmt/rwset/rwset.go index e6f30c15676..c2a31d6d486 100644 --- a/core/ledger/kvledger/txmgmt/rwset/rwset.go +++ b/core/ledger/kvledger/txmgmt/rwset/rwset.go @@ -54,11 +54,38 @@ func (w *KVWrite) SetValue(value []byte) { w.IsDelete = value == nil } +// RangeQueryInfo captures a range query executed by a transaction +// and the tuples that are read by the transaction +// This it to be used to perform a phantom-read validation during commit +type RangeQueryInfo struct { + StartKey string + EndKey string + ItrExhausted bool + results []*KVRead + resultHash []byte +} + +// AddResult appends the result +func (rqi *RangeQueryInfo) AddResult(kvRead *KVRead) { + rqi.results = append(rqi.results, kvRead) +} + +// GetResults returns the results of the range query +func (rqi *RangeQueryInfo) GetResults() []*KVRead { + return rqi.results +} + +// GetResultHash returns the resultHash +func (rqi *RangeQueryInfo) GetResultHash() []byte { + return rqi.resultHash +} + // NsReadWriteSet - a collection of all the reads and writes that belong to a common namespace type NsReadWriteSet struct { - NameSpace string - Reads []*KVRead - Writes []*KVWrite + NameSpace string + Reads []*KVRead + Writes []*KVWrite + RangeQueriesInfo []*RangeQueryInfo } // TxReadWriteSet - a collection of all the reads and writes collected as a result of a transaction simulation @@ -97,6 +124,79 @@ func (r *KVRead) Unmarshal(buf *proto.Buffer) error { return nil } +// Marshal serializes a `RangeQueryInfo` +func (rqi *RangeQueryInfo) Marshal(buf *proto.Buffer) error { + if err := buf.EncodeStringBytes(rqi.StartKey); err != nil { + return err + } + if err := buf.EncodeStringBytes(rqi.EndKey); err != nil { + return err + } + + itrExhausedMarker := 0 // iterator did not get exhausted + if rqi.ItrExhausted { + itrExhausedMarker = 1 + } + if err := buf.EncodeVarint(uint64(itrExhausedMarker)); err != nil { + return err + } + + if err := buf.EncodeVarint(uint64(len(rqi.results))); err != nil { + return err + } + for i := 0; i < len(rqi.results); i++ { + if err := rqi.results[i].Marshal(buf); err != nil { + return err + } + } + if err := buf.EncodeRawBytes(rqi.resultHash); err != nil { + return err + } + return nil +} + +// Unmarshal deserializes a `RangeQueryInfo` +func (rqi *RangeQueryInfo) Unmarshal(buf *proto.Buffer) error { + var err error + var numResults uint64 + var itrExhaustedMarker uint64 + + if rqi.StartKey, err = buf.DecodeStringBytes(); err != nil { + return err + } + if rqi.EndKey, err = buf.DecodeStringBytes(); err != nil { + return err + } + if itrExhaustedMarker, err = buf.DecodeVarint(); err != nil { + return err + } + if itrExhaustedMarker == 1 { + rqi.ItrExhausted = true + } else { + rqi.ItrExhausted = false + } + if numResults, err = buf.DecodeVarint(); err != nil { + return err + } + if numResults > 0 { + rqi.results = make([]*KVRead, int(numResults)) + } + for i := 0; i < int(numResults); i++ { + kvRead := &KVRead{} + if err := kvRead.Unmarshal(buf); err != nil { + return err + } + rqi.results[i] = kvRead + } + if rqi.resultHash, err = buf.DecodeRawBytes(false); err != nil { + return err + } + if len(rqi.resultHash) == 0 { + rqi.resultHash = nil + } + return nil +} + // Marshal serializes a `KVWrite` func (w *KVWrite) Marshal(buf *proto.Buffer) error { var err error @@ -148,13 +248,25 @@ func (nsRW *NsReadWriteSet) Marshal(buf *proto.Buffer) error { return err } for i := 0; i < len(nsRW.Reads); i++ { - nsRW.Reads[i].Marshal(buf) + if err = nsRW.Reads[i].Marshal(buf); err != nil { + return err + } } if err = buf.EncodeVarint(uint64(len(nsRW.Writes))); err != nil { return err } for i := 0; i < len(nsRW.Writes); i++ { - nsRW.Writes[i].Marshal(buf) + if err = nsRW.Writes[i].Marshal(buf); err != nil { + return err + } + } + if err = buf.EncodeVarint(uint64(len(nsRW.RangeQueriesInfo))); err != nil { + return err + } + for i := 0; i < len(nsRW.RangeQueriesInfo); i++ { + if err = nsRW.RangeQueriesInfo[i].Marshal(buf); err != nil { + return err + } } return nil } @@ -188,6 +300,18 @@ func (nsRW *NsReadWriteSet) Unmarshal(buf *proto.Buffer) error { } nsRW.Writes = append(nsRW.Writes, w) } + + var numRangeQueriesInfo uint64 + if numRangeQueriesInfo, err = buf.DecodeVarint(); err != nil { + return err + } + for i := 0; i < int(numRangeQueriesInfo); i++ { + rqInfo := &RangeQueryInfo{} + if err = rqInfo.Unmarshal(buf); err != nil { + return err + } + nsRW.RangeQueriesInfo = append(nsRW.RangeQueriesInfo, rqInfo) + } return nil } @@ -234,18 +358,32 @@ func (w *KVWrite) String() string { return fmt.Sprintf("%s=[%#v]", w.Key, w.Value) } +// String prints a range query info +func (rqi *RangeQueryInfo) String() string { + return fmt.Sprintf("StartKey=%s, EndKey=%s, ItrExhausted=%t, Results=%#v, Hash=%#v", + rqi.StartKey, rqi.EndKey, rqi.ItrExhausted, rqi.results, rqi.resultHash) +} + // String prints a `NsReadWriteSet` func (nsRW *NsReadWriteSet) String() string { var buffer bytes.Buffer - buffer.WriteString("ReadSet~") + buffer.WriteString("ReadSet=\n") for _, r := range nsRW.Reads { + buffer.WriteString("\t") buffer.WriteString(r.String()) - buffer.WriteString(",") + buffer.WriteString("\n") } - buffer.WriteString("WriteSet~") + buffer.WriteString("WriteSet=\n") for _, w := range nsRW.Writes { + buffer.WriteString("\t") buffer.WriteString(w.String()) - buffer.WriteString(",") + buffer.WriteString("\n") + } + buffer.WriteString("RangeQueriesInfo=\n") + for _, rqi := range nsRW.RangeQueriesInfo { + buffer.WriteString("\t") + buffer.WriteString(rqi.String()) + buffer.WriteString("\n") } return buffer.String() } diff --git a/core/ledger/kvledger/txmgmt/rwset/rwset_holder.go b/core/ledger/kvledger/txmgmt/rwset/rwset_holder.go index 81b57bca3fd..1d0b20ba45d 100644 --- a/core/ledger/kvledger/txmgmt/rwset/rwset_holder.go +++ b/core/ledger/kvledger/txmgmt/rwset/rwset_holder.go @@ -17,21 +17,28 @@ limitations under the License. package rwset import ( - "reflect" - "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" + "github.com/hyperledger/fabric/core/ledger/util" logging "github.com/op/go-logging" ) var logger = logging.MustGetLogger("rwset") type nsRWs struct { - readMap map[string]*KVRead - writeMap map[string]*KVWrite + readMap map[string]*KVRead //for mvcc validation + writeMap map[string]*KVWrite + rangeQueriesMap map[rangeQueryKey]*RangeQueryInfo //for phantom read validation + rangeQueriesKeys []rangeQueryKey } func newNsRWs() *nsRWs { - return &nsRWs{make(map[string]*KVRead), make(map[string]*KVWrite)} + return &nsRWs{make(map[string]*KVRead), make(map[string]*KVWrite), make(map[rangeQueryKey]*RangeQueryInfo), nil} +} + +type rangeQueryKey struct { + startKey string + endKey string + itrExhausted bool } // RWSet maintains the read-write set @@ -56,6 +63,17 @@ func (rws *RWSet) AddToWriteSet(ns string, key string, value []byte) { nsRWs.writeMap[key] = NewKVWrite(key, value) } +// AddToRangeQuerySet adds a range query info for performing phantom read validation +func (rws *RWSet) AddToRangeQuerySet(ns string, rqi *RangeQueryInfo) { + nsRWs := rws.getOrCreateNsRW(ns) + key := rangeQueryKey{rqi.StartKey, rqi.EndKey, rqi.ItrExhausted} + _, ok := nsRWs.rangeQueriesMap[key] + if !ok { + nsRWs.rangeQueriesMap[key] = rqi + nsRWs.rangeQueriesKeys = append(nsRWs.rangeQueriesKeys, key) + } +} + // GetFromWriteSet return the value of a key from the write-set func (rws *RWSet) GetFromWriteSet(ns string, key string) ([]byte, bool) { nsRWs, ok := rws.rwMap[ns] @@ -73,24 +91,32 @@ func (rws *RWSet) GetFromWriteSet(ns string, key string) ([]byte, bool) { // GetTxReadWriteSet returns the read-write set in the form that can be serialized func (rws *RWSet) GetTxReadWriteSet() *TxReadWriteSet { txRWSet := &TxReadWriteSet{} - sortedNamespaces := getSortedKeys(rws.rwMap) + sortedNamespaces := util.GetSortedKeys(rws.rwMap) for _, ns := range sortedNamespaces { //Get namespace specific read-writes nsReadWriteMap := rws.rwMap[ns] + //add read set reads := []*KVRead{} - sortedReadKeys := getSortedKeys(nsReadWriteMap.readMap) + sortedReadKeys := util.GetSortedKeys(nsReadWriteMap.readMap) for _, key := range sortedReadKeys { reads = append(reads, nsReadWriteMap.readMap[key]) } //add write set writes := []*KVWrite{} - sortedWriteKeys := getSortedKeys(nsReadWriteMap.writeMap) + sortedWriteKeys := util.GetSortedKeys(nsReadWriteMap.writeMap) for _, key := range sortedWriteKeys { writes = append(writes, nsReadWriteMap.writeMap[key]) } - nsRWs := &NsReadWriteSet{NameSpace: ns, Reads: reads, Writes: writes} + + //add range query info + rangeQueriesInfo := []*RangeQueryInfo{} + rangeQueriesMap := nsReadWriteMap.rangeQueriesMap + for _, key := range nsReadWriteMap.rangeQueriesKeys { + rangeQueriesInfo = append(rangeQueriesInfo, rangeQueriesMap[key]) + } + nsRWs := &NsReadWriteSet{NameSpace: ns, Reads: reads, Writes: writes, RangeQueriesInfo: rangeQueriesInfo} txRWSet.NsRWs = append(txRWSet.NsRWs, nsRWs) } return txRWSet @@ -105,13 +131,3 @@ func (rws *RWSet) getOrCreateNsRW(ns string) *nsRWs { } return nsRWs } - -func getSortedKeys(m interface{}) []string { - mapVal := reflect.ValueOf(m) - keyVals := mapVal.MapKeys() - keys := []string{} - for _, keyVal := range keyVals { - keys = append(keys, keyVal.String()) - } - return keys -} diff --git a/core/ledger/kvledger/txmgmt/rwset/rwset_holder_test.go b/core/ledger/kvledger/txmgmt/rwset/rwset_holder_test.go new file mode 100644 index 00000000000..8ca168bf595 --- /dev/null +++ b/core/ledger/kvledger/txmgmt/rwset/rwset_holder_test.go @@ -0,0 +1,68 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rwset + +import ( + "testing" + + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" + "github.com/hyperledger/fabric/core/ledger/testutil" +) + +func TestRWSetHolder(t *testing.T) { + rwSet := NewRWSet() + + rwSet.AddToReadSet("ns1", "key2", version.NewHeight(1, 2)) + rwSet.AddToReadSet("ns1", "key1", version.NewHeight(1, 1)) + rwSet.AddToWriteSet("ns1", "key2", []byte("value2")) + + rqi1 := &RangeQueryInfo{"bKey", "", false, nil, nil} + rqi1.EndKey = "eKey" + rqi1.results = []*KVRead{NewKVRead("bKey1", version.NewHeight(2, 3)), NewKVRead("bKey2", version.NewHeight(2, 4))} + rqi1.ItrExhausted = true + rwSet.AddToRangeQuerySet("ns1", rqi1) + + rqi2 := &RangeQueryInfo{"bKey", "", false, nil, nil} + rqi2.EndKey = "eKey" + rqi2.results = []*KVRead{NewKVRead("bKey1", version.NewHeight(2, 3)), NewKVRead("bKey2", version.NewHeight(2, 4))} + rqi2.ItrExhausted = true + rwSet.AddToRangeQuerySet("ns1", rqi2) + + rqi3 := &RangeQueryInfo{"bKey", "", true, nil, nil} + rwSet.AddToRangeQuerySet("ns1", rqi3) + rqi3.EndKey = "eKey1" + rqi3.results = []*KVRead{NewKVRead("bKey1", version.NewHeight(2, 3)), NewKVRead("bKey2", version.NewHeight(2, 4))} + + rwSet.AddToReadSet("ns2", "key2", version.NewHeight(1, 2)) + rwSet.AddToWriteSet("ns2", "key3", []byte("value3")) + + txRWSet := rwSet.GetTxReadWriteSet() + + ns1RWSet := &NsReadWriteSet{"ns1", + []*KVRead{&KVRead{"key1", version.NewHeight(1, 1)}, &KVRead{"key2", version.NewHeight(1, 2)}}, + []*KVWrite{&KVWrite{"key2", false, []byte("value2")}}, + []*RangeQueryInfo{rqi1, rqi3}} + + ns2RWSet := &NsReadWriteSet{"ns2", + []*KVRead{&KVRead{"key2", version.NewHeight(1, 2)}}, + []*KVWrite{&KVWrite{"key3", false, []byte("value3")}}, + []*RangeQueryInfo{}} + + expectedTxRWSet := &TxReadWriteSet{[]*NsReadWriteSet{ns1RWSet, ns2RWSet}} + t.Logf("Actual=%s\n Expected=%s", txRWSet, expectedTxRWSet) + testutil.AssertEquals(t, txRWSet, expectedTxRWSet) +} diff --git a/core/ledger/kvledger/txmgmt/rwset/rwset_test.go b/core/ledger/kvledger/txmgmt/rwset/rwset_test.go index 673904b0c21..b542bffa014 100644 --- a/core/ledger/kvledger/txmgmt/rwset/rwset_test.go +++ b/core/ledger/kvledger/txmgmt/rwset/rwset_test.go @@ -17,8 +17,10 @@ limitations under the License. package rwset import ( + "fmt" "testing" + "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" "github.com/hyperledger/fabric/core/ledger/testutil" ) @@ -27,7 +29,8 @@ func TestNilTxRWSet(t *testing.T) { txRW := &TxReadWriteSet{} nsRW1 := &NsReadWriteSet{"ns1", []*KVRead{&KVRead{"key1", nil}}, - []*KVWrite{&KVWrite{"key1", false, []byte("value1")}}} + []*KVWrite{&KVWrite{"key1", false, []byte("value1")}}, + nil} txRW.NsRWs = append(txRW.NsRWs, nsRW1) b, err := txRW.Marshal() testutil.AssertNoError(t, err, "Error while marshalling changeset") @@ -43,24 +46,52 @@ func TestTxRWSetMarshalUnmarshal(t *testing.T) { txRW := &TxReadWriteSet{} nsRW1 := &NsReadWriteSet{"ns1", []*KVRead{&KVRead{"key1", version.NewHeight(1, 1)}}, - []*KVWrite{&KVWrite{"key2", false, []byte("value2")}}} + []*KVWrite{&KVWrite{"key2", false, []byte("value2")}}, + nil} nsRW2 := &NsReadWriteSet{"ns2", []*KVRead{&KVRead{"key3", version.NewHeight(1, 2)}}, - []*KVWrite{&KVWrite{"key4", true, nil}}} + []*KVWrite{&KVWrite{"key4", true, nil}}, + nil} nsRW3 := &NsReadWriteSet{"ns3", []*KVRead{&KVRead{"key5", version.NewHeight(1, 3)}}, - []*KVWrite{&KVWrite{"key6", false, []byte("value6")}, &KVWrite{"key7", false, []byte("value7")}}} + []*KVWrite{&KVWrite{"key6", false, []byte("value6")}, &KVWrite{"key7", false, []byte("value7")}}, + nil} - txRW.NsRWs = append(txRW.NsRWs, nsRW1, nsRW2, nsRW3) + nsRW4 := &NsReadWriteSet{"ns4", + []*KVRead{&KVRead{"key8", version.NewHeight(1, 3)}}, + []*KVWrite{&KVWrite{"key9", false, []byte("value9")}, &KVWrite{"key10", false, []byte("value10")}}, + []*RangeQueryInfo{&RangeQueryInfo{"startKey1", "endKey1", true, nil, testutil.ConstructRandomBytes(t, 10)}}} + buf := proto.NewBuffer(nil) + rqInfo := &RangeQueryInfo{"startKey2", "endKey2", false, []*KVRead{&KVRead{"key11", version.NewHeight(1, 3)}}, nil} + rqInfo.Marshal(buf) + + rqInfo1 := &RangeQueryInfo{} + rqInfo1.Unmarshal(buf) + fmt.Printf("rqInfo=%#v\n", rqInfo) + fmt.Printf("rqInfo1=%#v\n", rqInfo1) + + nsRW5 := &NsReadWriteSet{"ns5", + nil, + nil, + []*RangeQueryInfo{&RangeQueryInfo{"startKey2", "endKey2", false, []*KVRead{&KVRead{"key11", version.NewHeight(1, 3)}}, nil}}} + + nsRW6 := &NsReadWriteSet{"ns6", + nil, + nil, + []*RangeQueryInfo{ + &RangeQueryInfo{"startKey2", "endKey2", false, []*KVRead{&KVRead{"key11", version.NewHeight(1, 3)}}, nil}, + &RangeQueryInfo{"startKey3", "endKey3", true, []*KVRead{&KVRead{"key12", version.NewHeight(2, 4)}}, nil}}} + + txRW.NsRWs = append(txRW.NsRWs, nsRW1, nsRW2, nsRW3, nsRW4, nsRW5, nsRW6) + t.Logf("Testing txRWSet = %s", txRW) b, err := txRW.Marshal() testutil.AssertNoError(t, err, "Error while marshalling changeset") deserializedRWSet := &TxReadWriteSet{} err = deserializedRWSet.Unmarshal(b) testutil.AssertNoError(t, err, "Error while unmarshalling changeset") - t.Logf("Unmarshalled changeset = %#+v", deserializedRWSet.NsRWs[0].Writes[0].IsDelete) testutil.AssertEquals(t, deserializedRWSet, txRW) } diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go index 2c004a828e7..24fbadb727d 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go @@ -193,59 +193,63 @@ func (vdb *VersionedDB) ExecuteQuery(query string) (statedb.ResultsIterator, err // ApplyUpdates implements method in VersionedDB interface func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error { - for ck, vv := range batch.KVs { - compositeKey := constructCompositeKey(ck.Namespace, ck.Key) - - // trace the first 200 characters of versioned value only, in case it is huge - if logger.IsEnabledFor(logging.DEBUG) { - versionedValueDump := fmt.Sprintf("%#v", vv) - if len(versionedValueDump) > 200 { - versionedValueDump = versionedValueDump[0:200] + "..." + namespaces := batch.GetUpdatedNamespaces() + for _, ns := range namespaces { + updates := batch.GetUpdates(ns) + for k, vv := range updates { + compositeKey := constructCompositeKey(ns, k) + + // trace the first 200 characters of versioned value only, in case it is huge + if logger.IsEnabledFor(logging.DEBUG) { + versionedValueDump := fmt.Sprintf("%#v", vv) + if len(versionedValueDump) > 200 { + versionedValueDump = versionedValueDump[0:200] + "..." + } + logger.Debugf("Applying key=%#v, versionedValue=%s", compositeKey, versionedValueDump) } - logger.Debugf("Applying key=%#v, versionedValue=%s", ck, versionedValueDump) - } - // TODO add delete logic for couch using this approach from stateleveldb - convert nils to deletes - /* if vv.Value == nil { - levelBatch.Delete(compositeKey) - } else { - levelBatch.Put(compositeKey, encodeValue(vv.Value, vv.Version)) + // TODO add delete logic for couch using this approach from stateleveldb - convert nils to deletes + /* if vv.Value == nil { + levelBatch.Delete(compositeKey) + } else { + levelBatch.Put(compositeKey, encodeValue(vv.Value, vv.Version)) + } + */ + + if couchdb.IsJSON(string(vv.Value)) { + + // SaveDoc using couchdb client and use JSON format + rev, err := vdb.db.SaveDoc(string(compositeKey), "", vv.Value, nil) + if err != nil { + logger.Errorf("Error during Commit(): %s\n", err.Error()) + return err + } + if rev != "" { + logger.Debugf("Saved document revision number: %s\n", rev) } - */ - - if couchdb.IsJSON(string(vv.Value)) { - // SaveDoc using couchdb client and use JSON format - rev, err := vdb.db.SaveDoc(string(compositeKey), "", vv.Value, nil) - if err != nil { - logger.Errorf("Error during Commit(): %s\n", err.Error()) - return err - } - if rev != "" { - logger.Debugf("Saved document revision number: %s\n", rev) - } + } else { // if the data is not JSON, save as binary attachment in Couch - } else { // if the data is not JSON, save as binary attachment in Couch + //Create an attachment structure and load the bytes + attachment := &couchdb.Attachment{} + attachment.AttachmentBytes = vv.Value + attachment.ContentType = "application/octet-stream" + attachment.Name = "valueBytes" - //Create an attachment structure and load the bytes - attachment := &couchdb.Attachment{} - attachment.AttachmentBytes = vv.Value - attachment.ContentType = "application/octet-stream" - attachment.Name = "valueBytes" + attachments := []couchdb.Attachment{} + attachments = append(attachments, *attachment) - attachments := []couchdb.Attachment{} - attachments = append(attachments, *attachment) + // SaveDoc using couchdb client and use attachment to persist the binary data + rev, err := vdb.db.SaveDoc(string(compositeKey), "", nil, attachments) + if err != nil { + logger.Errorf("Error during Commit(): %s\n", err.Error()) + return err + } + if rev != "" { + logger.Debugf("Saved document revision number: %s\n", rev) + } - // SaveDoc using couchdb client and use attachment to persist the binary data - rev, err := vdb.db.SaveDoc(string(compositeKey), "", nil, attachments) - if err != nil { - logger.Errorf("Error during Commit(): %s\n", err.Error()) - return err } - if rev != "" { - logger.Debugf("Saved document revision number: %s\n", rev) - } - } } diff --git a/core/ledger/kvledger/txmgmt/statedb/statedb.go b/core/ledger/kvledger/txmgmt/statedb/statedb.go index 5471e3d1f14..fe09c82cc16 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statedb.go +++ b/core/ledger/kvledger/txmgmt/statedb/statedb.go @@ -16,7 +16,12 @@ limitations under the License. package statedb -import "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" +import ( + "sort" + + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" + "github.com/hyperledger/fabric/core/ledger/util" +) // VersionedDBProvider provides an instance of an versioned DB type VersionedDBProvider interface { @@ -87,14 +92,22 @@ type ResultsIterator interface { // QueryResult - a general interface for supporting different types of query results. Actual types differ for different queries type QueryResult interface{} +type nsUpdates struct { + m map[string]*VersionedValue +} + +func newNsUpdates() *nsUpdates { + return &nsUpdates{make(map[string]*VersionedValue)} +} + // UpdateBatch encloses the details of multiple `updates` type UpdateBatch struct { - KVs map[CompositeKey]*VersionedValue + updates map[string]*nsUpdates } // NewUpdateBatch constructs an instance of a Batch func NewUpdateBatch() *UpdateBatch { - return &UpdateBatch{make(map[CompositeKey]*VersionedValue)} + return &UpdateBatch{make(map[string]*nsUpdates)} } // Put adds a VersionedKV @@ -102,16 +115,107 @@ func (batch *UpdateBatch) Put(ns string, key string, value []byte, version *vers if value == nil { panic("Nil value not allowed") } - batch.KVs[CompositeKey{ns, key}] = &VersionedValue{value, version} + nsUpdates := batch.getOrCreateNsUpdates(ns) + nsUpdates.m[key] = &VersionedValue{value, version} } // Delete deletes a Key and associated value func (batch *UpdateBatch) Delete(ns string, key string, version *version.Height) { - batch.KVs[CompositeKey{ns, key}] = &VersionedValue{nil, version} + nsUpdates := batch.getOrCreateNsUpdates(ns) + nsUpdates.m[key] = &VersionedValue{nil, version} } // Exists checks whether the given key exists in the batch func (batch *UpdateBatch) Exists(ns string, key string) bool { - _, ok := batch.KVs[CompositeKey{ns, key}] + nsUpdates, ok := batch.updates[ns] + if !ok { + return false + } + _, ok = nsUpdates.m[key] return ok } + +// GetUpdatedNamespaces returns the names of the namespaces that are updated +func (batch *UpdateBatch) GetUpdatedNamespaces() []string { + namespaces := make([]string, len(batch.updates)) + i := 0 + for ns := range batch.updates { + namespaces[i] = ns + i++ + } + return namespaces +} + +// GetUpdates returns all the updates for a namespace +func (batch *UpdateBatch) GetUpdates(ns string) map[string]*VersionedValue { + nsUpdates, ok := batch.updates[ns] + if !ok { + return nil + } + return nsUpdates.m +} + +// GetRangeScanIterator returns an iterator that iterates over keys of a specific namespace in sorted order +// In other word this gives the same functionality over the contents in the `UpdateBatch` as +// `VersionedDB.GetStateRangeScanIterator()` method gives over the contents in the statedb +// This function can be used for querying the contents in the updateBatch before they are committed to the statedb. +// For instance, a validator implementation can used this to verify the validity of a range query of a transaction +// where the UpdateBatch represents the union of the modifications performed by the preceding valid transactions in the same block +// (Assuming Group commit approach where we commit all the updates caused by a block together). +func (batch *UpdateBatch) GetRangeScanIterator(ns string, startKey string, endKey string) ResultsIterator { + return newNsIterator(ns, startKey, endKey, batch) +} + +func (batch *UpdateBatch) getOrCreateNsUpdates(ns string) *nsUpdates { + nsUpdates := batch.updates[ns] + if nsUpdates == nil { + nsUpdates = newNsUpdates() + batch.updates[ns] = nsUpdates + } + return nsUpdates +} + +type nsIterator struct { + ns string + nsUpdates *nsUpdates + sortedKeys []string + nextIndex int + lastIndex int +} + +func newNsIterator(ns string, startKey string, endKey string, batch *UpdateBatch) *nsIterator { + nsUpdates, ok := batch.updates[ns] + if !ok { + return &nsIterator{} + } + sortedKeys := util.GetSortedKeys(nsUpdates.m) + var nextIndex int + var lastIndex int + if startKey == "" { + nextIndex = 0 + } else { + nextIndex = sort.SearchStrings(sortedKeys, startKey) + } + if endKey == "" { + lastIndex = len(sortedKeys) + } else { + lastIndex = sort.SearchStrings(sortedKeys, endKey) + } + return &nsIterator{ns, nsUpdates, sortedKeys, nextIndex, lastIndex} +} + +// Next gives next key and versioned value. It returns a nil when exhausted +func (itr *nsIterator) Next() (QueryResult, error) { + if itr.nextIndex >= itr.lastIndex { + return nil, nil + } + key := itr.sortedKeys[itr.nextIndex] + vv := itr.nsUpdates.m[key] + itr.nextIndex++ + return &VersionedKV{CompositeKey{itr.ns, key}, VersionedValue{vv.Value, vv.Version}}, nil +} + +// Close implements the method from QueryResult interface +func (itr *nsIterator) Close() { + // do nothing +} diff --git a/core/ledger/kvledger/txmgmt/statedb/statedb_test.go b/core/ledger/kvledger/txmgmt/statedb/statedb_test.go new file mode 100644 index 00000000000..511a09cfa8a --- /dev/null +++ b/core/ledger/kvledger/txmgmt/statedb/statedb_test.go @@ -0,0 +1,63 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statedb + +import ( + "testing" + + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" + "github.com/hyperledger/fabric/core/ledger/testutil" +) + +func TestUpdateBatchIterator(t *testing.T) { + batch := NewUpdateBatch() + batch.Put("ns1", "key1", []byte("value1"), version.NewHeight(1, 1)) + batch.Put("ns1", "key2", []byte("value2"), version.NewHeight(1, 2)) + batch.Put("ns1", "key3", []byte("value3"), version.NewHeight(1, 3)) + + batch.Put("ns2", "key6", []byte("value6"), version.NewHeight(2, 3)) + batch.Put("ns2", "key5", []byte("value5"), version.NewHeight(2, 2)) + batch.Put("ns2", "key4", []byte("value4"), version.NewHeight(2, 1)) + + checkItrResults(t, batch.GetRangeScanIterator("ns1", "key2", "key3"), []*VersionedKV{ + &VersionedKV{CompositeKey{"ns1", "key2"}, VersionedValue{[]byte("value2"), version.NewHeight(1, 2)}}, + }) + + checkItrResults(t, batch.GetRangeScanIterator("ns2", "key0", "key8"), []*VersionedKV{ + &VersionedKV{CompositeKey{"ns2", "key4"}, VersionedValue{[]byte("value4"), version.NewHeight(2, 1)}}, + &VersionedKV{CompositeKey{"ns2", "key5"}, VersionedValue{[]byte("value5"), version.NewHeight(2, 2)}}, + &VersionedKV{CompositeKey{"ns2", "key6"}, VersionedValue{[]byte("value6"), version.NewHeight(2, 3)}}, + }) + + checkItrResults(t, batch.GetRangeScanIterator("ns2", "", ""), []*VersionedKV{ + &VersionedKV{CompositeKey{"ns2", "key4"}, VersionedValue{[]byte("value4"), version.NewHeight(2, 1)}}, + &VersionedKV{CompositeKey{"ns2", "key5"}, VersionedValue{[]byte("value5"), version.NewHeight(2, 2)}}, + &VersionedKV{CompositeKey{"ns2", "key6"}, VersionedValue{[]byte("value6"), version.NewHeight(2, 3)}}, + }) + + checkItrResults(t, batch.GetRangeScanIterator("non-existing-ns", "", ""), nil) +} + +func checkItrResults(t *testing.T, itr ResultsIterator, expectedResults []*VersionedKV) { + for i := 0; i < len(expectedResults); i++ { + res, _ := itr.Next() + testutil.AssertEquals(t, res, expectedResults[i]) + } + lastRes, err := itr.Next() + testutil.AssertNoError(t, err, "") + testutil.AssertNil(t, lastRes) +} diff --git a/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go b/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go index dbc973dbdd9..2b4b08f9315 100644 --- a/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go +++ b/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go @@ -128,20 +128,24 @@ func (vdb *versionedDB) ExecuteQuery(query string) (statedb.ResultsIterator, err // ApplyUpdates implements method in VersionedDB interface func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error { dbBatch := leveldbhelper.NewUpdateBatch() - for ck, vv := range batch.KVs { - compositeKey := constructCompositeKey(ck.Namespace, ck.Key) - // trace the first 200 characters of versioned value only, in case it is huge - if logger.IsEnabledFor(logging.DEBUG) { - versionedValueDump := fmt.Sprintf("%#v", vv) - if len(versionedValueDump) > 200 { - versionedValueDump = versionedValueDump[0:200] + "..." + namespaces := batch.GetUpdatedNamespaces() + for _, ns := range namespaces { + updates := batch.GetUpdates(ns) + for k, vv := range updates { + compositeKey := constructCompositeKey(ns, k) + // trace the first 200 characters of versioned value only, in case it is huge + if logger.IsEnabledFor(logging.DEBUG) { + versionedValueDump := fmt.Sprintf("%#v", vv) + if len(versionedValueDump) > 200 { + versionedValueDump = versionedValueDump[0:200] + "..." + } + logger.Debugf("Applying key=%#v, versionedValue=%s", compositeKey, versionedValueDump) + } + if vv.Value == nil { + dbBatch.Delete(compositeKey) + } else { + dbBatch.Put(compositeKey, encodeValue(vv.Value, vv.Version)) } - logger.Debugf("Applying key=%#v, versionedValue=%s", ck, versionedValueDump) - } - if vv.Value == nil { - dbBatch.Delete(compositeKey) - } else { - dbBatch.Put(compositeKey, encodeValue(vv.Value, vv.Version)) } } dbBatch.Put(savePointKey, height.ToBytes()) @@ -197,8 +201,12 @@ func (scanner *kvScanner) Next() (statedb.QueryResult, error) { if !scanner.dbItr.Next() { return nil, nil } - _, key := splitCompositeKey(scanner.dbItr.Key()) - value, version := decodeValue(scanner.dbItr.Value()) + dbKey := scanner.dbItr.Key() + dbVal := scanner.dbItr.Value() + dbValCopy := make([]byte, len(dbVal)) + copy(dbValCopy, dbVal) + _, key := splitCompositeKey(dbKey) + value, version := decodeValue(dbValCopy) return &statedb.VersionedKV{ CompositeKey: statedb.CompositeKey{Namespace: scanner.namespace, Key: key}, VersionedValue: statedb.VersionedValue{Value: value, Version: version}}, nil diff --git a/core/ledger/kvledger/txmgmt/txmgr/commontests/txmgr_test.go b/core/ledger/kvledger/txmgmt/txmgr/commontests/txmgr_test.go index c3bb55377f4..9fe322862de 100644 --- a/core/ledger/kvledger/txmgmt/txmgr/commontests/txmgr_test.go +++ b/core/ledger/kvledger/txmgmt/txmgr/commontests/txmgr_test.go @@ -181,6 +181,75 @@ func testTxValidation(t *testing.T, env testEnv) { */ } +func TestTxPhantomValidation(t *testing.T) { + for _, testEnv := range testEnvs { + t.Logf("Running test for TestEnv = %s", testEnv.getName()) + testEnv.init(t) + testTxPhantomValidation(t, testEnv) + testEnv.cleanup() + } +} + +func testTxPhantomValidation(t *testing.T, env testEnv) { + txMgr := env.getTxMgr() + txMgrHelper := newTxMgrTestHelper(t, txMgr) + // simulate tx1 + s1, _ := txMgr.NewTxSimulator() + s1.SetState("ns", "key1", []byte("value1")) + s1.SetState("ns", "key2", []byte("value2")) + s1.SetState("ns", "key3", []byte("value3")) + s1.SetState("ns", "key4", []byte("value4")) + s1.SetState("ns", "key5", []byte("value5")) + s1.SetState("ns", "key6", []byte("value6")) + s1.Done() + // validate and commit RWset + txRWSet1, _ := s1.GetTxSimulationResults() + txMgrHelper.validateAndCommitRWSet(txRWSet1) + + // simulate tx2 + s2, _ := txMgr.NewTxSimulator() + itr2, _ := s2.GetStateRangeScanIterator("ns", "key2", "key5") + for { + if result, _ := itr2.Next(); result == nil { + break + } + } + s2.DeleteState("ns", "key3") + s2.Done() + txRWSet2, _ := s2.GetTxSimulationResults() + + // simulate tx3 + s3, _ := txMgr.NewTxSimulator() + itr3, _ := s3.GetStateRangeScanIterator("ns", "key2", "key5") + for { + if result, _ := itr3.Next(); result == nil { + break + } + } + s3.SetState("ns", "key3", []byte("value3_new")) + s3.Done() + txRWSet3, _ := s3.GetTxSimulationResults() + + // simulate tx4 + s4, _ := txMgr.NewTxSimulator() + itr4, _ := s4.GetStateRangeScanIterator("ns", "key4", "key6") + for { + if result, _ := itr4.Next(); result == nil { + break + } + } + s4.SetState("ns", "key3", []byte("value3_new")) + s4.Done() + txRWSet4, _ := s4.GetTxSimulationResults() + + // txRWSet2 should be valid + txMgrHelper.validateAndCommitRWSet(txRWSet2) + // txRWSet2 makes txRWSet3 invalid as it deletes a key in the range + txMgrHelper.checkRWsetInvalid(txRWSet3) + // txRWSet4 should be valid as it iterates over a different range + txMgrHelper.validateAndCommitRWSet(txRWSet4) +} + func TestIterator(t *testing.T) { for _, testEnv := range testEnvs { t.Logf("Running test for TestEnv = %s", testEnv.getName()) diff --git a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/helper.go b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/helper.go index b2e3eb2dad2..1082cfad2ac 100644 --- a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/helper.go +++ b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/helper.go @@ -26,6 +26,7 @@ import ( type queryHelper struct { txmgr *LockBasedTxMgr rwset *rwset.RWSet + itrs []*resultsItr doneInvoked bool } @@ -61,11 +62,12 @@ func (h *queryHelper) getStateMultipleKeys(namespace string, keys []string) ([][ func (h *queryHelper) getStateRangeScanIterator(namespace string, startKey string, endKey string) (ledger.ResultsIterator, error) { h.checkDone() - dbItr, err := h.txmgr.db.GetStateRangeScanIterator(namespace, startKey, endKey) + itr, err := newResultsItr(namespace, startKey, endKey, h.txmgr.db, h.rwset) if err != nil { return nil, err } - return &resultsItr{DBItr: dbItr, RWSet: h.rwset}, nil + h.itrs = append(h.itrs, itr) + return itr, nil } func (h *queryHelper) executeQuery(query string) (ledger.ResultsIterator, error) { @@ -77,8 +79,17 @@ func (h *queryHelper) executeQuery(query string) (ledger.ResultsIterator, error) } func (h *queryHelper) done() { + if h.doneInvoked { + return + } + defer h.txmgr.commitRWLock.RUnlock() h.doneInvoked = true - h.txmgr.commitRWLock.RUnlock() + for _, itr := range h.itrs { + itr.Close() + if h.rwset != nil { + h.rwset.AddToRangeQuerySet(itr.ns, itr.rangeQueryInfo) + } + } } func (h *queryHelper) checkDone() { @@ -87,31 +98,76 @@ func (h *queryHelper) checkDone() { } } +// resultsItr implements interface ledger.ResultsIterator +// this wraps the actual db iterator and intercept the calls +// to build rangeQueryInfo in the ReadWriteSet that is used +// for performing phantom read validation during commit type resultsItr struct { - DBItr statedb.ResultsIterator - RWSet *rwset.RWSet + ns string + endKey string + dbItr statedb.ResultsIterator + rwSet *rwset.RWSet + rangeQueryInfo *rwset.RangeQueryInfo +} + +func newResultsItr(ns string, startKey string, endKey string, db statedb.VersionedDB, rwSet *rwset.RWSet) (*resultsItr, error) { + dbItr, err := db.GetStateRangeScanIterator(ns, startKey, endKey) + if err != nil { + return nil, err + } + // In the range query info, just set the StartKey. + // Set the EndKey later below in the Next() method. + rqInfo := &rwset.RangeQueryInfo{StartKey: startKey} + return &resultsItr{ns, endKey, dbItr, rwSet, rqInfo}, nil } // Next implements method in interface ledger.ResultsIterator +// Before returning the next result, update the EndKey and ItrExhausted in rangeQueryInfo +// If we set the EndKey in the constructor (as we do for the StartKey) to what is +// supplied in the original query, we may be capturing the unnecessary longer range if the +// caller decides to stop iterating at some intermidiate point. Alternatively, we could have +// set the EndKey and ItrExhausted in the Close() function but it may not be desirable to change +// transactional behaviour based on whether the Close() was invoked or not func (itr *resultsItr) Next() (ledger.QueryResult, error) { - - queryResult, err := itr.DBItr.Next() + queryResult, err := itr.dbItr.Next() if err != nil { return nil, err } + itr.updateRangeQueryInfo(queryResult) if queryResult == nil { return nil, nil } versionedKV := queryResult.(*statedb.VersionedKV) - if itr.RWSet != nil { - itr.RWSet.AddToReadSet(versionedKV.Namespace, versionedKV.Key, versionedKV.Version) - } return &ledger.KV{Key: versionedKV.Key, Value: versionedKV.Value}, nil } +// updateRangeQueryInfo updates two attributes of the rangeQueryInfo +// 1) The EndKey - set to either a) latest key that is to be returned to the caller (if the iterator is not exhausted) +// because, we do not know if the caller is again going to invoke Next() or not. +// or b) the last key that was supplied in the original query (if the iterator is exhausted) +// 2) The ItrExhausted - set to true if the iterator is going to return nil as a result of the Next() call +func (itr *resultsItr) updateRangeQueryInfo(queryResult statedb.QueryResult) { + if itr.rwSet == nil { + return + } + + if queryResult == nil { + // caller scanned till the iterator got exhausted. + // So, set the endKey to the actual endKey supplied in the query + itr.rangeQueryInfo.ItrExhausted = true + itr.rangeQueryInfo.EndKey = itr.endKey + return + } + versionedKV := queryResult.(*statedb.VersionedKV) + itr.rangeQueryInfo.AddResult(rwset.NewKVRead(versionedKV.Key, versionedKV.Version)) + // Set the end key to the latest key retrieved by the caller. + // Because, the caller may actually not invoke the Next() function again + itr.rangeQueryInfo.EndKey = versionedKV.Key +} + // Close implements method in interface ledger.ResultsIterator func (itr *resultsItr) Close() { - itr.DBItr.Close() + itr.dbItr.Close() } type queryResultsItr struct { diff --git a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_tx_simulator.go b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_tx_simulator.go index 77b9578c56e..a09c94fe68f 100644 --- a/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_tx_simulator.go +++ b/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_tx_simulator.go @@ -67,6 +67,7 @@ func (s *lockBasedTxSimulator) SetStateMultipleKeys(namespace string, kvs map[st // GetTxSimulationResults implements method in interface `ledger.TxSimulator` func (s *lockBasedTxSimulator) GetTxSimulationResults() ([]byte, error) { logger.Debugf("Simulation completed, getting simulation results") + s.Done() return s.rwset.GetTxReadWriteSet().Marshal() } diff --git a/core/ledger/kvledger/txmgmt/validator/statebasedval/combined_iterator.go b/core/ledger/kvledger/txmgmt/validator/statebasedval/combined_iterator.go new file mode 100644 index 00000000000..5d668344578 --- /dev/null +++ b/core/ledger/kvledger/txmgmt/validator/statebasedval/combined_iterator.go @@ -0,0 +1,121 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statebasedval + +import "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb" +import "strings" + +// combinedIterator implements the interface statedb.ResultsIterator. +// Internally, it maintains two iterators +// - (1) dbItr - an iterator that iterates over keys present in the db +// - (2) updatesItr - an iterator that iterates over keys present in the update batch +// (i.e, the keys that are inserted/updated/deleted by preceding valid transactions +// in the block and to be committed to the db as a part of block commit operation) +// +// This can be used where the caller wants to see what would be the final results of +// iterating over a key range if the modifications of the preceding valid transactions +// were to be applied to the db +// +// This can be used to perfrom validation for phantom reads in a transactions rwset +type combinedIterator struct { + ns string + dbItr statedb.ResultsIterator + updatesItr statedb.ResultsIterator + dbItem statedb.QueryResult + updatesItem statedb.QueryResult +} + +func newCombinedIterator(ns string, dbItr statedb.ResultsIterator, updatesItr statedb.ResultsIterator) (*combinedIterator, error) { + var dbItem, updatesItem statedb.QueryResult + var err error + if dbItem, err = dbItr.Next(); err != nil { + return nil, err + } + if updatesItem, err = updatesItr.Next(); err != nil { + return nil, err + } + logger.Debugf("Combined iterator initialized. dbItem=%#v, updatesItem=%#v", dbItem, updatesItem) + return &combinedIterator{ns, dbItr, updatesItr, dbItem, updatesItem}, nil +} + +// Next returns the KV from either dbItr or updatesItr that gives the next smaller key +// If both gives the same keys, then it returns the KV from updatesItr. +func (itr *combinedIterator) Next() (statedb.QueryResult, error) { + if itr.dbItem == nil && itr.updatesItem == nil { + logger.Debugf("dbItem and updatesItem both are nil. So, returning nil") + return nil, nil + } + var moveDBItr bool + var moveUpdatesItr bool + var selectedItem statedb.QueryResult + compResult := compareKeys(itr.dbItem, itr.updatesItem) + logger.Debugf("compResult=%d", compResult) + switch compResult { + case -1: + // dbItem is smaller + selectedItem = itr.dbItem + moveDBItr = true + case 0: + //both items are same so, choose the updatesItem (latest) + selectedItem = itr.updatesItem + moveUpdatesItr = true + moveDBItr = true + case 1: + // updatesItem is smaller + selectedItem = itr.updatesItem + moveUpdatesItr = true + } + var err error + if moveDBItr { + if itr.dbItem, err = itr.dbItr.Next(); err != nil { + return nil, err + } + } + + if moveUpdatesItr { + if itr.updatesItem, err = itr.updatesItr.Next(); err != nil { + return nil, err + } + } + if isDelete(selectedItem) { + return itr.Next() + } + logger.Debugf("Returning item=%#v. Next dbItem=%#v, Next updatesItem=%#v", selectedItem, itr.dbItem, itr.updatesItem) + return selectedItem, nil +} + +func (itr *combinedIterator) Close() { + itr.dbItr.Close() +} + +func compareKeys(item1 statedb.QueryResult, item2 statedb.QueryResult) int { + if item1 == nil { + if item2 == nil { + return 0 + } + return 1 + } + if item2 == nil { + return -1 + } + // at this stage both items are not nil + return strings.Compare(item1.(*statedb.VersionedKV).Key, item2.(*statedb.VersionedKV).Key) +} + +func isDelete(item statedb.QueryResult) bool { + return item.(*statedb.VersionedKV).Value == nil +} diff --git a/core/ledger/kvledger/txmgmt/validator/statebasedval/combined_iterator_test.go b/core/ledger/kvledger/txmgmt/validator/statebasedval/combined_iterator_test.go new file mode 100644 index 00000000000..acde72d8582 --- /dev/null +++ b/core/ledger/kvledger/txmgmt/validator/statebasedval/combined_iterator_test.go @@ -0,0 +1,99 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package statebasedval + +import ( + "testing" + + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb" + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb" + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" + "github.com/hyperledger/fabric/core/ledger/testutil" +) + +func TestCombinedIterator(t *testing.T) { + testDBEnv := stateleveldb.NewTestVDBEnv(t) + defer testDBEnv.Cleanup() + + db, err := testDBEnv.DBProvider.GetDBHandle("TestDB") + testutil.AssertNoError(t, err, "") + + //populate db with initial data + batch := statedb.NewUpdateBatch() + batch.Put("ns", "key1", []byte("value1"), version.NewHeight(1, 1)) + batch.Put("ns", "key4", []byte("value4"), version.NewHeight(1, 1)) + batch.Put("ns", "key6", []byte("value6"), version.NewHeight(1, 1)) + db.ApplyUpdates(batch, version.NewHeight(1, 5)) + + // prepare batch1 + batch1 := statedb.NewUpdateBatch() + batch1.Put("ns", "key3", []byte("value3"), version.NewHeight(1, 1)) + batch1.Delete("ns", "key5", version.NewHeight(1, 1)) + batch1.Put("ns", "key6", []byte("value6_new"), version.NewHeight(1, 1)) + batch1.Put("ns", "key7", []byte("value7"), version.NewHeight(1, 1)) + + // prepare batch2 (empty) + batch2 := statedb.NewUpdateBatch() + + // Test db + batch1 updates + dbItr1, _ := db.GetStateRangeScanIterator("ns", "key2", "key8") + itr1, _ := newCombinedIterator("ns", dbItr1, batch1.GetRangeScanIterator("ns", "key2", "key8")) + defer itr1.Close() + + checkItrResults(t, itr1, []*statedb.VersionedKV{ + constructVersionedKV("ns", "key3", []byte("value3"), version.NewHeight(1, 1)), + constructVersionedKV("ns", "key4", []byte("value4"), version.NewHeight(1, 1)), + constructVersionedKV("ns", "key6", []byte("value6_new"), version.NewHeight(1, 1)), + constructVersionedKV("ns", "key7", []byte("value7"), version.NewHeight(1, 1)), + }) + + // Test db + batch2 updates + dbItr2, _ := db.GetStateRangeScanIterator("ns", "key2", "key8") + itr2, _ := newCombinedIterator("ns", dbItr2, batch2.GetRangeScanIterator("ns", "key2", "key8")) + defer itr2.Close() + checkItrResults(t, itr2, []*statedb.VersionedKV{ + constructVersionedKV("ns", "key4", []byte("value4"), version.NewHeight(1, 1)), + constructVersionedKV("ns", "key6", []byte("value6"), version.NewHeight(1, 1)), + }) + + // Test db + batch1 updates with full range query + dbItr3, _ := db.GetStateRangeScanIterator("ns", "", "") + itr3, _ := newCombinedIterator("ns", dbItr3, batch1.GetRangeScanIterator("ns", "", "")) + checkItrResults(t, itr3, []*statedb.VersionedKV{ + constructVersionedKV("ns", "key1", []byte("value1"), version.NewHeight(1, 1)), + constructVersionedKV("ns", "key3", []byte("value3"), version.NewHeight(1, 1)), + constructVersionedKV("ns", "key4", []byte("value4"), version.NewHeight(1, 1)), + constructVersionedKV("ns", "key6", []byte("value6_new"), version.NewHeight(1, 1)), + constructVersionedKV("ns", "key7", []byte("value7"), version.NewHeight(1, 1)), + }) +} + +func checkItrResults(t *testing.T, itr statedb.ResultsIterator, expectedResults []*statedb.VersionedKV) { + for i := 0; i < len(expectedResults); i++ { + res, _ := itr.Next() + testutil.AssertEquals(t, res, expectedResults[i]) + } + lastRes, err := itr.Next() + testutil.AssertNoError(t, err, "") + testutil.AssertNil(t, lastRes) +} + +func constructVersionedKV(ns string, key string, value []byte, version *version.Height) *statedb.VersionedKV { + return &statedb.VersionedKV{ + CompositeKey: statedb.CompositeKey{Namespace: ns, Key: key}, + VersionedValue: statedb.VersionedValue{Value: value, Version: version}} +} diff --git a/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator.go b/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator.go index bbc3d83ae40..ec34cf6fd15 100644 --- a/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator.go +++ b/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator.go @@ -153,24 +153,122 @@ func addWriteSetToBatch(txRWSet *rwset.TxReadWriteSet, txHeight *version.Height, func (v *Validator) validateTx(txRWSet *rwset.TxReadWriteSet, updates *statedb.UpdateBatch) (bool, error) { for _, nsRWSet := range txRWSet.NsRWs { ns := nsRWSet.NameSpace - for _, kvRead := range nsRWSet.Reads { - if updates.Exists(ns, kvRead.Key) { - return false, nil - } - versionedValue, err := v.db.GetState(ns, kvRead.Key) - if err != nil { - return false, nil - } - var committedVersion *version.Height - if versionedValue != nil { - committedVersion = versionedValue.Version - } - if !version.AreSame(committedVersion, kvRead.Version) { - logger.Debugf("Version mismatch for key [%s:%s]. Committed version = [%s], Version in readSet [%s]", - ns, kvRead.Key, committedVersion, kvRead.Version) - return false, nil - } + //TODO introduce different Error codes for different causes of validation failure + if valid, err := v.validateReadSet(ns, nsRWSet.Reads, updates); !valid || err != nil { + return valid, err + } + if valid, err := v.validateRangeQueries(ns, nsRWSet.RangeQueriesInfo, updates); !valid || err != nil { + return valid, err + } + } + return true, nil +} + +func (v *Validator) validateReadSet(ns string, kvReads []*rwset.KVRead, updates *statedb.UpdateBatch) (bool, error) { + for _, kvRead := range kvReads { + if valid, err := v.validateKVRead(ns, kvRead, updates); !valid || err != nil { + return valid, err + } + } + return true, nil +} + +// validateKVRead performs mvcc check for a key read during transaction simulation. +// i.e., it checks whether a key/version combination is already updated in the statedb (by an already committed block) +// or in the updates (by a preceding valid transaction in the current block) +func (v *Validator) validateKVRead(ns string, kvRead *rwset.KVRead, updates *statedb.UpdateBatch) (bool, error) { + if updates.Exists(ns, kvRead.Key) { + return false, nil + } + versionedValue, err := v.db.GetState(ns, kvRead.Key) + if err != nil { + return false, nil + } + var committedVersion *version.Height + if versionedValue != nil { + committedVersion = versionedValue.Version + } + if !version.AreSame(committedVersion, kvRead.Version) { + logger.Debugf("Version mismatch for key [%s:%s]. Committed version = [%s], Version in readSet [%s]", + ns, kvRead.Key, committedVersion, kvRead.Version) + return false, nil + } + return true, nil +} + +func (v *Validator) validateRangeQueries(ns string, rangeQueriesInfo []*rwset.RangeQueryInfo, updates *statedb.UpdateBatch) (bool, error) { + for _, rqi := range rangeQueriesInfo { + if valid, err := v.validateRangeQuery(ns, rqi, updates); !valid || err != nil { + return valid, err + } + } + return true, nil +} + +// validateRangeQuery performs a phatom read check i.e., it +// checks whether the results of the range query are still the same when executed on the +// statedb (latest state as of last committed block) + updates (prepared by the writes of preceding valid transactions +// in the current block and yet to be committed as part of group commit at the end of the validation of the block) +func (v *Validator) validateRangeQuery(ns string, rangeQueryInfo *rwset.RangeQueryInfo, updates *statedb.UpdateBatch) (bool, error) { + logger.Debugf("validateRangeQuery: ns=%s, rangeQueryInfo=%s", ns, rangeQueryInfo) + var dbItr statedb.ResultsIterator + var updatesItr statedb.ResultsIterator + var combinedItr statedb.ResultsIterator + var err error + if dbItr, err = v.db.GetStateRangeScanIterator(ns, rangeQueryInfo.StartKey, rangeQueryInfo.EndKey); err != nil { + return false, err + } + updatesItr = updates.GetRangeScanIterator(ns, rangeQueryInfo.StartKey, rangeQueryInfo.EndKey) + if combinedItr, err = newCombinedIterator(ns, dbItr, updatesItr); err != nil { + return false, err + } + defer combinedItr.Close() + rqResults := rangeQueryInfo.GetResults() + lastIndexToVerify := len(rqResults) - 1 + + if !rangeQueryInfo.ItrExhausted { + // During simulation the caller had not exhausted the iterator so + // rangeQueryInfo.EndKey is not actual endKey given by the caller in the range query. + // Leveldb results exclude the endkey so just iterate one short of the results present + // in the rangeQueryInfo. Check for the last result explicitly + lastIndexToVerify-- + logger.Debugf("Checking last result") + if valid, err := v.validateKVRead(ns, rqResults[len(rqResults)-1], updates); !valid || err != nil { + return valid, err + } + } + + var queryResponse statedb.QueryResult + // Iterate over sorted results in the rangeQueryInfo and compare + // with the results retruned by the combined iterator and return false at first mismatch + for i := 0; i <= lastIndexToVerify; i++ { + kvRead := rqResults[i] + if queryResponse, err = combinedItr.Next(); err != nil { + return false, err } + logger.Debugf("comparing kvRead=[%#v] to queryResponse=[%#v]", kvRead, queryResponse) + if queryResponse == nil { + logger.Debugf("Query response nil. Key [%s] got deleted", kvRead.Key) + return false, nil + } + versionedKV := queryResponse.(*statedb.VersionedKV) + if versionedKV.Key != kvRead.Key { + logger.Debugf("key name mismatch: Key in rwset = [%s], key in query results = [%s]", kvRead.Key, versionedKV.Key) + return false, nil + } + if !version.AreSame(versionedKV.Version, kvRead.Version) { + logger.Debugf(`Version mismatch for key [%s]: Version in rwset = [%#v], latest version = [%#v]`, + versionedKV.Key, versionedKV.Version, kvRead.Version) + return false, nil + } + } + if queryResponse, err = combinedItr.Next(); err != nil { + return false, err + } + if queryResponse != nil { + // iterator is not exhausted - which means that there are extra results in the given range + logger.Debugf("Extra result = [%#v]", queryResponse) + return false, nil } return true, nil } diff --git a/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator_test.go b/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator_test.go index 0ca538919c8..2a7449905b3 100644 --- a/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator_test.go +++ b/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator_test.go @@ -78,6 +78,72 @@ func TestValidator(t *testing.T) { checkValidation(t, validator, []*rwset.RWSet{rwset4, rwset5}, []int{1}) } +func TestPhantomValidation(t *testing.T) { + testDBEnv := stateleveldb.NewTestVDBEnv(t) + defer testDBEnv.Cleanup() + + db, err := testDBEnv.DBProvider.GetDBHandle("TestDB") + testutil.AssertNoError(t, err, "") + + //populate db with initial data + batch := statedb.NewUpdateBatch() + batch.Put("ns1", "key1", []byte("value1"), version.NewHeight(1, 1)) + batch.Put("ns1", "key2", []byte("value2"), version.NewHeight(1, 2)) + batch.Put("ns1", "key3", []byte("value3"), version.NewHeight(1, 3)) + batch.Put("ns1", "key4", []byte("value4"), version.NewHeight(1, 4)) + batch.Put("ns1", "key5", []byte("value5"), version.NewHeight(1, 5)) + db.ApplyUpdates(batch, version.NewHeight(1, 5)) + + validator := NewValidator(db) + + //rwset1 should be valid + rwset1 := rwset.NewRWSet() + rqi1 := &rwset.RangeQueryInfo{StartKey: "key2", EndKey: "key4", ItrExhausted: true} + rqi1.AddResult(rwset.NewKVRead("key2", version.NewHeight(1, 2))) + rqi1.AddResult(rwset.NewKVRead("key3", version.NewHeight(1, 3))) + rwset1.AddToRangeQuerySet("ns1", rqi1) + checkValidation(t, validator, []*rwset.RWSet{rwset1}, []int{}) + + //rwset2 should not be valid - Version of key4 changed + rwset2 := rwset.NewRWSet() + rqi2 := &rwset.RangeQueryInfo{StartKey: "key2", EndKey: "key4", ItrExhausted: false} + rqi2.AddResult(rwset.NewKVRead("key2", version.NewHeight(1, 2))) + rqi2.AddResult(rwset.NewKVRead("key3", version.NewHeight(1, 3))) + rqi2.AddResult(rwset.NewKVRead("key4", version.NewHeight(1, 3))) + rwset2.AddToRangeQuerySet("ns1", rqi2) + checkValidation(t, validator, []*rwset.RWSet{rwset2}, []int{1}) + + //rwset3 should not be valid - simulate key3 got commited to db + rwset3 := rwset.NewRWSet() + rqi3 := &rwset.RangeQueryInfo{StartKey: "key2", EndKey: "key4", ItrExhausted: false} + rqi3.AddResult(rwset.NewKVRead("key2", version.NewHeight(1, 2))) + rqi3.AddResult(rwset.NewKVRead("key4", version.NewHeight(1, 4))) + rwset3.AddToRangeQuerySet("ns1", rqi3) + checkValidation(t, validator, []*rwset.RWSet{rwset3}, []int{1}) + + // //Remove a key in rwset4 and rwset5 should become invalid + rwset4 := rwset.NewRWSet() + rwset4.AddToWriteSet("ns1", "key3", nil) + rwset5 := rwset.NewRWSet() + rqi5 := &rwset.RangeQueryInfo{StartKey: "key2", EndKey: "key4", ItrExhausted: false} + rqi5.AddResult(rwset.NewKVRead("key2", version.NewHeight(1, 2))) + rqi5.AddResult(rwset.NewKVRead("key3", version.NewHeight(1, 3))) + rqi5.AddResult(rwset.NewKVRead("key4", version.NewHeight(1, 4))) + rwset5.AddToRangeQuerySet("ns1", rqi5) + checkValidation(t, validator, []*rwset.RWSet{rwset4, rwset5}, []int{1}) + + //Add a key in rwset6 and rwset7 should become invalid + rwset6 := rwset.NewRWSet() + rwset6.AddToWriteSet("ns1", "key2_1", []byte("value2_1")) + rwset7 := rwset.NewRWSet() + rqi7 := &rwset.RangeQueryInfo{StartKey: "key2", EndKey: "key4", ItrExhausted: false} + rqi7.AddResult(rwset.NewKVRead("key2", version.NewHeight(1, 2))) + rqi7.AddResult(rwset.NewKVRead("key3", version.NewHeight(1, 3))) + rqi7.AddResult(rwset.NewKVRead("key4", version.NewHeight(1, 4))) + rwset7.AddToRangeQuerySet("ns1", rqi7) + checkValidation(t, validator, []*rwset.RWSet{rwset6, rwset7}, []int{1}) +} + func checkValidation(t *testing.T, validator *Validator, rwsets []*rwset.RWSet, invalidTxIndexes []int) { simulationResults := [][]byte{} for _, rwset := range rwsets { diff --git a/core/ledger/util/util.go b/core/ledger/util/util.go index fdc768a7e2f..37ecfbbda1a 100644 --- a/core/ledger/util/util.go +++ b/core/ledger/util/util.go @@ -19,6 +19,8 @@ package util import ( "encoding/binary" "fmt" + "reflect" + "sort" "github.com/golang/protobuf/proto" ) @@ -60,3 +62,15 @@ func DecodeOrderPreservingVarUint64(bytes []byte) (uint64, int) { numBytesConsumed := size + 1 return binary.BigEndian.Uint64(decodedBytes), numBytesConsumed } + +// GetSortedKeys returns the keys of the map in a sorted order. This function assumes that the keys are string +func GetSortedKeys(m interface{}) []string { + mapVal := reflect.ValueOf(m) + keyVals := mapVal.MapKeys() + keys := []string{} + for _, keyVal := range keyVals { + keys = append(keys, keyVal.String()) + } + sort.Strings(keys) + return keys +}