Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
53be2c2
snapshot iterator
dylanlott Apr 1, 2022
ee5ea8b
adds lifecycle test for proper cleanup
dylanlott Apr 5, 2022
7aa9acd
linter fixes
dylanlott Apr 5, 2022
17517bf
Merge branch 'main' of github.com:ConduitIO/conduit-connector-postgre…
dylanlott Apr 5, 2022
4242433
refactors tx handling to use pgx.Tx
dylanlott Apr 5, 2022
a549d9f
use is instead of i for package reference
dylanlott Apr 5, 2022
3292e8c
clean up and review feedback
dylanlott Apr 6, 2022
3f20c27
includes table name in snapshot positions
dylanlott Apr 6, 2022
0c597bf
use pooled connection in tests
dylanlott Apr 6, 2022
c12791e
returns snapshot incomplete error when snapshot is interrupted
dylanlott Apr 6, 2022
c9c958d
linter fix
dylanlott Apr 6, 2022
9c97b1b
refactors the snapshot iterator setup in tests
dylanlott Apr 6, 2022
f17f690
adds full iteration test
dylanlott Apr 6, 2022
8173c48
WIP iteration tests
dylanlott Apr 7, 2022
b9e2a1d
checks for context errors in Next
dylanlott Apr 7, 2022
21c8b05
Merge branch 'dylan/snapshot-iterator' of github.com:ConduitIO/condui…
dylanlott Apr 7, 2022
659faf9
adds log or return behavior to Teardown
dylanlott Apr 7, 2022
896e5c6
Merge branch 'dylan/teardown' of github.com:ConduitIO/conduit-connect…
dylanlott Apr 7, 2022
f60638a
update comment
dylanlott Apr 7, 2022
19ca9e0
handle rollback error in loadRows
dylanlott Apr 7, 2022
27ba7c9
Merge branch 'dylan/snapshot-iterator' into dylan/teardown
dylanlott Apr 7, 2022
784bc9e
Merge branch 'dylan/teardown' into dylan/test-iteration
dylanlott Apr 7, 2022
4c9630a
use query row instead of query in test setup
dylanlott Apr 7, 2022
b46f343
returns snapshot incomplete error when snapshot is interrupted
dylanlott Apr 6, 2022
17902c3
linter fix
dylanlott Apr 6, 2022
b544e42
adds log or return behavior to Teardown
dylanlott Apr 7, 2022
ab42b0b
Merge branch 'dylan/teardown' of github.com:ConduitIO/conduit-connect…
dylanlott Apr 7, 2022
8cfa075
linter fixes
dylanlott Apr 7, 2022
7287006
remove unnecessary log
dylanlott Apr 7, 2022
c1add03
Merge branch 'main' of github.com:ConduitIO/conduit-connector-postgre…
dylanlott Apr 8, 2022
22e5c1a
use logOrReturn in Teardown
dylanlott Apr 8, 2022
2d3db93
Merge branch 'dylan/teardown' into dylan/test-iteration
dylanlott Apr 8, 2022
264aed8
Merge branch 'main' of github.com:ConduitIO/conduit-connector-postgre…
dylanlott Apr 8, 2022
3fe21b8
Merge branch 'main' of github.com:ConduitIO/conduit-connector-postgre…
dylanlott Apr 11, 2022
875780a
WIP refactoring createTestSnapshotIterator
dylanlott Apr 11, 2022
53f6d90
WIP refactoring createTestSnapshot
dylanlott Apr 11, 2022
ff935b5
refactor context, pool, table, and snaphost creation in logrepl/snaps…
dylanlott Apr 11, 2022
14c2c8b
linter fix
dylanlott Apr 11, 2022
f084548
refactor snapshot tests
dylanlott Apr 11, 2022
18125a4
updates tests and comments
dylanlott Apr 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 84 additions & 16 deletions source/logrepl/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,37 +24,64 @@ import (
"github.com/conduitio/conduit-connector-postgres/test"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/jackc/pgx/v4/pgxpool"

"github.com/matryer/is"
)

func TestLifecycle(t *testing.T) {
var (
columns = []string{"id", "key", "column1", "column2", "column3"}
key = "key"
)

func TestAtomicSnapshot(t *testing.T) {
is := is.New(t)
pool := test.ConnectPool(context.Background(), t, test.RegularConnString)
ctx := context.Background()

pool := test.ConnectPool(ctx, t, test.RegularConnString)
table := test.SetupTestTable(ctx, t, pool)
name := createTestSnapshot(t, pool)

conn, err := pool.Acquire(ctx)
is.NoErr(err)
s, err := NewSnapshotIterator(context.Background(), conn.Conn(), SnapshotConfig{
name := createTestSnapshot(ctx, t, pool)
s := createTestSnapshotIterator(ctx, t, pool, SnapshotConfig{
SnapshotName: name,
Table: table,
Columns: []string{"id", "key", "column1", "column2", "column3"},
KeyColumn: "key",
Columns: columns,
KeyColumn: key,
})
t.Cleanup(func() { is.NoErr(s.Teardown(ctx)) })

// add a record to our table after snapshot started
insertQuery := fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3)
VALUES (5, 'bizz', 456, false)`, table)
_, err := pool.Exec(ctx, insertQuery)
is.NoErr(err)
t.Cleanup(conn.Release)

// assert record does not appear in snapshot
for i := 0; i < 5; i++ {
r, err := s.Next(ctx)
if err != nil {
is.True(errors.Is(err, ErrSnapshotComplete))
is.Equal(r, sdk.Record{})
}
}
}

func TestSnapshotInterrupted(t *testing.T) {
is := is.New(t)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sidenote: watch out, using the same is instance in subtests means that once the first subtest fails the whole test will stop and no other subtests will even be run. It would be better to call is.New for each subtest. That said, this won't be a problem if we split the subtests into separate tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's good to know 👍 Thanks!

pool := test.ConnectPool(context.Background(), t, test.RegularConnString)
ctx := context.Background()
table := test.SetupTestTable(ctx, t, pool)
name := createTestSnapshot(ctx, t, pool)
s := createTestSnapshotIterator(ctx, t, pool, SnapshotConfig{
SnapshotName: name,
Table: table,
Columns: columns,
KeyColumn: key,
})
now := time.Now()

rec, err := s.Next(ctx)
is.NoErr(err)

is.True(rec.CreatedAt.After(now))
is.Equal(rec.Metadata["action"], "snapshot")
rec.CreatedAt = time.Time{} // reset time for comparison

is.Equal(rec, sdk.Record{
Position: sdk.Position(fmt.Sprintf("%s:0", table)),
Key: sdk.StructuredData{
Expand All @@ -74,15 +101,38 @@ func TestLifecycle(t *testing.T) {
is.True(errors.Is(s.Teardown(ctx), ErrSnapshotInterrupt))
}

func TestFullIteration(t *testing.T) {
is := is.New(t)
ctx := context.Background()
pool := test.ConnectPool(ctx, t, test.RegularConnString)
table := test.SetupTestTable(ctx, t, pool)
name := createTestSnapshot(ctx, t, pool)
s := createTestSnapshotIterator(ctx, t, pool, SnapshotConfig{
SnapshotName: name,
Table: table,
Columns: columns,
KeyColumn: key,
})

for i := 0; i < 4; i++ {
rec, err := s.Next(ctx)
is.Equal(rec.Position, sdk.Position(fmt.Sprintf("%s:%d", table, i)))
is.NoErr(err)
}

r, err := s.Next(ctx)
is.Equal(r, sdk.Record{})
is.True(errors.Is(err, ErrSnapshotComplete))
is.NoErr(s.Teardown(ctx))
}

// createTestSnapshot starts a transaction that stays open while a snapshot test
// runs. Otherwise, Postgres deletes the snapshot as soon as the transaction
// commits or rolls back, and our snapshot iterator won't find a snapshot with
// the specified name.
// https://www.postgresql.org/docs/current/sql-set-transaction.html
func createTestSnapshot(t *testing.T, pool *pgxpool.Pool) string {
ctx := context.Background()
func createTestSnapshot(ctx context.Context, t *testing.T, pool *pgxpool.Pool) string {
is := is.New(t)

conn, err := pool.Acquire(ctx)
is.NoErr(err)

Expand All @@ -103,3 +153,21 @@ func createTestSnapshot(t *testing.T, pool *pgxpool.Pool) string {

return name
}

// creates a snapshot iterator for testing that hands its connection's cleanup.
func createTestSnapshotIterator(ctx context.Context, t *testing.T,
pool *pgxpool.Pool, cfg SnapshotConfig) *SnapshotIterator {
is := is.New(t)

conn, err := pool.Acquire(ctx)
is.NoErr(err)
s, err := NewSnapshotIterator(context.Background(), conn.Conn(), SnapshotConfig{
SnapshotName: cfg.SnapshotName,
Table: cfg.Table,
Columns: cfg.Columns,
KeyColumn: cfg.KeyColumn,
})
is.NoErr(err)
t.Cleanup(conn.Release)
return s
}
4 changes: 3 additions & 1 deletion test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ func SetupTestTable(ctx context.Context, t *testing.T, conn Querier) string {
}

func RandomIdentifier(t *testing.T) string {
return fmt.Sprintf("conduit_%v_%d", strings.ToLower(t.Name()), time.Now().UnixMicro()%1000)
return fmt.Sprintf("conduit_%v_%d",
strings.ReplaceAll(strings.ToLower(t.Name()), "/", "_"),
time.Now().UnixMicro()%1000)
}

func IsPgError(is *is.I, err error, wantCode string) {
Expand Down