Skip to content

Commit fbb4c89

Browse files
committed
Complete chaincode execution on stream termination
When the chaincode stream terminated with an error, ProcessStream would return an error but did not do anything to release go routines waiting for chaincode execution to complete. This would delay the release of the transaction simulator lock until the execute timeout occurred. This commit provides a "streamDone" signal back to the execute loop that is used to complete execution when the chaincode stream terminates. FAB-16610 #done Change-Id: I9ba0b3549015a49ae40e8e10b5326e8a8681d128 Signed-off-by: Matthew Sykes <sykesmat@us.ibm.com>
1 parent cd97bc3 commit fbb4c89

File tree

3 files changed

+62
-7
lines changed

3 files changed

+62
-7
lines changed

core/chaincode/handler.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ type Handler struct {
121121
UUIDGenerator UUIDGenerator
122122
// AppConfig is used to retrieve the application config for a channel
123123
AppConfig ApplicationConfigRetriever
124+
// Metrics holds chaincode handler metrics
125+
Metrics *HandlerMetrics
124126

125127
// state holds the current handler state. It will be created, established, or
126128
// ready.
@@ -135,8 +137,10 @@ type Handler struct {
135137
chatStream ccintf.ChaincodeStream
136138
// errChan is used to communicate errors from the async send to the receive loop
137139
errChan chan error
138-
// Metrics holds chaincode handler metrics
139-
Metrics *HandlerMetrics
140+
// mutex is used to serialze the stream closed chan.
141+
mutex sync.Mutex
142+
// streamDoneChan is closed when the chaincode stream terminates.
143+
streamDoneChan chan struct{}
140144
}
141145

142146
// handleMessage is called by ProcessStream to dispatch messages.
@@ -342,9 +346,20 @@ func (h *Handler) deregister() {
342346
h.Registry.Deregister(h.chaincodeID)
343347
}
344348

349+
func (h *Handler) streamDone() <-chan struct{} {
350+
h.mutex.Lock()
351+
defer h.mutex.Unlock()
352+
return h.streamDoneChan
353+
}
354+
345355
func (h *Handler) ProcessStream(stream ccintf.ChaincodeStream) error {
346356
defer h.deregister()
347357

358+
h.mutex.Lock()
359+
h.streamDoneChan = make(chan struct{})
360+
h.mutex.Unlock()
361+
defer close(h.streamDoneChan)
362+
348363
h.chatStream = stream
349364
h.errChan = make(chan error, 1)
350365

@@ -1198,9 +1213,9 @@ func (h *Handler) Execute(txParams *ccprovider.TransactionParams, namespace stri
11981213
// are typically treated as error
11991214
case <-time.After(timeout):
12001215
err = errors.New("timeout expired while executing transaction")
1201-
h.Metrics.ExecuteTimeouts.With(
1202-
"chaincode", h.chaincodeID,
1203-
).Add(1)
1216+
h.Metrics.ExecuteTimeouts.With("chaincode", h.chaincodeID).Add(1)
1217+
case <-h.streamDone():
1218+
err = errors.New("chaincode stream terminated")
12041219
}
12051220

12061221
return ccresp, err

core/chaincode/handler_internal_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,11 @@ func SetHandlerChaincodeID(h *Handler, chaincodeID string) {
1919
func SetHandlerChatStream(h *Handler, chatStream ccintf.ChaincodeStream) {
2020
h.chatStream = chatStream
2121
}
22+
23+
func StreamDone(h *Handler) <-chan struct{} {
24+
return h.streamDone()
25+
}
26+
27+
func SetStreamDoneChan(h *Handler, ch chan struct{}) {
28+
h.streamDoneChan = ch
29+
}

core/chaincode/handler_test.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2539,6 +2539,23 @@ var _ = Describe("Handler", func() {
25392539
})
25402540
})
25412541

2542+
Context("when the chaincode stream terminates", func() {
2543+
It("returns an error", func() {
2544+
streamDoneChan := make(chan struct{})
2545+
chaincode.SetStreamDoneChan(handler, streamDoneChan)
2546+
2547+
errCh := make(chan error, 1)
2548+
go func() {
2549+
_, err := handler.Execute(txParams, "chaincode-name", incomingMessage, time.Hour)
2550+
errCh <- err
2551+
}()
2552+
Consistently(errCh).ShouldNot(Receive())
2553+
2554+
close(streamDoneChan)
2555+
Eventually(errCh).Should(Receive(MatchError("chaincode stream terminated")))
2556+
})
2557+
})
2558+
25422559
Context("when execute times out", func() {
25432560
It("returns an error", func() {
25442561
errCh := make(chan error, 1)
@@ -2716,6 +2733,22 @@ var _ = Describe("Handler", func() {
27162733
Eventually(fakeChatStream.RecvCallCount).Should(Equal(100))
27172734
})
27182735

2736+
It("manages the stream done channel", func() {
2737+
releaseChan := make(chan struct{})
2738+
fakeChatStream.RecvStub = func() (*pb.ChaincodeMessage, error) {
2739+
<-releaseChan
2740+
return nil, errors.New("cc-went-away")
2741+
}
2742+
go handler.ProcessStream(fakeChatStream)
2743+
Eventually(fakeChatStream.RecvCallCount).Should(Equal(1))
2744+
2745+
streamDoneChan := chaincode.StreamDone(handler)
2746+
Consistently(streamDoneChan).ShouldNot(Receive())
2747+
2748+
close(releaseChan)
2749+
Eventually(streamDoneChan).Should(BeClosed())
2750+
})
2751+
27192752
Context("when receive fails with an io.EOF", func() {
27202753
BeforeEach(func() {
27212754
fakeChatStream.RecvReturns(nil, io.EOF)
@@ -2817,8 +2850,7 @@ var _ = Describe("Handler", func() {
28172850
Context("when an async error is sent", func() {
28182851
var (
28192852
incomingMessage *pb.ChaincodeMessage
2820-
2821-
recvChan chan *pb.ChaincodeMessage
2853+
recvChan chan *pb.ChaincodeMessage
28222854
)
28232855

28242856
BeforeEach(func() {

0 commit comments

Comments
 (0)