[FAB-7672] Parallelize processing ns update batch
For loadCommittedVersions() and ApplyUpdates() in statecouchdb.go,
goroutines are used to process each namespace's batch in parallel.

Change-Id: Ibb4bab2281fe18932fca2e6430122caf830f6be2
Signed-off-by: senthil <cendhu@gmail.com>
cendhu committed Jan 23, 2018
1 parent 88fd880 commit 11b5013
Showing 1 changed file with 140 additions and 88 deletions.
228 changes: 140 additions & 88 deletions core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go
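
At its core, the change applies a standard Go fan-out pattern: one goroutine per namespace, a sync.WaitGroup to join them, and an error channel buffered to the goroutine count. A minimal standalone sketch of that pattern follows; processNamespace is a hypothetical stand-in for the per-namespace commit work, not a function from the Fabric codebase.

    package main

    import (
    	"fmt"
    	"sync"
    )

    // processNamespace is a hypothetical stand-in for committing one
    // namespace's batch of documents to CouchDB.
    func processNamespace(ns string) error {
    	if ns == "" {
    		return fmt.Errorf("empty namespace")
    	}
    	fmt.Println("processed", ns)
    	return nil
    }

    func main() {
    	namespaces := []string{"ns1", "ns2", "ns3"}

    	var wg sync.WaitGroup
    	wg.Add(len(namespaces))

    	// Buffered to len(namespaces) so every goroutine can report an error
    	// without blocking, even though at most one error is consumed.
    	errResponses := make(chan error, len(namespaces))

    	for _, ns := range namespaces {
    		// Pass ns as an argument so each goroutine captures its own copy.
    		go func(ns string) {
    			defer wg.Done()
    			if err := processNamespace(ns); err != nil {
    				errResponses <- err
    			}
    		}(ns)
    	}

    	// Join all workers, then surface the first error, if any.
    	wg.Wait()
    	if len(errResponses) > 0 {
    		fmt.Println("error:", <-errResponses)
    	}
    }
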
@@ -57,6 +57,9 @@ type VersionedDBProvider struct {
type CommittedVersions struct {
	committedVersions map[statedb.CompositeKey]*version.Height
	revisionNumbers   map[statedb.CompositeKey]string
+	mux               sync.RWMutex
+	// For now, we use one mutex for both versionNo and revisionNo. Having
+	// two mutexes might be overkill.
}

// NewVersionedDBProvider instantiates VersionedDBProvider
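
The hunk above adds a single sync.RWMutex guarding both cache maps. A minimal sketch of the same locking discipline, with simplified map types standing in for the Fabric ones: lookups (as in GetCachedVersion and ApplyUpdates below) take the read lock, while cache population (as in LoadCommittedVersions and batchRetrieveMetaData) takes the write lock.

    package main

    import (
    	"fmt"
    	"sync"
    )

    // cache mirrors the shape of CommittedVersions: two maps guarded by one RWMutex.
    type cache struct {
    	mux       sync.RWMutex
    	versions  map[string]uint64
    	revisions map[string]string
    }

    // get takes the read lock, so concurrent validators can look up versions in parallel.
    func (c *cache) get(key string) (uint64, bool) {
    	c.mux.RLock()
    	defer c.mux.RUnlock()
    	v, ok := c.versions[key]
    	return v, ok
    }

    // put takes the write lock, excluding all readers and writers while both maps update.
    func (c *cache) put(key string, ver uint64, rev string) {
    	c.mux.Lock()
    	defer c.mux.Unlock()
    	c.versions[key] = ver
    	c.revisions[key] = rev
    }

    func main() {
    	c := &cache{versions: map[string]uint64{}, revisions: map[string]string{}}
    	c.put("ns1~key1", 5, "1-abc")
    	if v, ok := c.get("ns1~key1"); ok {
    		fmt.Println("version:", v)
    	}
    }
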
@@ -271,7 +274,9 @@ func (vdb *VersionedDB) GetCachedVersion(namespace string, key string) (*version
	// Retrieve the version from committed data cache.
	// Since the cache was populated based on block readsets,
	// checks during validation should find the version here
+	vdb.committedDataCache.mux.RLock()
	version, keyFound := vdb.committedDataCache.committedVersions[compositeKey]
+	vdb.committedDataCache.mux.RUnlock()

	if !keyFound {
		return nil, false
@@ -420,89 +425,107 @@ func (vdb *VersionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIt
func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {

	// STEP 1: GATHER DOCUMENT REVISION NUMBERS REQUIRED FOR THE COUCHDB BULK UPDATE
-	// ALSO BUILD PROCESS BATCHES OF UPDATE DOCUMENTS BASED ON THE MAX BATCH SIZE
+	// ALSO BUILD PROCESS BATCHES OF UPDATE DOCUMENTS PER NAMESPACE BASED ON
+	// THE MAX BATCH SIZE

-	// initialize a missing key list
-	var missingKeys []*statedb.CompositeKey
-
-	//initialize a processBatch for updating bulk documents
-	processBatch := statedb.NewUpdateBatch()
-
-	//get the max size of the batch from core.yaml
-	maxBatchSize := ledgerconfig.GetMaxBatchUpdateSize()
-
-	//initialize a counter to track the batch size
-	batchSizeCounter := 0
-
-	// Iterate through the batch passed in and create process batches
-	// using the max batch size
	namespaces := batch.GetUpdatedNamespaces()
+
+	// Create goroutine wait group.
+	var processBatchGroup sync.WaitGroup
+	processBatchGroup.Add(len(namespaces))
+
+	// Collect errors from the goroutines using a buffered channel.
+	errResponses := make(chan error, len(namespaces))
+
	for _, ns := range namespaces {
-		// TODO: Need to run the following code block in a goroutine so that each chaincode batch
-		// can be processed and committed parallely.
-		nsUpdates := batch.GetUpdates(ns)
-		for k, vv := range nsUpdates {
-
-			//increment the batch size counter
-			batchSizeCounter++
-
-			compositeKey := statedb.CompositeKey{Namespace: ns, Key: k}
-
-			// Revision numbers are needed for couchdb updates.
-			// vdb.committedDataCache.revisionNumbers is a cache of revision numbers based on ID
-			// Document IDs and revision numbers will already be in the cache for read/writes,
-			// but will be missing in the case of blind writes.
-			// If the key is missing in the cache, then add the key to missingKeys
-			_, keyFound := vdb.committedDataCache.revisionNumbers[compositeKey]
-
-			if !keyFound {
-				// Add the key to the missing key list
-				// As there can be no duplicates in UpdateBatch, no need check for duplicates.
-				missingKeys = append(missingKeys, &compositeKey)
-			}
-
-			//add the record to the process batch
-			if vv.Value == nil {
-				processBatch.Delete(ns, k, vv.Version)
-			} else {
-				processBatch.Put(ns, k, vv.Value, vv.Version)
-			}
-
-			//Check to see if the process batch exceeds the max batch size
-			if batchSizeCounter >= maxBatchSize {
-
-				//STEP 2: PROCESS EACH BATCH OF UPDATE DOCUMENTS
-
-				err := vdb.processUpdateBatch(processBatch, missingKeys)
-				if err != nil {
-					return err
-				}
-
-				//reset the batch size counter
-				batchSizeCounter = 0
-
-				//create a new process batch
-				processBatch = statedb.NewUpdateBatch()
-
-				// reset the missing key list
-				missingKeys = []*statedb.CompositeKey{}
-			}
-		}
+		// Each namespace batch is processed and committed in parallel.
+		go func(ns string) {
+			defer processBatchGroup.Done()
+
+			// initialize a missing key list
+			var missingKeys []*statedb.CompositeKey
+
+			//initialize a processBatch for updating bulk documents
+			processBatch := statedb.NewUpdateBatch()
+
+			//get the max size of the batch from core.yaml
+			maxBatchSize := ledgerconfig.GetMaxBatchUpdateSize()
+
+			//initialize a counter to track the batch size
+			batchSizeCounter := 0
+
+			nsUpdates := batch.GetUpdates(ns)
+			for k, vv := range nsUpdates {
+
+				//increment the batch size counter
+				batchSizeCounter++
+
+				compositeKey := statedb.CompositeKey{Namespace: ns, Key: k}
+
+				// Revision numbers are needed for couchdb updates.
+				// vdb.committedDataCache.revisionNumbers is a cache of revision numbers keyed by ID.
+				// Document IDs and revision numbers will already be in the cache for read/writes,
+				// but will be missing in the case of blind writes.
+				// If the key is missing in the cache, then add the key to missingKeys.
+				vdb.committedDataCache.mux.RLock()
+				_, keyFound := vdb.committedDataCache.revisionNumbers[compositeKey]
+				vdb.committedDataCache.mux.RUnlock()
+
+				if !keyFound {
+					// Add the key to the missing key list.
+					// As there can be no duplicates in UpdateBatch, there is no need to check for duplicates.
+					missingKeys = append(missingKeys, &compositeKey)
+				}
+
+				//add the record to the process batch
+				if vv.Value == nil {
+					processBatch.Delete(ns, k, vv.Version)
+				} else {
+					processBatch.Put(ns, k, vv.Value, vv.Version)
+				}
+
+				//Check to see if the process batch exceeds the max batch size
+				if batchSizeCounter >= maxBatchSize {
+
+					//STEP 2: PROCESS EACH BATCH OF UPDATE DOCUMENTS
+
+					err := vdb.processUpdateBatch(processBatch, missingKeys)
+					if err != nil {
+						errResponses <- err
+						return
+					}

+					//reset the batch size counter
+					batchSizeCounter = 0
+
+					//create a new process batch
+					processBatch = statedb.NewUpdateBatch()
+
+					// reset the missing key list
+					missingKeys = []*statedb.CompositeKey{}
+				}
+			}
+
+			//STEP 3: PROCESS ANY REMAINING DOCUMENTS
+			err := vdb.processUpdateBatch(processBatch, missingKeys)
+			if err != nil {
+				errResponses <- err
+				return
+			}
+		}(ns)
	}

-	//STEP 3: PROCESS ANY REMAINING DOCUMENTS
-	err := vdb.processUpdateBatch(processBatch, missingKeys)
-	if err != nil {
-		return err
-	}
+	// Wait for all goroutines to complete.
+	processBatchGroup.Wait()
+
+	// Check whether any goroutine resulted in an error.
+	// We could stop all goroutines as soon as one errors rather than waiting
+	// for all of them to complete. As errors are very rare, the current sub-optimal
+	// approach (allowing every goroutine to complete) is adequate for now.
+	// TODO: Currently, we return only one error. We need to create a new error type
+	// that can encapsulate all the errors and return that type.
+	if len(errResponses) > 0 {
+		return <-errResponses
+	}

	// Record a savepoint at a given height
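
STEP 1 through STEP 3 above amount to a chunk-and-flush loop run independently per namespace. Reduced to its essentials, and assuming a hypothetical flushBatch in place of vdb.processUpdateBatch and a fixed constant in place of the core.yaml max batch size:

    package main

    import "fmt"

    // maxBatchSize stands in for ledgerconfig.GetMaxBatchUpdateSize().
    const maxBatchSize = 3

    // flushBatch stands in for vdb.processUpdateBatch.
    func flushBatch(batch []string) error {
    	fmt.Println("flushing", batch)
    	return nil
    }

    func main() {
    	updates := []string{"k1", "k2", "k3", "k4", "k5"}

    	var batch []string
    	for _, k := range updates {
    		batch = append(batch, k)
    		// STEP 2 analogue: flush whenever the batch reaches the max size.
    		if len(batch) >= maxBatchSize {
    			if err := flushBatch(batch); err != nil {
    				return // the real code sends err to errResponses instead
    			}
    			batch = nil // start a fresh batch
    		}
    	}
    	// STEP 3 analogue: flush any remainder smaller than maxBatchSize.
    	if len(batch) > 0 {
    		if err := flushBatch(batch); err != nil {
    			return
    		}
    	}
    }
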
@@ -552,7 +575,9 @@ func (vdb *VersionedDB) processUpdateBatch(updateBatch *statedb.UpdateBatch, mis
		// retrieve the couchdb revision from the cache
		// Documents that do not exist in couchdb will not have revision numbers and will
		// exist in the cache with a revision value of nil
+		vdb.committedDataCache.mux.RLock()
		revision := vdb.committedDataCache.revisionNumbers[statedb.CompositeKey{Namespace: ns, Key: key}]
+		vdb.committedDataCache.mux.RUnlock()

		var isDelete bool // initialized to false
		if vv.Value == nil {
@@ -592,7 +617,6 @@ func (vdb *VersionedDB) processUpdateBatch(updateBatch *statedb.UpdateBatch, mis

	}

-	//TODO: Run the following in a goroutine so that commit on each namespaceDB can happen parallely
	if len(batchUpdateMap) > 0 {

		//Add the documents to the batch update array
@@ -699,6 +723,7 @@ func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) erro

	missingKeys := make(map[string][]string) // for each namespace/chaincode, store the missingKeys

+	vdb.committedDataCache.mux.Lock()
	for _, key := range keys {

		logger.Debugf("Load into version cache: %s~%s", key.Key, key.Namespace)
@@ -726,44 +751,69 @@ func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) erro
		}

	}
+	vdb.committedDataCache.mux.Unlock()

	//get the max size of the batch from core.yaml
	maxBatchSize := ledgerconfig.GetMaxBatchUpdateSize()

	// Call the batch retrieve if there is one or more keys to retrieve
	if len(missingKeys) > 0 {

-		// TODO: For each namespace, we need to parallely load missing keys into the cache
-		// The following codes need be moved to goroutine and introduce RWlock for cache.
+		// Create goroutine wait group.
+		var batchRetrieveGroup sync.WaitGroup
+		batchRetrieveGroup.Add(len(missingKeys))
+
+		// Collect errors from the goroutines using a buffered channel.
+		errResponses := make(chan error, len(missingKeys))
+
+		// For each namespace, load the missing keys into the cache in parallel using goroutines.
		for namespace := range missingKeys {
-			// Initialize the array of keys to be retrieved
-			keysToRetrieve := []string{}
-
-			// Iterate through the missingKeys and build a batch of keys for batch retrieval
-			for _, key := range missingKeys[namespace] {
-
-				keysToRetrieve = append(keysToRetrieve, key)
-
-				// check to see if the number of keys is greater than the max batch size
-				if len(keysToRetrieve) >= maxBatchSize {
-					err := vdb.batchRetrieveMetaData(namespace, keysToRetrieve)
-					if err != nil {
-						return err
-					}
-					// reset the array
-					keysToRetrieve = []string{}
-				}
-
-			}
-
-			// If there are any remaining, retrieve the final batch
-			if len(keysToRetrieve) > 0 {
-				err := vdb.batchRetrieveMetaData(namespace, keysToRetrieve)
-				if err != nil {
-					return err
-				}
-			}
+			go func(namespace string) {
+				defer batchRetrieveGroup.Done()
+
+				// Initialize the array of keys to be retrieved
+				keysToRetrieve := []string{}
+
+				// Iterate through the missingKeys and build a batch of keys for batch retrieval
+				for _, key := range missingKeys[namespace] {
+
+					keysToRetrieve = append(keysToRetrieve, key)
+
+					// check to see if the number of keys is greater than the max batch size
+					if len(keysToRetrieve) >= maxBatchSize {
+						err := vdb.batchRetrieveMetaData(namespace, keysToRetrieve)
+						if err != nil {
+							errResponses <- err
+							return
+						}
+						// reset the array
+						keysToRetrieve = []string{}
+					}
+
+				}
+
+				// If there are any remaining, retrieve the final batch
+				if len(keysToRetrieve) > 0 {
+					err := vdb.batchRetrieveMetaData(namespace, keysToRetrieve)
+					if err != nil {
+						errResponses <- err
+						return
+					}
+				}
+			}(namespace)
		}

+		// Wait for all goroutines to complete.
+		batchRetrieveGroup.Wait()
+
+		// Check whether any goroutine resulted in an error.
+		// We could stop all goroutines as soon as one errors rather than waiting
+		// for all of them to complete. As errors are very rare, the current sub-optimal
+		// approach (allowing every goroutine to complete) is adequate for now.
+		// TODO: Currently, we return only one error. We need to create a new error type
+		// that can encapsulate all the errors and return that type.
+		if len(errResponses) > 0 {
+			return <-errResponses
+		}

	}
@@ -794,8 +844,10 @@ func (vdb *VersionedDB) batchRetrieveMetaData(namespace string, keys []string) e
		if len(documentMetadata.Version) != 0 {
			compositeKey := statedb.CompositeKey{Namespace: namespace, Key: documentMetadata.ID}

+			vdb.committedDataCache.mux.Lock()
			versionMap[compositeKey] = createVersionHeightFromVersionString(documentMetadata.Version)
			revMap[compositeKey] = documentMetadata.Rev
+			vdb.committedDataCache.mux.Unlock()
		}
	}
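
A design note on the error channel used in both parallelized paths: it is buffered to the number of goroutines so that a failing goroutine can always send its error and exit without blocking. With an unbuffered channel, any goroutine that errors before a receiver is ready would block forever and the WaitGroup would never be released. A small illustration of why the buffering matters:

    package main

    import (
    	"fmt"
    	"sync"
    )

    func main() {
    	const workers = 4

    	var wg sync.WaitGroup
    	wg.Add(workers)

    	// Buffered: all four sends below complete even though only one value is read.
    	errs := make(chan error, workers)

    	for i := 0; i < workers; i++ {
    		go func(i int) {
    			defer wg.Done()
    			// Every worker fails here, to show that none of the sends block.
    			errs <- fmt.Errorf("worker %d failed", i)
    		}(i)
    	}

    	wg.Wait() // with an unbuffered channel, this would deadlock
    	fmt.Println("first error:", <-errs)
    	fmt.Println("errors still queued:", len(errs))
    }
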
