Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions source/logrepl/internal/relationset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,28 @@ func TestRelationSetAllTypes(t *testing.T) {
_, messages := setupSubscription(ctx, t, conn.Config().Config, table)
insertRowAllTypes(ctx, t, conn, table)

<-messages // skip first message, it's a begin message
rel := (<-messages).(*pglogrepl.RelationMessage) // second message is a relation
ins := (<-messages).(*pglogrepl.InsertMessage) // third one is the insert
<-messages // fourth one is the commit
var rel *pglogrepl.RelationMessage
var ins *pglogrepl.InsertMessage

// loop through messages and allow for empty transactions to be received
for {
// first message needs to be a begin message
msg := <-messages
_ = msg.(*pglogrepl.BeginMessage)

// second message can be either commit (we can catch empty transactions)
// or relation (that's what we are actually interested in)
msg = <-messages
if _, ok := msg.(*pglogrepl.CommitMessage); ok {
continue // empty transaction, skip it
}

// not an empty transaction, these have to be the messages we are looking for
rel = msg.(*pglogrepl.RelationMessage) // second message is a relation
ins = (<-messages).(*pglogrepl.InsertMessage) // third one is the insert
_ = (<-messages).(*pglogrepl.CommitMessage) // fourth one is the commit
break
}

rs := NewRelationSet(conn.ConnInfo())

Expand Down