From 53be2c290fa449ee1ffcf8cc3734a75c03cc33b1 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Fri, 1 Apr 2022 11:21:26 -0600 Subject: [PATCH 01/30] snapshot iterator --- source/logrepl/snapshot.go | 221 ++++++++++++++++++++++++++++++++ source/logrepl/snapshot_test.go | 155 ++++++++++++++++++++++ 2 files changed, 376 insertions(+) create mode 100644 source/logrepl/snapshot.go create mode 100644 source/logrepl/snapshot_test.go diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go new file mode 100644 index 00000000..e64a0b1a --- /dev/null +++ b/source/logrepl/snapshot.go @@ -0,0 +1,221 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package logrepl + +import ( + "context" + "database/sql" + "fmt" + "strconv" + "time" + + "github.com/conduitio/conduit-connector-postgres/pgutil" + sdk "github.com/conduitio/conduit-connector-sdk" + + sq "github.com/Masterminds/squirrel" + "github.com/jackc/pgtype" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" +) + +var psql = sq.StatementBuilder.PlaceholderFormat(sq.Dollar) + +const actionSnapshot string = "snapshot" + +type SnapshotConfig struct { + URI string + SnapshotName string + Table string + Columns []string + KeyColumn string +} + +type SnapshotIterator struct { + config SnapshotConfig + + pool *pgxpool.Pool + rows pgx.Rows + tx pgx.Tx + + internalPos int64 +} + +func NewSnapshotIterator(ctx context.Context, cfg SnapshotConfig) (*SnapshotIterator, error) { + s := &SnapshotIterator{ + config: cfg, + } + + err := s.attachPool(ctx) + if err != nil { + return nil, fmt.Errorf("failed to attach pool: %w", err) + } + + err = s.startTransaction(ctx) + if err != nil { + return nil, fmt.Errorf("failed to acquire snapshot transaction: %w", err) + } + + err = s.loadRows(ctx) + if err != nil { + return nil, fmt.Errorf("failed to load rows: %w", err) + } + + return s, nil +} + +func (s *SnapshotIterator) attachPool(ctx context.Context) error { + pool, err := pgxpool.Connect(ctx, s.config.URI) + if err != nil { + return err + } + s.pool = pool + return nil +} + +func (s *SnapshotIterator) loadRows(ctx context.Context) error { + snapshotTx := fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s';", s.config.SnapshotName) + _, err := s.tx.Query(ctx, snapshotTx) + if err != nil { + return err + } + + query, args, err := psql. + Select(s.config.Columns...). + From(s.config.Table). + ToSql() + if err != nil { + return fmt.Errorf("failed to create read query: %w", err) + } + rows, err := s.tx.Query(ctx, query, args...) 
+ if err != nil { + return fmt.Errorf("failed to query context: %w", err) + } + s.rows = rows + return nil +} + +func (s *SnapshotIterator) startTransaction(ctx context.Context) error { + tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{ + IsoLevel: pgx.RepeatableRead, + AccessMode: pgx.ReadOnly, + }) + if err != nil { + return err + } + s.tx = tx + return nil +} + +func (s *SnapshotIterator) Next(ctx context.Context) (sdk.Record, error) { + if !s.rows.Next() { + if err := s.rows.Err(); err != nil { + return sdk.Record{}, fmt.Errorf("rows error: %w", err) + } + return sdk.Record{}, sdk.ErrBackoffRetry + } + + return s.buildRecord(ctx) +} + +// Ack is a noop for snapshots +func (s *SnapshotIterator) Ack(ctx context.Context, pos sdk.Position) error { + return nil // noop for snapshots +} + +// Teardown attempts to gracefully teardown the iterator. +func (s *SnapshotIterator) Teardown(ctx context.Context) error { + defer s.pool.Close() + s.rows.Close() + err := s.commit(ctx) + return err +} + +func (s *SnapshotIterator) commit(ctx context.Context) error { + err := s.tx.Commit(ctx) + if err != nil { + return fmt.Errorf("failed to commit snapshot tx: %w", err) + } + return nil +} + +func (s *SnapshotIterator) buildRecord(ctx context.Context) (sdk.Record, error) { + if err := s.rows.Err(); err != nil { + return sdk.Record{}, fmt.Errorf("rows error: %w", err) + } + + r, err := withPayloadAndKey(sdk.Record{}, s.rows, s.config.Columns, s.config.KeyColumn) + if err != nil { + return sdk.Record{}, err + } + r.CreatedAt = time.Now() + r.Metadata = map[string]string{ + "action": actionSnapshot, + "table": s.config.Table, + } + + r.Position = sdk.Position(strconv.FormatInt(s.internalPos, 10)) + s.internalPos++ + + return r, nil +} + +// withPayloadAndKey builds a record's payload from *sql.Rows. 
+func withPayloadAndKey(rec sdk.Record, rows pgx.Rows, columns []string, key string) (sdk.Record, error) { + colTypes := rows.FieldDescriptions() + + vals := make([]interface{}, len(columns)) + for i := range columns { + vals[i] = oidToScannerValue(pgtype.OID(colTypes[i].DataTypeOID)) + } + + err := rows.Scan(vals...) + if err != nil { + return sdk.Record{}, fmt.Errorf("failed to scan: %w", err) + } + + payload := make(sdk.StructuredData) + for i, col := range columns { + val := vals[i].(pgtype.Value) + + // handle and assign the record a Key + if key == col { + // TODO: Handle composite keys + rec.Key = sdk.StructuredData{ + col: val.Get(), + } + // continue without assigning so payload doesn't duplicate key data + continue + } + + payload[col] = val.Get() + } + + rec.Payload = payload + return rec, nil +} + +type scannerValue interface { + pgtype.Value + sql.Scanner +} + +func oidToScannerValue(oid pgtype.OID) scannerValue { + t, ok := pgutil.OIDToPgType(oid).(scannerValue) + if !ok { + // not all pg types implement pgtype.Value and sql.Scanner + return &pgtype.Unknown{} + } + return t +} diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go new file mode 100644 index 00000000..c2659d27 --- /dev/null +++ b/source/logrepl/snapshot_test.go @@ -0,0 +1,155 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package logrepl + +import ( + "context" + "reflect" + "strconv" + "testing" + "time" + + "github.com/conduitio/conduit-connector-postgres/test" + sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/google/go-cmp/cmp" + "github.com/jackc/pgx/v4" + "github.com/matryer/is" +) + +func TestSnapshotIterator_Next(t *testing.T) { + i := is.New(t) + ctx := context.Background() + db := test.ConnectSimple(ctx, t, test.RepmgrConnString) + + type fields struct { + uri string + table string + columns []string + keyColumn string + snapshotName string + } + type args struct { + in0 context.Context + } + tests := []struct { + name string + fields fields + args args + want sdk.Record + wantErr bool + }{ + { + name: "should return the first record", + fields: fields{ + table: test.SetupTestTable(ctx, t, db), + columns: []string{"id", "key", "column1", "column2", "column3"}, + keyColumn: "key", + uri: test.RepmgrConnString, + snapshotName: createTestSnapshot(t, db), + }, + args: args{ + in0: context.Background(), + }, + want: sdk.Record{ + Position: sdk.Position("0"), + Key: sdk.StructuredData{ + "key": []uint8("1"), + }, + Payload: sdk.StructuredData{ + "id": int64(1), + "column1": "foo", + "column2": int32(123), + "column3": bool(false), + }, + Metadata: map[string]string{ + "action": actionSnapshot, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, err := NewSnapshotIterator(context.Background(), SnapshotConfig{ + SnapshotName: tt.fields.snapshotName, + Table: tt.fields.table, + Columns: tt.fields.columns, + KeyColumn: tt.fields.keyColumn, + URI: tt.fields.uri, + }) + i.NoErr(err) + + t.Cleanup(func() { + i.NoErr(s.Teardown(context.Background())) + }) + + now := time.Now() + got, err := s.Next(tt.args.in0) + if (err != nil) != tt.wantErr { + t.Errorf("SnapshotIterator.Next() error = %v, wantErr %v", err, tt.wantErr) + } + + i.True(got.Metadata["table"] == tt.fields.table) + delete(got.Metadata, "table") // 
delete so we don't compare in diff + i.True(got.CreatedAt.After(now)) + got.CreatedAt = time.Time{} // reset so we don't compare in diff + + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("SnapshotIterator.Next() = %v, want %v", got, tt.want) + if diff := cmp.Diff(got, tt.want); diff != "" { + t.Log(diff) + } + } + }) + } +} + +func TestIteration(t *testing.T) { + i := is.New(t) + ctx := context.Background() + db := test.ConnectSimple(ctx, t, test.RepmgrConnString) + + s, err := NewSnapshotIterator(context.Background(), SnapshotConfig{ + SnapshotName: createTestSnapshot(t, db), + Table: test.SetupTestTable(ctx, t, db), + Columns: []string{"id", "key", "column1", "column2", "column3"}, + KeyColumn: "key", + URI: test.RepmgrConnString, + }) + i.NoErr(err) + + now := time.Now() + for idx := 0; idx < 2; idx++ { + rec, err := s.Next(ctx) + i.Equal(string(rec.Position), strconv.FormatInt(int64(idx), 10)) + i.NoErr(err) + t.Log(rec) + rec.CreatedAt.After(now) + } + + i.NoErr(s.Teardown(ctx)) +} + +func createTestSnapshot(t *testing.T, db *pgx.Conn) string { + var n *string + query := `SELECT * FROM pg_catalog.pg_export_snapshot();` + row := db.QueryRow(context.Background(), query) + err := row.Scan(&n) + if err != nil { + t.Logf("failed to scan name: %s", err) + t.Fail() + } + return *n +} From ee5ea8b1fddc334ee706dbf9041d03e99db8a448 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Mon, 4 Apr 2022 20:55:17 -0600 Subject: [PATCH 02/30] adds lifecycle test for proper cleanup --- source/logrepl/snapshot.go | 68 ++++++-------- source/logrepl/snapshot_test.go | 158 +++++++++----------------------- 2 files changed, 71 insertions(+), 155 deletions(-) diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go index e64a0b1a..b9120210 100644 --- a/source/logrepl/snapshot.go +++ b/source/logrepl/snapshot.go @@ -27,7 +27,6 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/jackc/pgtype" "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" ) var psql = 
sq.StatementBuilder.PlaceholderFormat(sq.Dollar) @@ -45,26 +44,21 @@ type SnapshotConfig struct { type SnapshotIterator struct { config SnapshotConfig - pool *pgxpool.Pool + conn *pgx.Conn rows pgx.Rows - tx pgx.Tx internalPos int64 } -func NewSnapshotIterator(ctx context.Context, cfg SnapshotConfig) (*SnapshotIterator, error) { +func NewSnapshotIterator(ctx context.Context, conn *pgx.Conn, cfg SnapshotConfig) (*SnapshotIterator, error) { s := &SnapshotIterator{ config: cfg, + conn: conn, } - err := s.attachPool(ctx) + err := s.startSnapshotTx(ctx) if err != nil { - return nil, fmt.Errorf("failed to attach pool: %w", err) - } - - err = s.startTransaction(ctx) - if err != nil { - return nil, fmt.Errorf("failed to acquire snapshot transaction: %w", err) + return nil, fmt.Errorf("failed to start snapshot tx: %w", err) } err = s.loadRows(ctx) @@ -75,22 +69,7 @@ func NewSnapshotIterator(ctx context.Context, cfg SnapshotConfig) (*SnapshotIter return s, nil } -func (s *SnapshotIterator) attachPool(ctx context.Context) error { - pool, err := pgxpool.Connect(ctx, s.config.URI) - if err != nil { - return err - } - s.pool = pool - return nil -} - func (s *SnapshotIterator) loadRows(ctx context.Context) error { - snapshotTx := fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s';", s.config.SnapshotName) - _, err := s.tx.Query(ctx, snapshotTx) - if err != nil { - return err - } - query, args, err := psql. Select(s.config.Columns...). From(s.config.Table). @@ -98,23 +77,28 @@ func (s *SnapshotIterator) loadRows(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to create read query: %w", err) } - rows, err := s.tx.Query(ctx, query, args...) + + rows, err := s.conn.Query(ctx, query, args...) 
if err != nil { - return fmt.Errorf("failed to query context: %w", err) + return fmt.Errorf("failed to query rows: %w", err) } s.rows = rows + return nil } -func (s *SnapshotIterator) startTransaction(ctx context.Context) error { - tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{ - IsoLevel: pgx.RepeatableRead, - AccessMode: pgx.ReadOnly, - }) +func (s *SnapshotIterator) startSnapshotTx(ctx context.Context) error { + _, err := s.conn.Exec(ctx, `BEGIN ISOLATION LEVEL REPEATABLE READ;`) + if err != nil { + return nil + } + + snapshotTx := fmt.Sprintf(`SET TRANSACTION SNAPSHOT '%s'`, s.config.SnapshotName) + _, err = s.conn.Exec(ctx, snapshotTx) if err != nil { return err } - s.tx = tx + return nil } @@ -123,6 +107,7 @@ func (s *SnapshotIterator) Next(ctx context.Context) (sdk.Record, error) { if err := s.rows.Err(); err != nil { return sdk.Record{}, fmt.Errorf("rows error: %w", err) } + return sdk.Record{}, sdk.ErrBackoffRetry } @@ -136,30 +121,30 @@ func (s *SnapshotIterator) Ack(ctx context.Context, pos sdk.Position) error { // Teardown attempts to gracefully teardown the iterator. 
func (s *SnapshotIterator) Teardown(ctx context.Context) error { - defer s.pool.Close() s.rows.Close() - err := s.commit(ctx) - return err + return s.commit(ctx) } func (s *SnapshotIterator) commit(ctx context.Context) error { - err := s.tx.Commit(ctx) + _, err := s.conn.Exec(ctx, `COMMIT;`) if err != nil { - return fmt.Errorf("failed to commit snapshot tx: %w", err) + return fmt.Errorf("failed to commit: %w", err) } return nil } func (s *SnapshotIterator) buildRecord(ctx context.Context) (sdk.Record, error) { if err := s.rows.Err(); err != nil { - return sdk.Record{}, fmt.Errorf("rows error: %w", err) + return sdk.Record{}, fmt.Errorf("build record rows error: %w", err) } r, err := withPayloadAndKey(sdk.Record{}, s.rows, s.config.Columns, s.config.KeyColumn) if err != nil { return sdk.Record{}, err } + r.CreatedAt = time.Now() + r.Metadata = map[string]string{ "action": actionSnapshot, "table": s.config.Table, @@ -171,7 +156,8 @@ func (s *SnapshotIterator) buildRecord(ctx context.Context) (sdk.Record, error) return r, nil } -// withPayloadAndKey builds a record's payload from *sql.Rows. +// withPayloadAndKey builds a record's payload from *sql.Rows. It calls +// Scan so it assumes that Next has been checked previously. 
func withPayloadAndKey(rec sdk.Record, rows pgx.Rows, columns []string, key string) (sdk.Record, error) { colTypes := rows.FieldDescriptions() diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index c2659d27..68a2b188 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -16,140 +16,70 @@ package logrepl import ( "context" - "reflect" - "strconv" "testing" "time" "github.com/conduitio/conduit-connector-postgres/test" sdk "github.com/conduitio/conduit-connector-sdk" - "github.com/google/go-cmp/cmp" - "github.com/jackc/pgx/v4" + "github.com/matryer/is" ) -func TestSnapshotIterator_Next(t *testing.T) { +func TestLifecycle(t *testing.T) { i := is.New(t) ctx := context.Background() - db := test.ConnectSimple(ctx, t, test.RepmgrConnString) - - type fields struct { - uri string - table string - columns []string - keyColumn string - snapshotName string - } - type args struct { - in0 context.Context - } - tests := []struct { - name string - fields fields - args args - want sdk.Record - wantErr bool - }{ - { - name: "should return the first record", - fields: fields{ - table: test.SetupTestTable(ctx, t, db), - columns: []string{"id", "key", "column1", "column2", "column3"}, - keyColumn: "key", - uri: test.RepmgrConnString, - snapshotName: createTestSnapshot(t, db), - }, - args: args{ - in0: context.Background(), - }, - want: sdk.Record{ - Position: sdk.Position("0"), - Key: sdk.StructuredData{ - "key": []uint8("1"), - }, - Payload: sdk.StructuredData{ - "id": int64(1), - "column1": "foo", - "column2": int32(123), - "column3": bool(false), - }, - Metadata: map[string]string{ - "action": actionSnapshot, - }, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s, err := NewSnapshotIterator(context.Background(), SnapshotConfig{ - SnapshotName: tt.fields.snapshotName, - Table: tt.fields.table, - Columns: tt.fields.columns, - KeyColumn: tt.fields.keyColumn, - URI: 
tt.fields.uri, - }) - i.NoErr(err) - t.Cleanup(func() { - i.NoErr(s.Teardown(context.Background())) - }) + testConn := test.ConnectSimple(ctx, t, test.RegularConnString) + table := test.SetupTestTable(ctx, t, test.ConnectSimple(ctx, t, test.RegularConnString)) - now := time.Now() - got, err := s.Next(tt.args.in0) - if (err != nil) != tt.wantErr { - t.Errorf("SnapshotIterator.Next() error = %v, wantErr %v", err, tt.wantErr) - } - - i.True(got.Metadata["table"] == tt.fields.table) - delete(got.Metadata, "table") // delete so we don't compare in diff - i.True(got.CreatedAt.After(now)) - got.CreatedAt = time.Time{} // reset so we don't compare in diff - - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("SnapshotIterator.Next() = %v, want %v", got, tt.want) - if diff := cmp.Diff(got, tt.want); diff != "" { - t.Log(diff) - } - } - }) - } -} + testConn.Exec(ctx, "BEGIN ISOLATION LEVEL REPEATABLE READ;") + query := `SELECT * FROM pg_catalog.pg_export_snapshot();` + rows, err := testConn.Query(context.Background(), query) + i.NoErr(err) -func TestIteration(t *testing.T) { - i := is.New(t) - ctx := context.Background() - db := test.ConnectSimple(ctx, t, test.RepmgrConnString) + var name *string + i.True(rows.Next()) + err = rows.Scan(&name) + i.NoErr(err) - s, err := NewSnapshotIterator(context.Background(), SnapshotConfig{ - SnapshotName: createTestSnapshot(t, db), - Table: test.SetupTestTable(ctx, t, db), + snapshotConn := test.ConnectSimple(ctx, t, test.RegularConnString) + s, err := NewSnapshotIterator(context.Background(), snapshotConn, SnapshotConfig{ + SnapshotName: *name, + URI: test.RegularConnString, + Table: table, Columns: []string{"id", "key", "column1", "column2", "column3"}, KeyColumn: "key", - URI: test.RepmgrConnString, }) i.NoErr(err) now := time.Now() - for idx := 0; idx < 2; idx++ { - rec, err := s.Next(ctx) - i.Equal(string(rec.Position), strconv.FormatInt(int64(idx), 10)) - i.NoErr(err) - t.Log(rec) - rec.CreatedAt.After(now) - } + rec, err := 
s.Next(ctx) + i.NoErr(err) - i.NoErr(s.Teardown(ctx)) -} + i.True(rec.CreatedAt.After(now)) + i.Equal(rec.Metadata["action"], "snapshot") + rec.CreatedAt = time.Time{} // reset time for comparison -func createTestSnapshot(t *testing.T, db *pgx.Conn) string { - var n *string - query := `SELECT * FROM pg_catalog.pg_export_snapshot();` - row := db.QueryRow(context.Background(), query) - err := row.Scan(&n) - if err != nil { - t.Logf("failed to scan name: %s", err) - t.Fail() - } - return *n + i.Equal(rec, sdk.Record{ + Position: sdk.Position("0"), + Key: sdk.StructuredData{ + "key": []uint8("1"), + }, + Payload: sdk.StructuredData{ + "id": int64(1), + "column1": "foo", + "column2": int32(123), + "column3": bool(false), + }, + Metadata: map[string]string{ + "action": actionSnapshot, + "table": table, + }, + }) + + err = s.Teardown(ctx) + i.NoErr(err) + + rows.Close() + i.NoErr(err) } From 7aa9acdd3b23f360e3bb7cbf1b0fefdb26961f74 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Mon, 4 Apr 2022 21:27:27 -0600 Subject: [PATCH 03/30] linter fixes --- source/logrepl/snapshot_test.go | 4 +++- source/source.go | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index 68a2b188..52c4bb01 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -32,7 +32,9 @@ func TestLifecycle(t *testing.T) { testConn := test.ConnectSimple(ctx, t, test.RegularConnString) table := test.SetupTestTable(ctx, t, test.ConnectSimple(ctx, t, test.RegularConnString)) - testConn.Exec(ctx, "BEGIN ISOLATION LEVEL REPEATABLE READ;") + _, err := testConn.Exec(ctx, "BEGIN ISOLATION LEVEL REPEATABLE READ;") + i.NoErr(err) + query := `SELECT * FROM pg_catalog.pg_export_snapshot();` rows, err := testConn.Query(context.Background(), query) i.NoErr(err) diff --git a/source/source.go b/source/source.go index a5151e8b..77c0f707 100644 --- a/source/source.go +++ b/source/source.go @@ -64,6 +64,8 @@ func (s 
*Source) Open(ctx context.Context, pos sdk.Position) error { fallthrough case CDCModeLogrepl: if s.config.SnapshotMode == SnapshotModeInitial { + // TODO wire this up + // TODO create snapshot iterator for logical replication and pass // the snapshot mode in the config sdk.Logger(ctx).Warn().Msg("snapshot not supported in logical replication mode") From 4242433389f795d13fd0cec916cb34f9785e9fb3 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Tue, 5 Apr 2022 17:26:33 -0600 Subject: [PATCH 04/30] refactors tx handling to use pgx.Tx --- source/logrepl/snapshot.go | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go index b9120210..a8ffa060 100644 --- a/source/logrepl/snapshot.go +++ b/source/logrepl/snapshot.go @@ -44,7 +44,7 @@ type SnapshotConfig struct { type SnapshotIterator struct { config SnapshotConfig - conn *pgx.Conn + tx pgx.Tx rows pgx.Rows internalPos int64 @@ -53,10 +53,9 @@ type SnapshotIterator struct { func NewSnapshotIterator(ctx context.Context, conn *pgx.Conn, cfg SnapshotConfig) (*SnapshotIterator, error) { s := &SnapshotIterator{ config: cfg, - conn: conn, } - err := s.startSnapshotTx(ctx) + err := s.startSnapshotTx(ctx, conn) if err != nil { return nil, fmt.Errorf("failed to start snapshot tx: %w", err) } @@ -78,7 +77,7 @@ func (s *SnapshotIterator) loadRows(ctx context.Context) error { return fmt.Errorf("failed to create read query: %w", err) } - rows, err := s.conn.Query(ctx, query, args...) + rows, err := s.tx.Query(ctx, query, args...) 
if err != nil { return fmt.Errorf("failed to query rows: %w", err) } @@ -87,14 +86,19 @@ func (s *SnapshotIterator) loadRows(ctx context.Context) error { return nil } -func (s *SnapshotIterator) startSnapshotTx(ctx context.Context) error { - _, err := s.conn.Exec(ctx, `BEGIN ISOLATION LEVEL REPEATABLE READ;`) +func (s *SnapshotIterator) startSnapshotTx(ctx context.Context, conn *pgx.Conn) error { + tx, err := conn.BeginTx(ctx, pgx.TxOptions{ + IsoLevel: pgx.RepeatableRead, + AccessMode: pgx.ReadOnly, + }) if err != nil { - return nil + return err } + s.tx = tx + snapshotTx := fmt.Sprintf(`SET TRANSACTION SNAPSHOT '%s'`, s.config.SnapshotName) - _, err = s.conn.Exec(ctx, snapshotTx) + _, err = tx.Exec(ctx, snapshotTx) if err != nil { return err } @@ -122,15 +126,7 @@ func (s *SnapshotIterator) Ack(ctx context.Context, pos sdk.Position) error { // Teardown attempts to gracefully teardown the iterator. func (s *SnapshotIterator) Teardown(ctx context.Context) error { s.rows.Close() - return s.commit(ctx) -} - -func (s *SnapshotIterator) commit(ctx context.Context) error { - _, err := s.conn.Exec(ctx, `COMMIT;`) - if err != nil { - return fmt.Errorf("failed to commit: %w", err) - } - return nil + return s.tx.Commit(ctx) } func (s *SnapshotIterator) buildRecord(ctx context.Context) (sdk.Record, error) { From a549d9f4da35d946b1f5b6c1c51590b454312518 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Tue, 5 Apr 2022 17:28:03 -0600 Subject: [PATCH 05/30] use is instead of i for package reference --- source/logrepl/snapshot_test.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index 52c4bb01..1555fbf5 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -26,23 +26,23 @@ import ( ) func TestLifecycle(t *testing.T) { - i := is.New(t) + is := is.New(t) ctx := context.Background() testConn := test.ConnectSimple(ctx, t, 
test.RegularConnString) table := test.SetupTestTable(ctx, t, test.ConnectSimple(ctx, t, test.RegularConnString)) _, err := testConn.Exec(ctx, "BEGIN ISOLATION LEVEL REPEATABLE READ;") - i.NoErr(err) + is.NoErr(err) query := `SELECT * FROM pg_catalog.pg_export_snapshot();` rows, err := testConn.Query(context.Background(), query) - i.NoErr(err) + is.NoErr(err) var name *string - i.True(rows.Next()) + is.True(rows.Next()) err = rows.Scan(&name) - i.NoErr(err) + is.NoErr(err) snapshotConn := test.ConnectSimple(ctx, t, test.RegularConnString) s, err := NewSnapshotIterator(context.Background(), snapshotConn, SnapshotConfig{ @@ -52,17 +52,17 @@ func TestLifecycle(t *testing.T) { Columns: []string{"id", "key", "column1", "column2", "column3"}, KeyColumn: "key", }) - i.NoErr(err) + is.NoErr(err) now := time.Now() rec, err := s.Next(ctx) - i.NoErr(err) + is.NoErr(err) - i.True(rec.CreatedAt.After(now)) - i.Equal(rec.Metadata["action"], "snapshot") + is.True(rec.CreatedAt.After(now)) + is.Equal(rec.Metadata["action"], "snapshot") rec.CreatedAt = time.Time{} // reset time for comparison - i.Equal(rec, sdk.Record{ + is.Equal(rec, sdk.Record{ Position: sdk.Position("0"), Key: sdk.StructuredData{ "key": []uint8("1"), @@ -80,8 +80,8 @@ func TestLifecycle(t *testing.T) { }) err = s.Teardown(ctx) - i.NoErr(err) + is.NoErr(err) rows.Close() - i.NoErr(err) + is.NoErr(err) } From 3292e8ca0bb0c9014f1843bb23e1922fff1de89d Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Wed, 6 Apr 2022 11:06:26 -0600 Subject: [PATCH 06/30] clean up and review feedback --- source/logrepl/snapshot.go | 13 +++++---- source/logrepl/snapshot_test.go | 47 ++++++++++++++++++++------------- 2 files changed, 35 insertions(+), 25 deletions(-) diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go index a8ffa060..85e93dd3 100644 --- a/source/logrepl/snapshot.go +++ b/source/logrepl/snapshot.go @@ -34,7 +34,6 @@ var psql = sq.StatementBuilder.PlaceholderFormat(sq.Dollar) const actionSnapshot string = 
"snapshot" type SnapshotConfig struct { - URI string SnapshotName string Table string Columns []string @@ -74,11 +73,13 @@ func (s *SnapshotIterator) loadRows(ctx context.Context) error { From(s.config.Table). ToSql() if err != nil { + s.rows.Close() return fmt.Errorf("failed to create read query: %w", err) } rows, err := s.tx.Query(ctx, query, args...) if err != nil { + s.rows.Close() return fmt.Errorf("failed to query rows: %w", err) } s.rows = rows @@ -100,6 +101,7 @@ func (s *SnapshotIterator) startSnapshotTx(ctx context.Context, conn *pgx.Conn) snapshotTx := fmt.Sprintf(`SET TRANSACTION SNAPSHOT '%s'`, s.config.SnapshotName) _, err = tx.Exec(ctx, snapshotTx) if err != nil { + defer s.tx.Rollback(ctx) return err } @@ -125,15 +127,12 @@ func (s *SnapshotIterator) Ack(ctx context.Context, pos sdk.Position) error { // Teardown attempts to gracefully teardown the iterator. func (s *SnapshotIterator) Teardown(ctx context.Context) error { - s.rows.Close() - return s.tx.Commit(ctx) + defer s.tx.Commit(ctx) + defer s.rows.Close() + return s.rows.Err() } func (s *SnapshotIterator) buildRecord(ctx context.Context) (sdk.Record, error) { - if err := s.rows.Err(); err != nil { - return sdk.Record{}, fmt.Errorf("build record rows error: %w", err) - } - r, err := withPayloadAndKey(sdk.Record{}, s.rows, s.config.Columns, s.config.KeyColumn) if err != nil { return sdk.Record{}, err diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index 1555fbf5..df2ee2e5 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -21,6 +21,8 @@ import ( "github.com/conduitio/conduit-connector-postgres/test" sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" "github.com/matryer/is" ) @@ -28,31 +30,24 @@ import ( func TestLifecycle(t *testing.T) { is := is.New(t) ctx := context.Background() + pool := test.ConnectPool(ctx, t, test.RegularConnString) + table := 
test.SetupTestTable(ctx, t, pool) - testConn := test.ConnectSimple(ctx, t, test.RegularConnString) - table := test.SetupTestTable(ctx, t, test.ConnectSimple(ctx, t, test.RegularConnString)) + tx, name := createTestSnapshot(t, pool) + t.Cleanup(func() { tx.Commit(ctx) }) - _, err := testConn.Exec(ctx, "BEGIN ISOLATION LEVEL REPEATABLE READ;") + snapshotConn, err := pool.Acquire(ctx) is.NoErr(err) + t.Cleanup(func() { snapshotConn.Release() }) - query := `SELECT * FROM pg_catalog.pg_export_snapshot();` - rows, err := testConn.Query(context.Background(), query) - is.NoErr(err) - - var name *string - is.True(rows.Next()) - err = rows.Scan(&name) - is.NoErr(err) - - snapshotConn := test.ConnectSimple(ctx, t, test.RegularConnString) - s, err := NewSnapshotIterator(context.Background(), snapshotConn, SnapshotConfig{ - SnapshotName: *name, - URI: test.RegularConnString, + s, err := NewSnapshotIterator(context.Background(), snapshotConn.Conn(), SnapshotConfig{ + SnapshotName: name, Table: table, Columns: []string{"id", "key", "column1", "column2", "column3"}, KeyColumn: "key", }) is.NoErr(err) + t.Cleanup(func() { s.Teardown(ctx) }) now := time.Now() rec, err := s.Next(ctx) @@ -78,10 +73,26 @@ func TestLifecycle(t *testing.T) { "table": table, }, }) +} - err = s.Teardown(ctx) +// createTestSnapshot starts a transaction that stays open while a snapshot run. +// Otherwise, postgres deletes the snapshot as soon as this tx commits, +// an our snapshot iterator won't find a snapshot at the specifiedname. 
+// https://www.postgresql.org/docs/current/sql-set-transaction.html +func createTestSnapshot(t *testing.T, pool *pgxpool.Pool) (pgx.Tx, string) { + ctx := context.Background() + is := is.New(t) + + tx, err := pool.Begin(ctx) + is.NoErr(err) + query := `SELECT * FROM pg_catalog.pg_export_snapshot();` + rows, err := tx.Query(context.Background(), query) is.NoErr(err) - rows.Close() + var name *string + is.True(rows.Next()) + err = rows.Scan(&name) is.NoErr(err) + + return tx, *name } From 3f20c277ef08602a1147ee3d5b909086cc51d8f1 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Wed, 6 Apr 2022 11:16:38 -0600 Subject: [PATCH 07/30] includes table name in snapshot positions --- source/logrepl/snapshot.go | 11 +++++++++-- source/logrepl/snapshot_test.go | 3 ++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go index 85e93dd3..0a78bdf7 100644 --- a/source/logrepl/snapshot.go +++ b/source/logrepl/snapshot.go @@ -145,12 +145,19 @@ func (s *SnapshotIterator) buildRecord(ctx context.Context) (sdk.Record, error) "table": s.config.Table, } - r.Position = sdk.Position(strconv.FormatInt(s.internalPos, 10)) - s.internalPos++ + r.Position = s.formatPosition() return r, nil } +// withPosition adds a position to a record that contains the table name and +// the record's position in the current snapshot, aka it's number. +func (s *SnapshotIterator) formatPosition() sdk.Position { + position := fmt.Sprintf("%s:%s", s.config.Table, strconv.FormatInt(s.internalPos, 10)) + s.internalPos++ + return sdk.Position(position) +} + // withPayloadAndKey builds a record's payload from *sql.Rows. It calls // Scan so it assumes that Next has been checked previously. 
func withPayloadAndKey(rec sdk.Record, rows pgx.Rows, columns []string, key string) (sdk.Record, error) { diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index df2ee2e5..a2268124 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -16,6 +16,7 @@ package logrepl import ( "context" + "fmt" "testing" "time" @@ -58,7 +59,7 @@ func TestLifecycle(t *testing.T) { rec.CreatedAt = time.Time{} // reset time for comparison is.Equal(rec, sdk.Record{ - Position: sdk.Position("0"), + Position: sdk.Position(fmt.Sprintf("%s:0", table)), Key: sdk.StructuredData{ "key": []uint8("1"), }, From 0c597bfdaf84a2c6fd9732af0ba34e9e6b969560 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Wed, 6 Apr 2022 16:25:54 -0600 Subject: [PATCH 08/30] use pooled connection in tests --- source/logrepl/snapshot.go | 12 ++++++---- source/logrepl/snapshot_test.go | 42 ++++++++++++++++++++++----------- source/source.go | 2 -- 3 files changed, 36 insertions(+), 20 deletions(-) diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go index 0a78bdf7..8feaf3f2 100644 --- a/source/logrepl/snapshot.go +++ b/source/logrepl/snapshot.go @@ -101,8 +101,10 @@ func (s *SnapshotIterator) startSnapshotTx(ctx context.Context, conn *pgx.Conn) snapshotTx := fmt.Sprintf(`SET TRANSACTION SNAPSHOT '%s'`, s.config.SnapshotName) _, err = tx.Exec(ctx, snapshotTx) if err != nil { - defer s.tx.Rollback(ctx) - return err + if rollErr := s.tx.Rollback(ctx); rollErr != nil { + sdk.Logger(ctx).Err(rollErr).Msg("set transaction rollback failed") + } + return fmt.Errorf("failed to set transaction snapshot id: %w", err) } return nil @@ -127,8 +129,10 @@ func (s *SnapshotIterator) Ack(ctx context.Context, pos sdk.Position) error { // Teardown attempts to gracefully teardown the iterator. 
func (s *SnapshotIterator) Teardown(ctx context.Context) error { - defer s.tx.Commit(ctx) - defer s.rows.Close() + s.rows.Close() + if commitErr := s.tx.Commit(ctx); commitErr != nil { + sdk.Logger(ctx).Err(commitErr).Msg("teardown commit failed") + } return s.rows.Err() } diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index a2268124..b09d5a58 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -22,7 +22,6 @@ import ( "github.com/conduitio/conduit-connector-postgres/test" sdk "github.com/conduitio/conduit-connector-sdk" - "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" "github.com/matryer/is" @@ -31,24 +30,21 @@ import ( func TestLifecycle(t *testing.T) { is := is.New(t) ctx := context.Background() - pool := test.ConnectPool(ctx, t, test.RegularConnString) - table := test.SetupTestTable(ctx, t, pool) - tx, name := createTestSnapshot(t, pool) - t.Cleanup(func() { tx.Commit(ctx) }) + pool := test.ConnectPool(ctx, t, test.RegularConnString) + table := createTestTable(t, pool) + name := createTestSnapshot(t, pool) - snapshotConn, err := pool.Acquire(ctx) + conn, err := pool.Acquire(ctx) is.NoErr(err) - t.Cleanup(func() { snapshotConn.Release() }) - - s, err := NewSnapshotIterator(context.Background(), snapshotConn.Conn(), SnapshotConfig{ + s, err := NewSnapshotIterator(context.Background(), conn.Conn(), SnapshotConfig{ SnapshotName: name, Table: table, Columns: []string{"id", "key", "column1", "column2", "column3"}, KeyColumn: "key", }) is.NoErr(err) - t.Cleanup(func() { s.Teardown(ctx) }) + t.Cleanup(func() { conn.Release() }) now := time.Now() rec, err := s.Next(ctx) @@ -74,17 +70,19 @@ func TestLifecycle(t *testing.T) { "table": table, }, }) + is.NoErr(s.Teardown(ctx)) // TODO: Should return an error } // createTestSnapshot starts a transaction that stays open while a snapshot run. 
// Otherwise, postgres deletes the snapshot as soon as this tx commits, // an our snapshot iterator won't find a snapshot at the specifiedname. // https://www.postgresql.org/docs/current/sql-set-transaction.html -func createTestSnapshot(t *testing.T, pool *pgxpool.Pool) (pgx.Tx, string) { +func createTestSnapshot(t *testing.T, pool *pgxpool.Pool) string { ctx := context.Background() is := is.New(t) - - tx, err := pool.Begin(ctx) + conn, err := pool.Acquire(ctx) + is.NoErr(err) + tx, err := conn.Begin(ctx) is.NoErr(err) query := `SELECT * FROM pg_catalog.pg_export_snapshot();` rows, err := tx.Query(context.Background(), query) @@ -95,5 +93,21 @@ func createTestSnapshot(t *testing.T, pool *pgxpool.Pool) (pgx.Tx, string) { err = rows.Scan(&name) is.NoErr(err) - return tx, *name + t.Cleanup(func() { + rows.Close() + is.NoErr(tx.Commit(ctx)) + conn.Release() + }) + + return *name +} + +func createTestTable(t *testing.T, pool *pgxpool.Pool) string { + is := is.New(t) + ctx := context.Background() + tblConn, err := pool.Acquire(ctx) + is.NoErr(err) + table := test.SetupTestTable(ctx, t, tblConn.Conn()) + t.Cleanup(func() { tblConn.Release() }) + return table } diff --git a/source/source.go b/source/source.go index 77c0f707..a5151e8b 100644 --- a/source/source.go +++ b/source/source.go @@ -64,8 +64,6 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { fallthrough case CDCModeLogrepl: if s.config.SnapshotMode == SnapshotModeInitial { - // TODO wire this up - // TODO create snapshot iterator for logical replication and pass // the snapshot mode in the config sdk.Logger(ctx).Warn().Msg("snapshot not supported in logical replication mode") From c12791ea02edcbaff2e7aa2ca2b4895c84afba8e Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Wed, 6 Apr 2022 16:35:08 -0600 Subject: [PATCH 09/30] returns snapshot incomplete error when snapshot is interrupted --- source/logrepl/snapshot.go | 22 +++++++++++++++++++--- source/logrepl/snapshot_test.go | 2 +- 2 files 
changed, 20 insertions(+), 4 deletions(-) diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go index 8feaf3f2..0e881b20 100644 --- a/source/logrepl/snapshot.go +++ b/source/logrepl/snapshot.go @@ -17,6 +17,7 @@ package logrepl import ( "context" "database/sql" + "errors" "fmt" "strconv" "time" @@ -29,6 +30,12 @@ import ( "github.com/jackc/pgx/v4" ) +// ErrSnapshotComplete is returned by Next when a snapshot is finished +var ErrSnapshotComplete = errors.New("ErrSnapshotComplete") + +// ErrSnapshotInterrupted is returned by Teardown when a snapshot is interrupted +var ErrSnapshotInterrupt = errors.New("ErrSnapshotInterrupt") + var psql = sq.StatementBuilder.PlaceholderFormat(sq.Dollar) const actionSnapshot string = "snapshot" @@ -46,6 +53,7 @@ type SnapshotIterator struct { tx pgx.Tx rows pgx.Rows + complete bool internalPos int64 } @@ -115,8 +123,8 @@ func (s *SnapshotIterator) Next(ctx context.Context) (sdk.Record, error) { if err := s.rows.Err(); err != nil { return sdk.Record{}, fmt.Errorf("rows error: %w", err) } - - return sdk.Record{}, sdk.ErrBackoffRetry + s.complete = true + return sdk.Record{}, ErrSnapshotComplete } return s.buildRecord(ctx) @@ -133,7 +141,15 @@ func (s *SnapshotIterator) Teardown(ctx context.Context) error { if commitErr := s.tx.Commit(ctx); commitErr != nil { sdk.Logger(ctx).Err(commitErr).Msg("teardown commit failed") } - return s.rows.Err() + if rowsErr := s.rows.Err(); rowsErr != nil { + sdk.Logger(ctx).Err(rowsErr).Msg("rows returned an error") + } + + if !s.complete { + sdk.Logger(ctx).Warn().Msg("snapshot interrupted") + return ErrSnapshotInterrupt + } + return nil } func (s *SnapshotIterator) buildRecord(ctx context.Context) (sdk.Record, error) { diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index b09d5a58..40f2e435 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -70,7 +70,7 @@ func TestLifecycle(t *testing.T) { "table": table, }, }) - 
is.NoErr(s.Teardown(ctx)) // TODO: Should return an error + is.Equal(ErrSnapshotInterrupt.Error(), s.Teardown(ctx).Error()) } // createTestSnapshot starts a transaction that stays open while a snapshot run. From c9c958d8fb97129aaa4b22693ed90c8cffe7baa3 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Wed, 6 Apr 2022 16:43:46 -0600 Subject: [PATCH 10/30] linter fix --- source/logrepl/snapshot.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go index 0e881b20..c9b304cd 100644 --- a/source/logrepl/snapshot.go +++ b/source/logrepl/snapshot.go @@ -33,7 +33,7 @@ import ( // ErrSnapshotComplete is returned by Next when a snapshot is finished var ErrSnapshotComplete = errors.New("ErrSnapshotComplete") -// ErrSnapshotInterrupted is returned by Teardown when a snapshot is interrupted +// ErrSnapshotInterrupt is returned by Teardown when a snapshot is interrupted var ErrSnapshotInterrupt = errors.New("ErrSnapshotInterrupt") var psql = sq.StatementBuilder.PlaceholderFormat(sq.Dollar) From 9c97b1b1e1aa2f4feabc8a1c34075be6dbe79bd1 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Wed, 6 Apr 2022 16:54:49 -0600 Subject: [PATCH 11/30] refactors the snapshot iterator setup in tests --- source/logrepl/snapshot_test.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index 40f2e435..ab3fe8e8 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -27,7 +27,11 @@ import ( "github.com/matryer/is" ) -func TestLifecycle(t *testing.T) { +// createTestSnapshotIterator creates a new test table, starts a snapshot tx +// on it, and then creates a test SnapshotIterator with the ID of that snapshot. +// It returns that SnapshotIterator and the string name of the test table. +// This function handles its own pooled connection cleanup. 
+func createTestSnapshotIterator(t *testing.T) (*SnapshotIterator, string) { is := is.New(t) ctx := context.Background() @@ -45,6 +49,14 @@ func TestLifecycle(t *testing.T) { }) is.NoErr(err) t.Cleanup(func() { conn.Release() }) + return s, table +} + +func TestLifecycle(t *testing.T) { + is := is.New(t) + ctx := context.Background() + + s, table := createTestSnapshotIterator(t) now := time.Now() rec, err := s.Next(ctx) From f17f6908cd163184837d703114bc804091c913d7 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Wed, 6 Apr 2022 17:07:26 -0600 Subject: [PATCH 12/30] adds full iteration test --- source/logrepl/snapshot_test.go | 60 +++++++++++++++++++++------------ 1 file changed, 39 insertions(+), 21 deletions(-) diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index ab3fe8e8..63e02f8b 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -17,6 +17,7 @@ package logrepl import ( "context" "fmt" + "log" "testing" "time" @@ -27,29 +28,20 @@ import ( "github.com/matryer/is" ) -// createTestSnapshotIterator creates a new test table, starts a snapshot tx -// on it, and then creates a test SnapshotIterator with the ID of that snapshot. -// It returns that SnapshotIterator and the string name of the test table. -// This function handles its own pooled connection cleanup. 
-func createTestSnapshotIterator(t *testing.T) (*SnapshotIterator, string) { - is := is.New(t) +func TestFullIteration(t *testing.T) { ctx := context.Background() + is := is.New(t) + s, table := createTestSnapshotIterator(t) - pool := test.ConnectPool(ctx, t, test.RegularConnString) - table := createTestTable(t, pool) - name := createTestSnapshot(t, pool) - - conn, err := pool.Acquire(ctx) - is.NoErr(err) - s, err := NewSnapshotIterator(context.Background(), conn.Conn(), SnapshotConfig{ - SnapshotName: name, - Table: table, - Columns: []string{"id", "key", "column1", "column2", "column3"}, - KeyColumn: "key", - }) - is.NoErr(err) - t.Cleanup(func() { conn.Release() }) - return s, table + for i := 0; i < 4; i++ { + rec, err := s.Next(ctx) + log.Println(i) + is.Equal(rec.Position, sdk.Position(fmt.Sprintf("%s:%d", table, i))) + is.NoErr(err) + } + r, err := s.Next(ctx) + is.Equal(r, sdk.Record{}) + is.Equal(err.Error(), ErrSnapshotComplete.Error()) } func TestLifecycle(t *testing.T) { @@ -114,6 +106,32 @@ func createTestSnapshot(t *testing.T, pool *pgxpool.Pool) string { return *name } +// createTestSnapshotIterator creates a new test table, starts a snapshot tx +// on it, and then creates a test SnapshotIterator with the ID of that snapshot. +// It returns that SnapshotIterator and the string name of the test table. +// This function handles its own pooled connection cleanup. 
+func createTestSnapshotIterator(t *testing.T) (*SnapshotIterator, string) { + is := is.New(t) + ctx := context.Background() + + pool := test.ConnectPool(ctx, t, test.RegularConnString) + table := createTestTable(t, pool) + name := createTestSnapshot(t, pool) + + conn, err := pool.Acquire(ctx) + is.NoErr(err) + s, err := NewSnapshotIterator(context.Background(), conn.Conn(), SnapshotConfig{ + SnapshotName: name, + Table: table, + Columns: []string{"id", "key", "column1", "column2", "column3"}, + KeyColumn: "key", + }) + is.NoErr(err) + t.Cleanup(func() { conn.Release() }) + return s, table +} + +// createTestTable sets up a test table and cleans up after itself. func createTestTable(t *testing.T, pool *pgxpool.Pool) string { is := is.New(t) ctx := context.Background() From 8173c48eaaa947a4a623875dc3221f22c3705696 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Thu, 7 Apr 2022 09:34:51 -0600 Subject: [PATCH 13/30] WIP iteration tests --- source/logrepl/snapshot.go | 1 - source/logrepl/snapshot_test.go | 89 +++++++++++++++++++++++++++++---- test/helper.go | 4 +- 3 files changed, 83 insertions(+), 11 deletions(-) diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go index c9b304cd..e4e632f3 100644 --- a/source/logrepl/snapshot.go +++ b/source/logrepl/snapshot.go @@ -144,7 +144,6 @@ func (s *SnapshotIterator) Teardown(ctx context.Context) error { if rowsErr := s.rows.Err(); rowsErr != nil { sdk.Logger(ctx).Err(rowsErr).Msg("rows returned an error") } - if !s.complete { sdk.Logger(ctx).Warn().Msg("snapshot interrupted") return ErrSnapshotInterrupt diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index 63e02f8b..57e3842f 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -24,15 +24,84 @@ import ( "github.com/conduitio/conduit-connector-postgres/test" sdk "github.com/conduitio/conduit-connector-sdk" "github.com/jackc/pgx/v4/pgxpool" - "github.com/matryer/is" ) +var ( + columns = []string{"id", 
"key", "column1", "column2", "column3"} + key = "key" +) + +func TestSnapshotIterator_Teardown(t *testing.T) { + is := is.New(t) + type args struct { + ctx context.Context + } + tests := []struct { + name string + setup func(t *testing.T) *SnapshotIterator + args args + wantErr bool + wanted error + }{ + { + name: "should return interrupt when next never called", + setup: func(t *testing.T) *SnapshotIterator { + s, _ := createTestSnapshotIterator(t, columns, key) + return s + }, + args: args{ + ctx: context.Background(), + }, + wantErr: true, + wanted: ErrSnapshotInterrupt, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := tt.setup(t) + if err := s.Teardown(tt.args.ctx); (err != nil) != tt.wantErr { + if tt.wantErr { + is.Equal(tt.wanted, err) + } else { + t.Errorf("SnapshotIterator.Teardown() error = %v, wantErr %v", err, tt.wantErr) + } + } + }) + } +} + +func TestSnapshotAtomicity(t *testing.T) { + is := is.New(t) + ctx := context.Background() + pool := test.ConnectPool(ctx, t, test.RegularConnString) + + // start our snapshot iterator + s, table := createTestSnapshotIterator(t, columns, key) + t.Cleanup(func() { s.Teardown(ctx) }) + is.Equal(s.complete, false) + + // add a record to our table after snapshot started + insertQuery := fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3) + VALUES (6, 'bizz', 456, false)`, table) + _, err := pool.Exec(ctx, insertQuery) + is.NoErr(err) + + // assert record does not appear in snapshot + for i := 0; i < 4; i++ { + r, err := s.Next(ctx) + if err != nil { + is.Equal(err, ErrSnapshotComplete) + is.Equal(r, sdk.Record{}) + is.Equal(s.complete, true) + } + } +} + func TestFullIteration(t *testing.T) { ctx := context.Background() is := is.New(t) - s, table := createTestSnapshotIterator(t) - + s, table := createTestSnapshotIterator(t, []string{"id", "key"}, "key") for i := 0; i < 4; i++ { rec, err := s.Next(ctx) log.Println(i) @@ -44,11 +113,12 @@ func TestFullIteration(t *testing.T) { 
is.Equal(err.Error(), ErrSnapshotComplete.Error()) } -func TestLifecycle(t *testing.T) { +func TestLifecycleErrInterrupt(t *testing.T) { is := is.New(t) ctx := context.Background() - s, table := createTestSnapshotIterator(t) + s, table := createTestSnapshotIterator(t, + []string{"id", "key", "column1", "column2", "column3"}, "key") now := time.Now() rec, err := s.Next(ctx) @@ -109,8 +179,9 @@ func createTestSnapshot(t *testing.T, pool *pgxpool.Pool) string { // createTestSnapshotIterator creates a new test table, starts a snapshot tx // on it, and then creates a test SnapshotIterator with the ID of that snapshot. // It returns that SnapshotIterator and the string name of the test table. -// This function handles its own pooled connection cleanup. -func createTestSnapshotIterator(t *testing.T) (*SnapshotIterator, string) { +// This function handles its own pooled connection cleanup, but _not_, the +// SnapshotIterator's Teardown. +func createTestSnapshotIterator(t *testing.T, columns []string, key string) (*SnapshotIterator, string) { is := is.New(t) ctx := context.Background() @@ -123,8 +194,8 @@ func createTestSnapshotIterator(t *testing.T) (*SnapshotIterator, string) { s, err := NewSnapshotIterator(context.Background(), conn.Conn(), SnapshotConfig{ SnapshotName: name, Table: table, - Columns: []string{"id", "key", "column1", "column2", "column3"}, - KeyColumn: "key", + Columns: columns, + KeyColumn: key, }) is.NoErr(err) t.Cleanup(func() { conn.Release() }) diff --git a/test/helper.go b/test/helper.go index b29b0863..7c74b677 100644 --- a/test/helper.go +++ b/test/helper.go @@ -100,7 +100,9 @@ func SetupTestTable(ctx context.Context, t *testing.T, conn Querier) string { } func RandomIdentifier(t *testing.T) string { - return fmt.Sprintf("conduit_%v_%d", strings.ToLower(t.Name()), time.Now().UnixMicro()%1000) + return fmt.Sprintf("conduit_%v_%d", + strings.Replace(strings.ToLower(t.Name()), "/", "_", -1), + time.Now().UnixMicro()%1000) } func IsPgError(is *is.I, 
err error, wantCode string) { From b9e2a1da12403dbfcae10f3f567c8a34fd16cafa Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Thu, 7 Apr 2022 12:16:03 -0600 Subject: [PATCH 14/30] checks for context errors in Next --- source/logrepl/snapshot.go | 7 +++++-- source/logrepl/snapshot_test.go | 29 +++++++++++------------------ 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go index 8feaf3f2..ed476546 100644 --- a/source/logrepl/snapshot.go +++ b/source/logrepl/snapshot.go @@ -61,6 +61,7 @@ func NewSnapshotIterator(ctx context.Context, conn *pgx.Conn, cfg SnapshotConfig err = s.loadRows(ctx) if err != nil { + defer s.tx.Rollback(ctx) return nil, fmt.Errorf("failed to load rows: %w", err) } @@ -73,13 +74,11 @@ func (s *SnapshotIterator) loadRows(ctx context.Context) error { From(s.config.Table). ToSql() if err != nil { - s.rows.Close() return fmt.Errorf("failed to create read query: %w", err) } rows, err := s.tx.Query(ctx, query, args...) 
if err != nil { - s.rows.Close() return fmt.Errorf("failed to query rows: %w", err) } s.rows = rows @@ -111,6 +110,10 @@ func (s *SnapshotIterator) startSnapshotTx(ctx context.Context, conn *pgx.Conn) } func (s *SnapshotIterator) Next(ctx context.Context) (sdk.Record, error) { + if err := ctx.Err(); err != nil { + return sdk.Record{}, fmt.Errorf("context err: %w", err) + } + if !s.rows.Next() { if err := s.rows.Err(); err != nil { return sdk.Record{}, fmt.Errorf("rows error: %w", err) diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index b09d5a58..57874e99 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -32,7 +32,7 @@ func TestLifecycle(t *testing.T) { ctx := context.Background() pool := test.ConnectPool(ctx, t, test.RegularConnString) - table := createTestTable(t, pool) + table := test.SetupTestTable(ctx, t, pool) name := createTestSnapshot(t, pool) conn, err := pool.Acquire(ctx) @@ -73,22 +73,25 @@ func TestLifecycle(t *testing.T) { is.NoErr(s.Teardown(ctx)) // TODO: Should return an error } -// createTestSnapshot starts a transaction that stays open while a snapshot run. -// Otherwise, postgres deletes the snapshot as soon as this tx commits, -// an our snapshot iterator won't find a snapshot at the specifiedname. +// createTestSnapshot starts a transaction that stays open while a snapshot test +// runs. Otherwise, Postgres deletes the snapshot as soon as the transaction +// commits or rolls back, and our snapshot iterator won't find a snapshot with +// the specified name. 
// https://www.postgresql.org/docs/current/sql-set-transaction.html func createTestSnapshot(t *testing.T, pool *pgxpool.Pool) string { ctx := context.Background() is := is.New(t) + conn, err := pool.Acquire(ctx) is.NoErr(err) + tx, err := conn.Begin(ctx) is.NoErr(err) + query := `SELECT * FROM pg_catalog.pg_export_snapshot();` - rows, err := tx.Query(context.Background(), query) + rows, err := tx.Query(ctx, query) is.NoErr(err) - - var name *string + var name string is.True(rows.Next()) err = rows.Scan(&name) is.NoErr(err) @@ -99,15 +102,5 @@ func createTestSnapshot(t *testing.T, pool *pgxpool.Pool) string { conn.Release() }) - return *name -} - -func createTestTable(t *testing.T, pool *pgxpool.Pool) string { - is := is.New(t) - ctx := context.Background() - tblConn, err := pool.Acquire(ctx) - is.NoErr(err) - table := test.SetupTestTable(ctx, t, tblConn.Conn()) - t.Cleanup(func() { tblConn.Release() }) - return table + return name } From 659faf9ce4c9fc3d3f5b1cb044f42d417a490906 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Thu, 7 Apr 2022 13:31:38 -0600 Subject: [PATCH 15/30] adds log or return behavior to Teardown --- source/logrepl/snapshot.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go index af747dd7..a84c10cc 100644 --- a/source/logrepl/snapshot.go +++ b/source/logrepl/snapshot.go @@ -141,18 +141,20 @@ func (s *SnapshotIterator) Ack(ctx context.Context, pos sdk.Position) error { // Teardown attempts to gracefully teardown the iterator. 
func (s *SnapshotIterator) Teardown(ctx context.Context) error { s.rows.Close() + var err error if commitErr := s.tx.Commit(ctx); commitErr != nil { - sdk.Logger(ctx).Err(commitErr).Msg("teardown commit failed") + err = logOrReturnError(ctx, err, commitErr, "teardown commit failed") } if rowsErr := s.rows.Err(); rowsErr != nil { - sdk.Logger(ctx).Err(rowsErr).Msg("rows returned an error") + err = logOrReturnError(ctx, err, rowsErr, "rows returned an error") } if !s.complete { sdk.Logger(ctx).Warn().Msg("snapshot interrupted") return ErrSnapshotInterrupt } - return nil + + return err } func (s *SnapshotIterator) buildRecord(ctx context.Context) (sdk.Record, error) { @@ -230,3 +232,12 @@ func oidToScannerValue(oid pgtype.OID) scannerValue { } return t } + +// logOrReturn +func logOrReturnError(ctx context.Context, oldErr, newErr error, msg string) error { + if oldErr == nil { + return fmt.Errorf(msg+": %w", newErr) + } + sdk.Logger(ctx).Err(newErr).Msg(msg) + return oldErr +} From f60638a8f770341e49078f6e845e8098ad5324bf Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Thu, 7 Apr 2022 13:46:15 -0600 Subject: [PATCH 16/30] update comment --- source/logrepl/snapshot_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index 0ec0c8d0..a4d70241 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -179,10 +179,10 @@ func createTestSnapshot(t *testing.T, pool *pgxpool.Pool) string { return name } -// createTestSnapshotIterator creates a new test table, starts a snapshot tx -// on it, and then creates a test SnapshotIterator with the ID of that snapshot. -// It returns that SnapshotIterator and the string name of the test table. 
-// This function handles its own pooled connection cleanup, but _not_, the +// createTestSnapshotIterator creates a new test table, starts a snapshot +// on it, then creates a test SnapshotIterator with the ID of that snapshot. +// * It returns that SnapshotIterator and the string name of the test table. +// * This function handles its own pooled connection cleanup, but _not_ the // SnapshotIterator's Teardown. func createTestSnapshotIterator(t *testing.T, columns []string, key string) (*SnapshotIterator, string) { is := is.New(t) From 19ca9e0168660f7ef3618659689f59c9136b237c Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Thu, 7 Apr 2022 13:59:03 -0600 Subject: [PATCH 17/30] handle rollback error in loadRows --- source/logrepl/snapshot.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go index ed476546..2676870e 100644 --- a/source/logrepl/snapshot.go +++ b/source/logrepl/snapshot.go @@ -61,7 +61,10 @@ func NewSnapshotIterator(ctx context.Context, conn *pgx.Conn, cfg SnapshotConfig err = s.loadRows(ctx) if err != nil { - defer s.tx.Rollback(ctx) + if rollErr := s.tx.Rollback(ctx); err != nil { + sdk.Logger(ctx).Err(err).Msg("load rows failed") + return nil, fmt.Errorf("rollback failed: %w", rollErr) + } return nil, fmt.Errorf("failed to load rows: %w", err) } From 4c9630ad22bff207d60b7fb52a27367b7973843f Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Thu, 7 Apr 2022 14:15:02 -0600 Subject: [PATCH 18/30] use query row instead of query in test setup --- source/logrepl/snapshot_test.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index 57874e99..7625be66 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -44,7 +44,7 @@ func TestLifecycle(t *testing.T) { KeyColumn: "key", }) is.NoErr(err) - t.Cleanup(func() { conn.Release() }) + t.Cleanup(conn.Release) now := 
time.Now() rec, err := s.Next(ctx) @@ -88,16 +88,14 @@ func createTestSnapshot(t *testing.T, pool *pgxpool.Pool) string { tx, err := conn.Begin(ctx) is.NoErr(err) + var name string query := `SELECT * FROM pg_catalog.pg_export_snapshot();` - rows, err := tx.Query(ctx, query) + row := tx.QueryRow(ctx, query) is.NoErr(err) - var name string - is.True(rows.Next()) - err = rows.Scan(&name) + err = row.Scan(&name) is.NoErr(err) t.Cleanup(func() { - rows.Close() is.NoErr(tx.Commit(ctx)) conn.Release() }) From b46f3436d59d09a0f68cc2b45b8d10afd91d8ad0 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Wed, 6 Apr 2022 16:35:08 -0600 Subject: [PATCH 19/30] returns snapshot incomplete error when snapshot is interrupted --- source/logrepl/snapshot.go | 22 +++++++++++++++++++--- source/logrepl/snapshot_test.go | 2 +- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go index 2676870e..2f0c0181 100644 --- a/source/logrepl/snapshot.go +++ b/source/logrepl/snapshot.go @@ -17,6 +17,7 @@ package logrepl import ( "context" "database/sql" + "errors" "fmt" "strconv" "time" @@ -29,6 +30,12 @@ import ( "github.com/jackc/pgx/v4" ) +// ErrSnapshotComplete is returned by Next when a snapshot is finished +var ErrSnapshotComplete = errors.New("ErrSnapshotComplete") + +// ErrSnapshotInterrupted is returned by Teardown when a snapshot is interrupted +var ErrSnapshotInterrupt = errors.New("ErrSnapshotInterrupt") + var psql = sq.StatementBuilder.PlaceholderFormat(sq.Dollar) const actionSnapshot string = "snapshot" @@ -46,6 +53,7 @@ type SnapshotIterator struct { tx pgx.Tx rows pgx.Rows + complete bool internalPos int64 } @@ -121,8 +129,8 @@ func (s *SnapshotIterator) Next(ctx context.Context) (sdk.Record, error) { if err := s.rows.Err(); err != nil { return sdk.Record{}, fmt.Errorf("rows error: %w", err) } - - return sdk.Record{}, sdk.ErrBackoffRetry + s.complete = true + return sdk.Record{}, ErrSnapshotComplete } return 
s.buildRecord(ctx) @@ -139,7 +147,15 @@ func (s *SnapshotIterator) Teardown(ctx context.Context) error { if commitErr := s.tx.Commit(ctx); commitErr != nil { sdk.Logger(ctx).Err(commitErr).Msg("teardown commit failed") } - return s.rows.Err() + if rowsErr := s.rows.Err(); rowsErr != nil { + sdk.Logger(ctx).Err(rowsErr).Msg("rows returned an error") + } + + if !s.complete { + sdk.Logger(ctx).Warn().Msg("snapshot interrupted") + return ErrSnapshotInterrupt + } + return nil } func (s *SnapshotIterator) buildRecord(ctx context.Context) (sdk.Record, error) { diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index 7625be66..0aa45d77 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -70,7 +70,7 @@ func TestLifecycle(t *testing.T) { "table": table, }, }) - is.NoErr(s.Teardown(ctx)) // TODO: Should return an error + is.Equal(ErrSnapshotInterrupt.Error(), s.Teardown(ctx).Error()) } // createTestSnapshot starts a transaction that stays open while a snapshot test From 17902c3fb52f28b3d2bb4acdfce10967959b02e0 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Wed, 6 Apr 2022 16:43:46 -0600 Subject: [PATCH 20/30] linter fix --- source/logrepl/snapshot.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go index 2f0c0181..c14f088c 100644 --- a/source/logrepl/snapshot.go +++ b/source/logrepl/snapshot.go @@ -33,7 +33,7 @@ import ( // ErrSnapshotComplete is returned by Next when a snapshot is finished var ErrSnapshotComplete = errors.New("ErrSnapshotComplete") -// ErrSnapshotInterrupted is returned by Teardown when a snapshot is interrupted +// ErrSnapshotInterrupt is returned by Teardown when a snapshot is interrupted var ErrSnapshotInterrupt = errors.New("ErrSnapshotInterrupt") var psql = sq.StatementBuilder.PlaceholderFormat(sq.Dollar) From b544e421355317df55b00bcdb2febf659ddda9bb Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Thu, 7 Apr 2022 
13:31:38 -0600 Subject: [PATCH 21/30] adds log or return behavior to Teardown --- source/logrepl/snapshot.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go index c14f088c..078c593f 100644 --- a/source/logrepl/snapshot.go +++ b/source/logrepl/snapshot.go @@ -144,18 +144,20 @@ func (s *SnapshotIterator) Ack(ctx context.Context, pos sdk.Position) error { // Teardown attempts to gracefully teardown the iterator. func (s *SnapshotIterator) Teardown(ctx context.Context) error { s.rows.Close() + var err error if commitErr := s.tx.Commit(ctx); commitErr != nil { - sdk.Logger(ctx).Err(commitErr).Msg("teardown commit failed") + err = logOrReturnError(ctx, err, commitErr, "teardown commit failed") } if rowsErr := s.rows.Err(); rowsErr != nil { - sdk.Logger(ctx).Err(rowsErr).Msg("rows returned an error") + err = logOrReturnError(ctx, err, rowsErr, "rows returned an error") } if !s.complete { sdk.Logger(ctx).Warn().Msg("snapshot interrupted") return ErrSnapshotInterrupt } - return nil + + return err } func (s *SnapshotIterator) buildRecord(ctx context.Context) (sdk.Record, error) { @@ -233,3 +235,12 @@ func oidToScannerValue(oid pgtype.OID) scannerValue { } return t } + +// logOrReturn +func logOrReturnError(ctx context.Context, oldErr, newErr error, msg string) error { + if oldErr == nil { + return fmt.Errorf(msg+": %w", newErr) + } + sdk.Logger(ctx).Err(newErr).Msg(msg) + return oldErr +} From 8cfa075241b802826226b669e1c37c19c414c340 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Thu, 7 Apr 2022 14:35:27 -0600 Subject: [PATCH 22/30] linter fixes --- source/logrepl/snapshot_test.go | 6 +++--- test/helper.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index 2fa285b9..da088df4 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -78,17 +78,17 @@ func 
TestSnapshotAtomicity(t *testing.T) { // start our snapshot iterator s, table := createTestSnapshotIterator(t, columns, key) - t.Cleanup(func() { s.Teardown(ctx) }) + t.Cleanup(func() { is.NoErr(s.Teardown(ctx)) }) is.Equal(s.complete, false) // add a record to our table after snapshot started insertQuery := fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3) - VALUES (6, 'bizz', 456, false)`, table) + VALUES (5, 'bizz', 456, false)`, table) _, err := pool.Exec(ctx, insertQuery) is.NoErr(err) // assert record does not appear in snapshot - for i := 0; i < 4; i++ { + for i := 0; i < 5; i++ { r, err := s.Next(ctx) if err != nil { is.Equal(err, ErrSnapshotComplete) diff --git a/test/helper.go b/test/helper.go index 7c74b677..7bb27928 100644 --- a/test/helper.go +++ b/test/helper.go @@ -101,7 +101,7 @@ func SetupTestTable(ctx context.Context, t *testing.T, conn Querier) string { func RandomIdentifier(t *testing.T) string { return fmt.Sprintf("conduit_%v_%d", - strings.Replace(strings.ToLower(t.Name()), "/", "_", -1), + strings.ReplaceAll(strings.ToLower(t.Name()), "/", "_"), time.Now().UnixMicro()%1000) } From 72870065c1aede51c95eba110493a600f0a2b07b Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Thu, 7 Apr 2022 14:58:05 -0600 Subject: [PATCH 23/30] remove unnecessary log --- source/logrepl/snapshot_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index da088df4..f1405b6f 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -17,7 +17,6 @@ package logrepl import ( "context" "fmt" - "log" "testing" "time" @@ -104,7 +103,6 @@ func TestFullIteration(t *testing.T) { s, table := createTestSnapshotIterator(t, []string{"id", "key"}, "key") for i := 0; i < 4; i++ { rec, err := s.Next(ctx) - log.Println(i) is.Equal(rec.Position, sdk.Position(fmt.Sprintf("%s:%d", table, i))) is.NoErr(err) } From 22e5c1a9637eda3d89b50d7d49b1cf82a6aabbc9 Mon Sep 17 00:00:00 2001 
From: dylan_lott Date: Fri, 8 Apr 2022 11:12:59 -0600 Subject: [PATCH 24/30] use logOrReturn in Teardown --- source/logrepl/snapshot.go | 7 +++---- source/logrepl/snapshot_test.go | 3 ++- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/source/logrepl/snapshot.go b/source/logrepl/snapshot.go index 6005b34b..d6e3cd8d 100644 --- a/source/logrepl/snapshot.go +++ b/source/logrepl/snapshot.go @@ -31,10 +31,10 @@ import ( ) // ErrSnapshotComplete is returned by Next when a snapshot is finished -var ErrSnapshotComplete = errors.New("ErrSnapshotComplete") +var ErrSnapshotComplete = errors.New("snapshot complete") // ErrSnapshotInterrupt is returned by Teardown when a snapshot is interrupted -var ErrSnapshotInterrupt = errors.New("ErrSnapshotInterrupt") +var ErrSnapshotInterrupt = errors.New("snapshot interrupted") var psql = sq.StatementBuilder.PlaceholderFormat(sq.Dollar) @@ -152,8 +152,7 @@ func (s *SnapshotIterator) Teardown(ctx context.Context) error { } if !s.complete { - sdk.Logger(ctx).Warn().Msg("snapshot interrupted") - return ErrSnapshotInterrupt + err = logOrReturnError(ctx, err, ErrSnapshotInterrupt, "snapshot interrupted") } return err diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index 0aa45d77..a47951fe 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -16,6 +16,7 @@ package logrepl import ( "context" + "errors" "fmt" "testing" "time" @@ -70,7 +71,7 @@ func TestLifecycle(t *testing.T) { "table": table, }, }) - is.Equal(ErrSnapshotInterrupt.Error(), s.Teardown(ctx).Error()) + is.True(errors.Is(s.Teardown(ctx), ErrSnapshotInterrupt)) } // createTestSnapshot starts a transaction that stays open while a snapshot test From 875780a4c01139e53f29acd1a5f2ae59e952aa23 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Mon, 11 Apr 2022 15:22:52 -0600 Subject: [PATCH 25/30] WIP refactoring createTestSnapshotIterator --- source/logrepl/snapshot_test.go | 31 +++++++++++++++++++------------ 1 
file changed, 19 insertions(+), 12 deletions(-) diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index a5425f7d..35301c45 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -34,6 +34,8 @@ var ( func TestSnapshotIterator_Teardown(t *testing.T) { is := is.New(t) + pool := test.ConnectPool(context.Background(), t, test.RegularConnString) + type args struct { ctx context.Context } @@ -47,7 +49,9 @@ func TestSnapshotIterator_Teardown(t *testing.T) { { name: "should return interrupt when next never called", setup: func(t *testing.T) *SnapshotIterator { - s, _ := createTestSnapshotIterator(t, columns, key) + ctx := context.Background() + table := test.SetupTestTable(ctx, t, pool) + s := createTestSnapshotIterator(t, pool, table, columns, key) return s }, args: args{ @@ -75,9 +79,10 @@ func TestSnapshotAtomicity(t *testing.T) { is := is.New(t) ctx := context.Background() pool := test.ConnectPool(ctx, t, test.RegularConnString) + table := test.SetupTestTable(ctx, t, pool) // start our snapshot iterator - s, table := createTestSnapshotIterator(t, columns, key) + s := createTestSnapshotIterator(t, pool, table, columns, key) t.Cleanup(func() { is.NoErr(s.Teardown(ctx)) }) is.Equal(s.complete, false) @@ -101,12 +106,16 @@ func TestSnapshotAtomicity(t *testing.T) { func TestFullIteration(t *testing.T) { ctx := context.Background() is := is.New(t) - s, table := createTestSnapshotIterator(t, []string{"id", "key"}, "key") + pool := test.ConnectPool(ctx, t, test.RegularConnString) + table := test.SetupTestTable(ctx, t, pool) + + s := createTestSnapshotIterator(t, pool, table, []string{"id", "key"}, "key") for i := 0; i < 4; i++ { rec, err := s.Next(ctx) is.Equal(rec.Position, sdk.Position(fmt.Sprintf("%s:%d", table, i))) is.NoErr(err) } + r, err := s.Next(ctx) is.Equal(r, sdk.Record{}) is.Equal(err.Error(), ErrSnapshotComplete.Error()) @@ -115,8 +124,10 @@ func TestFullIteration(t *testing.T) { func 
TestLifecycleErrInterrupt(t *testing.T) { is := is.New(t) ctx := context.Background() + pool := test.ConnectPool(ctx, t, test.RepmgrConnString) + table := test.SetupTestTable(ctx, t, pool) - s, table := createTestSnapshotIterator(t, + s := createTestSnapshotIterator(t, pool, table, []string{"id", "key", "column1", "column2", "column3"}, "key") now := time.Now() @@ -151,10 +162,8 @@ func TestLifecycleErrInterrupt(t *testing.T) { // commits or rolls back, and our snapshot iterator won't find a snapshot with // the specified name. // https://www.postgresql.org/docs/current/sql-set-transaction.html -func createTestSnapshot(t *testing.T, pool *pgxpool.Pool) string { - ctx := context.Background() +func createTestSnapshot(ctx context.Context, t *testing.T, pool *pgxpool.Pool) string { is := is.New(t) - conn, err := pool.Acquire(ctx) is.NoErr(err) @@ -181,13 +190,11 @@ func createTestSnapshot(t *testing.T, pool *pgxpool.Pool) string { // * It returns that SnapshotIterator and the string name of the test table. // * This function handles its own pooled connection cleanup, but _not_ the // SnapshotIterator's Teardown. 
-func createTestSnapshotIterator(t *testing.T, columns []string, key string) (*SnapshotIterator, string) { +func createTestSnapshotIterator(t *testing.T, pool *pgxpool.Pool, table string, columns []string, key string) *SnapshotIterator { is := is.New(t) ctx := context.Background() - pool := test.ConnectPool(ctx, t, test.RegularConnString) - table := test.SetupTestTable(ctx, t, pool) - name := createTestSnapshot(t, pool) + name := createTestSnapshot(ctx, t, pool) conn, err := pool.Acquire(ctx) is.NoErr(err) @@ -199,5 +206,5 @@ func createTestSnapshotIterator(t *testing.T, columns []string, key string) (*Sn }) is.NoErr(err) t.Cleanup(func() { conn.Release() }) - return s, table + return s } From 53f6d9068d338e879f4d62fe59467d391c22563e Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Mon, 11 Apr 2022 15:26:40 -0600 Subject: [PATCH 26/30] WIP refactoring createTestSnapshot --- source/logrepl/snapshot_test.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index 35301c45..b99b8662 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -51,7 +51,8 @@ func TestSnapshotIterator_Teardown(t *testing.T) { setup: func(t *testing.T) *SnapshotIterator { ctx := context.Background() table := test.SetupTestTable(ctx, t, pool) - s := createTestSnapshotIterator(t, pool, table, columns, key) + name := createTestSnapshot(ctx, t, pool) + s := createTestSnapshotIterator(t, pool, name, table, columns, key) return s }, args: args{ @@ -80,9 +81,10 @@ func TestSnapshotAtomicity(t *testing.T) { ctx := context.Background() pool := test.ConnectPool(ctx, t, test.RegularConnString) table := test.SetupTestTable(ctx, t, pool) + name := createTestSnapshot(ctx, t, pool) // start our snapshot iterator - s := createTestSnapshotIterator(t, pool, table, columns, key) + s := createTestSnapshotIterator(t, pool, name, table, columns, key) t.Cleanup(func() { 
is.NoErr(s.Teardown(ctx)) }) is.Equal(s.complete, false) @@ -108,8 +110,9 @@ func TestFullIteration(t *testing.T) { is := is.New(t) pool := test.ConnectPool(ctx, t, test.RegularConnString) table := test.SetupTestTable(ctx, t, pool) + name := createTestSnapshot(ctx, t, pool) - s := createTestSnapshotIterator(t, pool, table, []string{"id", "key"}, "key") + s := createTestSnapshotIterator(t, pool, name, table, []string{"id", "key"}, "key") for i := 0; i < 4; i++ { rec, err := s.Next(ctx) is.Equal(rec.Position, sdk.Position(fmt.Sprintf("%s:%d", table, i))) @@ -126,8 +129,9 @@ func TestLifecycleErrInterrupt(t *testing.T) { ctx := context.Background() pool := test.ConnectPool(ctx, t, test.RepmgrConnString) table := test.SetupTestTable(ctx, t, pool) + name := createTestSnapshot(ctx, t, pool) - s := createTestSnapshotIterator(t, pool, table, + s := createTestSnapshotIterator(t, pool, name, table, []string{"id", "key", "column1", "column2", "column3"}, "key") now := time.Now() @@ -190,16 +194,14 @@ func createTestSnapshot(ctx context.Context, t *testing.T, pool *pgxpool.Pool) s // * It returns that SnapshotIterator and the string name of the test table. // * This function handles its own pooled connection cleanup, but _not_ the // SnapshotIterator's Teardown. 
-func createTestSnapshotIterator(t *testing.T, pool *pgxpool.Pool, table string, columns []string, key string) *SnapshotIterator { +func createTestSnapshotIterator(t *testing.T, pool *pgxpool.Pool, snapshotName string, table string, columns []string, key string) *SnapshotIterator { is := is.New(t) ctx := context.Background() - name := createTestSnapshot(ctx, t, pool) - conn, err := pool.Acquire(ctx) is.NoErr(err) s, err := NewSnapshotIterator(context.Background(), conn.Conn(), SnapshotConfig{ - SnapshotName: name, + SnapshotName: snapshotName, Table: table, Columns: columns, KeyColumn: key, From ff935b58e24801c9dfb68d69d8db3d8d8972b65e Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Mon, 11 Apr 2022 15:34:21 -0600 Subject: [PATCH 27/30] refactor context, pool, table, and snaphost creation in logrepl/snapshot tests --- source/logrepl/snapshot_test.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index b99b8662..84d9b846 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -52,7 +52,7 @@ func TestSnapshotIterator_Teardown(t *testing.T) { ctx := context.Background() table := test.SetupTestTable(ctx, t, pool) name := createTestSnapshot(ctx, t, pool) - s := createTestSnapshotIterator(t, pool, name, table, columns, key) + s := createTestSnapshotIterator(t, ctx, pool, name, table, columns, key) return s }, args: args{ @@ -84,9 +84,8 @@ func TestSnapshotAtomicity(t *testing.T) { name := createTestSnapshot(ctx, t, pool) // start our snapshot iterator - s := createTestSnapshotIterator(t, pool, name, table, columns, key) + s := createTestSnapshotIterator(t, ctx, pool, name, table, columns, key) t.Cleanup(func() { is.NoErr(s.Teardown(ctx)) }) - is.Equal(s.complete, false) // add a record to our table after snapshot started insertQuery := fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3) @@ -100,7 +99,6 @@ func 
TestSnapshotAtomicity(t *testing.T) { if err != nil { is.Equal(err, ErrSnapshotComplete) is.Equal(r, sdk.Record{}) - is.Equal(s.complete, true) } } } @@ -112,7 +110,10 @@ func TestFullIteration(t *testing.T) { table := test.SetupTestTable(ctx, t, pool) name := createTestSnapshot(ctx, t, pool) - s := createTestSnapshotIterator(t, pool, name, table, []string{"id", "key"}, "key") + s := createTestSnapshotIterator(t, ctx, pool, name, table, + []string{"id", "key"}, "key") + t.Cleanup(func() { is.NoErr(s.Teardown(ctx)) }) + for i := 0; i < 4; i++ { rec, err := s.Next(ctx) is.Equal(rec.Position, sdk.Position(fmt.Sprintf("%s:%d", table, i))) @@ -131,7 +132,7 @@ func TestLifecycleErrInterrupt(t *testing.T) { table := test.SetupTestTable(ctx, t, pool) name := createTestSnapshot(ctx, t, pool) - s := createTestSnapshotIterator(t, pool, name, table, + s := createTestSnapshotIterator(t, ctx, pool, name, table, []string{"id", "key", "column1", "column2", "column3"}, "key") now := time.Now() @@ -194,10 +195,10 @@ func createTestSnapshot(ctx context.Context, t *testing.T, pool *pgxpool.Pool) s // * It returns that SnapshotIterator and the string name of the test table. // * This function handles its own pooled connection cleanup, but _not_ the // SnapshotIterator's Teardown. 
-func createTestSnapshotIterator(t *testing.T, pool *pgxpool.Pool, snapshotName string, table string, columns []string, key string) *SnapshotIterator { +func createTestSnapshotIterator(t *testing.T, ctx context.Context, + pool *pgxpool.Pool, snapshotName string, table string, columns []string, + key string) *SnapshotIterator { is := is.New(t) - ctx := context.Background() - conn, err := pool.Acquire(ctx) is.NoErr(err) s, err := NewSnapshotIterator(context.Background(), conn.Conn(), SnapshotConfig{ From 14c2c8b202efc34a6017b75c30cb9fcff293bd94 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Mon, 11 Apr 2022 15:37:21 -0600 Subject: [PATCH 28/30] linter fix --- source/logrepl/snapshot_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index 84d9b846..15a13370 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -52,7 +52,7 @@ func TestSnapshotIterator_Teardown(t *testing.T) { ctx := context.Background() table := test.SetupTestTable(ctx, t, pool) name := createTestSnapshot(ctx, t, pool) - s := createTestSnapshotIterator(t, ctx, pool, name, table, columns, key) + s := createTestSnapshotIterator(ctx, t, pool, name, table, columns, key) return s }, args: args{ @@ -84,7 +84,7 @@ func TestSnapshotAtomicity(t *testing.T) { name := createTestSnapshot(ctx, t, pool) // start our snapshot iterator - s := createTestSnapshotIterator(t, ctx, pool, name, table, columns, key) + s := createTestSnapshotIterator(ctx, t, pool, name, table, columns, key) t.Cleanup(func() { is.NoErr(s.Teardown(ctx)) }) // add a record to our table after snapshot started @@ -110,9 +110,8 @@ func TestFullIteration(t *testing.T) { table := test.SetupTestTable(ctx, t, pool) name := createTestSnapshot(ctx, t, pool) - s := createTestSnapshotIterator(t, ctx, pool, name, table, + s := createTestSnapshotIterator(ctx, t, pool, name, table, []string{"id", "key"}, "key") - 
t.Cleanup(func() { is.NoErr(s.Teardown(ctx)) }) for i := 0; i < 4; i++ { rec, err := s.Next(ctx) @@ -123,6 +122,7 @@ func TestFullIteration(t *testing.T) { r, err := s.Next(ctx) is.Equal(r, sdk.Record{}) is.Equal(err.Error(), ErrSnapshotComplete.Error()) + is.NoErr(s.Teardown(ctx)) } func TestLifecycleErrInterrupt(t *testing.T) { @@ -132,7 +132,7 @@ func TestLifecycleErrInterrupt(t *testing.T) { table := test.SetupTestTable(ctx, t, pool) name := createTestSnapshot(ctx, t, pool) - s := createTestSnapshotIterator(t, ctx, pool, name, table, + s := createTestSnapshotIterator(ctx, t, pool, name, table, []string{"id", "key", "column1", "column2", "column3"}, "key") now := time.Now() @@ -195,7 +195,7 @@ func createTestSnapshot(ctx context.Context, t *testing.T, pool *pgxpool.Pool) s // * It returns that SnapshotIterator and the string name of the test table. // * This function handles its own pooled connection cleanup, but _not_ the // SnapshotIterator's Teardown. -func createTestSnapshotIterator(t *testing.T, ctx context.Context, +func createTestSnapshotIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, snapshotName string, table string, columns []string, key string) *SnapshotIterator { is := is.New(t) From f084548b99cc0b9ae89f46c4a8f3644a08ccec70 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Mon, 11 Apr 2022 16:29:06 -0600 Subject: [PATCH 29/30] refactor snapshot tests --- source/logrepl/snapshot_test.go | 188 ++++++++++++++------------------ 1 file changed, 80 insertions(+), 108 deletions(-) diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index 15a13370..850d9efa 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -32,134 +32,106 @@ var ( key = "key" ) -func TestSnapshotIterator_Teardown(t *testing.T) { +func TestTeardown(t *testing.T) { is := is.New(t) pool := test.ConnectPool(context.Background(), t, test.RegularConnString) - type args struct { - ctx context.Context - } - tests := 
[]struct { - name string - setup func(t *testing.T) *SnapshotIterator - args args - wantErr bool - wanted error - }{ - { - name: "should return interrupt when next never called", - setup: func(t *testing.T) *SnapshotIterator { - ctx := context.Background() - table := test.SetupTestTable(ctx, t, pool) - name := createTestSnapshot(ctx, t, pool) - s := createTestSnapshotIterator(ctx, t, pool, name, table, columns, key) - return s - }, - args: args{ - ctx: context.Background(), - }, - wantErr: true, - wanted: ErrSnapshotInterrupt, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := tt.setup(t) - if err := s.Teardown(tt.args.ctx); (err != nil) != tt.wantErr { - if tt.wantErr { - is.Equal(tt.wanted, err) - } else { - t.Errorf("SnapshotIterator.Teardown() error = %v, wantErr %v", err, tt.wantErr) - } - } - }) - } + t.Run("should return interrupt error if next never called", func(t *testing.T) { + ctx := context.Background() + table := test.SetupTestTable(ctx, t, pool) + name := createTestSnapshot(ctx, t, pool) + s := createTestSnapshotIterator(ctx, t, pool, name, table, columns, key) + + err := s.Teardown(ctx) + + is.True(errors.Is(err, ErrSnapshotInterrupt)) + }) } -func TestSnapshotAtomicity(t *testing.T) { +func TestSnapshot(t *testing.T) { is := is.New(t) - ctx := context.Background() - pool := test.ConnectPool(ctx, t, test.RegularConnString) - table := test.SetupTestTable(ctx, t, pool) - name := createTestSnapshot(ctx, t, pool) + pool := test.ConnectPool(context.Background(), t, test.RegularConnString) - // start our snapshot iterator - s := createTestSnapshotIterator(ctx, t, pool, name, table, columns, key) - t.Cleanup(func() { is.NoErr(s.Teardown(ctx)) }) + t.Run("should take atomic snapshot", func(t *testing.T) { + ctx := context.Background() + table := test.SetupTestTable(ctx, t, pool) + name := createTestSnapshot(ctx, t, pool) - // add a record to our table after snapshot started - insertQuery := fmt.Sprintf(`INSERT INTO %s (id, 
column1, column2, column3) + // start our snapshot iterator + s := createTestSnapshotIterator(ctx, t, pool, name, table, columns, key) + t.Cleanup(func() { is.NoErr(s.Teardown(ctx)) }) + + // add a record to our table after snapshot started + insertQuery := fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3) VALUES (5, 'bizz', 456, false)`, table) - _, err := pool.Exec(ctx, insertQuery) - is.NoErr(err) + _, err := pool.Exec(ctx, insertQuery) + is.NoErr(err) - // assert record does not appear in snapshot - for i := 0; i < 5; i++ { - r, err := s.Next(ctx) - if err != nil { - is.Equal(err, ErrSnapshotComplete) - is.Equal(r, sdk.Record{}) + // assert record does not appear in snapshot + for i := 0; i < 5; i++ { + r, err := s.Next(ctx) + if err != nil { + is.Equal(err, ErrSnapshotComplete) + is.Equal(r, sdk.Record{}) + } } - } -} + }) -func TestFullIteration(t *testing.T) { - ctx := context.Background() - is := is.New(t) - pool := test.ConnectPool(ctx, t, test.RegularConnString) - table := test.SetupTestTable(ctx, t, pool) - name := createTestSnapshot(ctx, t, pool) + t.Run("should iterate full snapshot", func(t *testing.T) { + ctx := context.Background() + pool := test.ConnectPool(ctx, t, test.RegularConnString) + table := test.SetupTestTable(ctx, t, pool) + name := createTestSnapshot(ctx, t, pool) - s := createTestSnapshotIterator(ctx, t, pool, name, table, - []string{"id", "key"}, "key") + s := createTestSnapshotIterator(ctx, t, pool, name, table, + []string{"id", "key"}, "key") - for i := 0; i < 4; i++ { - rec, err := s.Next(ctx) - is.Equal(rec.Position, sdk.Position(fmt.Sprintf("%s:%d", table, i))) - is.NoErr(err) - } + for i := 0; i < 4; i++ { + rec, err := s.Next(ctx) + is.Equal(rec.Position, sdk.Position(fmt.Sprintf("%s:%d", table, i))) + is.NoErr(err) + } - r, err := s.Next(ctx) - is.Equal(r, sdk.Record{}) - is.Equal(err.Error(), ErrSnapshotComplete.Error()) - is.NoErr(s.Teardown(ctx)) -} + r, err := s.Next(ctx) + is.Equal(r, sdk.Record{}) + 
is.Equal(err.Error(), ErrSnapshotComplete.Error()) + is.NoErr(s.Teardown(ctx)) + }) -func TestLifecycleErrInterrupt(t *testing.T) { - is := is.New(t) - ctx := context.Background() - pool := test.ConnectPool(ctx, t, test.RepmgrConnString) - table := test.SetupTestTable(ctx, t, pool) - name := createTestSnapshot(ctx, t, pool) + t.Run("should return interrupt error", func(t *testing.T) { + ctx := context.Background() + table := test.SetupTestTable(ctx, t, pool) + name := createTestSnapshot(ctx, t, pool) - s := createTestSnapshotIterator(ctx, t, pool, name, table, - []string{"id", "key", "column1", "column2", "column3"}, "key") + s := createTestSnapshotIterator(ctx, t, pool, name, table, + []string{"id", "key", "column1", "column2", "column3"}, "key") - now := time.Now() - rec, err := s.Next(ctx) - is.NoErr(err) + now := time.Now() + rec, err := s.Next(ctx) + is.NoErr(err) - is.True(rec.CreatedAt.After(now)) - is.Equal(rec.Metadata["action"], "snapshot") - rec.CreatedAt = time.Time{} // reset time for comparison - - is.Equal(rec, sdk.Record{ - Position: sdk.Position(fmt.Sprintf("%s:0", table)), - Key: sdk.StructuredData{ - "key": []uint8("1"), - }, - Payload: sdk.StructuredData{ - "id": int64(1), - "column1": "foo", - "column2": int32(123), - "column3": bool(false), - }, - Metadata: map[string]string{ - "action": actionSnapshot, - "table": table, - }, + is.True(rec.CreatedAt.After(now)) + is.Equal(rec.Metadata["action"], "snapshot") + rec.CreatedAt = time.Time{} // reset time for comparison + + is.Equal(rec, sdk.Record{ + Position: sdk.Position(fmt.Sprintf("%s:0", table)), + Key: sdk.StructuredData{ + "key": []uint8("1"), + }, + Payload: sdk.StructuredData{ + "id": int64(1), + "column1": "foo", + "column2": int32(123), + "column3": bool(false), + }, + Metadata: map[string]string{ + "action": actionSnapshot, + "table": table, + }, + }) + is.True(errors.Is(s.Teardown(ctx), ErrSnapshotInterrupt)) }) - is.True(errors.Is(s.Teardown(ctx), ErrSnapshotInterrupt)) } // 
createTestSnapshot starts a transaction that stays open while a snapshot test From 18125a461a190f4ccb5c19ab20448c1615a67c50 Mon Sep 17 00:00:00 2001 From: dylan_lott Date: Tue, 12 Apr 2022 13:41:20 -0600 Subject: [PATCH 30/30] updates tests and comments --- source/logrepl/snapshot_test.go | 178 +++++++++++++++----------------- 1 file changed, 83 insertions(+), 95 deletions(-) diff --git a/source/logrepl/snapshot_test.go b/source/logrepl/snapshot_test.go index 850d9efa..a817a239 100644 --- a/source/logrepl/snapshot_test.go +++ b/source/logrepl/snapshot_test.go @@ -32,106 +32,98 @@ var ( key = "key" ) -func TestTeardown(t *testing.T) { +func TestAtomicSnapshot(t *testing.T) { is := is.New(t) pool := test.ConnectPool(context.Background(), t, test.RegularConnString) + ctx := context.Background() + table := test.SetupTestTable(ctx, t, pool) + name := createTestSnapshot(ctx, t, pool) + s := createTestSnapshotIterator(ctx, t, pool, SnapshotConfig{ + SnapshotName: name, + Table: table, + Columns: columns, + KeyColumn: key, + }) + t.Cleanup(func() { is.NoErr(s.Teardown(ctx)) }) - t.Run("should return interrupt error if next never called", func(t *testing.T) { - ctx := context.Background() - table := test.SetupTestTable(ctx, t, pool) - name := createTestSnapshot(ctx, t, pool) - s := createTestSnapshotIterator(ctx, t, pool, name, table, columns, key) - - err := s.Teardown(ctx) + // add a record to our table after snapshot started + insertQuery := fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3) + VALUES (5, 'bizz', 456, false)`, table) + _, err := pool.Exec(ctx, insertQuery) + is.NoErr(err) - is.True(errors.Is(err, ErrSnapshotInterrupt)) - }) + // assert record does not appear in snapshot + for i := 0; i < 5; i++ { + r, err := s.Next(ctx) + if err != nil { + is.True(errors.Is(err, ErrSnapshotComplete)) + is.Equal(r, sdk.Record{}) + } + } } -func TestSnapshot(t *testing.T) { +func TestSnapshotInterrupted(t *testing.T) { is := is.New(t) pool := 
test.ConnectPool(context.Background(), t, test.RegularConnString) - - t.Run("should take atomic snapshot", func(t *testing.T) { - ctx := context.Background() - table := test.SetupTestTable(ctx, t, pool) - name := createTestSnapshot(ctx, t, pool) - - // start our snapshot iterator - s := createTestSnapshotIterator(ctx, t, pool, name, table, columns, key) - t.Cleanup(func() { is.NoErr(s.Teardown(ctx)) }) - - // add a record to our table after snapshot started - insertQuery := fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3) - VALUES (5, 'bizz', 456, false)`, table) - _, err := pool.Exec(ctx, insertQuery) - is.NoErr(err) - - // assert record does not appear in snapshot - for i := 0; i < 5; i++ { - r, err := s.Next(ctx) - if err != nil { - is.Equal(err, ErrSnapshotComplete) - is.Equal(r, sdk.Record{}) - } - } + ctx := context.Background() + table := test.SetupTestTable(ctx, t, pool) + name := createTestSnapshot(ctx, t, pool) + s := createTestSnapshotIterator(ctx, t, pool, SnapshotConfig{ + SnapshotName: name, + Table: table, + Columns: columns, + KeyColumn: key, }) + now := time.Now() - t.Run("should iterate full snapshot", func(t *testing.T) { - ctx := context.Background() - pool := test.ConnectPool(ctx, t, test.RegularConnString) - table := test.SetupTestTable(ctx, t, pool) - name := createTestSnapshot(ctx, t, pool) - - s := createTestSnapshotIterator(ctx, t, pool, name, table, - []string{"id", "key"}, "key") - - for i := 0; i < 4; i++ { - rec, err := s.Next(ctx) - is.Equal(rec.Position, sdk.Position(fmt.Sprintf("%s:%d", table, i))) - is.NoErr(err) - } + rec, err := s.Next(ctx) + is.NoErr(err) - r, err := s.Next(ctx) - is.Equal(r, sdk.Record{}) - is.Equal(err.Error(), ErrSnapshotComplete.Error()) - is.NoErr(s.Teardown(ctx)) + is.True(rec.CreatedAt.After(now)) + is.Equal(rec.Metadata["action"], "snapshot") + rec.CreatedAt = time.Time{} // reset time for comparison + is.Equal(rec, sdk.Record{ + Position: sdk.Position(fmt.Sprintf("%s:0", table)), + Key: 
sdk.StructuredData{ + "key": []uint8("1"), + }, + Payload: sdk.StructuredData{ + "id": int64(1), + "column1": "foo", + "column2": int32(123), + "column3": bool(false), + }, + Metadata: map[string]string{ + "action": actionSnapshot, + "table": table, + }, }) + is.True(errors.Is(s.Teardown(ctx), ErrSnapshotInterrupt)) +} - t.Run("should return interrupt error", func(t *testing.T) { - ctx := context.Background() - table := test.SetupTestTable(ctx, t, pool) - name := createTestSnapshot(ctx, t, pool) - - s := createTestSnapshotIterator(ctx, t, pool, name, table, - []string{"id", "key", "column1", "column2", "column3"}, "key") +func TestFullIteration(t *testing.T) { + is := is.New(t) + ctx := context.Background() + pool := test.ConnectPool(ctx, t, test.RegularConnString) + table := test.SetupTestTable(ctx, t, pool) + name := createTestSnapshot(ctx, t, pool) + s := createTestSnapshotIterator(ctx, t, pool, SnapshotConfig{ + SnapshotName: name, + Table: table, + Columns: columns, + KeyColumn: key, + }) - now := time.Now() + for i := 0; i < 4; i++ { rec, err := s.Next(ctx) + is.Equal(rec.Position, sdk.Position(fmt.Sprintf("%s:%d", table, i))) is.NoErr(err) + } - is.True(rec.CreatedAt.After(now)) - is.Equal(rec.Metadata["action"], "snapshot") - rec.CreatedAt = time.Time{} // reset time for comparison - - is.Equal(rec, sdk.Record{ - Position: sdk.Position(fmt.Sprintf("%s:0", table)), - Key: sdk.StructuredData{ - "key": []uint8("1"), - }, - Payload: sdk.StructuredData{ - "id": int64(1), - "column1": "foo", - "column2": int32(123), - "column3": bool(false), - }, - Metadata: map[string]string{ - "action": actionSnapshot, - "table": table, - }, - }) - is.True(errors.Is(s.Teardown(ctx), ErrSnapshotInterrupt)) - }) + r, err := s.Next(ctx) + is.Equal(r, sdk.Record{}) + is.True(errors.Is(err, ErrSnapshotComplete)) + is.NoErr(s.Teardown(ctx)) } // createTestSnapshot starts a transaction that stays open while a snapshot test @@ -162,24 +154,20 @@ func createTestSnapshot(ctx 
context.Context, t *testing.T, pool *pgxpool.Pool) s return name } -// createTestSnapshotIterator creates a new test table, starts a snapshot -// on it, then creates a test SnapshotIterator with the ID of that snapshot. -// * It returns that SnapshotIterator and the string name of the test table. -// * This function handles its own pooled connection cleanup, but _not_ the -// SnapshotIterator's Teardown. +// creates a snapshot iterator for testing that handles its connection's cleanup. func createTestSnapshotIterator(ctx context.Context, t *testing.T, - pool *pgxpool.Pool, snapshotName string, table string, columns []string, - key string) *SnapshotIterator { + pool *pgxpool.Pool, cfg SnapshotConfig) *SnapshotIterator { is := is.New(t) + conn, err := pool.Acquire(ctx) is.NoErr(err) s, err := NewSnapshotIterator(context.Background(), conn.Conn(), SnapshotConfig{ - SnapshotName: snapshotName, - Table: table, - Columns: columns, - KeyColumn: key, + SnapshotName: cfg.SnapshotName, + Table: cfg.Table, + Columns: cfg.Columns, + KeyColumn: cfg.KeyColumn, }) is.NoErr(err) - t.Cleanup(func() { conn.Release() }) + t.Cleanup(conn.Release) return s }