Skip to content

Commit

Permalink
[FAB-6431] Pvt Transient Store to use received height
Browse files Browse the repository at this point in the history
Transient store persists private data on authorized
peers while transactions with private data are in-flight.
One of the keys for transient store records was
endorsementBlkHt, this work item will change it to
blockHeight, as it will be better to store transient data
based on 'received at' height rather than endorsement
height, when a peer receives private data via gossip.
The height will be the basis for block-to-live purging
in subsequent changesets.

Also, this change updates comments to indicate that
TxPvtReadWriteSet includes write information but not
read information (the read information is passed in the
hashed rwset rather than in the private set). A
subsequent changeset will rename TxPvtReadWriteSet
to TxPvtWriteSet throughout the code.

Change-Id: I52df4702ad47ca17f2a745faa4c278f78e350d2f
Signed-off-by: David Enyeart <enyeart@us.ibm.com>
  • Loading branch information
denyeart authored and yacovm committed Oct 4, 2017
1 parent 7500e9b commit 74906de
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 144 deletions.
104 changes: 53 additions & 51 deletions core/transientstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,42 +44,43 @@ type RWSetScanner interface {
Close()
}

// Store manages the storage of private read-write sets for a ledgerId.
// Store manages the storage of private write sets for a ledgerId.
// Ideally, a ledger can remove the data from this storage when it is committed to
// the permanent storage or the pruning of some data items is enforced by the policy
type Store interface {
// Persist stores the private read-write set of a transaction in the transient store
Persist(txid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error
// Persist stores the private write set of a transaction in the transient store
// based on txid and the block height the private data was received at
Persist(txid string, blockHeight uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error
// GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
// RWSets persisted from different endorsers (via Gossip)
// write sets persisted from different endorsers (via Gossip)
GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (RWSetScanner, error)
// PurgeByTxids removes private read-write set of a given set of transactions from the
// PurgeByTxids removes private write sets of a given set of transactions from the
// transient store
PurgeByTxids(txids []string) error
// PurgeByHeight removes private read-writes set generated by endorsers at block height lesser than
// a given maxBlockNumToRetain. In other words, Purge only retains private read-write sets
// that were generated at block height of maxBlockNumToRetain or higher. Though the private
// read-write sets stored in transient store is removed by coordinator using PurgebyTxids()
// PurgeByHeight removes private write sets at block height lesser than
// a given maxBlockNumToRetain. In other words, Purge only retains private write sets
// that were persisted at block height of maxBlockNumToRetain or higher. Though the private
// write sets stored in transient store is removed by coordinator using PurgebyTxids()
// after successful block commit, PurgeByHeight() is still required to remove orphan entries (as
// transaction that gets endorsed may not be submitted by the client for commit)
PurgeByHeight(maxBlockNumToRetain uint64) error
// GetMinEndorsementBlkHt returns the lowest retained endorsement block height
GetMinEndorsementBlkHt() (uint64, error)
// GetMinTransientBlkHt returns the lowest block height remaining in transient store
GetMinTransientBlkHt() (uint64, error)
Shutdown()
}

// EndorserPvtSimulationResults captures the deatils of the simulation results specific to an endorser
// EndorserPvtSimulationResults captures the details of the simulation results specific to an endorser
type EndorserPvtSimulationResults struct {
EndorsementBlockHeight uint64
PvtSimulationResults *rwset.TxPvtReadWriteSet
ReceivedAtBlockHeight uint64
PvtSimulationResults *rwset.TxPvtReadWriteSet
}

//////////////////////////////////////////////
// Implementation
/////////////////////////////////////////////

// storeProvider encapsulates a leveldb provider which is used to store
// private read-write set of simulated transactions, and implements TransientStoreProvider
// private write sets of simulated transactions, and implements TransientStoreProvider
// interface.
type storeProvider struct {
dbProvider *leveldbhelper.Provider
Expand Down Expand Up @@ -114,16 +115,17 @@ func (provider *storeProvider) Close() {
provider.dbProvider.Close()
}

// Persist stores the private read-write set of a transaction in the transient store
func (s *store) Persist(txid string, endorsementBlkHt uint64,
// Persist stores the private write set of a transaction in the transient store
// based on txid and the block height the private data was received at
func (s *store) Persist(txid string, blockHeight uint64,
privateSimulationResults *rwset.TxPvtReadWriteSet) error {
dbBatch := leveldbhelper.NewUpdateBatch()

// Create compositeKey with appropriate prefix, txid, uuid and endorsementBlkHt
// Due to the fact that the txid may have multiple private RWSets persisted from different
// Create compositeKey with appropriate prefix, txid, uuid and blockHeight
// Due to the fact that the txid may have multiple private write sets persisted from different
// endorsers (via Gossip), we postfix an uuid with the txid to avoid collision.
uuid := util.GenerateUUID()
compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt)
compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, blockHeight)
privateSimulationResultsBytes, err := proto.Marshal(privateSimulationResults)
if err != nil {
return err
Expand All @@ -132,16 +134,16 @@ func (s *store) Persist(txid string, endorsementBlkHt uint64,

// Create two index: (i) by txid, and (ii) by height

// Create compositeKey for purge index by height with appropriate prefix, endorsementBlkHt,
// Create compositeKey for purge index by height with appropriate prefix, blockHeight,
// txid, uuid and store the compositeKey (purge index) with a null byte as value. Note that
// the purge index is used to remove orphan entries in the transient store (which are not removed
// by PurgeTxids()) using BTL policy by PurgeByHeight(). Note that orphan entries are due to transaction
// that gets endorsed but not submitted by the client for commit)
compositeKeyPurgeIndexByHeight := createCompositeKeyForPurgeIndexByHeight(endorsementBlkHt, txid, uuid)
compositeKeyPurgeIndexByHeight := createCompositeKeyForPurgeIndexByHeight(blockHeight, txid, uuid)
dbBatch.Put(compositeKeyPurgeIndexByHeight, emptyValue)

// Create compositeKey for purge index by txid with appropriate prefix, txid, uuid,
// endorsementBlkHt and store the compositeKey (purge index) with a null byte as value.
// blockHeight and store the compositeKey (purge index) with a null byte as value.
// Though compositeKeyPvtRWSet itself can be used to purge private write set by txid,
// we create a separate composite key with a null byte as value. The reason is that
// if we use compositeKeyPvtRWSet, we unnecessarily read (potentially large) private write
Expand All @@ -150,14 +152,14 @@ func (s *store) Persist(txid string, endorsementBlkHt uint64,
// Note: We can create compositeKeyPurgeIndexByTxid by just replacing the prefix of compositeKeyPvtRWSet
// with purgeIndexByTxidPrefix. For code readability and to be expressive, we use a
// createCompositeKeyForPurgeIndexByTxid() instead.
compositeKeyPurgeIndexByTxid := createCompositeKeyForPurgeIndexByTxid(txid, uuid, endorsementBlkHt)
compositeKeyPurgeIndexByTxid := createCompositeKeyForPurgeIndexByTxid(txid, uuid, blockHeight)
dbBatch.Put(compositeKeyPurgeIndexByTxid, emptyValue)

return s.db.WriteBatch(dbBatch, true)
}

// GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
// RWSets persisted from different endorsers.
// write sets persisted from different endorsers.
func (s *store) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (RWSetScanner, error) {
// Construct startKey and endKey to do an range query
startKey := createTxidRangeStartKey(txid)
Expand All @@ -167,7 +169,7 @@ func (s *store) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter)
return &RwsetScanner{txid, iter, filter}, nil
}

// PurgeByTxids removes private read-write set of a given set of transactions from the
// PurgeByTxids removes private write sets of a given set of transactions from the
// transient store. PurgeByTxids() is expected to be called by coordinator after
// committing a block to ledger.
func (s *store) PurgeByTxids(txids []string) error {
Expand All @@ -181,20 +183,20 @@ func (s *store) PurgeByTxids(txids []string) error {
iter := s.db.GetIterator(startKey, endKey)

// Get all txid and uuid from above result and remove it from transient store (both
// read/write set and the corresponding indexes.
// write set and the corresponding indexes.
for iter.Next() {
// For each entry, remove the private read-write set and correponding indexes

// Remove private read-write set
// Remove private write set
compositeKeyPurgeIndexByTxid := iter.Key()
// Note: We can create compositeKeyPvtRWSet by just replacing the prefix of compositeKeyPurgeIndexByTxid
// with prwsetPrefix. For code readability and to be expressive, we split and create again.
uuid, endorsementBlkHt := splitCompositeKeyOfPurgeIndexByTxid(compositeKeyPurgeIndexByTxid)
compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt)
uuid, blockHeight := splitCompositeKeyOfPurgeIndexByTxid(compositeKeyPurgeIndexByTxid)
compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, blockHeight)
dbBatch.Delete(compositeKeyPvtRWSet)

// Remove purge index -- purgeIndexByHeight
compositeKeyPurgeIndexByHeight := createCompositeKeyForPurgeIndexByHeight(endorsementBlkHt, txid, uuid)
compositeKeyPurgeIndexByHeight := createCompositeKeyForPurgeIndexByHeight(blockHeight, txid, uuid)
dbBatch.Delete(compositeKeyPurgeIndexByHeight)

// Remove purge index -- purgeIndexByTxid
Expand All @@ -207,10 +209,10 @@ func (s *store) PurgeByTxids(txids []string) error {
return s.db.WriteBatch(dbBatch, true)
}

// PurgeByHeight removes private read-writes set generated by endorsers at block height lesser than
// a given maxBlockNumToRetain. In other words, PurgeByHeight only retains private read-write sets
// that were generated at block height of maxBlockNumToRetain or higher. Though the private
// read-write sets stored in transient store is removed by coordinator using PurgebyTxids()
// PurgeByHeight removes private write sets at block height lesser than
// a given maxBlockNumToRetain. In other words, Purge only retains private write sets
// that were persisted at block height of maxBlockNumToRetain or higher. Though the private
// write sets stored in transient store is removed by coordinator using PurgebyTxids()
// after successful block commit, PurgeByHeight() is still required to remove orphan entries (as
// transaction that gets endorsed may not be submitted by the client for commit)
func (s *store) PurgeByHeight(maxBlockNumToRetain uint64) error {
Expand All @@ -222,18 +224,18 @@ func (s *store) PurgeByHeight(maxBlockNumToRetain uint64) error {
dbBatch := leveldbhelper.NewUpdateBatch()

// Get all txid and uuid from above result and remove it from transient store (both
// read/write set and the corresponding index.
// write set and the corresponding index.
for iter.Next() {
// For each entry, remove the private read-write set and correponding indexes

// Remove private read-write set
// Remove private write set
compositeKeyPurgeIndexByHeight := iter.Key()
txid, uuid, endorsementBlkHt := splitCompositeKeyOfPurgeIndexByHeight(compositeKeyPurgeIndexByHeight)
compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, endorsementBlkHt)
txid, uuid, blockHeight := splitCompositeKeyOfPurgeIndexByHeight(compositeKeyPurgeIndexByHeight)
compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, blockHeight)
dbBatch.Delete(compositeKeyPvtRWSet)

// Remove purge index -- purgeIndexByTxid
compositeKeyPurgeIndexByTxid := createCompositeKeyForPurgeIndexByTxid(txid, uuid, endorsementBlkHt)
compositeKeyPurgeIndexByTxid := createCompositeKeyForPurgeIndexByTxid(txid, uuid, blockHeight)
dbBatch.Delete(compositeKeyPurgeIndexByTxid)

// Remove purge index -- purgeIndexByHeight
Expand All @@ -244,19 +246,19 @@ func (s *store) PurgeByHeight(maxBlockNumToRetain uint64) error {
return s.db.WriteBatch(dbBatch, true)
}

// GetMinEndorsementBlkHt returns the lowest retained endorsement block height
func (s *store) GetMinEndorsementBlkHt() (uint64, error) {
// GetMinTransientBlkHt returns the lowest block height remaining in transient store
func (s *store) GetMinTransientBlkHt() (uint64, error) {
// Current approach performs a range query on purgeIndex with startKey
// as 0 (i.e., endorsementBlkHt) and returns the first key which denotes
// the lowest retained endorsement block height. An alternative approach
// is to explicitly store the minEndorsementBlkHt in the transientStore.
// as 0 (i.e., blockHeight) and returns the first key which denotes
// the lowest block height remaining in transient store. An alternative approach
// is to explicitly store the minBlockHeight in the transientStore.
startKey := createPurgeIndexByHeightRangeStartKey(0)
iter := s.db.GetIterator(startKey, nil)
// Fetch the minimum endorsement block height
// Fetch the minimum transient block height
if iter.Next() {
dbKey := iter.Key()
_, _, endorsementBlkHt := splitCompositeKeyOfPurgeIndexByHeight(dbKey)
return endorsementBlkHt, nil
_, _, blockHeight := splitCompositeKeyOfPurgeIndexByHeight(dbKey)
return blockHeight, nil
}
iter.Release()
// Returning an error may not be the right thing to do here. May be
Expand All @@ -277,7 +279,7 @@ func (scanner *RwsetScanner) Next() (*EndorserPvtSimulationResults, error) {
}
dbKey := scanner.dbItr.Key()
dbVal := scanner.dbItr.Value()
_, endorsementBlkHt := splitCompositeKeyOfPvtRWSet(dbKey)
_, blockHeight := splitCompositeKeyOfPvtRWSet(dbKey)

txPvtRWSet := &rwset.TxPvtReadWriteSet{}
if err := proto.Unmarshal(dbVal, txPvtRWSet); err != nil {
Expand All @@ -286,8 +288,8 @@ func (scanner *RwsetScanner) Next() (*EndorserPvtSimulationResults, error) {
filteredTxPvtRWSet := pvtdatastorage.TrimPvtWSet(txPvtRWSet, scanner.filter)

return &EndorserPvtSimulationResults{
EndorsementBlockHeight: endorsementBlkHt,
PvtSimulationResults: filteredTxPvtRWSet,
ReceivedAtBlockHeight: blockHeight,
PvtSimulationResults: filteredTxPvtRWSet,
}, nil
}

Expand Down
Loading

0 comments on commit 74906de

Please sign in to comment.