Skip to content

Commit

Permalink
Refactor ChaincodeEvents to use ledger iterator (#2891)
Browse files Browse the repository at this point in the history
This implementation supports the ability to replay events. It also avoids any possibility of ledger commit processing being blocked by slow client event consumers.

Signed-off-by: Mark S. Lewis <mark_lewis@uk.ibm.com>
  • Loading branch information
bestbeforetoday committed Sep 6, 2021
1 parent b1d7538 commit dd539d3
Show file tree
Hide file tree
Showing 23 changed files with 1,727 additions and 738 deletions.
44 changes: 0 additions & 44 deletions integration/gateway/gateway_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@ import (
"encoding/json"
"testing"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/integration"
"github.com/hyperledger/fabric/integration/nwo"
"github.com/hyperledger/fabric/protoutil"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
Expand Down Expand Up @@ -54,44 +51,3 @@ var _ = SynchronizedAfterSuite(func() {
func StartPort() int {
return integration.GatewayBasePort.StartPortForNode()
}

func NewProposedTransaction(signingIdentity *nwo.SigningIdentity, channelName, chaincodeName, transactionName string, transientData map[string][]byte, args ...[]byte) (*peer.SignedProposal, string) {
proposal, transactionID := newProposalProto(signingIdentity, channelName, chaincodeName, transactionName, transientData, args...)
signedProposal, err := protoutil.GetSignedProposal(proposal, signingIdentity)
Expect(err).NotTo(HaveOccurred())

return signedProposal, transactionID
}

func newProposalProto(signingIdentity *nwo.SigningIdentity, channelName, chaincodeName, transactionName string, transientData map[string][]byte, args ...[]byte) (*peer.Proposal, string) {
creator, err := signingIdentity.Serialize()
Expect(err).NotTo(HaveOccurred())

invocationSpec := &peer.ChaincodeInvocationSpec{
ChaincodeSpec: &peer.ChaincodeSpec{
Type: peer.ChaincodeSpec_NODE,
ChaincodeId: &peer.ChaincodeID{Name: chaincodeName},
Input: &peer.ChaincodeInput{Args: chaincodeArgs(transactionName, args...)},
},
}

result, transactionID, err := protoutil.CreateChaincodeProposalWithTransient(
common.HeaderType_ENDORSER_TRANSACTION,
channelName,
invocationSpec,
creator,
transientData,
)
Expect(err).NotTo(HaveOccurred())

return result, transactionID
}

func chaincodeArgs(transactionName string, args ...[]byte) [][]byte {
result := make([][]byte, len(args)+1)

result[0] = []byte(transactionName)
copy(result[1:], args)

return result
}
43 changes: 43 additions & 0 deletions integration/gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import (

docker "github.com/fsouza/go-dockerclient"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/gateway"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/integration/nwo"
"github.com/hyperledger/fabric/protoutil"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/tedsuo/ifrit"
Expand All @@ -27,6 +29,47 @@ import (
"google.golang.org/grpc/status"
)

func NewProposedTransaction(signingIdentity *nwo.SigningIdentity, channelName, chaincodeName, transactionName string, transientData map[string][]byte, args ...[]byte) (*peer.SignedProposal, string) {
proposal, transactionID := newProposalProto(signingIdentity, channelName, chaincodeName, transactionName, transientData, args...)
signedProposal, err := protoutil.GetSignedProposal(proposal, signingIdentity)
Expect(err).NotTo(HaveOccurred())

return signedProposal, transactionID
}

func newProposalProto(signingIdentity *nwo.SigningIdentity, channelName, chaincodeName, transactionName string, transientData map[string][]byte, args ...[]byte) (*peer.Proposal, string) {
creator, err := signingIdentity.Serialize()
Expect(err).NotTo(HaveOccurred())

invocationSpec := &peer.ChaincodeInvocationSpec{
ChaincodeSpec: &peer.ChaincodeSpec{
Type: peer.ChaincodeSpec_NODE,
ChaincodeId: &peer.ChaincodeID{Name: chaincodeName},
Input: &peer.ChaincodeInput{Args: chaincodeArgs(transactionName, args...)},
},
}

result, transactionID, err := protoutil.CreateChaincodeProposalWithTransient(
common.HeaderType_ENDORSER_TRANSACTION,
channelName,
invocationSpec,
creator,
transientData,
)
Expect(err).NotTo(HaveOccurred())

return result, transactionID
}

func chaincodeArgs(transactionName string, args ...[]byte) [][]byte {
result := make([][]byte, len(args)+1)

result[0] = []byte(transactionName)
copy(result[1:], args)

return result
}

var _ = Describe("GatewayService", func() {
var (
testDir string
Expand Down
44 changes: 35 additions & 9 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
gp "github.com/hyperledger/fabric-protos-go/gateway"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/core/aclmgmt/resources"
"github.com/hyperledger/fabric/internal/pkg/gateway/event"
"github.com/hyperledger/fabric/protoutil"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -355,21 +356,46 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest
return status.Error(codes.PermissionDenied, err.Error())
}

events, err := gs.eventer.ChaincodeEvents(stream.Context(), request.ChannelId, request.ChaincodeId)
ledger, err := gs.ledgerProvider.Ledger(request.ChannelId)
if err != nil {
return status.Error(codes.FailedPrecondition, err.Error())
return status.Error(codes.InvalidArgument, err.Error())
}

for event := range events {
response := &gp.ChaincodeEventsResponse{
BlockNumber: event.BlockNumber,
Events: event.Events,
ledgerInfo, err := ledger.GetBlockchainInfo()
if err != nil {
return status.Error(codes.Unavailable, err.Error())
}

ledgerIter, err := ledger.GetBlocksIterator(ledgerInfo.Height)
if err != nil {
return status.Error(codes.Unavailable, err.Error())
}

eventsIter := event.NewChaincodeEventsIterator(ledgerIter)
defer eventsIter.Close()

for {
response, err := eventsIter.Next()
if err != nil {
return status.Error(codes.Unavailable, err.Error())
}

var matchingEvents []*peer.ChaincodeEvent

for _, event := range response.Events {
if event.GetChaincodeId() == request.ChaincodeId {
matchingEvents = append(matchingEvents, event)
}
}

if len(matchingEvents) == 0 {
continue
}

response.Events = matchingEvents

if err := stream.Send(response); err != nil {
return err // Likely stream closed by the client
}
}

// If stream is still open, this was a server-side error; otherwise client won't see it anyway
return status.Error(codes.Unavailable, "failed to read events")
}

0 comments on commit dd539d3

Please sign in to comment.