Skip to content

Commit

Permalink
Refactor of ChaincodeEvents service implementation to support resume (h…
Browse files Browse the repository at this point in the history
…yperledger#3283)

Enables the client to (optionally) specify an AfterTransactionId property in addition to a start block number when requesting chaincode events, which causes chaincode events up to that transaction ID (inclusive) to be ignored and not returned to the client. This supports resume of chaincode event listening on client reconnect without duplicating or missing any events.

Signed-off-by: Mark S. Lewis <Mark.S.Lewis@outlook.com>
  • Loading branch information
bestbeforetoday committed Jun 1, 2022
1 parent 02d63c3 commit 3d0a82c
Show file tree
Hide file tree
Showing 9 changed files with 503 additions and 321 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/hyperledger/fabric-chaincode-go v0.0.0-20201119163726-f8ef75b17719
github.com/hyperledger/fabric-config v0.1.0
github.com/hyperledger/fabric-lib-go v1.0.0
github.com/hyperledger/fabric-protos-go v0.0.0-20211118165945-23d738fc3553
github.com/hyperledger/fabric-protos-go v0.0.0-20220315113721-7dc293e117f7
github.com/kr/pretty v0.2.1
github.com/magiconair/properties v1.8.1 // indirect
github.com/mattn/go-runewidth v0.0.4 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ github.com/hyperledger/fabric-lib-go v1.0.0/go.mod h1:H362nMlunurmHwkYqR5uHL2UDW
github.com/hyperledger/fabric-protos-go v0.0.0-20190919234611-2a87503ac7c9/go.mod h1:xVYTjK4DtZRBxZ2D9aE4y6AbLaPwue2o/criQyQbVD0=
github.com/hyperledger/fabric-protos-go v0.0.0-20200424173316-dd554ba3746e/go.mod h1:xVYTjK4DtZRBxZ2D9aE4y6AbLaPwue2o/criQyQbVD0=
github.com/hyperledger/fabric-protos-go v0.0.0-20210911123859-041d13f0980c/go.mod h1:xVYTjK4DtZRBxZ2D9aE4y6AbLaPwue2o/criQyQbVD0=
github.com/hyperledger/fabric-protos-go v0.0.0-20211118165945-23d738fc3553 h1:E9f0v1q4EDfrE+0LdkxVtdYKAZ7PGCaj1bBx45R9yEQ=
github.com/hyperledger/fabric-protos-go v0.0.0-20211118165945-23d738fc3553/go.mod h1:xVYTjK4DtZRBxZ2D9aE4y6AbLaPwue2o/criQyQbVD0=
github.com/hyperledger/fabric-protos-go v0.0.0-20220315113721-7dc293e117f7 h1:YV+siZuYQZwENjRH00t7ZS0CTlywt8Qog/SzL/jf6kE=
github.com/hyperledger/fabric-protos-go v0.0.0-20220315113721-7dc293e117f7/go.mod h1:xVYTjK4DtZRBxZ2D9aE4y6AbLaPwue2o/criQyQbVD0=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
Expand Down
51 changes: 46 additions & 5 deletions integration/gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ var _ = Describe("GatewayService", func() {
chaincodeEvents := func(
ctx context.Context,
startPosition *orderer.SeekPosition,
afterTxID string,
identity func() ([]byte, error),
sign func(msg []byte) ([]byte, error),
) (gateway.Gateway_ChaincodeEventsClient, error) {
Expand All @@ -226,6 +227,9 @@ var _ = Describe("GatewayService", func() {
if startPosition != nil {
request.StartPosition = startPosition
}
if len(afterTxID) > 0 {
request.AfterTransactionId = afterTxID
}

requestBytes, err := proto.Marshal(request)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -375,7 +379,7 @@ var _ = Describe("GatewayService", func() {
},
}

eventsClient, err := chaincodeEvents(eventCtx, startPosition, signingIdentity.Serialize, signingIdentity.Sign)
eventsClient, err := chaincodeEvents(eventCtx, startPosition, "", signingIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

_, transactionID := submitTransaction("event", []byte("EVENT_NAME"), []byte("EVENT_PAYLOAD"))
Expand Down Expand Up @@ -409,7 +413,7 @@ var _ = Describe("GatewayService", func() {
},
}

eventsClient, err := chaincodeEvents(eventCtx, startPosition, signingIdentity.Serialize, signingIdentity.Sign)
eventsClient, err := chaincodeEvents(eventCtx, startPosition, "", signingIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

event, err := eventsClient.Recv()
Expand All @@ -426,13 +430,50 @@ var _ = Describe("GatewayService", func() {
Expect(proto.Equal(event.Events[0], expectedEvent)).To(BeTrue(), "Expected\n\t%#v\nto proto.Equal\n\t%#v", event.Events[0], expectedEvent)
})

It("should respond with replayed chaincode events after specified transaction ID", func() {
_, afterTransactionID := submitTransaction("event", []byte("WRONG_EVENT_NAME"), []byte("WRONG_EVENT_PAYLOAD"))
_, nextTransactionID := submitTransaction("event", []byte("CORRECT_EVENT_NAME"), []byte("CORRECT_EVENT_PAYLOAD"))

statusResult, err := commitStatus(afterTransactionID, signingIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

_, err = commitStatus(nextTransactionID, signingIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

eventCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

startPosition := &orderer.SeekPosition{
Type: &orderer.SeekPosition_Specified{
Specified: &orderer.SeekSpecified{
Number: statusResult.BlockNumber,
},
},
}

eventsClient, err := chaincodeEvents(eventCtx, startPosition, afterTransactionID, signingIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

event, err := eventsClient.Recv()
Expect(err).NotTo(HaveOccurred())

Expect(event.Events).To(HaveLen(1), "number of events")
expectedEvent := &peer.ChaincodeEvent{
ChaincodeId: "gatewaycc",
TxId: nextTransactionID,
EventName: "CORRECT_EVENT_NAME",
Payload: []byte("CORRECT_EVENT_PAYLOAD"),
}
Expect(proto.Equal(event.Events[0], expectedEvent)).To(BeTrue(), "Expected\n\t%#v\nto proto.Equal\n\t%#v", event.Events[0], expectedEvent)
})

It("should default to next commit if start position not specified", func() {
eventCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

var startPosition *orderer.SeekPosition

eventsClient, err := chaincodeEvents(eventCtx, startPosition, signingIdentity.Serialize, signingIdentity.Sign)
eventsClient, err := chaincodeEvents(eventCtx, startPosition, "", signingIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

_, transactionID := submitTransaction("event", []byte("EVENT_NAME"), []byte("EVENT_PAYLOAD"))
Expand Down Expand Up @@ -462,7 +503,7 @@ var _ = Describe("GatewayService", func() {
},
}

eventsClient, err := chaincodeEvents(eventCtx, startPosition, badIdentity.Serialize, signingIdentity.Sign)
eventsClient, err := chaincodeEvents(eventCtx, startPosition, "", badIdentity.Serialize, signingIdentity.Sign)
Expect(err).NotTo(HaveOccurred())

event, err := eventsClient.Recv()
Expand All @@ -486,7 +527,7 @@ var _ = Describe("GatewayService", func() {
},
}

eventsClient, err := chaincodeEvents(eventCtx, startPosition, signingIdentity.Serialize, badSign)
eventsClient, err := chaincodeEvents(eventCtx, startPosition, "", signingIdentity.Serialize, badSign)
Expect(err).NotTo(HaveOccurred())

event, err := eventsClient.Recv()
Expand Down
29 changes: 27 additions & 2 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,8 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest
return err
}

isMatch := chaincodeEventMatcher(request)

ledgerIter, err := ledger.GetBlocksIterator(startBlock)
if err != nil {
return status.Error(codes.Aborted, err.Error())
Expand All @@ -563,9 +565,8 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest
}

var matchingEvents []*peer.ChaincodeEvent

for _, event := range response.Events {
if event.GetChaincodeId() == request.GetChaincodeId() {
if isMatch(event) {
matchingEvents = append(matchingEvents, event)
}
}
Expand All @@ -586,6 +587,30 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest
}
}

func chaincodeEventMatcher(request *gp.ChaincodeEventsRequest) func(event *peer.ChaincodeEvent) bool {
chaincodeID := request.GetChaincodeId()
previousTransactionID := request.GetAfterTransactionId()

if len(previousTransactionID) == 0 {
return func(event *peer.ChaincodeEvent) bool {
return event.GetChaincodeId() == chaincodeID
}
}

passedPreviousTransaction := false

return func(event *peer.ChaincodeEvent) bool {
if !passedPreviousTransaction {
if event.TxId == previousTransactionID {
passedPreviousTransaction = true
}
return false
}

return event.GetChaincodeId() == chaincodeID
}
}

func startBlockFromLedgerPosition(ledger ledger.Ledger, position *ab.SeekPosition) (uint64, error) {
switch seek := position.GetType().(type) {
case nil:
Expand Down
Loading

0 comments on commit 3d0a82c

Please sign in to comment.