Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions internal/database/sqlcommon/message_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,34 @@ func (s *SQLCommon) UpsertMessage(ctx context.Context, message *fftypes.Message,
return s.commitTx(ctx, tx, autoCommit)
}

// In SQL update+bump is a delete+insert within a TX
func (s *SQLCommon) ReplaceMessage(ctx context.Context, message *fftypes.Message) (err error) {
ctx, tx, autoCommit, err := s.beginOrUseTx(ctx)
if err != nil {
return err
}
defer s.rollbackTx(ctx, tx, autoCommit)

if err := s.deleteTx(ctx, tx,
sq.Delete("messages").
Where(sq.And{
sq.Eq{"id": message.Header.ID},
}),
nil, // no change event
); err != nil {
return err
}

if err = s.attemptMessageInsert(ctx, tx, message); err != nil {
return err
}

// Note there is no call to updateMessageDataRefs as the data refs are not allowed to change,
// and are correlated by UUID (not sequence)

return s.commitTx(ctx, tx, autoCommit)
}

func (s *SQLCommon) updateMessageDataRefs(ctx context.Context, tx *txWrapper, message *fftypes.Message, recreateDatarefs bool) error {

if recreateDatarefs {
Expand Down
45 changes: 43 additions & 2 deletions internal/database/sqlcommon/message_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ func TestUpsertE2EWithDB(t *testing.T) {
TxType: fftypes.TransactionTypeNone,
},
Hash: fftypes.NewRandB32(),
State: fftypes.MessageStateReady,
State: fftypes.MessageStateStaged,
Confirmed: nil,
Data: []*fftypes.DataRef{
{ID: dataID1, Hash: rand1},
{ID: dataID2, Hash: rand2},
},
}

s.callbacks.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, fftypes.ChangeEventTypeCreated, "ns12345", msgID, mock.Anything).Return()
s.callbacks.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, fftypes.ChangeEventTypeCreated, "ns12345", msgID, mock.Anything).Return().Twice()
s.callbacks.On("OrderedUUIDCollectionNSEvent", database.CollectionMessages, fftypes.ChangeEventTypeUpdated, "ns12345", msgID, mock.Anything).Return()

err := s.UpsertMessage(ctx, msg, database.UpsertOptimizationNew)
Expand Down Expand Up @@ -196,6 +196,15 @@ func TestUpsertE2EWithDB(t *testing.T) {
assert.Equal(t, 1, len(msgs))
assert.Equal(t, *bid2, *msgs[0].BatchID)

// Bump and Update - this is for a ready transition
msgUpdated.State = fftypes.MessageStateReady
err = s.ReplaceMessage(context.Background(), msgUpdated)
assert.NoError(t, err)
msgRead, err = s.GetMessageByID(ctx, msgUpdated.Header.ID)
msgJson, _ = json.Marshal(&msgUpdated)
msgReadJson, _ = json.Marshal(msgRead)
assert.Equal(t, string(msgJson), string(msgReadJson))

s.callbacks.AssertExpectations(t)
}

Expand Down Expand Up @@ -267,6 +276,38 @@ func TestUpsertMessageFailCommit(t *testing.T) {
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestReplaceMessageFailBegin(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin().WillReturnError(fmt.Errorf("pop"))
msgID := fftypes.NewUUID()
err := s.ReplaceMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}})
assert.Regexp(t, "FF10114", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestReplaceMessageFailDelete(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin()
mock.ExpectExec("DELETE .*").WillReturnError(fmt.Errorf("pop"))
mock.ExpectRollback()
msgID := fftypes.NewUUID()
err := s.ReplaceMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}})
assert.Regexp(t, "FF10118", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestReplaceMessageFailInsert(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin()
mock.ExpectExec("DELETE .*").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop"))
mock.ExpectRollback()
msgID := fftypes.NewUUID()
err := s.ReplaceMessage(context.Background(), &fftypes.Message{Header: fftypes.MessageHeader{ID: msgID}})
assert.Regexp(t, "FF10116", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestUpdateMessageDataRefsNilID(t *testing.T) {
s, mock := newMockProvider().init()
msgID := fftypes.NewUUID()
Expand Down
2 changes: 1 addition & 1 deletion internal/events/tokens_transferred.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (em *eventManager) TokensTransferred(ti tokens.Plugin, transfer *tokens.Tok
if msg.State == fftypes.MessageStateStaged {
// Message can now be sent
msg.State = fftypes.MessageStateReady
if err := em.database.UpsertMessage(ctx, msg, database.UpsertOptimizationExisting); err != nil {
if err := em.database.ReplaceMessage(ctx, msg); err != nil {
return err
}
} else {
Expand Down
6 changes: 2 additions & 4 deletions internal/events/tokens_transferred_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/hyperledger/firefly/mocks/databasemocks"
"github.com/hyperledger/firefly/mocks/tokenmocks"
"github.com/hyperledger/firefly/pkg/blockchain"
"github.com/hyperledger/firefly/pkg/database"
"github.com/hyperledger/firefly/pkg/fftypes"
"github.com/hyperledger/firefly/pkg/tokens"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -430,10 +429,9 @@ func TestTokensTransferredWithMessageSend(t *testing.T) {
mdi.On("UpsertTokenTransfer", em.ctx, &transfer.TokenTransfer).Return(nil).Times(2)
mdi.On("UpdateTokenBalances", em.ctx, &transfer.TokenTransfer).Return(nil).Times(2)
mdi.On("GetMessageByID", em.ctx, mock.Anything).Return(message, nil).Times(2)
mdi.On("UpsertMessage", em.ctx, mock.Anything, database.UpsertOptimizationExisting).Return(fmt.Errorf("pop"))
mdi.On("UpsertMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool {
mdi.On("ReplaceMessage", em.ctx, mock.MatchedBy(func(msg *fftypes.Message) bool {
return msg.State == fftypes.MessageStateReady
}), database.UpsertOptimizationExisting).Return(nil)
})).Return(fmt.Errorf("pop"))
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool {
return ev.Type == fftypes.EventTypeTransferConfirmed && ev.Reference == transfer.LocalID && ev.Namespace == pool.Namespace
})).Return(nil).Once()
Expand Down
14 changes: 14 additions & 0 deletions mocks/databasemocks/plugin.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/database/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ type iMessageCollection interface {
// UpdateMessage - Update message
UpdateMessage(ctx context.Context, id *fftypes.UUID, update Update) (err error)

// ReplaceMessage updates the message, and assigns it a new sequence number at the front of the list.
// A new event is raised for the message, with the new sequence number - as if it was brand new.
ReplaceMessage(ctx context.Context, message *fftypes.Message) (err error)

// UpdateMessages - Update messages
UpdateMessages(ctx context.Context, filter Filter, update Update) (err error)

Expand Down