Skip to content

Commit

Permalink
[FAB-9776] Fix validation path for intermittent errors
Browse files Browse the repository at this point in the history
This CR fixes the validation path so as to differentaite the intermittent
errors (such as an error returned by ledger since couchdb is not reachable)
from the others. The intermittent errors are propagated upward as opposed
to mark the transaction valid.

Change-Id: If03a8b478af80cb4d0838486b19065b4b19ed2e2
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi committed Jan 21, 2019
1 parent f8e97e6 commit 8bc3245
Show file tree
Hide file tree
Showing 6 changed files with 281 additions and 91 deletions.
22 changes: 19 additions & 3 deletions core/committer/txvalidator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/hyperledger/fabric/common/configtx"
commonerrors "github.com/hyperledger/fabric/common/errors"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/resourcesconfig"
coreUtil "github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/chaincode/shim"
Expand All @@ -29,11 +30,15 @@ import (
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
logging "github.com/op/go-logging"
"github.com/pkg/errors"
"golang.org/x/net/context"
)

const (
IntermittentErrorCode = int32(503)
)

// Support provides all of the needed to evaluate the VSCC
type Support interface {
// Acquire implements semaphore-like acquire semantics
Expand Down Expand Up @@ -342,14 +347,22 @@ func validateTx(req *blockValidationRequest, results chan<- *blockValidationResu
if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION {
// Check duplicate transactions
txID = chdr.TxId
if _, err := v.support.Ledger().GetTransactionByID(txID); err == nil {
_, err := v.support.Ledger().GetTransactionByID(txID)
if err == nil {
logger.Error("Duplicate transaction found, ", txID, ", skipping")
results <- &blockValidationResult{
tIdx: tIdx,
validationCode: peer.TxValidationCode_DUPLICATE_TXID,
}
return
}
if err != blkstorage.ErrNotFoundInIndex { // Any error other than blkstorage.ErrNotFoundInIndex is treated as intermittent
results <- &blockValidationResult{
tIdx: tIdx,
err: &commonerrors.VSCCInfoLookupFailureError{Reason: fmt.Sprintf("Error while checking for duplicate txid: %s", err)},
}
return
}

// Validate tx with vscc and policy
logger.Debug("Validating transaction vscc tx validate")
Expand Down Expand Up @@ -852,6 +865,9 @@ func (v *vsccValidatorImpl) VSCCValidateTxForCC(envBytes []byte, txid, chid, vsc
return &commonerrors.VSCCExecutionFailureError{Reason: msg}
}
if res.Status != shim.OK {
if res.Status == IntermittentErrorCode {
return &commonerrors.VSCCExecutionFailureError{Reason: res.Message}
}
return &commonerrors.VSCCEndorsementPolicyError{Reason: fmt.Sprintf("%s", res.Message)}
}

Expand All @@ -866,7 +882,7 @@ func (v *vsccValidatorImpl) getCDataForCC(chid, ccid string) (resourcesconfig.Ch

qe, err := l.NewQueryExecutor()
if err != nil {
return nil, errors.WithMessage(err, "could not retrieve QueryExecutor")
return nil, &commonerrors.VSCCInfoLookupFailureError{Reason: "could not retrieve QueryExecutor"}
}
defer qe.Done()

Expand Down
127 changes: 109 additions & 18 deletions core/committer/txvalidator/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ SPDX-License-Identifier: Apache-2.0
package txvalidator

import (
"errors"
"fmt"
"os"
"testing"

"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/pkg/errors"

"github.com/hyperledger/fabric/common/cauthdsl"
ctxt "github.com/hyperledger/fabric/common/configtx/test"
commonerrors "github.com/hyperledger/fabric/common/errors"
Expand All @@ -31,7 +33,7 @@ import (
mocktxvalidator "github.com/hyperledger/fabric/core/mocks/txvalidator"
"github.com/hyperledger/fabric/msp"
"github.com/hyperledger/fabric/msp/mgmt"
"github.com/hyperledger/fabric/msp/mgmt/testtools"
msptesttools "github.com/hyperledger/fabric/msp/mgmt/testtools"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
Expand Down Expand Up @@ -603,24 +605,69 @@ func TestLedgerIsNoAvailable(t *testing.T) {
ccID := "mycc"
tx := getEnv(ccID, createRWset(t, ccID), t)

theLedger.On("GetTransactionByID", mock.Anything).Return(&peer.ProcessedTransaction{}, errors.New("Cannot find the transaction"))
t.Run("Block storage not usable", func(t *testing.T) {
theLedger.On("GetTransactionByID", mock.Anything).Return(&peer.ProcessedTransaction{}, errors.New("error other than blkstorage.ErrNotFoundInIndex"))

queryExecutor := new(mockQueryExecutor)
queryExecutor.On("GetState", mock.Anything, mock.Anything).Return([]byte{}, errors.New("Unable to connect to DB"))
theLedger.On("NewQueryExecutor", mock.Anything).Return(queryExecutor, nil)
queryExecutor := new(mockQueryExecutor)
queryExecutor.On("GetState", mock.Anything, mock.Anything).Return([]byte{}, nil)
theLedger.On("NewQueryExecutor", mock.Anything).Return(queryExecutor, nil)

b := &common.Block{
Data: &common.BlockData{Data: [][]byte{utils.MarshalOrPanic(tx)}},
Header: &common.BlockHeader{},
}
b := &common.Block{
Data: &common.BlockData{Data: [][]byte{utils.MarshalOrPanic(tx)}},
Header: &common.BlockHeader{},
}

err := validator.Validate(b)
err := validator.Validate(b)

assertion := assert.New(t)
// We suppose to get the error which indicates we cannot commit the block
assertion.Error(err)
// The error exptected to be of type VSCCInfoLookupFailureError
assertion.NotNil(err.(*commonerrors.VSCCInfoLookupFailureError))

})

t.Run("NewQueryExecutor returns error", func(t *testing.T) {
theLedger.On("GetTransactionByID", mock.Anything).Return(&peer.ProcessedTransaction{}, blkstorage.ErrNotFoundInIndex)

queryExecutor := new(mockQueryExecutor)
queryExecutor.On("GetState", mock.Anything, mock.Anything).Return([]byte{}, nil)
theLedger.On("NewQueryExecutor", mock.Anything).Return(nil, errors.New("NewQueryExecutor error"))

b := &common.Block{
Data: &common.BlockData{Data: [][]byte{utils.MarshalOrPanic(tx)}},
Header: &common.BlockHeader{},
}

err := validator.Validate(b)

assertion := assert.New(t)
// We suppose to get the error which indicates we cannot commit the block
assertion.Error(err)
// The error exptected to be of type VSCCInfoLookupFailureError
assertion.NotNil(err.(*commonerrors.VSCCInfoLookupFailureError))
assertion := assert.New(t)
// We suppose to get the error which indicates we cannot commit the block
assertion.Error(err)
// The error exptected to be of type VSCCInfoLookupFailureError
assertion.NotNil(err.(*commonerrors.VSCCInfoLookupFailureError))
})

t.Run("GetState returns error", func(t *testing.T) {
theLedger.On("GetTransactionByID", mock.Anything).Return(&peer.ProcessedTransaction{}, blkstorage.ErrNotFoundInIndex)

queryExecutor := new(mockQueryExecutor)
queryExecutor.On("GetState", mock.Anything, mock.Anything).Return([]byte{}, errors.New("Unable to connect to DB"))
theLedger.On("NewQueryExecutor", mock.Anything).Return(queryExecutor, nil)

b := &common.Block{
Data: &common.BlockData{Data: [][]byte{utils.MarshalOrPanic(tx)}},
Header: &common.BlockHeader{},
}

err := validator.Validate(b)

assertion := assert.New(t)
// We suppose to get the error which indicates we cannot commit the block
assertion.Error(err)
// The error exptected to be of type VSCCInfoLookupFailureError
assertion.NotNil(err.(*commonerrors.VSCCInfoLookupFailureError))
})
}

func TestValidationInvalidEndorsing(t *testing.T) {
Expand All @@ -634,7 +681,7 @@ func TestValidationInvalidEndorsing(t *testing.T) {
ccID := "mycc"
tx := getEnv(ccID, createRWset(t, ccID), t)

theLedger.On("GetTransactionByID", mock.Anything).Return(&peer.ProcessedTransaction{}, errors.New("Cannot find the transaction"))
theLedger.On("GetTransactionByID", mock.Anything).Return(&peer.ProcessedTransaction{}, blkstorage.ErrNotFoundInIndex)

cd := &ccp.ChaincodeData{
Name: ccID,
Expand Down Expand Up @@ -678,7 +725,7 @@ func TestValidationResourceUpdate(t *testing.T) {
ccID := "mycc"
tx := getEnvWithType(ccID, createRWset(t, ccID), common.HeaderType_PEER_RESOURCE_UPDATE, t)

theLedger.On("GetTransactionByID", mock.Anything).Return(&peer.ProcessedTransaction{}, errors.New("Cannot find the transaction"))
theLedger.On("GetTransactionByID", mock.Anything).Return(&peer.ProcessedTransaction{}, blkstorage.ErrNotFoundInIndex)

cd := &ccp.ChaincodeData{
Name: ccID,
Expand Down Expand Up @@ -716,6 +763,50 @@ func TestValidationResourceUpdate(t *testing.T) {
assertValid(b2, t)
}

func TestValidationIntermittentErrorCode(t *testing.T) {
theLedger := new(mockLedger)
vcs := struct {
*mocktxvalidator.Support
*semaphore.Weighted
}{&mocktxvalidator.Support{LedgerVal: theLedger, ACVal: &mockconfig.MockApplicationCapabilities{}}, semaphore.NewWeighted(10)}
validator := NewTxValidator("", vcs)

ccID := "mycc"
tx := getEnv(ccID, createRWset(t, ccID), t)

theLedger.On("GetTransactionByID", mock.Anything).Return(&peer.ProcessedTransaction{}, blkstorage.ErrNotFoundInIndex)

cd := &ccp.ChaincodeData{
Name: ccID,
Version: ccVersion,
Vscc: "vscc",
Policy: signedByAnyMember([]string{"DEFAULT"}),
}

cdbytes := utils.MarshalOrPanic(cd)

queryExecutor := new(mockQueryExecutor)
queryExecutor.On("GetState", "lscc", ccID).Return(cdbytes, nil)
theLedger.On("NewQueryExecutor", mock.Anything).Return(queryExecutor, nil)

b := &common.Block{
Data: &common.BlockData{Data: [][]byte{utils.MarshalOrPanic(tx)}},
Header: &common.BlockHeader{},
}

// Keep default callback
c := executeChaincodeProvider.getCallback()
executeChaincodeProvider.setCallback(func() (*peer.Response, *peer.ChaincodeEvent, error) {
return &peer.Response{Status: IntermittentErrorCode}, nil, nil
})
// Restore default callback
defer executeChaincodeProvider.setCallback(c)
err := validator.Validate(b)
assert.Error(t, err)
_, ok := err.(*commonerrors.VSCCExecutionFailureError)
assert.True(t, ok)
}

type ccResultCallback func() (*peer.Response, *peer.ChaincodeEvent, error)

type ccExecuteChaincode struct {
Expand Down
9 changes: 8 additions & 1 deletion core/common/privdata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ type Support interface {
}

type NoSuchCollectionError common.CollectionCriteria
type LedgerIOError struct {
msg string
}

func (e *LedgerIOError) Error() string {
return e.msg
}

func (f NoSuchCollectionError) Error() string {
return fmt.Sprintf("collection %s/%s/%s could not be found", f.Channel, f.Namespace, f.Collection)
Expand All @@ -57,7 +64,7 @@ func (c *simpleCollectionStore) retrieveCollectionConfigPackage(cc common.Collec

cb, err := qe.GetState("lscc", c.s.GetCollectionKVSKey(cc))
if err != nil {
return nil, errors.WithMessage(err, fmt.Sprintf("error while retrieving collection for collection criteria %#v", cc))
return nil, &LedgerIOError{msg: fmt.Sprintf("error while retrieving collection for collection criteria %#v. Err=%s", cc, err)}
}
if cb == nil {
return nil, NoSuchCollectionError(cc)
Expand Down
8 changes: 7 additions & 1 deletion core/common/privdata/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,16 @@ func TestCollectionStore(t *testing.T) {
assert.Error(t, err)

support.QErr = nil

_, err = cs.RetrieveCollection(common.CollectionCriteria{Channel: "ch", Namespace: "cc", Collection: "mycollection"})
_, ok := err.(*LedgerIOError)
assert.True(t, ok)

wState["lscc"] = make(map[string][]byte)

_, err = cs.RetrieveCollection(common.CollectionCriteria{})
assert.Error(t, err)
_, ok = err.(NoSuchCollectionError)
assert.True(t, ok)

ccr := common.CollectionCriteria{Channel: "ch", Namespace: "cc", Collection: "mycollection"}

Expand Down
38 changes: 24 additions & 14 deletions core/scc/vscc/validator_onevalidsignature.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,6 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
*/

package vscc
Expand All @@ -25,6 +14,7 @@ import (
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/chaincode/shim"
"github.com/hyperledger/fabric/core/committer/txvalidator"
"github.com/hyperledger/fabric/core/common/ccprovider"
"github.com/hyperledger/fabric/core/common/privdata"
"github.com/hyperledger/fabric/core/common/sysccprovider"
Expand Down Expand Up @@ -195,7 +185,11 @@ func (vscc *ValidatorOneValidSignature) Invoke(stub shim.ChaincodeStubInterface)
err = vscc.ValidateLSCCInvocation(stub, chdr.ChannelId, env, cap, payl, ac.Capabilities())
if err != nil {
logger.Errorf("VSCC error: ValidateLSCCInvocation failed, err %s", err)
return shim.Error(err.Error())
response := shim.Error(err.Error())
if _, ok := err.(*intermittentError); ok {
response.Status = txvalidator.IntermittentErrorCode
}
return response
}
}
}
Expand Down Expand Up @@ -283,6 +277,12 @@ func (vscc *ValidatorOneValidSignature) validateDeployRWSetAndCollection(

ccp, err := vscc.collectionStore.RetrieveCollectionConfigPackage(common.CollectionCriteria{Channel: chid, Namespace: ccid})
if err != nil {
if _, ok := err.(*privdata.LedgerIOError); ok {
return &intermittentError{
msg: fmt.Sprintf("Ledger error while trying to retrieve collection config for chaincode %s:%s. Err:%s",
cdRWSet.Name, cdRWSet.Version, err),
}
}
// fail if we get any error other than NoSuchCollectionError
// because it means something went wrong while looking up the
// older collection
Expand Down Expand Up @@ -554,7 +554,9 @@ func (vscc *ValidatorOneValidSignature) getInstantiatedCC(chid, ccid string) (cd

bytes, err := qe.GetState("lscc", ccid)
if err != nil {
err = fmt.Errorf("Could not retrieve state for chaincode %s on channel %s, error %s", ccid, chid, err)
err = &intermittentError{
msg: fmt.Sprintf("Could not retrieve state for chaincode %s on channel %s, error %s", ccid, chid, err),
}
return
}

Expand Down Expand Up @@ -607,3 +609,11 @@ func (vscc *ValidatorOneValidSignature) deduplicateIdentity(cap *pb.ChaincodeAct
logger.Debugf("Signature set is of size %d out of %d endorsement(s)", len(signatureSet), len(cap.Action.Endorsements))
return signatureSet, nil
}

type intermittentError struct {
msg string
}

func (e *intermittentError) Error() string {
return e.msg
}

1 comment on commit 8bc3245

@touchingsoil
Copy link

@touchingsoil touchingsoil commented on 8bc3245 Jul 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is so weird that I can't get this commit by searching "FAB-9776", the same as by "git log" command.
So what is the proper way to find the corresponding commit to a bug fix?Or only this commit can't be searched for a bug?

Please sign in to comment.