diff --git a/internal/assets/token_pool.go b/internal/assets/token_pool.go index 4e7f08048..aa59418e1 100644 --- a/internal/assets/token_pool.go +++ b/internal/assets/token_pool.go @@ -111,7 +111,7 @@ func (am *assetManager) createTokenPoolInternal(ctx context.Context, pool *fftyp err = am.database.RunAsGroup(ctx, func(ctx context.Context) (err error) { err = am.database.UpsertTransaction(ctx, tx, false /* should be new, or idempotent replay */) if err == nil { - err = am.database.UpsertOperation(ctx, op, false) + err = am.database.InsertOperation(ctx, op) } return err }) diff --git a/internal/assets/token_pool_test.go b/internal/assets/token_pool_test.go index cf4d02918..b5ef820ca 100644 --- a/internal/assets/token_pool_test.go +++ b/internal/assets/token_pool_test.go @@ -94,7 +94,7 @@ func TestCreateTokenPoolFail(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenPool }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) mti.On("CreateTokenPool", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) _, err := am.CreateTokenPool(context.Background(), "ns1", pool, false) @@ -136,7 +136,7 @@ func TestCreateTokenPoolOperationFail(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenPool }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(fmt.Errorf("pop")) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(fmt.Errorf("pop")) _, err := am.CreateTokenPool(context.Background(), "ns1", pool, false) assert.Regexp(t, "pop", err) @@ -160,7 +160,7 @@ func TestCreateTokenPoolSuccess(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenPool }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) _, err := am.CreateTokenPool(context.Background(), "ns1", pool, false) assert.NoError(t, err) @@ -184,7 +184,7 @@ func TestCreateTokenPoolUnknownConnectorSuccess(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenPool }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) _, err := am.CreateTokenPool(context.Background(), "ns1", pool, false) assert.NoError(t, err) @@ -244,7 +244,7 @@ func TestCreateTokenPoolConfirm(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenPool }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil).Times(1) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil).Times(1) msa.On("WaitForTokenPool", context.Background(), "ns1", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { send := args[3].(syncasync.RequestSender) @@ -322,7 +322,7 @@ func TestCreateTokenPoolByTypeFail(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenPool }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) mti.On("CreateTokenPool", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) _, err := am.CreateTokenPoolByType(context.Background(), "ns1", "magic-tokens", pool, false) @@ -364,7 +364,7 @@ func TestCreateTokenPoolByTypeOperationFail(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenPool }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(fmt.Errorf("pop")) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(fmt.Errorf("pop")) _, err := am.CreateTokenPoolByType(context.Background(), "ns1", "magic-tokens", pool, false) assert.Regexp(t, "pop", err) @@ -388,7 +388,7 @@ func TestCreateTokenPoolByTypeSuccess(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenPool }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) _, err := am.CreateTokenPoolByType(context.Background(), "ns1", "magic-tokens", pool, false) assert.NoError(t, err) @@ -413,7 +413,7 @@ func TestCreateTokenPoolByTypeConfirm(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenPool }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil).Times(1) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil).Times(1) msa.On("WaitForTokenPool", context.Background(), "ns1", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { send := args[3].(syncasync.RequestSender) diff --git a/internal/assets/token_transfer.go b/internal/assets/token_transfer.go index e82ce0754..ad4455818 100644 --- a/internal/assets/token_transfer.go +++ b/internal/assets/token_transfer.go @@ -284,7 +284,7 @@ func (s *transferSender) sendInternal(ctx context.Context, method sendMethod) er if err != nil { return err } - if err = s.mgr.database.UpsertOperation(ctx, op, false); err != nil { + if err = s.mgr.database.InsertOperation(ctx, op); err != nil { return err } if s.transfer.Message != nil { diff --git a/internal/assets/token_transfer_test.go b/internal/assets/token_transfer_test.go index 9254be306..f6d4a7867 100644 --- a/internal/assets/token_transfer_test.go +++ b/internal/assets/token_transfer_test.go @@ -130,7 +130,7 @@ func TestMintTokensSuccess(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) _, err := am.MintTokens(context.Background(), "ns1", mint, false) assert.NoError(t, err) @@ -160,7 +160,7 @@ func TestMintTokenUnknownConnectorSuccess(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) _, err := am.MintTokens(context.Background(), "ns1", mint, false) assert.NoError(t, err) @@ -281,7 +281,7 @@ func TestMintTokenUnknownPoolSuccess(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) _, err := am.MintTokens(context.Background(), "ns1", mint, false) assert.NoError(t, err) @@ -453,7 +453,7 @@ func TestMintTokensFail(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) mdi.On("UpdateTransaction", context.Background(), mock.Anything, mock.Anything).Return(nil) mdi.On("UpdateOperation", context.Background(), mock.Anything, mock.Anything).Return(nil) @@ -482,7 +482,7 @@ func TestMintTokensFailAndDbFail(t *testing.T) { mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(pool, nil) mti.On("MintTokens", context.Background(), mock.Anything, "F1", &mint.TokenTransfer).Return(fmt.Errorf("pop")) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer && tx.Status != fftypes.OpStatusFailed }), false).Return(nil) @@ -515,7 +515,7 @@ func TestMintTokensOperationFail(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(fmt.Errorf("pop")) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(fmt.Errorf("pop")) _, err := am.MintTokens(context.Background(), "ns1", mint, false) assert.EqualError(t, err, "pop") @@ -547,7 +547,7 @@ func TestMintTokensConfirm(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) msa.On("WaitForTokenTransfer", context.Background(), "ns1", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { send := args[3].(syncasync.RequestSender) @@ -587,7 +587,7 @@ func TestMintTokensByTypeSuccess(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) _, err := am.MintTokensByType(context.Background(), "ns1", "magic-tokens", "pool1", mint, false) assert.NoError(t, err) @@ -617,7 +617,7 @@ func TestBurnTokensSuccess(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) _, err := am.BurnTokens(context.Background(), "ns1", burn, false) assert.NoError(t, err) @@ -671,7 +671,7 @@ func TestBurnTokensConfirm(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) msa.On("WaitForTokenTransfer", context.Background(), "ns1", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { send := args[3].(syncasync.RequestSender) @@ -711,7 +711,7 @@ func TestBurnTokensByTypeSuccess(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) _, err := am.BurnTokensByType(context.Background(), "ns1", "magic-tokens", "pool1", burn, false) assert.NoError(t, err) @@ -747,7 +747,7 @@ func TestTransferTokensSuccess(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) _, err := am.TransferTokens(context.Background(), "ns1", transfer, false) assert.NoError(t, err) @@ -846,7 +846,7 @@ func TestTransferTokensInvalidType(t *testing.T) { mdi.On("UpsertTransaction", am.ctx, mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mdi.On("UpsertOperation", am.ctx, mock.Anything, false).Return(nil) + mdi.On("InsertOperation", am.ctx, mock.Anything).Return(nil) sender := &transferSender{ mgr: am, @@ -933,7 +933,7 @@ func TestTransferTokensWithBroadcastMessage(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) mbm.On("NewBroadcast", "ns1", transfer.Message).Return(mms) mms.On("Prepare", context.Background()).Return(nil) mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool { @@ -1031,7 +1031,7 @@ func TestTransferTokensWithPrivateMessage(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) mpm.On("NewMessage", "ns1", transfer.Message).Return(mms) mms.On("Prepare", context.Background()).Return(nil) mdi.On("UpsertMessage", context.Background(), mock.MatchedBy(func(msg *fftypes.Message) bool { @@ -1112,7 +1112,7 @@ func TestTransferTokensConfirm(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) msa.On("WaitForTokenTransfer", context.Background(), "ns1", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { send := args[3].(syncasync.RequestSender) @@ -1170,7 +1170,7 @@ func TestTransferTokensWithBroadcastConfirm(t *testing.T) { mim.On("GetLocalOrganization", context.Background()).Return(&fftypes.Organization{Identity: "0x12345"}, nil) mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(pool, nil) mti.On("TransferTokens", context.Background(), mock.Anything, "F1", &transfer.TokenTransfer).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) @@ -1230,7 +1230,7 @@ func TestTransferTokensByTypeSuccess(t *testing.T) { mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenTransfer }), false).Return(nil) - mdi.On("UpsertOperation", context.Background(), mock.Anything, false).Return(nil) + mdi.On("InsertOperation", context.Background(), mock.Anything).Return(nil) _, err := am.TransferTokensByType(context.Background(), "ns1", "magic-tokens", "pool1", transfer, false) assert.NoError(t, err) diff --git a/internal/batchpin/batchpin.go b/internal/batchpin/batchpin.go index f5a7db6ca..30c93b534 100644 --- a/internal/batchpin/batchpin.go +++ b/internal/batchpin/batchpin.go @@ -73,7 +73,7 @@ func (bp *batchPinSubmitter) SubmitPinnedBatch(ctx context.Context, batch *fftyp "", fftypes.OpTypeBlockchainBatchPin, fftypes.OpStatusPending) - err = bp.database.UpsertOperation(ctx, op, false) + err = bp.database.InsertOperation(ctx, op) if err != nil { return err } diff --git a/internal/batchpin/batchpin_test.go b/internal/batchpin/batchpin_test.go index 2b603aea6..fdc1abda1 100644 --- a/internal/batchpin/batchpin_test.go +++ b/internal/batchpin/batchpin_test.go @@ -63,12 +63,12 @@ func TestSubmitPinnedBatchOk(t *testing.T) { contexts := []*fftypes.Bytes32{} mdi.On("UpsertTransaction", ctx, mock.Anything, false).Return(nil) - mdi.On("UpsertOperation", ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { + mdi.On("InsertOperation", ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { assert.Equal(t, fftypes.OpTypeBlockchainBatchPin, op.Type) assert.Equal(t, "ut", op.Plugin) assert.Equal(t, *batch.Payload.TX.ID, *op.Transaction) return true - }), false).Return(nil) + })).Return(nil) mbi.On("SubmitBatchPin", ctx, mock.Anything, (*fftypes.UUID)(nil), "0x12345", mock.Anything).Return(nil) err := bp.SubmitPinnedBatch(ctx, batch, contexts) @@ -99,12 +99,12 @@ func TestSubmitPinnedBatchWithMetricsOk(t *testing.T) { contexts := []*fftypes.Bytes32{} mdi.On("UpsertTransaction", ctx, mock.Anything, false).Return(nil) - mdi.On("UpsertOperation", ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { + mdi.On("InsertOperation", ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { assert.Equal(t, fftypes.OpTypeBlockchainBatchPin, op.Type) assert.Equal(t, "ut", op.Plugin) assert.Equal(t, *batch.Payload.TX.ID, *op.Transaction) return true - }), false).Return(nil) + })).Return(nil) mbi.On("SubmitBatchPin", ctx, mock.Anything, (*fftypes.UUID)(nil), "0x12345", mock.Anything).Return(nil) err := bp.SubmitPinnedBatch(ctx, batch, contexts) @@ -133,7 +133,7 @@ func TestSubmitPinnedBatchOpFail(t *testing.T) { contexts := []*fftypes.Bytes32{} mdi.On("UpsertTransaction", ctx, mock.Anything, false).Return(nil) - mdi.On("UpsertOperation", ctx, mock.Anything, false).Return(fmt.Errorf("pop")) + mdi.On("InsertOperation", ctx, mock.Anything).Return(fmt.Errorf("pop")) err := bp.SubmitPinnedBatch(ctx, batch, contexts) assert.Regexp(t, "pop", err) diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 4430f21c8..34a32fffe 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -129,7 +129,7 @@ func (bm *broadcastManager) submitTXAndUpdateDB(ctx context.Context, batch *ffty fftypes.OpTypePublicStorageBatchBroadcast, fftypes.OpStatusSucceeded, // Note we performed the action synchronously above ) - err = bm.database.UpsertOperation(ctx, op, false) + err = bm.database.InsertOperation(ctx, op) if err != nil { return err } diff --git a/internal/broadcast/manager_test.go b/internal/broadcast/manager_test.go index f1d574380..df51ffeb3 100644 --- a/internal/broadcast/manager_test.go +++ b/internal/broadcast/manager_test.go @@ -154,7 +154,7 @@ func TestDispatchBatchSubmitBatchPinSucceed(t *testing.T) { mbp := bm.batchpin.(*batchpinmocks.Submitter) mps.On("PublishData", mock.Anything, mock.Anything).Return("id1", nil) mdi.On("UpdateBatch", mock.Anything, batch.ID, mock.Anything).Return(nil) - mdi.On("UpsertOperation", mock.Anything, mock.Anything, false).Return(nil) + mdi.On("InsertOperation", mock.Anything, mock.Anything).Return(nil) mbp.On("SubmitPinnedBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) err := bm.dispatchBatch(context.Background(), batch, []*fftypes.Bytes32{fftypes.NewRandB32()}) @@ -170,7 +170,7 @@ func TestDispatchBatchSubmitBroadcastFail(t *testing.T) { mbp := bm.batchpin.(*batchpinmocks.Submitter) mps.On("PublishData", mock.Anything, mock.Anything).Return("id1", nil) mdi.On("UpdateBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) - mdi.On("UpsertOperation", mock.Anything, mock.Anything, false).Return(nil) + mdi.On("InsertOperation", mock.Anything, mock.Anything).Return(nil) mbp.On("SubmitPinnedBatch", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) err := bm.dispatchBatch(context.Background(), &fftypes.Batch{Identity: fftypes.Identity{Author: "wrong", Key: "wrong"}}, []*fftypes.Bytes32{fftypes.NewRandB32()}) @@ -198,7 +198,7 @@ func TestSubmitTXAndUpdateDBAddOp1Fail(t *testing.T) { mbi := bm.blockchain.(*blockchainmocks.Plugin) mdi.On("UpsertTransaction", mock.Anything, mock.Anything, false).Return(nil) mdi.On("UpdateBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) - mdi.On("UpsertOperation", mock.Anything, mock.Anything, false).Return(fmt.Errorf("pop")) + mdi.On("InsertOperation", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) mbi.On("SubmitBatchPin", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("txid", nil) mbi.On("Name").Return("unittest") @@ -226,7 +226,7 @@ func TestSubmitTXAndUpdateDBSucceed(t *testing.T) { mbp := bm.batchpin.(*batchpinmocks.Submitter) mdi.On("UpsertTransaction", mock.Anything, mock.Anything, false).Return(nil) mdi.On("UpdateBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) - mdi.On("UpsertOperation", mock.Anything, mock.Anything, false).Return(nil) + mdi.On("InsertOperation", mock.Anything, mock.Anything).Return(nil) mbi.On("SubmitBatchPin", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) mbp.On("SubmitPinnedBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) diff --git a/internal/database/sqlcommon/operation_sql.go b/internal/database/sqlcommon/operation_sql.go index 349c85818..c69617d28 100644 --- a/internal/database/sqlcommon/operation_sql.go +++ b/internal/database/sqlcommon/operation_sql.go @@ -50,76 +50,35 @@ var ( } ) -func (s *SQLCommon) UpsertOperation(ctx context.Context, operation *fftypes.Operation, allowExisting bool) (err error) { - +func (s *SQLCommon) InsertOperation(ctx context.Context, operation *fftypes.Operation) (err error) { ctx, tx, autoCommit, err := s.beginOrUseTx(ctx) if err != nil { return err } defer s.rollbackTx(ctx, tx, autoCommit) - existing := false - if allowExisting { - // Do a select within the transaction to detemine if the UUID already exists - opRows, _, err := s.queryTx(ctx, tx, - sq.Select("id"). - From("operations"). - Where(sq.Eq{"id": operation.ID}), - ) - if err != nil { - return err - } - - existing = opRows.Next() - opRows.Close() - } - - if existing { - // Update the operation - if _, err = s.updateTx(ctx, tx, - sq.Update("operations"). - Set("namespace", operation.Namespace). - Set("tx_id", operation.Transaction). - Set("optype", operation.Type). - Set("opstatus", operation.Status). - Set("plugin", operation.Plugin). - Set("backend_id", operation.BackendID). - Set("created", operation.Created). - Set("updated", operation.Updated). - Set("error", operation.Error). - Set("input", operation.Input). - Set("output", operation.Output). - Where(sq.Eq{"id": operation.ID}), - func() { - s.callbacks.UUIDCollectionNSEvent(database.CollectionOperations, fftypes.ChangeEventTypeUpdated, operation.Namespace, operation.ID) - }, - ); err != nil { - return err - } - } else { - if _, err = s.insertTx(ctx, tx, - sq.Insert("operations"). - Columns(opColumns...). - Values( - operation.ID, - operation.Namespace, - operation.Transaction, - string(operation.Type), - string(operation.Status), - operation.Plugin, - operation.BackendID, - operation.Created, - operation.Updated, - operation.Error, - operation.Input, - operation.Output, - ), - func() { - s.callbacks.UUIDCollectionNSEvent(database.CollectionOperations, fftypes.ChangeEventTypeCreated, operation.Namespace, operation.ID) - }, - ); err != nil { - return err - } + if _, err = s.insertTx(ctx, tx, + sq.Insert("operations"). + Columns(opColumns...). + Values( + operation.ID, + operation.Namespace, + operation.Transaction, + string(operation.Type), + string(operation.Status), + operation.Plugin, + operation.BackendID, + operation.Created, + operation.Updated, + operation.Error, + operation.Input, + operation.Output, + ), + func() { + s.callbacks.UUIDCollectionNSEvent(database.CollectionOperations, fftypes.ChangeEventTypeCreated, operation.Namespace, operation.ID) + }, + ); err != nil { + return err } return s.commitTx(ctx, tx, autoCommit) diff --git a/internal/database/sqlcommon/operation_sql_test.go b/internal/database/sqlcommon/operation_sql_test.go index 8a722b16d..e644677b3 100644 --- a/internal/database/sqlcommon/operation_sql_test.go +++ b/internal/database/sqlcommon/operation_sql_test.go @@ -37,29 +37,6 @@ func TestOperationE2EWithDB(t *testing.T) { // Create a new operation entry operationID := fftypes.NewUUID() operation := &fftypes.Operation{ - ID: operationID, - Namespace: "ns1", - Type: fftypes.OpTypeBlockchainBatchPin, - Transaction: fftypes.NewUUID(), - Status: fftypes.OpStatusPending, - Created: fftypes.Now(), - } - s.callbacks.On("UUIDCollectionNSEvent", database.CollectionOperations, fftypes.ChangeEventTypeCreated, "ns1", operationID).Return() - s.callbacks.On("UUIDCollectionNSEvent", database.CollectionOperations, fftypes.ChangeEventTypeUpdated, "ns1", operationID).Return() - err := s.UpsertOperation(ctx, operation, true) - assert.NoError(t, err) - - // Check we get the exact same operation back - operationRead, err := s.GetOperationByID(ctx, operationID) - assert.NoError(t, err) - assert.NotNil(t, operationRead) - operationJson, _ := json.Marshal(&operation) - operationReadJson, _ := json.Marshal(&operationRead) - assert.Equal(t, string(operationJson), string(operationReadJson)) - - // Update the operation (this is testing what's possible at the database layer, - // and does not account for the verification that happens at the higher level) - operationUpdated := &fftypes.Operation{ ID: operationID, Namespace: "ns1", Type: fftypes.OpTypeBlockchainBatchPin, @@ -73,40 +50,41 @@ func TestOperationE2EWithDB(t *testing.T) { Created: fftypes.Now(), Updated: fftypes.Now(), } - err = s.UpsertOperation(context.Background(), operationUpdated, true) + s.callbacks.On("UUIDCollectionNSEvent", database.CollectionOperations, fftypes.ChangeEventTypeCreated, "ns1", operationID).Return() + err := s.InsertOperation(ctx, operation) assert.NoError(t, err) - // Check we get the exact same message back - note the removal of one of the operation elements - operationRead, err = s.GetOperationByID(ctx, operationID) + // Query back the operation (by ID) + operationRead, err := s.GetOperationByID(ctx, operationID) assert.NoError(t, err) - operationJson, _ = json.Marshal(&operationUpdated) - operationReadJson, _ = json.Marshal(&operationRead) + operationJson, _ := json.Marshal(operation) + operationReadJson, _ := json.Marshal(operationRead) assert.Equal(t, string(operationJson), string(operationReadJson)) - // Query back the operation + // Query back the operation (by query filter) fb := database.OperationQueryFactory.NewFilter(ctx) filter := fb.And( - fb.Eq("id", operationUpdated.ID.String()), - fb.Eq("tx", operationUpdated.Transaction), - fb.Eq("type", operationUpdated.Type), - fb.Eq("status", operationUpdated.Status), - fb.Eq("error", operationUpdated.Error), - fb.Eq("plugin", operationUpdated.Plugin), - fb.Eq("backendid", operationUpdated.BackendID), + fb.Eq("id", operation.ID.String()), + fb.Eq("tx", operation.Transaction), + fb.Eq("type", operation.Type), + fb.Eq("status", operation.Status), + fb.Eq("error", operation.Error), + fb.Eq("plugin", operation.Plugin), + fb.Eq("backendid", operation.BackendID), fb.Gt("created", 0), fb.Gt("updated", 0), ) - operations, res, err := s.GetOperations(ctx, filter.Count(true)) assert.NoError(t, err) assert.Equal(t, 1, len(operations)) assert.Equal(t, int64(1), *res.TotalCount) + operationJson, _ = json.Marshal(operation) operationReadJson, _ = json.Marshal(operations[0]) assert.Equal(t, string(operationJson), string(operationReadJson)) // Negative test on filter filter = fb.And( - fb.Eq("id", operationUpdated.ID.String()), + fb.Eq("id", operation.ID.String()), fb.Eq("updated", "0"), ) operations, _, err = s.GetOperations(ctx, filter) @@ -119,12 +97,12 @@ func TestOperationE2EWithDB(t *testing.T) { Set("status", fftypes.OpStatusSucceeded). Set("updated", updateTime). Set("error", "") - err = s.UpdateOperation(ctx, operationUpdated.ID, up) + err = s.UpdateOperation(ctx, operation.ID, up) assert.NoError(t, err) // Test find updated value filter = fb.And( - fb.Eq("id", operationUpdated.ID.String()), + fb.Eq("id", operation.ID.String()), fb.Eq("status", fftypes.OpStatusSucceeded), fb.Eq("error", ""), ) @@ -135,57 +113,32 @@ func TestOperationE2EWithDB(t *testing.T) { s.callbacks.AssertExpectations(t) } -func TestUpsertOperationFailBegin(t *testing.T) { +func TestInsertOperationFailBegin(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin().WillReturnError(fmt.Errorf("pop")) - err := s.UpsertOperation(context.Background(), &fftypes.Operation{}, true) + err := s.InsertOperation(context.Background(), &fftypes.Operation{}) assert.Regexp(t, "FF10114", err) assert.NoError(t, mock.ExpectationsWereMet()) } -func TestUpsertOperationFailSelect(t *testing.T) { +func TestInsertOperationFailInsert(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin() - mock.ExpectQuery("SELECT .*").WillReturnError(fmt.Errorf("pop")) - mock.ExpectRollback() - operationID := fftypes.NewUUID() - err := s.UpsertOperation(context.Background(), &fftypes.Operation{ID: operationID}, true) - assert.Regexp(t, "FF10115", err) - assert.NoError(t, mock.ExpectationsWereMet()) -} - -func TestUpsertOperationFailInsert(t *testing.T) { - s, mock := newMockProvider().init() - mock.ExpectBegin() - mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{})) mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop")) mock.ExpectRollback() operationID := fftypes.NewUUID() - err := s.UpsertOperation(context.Background(), &fftypes.Operation{ID: operationID}, true) + err := s.InsertOperation(context.Background(), &fftypes.Operation{ID: operationID}) assert.Regexp(t, "FF10116", err) assert.NoError(t, mock.ExpectationsWereMet()) } -func TestUpsertOperationFailUpdate(t *testing.T) { +func TestInsertOperationFailCommit(t *testing.T) { s, mock := newMockProvider().init() operationID := fftypes.NewUUID() mock.ExpectBegin() - mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(operationID.String())) - mock.ExpectExec("UPDATE .*").WillReturnError(fmt.Errorf("pop")) - mock.ExpectRollback() - err := s.UpsertOperation(context.Background(), &fftypes.Operation{ID: operationID}, true) - assert.Regexp(t, "FF10117", err) - assert.NoError(t, mock.ExpectationsWereMet()) -} - -func TestUpsertOperationFailCommit(t *testing.T) { - s, mock := newMockProvider().init() - operationID := fftypes.NewUUID() - mock.ExpectBegin() - mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"id"})) mock.ExpectExec("INSERT .*").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit().WillReturnError(fmt.Errorf("pop")) - err := s.UpsertOperation(context.Background(), &fftypes.Operation{ID: operationID}, true) + err := s.InsertOperation(context.Background(), &fftypes.Operation{ID: operationID}) assert.Regexp(t, "FF10119", err) assert.NoError(t, mock.ExpectationsWereMet()) } diff --git a/internal/events/token_pool_created.go b/internal/events/token_pool_created.go index 33d2d5b05..8808ecf3d 100644 --- a/internal/events/token_pool_created.go +++ b/internal/events/token_pool_created.go @@ -132,7 +132,7 @@ func (em *eventManager) shouldAnnounce(ctx context.Context, ti tokens.Plugin, po "", fftypes.OpTypeTokenAnnouncePool, fftypes.OpStatusPending) - return announcePool, em.database.UpsertOperation(ctx, nextOp, false) + return announcePool, em.database.InsertOperation(ctx, nextOp) } // It is expected that this method might be invoked twice for each pool, depending on the behavior of the connector. diff --git a/internal/events/token_pool_created_test.go b/internal/events/token_pool_created_test.go index f1eef19b1..c5079d295 100644 --- a/internal/events/token_pool_created_test.go +++ b/internal/events/token_pool_created_test.go @@ -321,9 +321,9 @@ func TestTokenPoolCreatedAnnounce(t *testing.T) { mdi.On("GetTokenPoolByProtocolID", em.ctx, "erc1155", "123").Return(nil, nil).Times(2) mdi.On("GetOperations", em.ctx, mock.Anything).Return(nil, nil, fmt.Errorf("pop")).Once() mdi.On("GetOperations", em.ctx, mock.Anything).Return(operations, nil, nil).Once() - mdi.On("UpsertOperation", em.ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { + mdi.On("InsertOperation", em.ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { return op.Type == fftypes.OpTypeTokenAnnouncePool - }), false).Return(nil) + })).Return(nil) mbm.On("BroadcastTokenPool", em.ctx, "test-ns", mock.MatchedBy(func(pool *fftypes.TokenPoolAnnouncement) bool { return pool.Pool.Namespace == "test-ns" && pool.Pool.Name == "my-pool" && *pool.Pool.ID == *poolID }), false).Return(nil, nil) diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index 51a364e7d..9196347f7 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -165,7 +165,7 @@ func (pm *privateMessaging) transferBlobs(ctx context.Context, data []*fftypes.D trackingID, fftypes.OpTypeDataExchangeBlobSend, fftypes.OpStatusPending) - if err = pm.database.UpsertOperation(ctx, op, false); err != nil { + if err = pm.database.InsertOperation(ctx, op); err != nil { return err } } @@ -211,7 +211,7 @@ func (pm *privateMessaging) sendData(ctx context.Context, mType string, mID *fft trackingID, fftypes.OpTypeDataExchangeBatchSend, fftypes.OpStatusPending) - if err = pm.database.UpsertOperation(ctx, op, false); err != nil { + if err = pm.database.InsertOperation(ctx, op); err != nil { return err } } diff --git a/internal/privatemessaging/privatemessaging_test.go b/internal/privatemessaging/privatemessaging_test.go index c1412018d..20f8fc460 100644 --- a/internal/privatemessaging/privatemessaging_test.go +++ b/internal/privatemessaging/privatemessaging_test.go @@ -130,22 +130,22 @@ func TestDispatchBatchWithBlobs(t *testing.T) { PayloadRef: "/blob/1", }, nil) mdx.On("TransferBLOB", pm.ctx, "node1", "/blob/1").Return("tracking1", nil) - mdi.On("UpsertOperation", pm.ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { + mdi.On("InsertOperation", pm.ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { return op.BackendID == "tracking1" && op.Type == fftypes.OpTypeDataExchangeBlobSend - }), false).Return(nil, nil) + })).Return(nil, nil) mdx.On("TransferBLOB", pm.ctx, "node2", "/blob/1").Return("tracking2", nil) - mdi.On("UpsertOperation", pm.ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { + mdi.On("InsertOperation", pm.ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { return op.BackendID == "tracking2" && op.Type == fftypes.OpTypeDataExchangeBlobSend - }), false).Return(nil, nil) + })).Return(nil, nil) mdx.On("SendMessage", pm.ctx, mock.Anything, mock.Anything).Return("tracking3", nil).Once() - mdi.On("UpsertOperation", pm.ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { + mdi.On("InsertOperation", pm.ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { return op.BackendID == "tracking3" && op.Type == fftypes.OpTypeDataExchangeBatchSend - }), false).Return(nil, nil) + })).Return(nil, nil) mdx.On("SendMessage", pm.ctx, mock.Anything, mock.Anything).Return("tracking4", nil).Once() - mdi.On("UpsertOperation", pm.ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { + mdi.On("InsertOperation", pm.ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { return op.BackendID == "tracking4" && op.Type == fftypes.OpTypeDataExchangeBatchSend - }), false).Return(nil, nil) + })).Return(nil, nil) mbp.On("SubmitPinnedBatch", pm.ctx, mock.Anything, mock.Anything).Return(nil) @@ -270,7 +270,7 @@ func TestSendImmediateFail(t *testing.T) { assert.Regexp(t, "pop", err) } -func TestSendSubmitUpsertOperationFail(t *testing.T) { +func TestSendSubmitInsertOperationFail(t *testing.T) { pm, cancel := newTestPrivateMessaging(t) defer cancel() @@ -281,7 +281,7 @@ func TestSendSubmitUpsertOperationFail(t *testing.T) { mdx.On("SendMessage", pm.ctx, mock.Anything, mock.Anything).Return("tracking1", nil) mdi := pm.database.(*databasemocks.Plugin) - mdi.On("UpsertOperation", pm.ctx, mock.Anything, false).Return(fmt.Errorf("pop")) + mdi.On("InsertOperation", pm.ctx, mock.Anything).Return(fmt.Errorf("pop")) err := pm.sendAndSubmitBatch(pm.ctx, &fftypes.Batch{ Identity: fftypes.Identity{ @@ -339,7 +339,7 @@ func TestWriteTransactionSubmitBatchPinFail(t *testing.T) { mdi := pm.database.(*databasemocks.Plugin) mdi.On("UpsertTransaction", pm.ctx, mock.Anything, true, false).Return(nil) - mdi.On("UpsertOperation", pm.ctx, mock.Anything, false).Return(nil) + mdi.On("InsertOperation", pm.ctx, mock.Anything).Return(nil) mbp := pm.batchpin.(*batchpinmocks.Submitter) mbp.On("SubmitPinnedBatch", pm.ctx, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) @@ -388,7 +388,7 @@ func TestTransferBlobsOpInsertFail(t *testing.T) { mdi.On("GetBlobMatchingHash", pm.ctx, mock.Anything).Return(&fftypes.Blob{PayloadRef: "blob/1"}, nil) mdx.On("TransferBLOB", pm.ctx, "peer1", "blob/1").Return("tracking1", nil) - mdi.On("UpsertOperation", pm.ctx, mock.Anything, false).Return(fmt.Errorf("pop")) + mdi.On("InsertOperation", pm.ctx, mock.Anything).Return(fmt.Errorf("pop")) err := pm.transferBlobs(pm.ctx, []*fftypes.Data{ {ID: fftypes.NewUUID(), Hash: fftypes.NewRandB32(), Blob: &fftypes.BlobRef{Hash: fftypes.NewRandB32()}}, diff --git a/mocks/databasemocks/plugin.go b/mocks/databasemocks/plugin.go index 659391926..8f0592553 100644 --- a/mocks/databasemocks/plugin.go +++ b/mocks/databasemocks/plugin.go @@ -1707,6 +1707,20 @@ func (_m *Plugin) InsertNextPin(ctx context.Context, nextpin *fftypes.NextPin) e return r0 } +// InsertOperation provides a mock function with given fields: ctx, operation +func (_m *Plugin) InsertOperation(ctx context.Context, operation *fftypes.Operation) error { + ret := _m.Called(ctx, operation) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *fftypes.Operation) error); ok { + r0 = rf(ctx, operation) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Name provides a mock function with given fields: func (_m *Plugin) Name() string { ret := _m.Called() @@ -2099,20 +2113,6 @@ func (_m *Plugin) UpsertOffset(ctx context.Context, data *fftypes.Offset, allowE return r0 } -// UpsertOperation provides a mock function with given fields: ctx, operation, allowExisting -func (_m *Plugin) UpsertOperation(ctx context.Context, operation *fftypes.Operation, allowExisting bool) error { - ret := _m.Called(ctx, operation, allowExisting) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *fftypes.Operation, bool) error); ok { - r0 = rf(ctx, operation, allowExisting) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // UpsertOrganization provides a mock function with given fields: ctx, data, allowExisting func (_m *Plugin) UpsertOrganization(ctx context.Context, data *fftypes.Organization, allowExisting bool) error { ret := _m.Called(ctx, data, allowExisting) diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index a22f32c29..aeffb8db6 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -194,8 +194,8 @@ type iPinCollection interface { } type iOperationCollection interface { - // UpsertOperation - Upsert an operation - UpsertOperation(ctx context.Context, operation *fftypes.Operation, allowExisting bool) (err error) + // InsertOperation - Insert an operation + InsertOperation(ctx context.Context, operation *fftypes.Operation) (err error) // UpdateOperation - Update operation by ID UpdateOperation(ctx context.Context, id *fftypes.UUID, update Update) (err error)