Skip to content

Commit

Permalink
Fix historydb issue for keys containing nil bytes
Browse files Browse the repository at this point in the history
historydb code assumes that the keys does not contain a nil byte.
In the presence of such keys, a historydb query can behave in an
unpredictable way that ranges from including the wrong results
to a panic crash.

The main issue is that when the keys have nil bytes, additional
keys fall in the results of the range query that is formed for
scanning the history of a key. This CR provides a simple fix
in which these non-relevant results will be skipped.

Though, ideally, the keys layout should be such that the non-relevant
results of a scan should be zero, however, that solution requires a
change in key formats and will require a rebuild of the historydb
and hence not suitable for an interim release and will potentially
be provided with 2.0 release.

done #FAB-11244

Change-Id: I843f1babc2673d3a38c47e9e04a660203180a3de
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi authored and mastersingh24 committed Sep 27, 2018
1 parent 620392c commit e40d9c7
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 43 deletions.
11 changes: 6 additions & 5 deletions core/ledger/kvledger/history/historydb/histmgr_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ import (
"github.com/hyperledger/fabric/common/ledger/util"
)

var compositeKeySep = []byte{0x00}
// CompositeKeySep is a nil byte used as a separator between different components of a composite key
var CompositeKeySep = []byte{0x00}

//ConstructCompositeHistoryKey builds the History Key of namespace~key~blocknum~trannum
// using an order preserving encoding so that history query results are ordered by height
func ConstructCompositeHistoryKey(ns string, key string, blocknum uint64, trannum uint64) []byte {

var compositeKey []byte
compositeKey = append(compositeKey, []byte(ns)...)
compositeKey = append(compositeKey, compositeKeySep...)
compositeKey = append(compositeKey, CompositeKeySep...)
compositeKey = append(compositeKey, []byte(key)...)
compositeKey = append(compositeKey, compositeKeySep...)
compositeKey = append(compositeKey, CompositeKeySep...)
compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(blocknum)...)
compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(trannum)...)

Expand All @@ -44,9 +45,9 @@ func ConstructCompositeHistoryKey(ns string, key string, blocknum uint64, trannu
func ConstructPartialCompositeHistoryKey(ns string, key string, endkey bool) []byte {
var compositeKey []byte
compositeKey = append(compositeKey, []byte(ns)...)
compositeKey = append(compositeKey, compositeKeySep...)
compositeKey = append(compositeKey, CompositeKeySep...)
compositeKey = append(compositeKey, []byte(key)...)
compositeKey = append(compositeKey, compositeKeySep...)
compositeKey = append(compositeKey, CompositeKeySep...)
if endkey {
compositeKey = append(compositeKey, []byte{0xff}...)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/hyperledger/fabric/common/ledger/testutil"
)

var strKeySep = string(compositeKeySep)
var strKeySep = string(CompositeKeySep)

func TestConstructCompositeKey(t *testing.T) {
compositeKey := ConstructCompositeHistoryKey("ns1", "key1", 1, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package historyleveldb

import (
"bytes"
"errors"

commonledger "github.com/hyperledger/fabric/common/ledger"
Expand Down Expand Up @@ -69,32 +70,53 @@ func newHistoryScanner(compositePartialKey []byte, namespace string, key string,
}

func (scanner *historyScanner) Next() (commonledger.QueryResult, error) {
if !scanner.dbItr.Next() {
return nil, nil
for {
if !scanner.dbItr.Next() {
return nil, nil
}
historyKey := scanner.dbItr.Key() // history key is in the form namespace~key~blocknum~trannum

// SplitCompositeKey(namespace~key~blocknum~trannum, namespace~key~) will return the blocknum~trannum in second position
_, blockNumTranNumBytes := historydb.SplitCompositeHistoryKey(historyKey, scanner.compositePartialKey)

// check that blockNumTranNumBytes does not contain a nil byte (FAB-11244) - except the last byte.
// if this contains a nil byte that indicate that its a different key other than the one we are
// scanning the history for. However, the last byte can be nil even for the valid key (indicating the transaction numer being zero)
// This is because, if 'blockNumTranNumBytes' really is the suffix of the desired key - only possibility of this containing a nil byte
// is the last byte when the transaction number in blockNumTranNumBytes is zero).
// On the other hand, if 'blockNumTranNumBytes' really is NOT the suffix of the desired key, then this has to be a prefix
// of some other key (other than the desired key) and in this case, there has to be at least one nil byte (other than the last byte),
// for the 'last' CompositeKeySep in the composite key
// Take an example of two keys "key" and "key\x00" in a namespace ns. The entries for these keys will be
// of type "ns-\x00-key-\x00-blkNumTranNumBytes" and ns-\x00-key-\x00-\x00-blkNumTranNumBytes respectively.
// "-" in above examples are just for readability. Further, when scanning the range
// {ns-\x00-key-\x00 - ns-\x00-key-xff} for getting the history for <ns, key>, the entry for the other key
// falls in the range and needs to be ignored
if bytes.Contains(blockNumTranNumBytes[:len(blockNumTranNumBytes)-1], historydb.CompositeKeySep) {
logger.Debugf("Some other key [%#v] found in the range while scanning history for key [%#v]. Skipping...",
historyKey, scanner.key)
continue
}
blockNum, bytesConsumed := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[0:])
tranNum, _ := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[bytesConsumed:])
logger.Debugf("Found history record for namespace:%s key:%s at blockNumTranNum %v:%v\n",
scanner.namespace, scanner.key, blockNum, tranNum)

// Get the transaction from block storage that is associated with this history record
tranEnvelope, err := scanner.blockStore.RetrieveTxByBlockNumTranNum(blockNum, tranNum)
if err != nil {
return nil, err
}

// Get the txid, key write value, timestamp, and delete indicator associated with this transaction
queryResult, err := getKeyModificationFromTran(tranEnvelope, scanner.namespace, scanner.key)
if err != nil {
return nil, err
}
logger.Debugf("Found historic key value for namespace:%s key:%s from transaction %s\n",
scanner.namespace, scanner.key, queryResult.(*queryresult.KeyModification).TxId)
return queryResult, nil
}
historyKey := scanner.dbItr.Key() // history key is in the form namespace~key~blocknum~trannum

// SplitCompositeKey(namespace~key~blocknum~trannum, namespace~key~) will return the blocknum~trannum in second position
_, blockNumTranNumBytes := historydb.SplitCompositeHistoryKey(historyKey, scanner.compositePartialKey)
blockNum, bytesConsumed := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[0:])
tranNum, _ := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[bytesConsumed:])
logger.Debugf("Found history record for namespace:%s key:%s at blockNumTranNum %v:%v\n",
scanner.namespace, scanner.key, blockNum, tranNum)

// Get the transaction from block storage that is associated with this history record
tranEnvelope, err := scanner.blockStore.RetrieveTxByBlockNumTranNum(blockNum, tranNum)
if err != nil {
return nil, err
}

// Get the txid, key write value, timestamp, and delete indicator associated with this transaction
queryResult, err := getKeyModificationFromTran(tranEnvelope, scanner.namespace, scanner.key)
if err != nil {
return nil, err
}
logger.Debugf("Found historic key value for namespace:%s key:%s from transaction %s\n",
scanner.namespace, scanner.key, queryResult.(*queryresult.KeyModification).TxId)
return queryResult, nil
}

func (scanner *historyScanner) Close() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Copyright IBM Corp. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
*/

package historyleveldb
Expand All @@ -22,6 +12,7 @@ import (
"testing"

configtxtest "github.com/hyperledger/fabric/common/configtx/test"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/testutil"
util2 "github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/ledger"
Expand All @@ -34,6 +25,8 @@ import (

func TestMain(m *testing.M) {
viper.Set("peer.fileSystemPath", "/tmp/fabric/ledgertests/kvledger/history/historydb/historyleveldb")
flogging.SetModuleLevel("leveldbhelper", "debug")
flogging.SetModuleLevel("historyleveldb", "debug")
os.Exit(m.Run())
}

Expand Down Expand Up @@ -266,3 +259,81 @@ func TestGenesisBlockNoError(t *testing.T) {
err = env.testHistoryDB.Commit(block)
testutil.AssertNoError(t, err, "")
}

// TestHistoryWithKeyContainingNilBytes tests historydb when keys contains nil bytes (FAB-11244) -
// which happens to be used as a separator in the composite keys that is formed for the entries in the historydb
func TestHistoryWithKeyContainingNilBytes(t *testing.T) {
env := newTestHistoryEnv(t)
defer env.cleanup()
provider := env.testBlockStorageEnv.provider
ledger1id := "ledger1"
store1, err := provider.OpenBlockStore(ledger1id)
testutil.AssertNoError(t, err, "Error upon provider.OpenBlockStore()")
defer store1.Shutdown()

bg, gb := testutil.NewBlockGenerator(t, ledger1id, false)
testutil.AssertNoError(t, store1.AddBlock(gb), "")
testutil.AssertNoError(t, env.testHistoryDB.Commit(gb), "")

//block1
txid := util2.GenerateUUID()
simulator, _ := env.txmgr.NewTxSimulator(txid)
simulator.SetState("ns1", "key", []byte("value1")) // add a key <key> that contains no nil byte
simulator.Done()
simRes, _ := simulator.GetTxSimulationResults()
pubSimResBytes, _ := simRes.GetPubSimulationBytes()
block1 := bg.NextBlock([][]byte{pubSimResBytes})
err = store1.AddBlock(block1)
testutil.AssertNoError(t, err, "")
err = env.testHistoryDB.Commit(block1)
testutil.AssertNoError(t, err, "")

//block2 tran1
simulationResults := [][]byte{}
txid = util2.GenerateUUID()
simulator, _ = env.txmgr.NewTxSimulator(txid)
simulator.SetState("ns1", "key", []byte("value2")) // add another value for the key <key>
simulator.Done()
simRes, _ = simulator.GetTxSimulationResults()
pubSimResBytes, _ = simRes.GetPubSimulationBytes()
simulationResults = append(simulationResults, pubSimResBytes)

//block2 tran2
txid2 := util2.GenerateUUID()
simulator2, _ := env.txmgr.NewTxSimulator(txid2)
// add another key <key\x00\x01\x01\x15> that contains a nil byte - such that when a range query is formed using old format, this key falls in the range
simulator2.SetState("ns1", "key\x00\x01\x01\x15", []byte("dummyVal1"))
simulator2.SetState("ns1", "\x00key\x00\x01\x01\x15", []byte("dummyVal2"))
simulator2.Done()
simRes2, _ := simulator2.GetTxSimulationResults()
pubSimResBytes2, _ := simRes2.GetPubSimulationBytes()
simulationResults = append(simulationResults, pubSimResBytes2)
block2 := bg.NextBlock(simulationResults)
err = store1.AddBlock(block2)
testutil.AssertNoError(t, err, "")
err = env.testHistoryDB.Commit(block2)
testutil.AssertNoError(t, err, "")

qhistory, err := env.testHistoryDB.NewHistoryQueryExecutor(store1)
testutil.AssertNoError(t, err, "Error upon NewHistoryQueryExecutor")
testutilVerifyResults(t, qhistory, "ns1", "key", []string{"value1", "value2"})
testutilVerifyResults(t, qhistory, "ns1", "key\x00\x01\x01\x15", []string{"dummyVal1"})
testutilVerifyResults(t, qhistory, "ns1", "\x00key\x00\x01\x01\x15", []string{"dummyVal2"})
}

func testutilVerifyResults(t *testing.T, hqe ledger.HistoryQueryExecutor, ns, key string, expectedVals []string) {
itr, err := hqe.GetHistoryForKey(ns, key)
testutil.AssertNoError(t, err, "Error upon GetHistoryForKey()")
retrievedVals := []string{}
for {
kmod, _ := itr.Next()
if kmod == nil {
break
}
txid := kmod.(*queryresult.KeyModification).TxId
retrievedValue := string(kmod.(*queryresult.KeyModification).Value)
retrievedVals = append(retrievedVals, retrievedValue)
t.Logf("Retrieved history record at TxId=%s with value %s", txid, retrievedValue)
}
testutil.AssertEquals(t, retrievedVals, expectedVals)
}

0 comments on commit e40d9c7

Please sign in to comment.