diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go
index e35851ad2ac..738c9690f70 100644
--- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go
+++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go
@@ -57,6 +57,9 @@ type VersionedDBProvider struct {
 type CommittedVersions struct {
 	committedVersions map[statedb.CompositeKey]*version.Height
 	revisionNumbers   map[statedb.CompositeKey]string
+	mux               sync.RWMutex
+	// For now, we use one mutex for both versionNo and revisionNo. Having
+	// two mutexes might be overkill.
 }
 
 // NewVersionedDBProvider instantiates VersionedDBProvider
@@ -271,7 +274,9 @@ func (vdb *VersionedDB) GetCachedVersion(namespace string, key string) (*version
 	// Retrieve the version from committed data cache.
 	// Since the cache was populated based on block readsets,
 	// checks during validation should find the version here
+	vdb.committedDataCache.mux.RLock()
 	version, keyFound := vdb.committedDataCache.committedVersions[compositeKey]
+	vdb.committedDataCache.mux.RUnlock()
 
 	if !keyFound {
 		return nil, false
@@ -420,89 +425,107 @@ func (vdb *VersionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIt
 func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {
 
 	// STEP 1: GATHER DOCUMENT REVISION NUMBERS REQUIRED FOR THE COUCHDB BULK UPDATE
-	// ALSO BUILD PROCESS BATCHES OF UPDATE DOCUMENTS BASED ON THE MAX BATCH SIZE
+	// ALSO BUILD PROCESS BATCHES OF UPDATE DOCUMENTS PER NAMESPACE BASED ON
+	// THE MAX BATCH SIZE
 
-	// initialize a missing key list
-	var missingKeys []*statedb.CompositeKey
-
-	//initialize a processBatch for updating bulk documents
-	processBatch := statedb.NewUpdateBatch()
+	namespaces := batch.GetUpdatedNamespaces()
 
-	//get the max size of the batch from core.yaml
-	maxBatchSize := ledgerconfig.GetMaxBatchUpdateSize()
+	// Create goroutine wait group.
+	var processBatchGroup sync.WaitGroup
+	processBatchGroup.Add(len(namespaces))
 
-	//initialize a counter to track the batch size
-	batchSizeCounter := 0
+	// Collect error from each goroutine using buffered channel.
+	errResponses := make(chan error, len(namespaces))
 
-	// Iterate through the batch passed in and create process batches
-	// using the max batch size
-	namespaces := batch.GetUpdatedNamespaces()
 	for _, ns := range namespaces {
-		// TODO: Need to run the following code block in a goroutine so that each chaincode batch
-		// can be processed and committed parallely.
-		nsUpdates := batch.GetUpdates(ns)
-		for k, vv := range nsUpdates {
-
-			//increment the batch size counter
-			batchSizeCounter++
-
-			compositeKey := statedb.CompositeKey{Namespace: ns, Key: k}
-
-			// Revision numbers are needed for couchdb updates.
-			// vdb.committedDataCache.revisionNumbers is a cache of revision numbers based on ID
-			// Document IDs and revision numbers will already be in the cache for read/writes,
-			// but will be missing in the case of blind writes.
-			// If the key is missing in the cache, then add the key to missingKeys
-			_, keyFound := vdb.committedDataCache.revisionNumbers[compositeKey]
-
-			if !keyFound {
-				// Add the key to the missing key list
-				// As there can be no duplicates in UpdateBatch, no need check for duplicates.
-				missingKeys = append(missingKeys, &compositeKey)
-			}
+		// each namespace batch is processed and committed in parallel.
+		go func(ns string) {
+			defer processBatchGroup.Done()
+
+			// initialize a missing key list
+			var missingKeys []*statedb.CompositeKey
+
+			//initialize a processBatch for updating bulk documents
+			processBatch := statedb.NewUpdateBatch()
+
+			//get the max size of the batch from core.yaml
+			maxBatchSize := ledgerconfig.GetMaxBatchUpdateSize()
+
+			//initialize a counter to track the batch size
+			batchSizeCounter := 0
+
+			nsUpdates := batch.GetUpdates(ns)
+			for k, vv := range nsUpdates {
+
+				//increment the batch size counter
+				batchSizeCounter++
+
+				compositeKey := statedb.CompositeKey{Namespace: ns, Key: k}
+
+				// Revision numbers are needed for couchdb updates.
+				// vdb.committedDataCache.revisionNumbers is a cache of revision numbers based on ID
+				// Document IDs and revision numbers will already be in the cache for read/writes,
+				// but will be missing in the case of blind writes.
+				// If the key is missing in the cache, then add the key to missingKeys
+				vdb.committedDataCache.mux.RLock()
+				_, keyFound := vdb.committedDataCache.revisionNumbers[compositeKey]
+				vdb.committedDataCache.mux.RUnlock()
+
+				if !keyFound {
+					// Add the key to the missing key list
+					// As there can be no duplicates in UpdateBatch, no need to check for duplicates.
+ missingKeys = append(missingKeys, &compositeKey) } - //reset the batch size counter - batchSizeCounter = 0 + //add the record to the process batch + if vv.Value == nil { + processBatch.Delete(ns, k, vv.Version) + } else { + processBatch.Put(ns, k, vv.Value, vv.Version) + } - //create a new process batch - processBatch = statedb.NewUpdateBatch() + //Check to see if the process batch exceeds the max batch size + if batchSizeCounter >= maxBatchSize { - // reset the missing key list - missingKeys = []*statedb.CompositeKey{} - } - } + //STEP 2: PROCESS EACH BATCH OF UPDATE DOCUMENTS - //STEP 3: PROCESS ANY REMAINING DOCUMENTS - err := vdb.processUpdateBatch(processBatch, missingKeys) - if err != nil { - return err - } + err := vdb.processUpdateBatch(processBatch, missingKeys) + if err != nil { + errResponses <- err + return + } + + //reset the batch size counter + batchSizeCounter = 0 + + //create a new process batch + processBatch = statedb.NewUpdateBatch() - //reset the batch size counter - batchSizeCounter = 0 + // reset the missing key list + missingKeys = []*statedb.CompositeKey{} + } + } - //create a new process batch - processBatch = statedb.NewUpdateBatch() + //STEP 3: PROCESS ANY REMAINING DOCUMENTS + err := vdb.processUpdateBatch(processBatch, missingKeys) + if err != nil { + errResponses <- err + return + } + }(ns) + } - // reset the missing key list - missingKeys = []*statedb.CompositeKey{} + // Wait for all goroutines to complete + processBatchGroup.Wait() + + // Check if any goroutine resulted in error. + // We can stop all goroutine as soon as any goutine resulted in error rather than + // waiting for all goroutines to complete. As errors are very rare, current sub-optimal + // approach (allowing each subroutine to complete) is adequate for now. + // TODO: Currently, we are returing only one error. 
We need to create a new error type + // that can encapsulate all the errors and return that type + if len(errResponses) > 0 { + return <-errResponses } // Record a savepoint at a given height @@ -552,7 +575,9 @@ func (vdb *VersionedDB) processUpdateBatch(updateBatch *statedb.UpdateBatch, mis // retrieve the couchdb revision from the cache // Documents that do not exist in couchdb will not have revision numbers and will // exist in the cache with a revision value of nil + vdb.committedDataCache.mux.RLock() revision := vdb.committedDataCache.revisionNumbers[statedb.CompositeKey{Namespace: ns, Key: key}] + vdb.committedDataCache.mux.RUnlock() var isDelete bool // initialized to false if vv.Value == nil { @@ -592,7 +617,6 @@ func (vdb *VersionedDB) processUpdateBatch(updateBatch *statedb.UpdateBatch, mis } - //TODO: Run the following in a goroutine so that commit on each namespaceDB can happen parallely if len(batchUpdateMap) > 0 { //Add the documents to the batch update array @@ -699,6 +723,7 @@ func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) erro missingKeys := make(map[string][]string) // for each namespace/chaincode, store the missingKeys + vdb.committedDataCache.mux.Lock() for _, key := range keys { logger.Debugf("Load into version cache: %s~%s", key.Key, key.Namespace) @@ -726,6 +751,7 @@ func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) erro } } + vdb.committedDataCache.mux.Unlock() //get the max size of the batch from core.yaml maxBatchSize := ledgerconfig.GetMaxBatchUpdateSize() @@ -733,37 +759,61 @@ func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) erro // Call the batch retrieve if there is one or more keys to retrieve if len(missingKeys) > 0 { + // Create goroutine wait group. + var batchRetrieveGroup sync.WaitGroup + batchRetrieveGroup.Add(len(missingKeys)) + + // Collect error from each goroutine using buffered channel. 
+ errResponses := make(chan error, len(missingKeys)) + + // For each namespace, we parallely load missing keys into the cache using goroutines for namespace := range missingKeys { - // TODO: For each namespace, we need to parallely load missing keys into the cache - // The following codes need be moved to goroutine and introduce RWlock for cache. + go func(namespace string) { + defer batchRetrieveGroup.Done() - // Initialize the array of keys to be retrieved - keysToRetrieve := []string{} + // Initialize the array of keys to be retrieved + keysToRetrieve := []string{} - // Iterate through the missingKeys and build a batch of keys for batch retrieval - for _, key := range missingKeys[namespace] { + // Iterate through the missingKeys and build a batch of keys for batch retrieval + for _, key := range missingKeys[namespace] { - keysToRetrieve = append(keysToRetrieve, key) + keysToRetrieve = append(keysToRetrieve, key) - // check to see if the number of keys is greater than the max batch size - if len(keysToRetrieve) >= maxBatchSize { + // check to see if the number of keys is greater than the max batch size + if len(keysToRetrieve) >= maxBatchSize { + err := vdb.batchRetrieveMetaData(namespace, keysToRetrieve) + if err != nil { + errResponses <- err + return + } + // reset the array + keysToRetrieve = []string{} + } + + } + + // If there are any remaining, retrieve the final batch + if len(keysToRetrieve) > 0 { err := vdb.batchRetrieveMetaData(namespace, keysToRetrieve) if err != nil { - return err + errResponses <- err + return } - // reset the array - keysToRetrieve = []string{} } + }(namespace) + } - } - - // If there are any remaining, retrieve the final batch - if len(keysToRetrieve) > 0 { - err := vdb.batchRetrieveMetaData(namespace, keysToRetrieve) - if err != nil { - return err - } - } + // Wait for all goroutines to complete + batchRetrieveGroup.Wait() + + // Check if any goroutine resulted in error. 
+		// We can stop all goroutines as soon as any goroutine resulted in error rather than
+		// waiting for all goroutines to complete. As errors are very rare, current sub-optimal
+		// approach (allowing each goroutine to complete) is adequate for now.
+		// TODO: Currently, we are returning only one error. We need to create a new error type
+		// that can encapsulate all the errors and return that type.
+		if len(errResponses) > 0 {
+			return <-errResponses
+		}
 	}
 
@@ -794,8 +844,10 @@ func (vdb *VersionedDB) batchRetrieveMetaData(namespace string, keys []string) e
 
 		if len(documentMetadata.Version) != 0 {
 			compositeKey := statedb.CompositeKey{Namespace: namespace, Key: documentMetadata.ID}
+			vdb.committedDataCache.mux.Lock()
 			versionMap[compositeKey] = createVersionHeightFromVersionString(documentMetadata.Version)
 			revMap[compositeKey] = documentMetadata.Rev
+			vdb.committedDataCache.mux.Unlock()
 		}
 	}