From d5d9b1de001405c549f02cce2b1f8efa9ebb8eb1 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 11 Feb 2022 17:12:52 -0500 Subject: [PATCH 1/4] Implement and use UpsertOptimization for groups Signed-off-by: Peter Broadhurst --- internal/database/sqlcommon/group_sql.go | 113 +++++++++++------- internal/database/sqlcommon/group_sql_test.go | 49 ++++---- internal/privatemessaging/groupmanager.go | 6 +- .../privatemessaging/groupmanager_test.go | 14 +-- internal/privatemessaging/recipients_test.go | 2 +- mocks/databasemocks/plugin.go | 10 +- pkg/database/plugin.go | 4 +- 7 files changed, 109 insertions(+), 89 deletions(-) diff --git a/internal/database/sqlcommon/group_sql.go b/internal/database/sqlcommon/group_sql.go index b93e9b26e1..cb6fac9b23 100644 --- a/internal/database/sqlcommon/group_sql.go +++ b/internal/database/sqlcommon/group_sql.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -41,16 +41,26 @@ var ( } ) -func (s *SQLCommon) UpsertGroup(ctx context.Context, group *fftypes.Group, allowExisting bool) (err error) { +func (s *SQLCommon) UpsertGroup(ctx context.Context, group *fftypes.Group, optimization database.UpsertOptimization) (err error) { ctx, tx, autoCommit, err := s.beginOrUseTx(ctx) if err != nil { return err } defer s.rollbackTx(ctx, tx, autoCommit) - existing := false - if allowExisting { - // Do a select within the transaction to detemine if the UUID already exists + // We use an upsert optimization here for performance, but also to account for the situation where two threads + // try to perform an insert concurrently and ensure a non-failure outcome. + optimized := false + if optimization == database.UpsertOptimizationNew { + opErr := s.attemptGroupInsert(ctx, tx, group) + optimized = opErr == nil + } else if optimization == database.UpsertOptimizationExisting { + rowsAffected, opErr := s.attemptGroupUpdate(ctx, tx, group) + optimized = opErr == nil && rowsAffected == 1 + } + + if !optimized { + // Do a select within the transaction to determine if the UUID already exists groupRows, _, err := s.queryTx(ctx, tx, sq.Select("hash"). From("groups"). @@ -59,57 +69,68 @@ func (s *SQLCommon) UpsertGroup(ctx context.Context, group *fftypes.Group, allow if err != nil { return err } - existing = groupRows.Next() + existing := groupRows.Next() groupRows.Close() - } - if existing { - - // Update the group - if _, err = s.updateTx(ctx, tx, - sq.Update("groups"). - Set("message_id", group.Message). - Set("namespace", group.Namespace). - Set("name", group.Name). - Set("ledger", group.Ledger). - Set("hash", group.Hash). - Set("created", group.Created). - Where(sq.Eq{"hash": group.Hash}), - func() { - s.callbacks.HashCollectionNSEvent(database.CollectionGroups, fftypes.ChangeEventTypeUpdated, group.Namespace, group.Hash) - }, - ); err != nil { - return err - } - } else { - _, err := s.insertTx(ctx, tx, - sq.Insert("groups"). - Columns(groupColumns...). - Values( - group.Message, - group.Namespace, - group.Name, - group.Ledger, - group.Hash, - group.Created, - ), - func() { - s.callbacks.HashCollectionNSEvent(database.CollectionGroups, fftypes.ChangeEventTypeCreated, group.Namespace, group.Hash) - }, - ) - if err != nil { - return err + if existing { + if _, err = s.attemptGroupUpdate(ctx, tx, group); err != nil { + return err + } + } else { + if err = s.attemptGroupInsert(ctx, tx, group); err != nil { + return err + } } - } - if err = s.updateMembers(ctx, tx, group, existing); err != nil { - return err + // Note the member list is not allowed to change, as it is part of the hash. + // So the optimization above relies on the fact these are in a transaction, so the + // whole group (with members) will have been inserted + if !optimized || optimization == database.UpsertOptimizationNew { + if err = s.updateMembers(ctx, tx, group, false); err != nil { + return err + } } return s.commitTx(ctx, tx, autoCommit) } +func (s *SQLCommon) attemptGroupUpdate(ctx context.Context, tx *txWrapper, group *fftypes.Group) (int64, error) { + // Update the group + return s.updateTx(ctx, tx, + sq.Update("groups"). + Set("message_id", group.Message). + Set("namespace", group.Namespace). + Set("name", group.Name). + Set("ledger", group.Ledger). + Set("hash", group.Hash). + Set("created", group.Created). + Where(sq.Eq{"hash": group.Hash}), + func() { + s.callbacks.HashCollectionNSEvent(database.CollectionGroups, fftypes.ChangeEventTypeUpdated, group.Namespace, group.Hash) + }, + ) +} + +func (s *SQLCommon) attemptGroupInsert(ctx context.Context, tx *txWrapper, group *fftypes.Group) error { + _, err := s.insertTx(ctx, tx, + sq.Insert("groups"). + Columns(groupColumns...). + Values( + group.Message, + group.Namespace, + group.Name, + group.Ledger, + group.Hash, + group.Created, + ), + func() { + s.callbacks.HashCollectionNSEvent(database.CollectionGroups, fftypes.ChangeEventTypeCreated, group.Namespace, group.Hash) + }, + ) + return err +} + func (s *SQLCommon) updateMembers(ctx context.Context, tx *txWrapper, group *fftypes.Group, existing bool) error { if existing { diff --git a/internal/database/sqlcommon/group_sql_test.go b/internal/database/sqlcommon/group_sql_test.go index be1b167ab0..e2c55968c0 100644 --- a/internal/database/sqlcommon/group_sql_test.go +++ b/internal/database/sqlcommon/group_sql_test.go @@ -18,7 +18,6 @@ package sqlcommon import ( "context" - "database/sql/driver" "encoding/json" "fmt" "testing" @@ -56,7 +55,7 @@ func TestUpsertGroupE2EWithDB(t *testing.T) { s.callbacks.On("HashCollectionNSEvent", database.CollectionGroups, fftypes.ChangeEventTypeCreated, "ns1", groupHash, mock.Anything).Return() s.callbacks.On("HashCollectionNSEvent", database.CollectionGroups, fftypes.ChangeEventTypeUpdated, "ns1", groupHash, mock.Anything).Return() - err := s.UpsertGroup(ctx, group, true) + err := s.UpsertGroup(ctx, group, database.UpsertOptimizationNew) assert.NoError(t, err) // Check we get the exact same group back @@ -72,18 +71,15 @@ func TestUpsertGroupE2EWithDB(t *testing.T) { GroupIdentity: fftypes.GroupIdentity{ Name: "group1", Namespace: "ns1", - Members: fftypes.Members{ - {Identity: "0x12345", Node: fftypes.NewUUID()}, - group.Members[0], - }, - Ledger: fftypes.NewUUID(), + Members: group.Members, + Ledger: fftypes.NewUUID(), }, Created: fftypes.Now(), Message: fftypes.NewUUID(), Hash: groupHash, } - err = s.UpsertGroup(context.Background(), groupUpdated, true) + err = s.UpsertGroup(context.Background(), groupUpdated, database.UpsertOptimizationExisting) assert.NoError(t, err) // Check we get the exact same group back - note the removal of one of the data elements @@ -139,7 +135,7 @@ func TestUpsertGroupE2EWithDB(t *testing.T) { func TestUpsertGroupFailBegin(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin().WillReturnError(fmt.Errorf("pop")) - err := s.UpsertGroup(context.Background(), &fftypes.Group{}, true) + err := s.UpsertGroup(context.Background(), &fftypes.Group{}, database.UpsertOptimizationSkip) assert.Regexp(t, "FF10114", err) assert.NoError(t, mock.ExpectationsWereMet()) } @@ -150,7 +146,7 @@ func TestUpsertGroupFailSelect(t *testing.T) { mock.ExpectQuery("SELECT .*").WillReturnError(fmt.Errorf("pop")) mock.ExpectRollback() groupID := fftypes.NewRandB32() - err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, true) + err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, database.UpsertOptimizationSkip) assert.Regexp(t, "FF10115", err) assert.NoError(t, mock.ExpectationsWereMet()) } @@ -162,7 +158,7 @@ func TestUpsertGroupFailInsert(t *testing.T) { mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop")) mock.ExpectRollback() groupID := fftypes.NewRandB32() - err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, true) + err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, database.UpsertOptimizationSkip) assert.Regexp(t, "FF10116", err) assert.NoError(t, mock.ExpectationsWereMet()) } @@ -174,33 +170,36 @@ func TestUpsertGroupFailUpdate(t *testing.T) { mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"hash"}).AddRow(groupID.String())) mock.ExpectExec("UPDATE .*").WillReturnError(fmt.Errorf("pop")) mock.ExpectRollback() - err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, true) + err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, database.UpsertOptimizationSkip) assert.Regexp(t, "FF10117", err) assert.NoError(t, mock.ExpectationsWereMet()) } -func TestUpsertGroupFailUpdateMembers(t *testing.T) { +func TestUpsertGroupFailCommit(t *testing.T) { s, mock := newMockProvider().init() groupID := fftypes.NewRandB32() mock.ExpectBegin() - mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"hash"}).AddRow(groupID.String())) - mock.ExpectExec("UPDATE .*").WillReturnResult(driver.ResultNoRows) - mock.ExpectExec("DELETE .*").WillReturnError(fmt.Errorf("pop")) - mock.ExpectRollback() - err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, true) - assert.Regexp(t, "FF10118", err) + mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"hash"})) + mock.ExpectExec("INSERT .*").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit().WillReturnError(fmt.Errorf("pop")) + err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, database.UpsertOptimizationSkip) + assert.Regexp(t, "FF10119", err) assert.NoError(t, mock.ExpectationsWereMet()) } -func TestUpsertGroupFailCommit(t *testing.T) { +func TestUpdateMembersRecreateFail(t *testing.T) { s, mock := newMockProvider().init() groupID := fftypes.NewRandB32() mock.ExpectBegin() - mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"hash"})) - mock.ExpectExec("INSERT .*").WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectCommit().WillReturnError(fmt.Errorf("pop")) - err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, true) - assert.Regexp(t, "FF10119", err) + mock.ExpectExec("DELETE .*").WillReturnError(fmt.Errorf("pop")) + tx, _ := s.db.Begin() + err := s.updateMembers(context.Background(), &txWrapper{sqlTX: tx}, &fftypes.Group{ + Hash: groupID, + GroupIdentity: fftypes.GroupIdentity{ + Members: fftypes.Members{{Node: fftypes.NewUUID()}}, + }, + }, true) + assert.Regexp(t, "FF10118", err) assert.NoError(t, mock.ExpectationsWereMet()) } diff --git a/internal/privatemessaging/groupmanager.go b/internal/privatemessaging/groupmanager.go index f348d51986..b8bdbd7c1d 100644 --- a/internal/privatemessaging/groupmanager.go +++ b/internal/privatemessaging/groupmanager.go @@ -70,7 +70,7 @@ func (gm *groupManager) EnsureLocalGroup(ctx context.Context, group *fftypes.Gro log.L(ctx).Errorf("Attempt to insert invalid group %s:%s: %s", group.Namespace, group.Hash, err) return false, nil } - err = gm.database.UpsertGroup(ctx, group, false) + err = gm.database.UpsertGroup(ctx, group, database.UpsertOptimizationNew /* it could have been created by another thread, but we think we're first */) if err != nil { return false, err } @@ -102,7 +102,7 @@ func (gm *groupManager) groupInit(ctx context.Context, signer *fftypes.Identity, // So it can be used straight away. // We're able to do this by making the identifier of the group a hash of the identity fields // (name, ledger and member list), as that is all the group contains. There's no data in there. - if err = gm.database.UpsertGroup(ctx, group, true); err != nil { + if err = gm.database.UpsertGroup(ctx, group, database.UpsertOptimizationNew /* we think we're first */); err != nil { return err } @@ -227,7 +227,7 @@ func (gm *groupManager) ResolveInitGroup(ctx context.Context, msg *fftypes.Messa return nil, nil } newGroup.Message = msg.Header.ID - err = gm.database.UpsertGroup(ctx, &newGroup, true) + err = gm.database.UpsertGroup(ctx, &newGroup, database.UpsertOptimizationNew /* we think we're first to create this */) if err != nil { return nil, err } diff --git a/internal/privatemessaging/groupmanager_test.go b/internal/privatemessaging/groupmanager_test.go index 69c5dc09b6..b321f4b337 100644 --- a/internal/privatemessaging/groupmanager_test.go +++ b/internal/privatemessaging/groupmanager_test.go @@ -44,7 +44,7 @@ func TestGroupInitWriteGroupFail(t *testing.T) { defer cancel() mdi := pm.database.(*databasemocks.Plugin) - mdi.On("UpsertGroup", mock.Anything, mock.Anything, true).Return(fmt.Errorf("pop")) + mdi.On("UpsertGroup", mock.Anything, mock.Anything, database.UpsertOptimizationNew).Return(fmt.Errorf("pop")) group := &fftypes.Group{ GroupIdentity: fftypes.GroupIdentity{ @@ -65,7 +65,7 @@ func TestGroupInitWriteDataFail(t *testing.T) { defer cancel() mdi := pm.database.(*databasemocks.Plugin) - mdi.On("UpsertGroup", mock.Anything, mock.Anything, true).Return(nil) + mdi.On("UpsertGroup", mock.Anything, mock.Anything, database.UpsertOptimizationNew).Return(nil) mdi.On("UpsertData", mock.Anything, mock.Anything, database.UpsertOptimizationNew).Return(fmt.Errorf("pop")) group := &fftypes.Group{ @@ -214,7 +214,7 @@ func TestResolveInitGroupUpsertFail(t *testing.T) { {ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtrBytes(b)}, }, true, nil) mdi := pm.database.(*databasemocks.Plugin) - mdi.On("UpsertGroup", pm.ctx, mock.Anything, true).Return(fmt.Errorf("pop")) + mdi.On("UpsertGroup", pm.ctx, mock.Anything, database.UpsertOptimizationNew).Return(fmt.Errorf("pop")) _, err := pm.ResolveInitGroup(pm.ctx, &fftypes.Message{ Header: fftypes.MessageHeader{ @@ -254,7 +254,7 @@ func TestResolveInitGroupNewOk(t *testing.T) { {ID: fftypes.NewUUID(), Value: fftypes.JSONAnyPtrBytes(b)}, }, true, nil) mdi := pm.database.(*databasemocks.Plugin) - mdi.On("UpsertGroup", pm.ctx, mock.Anything, true).Return(nil) + mdi.On("UpsertGroup", pm.ctx, mock.Anything, database.UpsertOptimizationNew).Return(nil) mdi.On("InsertEvent", pm.ctx, mock.Anything).Return(nil) group, err := pm.ResolveInitGroup(pm.ctx, &fftypes.Message{ @@ -278,7 +278,7 @@ func TestResolveInitGroupExistingOK(t *testing.T) { defer cancel() mdi := pm.database.(*databasemocks.Plugin) - mdi.On("UpsertGroup", pm.ctx, mock.Anything, true).Return(nil) + mdi.On("UpsertGroup", pm.ctx, mock.Anything, database.UpsertOptimizationNew).Return(nil) mdi.On("GetGroupByHash", pm.ctx, mock.Anything).Return(&fftypes.Group{}, nil) _, err := pm.ResolveInitGroup(pm.ctx, &fftypes.Message{ @@ -509,7 +509,7 @@ func TestEnsureLocalGroupNewOk(t *testing.T) { mdi := pm.database.(*databasemocks.Plugin) mdi.On("GetGroupByHash", pm.ctx, mock.Anything).Return(nil, nil) - mdi.On("UpsertGroup", pm.ctx, group, false).Return(nil) + mdi.On("UpsertGroup", pm.ctx, group, database.UpsertOptimizationNew).Return(nil) ok, err := pm.EnsureLocalGroup(pm.ctx, group) assert.NoError(t, err) @@ -590,7 +590,7 @@ func TestEnsureLocalGroupInsertErr(t *testing.T) { mdi := pm.database.(*databasemocks.Plugin) mdi.On("GetGroupByHash", pm.ctx, mock.Anything).Return(nil, nil) - mdi.On("UpsertGroup", pm.ctx, mock.Anything, false).Return(fmt.Errorf("pop")) + mdi.On("UpsertGroup", pm.ctx, mock.Anything, database.UpsertOptimizationNew).Return(fmt.Errorf("pop")) ok, err := pm.EnsureLocalGroup(pm.ctx, group) assert.EqualError(t, err, "pop") diff --git a/internal/privatemessaging/recipients_test.go b/internal/privatemessaging/recipients_test.go index 9ee867b5f9..1234fe89ca 100644 --- a/internal/privatemessaging/recipients_test.go +++ b/internal/privatemessaging/recipients_test.go @@ -53,7 +53,7 @@ func TestResolveMemberListNewGroupE2E(t *testing.T) { mdi.On("GetNodes", pm.ctx, mock.Anything).Return([]*fftypes.Node{{ID: nodeIDRemote, Name: "node2", Owner: signingKeyRemote}}, nil, nil).Once() mdi.On("GetNodes", pm.ctx, mock.Anything).Return([]*fftypes.Node{{ID: nodeIDLocal, Name: "node1", Owner: signingKeyLocal}}, nil, nil).Once() mdi.On("GetGroups", pm.ctx, mock.Anything).Return([]*fftypes.Group{}, nil, nil) - mdi.On("UpsertGroup", pm.ctx, mock.Anything, true).Return(nil) + mdi.On("UpsertGroup", pm.ctx, mock.Anything, database.UpsertOptimizationNew).Return(nil) mim := pm.identity.(*identitymanagermocks.Manager) mim.On("ResolveLocalOrgDID", pm.ctx).Return(orgDIDLocal, nil) mim.On("GetLocalOrganization", pm.ctx).Return(&fftypes.Organization{Identity: signingKeyLocal}, nil) diff --git a/mocks/databasemocks/plugin.go b/mocks/databasemocks/plugin.go index bf33a9b4be..e910456030 100644 --- a/mocks/databasemocks/plugin.go +++ b/mocks/databasemocks/plugin.go @@ -2591,13 +2591,13 @@ func (_m *Plugin) UpsertFFIMethod(ctx context.Context, method *fftypes.FFIMethod return r0 } -// UpsertGroup provides a mock function with given fields: ctx, data, allowExisting -func (_m *Plugin) UpsertGroup(ctx context.Context, data *fftypes.Group, allowExisting bool) error { - ret := _m.Called(ctx, data, allowExisting) +// UpsertGroup provides a mock function with given fields: ctx, data, optimization +func (_m *Plugin) UpsertGroup(ctx context.Context, data *fftypes.Group, optimization database.UpsertOptimization) error { + ret := _m.Called(ctx, data, optimization) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *fftypes.Group, bool) error); ok { - r0 = rf(ctx, data, allowExisting) + if rf, ok := ret.Get(0).(func(context.Context, *fftypes.Group, database.UpsertOptimization) error); ok { + r0 = rf(ctx, data, optimization) } else { r0 = ret.Error(0) } diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index 755fb73bdf..446c71841e 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -282,8 +282,8 @@ type iNodeCollection interface { } type iGroupCollection interface { - // UpserGroup - Upsert a group - UpsertGroup(ctx context.Context, data *fftypes.Group, allowExisting bool) (err error) + // UpserGroup - Upsert a group, with a hint to whether to optmize for existing or new + UpsertGroup(ctx context.Context, data *fftypes.Group, optimization UpsertOptimization) (err error) // UpdateGroup - Update group UpdateGroup(ctx context.Context, hash *fftypes.Bytes32, update Update) (err error) From 5cdf9754e5d7d2d7338be20bae07934b507be065 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 11 Feb 2022 19:29:22 -0500 Subject: [PATCH 2/4] Use ON CONFLICT DO NOTHING to avoid rollback of TX on insert conflict with PSQL Signed-off-by: Peter Broadhurst --- internal/database/postgres/postgres.go | 9 +++++++-- internal/database/postgres/postgres_test.go | 4 ++-- internal/database/sqlcommon/group_sql.go | 14 +++++++------ internal/database/sqlcommon/group_sql_test.go | 20 +++++++++++++++++++ internal/database/sqlcommon/provider.go | 2 +- .../database/sqlcommon/provider_mock_test.go | 2 +- .../sqlcommon/provider_sqlitego_test.go | 2 +- internal/database/sqlcommon/sqlcommon.go | 6 +++++- internal/database/sqlite3/sqlite3.go | 2 +- internal/database/sqlite3/sqlite3_test.go | 3 ++- 10 files changed, 48 insertions(+), 16 deletions(-) diff --git a/internal/database/postgres/postgres.go b/internal/database/postgres/postgres.go index e4ae1a9463..a48ce04712 100644 --- a/internal/database/postgres/postgres.go +++ b/internal/database/postgres/postgres.go @@ -60,8 +60,13 @@ func (psql *Postgres) Features() sqlcommon.SQLFeatures { return features } -func (psql *Postgres) UpdateInsertForSequenceReturn(insert sq.InsertBuilder) (sq.InsertBuilder, bool) { - return insert.Suffix(" RETURNING seq"), true +func (psql *Postgres) UpdateInsertForSequenceReturn(insert sq.InsertBuilder, requestConflictEmptyResult bool) (sq.InsertBuilder, bool) { + suffix := " RETURNING seq" + if requestConflictEmptyResult { + // Caller wants us to return an empty result set on insert conflict, rather than an error + suffix = fmt.Sprintf(" ON CONFLICT DO NOTHING%s", suffix) + } + return insert.Suffix(suffix), true } func (psql *Postgres) Open(url string) (*sql.DB, error) { diff --git a/internal/database/postgres/postgres_test.go b/internal/database/postgres/postgres_test.go index 2f8f50b4f5..f58b8d3942 100644 --- a/internal/database/postgres/postgres_test.go +++ b/internal/database/postgres/postgres_test.go @@ -43,9 +43,9 @@ func TestPostgresProvider(t *testing.T) { assert.Equal(t, `LOCK TABLE "events" IN EXCLUSIVE MODE;`, psql.Features().ExclusiveTableLockSQL("events")) insert := sq.Insert("test").Columns("col1").Values("val1") - insert, query := psql.UpdateInsertForSequenceReturn(insert) + insert, query := psql.UpdateInsertForSequenceReturn(insert, true) sql, _, err := insert.ToSql() assert.NoError(t, err) - assert.Equal(t, "INSERT INTO test (col1) VALUES (?) RETURNING seq", sql) + assert.Equal(t, "INSERT INTO test (col1) VALUES (?) ON CONFLICT DO NOTHING RETURNING seq", sql) assert.True(t, query) } diff --git a/internal/database/sqlcommon/group_sql.go b/internal/database/sqlcommon/group_sql.go index cb6fac9b23..19d71d4a01 100644 --- a/internal/database/sqlcommon/group_sql.go +++ b/internal/database/sqlcommon/group_sql.go @@ -52,13 +52,14 @@ func (s *SQLCommon) UpsertGroup(ctx context.Context, group *fftypes.Group, optim // try to perform an insert concurrently and ensure a non-failure outcome. optimized := false if optimization == database.UpsertOptimizationNew { - opErr := s.attemptGroupInsert(ctx, tx, group) + opErr := s.attemptGroupInsert(ctx, tx, group, true /* we want a failure here we can progress past */) optimized = opErr == nil } else if optimization == database.UpsertOptimizationExisting { rowsAffected, opErr := s.attemptGroupUpdate(ctx, tx, group) optimized = opErr == nil && rowsAffected == 1 } + existing := false if !optimized { // Do a select within the transaction to determine if the UUID already exists groupRows, _, err := s.queryTx(ctx, tx, @@ -69,7 +70,7 @@ func (s *SQLCommon) UpsertGroup(ctx context.Context, group *fftypes.Group, optim if err != nil { return err } - existing := groupRows.Next() + existing = groupRows.Next() groupRows.Close() if existing { @@ -77,7 +78,7 @@ func (s *SQLCommon) UpsertGroup(ctx context.Context, group *fftypes.Group, optim return err } } else { - if err = s.attemptGroupInsert(ctx, tx, group); err != nil { + if err = s.attemptGroupInsert(ctx, tx, group, false); err != nil { return err } } @@ -86,7 +87,7 @@ func (s *SQLCommon) UpsertGroup(ctx context.Context, group *fftypes.Group, optim // Note the member list is not allowed to change, as it is part of the hash. // So the optimization above relies on the fact these are in a transaction, so the // whole group (with members) will have been inserted - if !optimized || optimization == database.UpsertOptimizationNew { + if (optimized && optimization == database.UpsertOptimizationNew) || (!optimized && !existing) { if err = s.updateMembers(ctx, tx, group, false); err != nil { return err } @@ -112,8 +113,8 @@ func (s *SQLCommon) attemptGroupUpdate(ctx context.Context, tx *txWrapper, group ) } -func (s *SQLCommon) attemptGroupInsert(ctx context.Context, tx *txWrapper, group *fftypes.Group) error { - _, err := s.insertTx(ctx, tx, +func (s *SQLCommon) attemptGroupInsert(ctx context.Context, tx *txWrapper, group *fftypes.Group, requestConflictEmptyResult bool) error { + _, err := s.insertTxExt(ctx, tx, sq.Insert("groups"). Columns(groupColumns...). Values( @@ -127,6 +128,7 @@ func (s *SQLCommon) attemptGroupInsert(ctx context.Context, tx *txWrapper, group func() { s.callbacks.HashCollectionNSEvent(database.CollectionGroups, fftypes.ChangeEventTypeCreated, group.Namespace, group.Hash) }, + requestConflictEmptyResult, ) return err } diff --git a/internal/database/sqlcommon/group_sql_test.go b/internal/database/sqlcommon/group_sql_test.go index e2c55968c0..c5e17e4407 100644 --- a/internal/database/sqlcommon/group_sql_test.go +++ b/internal/database/sqlcommon/group_sql_test.go @@ -175,6 +175,26 @@ func TestUpsertGroupFailUpdate(t *testing.T) { assert.NoError(t, mock.ExpectationsWereMet()) } +func TestUpsertGroupFailMembers(t *testing.T) { + s, mock := newMockProvider().init() + groupID := fftypes.NewRandB32() + mock.ExpectBegin() + mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"hash"})) + mock.ExpectExec("INSERT .*").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop")) + mock.ExpectRollback() + err := s.UpsertGroup(context.Background(), &fftypes.Group{ + Hash: groupID, + GroupIdentity: fftypes.GroupIdentity{ + Members: fftypes.Members{ + {Identity: "org1", Node: fftypes.NewUUID()}, + }, + }, + }, database.UpsertOptimizationSkip) + assert.Regexp(t, "FF10116", err) + assert.NoError(t, mock.ExpectationsWereMet()) +} + func TestUpsertGroupFailCommit(t *testing.T) { s, mock := newMockProvider().init() groupID := fftypes.NewRandB32() diff --git a/internal/database/sqlcommon/provider.go b/internal/database/sqlcommon/provider.go index 905e6c2215..4ef5342318 100644 --- a/internal/database/sqlcommon/provider.go +++ b/internal/database/sqlcommon/provider.go @@ -59,5 +59,5 @@ type Provider interface { Features() SQLFeatures // UpdateInsertForSequenceReturn updates the INSERT query for returning the Sequence, and returns whether it needs to be run as a query to return the Sequence field - UpdateInsertForSequenceReturn(insert sq.InsertBuilder) (updatedInsert sq.InsertBuilder, runAsQuery bool) + UpdateInsertForSequenceReturn(insert sq.InsertBuilder, requestConflictEmptyResult bool) (updatedInsert sq.InsertBuilder, runAsQuery bool) } diff --git a/internal/database/sqlcommon/provider_mock_test.go b/internal/database/sqlcommon/provider_mock_test.go index 6c7c58523d..d37697fb14 100644 --- a/internal/database/sqlcommon/provider_mock_test.go +++ b/internal/database/sqlcommon/provider_mock_test.go @@ -77,7 +77,7 @@ func (psql *mockProvider) Features() SQLFeatures { return features } -func (mp *mockProvider) UpdateInsertForSequenceReturn(insert sq.InsertBuilder) (sq.InsertBuilder, bool) { +func (mp *mockProvider) UpdateInsertForSequenceReturn(insert sq.InsertBuilder, requestConflictEmptyResult bool) (sq.InsertBuilder, bool) { if mp.fakePSQLInsert { return insert.Suffix(" RETURNING seq"), true } diff --git a/internal/database/sqlcommon/provider_sqlitego_test.go b/internal/database/sqlcommon/provider_sqlitego_test.go index f67cee4da9..4b76261797 100644 --- a/internal/database/sqlcommon/provider_sqlitego_test.go +++ b/internal/database/sqlcommon/provider_sqlitego_test.go @@ -85,7 +85,7 @@ func (psql *sqliteGoTestProvider) Features() SQLFeatures { return features } -func (tp *sqliteGoTestProvider) UpdateInsertForSequenceReturn(insert sq.InsertBuilder) (sq.InsertBuilder, bool) { +func (tp *sqliteGoTestProvider) UpdateInsertForSequenceReturn(insert sq.InsertBuilder, requestConflictEmptyResult bool) (sq.InsertBuilder, bool) { // Nothing required - QL supports the query for returning the generated ID, and we use that for the sequence return insert, false } diff --git a/internal/database/sqlcommon/sqlcommon.go b/internal/database/sqlcommon/sqlcommon.go index 161008147d..c56634908c 100644 --- a/internal/database/sqlcommon/sqlcommon.go +++ b/internal/database/sqlcommon/sqlcommon.go @@ -236,8 +236,12 @@ func (s *SQLCommon) queryRes(ctx context.Context, tx *txWrapper, tableName strin } func (s *SQLCommon) insertTx(ctx context.Context, tx *txWrapper, q sq.InsertBuilder, postCommit func()) (int64, error) { + return s.insertTxExt(ctx, tx, q, postCommit, false) +} + +func (s *SQLCommon) insertTxExt(ctx context.Context, tx *txWrapper, q sq.InsertBuilder, postCommit func(), requestConflictEmptyResult bool) (int64, error) { l := log.L(ctx) - q, useQuery := s.provider.UpdateInsertForSequenceReturn(q) + q, useQuery := s.provider.UpdateInsertForSequenceReturn(q, requestConflictEmptyResult) sqlQuery, args, err := q.PlaceholderFormat(s.features.PlaceholderFormat).ToSql() if err != nil { diff --git a/internal/database/sqlite3/sqlite3.go b/internal/database/sqlite3/sqlite3.go index 90e4cf3463..46879498bc 100644 --- a/internal/database/sqlite3/sqlite3.go +++ b/internal/database/sqlite3/sqlite3.go @@ -73,7 +73,7 @@ func (sqlite *SQLite3) Features() sqlcommon.SQLFeatures { return features } -func (sqlite *SQLite3) UpdateInsertForSequenceReturn(insert sq.InsertBuilder) (sq.InsertBuilder, bool) { +func (sqlite *SQLite3) UpdateInsertForSequenceReturn(insert sq.InsertBuilder, requestConflictEmptyResult bool) (sq.InsertBuilder, bool) { return insert, false } diff --git a/internal/database/sqlite3/sqlite3_test.go b/internal/database/sqlite3/sqlite3_test.go index 26d10ae5fe..6a8d523db9 100644 --- a/internal/database/sqlite3/sqlite3_test.go +++ b/internal/database/sqlite3/sqlite3_test.go @@ -14,6 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build cgo // +build cgo package sqlite3 @@ -50,7 +51,7 @@ func TestSQLite3GoProvider(t *testing.T) { assert.Equal(t, sq.Dollar, sqlite.Features().PlaceholderFormat) insert := sq.Insert("test").Columns("col1").Values("val1") - insert, query := sqlite.UpdateInsertForSequenceReturn(insert) + insert, query := sqlite.UpdateInsertForSequenceReturn(insert, false) sql, _, err := insert.ToSql() assert.NoError(t, err) assert.Equal(t, "INSERT INTO test (col1) VALUES (?)", sql) From 47663277b8ca217bc12d03ffacf360edade4789f Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 11 Feb 2022 19:58:36 -0500 Subject: [PATCH 3/4] Drop level to debug for optimized case Signed-off-by: Peter Broadhurst --- internal/database/sqlcommon/sqlcommon.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/database/sqlcommon/sqlcommon.go b/internal/database/sqlcommon/sqlcommon.go index c56634908c..0450cff09c 100644 --- a/internal/database/sqlcommon/sqlcommon.go +++ b/internal/database/sqlcommon/sqlcommon.go @@ -28,6 +28,7 @@ import ( "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" + "github.com/sirupsen/logrus" // Import migrate file source _ "github.com/golang-migrate/migrate/v4/source/file" @@ -253,7 +254,11 @@ func (s *SQLCommon) insertTxExt(ctx context.Context, tx *txWrapper, q sq.InsertB if useQuery { err := tx.sqlTX.QueryRowContext(ctx, sqlQuery, args...).Scan(&sequence) if err != nil { - l.Errorf(`SQL insert failed: %s sql=[ %s ]: %s`, err, sqlQuery, err) + level := logrus.DebugLevel + if !requestConflictEmptyResult { + level = logrus.ErrorLevel + } + l.Logf(level, `SQL insert failed (conflictEmptyRequested=%t): %s sql=[ %s ]: %s`, requestConflictEmptyResult, err, sqlQuery, err) return -1, i18n.WrapError(ctx, err, i18n.MsgDBInsertFailed) } } else { From 55fa3e29dd0054e62fcf60fc4817a3e5067c8734 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Sat, 12 Feb 2022 12:35:48 -0500 Subject: [PATCH 4/4] Tweaks from review Signed-off-by: Peter Broadhurst --- internal/database/postgres/postgres.go | 2 +- internal/database/postgres/postgres_test.go | 2 +- internal/database/sqlcommon/provider.go | 4 ++-- internal/database/sqlcommon/provider_mock_test.go | 2 +- internal/database/sqlcommon/provider_sqlitego_test.go | 2 +- internal/database/sqlcommon/sqlcommon.go | 2 +- internal/database/sqlite3/sqlite3.go | 2 +- internal/database/sqlite3/sqlite3_test.go | 2 +- pkg/database/plugin.go | 2 +- 9 files changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/database/postgres/postgres.go b/internal/database/postgres/postgres.go index a48ce04712..115f4c5795 100644 --- a/internal/database/postgres/postgres.go +++ b/internal/database/postgres/postgres.go @@ -60,7 +60,7 @@ func (psql *Postgres) Features() sqlcommon.SQLFeatures { return features } -func (psql *Postgres) UpdateInsertForSequenceReturn(insert sq.InsertBuilder, requestConflictEmptyResult bool) (sq.InsertBuilder, bool) { +func (psql *Postgres) ApplyInsertQueryCustomizations(insert sq.InsertBuilder, requestConflictEmptyResult bool) (sq.InsertBuilder, bool) { suffix := " RETURNING seq" if requestConflictEmptyResult { // Caller wants us to return an empty result set on insert conflict, rather than an error diff --git a/internal/database/postgres/postgres_test.go b/internal/database/postgres/postgres_test.go index f58b8d3942..976b26a73a 100644 --- a/internal/database/postgres/postgres_test.go +++ b/internal/database/postgres/postgres_test.go @@ -43,7 +43,7 @@ func TestPostgresProvider(t *testing.T) { assert.Equal(t, `LOCK TABLE "events" IN EXCLUSIVE MODE;`, psql.Features().ExclusiveTableLockSQL("events")) insert := sq.Insert("test").Columns("col1").Values("val1") - insert, query := psql.UpdateInsertForSequenceReturn(insert, true) + insert, query := psql.ApplyInsertQueryCustomizations(insert, true) sql, _, err := insert.ToSql() assert.NoError(t, err) assert.Equal(t, "INSERT INTO test (col1) VALUES (?) ON CONFLICT DO NOTHING RETURNING seq", sql) diff --git a/internal/database/sqlcommon/provider.go b/internal/database/sqlcommon/provider.go index 4ef5342318..2cd033029f 100644 --- a/internal/database/sqlcommon/provider.go +++ b/internal/database/sqlcommon/provider.go @@ -58,6 +58,6 @@ type Provider interface { // Features returns database specific configuration switches Features() SQLFeatures - // UpdateInsertForSequenceReturn updates the INSERT query for returning the Sequence, and returns whether it needs to be run as a query to return the Sequence field - UpdateInsertForSequenceReturn(insert sq.InsertBuilder, requestConflictEmptyResult bool) (updatedInsert sq.InsertBuilder, runAsQuery bool) + // ApplyInsertQueryCustomizations updates the INSERT query for returning the Sequence, and returns whether it needs to be run as a query to return the Sequence field + ApplyInsertQueryCustomizations(insert sq.InsertBuilder, requestConflictEmptyResult bool) (updatedInsert sq.InsertBuilder, runAsQuery bool) } diff --git a/internal/database/sqlcommon/provider_mock_test.go b/internal/database/sqlcommon/provider_mock_test.go index d37697fb14..2a4c404968 100644 --- a/internal/database/sqlcommon/provider_mock_test.go +++ b/internal/database/sqlcommon/provider_mock_test.go @@ -77,7 +77,7 @@ func (psql *mockProvider) Features() SQLFeatures { return features } -func (mp *mockProvider) UpdateInsertForSequenceReturn(insert sq.InsertBuilder, requestConflictEmptyResult bool) (sq.InsertBuilder, bool) { +func (mp *mockProvider) ApplyInsertQueryCustomizations(insert sq.InsertBuilder, requestConflictEmptyResult bool) (sq.InsertBuilder, bool) { if mp.fakePSQLInsert { return insert.Suffix(" RETURNING seq"), true } diff --git a/internal/database/sqlcommon/provider_sqlitego_test.go b/internal/database/sqlcommon/provider_sqlitego_test.go index 4b76261797..999d79bbc4 100644 --- a/internal/database/sqlcommon/provider_sqlitego_test.go +++ b/internal/database/sqlcommon/provider_sqlitego_test.go @@ -85,7 +85,7 @@ func (psql *sqliteGoTestProvider) Features() SQLFeatures { return features } -func (tp *sqliteGoTestProvider) UpdateInsertForSequenceReturn(insert sq.InsertBuilder, requestConflictEmptyResult bool) (sq.InsertBuilder, bool) { +func (tp *sqliteGoTestProvider) ApplyInsertQueryCustomizations(insert sq.InsertBuilder, requestConflictEmptyResult bool) (sq.InsertBuilder, bool) { // Nothing required - QL supports the query for returning the generated ID, and we use that for the sequence return insert, false } diff --git a/internal/database/sqlcommon/sqlcommon.go b/internal/database/sqlcommon/sqlcommon.go index 0450cff09c..24fa5818cc 100644 --- a/internal/database/sqlcommon/sqlcommon.go +++ b/internal/database/sqlcommon/sqlcommon.go @@ -242,7 +242,7 @@ func (s *SQLCommon) insertTx(ctx context.Context, tx *txWrapper, q sq.InsertBuil func (s *SQLCommon) insertTxExt(ctx context.Context, tx *txWrapper, q sq.InsertBuilder, postCommit func(), requestConflictEmptyResult bool) (int64, error) { l := log.L(ctx) - q, useQuery := s.provider.UpdateInsertForSequenceReturn(q, requestConflictEmptyResult) + q, useQuery := s.provider.ApplyInsertQueryCustomizations(q, requestConflictEmptyResult) sqlQuery, args, err := q.PlaceholderFormat(s.features.PlaceholderFormat).ToSql() if err != nil { diff --git a/internal/database/sqlite3/sqlite3.go b/internal/database/sqlite3/sqlite3.go index 46879498bc..bb372568e8 100644 --- a/internal/database/sqlite3/sqlite3.go +++ b/internal/database/sqlite3/sqlite3.go @@ -73,7 +73,7 @@ func (sqlite *SQLite3) Features() sqlcommon.SQLFeatures { return features } -func (sqlite *SQLite3) UpdateInsertForSequenceReturn(insert sq.InsertBuilder, requestConflictEmptyResult bool) (sq.InsertBuilder, bool) { +func (sqlite *SQLite3) ApplyInsertQueryCustomizations(insert sq.InsertBuilder, requestConflictEmptyResult bool) (sq.InsertBuilder, bool) { return insert, false } diff --git a/internal/database/sqlite3/sqlite3_test.go b/internal/database/sqlite3/sqlite3_test.go index 6a8d523db9..c874c27daf 100644 --- a/internal/database/sqlite3/sqlite3_test.go +++ b/internal/database/sqlite3/sqlite3_test.go @@ -51,7 +51,7 @@ func TestSQLite3GoProvider(t *testing.T) { assert.Equal(t, sq.Dollar, sqlite.Features().PlaceholderFormat) insert := sq.Insert("test").Columns("col1").Values("val1") - insert, query := sqlite.UpdateInsertForSequenceReturn(insert, false) + insert, query := sqlite.ApplyInsertQueryCustomizations(insert, false) sql, _, err := insert.ToSql() assert.NoError(t, err) assert.Equal(t, "INSERT INTO test (col1) VALUES (?)", sql) diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index 446c71841e..f3fc6b32ef 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -282,7 +282,7 @@ type iNodeCollection interface { } type iGroupCollection interface { - // UpserGroup - Upsert a group, with a hint to whether to optmize for existing or new + // UpsertGroup - Upsert a group, with a hint to whether to optmize for existing or new UpsertGroup(ctx context.Context, data *fftypes.Group, optimization UpsertOptimization) (err error) // UpdateGroup - Update group