Skip to content

Commit

Permalink
remove support for finality listener for all txs (#567)
Browse files Browse the repository at this point in the history
Signed-off-by: Angelo De Caro <adc@zurich.ibm.com>
  • Loading branch information
adecaro committed May 9, 2024
1 parent 0c52e84 commit ae0bd3f
Show file tree
Hide file tree
Showing 11 changed files with 196 additions and 40 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ require (
github.com/kilic/bls12-381 v0.1.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/libp2p/go-yamux/v4 v4.0.1 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/mitchellh/go-testing-interface v1.0.0 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/patternmatcher v0.6.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/sys/user v0.1.0 // indirect
Expand All @@ -111,6 +111,7 @@ require (
github.com/quic-go/quic-go v0.38.2 // indirect
github.com/quic-go/webtransport-go v0.5.3 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions integration/fabric/iou/views/approver.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ func (i *ApproverView) Call(context view.Context) (interface{}, error) {

// Check committer events
var wg sync.WaitGroup
wg.Add(3)
wg.Add(1)
_, ch, err := fabric.GetDefaultChannel(context)
assert.NoError(err)
committer := ch.Committer()
assert.NoError(err, committer.AddFinalityListener(tx.ID(), NewFinalityListener(tx.ID(), driver.Valid, &wg)), "failed to add committer listener")
assert.NoError(err, committer.AddFinalityListener("", NewFinalityListener(tx.ID(), driver.Valid, &wg)), "failed to add committer listener")
assert.Error(committer.AddFinalityListener("", NewFinalityListener(tx.ID(), driver.Valid, &wg)), "must have failed")

// Finally, the approver waits that the transaction completes its lifecycle
_, err = context.RunView(state.NewFinalityWithTimeoutView(tx, 1*time.Minute))
Expand Down
6 changes: 2 additions & 4 deletions integration/fabric/iou/views/borrower.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,11 @@ func (i *CreateIOUView) Call(context view.Context) (interface{}, error) {

// Check committer events
var wg sync.WaitGroup
wg.Add(2)
wg.Add(1)
_, ch, err := fabric.GetDefaultChannel(context)
assert.NoError(err)
committer := ch.Committer()
assert.NoError(err, committer.AddFinalityListener(tx.ID(), NewFinalityListener(tx.ID(), driver.Valid, &wg)), "failed to add committer listener")
assert.NoError(err, committer.AddFinalityListener("", NewFinalityListener(tx.ID(), driver.Valid, &wg)), "failed to add committer listener")

// At this point the borrower can send the transaction to the ordering service and wait for finality.
_, err = context.RunView(state.NewOrderingAndFinalityWithTimeoutView(tx, 1*time.Minute))
Expand Down Expand Up @@ -139,12 +138,11 @@ func (u UpdateIOUView) Call(context view.Context) (interface{}, error) {

// Check committer events
var wg sync.WaitGroup
wg.Add(2)
wg.Add(1)
_, ch, err := fabric.GetDefaultChannel(context)
assert.NoError(err)
committer := ch.Committer()
assert.NoError(err, committer.AddFinalityListener(tx.ID(), NewFinalityListener(tx.ID(), driver.Valid, &wg)), "failed to add committer listener")
assert.NoError(err, committer.AddFinalityListener("", NewFinalityListener(tx.ID(), driver.Valid, &wg)), "failed to add committer listener")

// At this point the borrower can send the transaction to the ordering service and wait for finality.
_, err = context.RunView(state.NewOrderingAndFinalityWithTimeoutView(tx, 1*time.Minute))
Expand Down
32 changes: 11 additions & 21 deletions platform/common/core/generic/committer/finality.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/common/core"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils"
"github.com/pkg/errors"
)

const (
// How often to poll the vault for newly-committed transactions
checkVaultFrequency = 1 * time.Second
// Listeners that do not listen for a specific txID, but for all transactions
allListenersKey = ""
checkVaultFrequency = 1 * time.Second
defaultEventQueueSize = 1000
)

Expand Down Expand Up @@ -74,7 +73,10 @@ func NewFinalityManager[V comparable](logger Logger, vault Vault[V], statuses ..
}
}

func (c *FinalityManager[V]) AddListener(txID core.TxID, toAdd FinalityListener[V]) {
func (c *FinalityManager[V]) AddListener(txID core.TxID, toAdd FinalityListener[V]) error {
if len(txID) == 0 {
return errors.Errorf("tx id must be not empty")
}
c.mutex.Lock()
defer c.mutex.Unlock()

Expand All @@ -83,6 +85,8 @@ func (c *FinalityManager[V]) AddListener(txID core.TxID, toAdd FinalityListener[
ls = []FinalityListener[V]{}
}
c.txIDListeners[txID] = append(ls, toAdd)

return nil
}

func (c *FinalityManager[V]) RemoveListener(txID core.TxID, toRemove FinalityListener[V]) {
Expand Down Expand Up @@ -177,10 +181,8 @@ func (c *FinalityManager[V]) cloneListeners(txID core.TxID) []FinalityListener[V
defer c.mutex.RUnlock()

txListeners := c.txIDListeners[txID]
allListeners := c.txIDListeners[allListenersKey]
clone := make([]FinalityListener[V], len(txListeners)+len(allListeners))
copy(clone[:len(txListeners)], txListeners)
copy(clone[len(txListeners):], allListeners)
clone := make([]FinalityListener[V], len(txListeners))
copy(clone, txListeners)
delete(c.txIDListeners, txID)

return clone
Expand All @@ -190,17 +192,5 @@ func (c *FinalityManager[V]) txIDs() []core.TxID {
c.mutex.RLock()
defer c.mutex.RUnlock()

allListenersKeyOffset := 0
if _, ok := c.txIDListeners[allListenersKey]; ok {
allListenersKeyOffset = 1
}
res := make([]core.TxID, len(c.txIDListeners)-allListenersKeyOffset)
i := 0
for k := range c.txIDListeners {
if len(k) != 0 {
res[i] = k
i++
}
}
return res
return utils.Keys(c.txIDListeners)
}
170 changes: 170 additions & 0 deletions platform/common/core/generic/committer/finality_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package committer

import (
"context"
"testing"
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/common/core"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

type MockVault struct {
mock.Mock
}

func (m *MockVault) Statuses(ids ...string) ([]driver.TxValidationStatus[int], error) {
args := m.Called(ids)
return args.Get(0).([]driver.TxValidationStatus[int]), args.Error(1)
}

type MockFinalityListener struct {
mock.Mock
}

func (m *MockFinalityListener) OnStatus(txID core.TxID, status int, statusMessage string) {
m.Called(txID, status, statusMessage)
}

func TestFinalityManager_AddListener(t *testing.T) {
vault := &MockVault{}
manager := NewFinalityManager[int](flogging.MustGetLogger("committer"), vault)
listener := &MockFinalityListener{}

err := manager.AddListener("txID", listener)
assert.NoError(t, err)
assert.Len(t, manager.txIDListeners, 1)
assert.Contains(t, manager.txIDListeners, "txID")
assert.Contains(t, manager.txIDListeners["txID"], listener)

// Adding listener with empty txID should return an error
err = manager.AddListener("", listener)
assert.Error(t, err)
assert.Len(t, manager.txIDListeners, 1)
}

func TestFinalityManager_RemoveListener(t *testing.T) {
vault := &MockVault{}
manager := NewFinalityManager[int](flogging.MustGetLogger("committer"), vault)
listener := &MockFinalityListener{}

assert.NoError(t, manager.AddListener("txID", listener))

manager.RemoveListener("txID", listener)
assert.Len(t, manager.txIDListeners, 0)

// Removing non-existing listener should do nothing
manager.RemoveListener("non-existing", listener)
assert.Len(t, manager.txIDListeners, 0)
}

func TestFinalityManager_Run(t *testing.T) {
vault := &MockVault{}
manager := NewFinalityManager[int](flogging.MustGetLogger("committer"), vault)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

manager.Run(ctx)

time.Sleep(2 * time.Second) // Wait for some time to let the goroutines run
}

func TestFinalityManager_RunStatusListener(t *testing.T) {
event := FinalityEvent[int]{
TxID: "txID",
ValidationCode: 1,
ValidationMessage: "message",
}

vault := &MockVault{}
manager := NewFinalityManager[int](flogging.MustGetLogger("committer"), vault)
manager.postStatuses = utils.NewSet(1)

// no listeners
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
manager.runStatusListener(ctx)

// with listeners
vault.On("Statuses", []string{"txID"}).Return([]driver.TxValidationStatus[int]{{
TxID: "txID",
ValidationCode: 1,
Message: "message",
}}, nil)
listener := &MockFinalityListener{}
listener.On("OnStatus", event.TxID, event.ValidationCode, event.ValidationMessage).Once()
assert.NoError(t, manager.AddListener("txID", listener))

ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
go manager.runEventQueue(ctx)
manager.runStatusListener(ctx)
listener.AssertExpectations(t)

// Error case: Vault returns an error
vault.On("Statuses", []string{"txID"}).Return(nil, errors.New("some error"))
listener = &MockFinalityListener{}
listener.On("OnStatus", event.TxID, event.ValidationCode, event.ValidationMessage)
assert.NoError(t, manager.AddListener("txID", listener))
manager.txIDListeners["txID"] = []FinalityListener[int]{listener}

ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
manager.runStatusListener(ctx)
listener.AssertNotCalled(t, "OnStatus", event.TxID, event.ValidationCode, event.ValidationMessage)

vault.AssertExpectations(t)
}

func TestFinalityManager_CloneListeners(t *testing.T) {
vault := &MockVault{}
manager := NewFinalityManager[int](flogging.MustGetLogger("committer"), vault)
listener := &MockFinalityListener{}
assert.NoError(t, manager.AddListener("txID", listener))

clone := manager.cloneListeners("txID")
assert.Len(t, clone, 1)
assert.Equal(t, clone[0], listener)
}

func TestFinalityManager_TxIDs(t *testing.T) {
vault := &MockVault{}
manager := NewFinalityManager[int](flogging.MustGetLogger("committer"), vault)

manager.txIDListeners["txID"] = []FinalityListener[int]{}

txIDs := manager.txIDs()
assert.Len(t, txIDs, 1)
assert.Equal(t, txIDs[0], "txID")
}

func TestFinalityManager_Dispatch_PanicRecovery(t *testing.T) {
vault := &MockVault{}
manager := NewFinalityManager[int](flogging.MustGetLogger("committer"), vault)
listener := &MockFinalityListener{}
event := FinalityEvent[int]{
TxID: "txID",
ValidationCode: 1,
}
assert.NoError(t, manager.AddListener("txID", listener))

listener.On("OnStatus", event.TxID, event.ValidationCode, event.ValidationMessage).Once().Run(func(args mock.Arguments) {
panic("listener panic")
})
assert.NotPanics(t, func() {
manager.Dispatch(event)
})
listener.AssertExpectations(t)
}
3 changes: 1 addition & 2 deletions platform/fabric/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ func (c *Committer) Status(txID string) (ValidationCode, string, error) {
// AddFinalityListener registers a listener for transaction status for the passed transaction id.
// If the status is already valid or invalid, the listener is called immediately.
// When the listener is invoked, then it is also removed.
// If the transaction id is empty, the listener will be called on status changes of any transaction.
// In this case, the listener is not removed
// The transaction id must not be empty.
func (c *Committer) AddFinalityListener(txID string, listener FinalityListener) error {
return c.committer.AddFinalityListener(txID, listener)
}
Expand Down
3 changes: 1 addition & 2 deletions platform/fabric/core/generic/committer/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ func (c *Service) CommitTX(txID string, block uint64, indexInBlock int, envelope
}

func (c *Service) AddFinalityListener(txID string, listener driver.FinalityListener) error {
c.EventManager.AddListener(txID, listener)
return nil
return c.EventManager.AddListener(txID, listener)
}

func (c *Service) RemoveFinalityListener(txID string, listener driver.FinalityListener) error {
Expand Down
6 changes: 4 additions & 2 deletions platform/fabric/driver/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ type Committer interface {
// a list of dependant transaction ids if they exist.
Status(txID string) (ValidationCode, string, error)

// AddFinalityListener registers a listener for transaction status changes for the passed transaction id.
// If the transaction id is empty, the listener will be called for all transactions.
// AddFinalityListener registers a listener for transaction status for the passed transaction id.
// If the status is already valid or invalid, the listener is called immediately.
// When the listener is invoked, then it is also removed.
// The transaction id must not be empty.
AddFinalityListener(txID string, listener FinalityListener) error

// RemoveFinalityListener unregisters the passed listener.
Expand Down
3 changes: 1 addition & 2 deletions platform/orion/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ func NewCommitter(c driver.Committer) *Committer {
// AddFinalityListener registers a listener for transaction status for the passed transaction id.
// If the status is already valid or invalid, the listener is called immediately.
// When the listener is invoked, then it is also removed.
// If the transaction id is empty, the listener will be called on status changes of any transaction.
// In this case, the listener is not removed
// The transaction id must not be empty.
func (c *Committer) AddFinalityListener(txID string, listener FinalityListener) error {
return c.c.AddFinalityListener(txID, listener)
}
Expand Down
3 changes: 1 addition & 2 deletions platform/orion/core/generic/committer/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,7 @@ func (c *committer) IsFinal(ctx context.Context, txID string) error {
}

func (c *committer) AddFinalityListener(txID string, listener driver.FinalityListener) error {
c.EventManager.AddListener(txID, listener)
return nil
return c.EventManager.AddListener(txID, listener)
}

func (c *committer) RemoveFinalityListener(txID string, listener driver.FinalityListener) error {
Expand Down
3 changes: 1 addition & 2 deletions platform/orion/driver/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ type Committer interface {
// AddFinalityListener registers a listener for transaction status for the passed transaction id.
// If the status is already valid or invalid, the listener is called immediately.
// When the listener is invoked, then it is also removed.
// If the transaction id is empty, the listener will be called on status changes of any transaction.
// In this case, the listener is not removed
// The transaction id must not be empty.
AddFinalityListener(txID string, listener FinalityListener) error

// RemoveFinalityListener unregisters the passed listener.
Expand Down

0 comments on commit ae0bd3f

Please sign in to comment.