Merge "[FAB-7672] Parallelize processing ns update batch"
denyeart authored and Gerrit Code Review committed Jan 31, 2018
2 parents 58d8045 + 11b5013 commit 0c3673e
Showing 1 changed file with 140 additions and 88 deletions.
228 changes: 140 additions & 88 deletions core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go
@@ -57,6 +57,9 @@ type VersionedDBProvider struct {
type CommittedVersions struct {
committedVersions map[statedb.CompositeKey]*version.Height
revisionNumbers map[statedb.CompositeKey]string
mux sync.RWMutex
// For now, we use one mutex for both versionNo and revisionNo. Having
// two mutexes might be overkill.
}

// NewVersionedDBProvider instantiates VersionedDBProvider
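The new mux field lets many readers share the cache while a writer excludes them all. Below is a minimal sketch of the same sync.RWMutex pattern, using a simplified hypothetical committedCache type in place of CommittedVersions (the names committedCache, get, and put are illustrative, not from the commit):

package main

import "sync"

// committedCache is a hypothetical, simplified stand-in for CommittedVersions:
// a single RWMutex guards the map, per the comment above.
type committedCache struct {
	mux      sync.RWMutex
	versions map[string]string
}

func newCommittedCache() *committedCache {
	return &committedCache{versions: make(map[string]string)}
}

// get mirrors the read path (as in GetCachedVersion): RLock lets
// concurrent validators look up versions at the same time.
func (c *committedCache) get(key string) (string, bool) {
	c.mux.RLock()
	defer c.mux.RUnlock()
	v, ok := c.versions[key]
	return v, ok
}

// put mirrors the write path (as when the cache is populated):
// Lock blocks all readers while the map is mutated.
func (c *committedCache) put(key, value string) {
	c.mux.Lock()
	defer c.mux.Unlock()
	c.versions[key] = value
}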
@@ -271,7 +274,9 @@ func (vdb *VersionedDB) GetCachedVersion(namespace string, key string) (*version
// Retrieve the version from committed data cache.
// Since the cache was populated based on block readsets,
// checks during validation should find the version here
vdb.committedDataCache.mux.RLock()
version, keyFound := vdb.committedDataCache.committedVersions[compositeKey]
vdb.committedDataCache.mux.RUnlock()

if !keyFound {
return nil, false
@@ -420,89 +425,107 @@ func (vdb *VersionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIt
func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {

// STEP 1: GATHER DOCUMENT REVISION NUMBERS REQUIRED FOR THE COUCHDB BULK UPDATE
// ALSO BUILD PROCESS BATCHES OF UPDATE DOCUMENTS BASED ON THE MAX BATCH SIZE
// ALSO BUILD PROCESS BATCHES OF UPDATE DOCUMENTS PER NAMESPACE BASED ON
// THE MAX BATCH SIZE

// initialize a missing key list
var missingKeys []*statedb.CompositeKey

//initialize a processBatch for updating bulk documents
processBatch := statedb.NewUpdateBatch()
namespaces := batch.GetUpdatedNamespaces()

//get the max size of the batch from core.yaml
maxBatchSize := ledgerconfig.GetMaxBatchUpdateSize()
// Create goroutine wait group.
var processBatchGroup sync.WaitGroup
processBatchGroup.Add(len(namespaces))

//initialize a counter to track the batch size
batchSizeCounter := 0
// Collect errors from each goroutine using a buffered channel.
errResponses := make(chan error, len(namespaces))

// Iterate through the batch passed in and create process batches
// using the max batch size
namespaces := batch.GetUpdatedNamespaces()
for _, ns := range namespaces {
// TODO: Need to run the following code block in a goroutine so that each chaincode batch
// can be processed and committed in parallel.
nsUpdates := batch.GetUpdates(ns)
for k, vv := range nsUpdates {

//increment the batch size counter
batchSizeCounter++

compositeKey := statedb.CompositeKey{Namespace: ns, Key: k}

// Revision numbers are needed for couchdb updates.
// vdb.committedDataCache.revisionNumbers is a cache of revision numbers based on ID
// Document IDs and revision numbers will already be in the cache for read/writes,
// but will be missing in the case of blind writes.
// If the key is missing in the cache, then add the key to missingKeys
_, keyFound := vdb.committedDataCache.revisionNumbers[compositeKey]

if !keyFound {
// Add the key to the missing key list
// As there can be no duplicates in UpdateBatch, there is no need to check for duplicates.
missingKeys = append(missingKeys, &compositeKey)
}
// Each namespace batch is processed and committed in parallel.
go func(ns string) {
defer processBatchGroup.Done()

//add the record to the process batch
if vv.Value == nil {
processBatch.Delete(ns, k, vv.Version)
} else {
processBatch.Put(ns, k, vv.Value, vv.Version)
}
// initialize a missing key list
var missingKeys []*statedb.CompositeKey

//Check to see if the process batch has reached the max batch size
if batchSizeCounter >= maxBatchSize {
//initialize a processBatch for updating bulk documents
processBatch := statedb.NewUpdateBatch()

//STEP 2: PROCESS EACH BATCH OF UPDATE DOCUMENTS
//get the max size of the batch from core.yaml
maxBatchSize := ledgerconfig.GetMaxBatchUpdateSize()

err := vdb.processUpdateBatch(processBatch, missingKeys)
if err != nil {
return err
//initialize a counter to track the batch size
batchSizeCounter := 0

nsUpdates := batch.GetUpdates(ns)
for k, vv := range nsUpdates {

//increment the batch size counter
batchSizeCounter++

compositeKey := statedb.CompositeKey{Namespace: ns, Key: k}

// Revision numbers are needed for couchdb updates.
// vdb.committedDataCache.revisionNumbers is a cache of revision numbers based on ID
// Document IDs and revision numbers will already be in the cache for read/writes,
// but will be missing in the case of blind writes.
// If the key is missing in the cache, then add the key to missingKeys
vdb.committedDataCache.mux.RLock()
_, keyFound := vdb.committedDataCache.revisionNumbers[compositeKey]
vdb.committedDataCache.mux.RUnlock()

if !keyFound {
// Add the key to the missing key list
// As there can be no duplicates in UpdateBatch, there is no need to check for duplicates.
missingKeys = append(missingKeys, &compositeKey)
}

//reset the batch size counter
batchSizeCounter = 0
//add the record to the process batch
if vv.Value == nil {
processBatch.Delete(ns, k, vv.Version)
} else {
processBatch.Put(ns, k, vv.Value, vv.Version)
}

//create a new process batch
processBatch = statedb.NewUpdateBatch()
//Check to see if the process batch has reached the max batch size
if batchSizeCounter >= maxBatchSize {

// reset the missing key list
missingKeys = []*statedb.CompositeKey{}
}
}
//STEP 2: PROCESS EACH BATCH OF UPDATE DOCUMENTS

//STEP 3: PROCESS ANY REMAINING DOCUMENTS
err := vdb.processUpdateBatch(processBatch, missingKeys)
if err != nil {
return err
}
err := vdb.processUpdateBatch(processBatch, missingKeys)
if err != nil {
errResponses <- err
return
}

//reset the batch size counter
batchSizeCounter = 0

//create a new process batch
processBatch = statedb.NewUpdateBatch()

//reset the batch size counter
batchSizeCounter = 0
// reset the missing key list
missingKeys = []*statedb.CompositeKey{}
}
}

//create a new process batch
processBatch = statedb.NewUpdateBatch()
//STEP 3: PROCESS ANY REMAINING DOCUMENTS
err := vdb.processUpdateBatch(processBatch, missingKeys)
if err != nil {
errResponses <- err
return
}
}(ns)
}

// reset the missing key list
missingKeys = []*statedb.CompositeKey{}
// Wait for all goroutines to complete
processBatchGroup.Wait()

// Check if any goroutine resulted in an error.
// We could stop all goroutines as soon as any goroutine results in an error rather than
// waiting for all goroutines to complete. As errors are very rare, the current sub-optimal
// approach (allowing each goroutine to complete) is adequate for now.
// TODO: Currently, we are returning only one error. We need to create a new error type
// that can encapsulate all the errors and return that type.
if len(errResponses) > 0 {
return <-errResponses
}

// Record a savepoint at a given height
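Distilled, the fan-out above is a standard WaitGroup-plus-buffered-channel pattern. Here is a minimal sketch under the same assumptions the commit makes (applyAll and process are hypothetical names; the real code does the per-namespace work inline and commits via vdb.processUpdateBatch):

package main

import "sync"

// applyAll runs process once per namespace in its own goroutine,
// joins them with a WaitGroup, and surfaces the first error.
func applyAll(namespaces []string, process func(ns string) error) error {
	var wg sync.WaitGroup
	wg.Add(len(namespaces))

	// The buffer is sized to len(namespaces), so every goroutine can
	// send its error and exit even though nothing receives until Wait returns.
	errs := make(chan error, len(namespaces))

	for _, ns := range namespaces {
		go func(ns string) { // ns is passed by value to avoid capturing the loop variable
			defer wg.Done()
			if err := process(ns); err != nil {
				errs <- err
			}
		}(ns)
	}

	wg.Wait()

	// As in the commit, only one error is returned for now.
	if len(errs) > 0 {
		return <-errs
	}
	return nil
}

Sizing the buffer to the number of senders is what makes the early return inside each goroutine safe: a send can never block, so no goroutine leaks while the caller is still in Wait.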
@@ -552,7 +575,9 @@ func (vdb *VersionedDB) processUpdateBatch(updateBatch *statedb.UpdateBatch, mis
// retrieve the couchdb revision from the cache
// Documents that do not exist in couchdb will not have revision numbers and will
// exist in the cache with a revision value of nil
vdb.committedDataCache.mux.RLock()
revision := vdb.committedDataCache.revisionNumbers[statedb.CompositeKey{Namespace: ns, Key: key}]
vdb.committedDataCache.mux.RUnlock()

var isDelete bool // initialized to false
if vv.Value == nil {
@@ -592,7 +617,6 @@

}

//TODO: Run the following in a goroutine so that the commit on each namespaceDB can happen in parallel
if len(batchUpdateMap) > 0 {

//Add the documents to the batch update array
@@ -699,6 +723,7 @@ func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) erro

missingKeys := make(map[string][]string) // for each namespace/chaincode, store the missingKeys

vdb.committedDataCache.mux.Lock()
for _, key := range keys {

logger.Debugf("Load into version cache: %s~%s", key.Key, key.Namespace)
@@ -726,44 +751,69 @@ func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) erro
}

}
vdb.committedDataCache.mux.Unlock()

//get the max size of the batch from core.yaml
maxBatchSize := ledgerconfig.GetMaxBatchUpdateSize()

// Call the batch retrieve if there are one or more keys to retrieve
if len(missingKeys) > 0 {

// Create goroutine wait group.
var batchRetrieveGroup sync.WaitGroup
batchRetrieveGroup.Add(len(missingKeys))

// Collect errors from each goroutine using a buffered channel.
errResponses := make(chan error, len(missingKeys))

// For each namespace, we load missing keys into the cache in parallel using goroutines
for namespace := range missingKeys {
// TODO: For each namespace, we need to load missing keys into the cache in parallel.
// The following code needs to be moved into a goroutine, with an RWLock introduced for the cache.
go func(namespace string) {
defer batchRetrieveGroup.Done()

// Initialize the array of keys to be retrieved
keysToRetrieve := []string{}
// Initialize the array of keys to be retrieved
keysToRetrieve := []string{}

// Iterate through the missingKeys and build a batch of keys for batch retrieval
for _, key := range missingKeys[namespace] {
// Iterate through the missingKeys and build a batch of keys for batch retrieval
for _, key := range missingKeys[namespace] {

keysToRetrieve = append(keysToRetrieve, key)
keysToRetrieve = append(keysToRetrieve, key)

// check to see if the number of keys has reached the max batch size
if len(keysToRetrieve) >= maxBatchSize {
// check to see if the number of keys has reached the max batch size
if len(keysToRetrieve) >= maxBatchSize {
err := vdb.batchRetrieveMetaData(namespace, keysToRetrieve)
if err != nil {
errResponses <- err
return
}
// reset the array
keysToRetrieve = []string{}
}

}

// If there are any remaining, retrieve the final batch
if len(keysToRetrieve) > 0 {
err := vdb.batchRetrieveMetaData(namespace, keysToRetrieve)
if err != nil {
return err
errResponses <- err
return
}
// reset the array
keysToRetrieve = []string{}
}
}(namespace)
}

}

// If there are any remaining, retrieve the final batch
if len(keysToRetrieve) > 0 {
err := vdb.batchRetrieveMetaData(namespace, keysToRetrieve)
if err != nil {
return err
}
}
// Wait for all goroutines to complete
batchRetrieveGroup.Wait()

// Check if any goroutine resulted in an error.
// We could stop all goroutines as soon as any goroutine results in an error rather than
// waiting for all goroutines to complete. As errors are very rare, the current sub-optimal
// approach (allowing each goroutine to complete) is adequate for now.
// TODO: Currently, we are returning only one error. We need to create a new error type
// that can encapsulate all the errors and return that type.
if len(errResponses) > 0 {
return <-errResponses
}

}
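The per-namespace loop above is, at its core, a chunking loop. A minimal sketch of that batching logic in isolation (retrieveInBatches and retrieve are hypothetical names; the real code calls vdb.batchRetrieveMetaData with keys gathered per namespace):

package main

// retrieveInBatches flushes keys in chunks of at most maxBatchSize,
// then flushes any remaining partial batch, mirroring the loop above.
func retrieveInBatches(keys []string, maxBatchSize int, retrieve func([]string) error) error {
	keysToRetrieve := []string{}
	for _, key := range keys {
		keysToRetrieve = append(keysToRetrieve, key)
		// flush once the batch reaches the max batch size
		if len(keysToRetrieve) >= maxBatchSize {
			if err := retrieve(keysToRetrieve); err != nil {
				return err
			}
			keysToRetrieve = []string{} // reset the array
		}
	}
	// if there are any remaining, retrieve the final batch
	if len(keysToRetrieve) > 0 {
		return retrieve(keysToRetrieve)
	}
	return nil
}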
@@ -794,8 +844,10 @@ func (vdb *VersionedDB) batchRetrieveMetaData(namespace string, keys []string) e
if len(documentMetadata.Version) != 0 {
compositeKey := statedb.CompositeKey{Namespace: namespace, Key: documentMetadata.ID}

vdb.committedDataCache.mux.Lock()
versionMap[compositeKey] = createVersionHeightFromVersionString(documentMetadata.Version)
revMap[compositeKey] = documentMetadata.Rev
vdb.committedDataCache.mux.Unlock()
}
}

