Skip to content

Commit

Permalink
FABGW-8: Wait for transaction commits
Browse files Browse the repository at this point in the history
Initial implementation of commit waiting capability for the embedded
Gateway. Not yet integrated into Gateway services.

Signed-off-by: Mark S. Lewis <mark_lewis@uk.ibm.com>
  • Loading branch information
bestbeforetoday authored and sykesm committed Mar 16, 2021
1 parent b40f703 commit 76763d1
Show file tree
Hide file tree
Showing 10 changed files with 689 additions and 16 deletions.
6 changes: 3 additions & 3 deletions core/peer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

"github.com/hyperledger/fabric/core/config"
"github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/hyperledger/fabric/internal/pkg/gateway"
gatewayconfig "github.com/hyperledger/fabric/internal/pkg/gateway/config"
"github.com/pkg/errors"
"github.com/spf13/viper"
)
Expand Down Expand Up @@ -213,7 +213,7 @@ type Config struct {
// The gateway service is used by client SDKs to
// interact with fabric networks

GatewayOptions gateway.Options
GatewayOptions gatewayconfig.Options
}

// GlobalConfig obtains a set of configuration from viper, build and returns
Expand Down Expand Up @@ -273,7 +273,7 @@ func (c *Config) load() error {
c.DeliverClientKeepaliveOptions.ClientTimeout = viper.GetDuration("peer.keepalive.deliveryClient.timeout")
}

c.GatewayOptions = gateway.GetOptions(viper.GetViper())
c.GatewayOptions = gatewayconfig.GetOptions(viper.GetViper())

c.VMEndpoint = viper.GetString("vm.endpoint")
c.VMDockerTLSEnabled = viper.GetBool("vm.docker.tls.enabled")
Expand Down
8 changes: 4 additions & 4 deletions core/peer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/hyperledger/fabric/internal/pkg/gateway"
"github.com/hyperledger/fabric/internal/pkg/gateway/config"
"github.com/spf13/viper"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestGlobalConfig(t *testing.T) {
DockerKey: filepath.Join(cwd, "test/vm/tls/key/file"),
DockerCA: filepath.Join(cwd, "test/vm/tls/ca/file"),

GatewayOptions: gateway.Options{
GatewayOptions: config.Options{
Enabled: true,
EndorsementTimeout: 10 * time.Second,
DialTimeout: 60 * time.Second,
Expand All @@ -396,7 +396,7 @@ func TestGlobalConfigDefault(t *testing.T) {
ValidatorPoolSize: runtime.NumCPU(),
VMNetworkMode: "host",
DeliverClientKeepaliveOptions: comm.DefaultKeepaliveOptions,
GatewayOptions: gateway.GetOptions(viper.GetViper()),
GatewayOptions: config.GetOptions(viper.GetViper()),
}

require.Equal(t, expectedConfig, coreConfig)
Expand Down Expand Up @@ -451,7 +451,7 @@ func TestPropagateEnvironment(t *testing.T) {
Path: "/testPath",
},
},
GatewayOptions: gateway.GetOptions(viper.GetViper()),
GatewayOptions: config.GetOptions(viper.GetViper()),
}
require.Equal(t, expectedConfig, coreConfig)
}
Expand Down
10 changes: 5 additions & 5 deletions internal/pkg/gateway/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/common"
gdiscovery "github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/internal/pkg/gateway/config"
"github.com/hyperledger/fabric/internal/pkg/gateway/mocks"
idmocks "github.com/hyperledger/fabric/internal/pkg/identity/mocks"
"github.com/hyperledger/fabric/protoutil"
Expand Down Expand Up @@ -120,7 +121,7 @@ func TestGateway(t *testing.T) {

ca, err := tlsgen.NewCA()
require.NoError(t, err)
config := &dp.ConfigResult{
configResult := &dp.ConfigResult{
Orderers: map[string]*dp.Endpoints{
"msp1": {
Endpoint: []*dp.Endpoint{
Expand All @@ -141,18 +142,17 @@ func TestGateway(t *testing.T) {
{"id3", "peer2:9051", "msp1"},
}

disc := mockDiscovery(t, tt.plan, members, config)
disc := mockDiscovery(t, tt.plan, members, configResult)
if tt.setupDiscovery != nil {
tt.setupDiscovery(disc)
}

options := Options{
options := config.Options{
Enabled: true,
EndorsementTimeout: endorsementTimeout,
}

server := CreateServer(localEndorser, disc, "localhost:7051", "msp1", options)

server.registry.endpointFactory = createEndpointFactory(t, epDef)

require.NoError(t, err, "Failed to sign the proposal")
Expand Down Expand Up @@ -589,7 +589,7 @@ func TestGateway(t *testing.T) {
}

func TestNilArgs(t *testing.T) {
server := CreateServer(&mocks.EndorserClient{}, &mocks.Discovery{}, "localhost:7051", "msp1", GetOptions(viper.New()))
server := CreateServer(&mocks.EndorserClient{}, &mocks.Discovery{}, "localhost:7051", "msp1", config.GetOptions(viper.New()))
ctx := context.Background()

_, err := server.Evaluate(ctx, nil)
Expand Down
148 changes: 148 additions & 0 deletions internal/pkg/gateway/commit/channelnotifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
Copyright 2021 IBM All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package commit

import (
"sync"

"github.com/hyperledger/fabric/core/ledger"
)

type channelLevelNotifier struct {
commitChannel <-chan *ledger.CommitNotification
lock sync.Mutex
listeners map[string][]*transactionListener
done <-chan struct{}
}

func newChannelNotifier(done <-chan struct{}, commitChannel <-chan *ledger.CommitNotification) *channelLevelNotifier {
notifier := &channelLevelNotifier{
commitChannel: commitChannel,
listeners: make(map[string][]*transactionListener),
done: done,
}
go notifier.run()
return notifier
}

func (notifier *channelLevelNotifier) run() {
for {
select {
case blockCommit, ok := <-notifier.commitChannel:
if !ok {
// This should never happen and behaviour is currently intentionally undefined / untested
notifier.close()
return
}
notifier.removeCompletedListeners()
notifier.receiveBlock(blockCommit)
case <-notifier.done:
notifier.close()
return
}
}
}

func (notifier *channelLevelNotifier) receiveBlock(blockCommit *ledger.CommitNotification) {
for transactionID, status := range blockCommit.TxIDValidationCodes {
notification := &Notification{
BlockNumber: blockCommit.BlockNumber,
TransactionID: transactionID,
ValidationCode: status,
}
notifier.notify(notification)
}
}

func (notifier *channelLevelNotifier) removeCompletedListeners() {
notifier.lock.Lock()
defer notifier.lock.Unlock()

for key, listeners := range notifier.listeners {
for i := 0; i < len(listeners); {
if !listeners[i].isDone() {
i++
continue
}

listeners[i].close()

lastIndex := len(listeners) - 1
listeners[i] = listeners[lastIndex]
listeners = listeners[:lastIndex]
}

if len(listeners) > 0 {
notifier.listeners[key] = listeners
} else {
delete(notifier.listeners, key)
}
}
}

func (notifier *channelLevelNotifier) notify(notification *Notification) {
notifier.lock.Lock()
defer notifier.lock.Unlock()

for _, listener := range notifier.listeners[notification.TransactionID] {
listener.receive(notification)
listener.close()
}

delete(notifier.listeners, notification.TransactionID)
}

func (notifier *channelLevelNotifier) registerListener(done <-chan struct{}, transactionID string) <-chan Notification {
notifyChannel := make(chan Notification, 1) // avoid blocking and only expect one notification per channel
listener := &transactionListener{
done: done,
transactionID: transactionID,
notifyChannel: notifyChannel,
}

notifier.lock.Lock()
notifier.listeners[transactionID] = append(notifier.listeners[transactionID], listener)
notifier.lock.Unlock()

return notifyChannel
}

func (notifier *channelLevelNotifier) close() {
notifier.lock.Lock()
defer notifier.lock.Unlock()

for _, listeners := range notifier.listeners {
for _, listener := range listeners {
listener.close()
}
}

notifier.listeners = nil
}

type transactionListener struct {
done <-chan struct{}
transactionID string
notifyChannel chan<- Notification
}

func (listener *transactionListener) isDone() bool {
select {
case <-listener.done:
return true
default:
return false
}
}

func (listener *transactionListener) close() {
close(listener.notifyChannel)
}

func (listener *transactionListener) receive(notification *Notification) {
listener.notifyChannel <- *notification
}
119 changes: 119 additions & 0 deletions internal/pkg/gateway/commit/mock/notificationsupplier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 76763d1

Please sign in to comment.