diff --git a/internal/database/sqlcommon/data_sql.go b/internal/database/sqlcommon/data_sql.go index a57b5ea9e3..79c05f7dce 100644 --- a/internal/database/sqlcommon/data_sql.go +++ b/internal/database/sqlcommon/data_sql.go @@ -79,9 +79,9 @@ func (s *SQLCommon) attemptDataUpdate(ctx context.Context, tx *txWrapper, data * }) } -func (s *SQLCommon) attemptDataInsert(ctx context.Context, tx *txWrapper, data *fftypes.Data, datatype *fftypes.DatatypeRef, blob *fftypes.BlobRef) (int64, error) { +func (s *SQLCommon) attemptDataInsert(ctx context.Context, tx *txWrapper, data *fftypes.Data, datatype *fftypes.DatatypeRef, blob *fftypes.BlobRef, requestConflictEmptyResult bool) (int64, error) { data.ValueSize = data.Value.Length() - return s.insertTx(ctx, tx, + return s.insertTxExt(ctx, tx, sq.Insert("data"). Columns(dataColumnsWithValue...). Values( @@ -101,7 +101,7 @@ func (s *SQLCommon) attemptDataInsert(ctx context.Context, tx *txWrapper, data * ), func() { s.callbacks.UUIDCollectionNSEvent(database.CollectionData, fftypes.ChangeEventTypeCreated, data.Namespace, data.ID) - }) + }, requestConflictEmptyResult) } func (s *SQLCommon) UpsertData(ctx context.Context, data *fftypes.Data, optimization database.UpsertOptimization) (err error) { @@ -127,7 +127,7 @@ func (s *SQLCommon) UpsertData(ctx context.Context, data *fftypes.Data, optimiza // as only recovery paths require us to go down the un-optimized route. optimized := false if optimization == database.UpsertOptimizationNew { - _, opErr := s.attemptDataInsert(ctx, tx, data, datatype, blob) + _, opErr := s.attemptDataInsert(ctx, tx, data, datatype, blob, true /* we want a failure here we can progress past */) optimized = opErr == nil } else if optimization == database.UpsertOptimizationExisting { rowsAffected, opErr := s.attemptDataUpdate(ctx, tx, data, datatype, blob) @@ -162,7 +162,7 @@ func (s *SQLCommon) UpsertData(ctx context.Context, data *fftypes.Data, optimiza return err } } else { - if _, err = s.attemptDataInsert(ctx, tx, data, datatype, blob); err != nil { + if _, err = s.attemptDataInsert(ctx, tx, data, datatype, blob, false); err != nil { return err } } diff --git a/internal/database/sqlcommon/message_sql.go b/internal/database/sqlcommon/message_sql.go index dbb70c8ecd..303aa22ef6 100644 --- a/internal/database/sqlcommon/message_sql.go +++ b/internal/database/sqlcommon/message_sql.go @@ -84,8 +84,8 @@ func (s *SQLCommon) attemptMessageUpdate(ctx context.Context, tx *txWrapper, mes }) } -func (s *SQLCommon) attemptMessageInsert(ctx context.Context, tx *txWrapper, message *fftypes.Message) (err error) { - message.Sequence, err = s.insertTx(ctx, tx, +func (s *SQLCommon) attemptMessageInsert(ctx context.Context, tx *txWrapper, message *fftypes.Message, requestConflictEmptyResult bool) (err error) { + message.Sequence, err = s.insertTxExt(ctx, tx, sq.Insert("messages"). Columns(msgColumns...). Values( @@ -109,7 +109,7 @@ func (s *SQLCommon) attemptMessageInsert(ctx context.Context, tx *txWrapper, mes ), func() { s.callbacks.OrderedUUIDCollectionNSEvent(database.CollectionMessages, fftypes.ChangeEventTypeCreated, message.Header.Namespace, message.Header.ID, message.Sequence) - }) + }, requestConflictEmptyResult) return err } @@ -128,7 +128,7 @@ func (s *SQLCommon) UpsertMessage(ctx context.Context, message *fftypes.Message, optimized := false recreateDatarefs := false if optimization == database.UpsertOptimizationNew { - opErr := s.attemptMessageInsert(ctx, tx, message) + opErr := s.attemptMessageInsert(ctx, tx, message, true /* we want a failure here we can progress past */) optimized = opErr == nil } else if optimization == database.UpsertOptimizationExisting { rowsAffected, opErr := s.attemptMessageUpdate(ctx, tx, message) @@ -165,7 +165,7 @@ func (s *SQLCommon) UpsertMessage(ctx context.Context, message *fftypes.Message, return err } } else { - if err = s.attemptMessageInsert(ctx, tx, message); err != nil { + if err = s.attemptMessageInsert(ctx, tx, message, false); err != nil { return err } } @@ -201,7 +201,7 @@ func (s *SQLCommon) ReplaceMessage(ctx context.Context, message *fftypes.Message return err } - if err = s.attemptMessageInsert(ctx, tx, message); err != nil { + if err = s.attemptMessageInsert(ctx, tx, message, false); err != nil { return err }