Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions source/logrepl/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package logrepl
import (
"context"
"database/sql"
"errors"
"fmt"
"strconv"
"time"
Expand All @@ -29,6 +30,12 @@ import (
"github.com/jackc/pgx/v4"
)

// ErrSnapshotComplete is returned by Next when a snapshot is finished
var ErrSnapshotComplete = errors.New("snapshot complete")

// ErrSnapshotInterrupt is returned by Teardown when a snapshot is interrupted
var ErrSnapshotInterrupt = errors.New("snapshot interrupted")

var psql = sq.StatementBuilder.PlaceholderFormat(sq.Dollar)

const actionSnapshot string = "snapshot"
Expand All @@ -46,6 +53,7 @@ type SnapshotIterator struct {
tx pgx.Tx
rows pgx.Rows

complete bool
internalPos int64
}

Expand Down Expand Up @@ -120,8 +128,8 @@ func (s *SnapshotIterator) Next(ctx context.Context) (sdk.Record, error) {
if err := s.rows.Err(); err != nil {
return sdk.Record{}, fmt.Errorf("rows error: %w", err)
}

return sdk.Record{}, sdk.ErrBackoffRetry
s.complete = true
return sdk.Record{}, ErrSnapshotComplete
}

return s.buildRecord(ctx)
Expand All @@ -135,10 +143,19 @@ func (s *SnapshotIterator) Ack(ctx context.Context, pos sdk.Position) error {
// Teardown attempts to gracefully teardown the iterator.
func (s *SnapshotIterator) Teardown(ctx context.Context) error {
s.rows.Close()
var err error
if commitErr := s.tx.Commit(ctx); commitErr != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be a dumb question, but why do we call s.tx.Commit() here at all?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simply to end the transaction.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I think I need a break.:D I was thinking this is the source teardown, but it's the iterator teardown, and actually a different thing in mind because of that.

sdk.Logger(ctx).Err(commitErr).Msg("teardown commit failed")
err = logOrReturnError(ctx, err, commitErr, "teardown commit failed")
}
if rowsErr := s.rows.Err(); rowsErr != nil {
err = logOrReturnError(ctx, err, rowsErr, "rows returned an error")
}

if !s.complete {
err = logOrReturnError(ctx, err, ErrSnapshotInterrupt, "snapshot interrupted")
}
return s.rows.Err()

return err
}

func (s *SnapshotIterator) buildRecord(ctx context.Context) (sdk.Record, error) {
Expand Down Expand Up @@ -216,3 +233,12 @@ func oidToScannerValue(oid pgtype.OID) scannerValue {
}
return t
}

// logOrReturn
func logOrReturnError(ctx context.Context, oldErr, newErr error, msg string) error {
if oldErr == nil {
return fmt.Errorf(msg+": %w", newErr)
}
sdk.Logger(ctx).Err(newErr).Msg(msg)
return oldErr
}
3 changes: 2 additions & 1 deletion source/logrepl/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package logrepl

import (
"context"
"errors"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -70,7 +71,7 @@ func TestLifecycle(t *testing.T) {
"table": table,
},
})
is.NoErr(s.Teardown(ctx)) // TODO: Should return an error
is.True(errors.Is(s.Teardown(ctx), ErrSnapshotInterrupt))
}

// createTestSnapshot starts a transaction that stays open while a snapshot test
Expand Down