From 0ca1af54e793e2fe972dfed38dfe8bf7728fe15e Mon Sep 17 00:00:00 2001 From: manish Date: Mon, 9 Oct 2017 18:35:48 -0400 Subject: [PATCH] [FAB-6554] Expiry schedule keeper for pvt-statedb data This CR introduces expiry schedule keeper that maintains an index in an underlying leveldb for the expiry time of the data items. The main functions exposed are insert of new expiry schedules, retrive existing schedules by blocknumber (expiry block - based on BTL), and clear scheduled entries This is intented to be used in the purging of pvt data upon expiry Change-Id: Iefa3f77d48fddc73fb577628258ad4029dc06f1f Signed-off-by: manish --- .../txmgmt/pvtstatepurgemgmt/expiry_keeper.go | 124 ++++++++++++++++ .../pvtstatepurgemgmt/expiry_keeper_test.go | 74 ++++++++++ .../pvtstatepurgemgmt/pvtdata_key.pb.go | 135 ++++++++++++++++++ .../pvtstatepurgemgmt/pvtdata_key.proto | 30 ++++ .../pvtstatepurgemgmt/pvtdata_key_helper.go | 39 +++++ 5 files changed, 402 insertions(+) create mode 100644 core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_keeper.go create mode 100644 core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_keeper_test.go create mode 100644 core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key.pb.go create mode 100644 core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key.proto create mode 100644 core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key_helper.go diff --git a/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_keeper.go b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_keeper.go new file mode 100644 index 00000000000..1d1436cb575 --- /dev/null +++ b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_keeper.go @@ -0,0 +1,124 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package pvtstatepurgemgmt + +import ( + proto "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/common/flogging" + "github.com/hyperledger/fabric/common/ledger/util" + "github.com/hyperledger/fabric/common/ledger/util/leveldbhelper" + "github.com/hyperledger/fabric/core/ledger/kvledger/bookkeeping" +) + +var logger = flogging.MustGetLogger("pvtstatepurgemgmt") + +const ( + expiryPrefix = '1' +) + +// expiryInfoKey is used as a key of an entry in the bookkeeper (backed by a leveldb instance) +type expiryInfoKey struct { + committingBlk uint64 + expiryBlk uint64 +} + +// expiryInfo encapsulates an 'expiryInfoKey' and corresponding private data keys. +// In another words, this struct encapsulates the keys and key-hashes that are committed by +// the block number 'expiryInfoKey.committingBlk' and should be expired (and hence purged) +// with the commit of block number 'expiryInfoKey.expiryBlk' +type expiryInfo struct { + expiryInfoKey *expiryInfoKey + pvtdataKeys *PvtdataKeys +} + +// expiryKeeper is used to keep track of the expired items in the pvtdata space +type expiryKeeper interface { + // updateBookkeeping keeps track of the list of keys and their corresponding expiry block number + updateBookkeeping(toTrack []*expiryInfo, toClear []*expiryInfoKey) error + // retrieve returns the keys info that are supposed to be expired by the given block number + retrieve(expiringAtBlkNum uint64) ([]*expiryInfo, error) +} + +func newExpiryKeeper(ledgerid string, provider bookkeeping.Provider) expiryKeeper { + return &expKeeper{provider.GetDBHandle(ledgerid, bookkeeping.PvtdataExpiry)} +} + +type expKeeper struct { + db *leveldbhelper.DBHandle +} + +// updateBookkeeping updates the information stored in the bookkeeper +// 'toTrack' parameter causes new entries in the bookkeeper and 'toClear' parameter contains the entries that +// are to be removed from the bookkeeper. This function is invoked with the commit of every block. As an +// example, the commit of the block with block number 50, 'toTrack' parameter may contain following two entries: +// (1) &expiryInfo{&expiryInfoKey{committingBlk: 50, expiryBlk: 55}, pvtdataKeys....} and +// (2) &expiryInfo{&expiryInfoKey{committingBlk: 50, expiryBlk: 60}, pvtdataKeys....} +// The 'pvtdataKeys' in the first entry contains all the keys (and key-hashes) that are to be expired at block 55 (i.e., these collections have a BTL configured to 4) +// and the 'pvtdataKeys' in second entry contains all the keys (and key-hashes) that are to be expired at block 60 (i.e., these collections have a BTL configured to 9). +// Similarly, continuing with the above example, the parameter 'toClear' may contain following two entries +// (1) &expiryInfoKey{committingBlk: 45, expiryBlk: 50} and (2) &expiryInfoKey{committingBlk: 40, expiryBlk: 50}. The first entry was created +// at the time of the commit of the block number 45 and the second entry was created at the time of the commit of the block number 40, however +// both are expiring with the commit of block number 50. +func (ek *expKeeper) updateBookkeeping(toTrack []*expiryInfo, toClear []*expiryInfoKey) error { + updateBatch := leveldbhelper.NewUpdateBatch() + for _, expinfo := range toTrack { + k, v, err := encodeKV(expinfo) + if err != nil { + return err + } + updateBatch.Put(k, v) + } + for _, expinfokey := range toClear { + updateBatch.Delete(encodeExpiryInfoKey(expinfokey)) + } + return ek.db.WriteBatch(updateBatch, true) +} + +func (ek *expKeeper) retrieve(expiringAtBlkNum uint64) ([]*expiryInfo, error) { + startKey := encodeExpiryInfoKey(&expiryInfoKey{expiryBlk: expiringAtBlkNum, committingBlk: 0}) + endKey := encodeExpiryInfoKey(&expiryInfoKey{expiryBlk: expiringAtBlkNum + 1, committingBlk: 0}) + itr := ek.db.GetIterator(startKey, endKey) + defer itr.Release() + + var listExpinfo []*expiryInfo + for itr.Next() { + expinfo, err := decodeExpiryInfo(itr.Key(), itr.Value()) + if err != nil { + return nil, err + } + listExpinfo = append(listExpinfo, expinfo) + } + return listExpinfo, nil +} + +func encodeKV(expinfo *expiryInfo) (key []byte, value []byte, err error) { + key = encodeExpiryInfoKey(expinfo.expiryInfoKey) + value, err = encodeExpiryInfoValue(expinfo.pvtdataKeys) + return +} + +func encodeExpiryInfoKey(expinfoKey *expiryInfoKey) []byte { + key := append([]byte{expiryPrefix}, util.EncodeOrderPreservingVarUint64(expinfoKey.expiryBlk)...) + return append(key, util.EncodeOrderPreservingVarUint64(expinfoKey.committingBlk)...) +} + +func encodeExpiryInfoValue(pvtdataKeys *PvtdataKeys) ([]byte, error) { + return proto.Marshal(pvtdataKeys) +} + +func decodeExpiryInfo(key []byte, value []byte) (*expiryInfo, error) { + expiryBlk, n := util.DecodeOrderPreservingVarUint64(key[1:]) + committingBlk, _ := util.DecodeOrderPreservingVarUint64(key[n+1:]) + pvtdataKeys := &PvtdataKeys{} + if err := proto.Unmarshal(value, pvtdataKeys); err != nil { + return nil, err + } + return &expiryInfo{ + expiryInfoKey: &expiryInfoKey{committingBlk: committingBlk, expiryBlk: expiryBlk}, + pvtdataKeys: pvtdataKeys}, + nil +} diff --git a/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_keeper_test.go b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_keeper_test.go new file mode 100644 index 00000000000..7f7cf0683a4 --- /dev/null +++ b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_keeper_test.go @@ -0,0 +1,74 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package pvtstatepurgemgmt + +import ( + fmt "fmt" + "testing" + + "github.com/davecgh/go-spew/spew" + "github.com/hyperledger/fabric/common/ledger/testutil" + "github.com/hyperledger/fabric/core/ledger/kvledger/bookkeeping" + "github.com/stretchr/testify/assert" +) + +func TestExpiryKVEncoding(t *testing.T) { + pvtdataKeys := newPvtdataKeys() + pvtdataKeys.add("ns1", "coll-1", "key-1", []byte("key-1-hash")) + expiryInfo := &expiryInfo{&expiryInfoKey{expiryBlk: 10, committingBlk: 2}, pvtdataKeys} + t.Logf("expiryInfo:%s", spew.Sdump(expiryInfo)) + k, v, err := encodeKV(expiryInfo) + testutil.AssertNoError(t, err, "") + expiryInfo1, err := decodeExpiryInfo(k, v) + testutil.AssertNoError(t, err, "") + testutil.AssertEquals(t, expiryInfo, expiryInfo1) +} + +func TestExpiryKeeper(t *testing.T) { + testenv := bookkeeping.NewTestEnv(t) + defer testenv.Cleanup() + expiryKeeper := newExpiryKeeper("testledger", testenv.TestProvider) + + expinfo1 := &expiryInfo{&expiryInfoKey{committingBlk: 3, expiryBlk: 13}, buildPvtdataKeysForTest(1, 1)} + expinfo2 := &expiryInfo{&expiryInfoKey{committingBlk: 3, expiryBlk: 15}, buildPvtdataKeysForTest(2, 2)} + expinfo3 := &expiryInfo{&expiryInfoKey{committingBlk: 4, expiryBlk: 13}, buildPvtdataKeysForTest(3, 3)} + expinfo4 := &expiryInfo{&expiryInfoKey{committingBlk: 5, expiryBlk: 17}, buildPvtdataKeysForTest(4, 4)} + + // Insert entries for keys at committingBlk 3 + expiryKeeper.updateBookkeeping([]*expiryInfo{expinfo1, expinfo2}, nil) + // Insert entries for keys at committingBlk 4 and 5 + expiryKeeper.updateBookkeeping([]*expiryInfo{expinfo3, expinfo4}, nil) + + // Retrieve entries by expiring block 13, 15, and 17 + listExpinfo1, _ := expiryKeeper.retrieve(13) + assert.Equal(t, []*expiryInfo{expinfo1, expinfo3}, listExpinfo1) + + listExpinfo2, _ := expiryKeeper.retrieve(15) + assert.Equal(t, []*expiryInfo{expinfo2}, listExpinfo2) + + listExpinfo3, _ := expiryKeeper.retrieve(17) + assert.Equal(t, []*expiryInfo{expinfo4}, listExpinfo3) + + // Clear entries for keys expiring at block 13 and 15 and again retrieve by expiring block 13, 15, and 17 + expiryKeeper.updateBookkeeping(nil, []*expiryInfoKey{expinfo1.expiryInfoKey, expinfo2.expiryInfoKey, expinfo3.expiryInfoKey}) + listExpinfo4, _ := expiryKeeper.retrieve(13) + assert.Nil(t, listExpinfo4) + + listExpinfo5, _ := expiryKeeper.retrieve(15) + assert.Nil(t, listExpinfo5) + + listExpinfo6, _ := expiryKeeper.retrieve(17) + assert.Equal(t, []*expiryInfo{expinfo4}, listExpinfo6) +} + +func buildPvtdataKeysForTest(startingEntry int, numEntries int) *PvtdataKeys { + pvtdataKeys := newPvtdataKeys() + for i := startingEntry; i <= startingEntry+numEntries; i++ { + pvtdataKeys.add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("coll-%d", i), fmt.Sprintf("key-%d", i), []byte(fmt.Sprintf("key-%d-hash", i))) + } + return pvtdataKeys +} diff --git a/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key.pb.go b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key.pb.go new file mode 100644 index 00000000000..07c57876c9e --- /dev/null +++ b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key.pb.go @@ -0,0 +1,135 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: pvtdata_key.proto + +/* +Package pvtstatepurgemgmt is a generated protocol buffer package. + +It is generated from these files: + pvtdata_key.proto + +It has these top-level messages: + PvtdataKeys + Collections + KeysAndHashes + KeyAndHash +*/ +package pvtstatepurgemgmt + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type PvtdataKeys struct { + Map map[string]*Collections `protobuf:"bytes,1,rep,name=map" json:"map,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` +} + +func (m *PvtdataKeys) Reset() { *m = PvtdataKeys{} } +func (m *PvtdataKeys) String() string { return proto.CompactTextString(m) } +func (*PvtdataKeys) ProtoMessage() {} +func (*PvtdataKeys) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *PvtdataKeys) GetMap() map[string]*Collections { + if m != nil { + return m.Map + } + return nil +} + +type Collections struct { + Map map[string]*KeysAndHashes `protobuf:"bytes,1,rep,name=map" json:"map,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` +} + +func (m *Collections) Reset() { *m = Collections{} } +func (m *Collections) String() string { return proto.CompactTextString(m) } +func (*Collections) ProtoMessage() {} +func (*Collections) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *Collections) GetMap() map[string]*KeysAndHashes { + if m != nil { + return m.Map + } + return nil +} + +type KeysAndHashes struct { + List []*KeyAndHash `protobuf:"bytes,1,rep,name=list" json:"list,omitempty"` +} + +func (m *KeysAndHashes) Reset() { *m = KeysAndHashes{} } +func (m *KeysAndHashes) String() string { return proto.CompactTextString(m) } +func (*KeysAndHashes) ProtoMessage() {} +func (*KeysAndHashes) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *KeysAndHashes) GetList() []*KeyAndHash { + if m != nil { + return m.List + } + return nil +} + +type KeyAndHash struct { + Key string `protobuf:"bytes,1,opt,name=key" json:"key,omitempty"` + Hash []byte `protobuf:"bytes,2,opt,name=hash,proto3" json:"hash,omitempty"` +} + +func (m *KeyAndHash) Reset() { *m = KeyAndHash{} } +func (m *KeyAndHash) String() string { return proto.CompactTextString(m) } +func (*KeyAndHash) ProtoMessage() {} +func (*KeyAndHash) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func (m *KeyAndHash) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +func (m *KeyAndHash) GetHash() []byte { + if m != nil { + return m.Hash + } + return nil +} + +func init() { + proto.RegisterType((*PvtdataKeys)(nil), "pvtstatepurgemgmt.PvtdataKeys") + proto.RegisterType((*Collections)(nil), "pvtstatepurgemgmt.Collections") + proto.RegisterType((*KeysAndHashes)(nil), "pvtstatepurgemgmt.KeysAndHashes") + proto.RegisterType((*KeyAndHash)(nil), "pvtstatepurgemgmt.KeyAndHash") +} + +func init() { proto.RegisterFile("pvtdata_key.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 297 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x92, 0x4d, 0x4b, 0xc3, 0x40, + 0x10, 0x86, 0xd9, 0xb6, 0x8a, 0x4e, 0x14, 0x74, 0x4f, 0x45, 0x50, 0x42, 0x2f, 0xf6, 0x94, 0x60, + 0x14, 0x51, 0x6f, 0x56, 0x04, 0xa1, 0x14, 0x24, 0x07, 0x11, 0x2f, 0xb2, 0x49, 0xc6, 0x24, 0xe4, + 0x63, 0x97, 0xdd, 0x4d, 0x30, 0xff, 0x46, 0xfc, 0xa5, 0x92, 0x34, 0x62, 0x62, 0x83, 0xde, 0x86, + 0x77, 0x9e, 0x79, 0x79, 0x16, 0x16, 0x0e, 0x45, 0xa9, 0x03, 0xa6, 0xd9, 0x6b, 0x82, 0x95, 0x25, + 0x24, 0xd7, 0x9c, 0xd6, 0x91, 0xd2, 0x4c, 0xa3, 0x28, 0x64, 0x88, 0x59, 0x98, 0xe9, 0xd9, 0x07, + 0x01, 0xe3, 0x71, 0x0d, 0x2e, 0xb1, 0x52, 0xf4, 0x1a, 0xc6, 0x19, 0x13, 0x53, 0x62, 0x8e, 0xe7, + 0x86, 0x73, 0x6a, 0x6d, 0x1c, 0x58, 0x1d, 0xd8, 0x5a, 0x31, 0x71, 0x9f, 0x6b, 0x59, 0xb9, 0xf5, + 0xcd, 0xd1, 0x13, 0xec, 0x7c, 0x07, 0xf4, 0x00, 0xc6, 0x09, 0x56, 0x53, 0x62, 0x92, 0xf9, 0xae, + 0x5b, 0x8f, 0xf4, 0x02, 0xb6, 0x4a, 0x96, 0x16, 0x38, 0x1d, 0x99, 0x64, 0x6e, 0x38, 0x27, 0x03, + 0xd5, 0x77, 0x3c, 0x4d, 0xd1, 0xd7, 0x31, 0xcf, 0x95, 0xbb, 0x86, 0x6f, 0x46, 0x57, 0x64, 0xf6, + 0x49, 0xc0, 0xe8, 0xac, 0xfe, 0x57, 0xec, 0xc0, 0xbf, 0x14, 0x9f, 0xff, 0x54, 0xbc, 0xec, 0x2b, + 0x9a, 0x03, 0xd5, 0xf5, 0xb3, 0x6f, 0xf3, 0xe0, 0x81, 0xa9, 0x08, 0x7b, 0x92, 0x0b, 0xd8, 0xef, + 0xed, 0xe8, 0x19, 0x4c, 0xd2, 0x58, 0xe9, 0x56, 0xf3, 0x78, 0xb8, 0xab, 0xc5, 0xdd, 0x06, 0x9d, + 0x39, 0x00, 0x3f, 0xd9, 0x80, 0x1f, 0x85, 0x49, 0xc4, 0x54, 0xd4, 0xe8, 0xed, 0xb9, 0xcd, 0xbc, + 0x58, 0xbd, 0x2c, 0xc3, 0x58, 0x47, 0x85, 0x67, 0xf9, 0x3c, 0xb3, 0xa3, 0x4a, 0xa0, 0x4c, 0x31, + 0x08, 0x51, 0xda, 0x6f, 0xcc, 0x93, 0xb1, 0x6f, 0xfb, 0x5c, 0xa2, 0xdd, 0x46, 0x49, 0xd9, 0x0e, + 0xfa, 0xbd, 0x36, 0xb0, 0x37, 0x9c, 0xbc, 0xed, 0xe6, 0xa3, 0x9c, 0x7f, 0x05, 0x00, 0x00, 0xff, + 0xff, 0xdf, 0x2c, 0x02, 0xd0, 0x3d, 0x02, 0x00, 0x00, +} diff --git a/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key.proto b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key.proto new file mode 100644 index 00000000000..93c2650d0da --- /dev/null +++ b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key.proto @@ -0,0 +1,30 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +syntax = "proto3"; + +option go_package = "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt"; + +package pvtstatepurgemgmt; + +message PvtdataKeys { + map map = 1; +} + +message Collections { + map map = 1; +} + +message KeysAndHashes { + repeated KeyAndHash list = 1; +} + +message KeyAndHash { + string key = 1; + bytes hash = 2; +} + + diff --git a/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key_helper.go b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key_helper.go new file mode 100644 index 00000000000..3292b0bb6d9 --- /dev/null +++ b/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/pvtdata_key_helper.go @@ -0,0 +1,39 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package pvtstatepurgemgmt + +func (pvtdataKeys *PvtdataKeys) add(ns string, coll string, key string, keyhash []byte) { + colls := pvtdataKeys.getOrCreateCollections(ns) + keysAndHashes := colls.getOrCreateKeysAndHashes(coll) + keysAndHashes.List = append(keysAndHashes.List, &KeyAndHash{Key: key, Hash: keyhash}) +} + +func (pvtdataKeys *PvtdataKeys) getOrCreateCollections(ns string) *Collections { + colls, ok := pvtdataKeys.Map[ns] + if !ok { + colls = newCollections() + pvtdataKeys.Map[ns] = colls + } + return colls +} + +func (colls *Collections) getOrCreateKeysAndHashes(coll string) *KeysAndHashes { + keysAndHashes, ok := colls.Map[coll] + if !ok { + keysAndHashes = &KeysAndHashes{} + colls.Map[coll] = keysAndHashes + } + return keysAndHashes +} + +func newPvtdataKeys() *PvtdataKeys { + return &PvtdataKeys{make(map[string]*Collections)} +} + +func newCollections() *Collections { + return &Collections{make(map[string]*KeysAndHashes)} +}