Skip to content

Commit

Permalink
[FAB-10804] queue chaincode tx on container start
Browse files Browse the repository at this point in the history
Until now, when transactions arrive for a chaincode that is not running,
the first transaction causes the chaincode to launch while all others
are rejected until the chaincode launch has completed. This is
unfriendly and unexpected behavior.

With this change, when transactions arrive for chaincode that is
launching, the transactions will be queued until the launch process has
completed.

If the launch fails for some reason, all queued transactions will be
terminated with the error from the launch. If the initial launch times
out, all queued transactions (regardless of arrival) will be terminated
with the timeout error.

Change-Id: I12a9750b1a57e3e494cf4026539ea490dcf0573e
Signed-off-by: Matthew Sykes <sykesmat@us.ibm.com>
  • Loading branch information
sykesm committed Sep 13, 2018
1 parent 9894396 commit 8a7737d
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 94 deletions.
7 changes: 1 addition & 6 deletions core/chaincode/chaincode_support.go
Expand Up @@ -157,12 +157,7 @@ func (cs *ChaincodeSupport) Launch(chainID, chaincodeName, chaincodeVersion stri

// Stop stops a chaincode if running.
func (cs *ChaincodeSupport) Stop(ccci *ccprovider.ChaincodeContainerInfo) error {
err := cs.Runtime.Stop(ccci)
if err != nil {
return err
}

return nil
return cs.Runtime.Stop(ccci)
}

// HandleChaincodeStream implements ccintf.HandleChaincodeStream for all vms to call with appropriate stream
Expand Down
18 changes: 9 additions & 9 deletions core/chaincode/fake/launch_registry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 39 additions & 14 deletions core/chaincode/handler_registry.go
Expand Up @@ -22,8 +22,10 @@ type HandlerRegistry struct {
}

type LaunchState struct {
done chan struct{}
err error
mutex sync.Mutex
notified bool
done chan struct{}
err error
}

func NewLaunchState() *LaunchState {
Expand All @@ -32,11 +34,25 @@ func NewLaunchState() *LaunchState {
}
}

func (l *LaunchState) Done() <-chan struct{} { return l.done }
func (l *LaunchState) Err() error { return l.err }
func (l *LaunchState) Done() <-chan struct{} {
return l.done
}

func (l *LaunchState) Err() error {
l.mutex.Lock()
err := l.err
l.mutex.Unlock()
return err
}

func (l *LaunchState) Notify(err error) {
l.err = err
close(l.done)
l.mutex.Lock()
if !l.notified {
l.notified = true
l.err = err
close(l.done)
}
l.mutex.Unlock()
}

// NewHandlerRegistry constructs a HandlerRegistry.
Expand Down Expand Up @@ -67,19 +83,29 @@ func (r *HandlerRegistry) hasLaunched(chaincode string) bool {
}

// Launching indicates that chaincode is being launched. The LaunchState that
// is returned provides mechanisms to determine when the operation has completed
// and whether or not it failed. An error will be returned if chaincode launch
// processing has already been initated.
func (r *HandlerRegistry) Launching(cname string) (*LaunchState, error) {
// is returned provides mechanisms to determine when the operation has
// completed and whether or not it failed. The bool indicates whether or not
// the chaincode has already been started.
func (r *HandlerRegistry) Launching(cname string) (*LaunchState, bool) {
r.mutex.Lock()
defer r.mutex.Unlock()
if r.hasLaunched(cname) {
return nil, errors.Errorf("chaincode %s has already been launched", cname)

// launch happened or already happening
if launchState, ok := r.launching[cname]; ok {
return launchState, true
}

// handler registered without going through launch
if _, ok := r.handlers[cname]; ok {
launchState := NewLaunchState()
launchState.Notify(nil)
return launchState, true
}

// first attempt to launch so the runtime needs to start
launchState := NewLaunchState()
r.launching[cname] = launchState
return launchState, nil
return launchState, false
}

// Ready indicates that the chaincode registration has completed and the
Expand All @@ -90,7 +116,6 @@ func (r *HandlerRegistry) Ready(cname string) {

launchStatus := r.launching[cname]
if launchStatus != nil {
delete(r.launching, cname)
launchStatus.Notify(nil)
}
}
Expand Down
101 changes: 75 additions & 26 deletions core/chaincode/handler_registry_test.go
Expand Up @@ -34,9 +34,7 @@ var _ = Describe("HandlerRegistry", func() {
})

It("returns true when launching", func() {
_, err := hr.Launching("chaincode-name")
Expect(err).NotTo(HaveOccurred())

hr.Launching("chaincode-name")
launched := hr.HasLaunched("chaincode-name")
Expect(launched).To(BeTrue())
})
Expand All @@ -52,21 +50,31 @@ var _ = Describe("HandlerRegistry", func() {

Describe("Launching", func() {
It("returns a LaunchState to wait on for registration", func() {
launchState, err := hr.Launching("chaincode-name")
Expect(err).NotTo(HaveOccurred())
launchState, _ := hr.Launching("chaincode-name")
Consistently(launchState.Done()).ShouldNot(Receive())
Consistently(launchState.Done()).ShouldNot(BeClosed())
})

It("indicates whether or not the chaincode needs to start", func() {
_, started := hr.Launching("chaincode-name")
Expect(started).To(BeFalse())
})

Context("when a chaincode instance is already launching", func() {
BeforeEach(func() {
_, err := hr.Launching("chaincode-name")
Expect(err).NotTo(HaveOccurred())
_, started := hr.Launching("chaincode-name")
Expect(started).To(BeFalse())
})

It("returns an error", func() {
_, err := hr.Launching("chaincode-name")
Expect(err).To(MatchError("chaincode chaincode-name has already been launched"))
It("returns a LaunchState", func() {
launchState, _ := hr.Launching("chaincode-name")
Consistently(launchState.Done()).ShouldNot(Receive())
Consistently(launchState.Done()).ShouldNot(BeClosed())
})

It("indicates already started", func() {
_, started := hr.Launching("chaincode-name")
Expect(started).To(BeTrue())
})
})

Expand All @@ -76,9 +84,15 @@ var _ = Describe("HandlerRegistry", func() {
Expect(err).NotTo(HaveOccurred())
})

It("returns an error", func() {
_, err := hr.Launching("chaincode-name")
Expect(err).To(MatchError("chaincode chaincode-name has already been launched"))
It("returns a ready LaunchState", func() {
launchState, _ := hr.Launching("chaincode-name")
Expect(launchState.Done()).To(BeClosed())
Expect(launchState.Err()).NotTo(HaveOccurred())
})

It("indicates the chaincode has already been started", func() {
_, started := hr.Launching("chaincode-name")
Expect(started).To(BeTrue())
})
})
})
Expand All @@ -87,9 +101,7 @@ var _ = Describe("HandlerRegistry", func() {
var launchState *chaincode.LaunchState

BeforeEach(func() {
var err error
launchState, err = hr.Launching("chaincode-name")
Expect(err).NotTo(HaveOccurred())
launchState, _ = hr.Launching("chaincode-name")
Expect(launchState.Done()).NotTo(BeClosed())
})

Expand All @@ -103,20 +115,18 @@ var _ = Describe("HandlerRegistry", func() {
Expect(launchState.Err()).To(BeNil())
})

It("cleans up the launching state", func() {
It("leaves the launching state in the registry", func() {
hr.Ready("chaincode-name")
launching := hr.HasLaunched("chaincode-name")
Expect(launching).To(BeFalse())
Expect(launching).To(BeTrue())
})
})

Describe("Failed", func() {
var launchState *chaincode.LaunchState

BeforeEach(func() {
var err error
launchState, err = hr.Launching("chaincode-name")
Expect(err).NotTo(HaveOccurred())
launchState, _ = hr.Launching("chaincode-name")
Expect(launchState.Done()).NotTo(BeClosed())
})

Expand Down Expand Up @@ -172,10 +182,10 @@ var _ = Describe("HandlerRegistry", func() {
})

It("allows registration of launching chaincode", func() {
_, err := hr.Launching("chaincode-name")
Expect(err).NotTo(HaveOccurred())
_, started := hr.Launching("chaincode-name")
Expect(started).To(BeFalse())

err = hr.Register(handler)
err := hr.Register(handler)
Expect(err).NotTo(HaveOccurred())

h := hr.Handler("chaincode-name")
Expand Down Expand Up @@ -227,8 +237,9 @@ var _ = Describe("HandlerRegistry", func() {
handler.TXContexts = transactionContexts
txContext.InitializeQueryContext("query-id", fakeResultsIterator)

_, err = hr.Launching("chaincode-name")
Expect(err).NotTo(HaveOccurred())
_, started := hr.Launching("chaincode-name")
Expect(started).To(BeFalse())

err = hr.Register(handler)
Expect(err).NotTo(HaveOccurred())
})
Expand All @@ -249,3 +260,41 @@ var _ = Describe("HandlerRegistry", func() {
})
})
})

var _ = Describe("LaunchState", func() {
var launchState *chaincode.LaunchState

BeforeEach(func() {
launchState = chaincode.NewLaunchState()
})

It("coordinates notification and errors", func() {
Expect(launchState.Done()).NotTo(BeNil())
Consistently(launchState.Done()).ShouldNot(BeClosed())

launchState.Notify(errors.New("jelly"))
Eventually(launchState.Done()).Should(BeClosed())
Expect(launchState.Err()).To(MatchError("jelly"))
})

It("can notify with a nil error", func() {
Expect(launchState.Done()).NotTo(BeNil())
Consistently(launchState.Done()).ShouldNot(BeClosed())

launchState.Notify(nil)
Eventually(launchState.Done()).Should(BeClosed())
Expect(launchState.Err()).To(BeNil())
})

It("can be notified mulitple times but honors the first", func() {
Expect(launchState.Done()).NotTo(BeNil())
Consistently(launchState.Done()).ShouldNot(BeClosed())

launchState.Notify(errors.New("mango"))
launchState.Notify(errors.New("tango"))
launchState.Notify(errors.New("django"))
launchState.Notify(nil)
Eventually(launchState.Done()).Should(BeClosed())
Expect(launchState.Err()).To(MatchError("mango"))
})
})

0 comments on commit 8a7737d

Please sign in to comment.