From 9d731452158d0af148d436e781a3fefb2f4910f9 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Tue, 7 Mar 2023 13:02:00 -0500 Subject: [PATCH] Replace "UpsertBatch" with "InsertOrGetBatch" When persisting an off-chain batch initially received from data exchange or shared storage, it should always be an insert. If the batch exists, just move on. Otherwise we may accidentally revert a confirmed batch to its previous state if it happens to get redelivered. Signed-off-by: Andrew Richardson --- internal/batch/batch_manager_test.go | 10 +- internal/batch/batch_processor.go | 3 +- internal/batch/batch_processor_test.go | 10 +- internal/database/sqlcommon/batch_sql.go | 106 ++++++------------ internal/database/sqlcommon/batch_sql_test.go | 92 +++------------ internal/events/batch_pin_complete_test.go | 24 +--- internal/events/dx_callbacks_test.go | 14 +-- internal/events/persist_batch.go | 15 +-- internal/events/persist_batch_test.go | 27 ++++- internal/events/ss_callbacks_test.go | 4 +- internal/events/webhooks/webhooks.go | 2 +- mocks/databasemocks/plugin.go | 40 ++++--- pkg/database/plugin.go | 4 +- 13 files changed, 134 insertions(+), 217 deletions(-) diff --git a/internal/batch/batch_manager_test.go b/internal/batch/batch_manager_test.go index 48ad65c53a..5a89670f9a 100644 --- a/internal/batch/batch_manager_test.go +++ b/internal/batch/batch_manager_test.go @@ -6,7 +6,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -132,7 +132,7 @@ func TestE2EDispatchBroadcast(t *testing.T) { mdm.On("UpdateMessageIfCached", mock.Anything, mock.Anything).Return() mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return([]*core.IDAndSequence{{ID: *msg.Header.ID}}, nil).Once() mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return([]*core.IDAndSequence{}, nil) - mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) mdi.On("UpdateBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) mdi.On("UpdateMessage", mock.Anything, "ns1", mock.Anything, mock.Anything).Return(nil) // pins rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -255,7 +255,7 @@ func TestE2EDispatchPrivateUnpinned(t *testing.T) { mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return([]*core.IDAndSequence{{ID: *msg.Header.ID}}, nil).Once() mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return([]*core.IDAndSequence{}, nil) mdi.On("UpdateMessage", mock.Anything, "ns1", mock.Anything, mock.Anything).Return(nil) // pins - mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) mdi.On("UpdateBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything, mock.Anything).Return(nil) rag.RunFn = func(a mock.Arguments) { @@ -441,7 +441,7 @@ func TestMessageSequencerUpdateMessagesFail(t *testing.T) { mdm.On("UpdateMessageIfCached", mock.Anything, mock.Anything).Return() mdi.On("InsertTransaction", mock.Anything, mock.Anything).Return(nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) // transaction submit - mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) mdi.On("UpdateMessages", mock.Anything, "ns1", mock.Anything, mock.Anything).Return(fmt.Errorf("fizzle")) rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything, mock.Anything) rag.RunFn = func(a mock.Arguments) { @@ -538,7 +538,7 @@ func TestMessageSequencerUpdateBatchFail(t *testing.T) { } mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return([]*core.IDAndSequence{{ID: *msg.Header.ID}}, nil) mdm.On("GetMessageWithDataCached", mock.Anything, mock.Anything).Return(msg, core.DataArray{{ID: dataID}}, true, nil) - mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("fizzle")) + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("fizzle")) rag := mdi.On("RunAsGroup", mock.Anything, mock.Anything, mock.Anything) rag.RunFn = func(a mock.Arguments) { ctx := a.Get(0).(context.Context) diff --git a/internal/batch/batch_processor.go b/internal/batch/batch_processor.go index 279e9073d5..a6c530f2f1 100644 --- a/internal/batch/batch_processor.go +++ b/internal/batch/batch_processor.go @@ -576,7 +576,8 @@ func (bp *batchProcessor) sealBatch(state *DispatchState) (err error) { log.L(ctx).Debugf("Batch %s sealed. Hash=%s", state.Persisted.ID, state.Persisted.Hash) // At this point the manifest of the batch is finalized. We write it to the database - return bp.database.UpsertBatch(ctx, &state.Persisted) + _, err = bp.database.InsertOrGetBatch(ctx, &state.Persisted) + return err }) }) if err != nil { diff --git a/internal/batch/batch_processor_test.go b/internal/batch/batch_processor_test.go index 602cec9f7f..9428b5a46f 100644 --- a/internal/batch/batch_processor_test.go +++ b/internal/batch/batch_processor_test.go @@ -6,7 +6,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -85,7 +85,7 @@ func TestUnfilledBatch(t *testing.T) { mockRunAsGroupPassthrough(mdi) mdi.On("UpdateMessages", mock.Anything, "ns1", mock.Anything, mock.Anything).Return(nil) - mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) mth := bp.txHelper.(*txcommonmocks.Helper) mth.On("SubmitNewTransaction", mock.Anything, core.TransactionTypeBatchPin, core.IdempotencyKey("")).Return(fftypes.NewUUID(), nil) @@ -134,7 +134,7 @@ func TestBatchSizeOverflow(t *testing.T) { bp.conf.BatchMaxBytes = batchSizeEstimateBase + (&core.Message{}).EstimateSize(false) + 100 mockRunAsGroupPassthrough(mdi) mdi.On("UpdateMessages", mock.Anything, "ns1", mock.Anything, mock.Anything).Return(nil) - mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) mth := bp.txHelper.(*txcommonmocks.Helper) mth.On("SubmitNewTransaction", mock.Anything, core.TransactionTypeBatchPin, core.IdempotencyKey("")).Return(fftypes.NewUUID(), nil) @@ -419,7 +419,7 @@ func TestMarkMessageDispatchedUnpinnedOK(t *testing.T) { mockRunAsGroupPassthrough(mdi) mdi.On("UpdateMessages", mock.Anything, "ns1", mock.Anything, mock.Anything).Return(nil) - mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) @@ -476,7 +476,7 @@ func TestMaskContextsRetryAfterPinsAssigned(t *testing.T) { return dbNonce.Nonce == 12347 // twice incremented })).Return(nil).Once() mdi.On("UpdateMessage", mock.Anything, "ns1", mock.Anything, mock.Anything).Return(nil).Twice() - mdi.On("UpsertBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) + mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) mdm := bp.data.(*datamocks.Manager) mdm.On("UpdateMessageIfCached", mock.Anything, mock.Anything).Return() diff --git a/internal/database/sqlcommon/batch_sql.go b/internal/database/sqlcommon/batch_sql.go index e5b8f31f91..6f77f17c2b 100644 --- a/internal/database/sqlcommon/batch_sql.go +++ b/internal/database/sqlcommon/batch_sql.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -56,87 +56,49 @@ var ( const batchesTable = "batches" -func (s *SQLCommon) UpsertBatch(ctx context.Context, batch *core.BatchPersisted) (err error) { +func (s *SQLCommon) InsertOrGetBatch(ctx context.Context, batch *core.BatchPersisted) (existing *core.BatchPersisted, err error) { ctx, tx, autoCommit, err := s.beginOrUseTx(ctx) if err != nil { - return err + return nil, err } defer s.rollbackTx(ctx, tx, autoCommit) - // Do a select within the transaction to detemine if the UUID already exists - batchRows, _, err := s.queryTx(ctx, batchesTable, tx, - sq.Select("hash"). - From(batchesTable). - Where(sq.Eq{"id": batch.ID, "namespace": batch.Namespace}), + // Try the insert first + _, insertErr := s.insertTxExt(ctx, batchesTable, tx, + sq.Insert(batchesTable). + Columns(batchColumns...). + Values( + batch.ID, + string(batch.Type), + batch.Namespace, + batch.Author, + batch.Key, + batch.Group, + batch.Created, + batch.Hash, + batch.Manifest, + batch.Confirmed, + batch.TX.Type, + batch.TX.ID, + batch.Node, + ), + func() { + s.callbacks.UUIDCollectionNSEvent(database.CollectionBatches, core.ChangeEventTypeCreated, batch.Namespace, batch.ID) + }, + true, /* we want a failure here we can progress past */ ) - if err != nil { - return err + if insertErr == nil { + return nil, s.commitTx(ctx, tx, autoCommit) } - existing := batchRows.Next() - if existing { - var hash *fftypes.Bytes32 - _ = batchRows.Scan(&hash) - if !fftypes.SafeHashCompare(hash, batch.Hash) { - batchRows.Close() - log.L(ctx).Errorf("Existing=%s New=%s", hash, batch.Hash) - return database.HashMismatch - } - } - batchRows.Close() - - if existing { - - // Update the batch - if _, err = s.updateTx(ctx, batchesTable, tx, - sq.Update(batchesTable). - Set("btype", string(batch.Type)). - Set("author", batch.Author). - Set("key", batch.Key). - Set("group_hash", batch.Group). - Set("created", batch.Created). - Set("hash", batch.Hash). - Set("manifest", batch.Manifest). - Set("confirmed", batch.Confirmed). - Set("tx_type", batch.TX.Type). - Set("tx_id", batch.TX.ID). - Set("node_id", batch.Node). - Where(sq.Eq{"id": batch.ID, "namespace": batch.Namespace}), - func() { - s.callbacks.UUIDCollectionNSEvent(database.CollectionBatches, core.ChangeEventTypeUpdated, batch.Namespace, batch.ID) - }, - ); err != nil { - return err - } - } else { - - if _, err = s.insertTx(ctx, batchesTable, tx, - sq.Insert(batchesTable). - Columns(batchColumns...). - Values( - batch.ID, - string(batch.Type), - batch.Namespace, - batch.Author, - batch.Key, - batch.Group, - batch.Created, - batch.Hash, - batch.Manifest, - batch.Confirmed, - batch.TX.Type, - batch.TX.ID, - batch.Node, - ), - func() { - s.callbacks.UUIDCollectionNSEvent(database.CollectionBatches, core.ChangeEventTypeCreated, batch.Namespace, batch.ID) - }, - ); err != nil { - return err - } + // Do a select within the transaction to determine if the batch already exists + existing, err = s.GetBatchByID(ctx, batch.Namespace, batch.ID) + if err != nil || existing != nil { + return existing, err } - return s.commitTx(ctx, tx, autoCommit) + // Error was apparently not an ID conflict - must have been something else + return nil, insertErr } func (s *SQLCommon) batchResult(ctx context.Context, row *sql.Rows) (*core.BatchPersisted, error) { diff --git a/internal/database/sqlcommon/batch_sql_test.go b/internal/database/sqlcommon/batch_sql_test.go index 969113e03b..0553e4da8f 100644 --- a/internal/database/sqlcommon/batch_sql_test.go +++ b/internal/database/sqlcommon/batch_sql_test.go @@ -63,10 +63,10 @@ func TestBatch2EWithDB(t *testing.T) { } s.callbacks.On("UUIDCollectionNSEvent", database.CollectionBatches, core.ChangeEventTypeCreated, "ns1", batchID, mock.Anything).Return() - s.callbacks.On("UUIDCollectionNSEvent", database.CollectionBatches, core.ChangeEventTypeUpdated, "ns1", batchID, mock.Anything).Return() - err := s.UpsertBatch(ctx, batch) + existing, err := s.InsertOrGetBatch(ctx, batch) assert.NoError(t, err) + assert.Nil(t, existing) // Check we get the exact same batch back batchRead, err := s.GetBatchByID(ctx, "ns1", batchID) @@ -76,58 +76,17 @@ func TestBatch2EWithDB(t *testing.T) { batchReadJson, _ := json.Marshal(&batchRead) assert.Equal(t, string(batchJson), string(batchReadJson)) - // Update the batch (this is testing what's possible at the database layer, - // and does not account for the verification that happens at the higher level) - txid := fftypes.NewUUID() - msgID2 := fftypes.NewUUID() - batchUpdated := &core.BatchPersisted{ - BatchHeader: core.BatchHeader{ - ID: batchID, - Type: core.BatchTypePrivate, - SignerRef: core.SignerRef{ - Key: "0x12345", - Author: "did:firefly:org/abcd", - }, - Namespace: "ns1", - Node: fftypes.NewUUID(), - Created: fftypes.Now(), - }, - Hash: fftypes.NewRandB32(), - TX: core.TransactionRef{ - ID: txid, - Type: core.TransactionTypeBatchPin, - }, - Manifest: fftypes.JSONAnyPtr((&core.BatchManifest{ - Messages: []*core.MessageManifestEntry{ - {MessageRef: core.MessageRef{ID: msgID1}}, - {MessageRef: core.MessageRef{ID: msgID2}}, - }, - }).String()), - Confirmed: fftypes.Now(), - } - - // Rejects hash change - err = s.UpsertBatch(context.Background(), batchUpdated) - assert.Equal(t, database.HashMismatch, err) - - batchUpdated.Hash = batch.Hash - err = s.UpsertBatch(context.Background(), batchUpdated) + // Try to insert again - should get back the existing row + existing, err = s.InsertOrGetBatch(ctx, batch) assert.NoError(t, err) - - // Check we get the exact same message back - note the removal of one of the batch elements - batchRead, err = s.GetBatchByID(ctx, "ns1", batchID) - assert.NoError(t, err) - batchJson, _ = json.Marshal(&batchUpdated) - batchReadJson, _ = json.Marshal(&batchRead) - assert.Equal(t, string(batchJson), string(batchReadJson)) + assert.NotNil(t, existing) // Query back the batch fb := database.BatchQueryFactory.NewFilter(ctx) filter := fb.And( - fb.Eq("id", batchUpdated.ID.String()), - fb.Eq("author", batchUpdated.Author), + fb.Eq("id", batch.ID.String()), + fb.Eq("author", batch.Author), fb.Gt("created", "0"), - fb.Gt("confirmed", "0"), ) batches, _, err := s.GetBatches(ctx, "ns1", filter) assert.NoError(t, err) @@ -137,7 +96,7 @@ func TestBatch2EWithDB(t *testing.T) { // Negative test on filter filter = fb.And( - fb.Eq("id", batchUpdated.ID.String()), + fb.Eq("id", batch.ID.String()), fb.Eq("created", "0"), ) batches, _, err = s.GetBatches(ctx, "ns1", filter) @@ -152,7 +111,7 @@ func TestBatch2EWithDB(t *testing.T) { // Test find updated value filter = fb.And( - fb.Eq("id", batchUpdated.ID.String()), + fb.Eq("id", batch.ID.String()), fb.Eq("author", author2), ) batches, res, err := s.GetBatches(ctx, "ns1", filter.Count(true)) @@ -166,55 +125,30 @@ func TestBatch2EWithDB(t *testing.T) { func TestUpsertBatchFailBegin(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin().WillReturnError(fmt.Errorf("pop")) - err := s.UpsertBatch(context.Background(), &core.BatchPersisted{}) + _, err := s.InsertOrGetBatch(context.Background(), &core.BatchPersisted{}) assert.Regexp(t, "FF10114", err) assert.NoError(t, mock.ExpectationsWereMet()) } -func TestUpsertBatchFailSelect(t *testing.T) { - s, mock := newMockProvider().init() - mock.ExpectBegin() - mock.ExpectQuery("SELECT .*").WillReturnError(fmt.Errorf("pop")) - mock.ExpectRollback() - batchID := fftypes.NewUUID() - err := s.UpsertBatch(context.Background(), &core.BatchPersisted{BatchHeader: core.BatchHeader{ID: batchID}}) - assert.Regexp(t, "FF10115", err) - assert.NoError(t, mock.ExpectationsWereMet()) -} - func TestUpsertBatchFailInsert(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin() - mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{})) mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop")) + mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{})) mock.ExpectRollback() batchID := fftypes.NewUUID() - err := s.UpsertBatch(context.Background(), &core.BatchPersisted{BatchHeader: core.BatchHeader{ID: batchID}}) + _, err := s.InsertOrGetBatch(context.Background(), &core.BatchPersisted{BatchHeader: core.BatchHeader{ID: batchID}}) assert.Regexp(t, "FF10116", err) assert.NoError(t, mock.ExpectationsWereMet()) } -func TestUpsertBatchFailUpdate(t *testing.T) { - s, mock := newMockProvider().init() - batchID := fftypes.NewUUID() - hash := fftypes.NewRandB32() - mock.ExpectBegin() - mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"hash"}).AddRow(hash)) - mock.ExpectExec("UPDATE .*").WillReturnError(fmt.Errorf("pop")) - mock.ExpectRollback() - err := s.UpsertBatch(context.Background(), &core.BatchPersisted{BatchHeader: core.BatchHeader{ID: batchID}, Hash: hash}) - assert.Regexp(t, "FF10117", err) - assert.NoError(t, mock.ExpectationsWereMet()) -} - func TestUpsertBatchFailCommit(t *testing.T) { s, mock := newMockProvider().init() batchID := fftypes.NewUUID() mock.ExpectBegin() - mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"id"})) mock.ExpectExec("INSERT .*").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit().WillReturnError(fmt.Errorf("pop")) - err := s.UpsertBatch(context.Background(), &core.BatchPersisted{BatchHeader: core.BatchHeader{ID: batchID}}) + _, err := s.InsertOrGetBatch(context.Background(), &core.BatchPersisted{BatchHeader: core.BatchHeader{ID: batchID}}) assert.Regexp(t, "FF10119", err) assert.NoError(t, mock.ExpectationsWereMet()) } diff --git a/internal/events/batch_pin_complete_test.go b/internal/events/batch_pin_complete_test.go index 0065a56130..776df40fda 100644 --- a/internal/events/batch_pin_complete_test.go +++ b/internal/events/batch_pin_complete_test.go @@ -469,20 +469,6 @@ func TestPersistBatchMismatchChainHash(t *testing.T) { assert.False(t, valid) } -func TestPersistBatchUpsertBatchMismatchHash(t *testing.T) { - em := newTestEventManager(t) - defer em.cleanup(t) - data := &core.Data{ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"test"`)} - batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data}) - - em.mdi.On("UpsertBatch", mock.Anything, mock.Anything).Return(database.HashMismatch) - - bp, valid, err := em.persistBatch(context.Background(), batch) - assert.False(t, valid) - assert.Nil(t, bp) - assert.NoError(t, err) -} - func TestPersistBatchBadHash(t *testing.T) { em := newTestEventManager(t) defer em.cleanup(t) @@ -528,7 +514,7 @@ func TestPersistBatchUpsertBatchFail(t *testing.T) { data := &core.Data{ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"test"`)} batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data}) - em.mdi.On("UpsertBatch", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + em.mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")) bp, valid, err := em.persistBatch(context.Background(), batch) assert.Nil(t, bp) @@ -559,7 +545,7 @@ func TestPersistBatchSwallowBadData(t *testing.T) { } batch.Hash = batch.Payload.Hash() - em.mdi.On("UpsertBatch", mock.Anything, mock.Anything).Return(nil) + em.mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil) bp, valid, err := em.persistBatch(context.Background(), batch) assert.False(t, valid) @@ -573,7 +559,7 @@ func TestPersistBatchGoodDataUpsertOptimizeFail(t *testing.T) { data := &core.Data{ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"test"`)} batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data}) - em.mdi.On("UpsertBatch", mock.Anything, mock.Anything).Return(nil) + em.mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil) em.mdi.On("InsertDataArray", mock.Anything, mock.Anything).Return(fmt.Errorf("optimzation miss")) em.mdi.On("UpsertData", mock.Anything, mock.Anything, database.UpsertOptimizationExisting).Return(fmt.Errorf("pop")) @@ -591,7 +577,7 @@ func TestPersistBatchGoodDataMessageFail(t *testing.T) { data := &core.Data{ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"test"`)} batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data}) - em.mdi.On("UpsertBatch", mock.Anything, mock.Anything).Return(nil) + em.mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil) em.mdi.On("InsertDataArray", mock.Anything, mock.Anything).Return(nil) em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimzation miss")) em.mdi.On("UpsertMessage", mock.Anything, mock.Anything, database.UpsertOptimizationExisting, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("pop")) @@ -614,7 +600,7 @@ func TestPersistBatchGoodMessageAuthorMismatch(t *testing.T) { batch.Payload.Messages[0].Hash = batch.Payload.Messages[0].Header.Hash() batch.Hash = batch.Payload.Hash() - em.mdi.On("UpsertBatch", mock.Anything, mock.Anything).Return(nil) + em.mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil) bp, valid, err := em.persistBatch(context.Background(), batch) assert.Nil(t, bp) diff --git a/internal/events/dx_callbacks_test.go b/internal/events/dx_callbacks_test.go index 0b272a5bff..0c22a07b8a 100644 --- a/internal/events/dx_callbacks_test.go +++ b/internal/events/dx_callbacks_test.go @@ -152,7 +152,7 @@ func TestPinnedReceiveOK(t *testing.T) { em.mim.On("GetLocalNode", mock.Anything).Return(testNode, nil) em.mim.On("ValidateNodeOwner", em.ctx, mock.Anything, mock.Anything).Return(true, nil) - em.mdi.On("UpsertBatch", em.ctx, mock.Anything).Return(nil, nil) + em.mdi.On("InsertOrGetBatch", em.ctx, mock.Anything).Return(nil, nil) em.mdi.On("InsertDataArray", em.ctx, mock.Anything).Return(nil, nil) em.mdi.On("InsertMessages", em.ctx, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(nil, nil).Run(func(args mock.Arguments) { args[2].(database.PostCompletionHook)() @@ -223,7 +223,7 @@ func TestMessageReceivePersistBatchError(t *testing.T) { }).Return(node1, nil) em.mim.On("CachedIdentityLookupMustExist", em.ctx, "signingOrg").Return(org1, false, nil) em.mim.On("ValidateNodeOwner", em.ctx, mock.Anything, mock.Anything).Return(true, nil) - em.mdi.On("UpsertBatch", em.ctx, mock.Anything).Return(fmt.Errorf("pop")) + em.mdi.On("InsertOrGetBatch", em.ctx, mock.Anything).Return(nil, fmt.Errorf("pop")) // no ack as we are simulating termination mid retry mde := newMessageReceivedNoAck("peer1", b) @@ -593,7 +593,7 @@ func TestMessageReceiveMessagePersistMessageFail(t *testing.T) { em.mim.On("GetLocalNode", mock.Anything).Return(testNode, nil) em.mim.On("ValidateNodeOwner", em.ctx, mock.Anything, mock.Anything).Return(true, nil) - em.mdi.On("UpsertBatch", em.ctx, mock.Anything).Return(nil, nil) + em.mdi.On("InsertOrGetBatch", em.ctx, mock.Anything).Return(nil, nil) em.mdi.On("InsertDataArray", em.ctx, mock.Anything).Return(nil) em.mdi.On("InsertMessages", em.ctx, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimization fail")) em.mdi.On("UpsertMessage", em.ctx, mock.Anything, database.UpsertOptimizationExisting, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("pop")) @@ -634,7 +634,7 @@ func TestMessageReceiveMessagePersistDataFail(t *testing.T) { em.mim.On("GetLocalNode", mock.Anything).Return(testNode, nil) em.mim.On("ValidateNodeOwner", em.ctx, mock.Anything, mock.Anything).Return(true, nil) - em.mdi.On("UpsertBatch", em.ctx, mock.Anything).Return(nil, nil) + em.mdi.On("InsertOrGetBatch", em.ctx, mock.Anything).Return(nil, nil) em.mdi.On("InsertDataArray", em.ctx, mock.Anything).Return(fmt.Errorf("optimization miss")) em.mdi.On("UpsertData", em.ctx, mock.Anything, database.UpsertOptimizationExisting).Return(fmt.Errorf("pop")) @@ -673,7 +673,7 @@ func TestMessageReceiveUnpinnedBatchOk(t *testing.T) { em.mim.On("GetLocalNode", mock.Anything).Return(testNode, nil) em.mim.On("ValidateNodeOwner", em.ctx, mock.Anything, mock.Anything).Return(true, nil) - em.mdi.On("UpsertBatch", em.ctx, mock.Anything).Return(nil, nil) + em.mdi.On("InsertOrGetBatch", em.ctx, mock.Anything).Return(nil, nil) em.mdi.On("InsertDataArray", em.ctx, mock.Anything).Return(nil) em.mdi.On("InsertMessages", em.ctx, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(nil, nil).Run(func(args mock.Arguments) { args[2].(database.PostCompletionHook)() @@ -716,7 +716,7 @@ func TestMessageReceiveUnpinnedBatchConfirmMessagesFail(t *testing.T) { em.mim.On("GetLocalNode", mock.Anything).Return(testNode, nil) em.mim.On("ValidateNodeOwner", em.ctx, mock.Anything, mock.Anything).Return(true, nil) - em.mdi.On("UpsertBatch", em.ctx, mock.Anything).Return(nil, nil) + em.mdi.On("InsertOrGetBatch", em.ctx, mock.Anything).Return(nil, nil) em.mdi.On("InsertDataArray", em.ctx, mock.Anything).Return(nil) em.mdi.On("InsertMessages", em.ctx, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(nil, nil).Run(func(args mock.Arguments) { args[2].(database.PostCompletionHook)() @@ -759,7 +759,7 @@ func TestMessageReceiveUnpinnedBatchPersistEventFail(t *testing.T) { em.mim.On("GetLocalNode", mock.Anything).Return(testNode, nil) em.mim.On("ValidateNodeOwner", em.ctx, mock.Anything, mock.Anything).Return(true, nil) - em.mdi.On("UpsertBatch", em.ctx, mock.Anything).Return(nil, nil) + em.mdi.On("InsertOrGetBatch", em.ctx, mock.Anything).Return(nil, nil) em.mdi.On("InsertDataArray", em.ctx, mock.Anything).Return(nil) em.mdi.On("InsertMessages", em.ctx, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(nil, nil).Run(func(args mock.Arguments) { args[2].(database.PostCompletionHook)() diff --git a/internal/events/persist_batch.go b/internal/events/persist_batch.go index 4bca0c0caa..69d2f27f84 100644 --- a/internal/events/persist_batch.go +++ b/internal/events/persist_batch.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -69,13 +69,9 @@ func (em *eventManager) persistBatch(ctx context.Context, batch *core.Batch) (pe } } - // Upsert the batch - err = em.database.UpsertBatch(ctx, persistedBatch) + // Insert the batch + existing, err := em.database.InsertOrGetBatch(ctx, persistedBatch) if err != nil { - if err == database.HashMismatch { - l.Errorf("Invalid batch '%s'. Batch hash mismatch with existing record", batch.ID) - return nil, false, nil // This is not retryable. skip this batch - } l.Errorf("Failed to insert batch '%s': %s", batch.ID, err) return nil, false, err // a persistence failure here is considered retryable (so returned) } @@ -84,6 +80,11 @@ func (em *eventManager) persistBatch(ctx context.Context, batch *core.Batch) (pe if err != nil || !valid { return nil, valid, err } + + if existing != nil { + l.Infof("Skipped insert of batch '%s' (already exists)", batch.ID) + return existing, true, nil + } em.aggregator.cacheBatch(em.aggregator.getBatchCacheKey(persistedBatch.ID, persistedBatch.Hash), persistedBatch, manifest) return persistedBatch, true, err } diff --git a/internal/events/persist_batch_test.go b/internal/events/persist_batch_test.go index cd6bf4af5e..40e6c25b52 100644 --- a/internal/events/persist_batch_test.go +++ b/internal/events/persist_batch_test.go @@ -33,7 +33,7 @@ func TestPersistBatch(t *testing.T) { em := newTestEventManager(t) defer em.cleanup(t) - em.mdi.On("UpsertBatch", em.ctx, mock.Anything).Return(fmt.Errorf(("pop"))) + em.mdi.On("InsertOrGetBatch", em.ctx, mock.Anything).Return(nil, fmt.Errorf(("pop"))) org := newTestOrg("org1") orgBytes, err := json.Marshal(&org) @@ -88,12 +88,33 @@ func TestPersistBatch(t *testing.T) { } +func TestPersistBatchAlreadyExisting(t *testing.T) { + + em := newTestEventManager(t) + defer em.cleanup(t) + + existing := &core.BatchPersisted{} + em.mdi.On("InsertOrGetBatch", em.ctx, mock.Anything).Return(existing, nil) + em.mim.On("GetLocalNode", mock.Anything).Return(testNode, nil) + em.mdi.On("InsertDataArray", mock.Anything, mock.Anything).Return(nil) + em.mdi.On("InsertMessages", em.ctx, mock.Anything, mock.Anything).Return(nil, nil) + + data := &core.Data{ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"test"`)} + batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data}) + + result, valid, err := em.persistBatch(em.ctx, batch) + assert.True(t, valid) + assert.NoError(t, err) + assert.Equal(t, existing, result) + +} + func TestPersistBatchNoCacheDataNotInBatch(t *testing.T) { em := newTestEventManager(t) defer em.cleanup(t) - em.mdi.On("UpsertBatch", em.ctx, mock.Anything).Return(nil) + em.mdi.On("InsertOrGetBatch", em.ctx, mock.Anything).Return(nil, nil) data := &core.Data{ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"test"`)} batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data}) @@ -113,7 +134,7 @@ func TestPersistBatchExtraDataInBatch(t *testing.T) { em := newTestEventManager(t) defer em.cleanup(t) - em.mdi.On("UpsertBatch", em.ctx, mock.Anything).Return(nil) + em.mdi.On("InsertOrGetBatch", em.ctx, mock.Anything).Return(nil, nil) data := &core.Data{ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtr(`"test"`)} batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data}) diff --git a/internal/events/ss_callbacks_test.go b/internal/events/ss_callbacks_test.go index 607699cd34..a7caab56ad 100644 --- a/internal/events/ss_callbacks_test.go +++ b/internal/events/ss_callbacks_test.go @@ -39,7 +39,7 @@ func TestSharedStorageBatchDownloadedOk(t *testing.T) { b, _ := json.Marshal(&batch) mss := &sharedstoragemocks.Plugin{} - em.mdi.On("UpsertBatch", em.ctx, mock.Anything).Return(nil, nil) + em.mdi.On("InsertOrGetBatch", em.ctx, mock.Anything).Return(nil, nil) em.mdi.On("InsertDataArray", em.ctx, mock.Anything).Return(nil, nil) em.mdi.On("InsertMessages", em.ctx, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(nil, nil).Run(func(args mock.Arguments) { args[2].(database.PostCompletionHook)() @@ -71,7 +71,7 @@ func TestSharedStorageBatchDownloadedPersistFail(t *testing.T) { b, _ := json.Marshal(&batch) mss := &sharedstoragemocks.Plugin{} - em.mdi.On("UpsertBatch", em.ctx, mock.Anything).Return(fmt.Errorf("pop")) + em.mdi.On("InsertOrGetBatch", em.ctx, mock.Anything).Return(nil, fmt.Errorf("pop")) mss.On("Name").Return("utdx").Maybe() _, err := em.SharedStorageBatchDownloaded(mss, "payload1", b) diff --git a/internal/events/webhooks/webhooks.go b/internal/events/webhooks/webhooks.go index 3cf30c4693..790a2e4e91 100644 --- a/internal/events/webhooks/webhooks.go +++ b/internal/events/webhooks/webhooks.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/mocks/databasemocks/plugin.go b/mocks/databasemocks/plugin.go index d42a533b29..94d3266ca9 100644 --- a/mocks/databasemocks/plugin.go +++ b/mocks/databasemocks/plugin.go @@ -2206,6 +2206,32 @@ func (_m *Plugin) InsertOperation(ctx context.Context, operation *core.Operation return r0 } +// InsertOrGetBatch provides a mock function with given fields: ctx, data +func (_m *Plugin) InsertOrGetBatch(ctx context.Context, data *core.BatchPersisted) (*core.BatchPersisted, error) { + ret := _m.Called(ctx, data) + + var r0 *core.BatchPersisted + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *core.BatchPersisted) (*core.BatchPersisted, error)); ok { + return rf(ctx, data) + } + if rf, ok := ret.Get(0).(func(context.Context, *core.BatchPersisted) *core.BatchPersisted); ok { + r0 = rf(ctx, data) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*core.BatchPersisted) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *core.BatchPersisted) error); ok { + r1 = rf(ctx, data) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // InsertOrGetBlockchainEvent provides a mock function with given fields: ctx, event func (_m *Plugin) InsertOrGetBlockchainEvent(ctx context.Context, event *core.BlockchainEvent) (*core.BlockchainEvent, error) { ret := _m.Called(ctx, event) @@ -2486,20 +2512,6 @@ func (_m *Plugin) UpdateTransaction(ctx context.Context, namespace string, id *f return r0 } -// UpsertBatch provides a mock function with given fields: ctx, data -func (_m *Plugin) UpsertBatch(ctx context.Context, data *core.BatchPersisted) error { - ret := _m.Called(ctx, data) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *core.BatchPersisted) error); ok { - r0 = rf(ctx, data) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // UpsertContractAPI provides a mock function with given fields: ctx, cd func (_m *Plugin) UpsertContractAPI(ctx context.Context, cd *core.ContractAPI) error { ret := _m.Called(ctx, cd) diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index 0beb36abea..f2292d17e7 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -135,8 +135,8 @@ type iDataCollection interface { } type iBatchCollection interface { - // UpsertBatch - Upsert a batch - the hash cannot change - UpsertBatch(ctx context.Context, data *core.BatchPersisted) (err error) + // InsertBatch - Insert a new batch, or retrieve the existing one if it has already been recorded + InsertOrGetBatch(ctx context.Context, data *core.BatchPersisted) (existing *core.BatchPersisted, err error) // UpdateBatch - Update data UpdateBatch(ctx context.Context, namespace string, id *fftypes.UUID, update Update) (err error)