From c77496c658ffad40adfb8fd0d1229cbf61df8d79 Mon Sep 17 00:00:00 2001 From: Matthew Sykes Date: Thu, 7 Nov 2019 16:58:21 -0500 Subject: [PATCH] Complete chaincode execution on stream termination When the chaincode stream terminated with an error, ProcessStream would return an error but did not do anything to release go routines waiting for chaincode execution to complete. This would delay the release of the transaction simulator lock until the execute timeout occurred. This commit provides a "streamDone" signal back to the execute loop that is used to complete execution when the chaincode stream terminates. FAB-16610 #done Change-Id: I9ba0b3549015a49ae40e8e10b5326e8a8681d128 Signed-off-by: Matthew Sykes --- core/chaincode/handler.go | 25 +++++++++++++---- core/chaincode/handler_internal_test.go | 8 ++++++ core/chaincode/handler_test.go | 36 +++++++++++++++++++++++-- 3 files changed, 62 insertions(+), 7 deletions(-) diff --git a/core/chaincode/handler.go b/core/chaincode/handler.go index b7e418197cf..cb864803d4c 100644 --- a/core/chaincode/handler.go +++ b/core/chaincode/handler.go @@ -149,6 +149,8 @@ type Handler struct { UUIDGenerator UUIDGenerator // AppConfig is used to retrieve the application config for a channel AppConfig ApplicationConfigRetriever + // Metrics holds chaincode handler metrics + Metrics *HandlerMetrics // state holds the current handler state. It will be created, established, or // ready. @@ -166,8 +168,10 @@ type Handler struct { chatStream ccintf.ChaincodeStream // errChan is used to communicate errors from the async send to the receive loop errChan chan error - // Metrics holds chaincode handler metrics - Metrics *HandlerMetrics + // mutex is used to serialze the stream closed chan. + mutex sync.Mutex + // streamDoneChan is closed when the chaincode stream terminates. + streamDoneChan chan struct{} } // handleMessage is called by ProcessStream to dispatch messages. @@ -389,9 +393,20 @@ func (h *Handler) deregister() { } } +func (h *Handler) streamDone() <-chan struct{} { + h.mutex.Lock() + defer h.mutex.Unlock() + return h.streamDoneChan +} + func (h *Handler) ProcessStream(stream ccintf.ChaincodeStream) error { defer h.deregister() + h.mutex.Lock() + h.streamDoneChan = make(chan struct{}) + h.mutex.Unlock() + defer close(h.streamDoneChan) + h.chatStream = stream h.errChan = make(chan error, 1) @@ -1249,9 +1264,9 @@ func (h *Handler) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovi case <-time.After(timeout): err = errors.New("timeout expired while executing transaction") ccName := cccid.Name + ":" + cccid.Version - h.Metrics.ExecuteTimeouts.With( - "chaincode", ccName, - ).Add(1) + h.Metrics.ExecuteTimeouts.With("chaincode", ccName).Add(1) + case <-h.streamDone(): + err = errors.New("chaincode stream terminated") } return ccresp, err diff --git a/core/chaincode/handler_internal_test.go b/core/chaincode/handler_internal_test.go index 07b556fff65..78b6a0222f4 100644 --- a/core/chaincode/handler_internal_test.go +++ b/core/chaincode/handler_internal_test.go @@ -25,3 +25,11 @@ func SetHandlerChatStream(h *Handler, chatStream ccintf.ChaincodeStream) { func SetHandlerCCInstance(h *Handler, ccInstance *sysccprovider.ChaincodeInstance) { h.ccInstance = ccInstance } + +func StreamDone(h *Handler) <-chan struct{} { + return h.streamDone() +} + +func SetStreamDoneChan(h *Handler, ch chan struct{}) { + h.streamDoneChan = ch +} diff --git a/core/chaincode/handler_test.go b/core/chaincode/handler_test.go index c224b8642dd..076597cc016 100644 --- a/core/chaincode/handler_test.go +++ b/core/chaincode/handler_test.go @@ -2583,6 +2583,23 @@ var _ = Describe("Handler", func() { }) }) + Context("when the chaincode stream terminates", func() { + It("returns an error", func() { + streamDoneChan := make(chan struct{}) + chaincode.SetStreamDoneChan(handler, streamDoneChan) + + errCh := make(chan error, 1) + go func() { + _, err := handler.Execute(txParams, cccid, incomingMessage, time.Hour) + errCh <- err + }() + Consistently(errCh).ShouldNot(Receive()) + + close(streamDoneChan) + Eventually(errCh).Should(Receive(MatchError("chaincode stream terminated"))) + }) + }) + Context("when execute times out", func() { It("returns an error", func() { errCh := make(chan error, 1) @@ -2769,6 +2786,22 @@ var _ = Describe("Handler", func() { Eventually(fakeChatStream.RecvCallCount).Should(Equal(100)) }) + It("manages the stream done channel", func() { + releaseChan := make(chan struct{}) + fakeChatStream.RecvStub = func() (*pb.ChaincodeMessage, error) { + <-releaseChan + return nil, errors.New("cc-went-away") + } + go handler.ProcessStream(fakeChatStream) + Eventually(fakeChatStream.RecvCallCount).Should(Equal(1)) + + streamDoneChan := chaincode.StreamDone(handler) + Consistently(streamDoneChan).ShouldNot(Receive()) + + close(releaseChan) + Eventually(streamDoneChan).Should(BeClosed()) + }) + Context("when receive fails with an io.EOF", func() { BeforeEach(func() { fakeChatStream.RecvReturns(nil, io.EOF) @@ -2871,8 +2904,7 @@ var _ = Describe("Handler", func() { var ( cccid *ccprovider.CCContext incomingMessage *pb.ChaincodeMessage - - recvChan chan *pb.ChaincodeMessage + recvChan chan *pb.ChaincodeMessage ) BeforeEach(func() {