Skip to content

Commit be413cf

Browse files
authored
Add MySQL 8.4 Support (#1494)
* WIP: MySQL 8.4 support * Add go-version for doing version comparisons * docker-gh-ost-replica-tests: Support MySQL 8.4+ * localtests: Support MySQL 8.4 * Remove mysql-8.4.3 from replica-tests GHA since dbdeployer / ci env does not have 8.4.3 * MySQL 8.4: Actually use caching_sha2_password pw strategy * Commit up vendor/github.com/hashicorp/go-version * Add GHA job for docker-gh-ost-replica-tests * localtests/test.sh: Fix conditional bug and replica_terminology typo
1 parent ad5d3ea commit be413cf

19 files changed

+1437
-55
lines changed

Diff for: .github/workflows/replica-tests.yml

+25
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ jobs:
77

88
runs-on: ubuntu-20.04
99
strategy:
10+
fail-fast: false
1011
matrix:
1112
version: [mysql-5.7.25,mysql-8.0.16,PerconaServer-8.0.21]
1213

@@ -22,3 +23,27 @@ jobs:
2223
env:
2324
TEST_MYSQL_VERSION: ${{ matrix.version }}
2425
run: script/cibuild-gh-ost-replica-tests
26+
27+
docker-tests:
28+
runs-on: ubuntu-22.04
29+
strategy:
30+
fail-fast: false
31+
matrix:
32+
image: ['mysql:8.4.3']
33+
env:
34+
TEST_MYSQL_IMAGE: ${{ matrix.image }}
35+
36+
steps:
37+
- uses: actions/checkout@v4
38+
39+
- name: Setup environment
40+
run: script/docker-gh-ost-replica-tests up
41+
42+
- name: Run tests
43+
run: script/docker-gh-ost-replica-tests run
44+
45+
- name: Teardown environment
46+
if: always()
47+
run: script/docker-gh-ost-replica-tests down
48+
49+

Diff for: go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ require (
3636
github.com/go-logr/stdr v1.2.2 // indirect
3737
github.com/go-ole/go-ole v1.2.6 // indirect
3838
github.com/gogo/protobuf v1.3.2 // indirect
39+
github.com/hashicorp/go-version v1.7.0 // indirect
3940
github.com/klauspost/compress v1.17.4 // indirect
4041
github.com/kr/text v0.2.0 // indirect
4142
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect

Diff for: go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
6565
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
6666
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms=
6767
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
68+
github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY=
69+
github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
6870
github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
6971
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
7072
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=

Diff for: go/logic/applier.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -829,7 +829,8 @@ func (this *Applier) RenameTablesRollback() (renameError error) {
829829
// We need to keep the SQL thread active so as to complete processing received events,
830830
// and have them written to the binary log, so that we can then read them via streamer.
831831
func (this *Applier) StopSlaveIOThread() error {
832-
query := `stop /* gh-ost */ slave io_thread`
832+
replicaTerm := mysql.ReplicaTermFor(this.migrationContext.ApplierMySQLVersion, `slave`)
833+
query := fmt.Sprintf("stop /* gh-ost */ %s io_thread", replicaTerm)
833834
this.migrationContext.Log.Infof("Stopping replication IO thread")
834835
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
835836
return err
@@ -840,7 +841,8 @@ func (this *Applier) StopSlaveIOThread() error {
840841

841842
// StartSlaveIOThread is applicable with --test-on-replica
842843
func (this *Applier) StartSlaveIOThread() error {
843-
query := `start /* gh-ost */ slave io_thread`
844+
replicaTerm := mysql.ReplicaTermFor(this.migrationContext.ApplierMySQLVersion, `slave`)
845+
query := fmt.Sprintf("start /* gh-ost */ %s io_thread", replicaTerm)
844846
this.migrationContext.Log.Infof("Starting replication IO thread")
845847
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
846848
return err
@@ -851,7 +853,8 @@ func (this *Applier) StartSlaveIOThread() error {
851853

852854
// StopSlaveSQLThread is applicable with --test-on-replica
853855
func (this *Applier) StopSlaveSQLThread() error {
854-
query := `stop /* gh-ost */ slave sql_thread`
856+
replicaTerm := mysql.ReplicaTermFor(this.migrationContext.ApplierMySQLVersion, `slave`)
857+
query := fmt.Sprintf("stop /* gh-ost */ %s sql_thread", replicaTerm)
855858
this.migrationContext.Log.Infof("Verifying SQL thread is stopped")
856859
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
857860
return err
@@ -862,7 +865,8 @@ func (this *Applier) StopSlaveSQLThread() error {
862865

863866
// StartSlaveSQLThread is applicable with --test-on-replica
864867
func (this *Applier) StartSlaveSQLThread() error {
865-
query := `start /* gh-ost */ slave sql_thread`
868+
replicaTerm := mysql.ReplicaTermFor(this.migrationContext.ApplierMySQLVersion, `slave`)
869+
query := fmt.Sprintf("start /* gh-ost */ %s sql_thread", replicaTerm)
866870
this.migrationContext.Log.Infof("Verifying SQL thread is running")
867871
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
868872
return err
@@ -880,7 +884,7 @@ func (this *Applier) StopReplication() error {
880884
return err
881885
}
882886

883-
readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.db)
887+
readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.migrationContext.ApplierMySQLVersion, this.db)
884888
if err != nil {
885889
return err
886890
}

Diff for: go/logic/inspect.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const startReplicationMaxWait = 2 * time.Second
3030
type Inspector struct {
3131
connectionConfig *mysql.ConnectionConfig
3232
db *gosql.DB
33+
dbVersion string
3334
informationSchemaDb *gosql.DB
3435
migrationContext *base.MigrationContext
3536
name string
@@ -57,6 +58,8 @@ func (this *Inspector) InitDBConnections() (err error) {
5758
if err := this.validateConnection(); err != nil {
5859
return err
5960
}
61+
this.dbVersion = this.migrationContext.InspectorMySQLVersion
62+
6063
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
6164
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
6265
return err
@@ -288,15 +291,16 @@ func (this *Inspector) validateGrants() error {
288291
func (this *Inspector) restartReplication() error {
289292
this.migrationContext.Log.Infof("Restarting replication on %s to make sure binlog settings apply to replication thread", this.connectionConfig.Key.String())
290293

291-
masterKey, _ := mysql.GetMasterKeyFromSlaveStatus(this.connectionConfig)
294+
masterKey, _ := mysql.GetMasterKeyFromSlaveStatus(this.dbVersion, this.connectionConfig)
292295
if masterKey == nil {
293296
// This is not a replica
294297
return nil
295298
}
296299

297300
var stopError, startError error
298-
_, stopError = sqlutils.ExecNoPrepare(this.db, `stop slave`)
299-
_, startError = sqlutils.ExecNoPrepare(this.db, `start slave`)
301+
replicaTerm := mysql.ReplicaTermFor(this.dbVersion, `slave`)
302+
_, stopError = sqlutils.ExecNoPrepare(this.db, fmt.Sprintf("stop %s", replicaTerm))
303+
_, startError = sqlutils.ExecNoPrepare(this.db, fmt.Sprintf("start %s", replicaTerm))
300304
if stopError != nil {
301305
return stopError
302306
}
@@ -329,9 +333,11 @@ func (this *Inspector) restartReplication() error {
329333
// returns true if both are 'Yes', false otherwise
330334
func (this *Inspector) validateReplicationRestarted() (bool, error) {
331335
errNotRunning := fmt.Errorf("Replication not running on %s", this.connectionConfig.Key.String())
332-
query := `show /* gh-ost */ slave status`
336+
query := fmt.Sprintf("show /* gh-ost */ %s", mysql.ReplicaTermFor(this.dbVersion, "slave status"))
333337
err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
334-
if rowMap.GetString("Slave_IO_Running") != "Yes" || rowMap.GetString("Slave_SQL_Running") != "Yes" {
338+
ioRunningTerm := mysql.ReplicaTermFor(this.dbVersion, "Slave_IO_Running")
339+
sqlRunningTerm := mysql.ReplicaTermFor(this.dbVersion, "Slave_SQL_Running")
340+
if rowMap.GetString(ioRunningTerm) != "Yes" || rowMap.GetString(sqlRunningTerm) != "Yes" {
335341
return errNotRunning
336342
}
337343
return nil
@@ -389,7 +395,7 @@ func (this *Inspector) validateBinlogs() error {
389395
if !this.migrationContext.SwitchToRowBinlogFormat {
390396
return fmt.Errorf("You must be using ROW binlog format. I can switch it for you, provided --switch-to-rbr and that %s doesn't have replicas", this.connectionConfig.Key.String())
391397
}
392-
query := `show /* gh-ost */ slave hosts`
398+
query := fmt.Sprintf("show /* gh-ost */ %s", mysql.ReplicaTermFor(this.dbVersion, `slave hosts`))
393399
countReplicas := 0
394400
err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
395401
countReplicas++
@@ -864,11 +870,12 @@ func (this *Inspector) readChangelogState(hint string) (string, error) {
864870
func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.ConnectionConfig, err error) {
865871
this.migrationContext.Log.Infof("Recursively searching for replication master")
866872
visitedKeys := mysql.NewInstanceKeyMap()
867-
return mysql.GetMasterConnectionConfigSafe(this.connectionConfig, visitedKeys, this.migrationContext.AllowedMasterMaster)
873+
return mysql.GetMasterConnectionConfigSafe(this.dbVersion, this.connectionConfig, visitedKeys, this.migrationContext.AllowedMasterMaster)
868874
}
869875

870876
func (this *Inspector) getReplicationLag() (replicationLag time.Duration, err error) {
871877
replicationLag, err = mysql.GetReplicationLagFromSlaveStatus(
878+
this.dbVersion,
872879
this.informationSchemaDb,
873880
)
874881
return replicationLag, err

Diff for: go/logic/streamer.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const (
3636
type EventsStreamer struct {
3737
connectionConfig *mysql.ConnectionConfig
3838
db *gosql.DB
39+
dbVersion string
3940
migrationContext *base.MigrationContext
4041
initialBinlogCoordinates *mysql.BinlogCoordinates
4142
listeners [](*BinlogEventListener)
@@ -107,9 +108,11 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
107108
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, EventsStreamerUri); err != nil {
108109
return err
109110
}
110-
if _, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name); err != nil {
111+
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name)
112+
if err != nil {
111113
return err
112114
}
115+
this.dbVersion = version
113116
if err := this.readCurrentBinlogCoordinates(); err != nil {
114117
return err
115118
}
@@ -140,7 +143,8 @@ func (this *EventsStreamer) GetReconnectBinlogCoordinates() *mysql.BinlogCoordin
140143

141144
// readCurrentBinlogCoordinates reads master status from hooked server
142145
func (this *EventsStreamer) readCurrentBinlogCoordinates() error {
143-
query := `show /* gh-ost readCurrentBinlogCoordinates */ master status`
146+
binaryLogStatusTerm := mysql.ReplicaTermFor(this.dbVersion, "master status")
147+
query := fmt.Sprintf("show /* gh-ost readCurrentBinlogCoordinates */ %s", binaryLogStatusTerm)
144148
foundMasterStatus := false
145149
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
146150
this.initialBinlogCoordinates = &mysql.BinlogCoordinates{
@@ -155,7 +159,7 @@ func (this *EventsStreamer) readCurrentBinlogCoordinates() error {
155159
return err
156160
}
157161
if !foundMasterStatus {
158-
return fmt.Errorf("Got no results from SHOW MASTER STATUS. Bailing out")
162+
return fmt.Errorf("Got no results from SHOW %s. Bailing out", strings.ToUpper(binaryLogStatusTerm))
159163
}
160164
this.migrationContext.Log.Debugf("Streamer binlog coordinates: %+v", *this.initialBinlogCoordinates)
161165
return nil

Diff for: go/logic/throttler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- boo
150150
// when running on replica, the heartbeat injection is also done on the replica.
151151
// This means we will always get a good heartbeat value.
152152
// When running on replica, we should instead check the `SHOW SLAVE STATUS` output.
153-
if lag, err := mysql.GetReplicationLagFromSlaveStatus(this.inspector.informationSchemaDb); err != nil {
153+
if lag, err := mysql.GetReplicationLagFromSlaveStatus(this.inspector.dbVersion, this.inspector.informationSchemaDb); err != nil {
154154
return this.migrationContext.Log.Errore(err)
155155
} else {
156156
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))

Diff for: go/mysql/replica_terminology_map.go

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package mysql
2+
3+
import (
4+
version "github.com/hashicorp/go-version"
5+
)
6+
7+
const (
8+
MysqlVersionCutoff = "8.4"
9+
)
10+
11+
var MysqlReplicaTermMap = map[string]string{
12+
"Seconds_Behind_Master": "Seconds_Behind_Source",
13+
"Master_Log_File": "Source_Log_File",
14+
"Master_Host": "Source_Host",
15+
"Master_Port": "Source_Port",
16+
"Exec_Master_Log_Pos": "Exec_Source_Log_Pos",
17+
"Read_Master_Log_Pos": "Read_Source_Log_Pos",
18+
"Relay_Master_Log_File": "Relay_Source_Log_File",
19+
"Slave_IO_Running": "Replica_IO_Running",
20+
"Slave_SQL_Running": "Replica_SQL_Running",
21+
"master status": "binary log status",
22+
"slave hosts": "replicas",
23+
"slave status": "replica status",
24+
"slave": "replica",
25+
}
26+
27+
func ReplicaTermFor(mysqlVersion string, term string) string {
28+
vs, err := version.NewVersion(mysqlVersion)
29+
if err != nil {
30+
// default to returning the same term if we cannot determine the version
31+
return term
32+
}
33+
34+
mysqlVersionCutoff, _ := version.NewVersion(MysqlVersionCutoff)
35+
if vs.GreaterThanOrEqual(mysqlVersionCutoff) {
36+
return MysqlReplicaTermMap[term]
37+
}
38+
return term
39+
}

Diff for: go/mysql/utils.go

+37-27
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,16 @@ func GetDB(migrationUuid string, mysql_uri string) (db *gosql.DB, exists bool, e
6161
}
6262

6363
// GetReplicationLagFromSlaveStatus returns replication lag for a given db; via SHOW SLAVE STATUS
64-
func GetReplicationLagFromSlaveStatus(informationSchemaDb *gosql.DB) (replicationLag time.Duration, err error) {
65-
err = sqlutils.QueryRowsMap(informationSchemaDb, `show slave status`, func(m sqlutils.RowMap) error {
66-
slaveIORunning := m.GetString("Slave_IO_Running")
67-
slaveSQLRunning := m.GetString("Slave_SQL_Running")
68-
secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master")
64+
func GetReplicationLagFromSlaveStatus(dbVersion string, informationSchemaDb *gosql.DB) (replicationLag time.Duration, err error) {
65+
showReplicaStatusQuery := fmt.Sprintf("show %s", ReplicaTermFor(dbVersion, `slave status`))
66+
err = sqlutils.QueryRowsMap(informationSchemaDb, showReplicaStatusQuery, func(m sqlutils.RowMap) error {
67+
ioRunningTerm := ReplicaTermFor(dbVersion, "Slave_IO_Running")
68+
sqlRunningTerm := ReplicaTermFor(dbVersion, "Slave_SQL_Running")
69+
slaveIORunning := m.GetString(ioRunningTerm)
70+
slaveSQLRunning := m.GetString(sqlRunningTerm)
71+
secondsBehindMaster := m.GetNullInt64(ReplicaTermFor(dbVersion, "Seconds_Behind_Master"))
6972
if !secondsBehindMaster.Valid {
70-
return fmt.Errorf("replication not running; Slave_IO_Running=%+v, Slave_SQL_Running=%+v", slaveIORunning, slaveSQLRunning)
73+
return fmt.Errorf("replication not running; %s=%+v, %s=%+v", ioRunningTerm, slaveIORunning, sqlRunningTerm, slaveSQLRunning)
7174
}
7275
replicationLag = time.Duration(secondsBehindMaster.Int64) * time.Second
7376
return nil
@@ -76,7 +79,7 @@ func GetReplicationLagFromSlaveStatus(informationSchemaDb *gosql.DB) (replicatio
7679
return replicationLag, err
7780
}
7881

79-
func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey *InstanceKey, err error) {
82+
func GetMasterKeyFromSlaveStatus(dbVersion string, connectionConfig *ConnectionConfig) (masterKey *InstanceKey, err error) {
8083
currentUri := connectionConfig.GetDBUri("information_schema")
8184
// This function is only called once, okay to not have a cached connection pool
8285
db, err := gosql.Open("mysql", currentUri)
@@ -85,40 +88,45 @@ func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey
8588
}
8689
defer db.Close()
8790

88-
err = sqlutils.QueryRowsMap(db, `show slave status`, func(rowMap sqlutils.RowMap) error {
91+
showReplicaStatusQuery := fmt.Sprintf("show %s", ReplicaTermFor(dbVersion, `slave status`))
92+
err = sqlutils.QueryRowsMap(db, showReplicaStatusQuery, func(rowMap sqlutils.RowMap) error {
8993
// We wish to recognize the case where the topology's master actually has replication configuration.
9094
// This can happen when a DBA issues a `RESET SLAVE` instead of `RESET SLAVE ALL`.
9195

9296
// An empty log file indicates this is a master:
93-
if rowMap.GetString("Master_Log_File") == "" {
97+
if rowMap.GetString(ReplicaTermFor(dbVersion, "Master_Log_File")) == "" {
9498
return nil
9599
}
96100

97-
slaveIORunning := rowMap.GetString("Slave_IO_Running")
98-
slaveSQLRunning := rowMap.GetString("Slave_SQL_Running")
101+
ioRunningTerm := ReplicaTermFor(dbVersion, "Slave_IO_Running")
102+
sqlRunningTerm := ReplicaTermFor(dbVersion, "Slave_SQL_Running")
103+
slaveIORunning := rowMap.GetString(ioRunningTerm)
104+
slaveSQLRunning := rowMap.GetString(sqlRunningTerm)
99105

100106
if slaveIORunning != "Yes" || slaveSQLRunning != "Yes" {
101-
return fmt.Errorf("Replication on %+v is broken: Slave_IO_Running: %s, Slave_SQL_Running: %s. Please make sure replication runs before using gh-ost.",
107+
return fmt.Errorf("Replication on %+v is broken: %s: %s, %s: %s. Please make sure replication runs before using gh-ost.",
102108
connectionConfig.Key,
109+
ioRunningTerm,
103110
slaveIORunning,
111+
sqlRunningTerm,
104112
slaveSQLRunning,
105113
)
106114
}
107115

108116
masterKey = &InstanceKey{
109-
Hostname: rowMap.GetString("Master_Host"),
110-
Port: rowMap.GetInt("Master_Port"),
117+
Hostname: rowMap.GetString(ReplicaTermFor(dbVersion, "Master_Host")),
118+
Port: rowMap.GetInt(ReplicaTermFor(dbVersion, "Master_Port")),
111119
}
112120
return nil
113121
})
114122

115123
return masterKey, err
116124
}
117125

118-
func GetMasterConnectionConfigSafe(connectionConfig *ConnectionConfig, visitedKeys *InstanceKeyMap, allowMasterMaster bool) (masterConfig *ConnectionConfig, err error) {
119-
log.Debugf("Looking for master on %+v", connectionConfig.Key)
126+
func GetMasterConnectionConfigSafe(dbVersion string, connectionConfig *ConnectionConfig, visitedKeys *InstanceKeyMap, allowMasterMaster bool) (masterConfig *ConnectionConfig, err error) {
127+
log.Debugf("Looking for %s on %+v", ReplicaTermFor(dbVersion, "master"), connectionConfig.Key)
120128

121-
masterKey, err := GetMasterKeyFromSlaveStatus(connectionConfig)
129+
masterKey, err := GetMasterKeyFromSlaveStatus(dbVersion, connectionConfig)
122130
if err != nil {
123131
return nil, err
124132
}
@@ -131,34 +139,36 @@ func GetMasterConnectionConfigSafe(connectionConfig *ConnectionConfig, visitedKe
131139
masterConfig = connectionConfig.Duplicate()
132140
masterConfig.Key = *masterKey
133141

134-
log.Debugf("Master of %+v is %+v", connectionConfig.Key, masterConfig.Key)
142+
log.Debugf("%s of %+v is %+v", ReplicaTermFor(dbVersion, "master"), connectionConfig.Key, masterConfig.Key)
135143
if visitedKeys.HasKey(masterConfig.Key) {
136144
if allowMasterMaster {
137145
return connectionConfig, nil
138146
}
139147
return nil, fmt.Errorf("There seems to be a master-master setup at %+v. This is unsupported. Bailing out", masterConfig.Key)
140148
}
141149
visitedKeys.AddKey(masterConfig.Key)
142-
return GetMasterConnectionConfigSafe(masterConfig, visitedKeys, allowMasterMaster)
150+
return GetMasterConnectionConfigSafe(dbVersion, masterConfig, visitedKeys, allowMasterMaster)
143151
}
144152

145-
func GetReplicationBinlogCoordinates(db *gosql.DB) (readBinlogCoordinates *BinlogCoordinates, executeBinlogCoordinates *BinlogCoordinates, err error) {
146-
err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
153+
func GetReplicationBinlogCoordinates(dbVersion string, db *gosql.DB) (readBinlogCoordinates *BinlogCoordinates, executeBinlogCoordinates *BinlogCoordinates, err error) {
154+
showReplicaStatusQuery := fmt.Sprintf("show %s", ReplicaTermFor(dbVersion, `slave status`))
155+
err = sqlutils.QueryRowsMap(db, showReplicaStatusQuery, func(m sqlutils.RowMap) error {
147156
readBinlogCoordinates = &BinlogCoordinates{
148-
LogFile: m.GetString("Master_Log_File"),
149-
LogPos: m.GetInt64("Read_Master_Log_Pos"),
157+
LogFile: m.GetString(ReplicaTermFor(dbVersion, "Master_Log_File")),
158+
LogPos: m.GetInt64(ReplicaTermFor(dbVersion, "Read_Master_Log_Pos")),
150159
}
151160
executeBinlogCoordinates = &BinlogCoordinates{
152-
LogFile: m.GetString("Relay_Master_Log_File"),
153-
LogPos: m.GetInt64("Exec_Master_Log_Pos"),
161+
LogFile: m.GetString(ReplicaTermFor(dbVersion, "Relay_Master_Log_File")),
162+
LogPos: m.GetInt64(ReplicaTermFor(dbVersion, "Exec_Master_Log_Pos")),
154163
}
155164
return nil
156165
})
157166
return readBinlogCoordinates, executeBinlogCoordinates, err
158167
}
159168

160-
func GetSelfBinlogCoordinates(db *gosql.DB) (selfBinlogCoordinates *BinlogCoordinates, err error) {
161-
err = sqlutils.QueryRowsMap(db, `show master status`, func(m sqlutils.RowMap) error {
169+
func GetSelfBinlogCoordinates(dbVersion string, db *gosql.DB) (selfBinlogCoordinates *BinlogCoordinates, err error) {
170+
binaryLogStatusTerm := ReplicaTermFor(dbVersion, "master status")
171+
err = sqlutils.QueryRowsMap(db, fmt.Sprintf("show %s", binaryLogStatusTerm), func(m sqlutils.RowMap) error {
162172
selfBinlogCoordinates = &BinlogCoordinates{
163173
LogFile: m.GetString("File"),
164174
LogPos: m.GetInt64("Position"),

0 commit comments

Comments
 (0)