Skip to content

Commit

Permalink
Detect phantom items during validation
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-1668

Phantom reads problem:
-----------------------------
If a transaction executes a key range query (say key_1 - key_n) leveldb
serves all the keys in the given range in sorted order of keys.
Between simulation and validation of a transaction, some other transaction
may insert one or more keys in this range. Now, this change cannot
be detected by merely checking the versions of the keys present in
the read-set of the transaction.
(though, this check can detect updates and deletes).

A serializable isolation level does not permit phantom reads.

Situations where the phantom reads may not be acceptable:
-----------------------------
 - A chaincode may take a decision based on aggregates of the results of
the range query such as min/max/avg/sum etc. This would be affected
if a new entry gets added to the range query results.

 - A chaincode may want to change all the marbles
owned by a specific user to blue in a tran `t1`. The chaincode may do so
by performing a range query to get all the marbles for the user and
change their color to blue. Now, if a new white marble get added to
the user's assets by an another transaction `t2`,
`t1` should be marked as invalid.

Solution in this change-set:
-----------------------------
For solving this, we can capture the details of the range query
(the query and it's results) during the simulation time and
re-execute the query during validation time and compare the results.
If the results (keys and versions) are still the same,
we can safely commit the transaction.

This changes set introduces
 - Structures for capturing the range query details in rwset
 - Detection of phantom items (insertion/deletion)
   that affect the results of the range query during validation
 - Mark the transactions that are affected by the phantom items
   as invalid to ensure serializability in the presence of range queries

Change-Id: I909841a0234c37795ad7a4ffcca8a9ebd9c9f994
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi committed Jan 24, 2017
1 parent 230f3cc commit 1637217
Show file tree
Hide file tree
Showing 16 changed files with 1,083 additions and 127 deletions.
156 changes: 147 additions & 9 deletions core/ledger/kvledger/txmgmt/rwset/rwset.go
Expand Up @@ -54,11 +54,38 @@ func (w *KVWrite) SetValue(value []byte) {
w.IsDelete = value == nil
}

// RangeQueryInfo captures a range query executed by a transaction
// and the tuples <key,version> that are read by the transaction
// This it to be used to perform a phantom-read validation during commit
type RangeQueryInfo struct {
StartKey string
EndKey string
ItrExhausted bool
results []*KVRead
resultHash []byte
}

// AddResult appends the result
func (rqi *RangeQueryInfo) AddResult(kvRead *KVRead) {
rqi.results = append(rqi.results, kvRead)
}

// GetResults returns the results of the range query
func (rqi *RangeQueryInfo) GetResults() []*KVRead {
return rqi.results
}

// GetResultHash returns the resultHash
func (rqi *RangeQueryInfo) GetResultHash() []byte {
return rqi.resultHash
}

// NsReadWriteSet - a collection of all the reads and writes that belong to a common namespace
type NsReadWriteSet struct {
NameSpace string
Reads []*KVRead
Writes []*KVWrite
NameSpace string
Reads []*KVRead
Writes []*KVWrite
RangeQueriesInfo []*RangeQueryInfo
}

// TxReadWriteSet - a collection of all the reads and writes collected as a result of a transaction simulation
Expand Down Expand Up @@ -97,6 +124,79 @@ func (r *KVRead) Unmarshal(buf *proto.Buffer) error {
return nil
}

// Marshal serializes a `RangeQueryInfo`
func (rqi *RangeQueryInfo) Marshal(buf *proto.Buffer) error {
if err := buf.EncodeStringBytes(rqi.StartKey); err != nil {
return err
}
if err := buf.EncodeStringBytes(rqi.EndKey); err != nil {
return err
}

itrExhausedMarker := 0 // iterator did not get exhausted
if rqi.ItrExhausted {
itrExhausedMarker = 1
}
if err := buf.EncodeVarint(uint64(itrExhausedMarker)); err != nil {
return err
}

if err := buf.EncodeVarint(uint64(len(rqi.results))); err != nil {
return err
}
for i := 0; i < len(rqi.results); i++ {
if err := rqi.results[i].Marshal(buf); err != nil {
return err
}
}
if err := buf.EncodeRawBytes(rqi.resultHash); err != nil {
return err
}
return nil
}

// Unmarshal deserializes a `RangeQueryInfo`
func (rqi *RangeQueryInfo) Unmarshal(buf *proto.Buffer) error {
var err error
var numResults uint64
var itrExhaustedMarker uint64

if rqi.StartKey, err = buf.DecodeStringBytes(); err != nil {
return err
}
if rqi.EndKey, err = buf.DecodeStringBytes(); err != nil {
return err
}
if itrExhaustedMarker, err = buf.DecodeVarint(); err != nil {
return err
}
if itrExhaustedMarker == 1 {
rqi.ItrExhausted = true
} else {
rqi.ItrExhausted = false
}
if numResults, err = buf.DecodeVarint(); err != nil {
return err
}
if numResults > 0 {
rqi.results = make([]*KVRead, int(numResults))
}
for i := 0; i < int(numResults); i++ {
kvRead := &KVRead{}
if err := kvRead.Unmarshal(buf); err != nil {
return err
}
rqi.results[i] = kvRead
}
if rqi.resultHash, err = buf.DecodeRawBytes(false); err != nil {
return err
}
if len(rqi.resultHash) == 0 {
rqi.resultHash = nil
}
return nil
}

// Marshal serializes a `KVWrite`
func (w *KVWrite) Marshal(buf *proto.Buffer) error {
var err error
Expand Down Expand Up @@ -148,13 +248,25 @@ func (nsRW *NsReadWriteSet) Marshal(buf *proto.Buffer) error {
return err
}
for i := 0; i < len(nsRW.Reads); i++ {
nsRW.Reads[i].Marshal(buf)
if err = nsRW.Reads[i].Marshal(buf); err != nil {
return err
}
}
if err = buf.EncodeVarint(uint64(len(nsRW.Writes))); err != nil {
return err
}
for i := 0; i < len(nsRW.Writes); i++ {
nsRW.Writes[i].Marshal(buf)
if err = nsRW.Writes[i].Marshal(buf); err != nil {
return err
}
}
if err = buf.EncodeVarint(uint64(len(nsRW.RangeQueriesInfo))); err != nil {
return err
}
for i := 0; i < len(nsRW.RangeQueriesInfo); i++ {
if err = nsRW.RangeQueriesInfo[i].Marshal(buf); err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -188,6 +300,18 @@ func (nsRW *NsReadWriteSet) Unmarshal(buf *proto.Buffer) error {
}
nsRW.Writes = append(nsRW.Writes, w)
}

var numRangeQueriesInfo uint64
if numRangeQueriesInfo, err = buf.DecodeVarint(); err != nil {
return err
}
for i := 0; i < int(numRangeQueriesInfo); i++ {
rqInfo := &RangeQueryInfo{}
if err = rqInfo.Unmarshal(buf); err != nil {
return err
}
nsRW.RangeQueriesInfo = append(nsRW.RangeQueriesInfo, rqInfo)
}
return nil
}

Expand Down Expand Up @@ -234,18 +358,32 @@ func (w *KVWrite) String() string {
return fmt.Sprintf("%s=[%#v]", w.Key, w.Value)
}

// String prints a range query info
func (rqi *RangeQueryInfo) String() string {
return fmt.Sprintf("StartKey=%s, EndKey=%s, ItrExhausted=%t, Results=%#v, Hash=%#v",
rqi.StartKey, rqi.EndKey, rqi.ItrExhausted, rqi.results, rqi.resultHash)
}

// String prints a `NsReadWriteSet`
func (nsRW *NsReadWriteSet) String() string {
var buffer bytes.Buffer
buffer.WriteString("ReadSet~")
buffer.WriteString("ReadSet=\n")
for _, r := range nsRW.Reads {
buffer.WriteString("\t")
buffer.WriteString(r.String())
buffer.WriteString(",")
buffer.WriteString("\n")
}
buffer.WriteString("WriteSet~")
buffer.WriteString("WriteSet=\n")
for _, w := range nsRW.Writes {
buffer.WriteString("\t")
buffer.WriteString(w.String())
buffer.WriteString(",")
buffer.WriteString("\n")
}
buffer.WriteString("RangeQueriesInfo=\n")
for _, rqi := range nsRW.RangeQueriesInfo {
buffer.WriteString("\t")
buffer.WriteString(rqi.String())
buffer.WriteString("\n")
}
return buffer.String()
}
Expand Down
54 changes: 35 additions & 19 deletions core/ledger/kvledger/txmgmt/rwset/rwset_holder.go
Expand Up @@ -17,21 +17,28 @@ limitations under the License.
package rwset

import (
"reflect"

"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/util"
logging "github.com/op/go-logging"
)

var logger = logging.MustGetLogger("rwset")

type nsRWs struct {
readMap map[string]*KVRead
writeMap map[string]*KVWrite
readMap map[string]*KVRead //for mvcc validation
writeMap map[string]*KVWrite
rangeQueriesMap map[rangeQueryKey]*RangeQueryInfo //for phantom read validation
rangeQueriesKeys []rangeQueryKey
}

func newNsRWs() *nsRWs {
return &nsRWs{make(map[string]*KVRead), make(map[string]*KVWrite)}
return &nsRWs{make(map[string]*KVRead), make(map[string]*KVWrite), make(map[rangeQueryKey]*RangeQueryInfo), nil}
}

type rangeQueryKey struct {
startKey string
endKey string
itrExhausted bool
}

// RWSet maintains the read-write set
Expand All @@ -56,6 +63,17 @@ func (rws *RWSet) AddToWriteSet(ns string, key string, value []byte) {
nsRWs.writeMap[key] = NewKVWrite(key, value)
}

// AddToRangeQuerySet adds a range query info for performing phantom read validation
func (rws *RWSet) AddToRangeQuerySet(ns string, rqi *RangeQueryInfo) {
nsRWs := rws.getOrCreateNsRW(ns)
key := rangeQueryKey{rqi.StartKey, rqi.EndKey, rqi.ItrExhausted}
_, ok := nsRWs.rangeQueriesMap[key]
if !ok {
nsRWs.rangeQueriesMap[key] = rqi
nsRWs.rangeQueriesKeys = append(nsRWs.rangeQueriesKeys, key)
}
}

// GetFromWriteSet return the value of a key from the write-set
func (rws *RWSet) GetFromWriteSet(ns string, key string) ([]byte, bool) {
nsRWs, ok := rws.rwMap[ns]
Expand All @@ -73,24 +91,32 @@ func (rws *RWSet) GetFromWriteSet(ns string, key string) ([]byte, bool) {
// GetTxReadWriteSet returns the read-write set in the form that can be serialized
func (rws *RWSet) GetTxReadWriteSet() *TxReadWriteSet {
txRWSet := &TxReadWriteSet{}
sortedNamespaces := getSortedKeys(rws.rwMap)
sortedNamespaces := util.GetSortedKeys(rws.rwMap)
for _, ns := range sortedNamespaces {
//Get namespace specific read-writes
nsReadWriteMap := rws.rwMap[ns]

//add read set
reads := []*KVRead{}
sortedReadKeys := getSortedKeys(nsReadWriteMap.readMap)
sortedReadKeys := util.GetSortedKeys(nsReadWriteMap.readMap)
for _, key := range sortedReadKeys {
reads = append(reads, nsReadWriteMap.readMap[key])
}

//add write set
writes := []*KVWrite{}
sortedWriteKeys := getSortedKeys(nsReadWriteMap.writeMap)
sortedWriteKeys := util.GetSortedKeys(nsReadWriteMap.writeMap)
for _, key := range sortedWriteKeys {
writes = append(writes, nsReadWriteMap.writeMap[key])
}
nsRWs := &NsReadWriteSet{NameSpace: ns, Reads: reads, Writes: writes}

//add range query info
rangeQueriesInfo := []*RangeQueryInfo{}
rangeQueriesMap := nsReadWriteMap.rangeQueriesMap
for _, key := range nsReadWriteMap.rangeQueriesKeys {
rangeQueriesInfo = append(rangeQueriesInfo, rangeQueriesMap[key])
}
nsRWs := &NsReadWriteSet{NameSpace: ns, Reads: reads, Writes: writes, RangeQueriesInfo: rangeQueriesInfo}
txRWSet.NsRWs = append(txRWSet.NsRWs, nsRWs)
}
return txRWSet
Expand All @@ -105,13 +131,3 @@ func (rws *RWSet) getOrCreateNsRW(ns string) *nsRWs {
}
return nsRWs
}

func getSortedKeys(m interface{}) []string {
mapVal := reflect.ValueOf(m)
keyVals := mapVal.MapKeys()
keys := []string{}
for _, keyVal := range keyVals {
keys = append(keys, keyVal.String())
}
return keys
}
68 changes: 68 additions & 0 deletions core/ledger/kvledger/txmgmt/rwset/rwset_holder_test.go
@@ -0,0 +1,68 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rwset

import (
"testing"

"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/testutil"
)

func TestRWSetHolder(t *testing.T) {
rwSet := NewRWSet()

rwSet.AddToReadSet("ns1", "key2", version.NewHeight(1, 2))
rwSet.AddToReadSet("ns1", "key1", version.NewHeight(1, 1))
rwSet.AddToWriteSet("ns1", "key2", []byte("value2"))

rqi1 := &RangeQueryInfo{"bKey", "", false, nil, nil}
rqi1.EndKey = "eKey"
rqi1.results = []*KVRead{NewKVRead("bKey1", version.NewHeight(2, 3)), NewKVRead("bKey2", version.NewHeight(2, 4))}
rqi1.ItrExhausted = true
rwSet.AddToRangeQuerySet("ns1", rqi1)

rqi2 := &RangeQueryInfo{"bKey", "", false, nil, nil}
rqi2.EndKey = "eKey"
rqi2.results = []*KVRead{NewKVRead("bKey1", version.NewHeight(2, 3)), NewKVRead("bKey2", version.NewHeight(2, 4))}
rqi2.ItrExhausted = true
rwSet.AddToRangeQuerySet("ns1", rqi2)

rqi3 := &RangeQueryInfo{"bKey", "", true, nil, nil}
rwSet.AddToRangeQuerySet("ns1", rqi3)
rqi3.EndKey = "eKey1"
rqi3.results = []*KVRead{NewKVRead("bKey1", version.NewHeight(2, 3)), NewKVRead("bKey2", version.NewHeight(2, 4))}

rwSet.AddToReadSet("ns2", "key2", version.NewHeight(1, 2))
rwSet.AddToWriteSet("ns2", "key3", []byte("value3"))

txRWSet := rwSet.GetTxReadWriteSet()

ns1RWSet := &NsReadWriteSet{"ns1",
[]*KVRead{&KVRead{"key1", version.NewHeight(1, 1)}, &KVRead{"key2", version.NewHeight(1, 2)}},
[]*KVWrite{&KVWrite{"key2", false, []byte("value2")}},
[]*RangeQueryInfo{rqi1, rqi3}}

ns2RWSet := &NsReadWriteSet{"ns2",
[]*KVRead{&KVRead{"key2", version.NewHeight(1, 2)}},
[]*KVWrite{&KVWrite{"key3", false, []byte("value3")}},
[]*RangeQueryInfo{}}

expectedTxRWSet := &TxReadWriteSet{[]*NsReadWriteSet{ns1RWSet, ns2RWSet}}
t.Logf("Actual=%s\n Expected=%s", txRWSet, expectedTxRWSet)
testutil.AssertEquals(t, txRWSet, expectedTxRWSet)
}

0 comments on commit 1637217

Please sign in to comment.