Skip to content

Commit

Permalink
Merge pull request #320 from dolthub/fulghum/com_binlog_prototype
Browse files Browse the repository at this point in the history
Updates for binlog primary protocol
  • Loading branch information
fulghum committed Mar 25, 2024
2 parents ad95f0d + f47cd63 commit 65f4774
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 9 deletions.
18 changes: 13 additions & 5 deletions go/mysql/binlog_dump.go
Expand Up @@ -78,11 +78,19 @@ func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint6
if !ok {
return logFile, logPos, position, readPacketErr
}
if gtid := string(data[pos : pos+int(dataSize)]); gtid != "" {
position, err = DecodePosition(gtid)
if err != nil {
return logFile, logPos, position, err
}

gtidBytes := data[pos : pos+int(dataSize)]

// NOTE: A MySQL 8.0 replica sends the GTID set as binary data, not as a human-readable string.
// The main Vitess codebase was parsing a human-readable string and then using DecodePosition
// to parse it, but that doesn't seem to actually work with real MySQL replicas, so we
// diverge here from their implementation.
gtidSet, err := NewMysql56GTIDSetFromSIDBlock(gtidBytes)
if err != nil {
return logFile, logPos, position, err
}
position = Position{
GTIDSet: gtidSet,
}
}

Expand Down
27 changes: 27 additions & 0 deletions go/mysql/binlog_event.go
Expand Up @@ -202,6 +202,12 @@ type TableMap struct {
Metadata []uint16
}

// String implements the Stringer interface
func (t *TableMap) String() string {
return fmt.Sprintf("{Flags: %v, Database: %q, Name: %q, Types: %v, CanBeNull: %v, Metadata: %v}",
t.Flags, t.Database, t.Name, t.Types, t.CanBeNull, t.Metadata)
}

// Rows contains data from a {WRITE,UPDATE,DELETE}_ROWS_EVENT.
type Rows struct {
// Flags has the flags from the event.
Expand All @@ -222,6 +228,21 @@ type Rows struct {
Rows []Row
}

// String implements the Stringer interface
func (r *Rows) String() string {
rows := "[]"
if r.Rows != nil {
rows = "["
for _, row := range r.Rows {
rows += row.String() + ", "
}
rows += "]"
}

return fmt.Sprintf("{Flags: %v, IdentifyColumns: %v, DataColumns: %v, Rows: %s}",
r.Flags, r.IdentifyColumns, r.DataColumns, rows)
}

// Row contains data for a single Row in a Rows event.
type Row struct {
// NullIdentifyColumns describes which of the identify columns are NULL.
Expand All @@ -241,6 +262,12 @@ type Row struct {
Data []byte
}

// String implements the Stringer interface
func (r *Row) String() string {
return fmt.Sprintf("{NullIdentifyColumns: %v, NullColumns: %v, Identify: %v, Data: %v}",
r.NullIdentifyColumns, r.NullColumns, r.Identify, r.Data)
}

// Bitmap is used by the previous structures.
type Bitmap struct {
// data is the slice this is based on.
Expand Down
30 changes: 30 additions & 0 deletions go/mysql/binlog_event_make.go
Expand Up @@ -322,6 +322,36 @@ func NewMariaDBGTIDEvent(f BinlogFormat, s *FakeBinlogStream, gtid MariadbGTID,
return NewMariadbBinlogEvent(ev)
}

// NewMySQLGTIDEvent returns a MySQL specific GTID event.
func NewMySQLGTIDEvent(f BinlogFormat, s *FakeBinlogStream, gtid Mysql56GTID, hasBegin bool) BinlogEvent {
length := 1 + // flags
16 + // SID (server UUID)
8 // GNO (sequence number, signed int)
data := make([]byte, length)

// flags
data[0] = 0

// SID (server UUID)
sid := gtid.Server
copy(data[1:17], sid[:])

// GNO (sequence number, signed int)
sequence := gtid.Sequence
binary.LittleEndian.PutUint64(data[17:25], uint64(sequence))

const FLStandalone = 1
var flags2 byte
if !hasBegin {
flags2 |= FLStandalone
}
data[0] = flags2

ev := s.Packetize(f, eGTIDEvent, 0, data)
return NewMysql56BinlogEvent(ev)
}


// NewTableMapEvent returns a TableMap event.
// Only works with post_header_length=8.
func NewTableMapEvent(f BinlogFormat, s *FakeBinlogStream, tableID uint64, tm *TableMap) BinlogEvent {
Expand Down
53 changes: 49 additions & 4 deletions go/mysql/conn.go
Expand Up @@ -266,6 +266,25 @@ func (c *Conn) startWriterBuffering() {
c.bufferedWriter.Reset(c.Conn)
}

// FlushBuffer flushes the buffered writer used by this connection, if one is currently in use. If no
// buffering is currently in use, this method is a no-op. Our fork of Vitess typically handles flushing
// buffers in a defer function, so callers generally don't need to manually flush the connection's
// buffer. The exception is for the COM_DUMP_BINLOG_GTID command – this command leaves the connection
// for the server to continue pushing events over, and the defer function set by the connection handling
// code won't get called until the stream is closed, which could be hours or days later.
//
// TODO: The latest Vitess code uses a flush timer that periodically flushes the buffer. We should
// switch over to that since it's a cleaner solution and could potentially benefit other commands
// as well, but it's a more invasive change, so we're starting with simply allowing the caller to
// explicitly flush the buffer.
func (c *Conn) FlushBuffer() error {
if c.bufferedWriter == nil {
return nil
}

return c.bufferedWriter.Flush()
}

// flush flushes the written data to the socket.
// This must be called to terminate startBuffering.
func (c *Conn) flush() error {
Expand Down Expand Up @@ -1367,10 +1386,11 @@ func (c *Conn) handleNextCommand(handler Handler) error {
return nil

case ComRegisterReplica:
// TODO: Seems like we probably need this command implemented, too, but it hasn't been needed
// yet in a simple Vitess <-> Vitess replication test, so skipping for now.
//return c.handleComRegisterReplica(handler, data)
return fmt.Errorf("ComRegisterReplica not implemented")
ok := c.handleComRegisterReplica(handler, data)
if !ok {
return fmt.Errorf("error handling ComRegisterReplica packet: %v", data)
}
return nil

default:
log.Errorf("Got unhandled packet (default) from %s, returning error: %v", c, data)
Expand All @@ -1384,6 +1404,31 @@ func (c *Conn) handleNextCommand(handler Handler) error {
return nil
}

func (c *Conn) handleComRegisterReplica(handler Handler, data []byte) (kontinue bool) {
binlogReplicaHandler, ok := handler.(BinlogReplicaHandler)
if !ok {
log.Warningf("received COM_REGISTER_REPLICA command, but handler does not implement BinlogReplicaHandler")
return true
}

c.recycleReadPacket()

replicaHost, replicaPort, replicaUser, replicaPassword, err := c.parseComRegisterReplica(data)
if err != nil {
log.Errorf("conn %v: parseComRegisterReplica failed: %v", c.ID(), err)
return false
}
if err := binlogReplicaHandler.ComRegisterReplica(c, replicaHost, replicaPort, replicaUser, replicaPassword); err != nil {
c.writeErrorPacketFromError(err)
return false
}

if err := c.writeOKPacket(0, 0, c.StatusFlags, 0); err != nil {
c.writeErrorPacketFromError(err)
}
return true
}

func (c *Conn) handleComBinlogDumpGTID(handler Handler, data []byte) (kontinue bool) {
binlogReplicaHandler, ok := handler.(BinlogReplicaHandler)
if !ok {
Expand Down
5 changes: 5 additions & 0 deletions go/mysql/encoding.go
Expand Up @@ -188,6 +188,11 @@ func readEOFString(data []byte, pos int) (string, int, bool) {
return string(data[pos:]), len(data) - pos, true
}

func readUint8(data []byte, pos int) (uint8, int, bool) {
b, pos, ok := readByte(data, pos)
return uint8(b), pos, ok
}

func readUint16(data []byte, pos int) (uint16, int, bool) {
if pos+1 >= len(data) {
return 0, 0, false
Expand Down
72 changes: 72 additions & 0 deletions go/mysql/register-replica.go
@@ -0,0 +1,72 @@
/*
Copyright 2022 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mysql

import (
vtrpcpb "github.com/dolthub/vitess/go/vt/proto/vtrpc"
"github.com/dolthub/vitess/go/vt/vterrors"
)

var (
comRegisterReplicaPacketErr = vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error reading BinlogDumpGTID packet")
)

func (c *Conn) parseComRegisterReplica(data []byte) (
replicaHost string,
replicaPort uint16,
replicaUser string,
replicaPassword string,
err error,
) {
pos := 1
pos += 4 // server-id

// hostname
hostnameLen, pos, ok := readUint8(data, pos)
if !ok {
return replicaHost, replicaPort, replicaUser, replicaPassword, comRegisterReplicaPacketErr
}
replicaHost = string(data[pos : pos+int(hostnameLen)])
pos += int(hostnameLen)

// username
usernameLen, pos, ok := readUint8(data, pos)
if !ok {
return replicaHost, replicaPort, replicaUser, replicaPassword, comRegisterReplicaPacketErr
}
replicaUser = string(data[pos : pos+int(usernameLen)])
pos += int(usernameLen)

// password
passwordLen, pos, ok := readUint8(data, pos)
if !ok {
return replicaHost, replicaPort, replicaUser, replicaPassword, comRegisterReplicaPacketErr
}
replicaPassword = string(data[pos : pos+int(passwordLen)])
pos += int(passwordLen)

// port
replicaPort, _, ok = readUint16(data, pos)
if !ok {
return replicaHost, replicaPort, replicaUser, replicaPassword, comRegisterReplicaPacketErr
}
// remaining: (commented because of ineffectual assignment)
// pos += 4 // replication rank
// pos += 4 // master-id

return replicaHost, replicaPort, replicaUser, replicaPassword, nil
}

0 comments on commit 65f4774

Please sign in to comment.