Skip to content

Commit

Permalink
VReplication: Ensure ROW events are sent within a transaction (vitess…
Browse files Browse the repository at this point in the history
…io#13547) (vitessio#13580)

Signed-off-by: Arthur Schreiber <arthurschreiber@github.com>
Co-authored-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
2 people authored and arthurschreiber committed Jul 26, 2023
1 parent 9f9b77f commit 9e24435
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 12 deletions.
12 changes: 12 additions & 0 deletions go/vt/vtgate/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ create table t1(
primary key(id1)
) Engine=InnoDB;
create table t1_copy_basic(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
create table t1_id2_idx(
id2 bigint,
keyspace_id varbinary(10),
Expand Down Expand Up @@ -134,6 +140,12 @@ create table t1_sharded(
Name: "t1_id2_vdx",
}},
},
"t1_copy_basic": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Name: "hash",
}},
},
"t1_sharded": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Expand Down
165 changes: 160 additions & 5 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"regexp"
"sync"
"testing"

Expand Down Expand Up @@ -98,18 +99,29 @@ func TestVStream(t *testing.T) {
// In a real world scenario where every mysql instance hosts only one
// keyspace/shard, we should expect only a single event.
// The events could come in any order as the scatter insert runs in parallel.
emptyEventSkipped := false
for i := 0; i < 2; i++ {
events, err := reader.Recv()
if err != nil {
t.Fatal(err)
}
fmt.Printf("events: %v\n", events)
// An empty transaction has three events: begin, gtid and commit.
if len(events) == 3 && !emptyEventSkipped {
emptyEventSkipped = true
continue

// An empty transaction has either:
// - three events: begin, vgtid and commit.
// - two events: vgtid and other
if len(events) == 3 {
if events[0].Type == binlogdatapb.VEventType_BEGIN &&
events[1].Type == binlogdatapb.VEventType_VGTID &&
events[2].Type == binlogdatapb.VEventType_COMMIT {
continue
}
} else if len(events) == 2 {
if events[0].Type == binlogdatapb.VEventType_VGTID &&
events[1].Type == binlogdatapb.VEventType_OTHER {
continue
}
}

if len(events) != 5 {
t.Errorf("Unexpected event length: %v", events)
continue
Expand Down Expand Up @@ -381,6 +393,149 @@ func TestVStreamSharded(t *testing.T) {

}

// TestVStreamCopyTransactions tests that we are properly wrapping
// ROW events in the stream with BEGIN and COMMIT events.
func TestVStreamCopyTransactions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
keyspace := "ks"
shards := []string{"-80", "80-"}
table := "t1_copy_basic"
beginEventSeen, commitEventSeen := false, false
numResultInTrx := 0
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{
{
Keyspace: keyspace,
Shard: shards[0],
Gtid: "", // Start a vstream copy
},
{
Keyspace: keyspace,
Shard: shards[1],
Gtid: "", // Start a vstream copy
},
},
}
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: table,
Filter: fmt.Sprintf("select * from %s", table),
}},
}

gconn, conn, _, closeConnections := initialize(ctx, t)
defer closeConnections()

// Clear any existing data.
q := fmt.Sprintf("delete from %s", table)
_, err := conn.ExecuteFetch(q, -1, false)
require.NoError(t, err, "error clearing data: %v", err)

// Generate some test data. Enough to cross the default
// vstream_packet_size threshold.
for i := 1; i <= 100000; i++ {
values := fmt.Sprintf("(%d, %d)", i, i)
q := fmt.Sprintf("insert into %s (id1, id2) values %s", table, values)
_, err := conn.ExecuteFetch(q, 1, false)
require.NoError(t, err, "error inserting data: %v", err)
}

// Start a vstream.
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, nil)
require.NoError(t, err, "error starting vstream: %v", err)

recvLoop:
for {
finished := true

vevents, err := reader.Recv()
numResultInTrx++
eventCount := len(vevents)
t.Logf("------------------ Received %d events in response #%d for the transaction ------------------\n",
eventCount, numResultInTrx)
switch err {
case nil:

for _, event := range vevents {
switch event.Type {
case binlogdatapb.VEventType_BEGIN:
require.False(t, beginEventSeen, "received a second BEGIN event within the transaction: numResultInTrx=%d\n",
numResultInTrx)
beginEventSeen = true
t.Logf("Found BEGIN event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d\n",
beginEventSeen, commitEventSeen, event.Type, numResultInTrx)
require.False(t, commitEventSeen, "received a BEGIN event when expecting a COMMIT event: numResultInTrx=%d\n",
numResultInTrx)
case binlogdatapb.VEventType_VGTID:
t.Logf("Found VGTID event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n",
beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event)

finished = true
for _, shardGtid := range event.Vgtid.ShardGtids {
finished = finished && len(shardGtid.TablePKs) == 0
}

case binlogdatapb.VEventType_FIELD:
t.Logf("Found FIELD event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n",
beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event)
case binlogdatapb.VEventType_ROW:
// Uncomment if you need to do more debugging.
// t.Logf("Found ROW event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n",
// beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event)

case binlogdatapb.VEventType_COMMIT:
commitEventSeen = true
t.Logf("Found COMMIT event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n",
beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event)
require.True(t, beginEventSeen, "received COMMIT event before receiving BEGIN event: numResultInTrx=%d\n",
numResultInTrx)

if finished {
t.Logf("Finished vstream copy\n")
t.Logf("-------------------------------------------------------------------\n\n")
cancel()
break recvLoop
}

default:
t.Logf("Found extraneous event: %+v\n", event)
}
if beginEventSeen && commitEventSeen {
t.Logf("Received both BEGIN and COMMIT, so resetting transactional state\n")
beginEventSeen = false
commitEventSeen = false
numResultInTrx = 0
}
}
case io.EOF:
t.Logf("vstream ended\n")
t.Logf("-------------------------------------------------------------------\n\n")
cancel()
return
default:
require.FailNowf(t, "unexpected error", "encountered error in vstream: %v", err)
return
}
}
// The last response, when the vstream copy completes, does not
// typically contain ROW events.
if beginEventSeen || commitEventSeen {
require.True(t, (beginEventSeen && commitEventSeen), "did not receive both BEGIN and COMMIT events in the final ROW event set")
}
}

func removeAnyDeprecatedDisplayWidths(orig string) string {
var adjusted string
baseIntType := "int"
intRE := regexp.MustCompile(`(?i)int\(([0-9]*)?\)`)
adjusted = intRE.ReplaceAllString(orig, baseIntType)
baseYearType := "year"
yearRE := regexp.MustCompile(`(?i)year\(([0-9]*)?\)`)
adjusted = yearRE.ReplaceAllString(adjusted, baseYearType)
return adjusted
}

var printMu sync.Mutex

func printEvents(evs []*binlogdatapb.VEvent) {
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,26 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
log.Infof("sendFieldEvent returned error %v", err)
return err
}
// sendFieldEvent() sends a BEGIN event first.
uvs.inTransaction = true
}

if len(rows.Rows) == 0 {
log.V(2).Infof("0 rows returned for table %s", tableName)
return nil
}

// We are about to send ROW events, so we need to ensure
// that we do so within a transaction. The COMMIT event
// will be sent in sendEventsForRows() below.
if !uvs.inTransaction {
evs := []*binlogdatapb.VEvent{{
Type: binlogdatapb.VEventType_BEGIN,
}}
uvs.send(evs)
uvs.inTransaction = true
}

newLastPK = sqltypes.CustomProto3ToResult(uvs.pkfields, &querypb.QueryResult{
Fields: rows.Fields,
Rows: []*querypb.Row{rows.Lastpk},
Expand All @@ -271,6 +285,8 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
log.Infof("sendEventsForRows returned error %v", err)
return err
}
// sendEventsForRows() sends a COMMIT event last.
uvs.inTransaction = false

uvs.setCopyState(tableName, qrLastPK)
log.V(2).Infof("NewLastPK: %v", qrLastPK)
Expand Down
18 changes: 11 additions & 7 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@ type uvstreamer struct {
cancel func()

// input parameters
vse *Engine
send func([]*binlogdatapb.VEvent) error
cp dbconfigs.Connector
se *schema.Engine
startPos string
filter *binlogdatapb.Filter
inTablePKs []*binlogdatapb.TableLastPK
vse *Engine
send func([]*binlogdatapb.VEvent) error
cp dbconfigs.Connector
se *schema.Engine
startPos string
// Are we currently in an explicit transaction?
// If we are not, and we're about to send ROW
// events, then we need to send a BEGIN event first.
inTransaction bool
filter *binlogdatapb.Filter
inTablePKs []*binlogdatapb.TableLastPK

vschema *localVSchema

Expand Down

0 comments on commit 9e24435

Please sign in to comment.