Skip to content

Commit

Permalink
EndorsementTimeout should apply to each endorser
Browse files Browse the repository at this point in the history
Currently the options.EndorsementTimeout creates a context that is shared by all endorsement requests in the Endose() and Evaluate() methods.
This commit changes it so It is applied individually to each endorsing peer.
The overall timeout for Endorse/Evaluate is set in the client SDK

Signed-off-by: andrew-coleman <andrew_coleman@uk.ibm.com>
  • Loading branch information
andrew-coleman authored and manish-sethi committed Nov 24, 2021
1 parent bd1aaae commit a6a9fde
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 59 deletions.
166 changes: 107 additions & 59 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
type endorserResponse struct {
endorsementSet []*peer.ProposalResponse
err *gp.ErrorDetail
timeoutExpired bool
}

// Evaluate will invoke the transaction function as specified in the SignedProposal
Expand Down Expand Up @@ -63,39 +64,52 @@ func (gs *Server) Evaluate(ctx context.Context, request *gp.EvaluateRequest) (*g
return nil, status.Errorf(codes.Unavailable, "%s", err)
}

ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout)
defer cancel()

endorser := plan.endorsers()[0]
var response *peer.Response
var errDetails []proto.Message
for response == nil {
gs.logger.Debugw("Sending to peer:", "channel", channel, "chaincode", chaincodeID, "txID", request.TransactionId, "MSPID", endorser.mspid, "endpoint", endorser.address)

pr, err := endorser.client.ProcessProposal(ctx, signedProposal)
success, message, retry, remove := gs.responseStatus(pr, err)
if success {
response = pr.Response
// Prefer result from proposal response as Response.Payload is not required to be transaction result
if result, err := getResultFromProposalResponse(pr); err == nil {
response.Payload = result
} else {
logger.Warnw("Successful proposal response contained no transaction result", "error", err.Error(), "chaincode", chaincodeID, "channel", channel, "txID", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "status", response.Status, "message", response.Message)
}
} else {
logger.Debugw("Evaluate call to endorser failed", "chaincode", chaincodeID, "channel", channel, "txID", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "error", message)
errDetails = append(errDetails, errorDetail(endorser.endpointConfig, message))
if remove {
gs.registry.removeEndorser(endorser)
}
if retry {
endorser = plan.nextPeerInGroup(endorser)
done := make(chan error)
go func() {
defer close(done)
ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout)
defer cancel()
pr, err := endorser.client.ProcessProposal(ctx, signedProposal)
success, message, retry, remove := gs.responseStatus(pr, err)
if success {
response = pr.Response
// Prefer result from proposal response as Response.Payload is not required to be transaction result
if result, err := getResultFromProposalResponse(pr); err == nil {
response.Payload = result
} else {
logger.Warnw("Successful proposal response contained no transaction result", "error", err.Error(), "chaincode", chaincodeID, "channel", channel, "txID", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "status", response.Status, "message", response.Message)
}
} else {
return nil, newRpcError(codes.Aborted, "evaluate call to endorser returned error: "+message, errDetails...)
logger.Debugw("Evaluate call to endorser failed", "chaincode", chaincodeID, "channel", channel, "txID", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "error", message)
errDetails = append(errDetails, errorDetail(endorser.endpointConfig, message))
if remove {
gs.registry.removeEndorser(endorser)
}
if retry {
endorser = plan.nextPeerInGroup(endorser)
} else {
done <- newRpcError(codes.Aborted, "evaluate call to endorser returned error: "+message, errDetails...)
}
if endorser == nil {
done <- newRpcError(codes.Aborted, "failed to evaluate transaction, see attached details for more info", errDetails...)
}
}
if endorser == nil {
return nil, newRpcError(codes.Aborted, "failed to evaluate transaction, see attached details for more info", errDetails...)
}()
select {
case status := <-done:
if status != nil {
return nil, status
}
case <-ctx.Done():
// Overall evaluation timeout expired
logger.Warnw("Evaluate call timed out while processing request", "channel", request.ChannelId, "txID", request.TransactionId)
return nil, newRpcError(codes.DeadlineExceeded, "evaluate timeout expired")
}
}

Expand Down Expand Up @@ -126,9 +140,6 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
return nil, status.Errorf(codes.InvalidArgument, "failed to unpack transaction proposal: %s", err)
}

ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout)
defer cancel()

defaultInterest := &peer.ChaincodeInterest{
Chaincodes: []*peer.ChaincodeCall{{
Name: chaincodeID,
Expand Down Expand Up @@ -166,18 +177,33 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
// 2. Process the proposal on this endorser
var firstResponse *peer.ProposalResponse
var errDetails []proto.Message
for firstResponse == nil && firstEndorser != nil {
firstResponse, err = firstEndorser.client.ProcessProposal(ctx, signedProposal)
success, message, _, remove := gs.responseStatus(firstResponse, err)

if !success {
logger.Warnw("Endorse call to endorser failed", "channel", request.ChannelId, "txID", request.TransactionId, "endorserAddress", firstEndorser.endpointConfig.address, "endorserMspid", firstEndorser.endpointConfig.mspid, "error", message)
errDetails = append(errDetails, errorDetail(firstEndorser.endpointConfig, message))
if remove {
gs.registry.removeEndorser(firstEndorser)
for firstResponse == nil && firstEndorser != nil {
done := make(chan struct{})
go func() {
defer close(done)
ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout)
defer cancel()
firstResponse, err = firstEndorser.client.ProcessProposal(ctx, signedProposal)
success, message, _, remove := gs.responseStatus(firstResponse, err)

if !success {
logger.Warnw("Endorse call to endorser failed", "channel", request.ChannelId, "txID", request.TransactionId, "endorserAddress", firstEndorser.endpointConfig.address, "endorserMspid", firstEndorser.endpointConfig.mspid, "error", message)
errDetails = append(errDetails, errorDetail(firstEndorser.endpointConfig, message))
if remove {
gs.registry.removeEndorser(firstEndorser)
}
firstEndorser = plan.nextPeerInGroup(firstEndorser)
firstResponse = nil
}
firstEndorser = plan.nextPeerInGroup(firstEndorser)
firstResponse = nil
}()
select {
case <-done:
// Endorser completed normally
case <-ctx.Done():
// Overall endorsement timeout expired
logger.Warnw("Endorse call timed out while collecting first endorsement", "channel", request.ChannelId, "txID", request.TransactionId)
return nil, newRpcError(codes.DeadlineExceeded, "endorsement timeout expired while collecting first endorsement")
}
}
if firstEndorser == nil || firstResponse == nil {
Expand Down Expand Up @@ -226,30 +252,48 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
go func(e *endorser) {
defer wg.Done()
for e != nil {
gs.logger.Debugw("Sending to endorser:", "channel", channel, "chaincode", chaincodeID, "txID", request.TransactionId, "MSPID", e.mspid, "endpoint", e.address)
response, err := e.client.ProcessProposal(ctx, signedProposal)
// Ignore the retry flag returned by the following responseStatus call. Endorse will retry until all endorsement layouts have been exhausted.
// It tries to get a successful endorsement from each org and minimise the changes of a rogue peer scuppering the transaction.
// If an org is behaving badly, it can move on to a different layout.
success, message, _, remove := gs.responseStatus(response, err)
if success {
logger.Debugw("Endorse call to endorser returned success", "channel", request.ChannelId, "txID", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "status", response.Response.Status, "message", response.Response.Message)

responseMessage := response.GetResponse()
if responseMessage != nil {
responseMessage.Payload = nil // Remove any duplicate response payload
done := make(chan *endorserResponse)
go func() {
defer close(done)
gs.logger.Debugw("Sending to endorser:", "channel", channel, "chaincode", chaincodeID, "txID", request.TransactionId, "MSPID", e.mspid, "endpoint", e.address)
ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout) // timeout of individual endorsement
defer cancel()
response, err := e.client.ProcessProposal(ctx, signedProposal)
// Ignore the retry flag returned by the following responseStatus call. Endorse will retry until all endorsement layouts have been exhausted.
// It tries to get a successful endorsement from each org and minimise the changes of a rogue peer scuppering the transaction.
// If an org is behaving badly, it can move on to a different layout.
success, message, _, remove := gs.responseStatus(response, err)
if success {
logger.Debugw("Endorse call to endorser returned success", "channel", request.ChannelId, "txID", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "status", response.Response.Status, "message", response.Response.Message)

responseMessage := response.GetResponse()
if responseMessage != nil {
responseMessage.Payload = nil // Remove any duplicate response payload
}

endorsements := plan.update(e, response)
done <- &endorserResponse{endorsementSet: endorsements}
} else {
logger.Warnw("Endorse call to endorser failed", "channel", request.ChannelId, "txID", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "error", message)
if remove {
gs.registry.removeEndorser(e)
}
done <- &endorserResponse{err: errorDetail(e.endpointConfig, message)}
}

endorsements := plan.update(e, response)
responseCh <- &endorserResponse{endorsementSet: endorsements}
e = nil
} else {
logger.Warnw("Endorse call to endorser failed", "channel", request.ChannelId, "txID", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "error", message)
responseCh <- &endorserResponse{err: errorDetail(e.endpointConfig, message)}
if remove {
gs.registry.removeEndorser(e)
}()
select {
case resp := <-done:
// Endorser completed normally
if resp.err != nil {
e = plan.nextPeerInGroup(e)
} else {
e = nil
}
e = plan.nextPeerInGroup(e)
responseCh <- resp
case <-ctx.Done():
// Overall endorsement timeout expired
responseCh <- &endorserResponse{timeoutExpired: true}
return
}
}
}(e)
Expand All @@ -258,6 +302,10 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
close(responseCh)

for response := range responseCh {
if response.timeoutExpired {
logger.Warnw("Endorse call timed out while collecting endorsements", "channel", request.ChannelId, "txID", request.TransactionId, "numEndorsers", len(endorsers))
return nil, newRpcError(codes.DeadlineExceeded, "endorsement timeout expired while collecting endorsements")
}
if response.endorsementSet != nil {
endorsements = response.endorsementSet
break
Expand Down
53 changes: 53 additions & 0 deletions internal/pkg/gateway/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,23 @@ func TestEvaluate(t *testing.T) {
},
errString: "rpc error: code = Unavailable desc = no peers available to evaluate chaincode test_chaincode in channel test_channel",
},
{
name: "context timeout during evaluate",
plan: endorsementPlan{
"g1": {{endorser: localhostMock, height: 3}}, // msp1
},
postSetup: func(t *testing.T, def *preparedTest) {
var cancel context.CancelFunc
def.ctx, cancel = context.WithTimeout(def.ctx, 100*time.Millisecond)

def.localEndorser.ProcessProposalStub = func(ctx context.Context, proposal *peer.SignedProposal, option ...grpc.CallOption) (*peer.ProposalResponse, error) {
cancel()
time.Sleep(200 * time.Millisecond)
return createProposalResponse(t, peer1Mock.address, "mock_response", 200, ""), nil
}
},
errString: "rpc error: code = DeadlineExceeded desc = evaluate timeout expired",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -883,6 +900,42 @@ func TestEndorse(t *testing.T) {
},
expectedEndorsers: []string{"localhost:7051", "peer2:9051"},
},
{
name: "context timeout during first endorsement",
plan: endorsementPlan{
"g1": {{endorser: localhostMock, height: 3}}, // msp1
"g2": {{endorser: peer4Mock, height: 5}}, // msp3
},
postSetup: func(t *testing.T, def *preparedTest) {
var cancel context.CancelFunc
def.ctx, cancel = context.WithTimeout(def.ctx, 100*time.Millisecond)

def.localEndorser.ProcessProposalStub = func(ctx context.Context, proposal *peer.SignedProposal, option ...grpc.CallOption) (*peer.ProposalResponse, error) {
cancel()
time.Sleep(200 * time.Millisecond)
return createProposalResponse(t, peer1Mock.address, "mock_response", 200, ""), nil
}
},
errString: "rpc error: code = DeadlineExceeded desc = endorsement timeout expired while collecting first endorsement",
},
{
name: "context timeout collecting endorsements",
plan: endorsementPlan{
"g1": {{endorser: localhostMock, height: 3}}, // msp1
"g2": {{endorser: peer4Mock, height: 5}}, // msp3
},
postSetup: func(t *testing.T, def *preparedTest) {
var cancel context.CancelFunc
def.ctx, cancel = context.WithTimeout(def.ctx, 100*time.Millisecond)

peer4Mock.client.(*mocks.EndorserClient).ProcessProposalStub = func(ctx context.Context, proposal *peer.SignedProposal, option ...grpc.CallOption) (*peer.ProposalResponse, error) {
cancel()
time.Sleep(200 * time.Millisecond)
return createProposalResponse(t, peer4Mock.address, "mock_response", 200, ""), nil
}
},
errString: "rpc error: code = DeadlineExceeded desc = endorsement timeout expired while collecting endorsements",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit a6a9fde

Please sign in to comment.