Skip to content

Commit

Permalink
[FAB-6554] Expiry schedule keeper for pvt-statedb data
Browse files Browse the repository at this point in the history
This CR introduces expiry schedule keeper that maintains
an index in an underlying leveldb for the expiry time
of the data items.
The main functions exposed are insert of new expiry schedules,
retrive existing schedules by blocknumber (expiry block - based on BTL),
and clear scheduled entries

This is intented to be used in the purging of pvt data upon expiry

Change-Id: Iefa3f77d48fddc73fb577628258ad4029dc06f1f
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi committed Apr 30, 2018
1 parent b966981 commit 0ca1af5
Show file tree
Hide file tree
Showing 5 changed files with 402 additions and 0 deletions.
124 changes: 124 additions & 0 deletions core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_keeper.go
@@ -0,0 +1,124 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package pvtstatepurgemgmt

import (
proto "github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/util"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/core/ledger/kvledger/bookkeeping"
)

var logger = flogging.MustGetLogger("pvtstatepurgemgmt")

const (
expiryPrefix = '1'
)

// expiryInfoKey is used as a key of an entry in the bookkeeper (backed by a leveldb instance)
type expiryInfoKey struct {
committingBlk uint64
expiryBlk uint64
}

// expiryInfo encapsulates an 'expiryInfoKey' and corresponding private data keys.
// In another words, this struct encapsulates the keys and key-hashes that are committed by
// the block number 'expiryInfoKey.committingBlk' and should be expired (and hence purged)
// with the commit of block number 'expiryInfoKey.expiryBlk'
type expiryInfo struct {
expiryInfoKey *expiryInfoKey
pvtdataKeys *PvtdataKeys
}

// expiryKeeper is used to keep track of the expired items in the pvtdata space
type expiryKeeper interface {
// updateBookkeeping keeps track of the list of keys and their corresponding expiry block number
updateBookkeeping(toTrack []*expiryInfo, toClear []*expiryInfoKey) error
// retrieve returns the keys info that are supposed to be expired by the given block number
retrieve(expiringAtBlkNum uint64) ([]*expiryInfo, error)
}

func newExpiryKeeper(ledgerid string, provider bookkeeping.Provider) expiryKeeper {
return &expKeeper{provider.GetDBHandle(ledgerid, bookkeeping.PvtdataExpiry)}
}

type expKeeper struct {
db *leveldbhelper.DBHandle
}

// updateBookkeeping updates the information stored in the bookkeeper
// 'toTrack' parameter causes new entries in the bookkeeper and 'toClear' parameter contains the entries that
// are to be removed from the bookkeeper. This function is invoked with the commit of every block. As an
// example, the commit of the block with block number 50, 'toTrack' parameter may contain following two entries:
// (1) &expiryInfo{&expiryInfoKey{committingBlk: 50, expiryBlk: 55}, pvtdataKeys....} and
// (2) &expiryInfo{&expiryInfoKey{committingBlk: 50, expiryBlk: 60}, pvtdataKeys....}
// The 'pvtdataKeys' in the first entry contains all the keys (and key-hashes) that are to be expired at block 55 (i.e., these collections have a BTL configured to 4)
// and the 'pvtdataKeys' in second entry contains all the keys (and key-hashes) that are to be expired at block 60 (i.e., these collections have a BTL configured to 9).
// Similarly, continuing with the above example, the parameter 'toClear' may contain following two entries
// (1) &expiryInfoKey{committingBlk: 45, expiryBlk: 50} and (2) &expiryInfoKey{committingBlk: 40, expiryBlk: 50}. The first entry was created
// at the time of the commit of the block number 45 and the second entry was created at the time of the commit of the block number 40, however
// both are expiring with the commit of block number 50.
func (ek *expKeeper) updateBookkeeping(toTrack []*expiryInfo, toClear []*expiryInfoKey) error {
updateBatch := leveldbhelper.NewUpdateBatch()
for _, expinfo := range toTrack {
k, v, err := encodeKV(expinfo)
if err != nil {
return err
}
updateBatch.Put(k, v)
}
for _, expinfokey := range toClear {
updateBatch.Delete(encodeExpiryInfoKey(expinfokey))
}
return ek.db.WriteBatch(updateBatch, true)
}

func (ek *expKeeper) retrieve(expiringAtBlkNum uint64) ([]*expiryInfo, error) {
startKey := encodeExpiryInfoKey(&expiryInfoKey{expiryBlk: expiringAtBlkNum, committingBlk: 0})
endKey := encodeExpiryInfoKey(&expiryInfoKey{expiryBlk: expiringAtBlkNum + 1, committingBlk: 0})
itr := ek.db.GetIterator(startKey, endKey)
defer itr.Release()

var listExpinfo []*expiryInfo
for itr.Next() {
expinfo, err := decodeExpiryInfo(itr.Key(), itr.Value())
if err != nil {
return nil, err
}
listExpinfo = append(listExpinfo, expinfo)
}
return listExpinfo, nil
}

func encodeKV(expinfo *expiryInfo) (key []byte, value []byte, err error) {
key = encodeExpiryInfoKey(expinfo.expiryInfoKey)
value, err = encodeExpiryInfoValue(expinfo.pvtdataKeys)
return
}

func encodeExpiryInfoKey(expinfoKey *expiryInfoKey) []byte {
key := append([]byte{expiryPrefix}, util.EncodeOrderPreservingVarUint64(expinfoKey.expiryBlk)...)
return append(key, util.EncodeOrderPreservingVarUint64(expinfoKey.committingBlk)...)
}

func encodeExpiryInfoValue(pvtdataKeys *PvtdataKeys) ([]byte, error) {
return proto.Marshal(pvtdataKeys)
}

func decodeExpiryInfo(key []byte, value []byte) (*expiryInfo, error) {
expiryBlk, n := util.DecodeOrderPreservingVarUint64(key[1:])
committingBlk, _ := util.DecodeOrderPreservingVarUint64(key[n+1:])
pvtdataKeys := &PvtdataKeys{}
if err := proto.Unmarshal(value, pvtdataKeys); err != nil {
return nil, err
}
return &expiryInfo{
expiryInfoKey: &expiryInfoKey{committingBlk: committingBlk, expiryBlk: expiryBlk},
pvtdataKeys: pvtdataKeys},
nil
}
@@ -0,0 +1,74 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package pvtstatepurgemgmt

import (
fmt "fmt"
"testing"

"github.com/davecgh/go-spew/spew"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/bookkeeping"
"github.com/stretchr/testify/assert"
)

func TestExpiryKVEncoding(t *testing.T) {
pvtdataKeys := newPvtdataKeys()
pvtdataKeys.add("ns1", "coll-1", "key-1", []byte("key-1-hash"))
expiryInfo := &expiryInfo{&expiryInfoKey{expiryBlk: 10, committingBlk: 2}, pvtdataKeys}
t.Logf("expiryInfo:%s", spew.Sdump(expiryInfo))
k, v, err := encodeKV(expiryInfo)
testutil.AssertNoError(t, err, "")
expiryInfo1, err := decodeExpiryInfo(k, v)
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, expiryInfo, expiryInfo1)
}

func TestExpiryKeeper(t *testing.T) {
testenv := bookkeeping.NewTestEnv(t)
defer testenv.Cleanup()
expiryKeeper := newExpiryKeeper("testledger", testenv.TestProvider)

expinfo1 := &expiryInfo{&expiryInfoKey{committingBlk: 3, expiryBlk: 13}, buildPvtdataKeysForTest(1, 1)}
expinfo2 := &expiryInfo{&expiryInfoKey{committingBlk: 3, expiryBlk: 15}, buildPvtdataKeysForTest(2, 2)}
expinfo3 := &expiryInfo{&expiryInfoKey{committingBlk: 4, expiryBlk: 13}, buildPvtdataKeysForTest(3, 3)}
expinfo4 := &expiryInfo{&expiryInfoKey{committingBlk: 5, expiryBlk: 17}, buildPvtdataKeysForTest(4, 4)}

// Insert entries for keys at committingBlk 3
expiryKeeper.updateBookkeeping([]*expiryInfo{expinfo1, expinfo2}, nil)
// Insert entries for keys at committingBlk 4 and 5
expiryKeeper.updateBookkeeping([]*expiryInfo{expinfo3, expinfo4}, nil)

// Retrieve entries by expiring block 13, 15, and 17
listExpinfo1, _ := expiryKeeper.retrieve(13)
assert.Equal(t, []*expiryInfo{expinfo1, expinfo3}, listExpinfo1)

listExpinfo2, _ := expiryKeeper.retrieve(15)
assert.Equal(t, []*expiryInfo{expinfo2}, listExpinfo2)

listExpinfo3, _ := expiryKeeper.retrieve(17)
assert.Equal(t, []*expiryInfo{expinfo4}, listExpinfo3)

// Clear entries for keys expiring at block 13 and 15 and again retrieve by expiring block 13, 15, and 17
expiryKeeper.updateBookkeeping(nil, []*expiryInfoKey{expinfo1.expiryInfoKey, expinfo2.expiryInfoKey, expinfo3.expiryInfoKey})
listExpinfo4, _ := expiryKeeper.retrieve(13)
assert.Nil(t, listExpinfo4)

listExpinfo5, _ := expiryKeeper.retrieve(15)
assert.Nil(t, listExpinfo5)

listExpinfo6, _ := expiryKeeper.retrieve(17)
assert.Equal(t, []*expiryInfo{expinfo4}, listExpinfo6)
}

func buildPvtdataKeysForTest(startingEntry int, numEntries int) *PvtdataKeys {
pvtdataKeys := newPvtdataKeys()
for i := startingEntry; i <= startingEntry+numEntries; i++ {
pvtdataKeys.add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("coll-%d", i), fmt.Sprintf("key-%d", i), []byte(fmt.Sprintf("key-%d-hash", i)))
}
return pvtdataKeys
}
135 changes: 135 additions & 0 deletions core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key.proto
@@ -0,0 +1,30 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

syntax = "proto3";

option go_package = "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt";

package pvtstatepurgemgmt;

message PvtdataKeys {
map<string, Collections> map = 1;
}

message Collections {
map<string, KeysAndHashes> map = 1;
}

message KeysAndHashes {
repeated KeyAndHash list = 1;
}

message KeyAndHash {
string key = 1;
bytes hash = 2;
}


0 comments on commit 0ca1af5

Please sign in to comment.