diff --git a/source/logrepl/internal/relationset_test.go b/source/logrepl/internal/relationset_test.go index 924c0edc..806ddf52 100644 --- a/source/logrepl/internal/relationset_test.go +++ b/source/logrepl/internal/relationset_test.go @@ -54,10 +54,28 @@ func TestRelationSetAllTypes(t *testing.T) { _, messages := setupSubscription(ctx, t, conn.Config().Config, table) insertRowAllTypes(ctx, t, conn, table) - <-messages // skip first message, it's a begin message - rel := (<-messages).(*pglogrepl.RelationMessage) // second message is a relation - ins := (<-messages).(*pglogrepl.InsertMessage) // third one is the insert - <-messages // fourth one is the commit + var rel *pglogrepl.RelationMessage + var ins *pglogrepl.InsertMessage + + // loop through messages and allow for empty transactions to be received + for { + // first message needs to be a begin message + msg := <-messages + _ = msg.(*pglogrepl.BeginMessage) + + // second message can be either commit (we can catch empty transactions) + // or relation (that's what we are actually interested in) + msg = <-messages + if _, ok := msg.(*pglogrepl.CommitMessage); ok { + continue // empty transaction, skip it + } + + // not an empty transaction, these have to be the messages we are looking for + rel = msg.(*pglogrepl.RelationMessage) // second message is a relation + ins = (<-messages).(*pglogrepl.InsertMessage) // third one is the insert + _ = (<-messages).(*pglogrepl.CommitMessage) // fourth one is the commit + break + } rs := NewRelationSet(conn.ConnInfo())