
Commit 11b5013
[FAB-7672] Parallelize processing ns update batch
For LoadCommittedVersions() and ApplyUpdates() in statecouchdb.go, goroutines are used to process each namespace's batches in parallel.

Change-Id: Ibb4bab2281fe18932fca2e6430122caf830f6be2
Signed-off-by: senthil <cendhu@gmail.com>
1 parent 88fd880 commit 11b5013
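The concurrency pattern this commit applies in both functions, sketched in isolation below (processNamespace, applyUpdates, and the namespace list are illustrative stand-ins, not the Fabric API): one goroutine per updated namespace, a sync.WaitGroup to wait for completion, and a buffered error channel sized to the number of namespaces so no sender blocks; after Wait(), the first buffered error, if any, is returned.

package main

import (
    "fmt"
    "sync"
)

// processNamespace is a hypothetical stand-in for the per-namespace work
// (building a processBatch and calling processUpdateBatch in the real code).
func processNamespace(ns string) error {
    fmt.Println("processing namespace", ns)
    return nil
}

// applyUpdates fans out one goroutine per namespace and returns the first
// error observed, mirroring the structure introduced in ApplyUpdates.
func applyUpdates(namespaces []string) error {
    var wg sync.WaitGroup
    wg.Add(len(namespaces))

    // Buffered so every goroutine can send its error without blocking,
    // even though only the first buffered error is ultimately returned.
    errResponses := make(chan error, len(namespaces))

    for _, ns := range namespaces {
        go func(ns string) {
            defer wg.Done()
            if err := processNamespace(ns); err != nil {
                errResponses <- err
            }
        }(ns)
    }

    // Wait for all goroutines to complete, then report the first failure, if any.
    wg.Wait()
    if len(errResponses) > 0 {
        return <-errResponses
    }
    return nil
}

func main() {
    if err := applyUpdates([]string{"lscc", "marbles"}); err != nil {
        fmt.Println("error:", err)
    }
}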

1 file changed


core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go

Lines changed: 140 additions & 88 deletions
@@ -57,6 +57,9 @@ type VersionedDBProvider struct {
 type CommittedVersions struct {
     committedVersions map[statedb.CompositeKey]*version.Height
     revisionNumbers   map[statedb.CompositeKey]string
+    mux               sync.RWMutex
+    // For now, we use one mutex for both versionNo and revisionNo. Having
+    // two mutexes might be overkill.
 }
 
 // NewVersionedDBProvider instantiates VersionedDBProvider
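The single RWMutex added above guards both maps. A minimal, self-contained sketch of that locking discipline (versionCache and its methods are illustrative analogues, not the actual CommittedVersions API): readers take the shared lock, writers take the exclusive lock, and one mutex covers both maps.

package main

import (
    "fmt"
    "sync"
)

// versionCache is an illustrative analogue of CommittedVersions: two maps
// guarded by a single sync.RWMutex, as in the struct above.
type versionCache struct {
    mux       sync.RWMutex
    versions  map[string]uint64
    revisions map[string]string
}

// getVersion takes the shared (read) lock, as GetCachedVersion does after this commit.
func (c *versionCache) getVersion(key string) (uint64, bool) {
    c.mux.RLock()
    defer c.mux.RUnlock()
    v, ok := c.versions[key]
    return v, ok
}

// set takes the exclusive (write) lock and updates both maps under the same mutex.
func (c *versionCache) set(key string, version uint64, rev string) {
    c.mux.Lock()
    defer c.mux.Unlock()
    c.versions[key] = version
    c.revisions[key] = rev
}

func main() {
    c := &versionCache{
        versions:  make(map[string]uint64),
        revisions: make(map[string]string),
    }
    c.set("ns1~key1", 5, "1-abc")
    if v, ok := c.getVersion("ns1~key1"); ok {
        fmt.Println("cached version:", v)
    }
}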
@@ -271,7 +274,9 @@ func (vdb *VersionedDB) GetCachedVersion(namespace string, key string) (*version
     // Retrieve the version from committed data cache.
     // Since the cache was populated based on block readsets,
     // checks during validation should find the version here
+    vdb.committedDataCache.mux.RLock()
     version, keyFound := vdb.committedDataCache.committedVersions[compositeKey]
+    vdb.committedDataCache.mux.RUnlock()
 
     if !keyFound {
         return nil, false
@@ -420,89 +425,107 @@ func (vdb *VersionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIt
 func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {
 
     // STEP 1: GATHER DOCUMENT REVISION NUMBERS REQUIRED FOR THE COUCHDB BULK UPDATE
-    // ALSO BUILD PROCESS BATCHES OF UPDATE DOCUMENTS BASED ON THE MAX BATCH SIZE
+    // ALSO BUILD PROCESS BATCHES OF UPDATE DOCUMENTS PER NAMESPACE BASED ON
+    // THE MAX BATCH SIZE
 
-    // initialize a missing key list
-    var missingKeys []*statedb.CompositeKey
-
-    //initialize a processBatch for updating bulk documents
-    processBatch := statedb.NewUpdateBatch()
+    namespaces := batch.GetUpdatedNamespaces()
 
-    //get the max size of the batch from core.yaml
-    maxBatchSize := ledgerconfig.GetMaxBatchUpdateSize()
+    // Create goroutine wait group.
+    var processBatchGroup sync.WaitGroup
+    processBatchGroup.Add(len(namespaces))
 
-    //initialize a counter to track the batch size
-    batchSizeCounter := 0
+    // Collect error from each goroutine using buffered channel.
+    errResponses := make(chan error, len(namespaces))
 
-    // Iterate through the batch passed in and create process batches
-    // using the max batch size
-    namespaces := batch.GetUpdatedNamespaces()
     for _, ns := range namespaces {
-        // TODO: Need to run the following code block in a goroutine so that each chaincode batch
-        // can be processed and committed parallely.
-        nsUpdates := batch.GetUpdates(ns)
-        for k, vv := range nsUpdates {
-
-            //increment the batch size counter
-            batchSizeCounter++
-
-            compositeKey := statedb.CompositeKey{Namespace: ns, Key: k}
-
-            // Revision numbers are needed for couchdb updates.
-            // vdb.committedDataCache.revisionNumbers is a cache of revision numbers based on ID
-            // Document IDs and revision numbers will already be in the cache for read/writes,
-            // but will be missing in the case of blind writes.
-            // If the key is missing in the cache, then add the key to missingKeys
-            _, keyFound := vdb.committedDataCache.revisionNumbers[compositeKey]
-
-            if !keyFound {
-                // Add the key to the missing key list
-                // As there can be no duplicates in UpdateBatch, no need check for duplicates.
-                missingKeys = append(missingKeys, &compositeKey)
-            }
+        // Each namespace batch is processed and committed in parallel.
+        go func(ns string) {
+            defer processBatchGroup.Done()
 
-            //add the record to the process batch
-            if vv.Value == nil {
-                processBatch.Delete(ns, k, vv.Version)
-            } else {
-                processBatch.Put(ns, k, vv.Value, vv.Version)
-            }
+            // initialize a missing key list
+            var missingKeys []*statedb.CompositeKey
 
-            //Check to see if the process batch exceeds the max batch size
-            if batchSizeCounter >= maxBatchSize {
+            //initialize a processBatch for updating bulk documents
+            processBatch := statedb.NewUpdateBatch()
 
-                //STEP 2: PROCESS EACH BATCH OF UPDATE DOCUMENTS
+            //get the max size of the batch from core.yaml
+            maxBatchSize := ledgerconfig.GetMaxBatchUpdateSize()
 
-                err := vdb.processUpdateBatch(processBatch, missingKeys)
-                if err != nil {
-                    return err
+            //initialize a counter to track the batch size
+            batchSizeCounter := 0
+
+            nsUpdates := batch.GetUpdates(ns)
+            for k, vv := range nsUpdates {
+
+                //increment the batch size counter
+                batchSizeCounter++
+
+                compositeKey := statedb.CompositeKey{Namespace: ns, Key: k}
+
+                // Revision numbers are needed for couchdb updates.
+                // vdb.committedDataCache.revisionNumbers is a cache of revision numbers based on ID
+                // Document IDs and revision numbers will already be in the cache for read/writes,
+                // but will be missing in the case of blind writes.
+                // If the key is missing in the cache, then add the key to missingKeys
+                vdb.committedDataCache.mux.RLock()
+                _, keyFound := vdb.committedDataCache.revisionNumbers[compositeKey]
+                vdb.committedDataCache.mux.RUnlock()
+
+                if !keyFound {
+                    // Add the key to the missing key list
+                    // As there can be no duplicates in UpdateBatch, no need to check for duplicates.
+                    missingKeys = append(missingKeys, &compositeKey)
                 }
 
-                //reset the batch size counter
-                batchSizeCounter = 0
+                //add the record to the process batch
+                if vv.Value == nil {
+                    processBatch.Delete(ns, k, vv.Version)
+                } else {
+                    processBatch.Put(ns, k, vv.Value, vv.Version)
+                }
 
-                //create a new process batch
-                processBatch = statedb.NewUpdateBatch()
+                //Check to see if the process batch exceeds the max batch size
+                if batchSizeCounter >= maxBatchSize {
 
-                // reset the missing key list
-                missingKeys = []*statedb.CompositeKey{}
-            }
-        }
+                    //STEP 2: PROCESS EACH BATCH OF UPDATE DOCUMENTS
 
-        //STEP 3: PROCESS ANY REMAINING DOCUMENTS
-        err := vdb.processUpdateBatch(processBatch, missingKeys)
-        if err != nil {
-            return err
-        }
+                    err := vdb.processUpdateBatch(processBatch, missingKeys)
+                    if err != nil {
+                        errResponses <- err
+                        return
+                    }
+
+                    //reset the batch size counter
+                    batchSizeCounter = 0
+
+                    //create a new process batch
+                    processBatch = statedb.NewUpdateBatch()
 
-        //reset the batch size counter
-        batchSizeCounter = 0
+                    // reset the missing key list
+                    missingKeys = []*statedb.CompositeKey{}
+                }
+            }
 
-        //create a new process batch
-        processBatch = statedb.NewUpdateBatch()
+            //STEP 3: PROCESS ANY REMAINING DOCUMENTS
+            err := vdb.processUpdateBatch(processBatch, missingKeys)
+            if err != nil {
+                errResponses <- err
+                return
+            }
+        }(ns)
+    }
 
-        // reset the missing key list
-        missingKeys = []*statedb.CompositeKey{}
+    // Wait for all goroutines to complete
+    processBatchGroup.Wait()
+
+    // Check if any goroutine resulted in an error.
+    // We could stop all goroutines as soon as any goroutine results in an error rather than
+    // waiting for all goroutines to complete. As errors are very rare, the current sub-optimal
+    // approach (allowing each goroutine to complete) is adequate for now.
+    // TODO: Currently, we are returning only one error. We need to create a new error type
+    // that can encapsulate all the errors and return that type
+    if len(errResponses) > 0 {
+        return <-errResponses
     }
 
     // Record a savepoint at a given height
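Within each namespace goroutine, the pre-existing batching behaviour is preserved: a counter grows per key, the accumulated batch is flushed to processUpdateBatch whenever it reaches maxBatchSize (STEP 2), and a final flush handles the remainder (STEP 3). A standalone sketch of that flush-on-threshold shape, with flush and a fixed maxBatchSize constant standing in for the real Fabric helpers:

package main

import "fmt"

// maxBatchSize stands in for ledgerconfig.GetMaxBatchUpdateSize().
const maxBatchSize = 3

// flush is an illustrative stand-in for vdb.processUpdateBatch.
func flush(batch []string) error {
    fmt.Println("flushing batch:", batch)
    return nil
}

// processInBatches accumulates keys and flushes whenever the batch reaches
// maxBatchSize, then flushes any remainder -- the same shape as STEP 2 and
// STEP 3 inside each namespace goroutine.
func processInBatches(keys []string) error {
    batch := make([]string, 0, maxBatchSize)
    for _, k := range keys {
        batch = append(batch, k)
        if len(batch) >= maxBatchSize {
            if err := flush(batch); err != nil {
                return err
            }
            batch = batch[:0] // reset for the next batch
        }
    }
    if len(batch) > 0 {
        return flush(batch) // process any remaining documents
    }
    return nil
}

func main() {
    _ = processInBatches([]string{"k1", "k2", "k3", "k4", "k5"})
}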
@@ -552,7 +575,9 @@ func (vdb *VersionedDB) processUpdateBatch(updateBatch *statedb.UpdateBatch, mis
         // retrieve the couchdb revision from the cache
         // Documents that do not exist in couchdb will not have revision numbers and will
         // exist in the cache with a revision value of nil
+        vdb.committedDataCache.mux.RLock()
         revision := vdb.committedDataCache.revisionNumbers[statedb.CompositeKey{Namespace: ns, Key: key}]
+        vdb.committedDataCache.mux.RUnlock()
 
         var isDelete bool // initialized to false
         if vv.Value == nil {
@@ -592,7 +617,6 @@ func (vdb *VersionedDB) processUpdateBatch(updateBatch *statedb.UpdateBatch, mis
 
     }
 
-    //TODO: Run the following in a goroutine so that commit on each namespaceDB can happen parallely
     if len(batchUpdateMap) > 0 {
 
         //Add the documents to the batch update array
@@ -699,6 +723,7 @@ func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) erro
 
     missingKeys := make(map[string][]string) // for each namespace/chaincode, store the missingKeys
 
+    vdb.committedDataCache.mux.Lock()
     for _, key := range keys {
 
         logger.Debugf("Load into version cache: %s~%s", key.Key, key.Namespace)
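Note that LoadCommittedVersions takes the cache's write lock once, before this key loop, and releases it only after the loop ends (the Unlock appears in the next hunk); presumably this favours one longer critical section over per-entry lock churn while the cache is warmed from the block read sets. A small illustrative sketch of that lock-once, populate-many choice (the cache type here is not the Fabric one):

package main

import (
    "fmt"
    "sync"
)

// cache mirrors only the shape of committedDataCache, for illustration.
type cache struct {
    mux      sync.RWMutex
    versions map[string]uint64
}

// populate acquires the write lock once and fills many entries under it,
// the same lock-once pattern as the Lock()/Unlock() bracket around the key loop.
func (c *cache) populate(entries map[string]uint64) {
    c.mux.Lock()
    defer c.mux.Unlock()
    for k, v := range entries {
        c.versions[k] = v
    }
}

func main() {
    c := &cache{versions: make(map[string]uint64)}
    c.populate(map[string]uint64{"ns~k1": 1, "ns~k2": 2})
    fmt.Println(len(c.versions), "entries cached")
}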
@@ -726,44 +751,69 @@ func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) erro
         }
 
     }
+    vdb.committedDataCache.mux.Unlock()
 
     //get the max size of the batch from core.yaml
     maxBatchSize := ledgerconfig.GetMaxBatchUpdateSize()
 
     // Call the batch retrieve if there is one or more keys to retrieve
     if len(missingKeys) > 0 {
 
+        // Create goroutine wait group.
+        var batchRetrieveGroup sync.WaitGroup
+        batchRetrieveGroup.Add(len(missingKeys))
+
+        // Collect error from each goroutine using buffered channel.
+        errResponses := make(chan error, len(missingKeys))
+
+        // For each namespace, we load missing keys into the cache in parallel using goroutines
         for namespace := range missingKeys {
-            // TODO: For each namespace, we need to parallely load missing keys into the cache
-            // The following codes need be moved to goroutine and introduce RWlock for cache.
+            go func(namespace string) {
+                defer batchRetrieveGroup.Done()
 
-            // Initialize the array of keys to be retrieved
-            keysToRetrieve := []string{}
+                // Initialize the array of keys to be retrieved
+                keysToRetrieve := []string{}
 
-            // Iterate through the missingKeys and build a batch of keys for batch retrieval
-            for _, key := range missingKeys[namespace] {
+                // Iterate through the missingKeys and build a batch of keys for batch retrieval
+                for _, key := range missingKeys[namespace] {
 
-                keysToRetrieve = append(keysToRetrieve, key)
+                    keysToRetrieve = append(keysToRetrieve, key)
 
-                // check to see if the number of keys is greater than the max batch size
-                if len(keysToRetrieve) >= maxBatchSize {
+                    // check to see if the number of keys is greater than the max batch size
+                    if len(keysToRetrieve) >= maxBatchSize {
+                        err := vdb.batchRetrieveMetaData(namespace, keysToRetrieve)
+                        if err != nil {
+                            errResponses <- err
+                            return
+                        }
+                        // reset the array
+                        keysToRetrieve = []string{}
+                    }
+
+                }
+
+                // If there are any remaining, retrieve the final batch
+                if len(keysToRetrieve) > 0 {
                     err := vdb.batchRetrieveMetaData(namespace, keysToRetrieve)
                     if err != nil {
-                        return err
+                        errResponses <- err
+                        return
                     }
-                    // reset the array
-                    keysToRetrieve = []string{}
                 }
+            }(namespace)
+        }
 
-            }
-
-            // If there are any remaining, retrieve the final batch
-            if len(keysToRetrieve) > 0 {
-                err := vdb.batchRetrieveMetaData(namespace, keysToRetrieve)
-                if err != nil {
-                    return err
-                }
-            }
+        // Wait for all goroutines to complete
+        batchRetrieveGroup.Wait()
+
+        // Check if any goroutine resulted in an error.
+        // We could stop all goroutines as soon as any goroutine results in an error rather than
+        // waiting for all goroutines to complete. As errors are very rare, the current sub-optimal
+        // approach (allowing each goroutine to complete) is adequate for now.
+        // TODO: Currently, we are returning only one error. We need to create a new error type
+        // that can encapsulate all the errors and return that type.
+        if len(errResponses) > 0 {
+            return <-errResponses
         }
 
     }
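The TODO above, repeated in ApplyUpdates, notes that only one of the collected errors is returned. One possible shape for the aggregate error type it asks for (purely illustrative, not part of this commit) drains the buffered channel after all goroutines finish and reports every failure together:

package main

import (
    "errors"
    "fmt"
    "strings"
)

// multiError is one possible shape for the error type the TODO asks for:
// it collects every goroutine's error and reports them together.
type multiError struct {
    errs []error
}

func (m *multiError) Error() string {
    msgs := make([]string, len(m.errs))
    for i, err := range m.errs {
        msgs[i] = err.Error()
    }
    return strings.Join(msgs, "; ")
}

// drain is called after WaitGroup.Wait(), when no goroutine can still send:
// it closes the channel, collects everything buffered, and wraps the result.
func drain(errResponses chan error) error {
    close(errResponses)
    var collected []error
    for err := range errResponses {
        collected = append(collected, err)
    }
    if len(collected) == 0 {
        return nil
    }
    return &multiError{errs: collected}
}

func main() {
    ch := make(chan error, 2)
    ch <- errors.New("ns1: bulk update failed")
    ch <- errors.New("ns2: batch retrieve failed")
    fmt.Println(drain(ch))
}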
@@ -794,8 +844,10 @@ func (vdb *VersionedDB) batchRetrieveMetaData(namespace string, keys []string) e
         if len(documentMetadata.Version) != 0 {
             compositeKey := statedb.CompositeKey{Namespace: namespace, Key: documentMetadata.ID}
 
+            vdb.committedDataCache.mux.Lock()
             versionMap[compositeKey] = createVersionHeightFromVersionString(documentMetadata.Version)
             revMap[compositeKey] = documentMetadata.Rev
+            vdb.committedDataCache.mux.Unlock()
         }
     }
 