Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow CopyDB to run on a read only replica #35

Merged
merged 7 commits into from
Aug 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions copydb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package copydb

import (
"fmt"
"time"

"github.com/Shopify/ghostferry"
)
Expand Down Expand Up @@ -60,6 +61,25 @@ type Config struct {
// Iterative
// NoVerification
VerifierType string

// If you're running Ghostferry from a read only replica, turn this option
// on and specify SourceReplicationMaster and ReplicatedMasterPositionQuery.
RunFerryFromReplica bool

// This is the configuration to connect to the master writer of the source DB.
// This is only used if the source db is a replica and RunFerryFromReplica
// is on.
SourceReplicationMaster ghostferry.DatabaseConfig

// This is the SQL query used to read the position of the master binlog that
// has been replicated to the Source. As an example, you can query the
// pt-heartbeat table:
//
// SELECT file, position FROM meta.ptheartbeat WHERE server_id = master_server_id
ReplicatedMasterPositionQuery string

// The duration to wait for replication to catch up before aborting, given as
// a string accepted by time.ParseDuration (e.g. "30s"). Only used if
// RunFerryFromReplica is true.
WaitForReplicationTimeout string
}

func (c *Config) InitializeAndValidateConfig() error {
Expand All @@ -83,6 +103,13 @@ func (c *Config) InitializeAndValidateConfig() error {
c.DatabaseRewrites = c.Databases.Rewrites
c.TableRewrites = c.Tables.Rewrites

if c.WaitForReplicationTimeout != "" {
_, err := time.ParseDuration(c.WaitForReplicationTimeout)
if err != nil {
return err
}
}

if err := c.Config.ValidateConfig(); err != nil {
return err
}
Expand Down
34 changes: 34 additions & 0 deletions copydb/copydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"sync"
"time"

"github.com/Shopify/ghostferry"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -35,6 +36,13 @@ func NewFerry(config *Config) *CopydbFerry {
}

func (this *CopydbFerry) Initialize() error {
if this.config.RunFerryFromReplica {
err := this.initializeWaitUntilReplicaIsCaughtUpToMasterConnection()
if err != nil {
return err
}
}

err := this.Ferry.Initialize()
if err != nil {
return err
Expand Down Expand Up @@ -161,6 +169,32 @@ func (this *CopydbFerry) ShutdownControlServer() error {
return this.controlServer.Shutdown()
}

// initializeWaitUntilReplicaIsCaughtUpToMasterConnection builds the
// WaitUntilReplicaIsCaughtUpToMaster helper on the embedded Ferry from the
// copydb config: a database handle obtained from SourceReplicationMaster, a
// position fetcher driven by ReplicatedMasterPositionQuery, and the timeout
// parsed from WaitForReplicationTimeout.
//
// It returns any error from creating the master database handle or from
// parsing the timeout string; in both cases the Ferry is left untouched.
func (this *CopydbFerry) initializeWaitUntilReplicaIsCaughtUpToMasterConnection() error {
	masterDB, err := this.config.SourceReplicationMaster.SqlDB(logrus.WithField("tag", "copydb"))
	if err != nil {
		return err
	}

	// An empty WaitForReplicationTimeout leaves the timeout at its zero value.
	var timeout time.Duration
	if raw := this.config.WaitForReplicationTimeout; raw != "" {
		if timeout, err = time.ParseDuration(raw); err != nil {
			return err
		}
	}

	this.Ferry.WaitUntilReplicaIsCaughtUpToMaster = &ghostferry.WaitUntilReplicaIsCaughtUpToMaster{
		MasterDB: masterDB,
		Timeout:  timeout,
		ReplicatedMasterPositionFetcher: ghostferry.ReplicatedMasterPositionViaCustomQuery{
			Query: this.config.ReplicatedMasterPositionQuery,
		},
	}
	return nil
}

func (this *CopydbFerry) createDatabaseIfExistsOnTarget(database string) error {
if targetDbName, exists := this.Ferry.DatabaseRewrites[database]; exists {
database = targetDbName
Expand Down
12 changes: 11 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mysql-1:
image: percona:5.7
command: --server-id=1 --log-bin=mysql-bin --binlog-format=ROW --sync-binlog=1 --log-slave-updates=ON --gtid-mode=ON --enforce-gtid-consistency=ON --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --max-connections=1000
command: --server-id=1 --log-bin=mysql-bin --binlog-format=ROW --sync-binlog=1 --log-slave-updates=ON --gtid-mode=ON --enforce-gtid-consistency=ON --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --max-connections=1000 --read-only=OFF
environment:
MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
volumes:
Expand All @@ -17,3 +17,13 @@ mysql-2:
- /var/lib/mysql
ports:
- "29292:3306"

mysql-3:
image: percona:5.7
command: --server-id=3 --log-bin=mysql-bin --binlog-format=ROW --sync-binlog=1 --log-slave-updates=ON --gtid-mode=ON --enforce-gtid-consistency=ON --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --max-connections=1000
environment:
MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
volumes:
- /var/lib/mysql
ports:
- "29293:3306"
6 changes: 4 additions & 2 deletions docs/source/tutorialcopydb.rst
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,10 @@ we get into the habit of thinking of this step:

The last step ``FLUSH BINARY LOGS`` is not necessarily required if you run your
MySQL server with ``sync_binlog=1``. If you're running Ghostferry from a source
that is a replica, you need another tool to guarantee this property. See
`<https://github.com/Shopify/ghostferry/issues/19>`_.
that is a replica, you also need to turn on the ``RunFerryFromReplica`` option
in the config JSON, along with the other replica-related options. See
`<https://godoc.org/github.com/Shopify/ghostferry/copydb#Config>`_ for more
details.

We can then go back to the web ui and click the Allow Automatic Cutover button.
In a second or two the ghostferry binlog streaming process should stop. Refresh
Expand Down
12 changes: 4 additions & 8 deletions examples/copydb/conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,19 @@
"Source": {
"Host": "127.0.0.1",
"Port": 29291,
"User": "ghostferry",
"Pass": "ghostferry",
"User": "root",
"Pass": "",
"Collation": "utf8mb4_unicode_ci",
"Params": {
"charset": "utf8mb4"
},
"TLS": {
"CertPath": "/home/vagrant/.mysql-vending-machine/mysqls/gf.n1/data/server-cert.pem",
"ServerName": "MySQL_Server_5.7.20-19_Auto_Generated_Server_Certificate"
}
},

"Target": {
"Host": "127.0.0.1",
"Port": 29292,
"User": "ghostferry",
"Pass": "ghostferry",
"User": "root",
"Pass": "",
"Collation": "utf8mb4_unicode_ci",
"Params": {
"charset": "utf8mb4"
Expand Down
47 changes: 47 additions & 0 deletions examples/copydb/run-on-replica.conf.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
{
"Source": {
"Host": "127.0.0.1",
"Port": 29292,
"User": "root",
"Pass": "",
"Collation": "utf8mb4_unicode_ci",
"Params": {
"charset": "utf8mb4"
}
},

"Target": {
"Host": "127.0.0.1",
"Port": 29293,
"User": "root",
"Pass": "",
"Collation": "utf8mb4_unicode_ci",
"Params": {
"charset": "utf8mb4"
}
},

"RunFerryFromReplica": true,
"SourceReplicationMaster": {
"Host": "127.0.0.1",
"Port": 29291,
"User": "root",
"Pass": "",
"Collation": "utf8mb4_unicode_ci",
"Params": {
"charset": "utf8mb4"
}
},

"ReplicatedMasterPositionQuery": "SELECT file, position FROM meta.heartbeat",

"Databases": {
"Whitelist": ["abc"]
},

"Tables": {
"Blacklist": ["schema_migrations"]
},

"VerifierType": "ChecksumTable"
}
70 changes: 63 additions & 7 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package ghostferry
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/go-sql-driver/mysql"
siddontangmysql "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/schema"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -108,13 +110,13 @@ func (f *Ferry) Initialize() (err error) {
return err
}

err = checkConnection(f.logger, "source", f.SourceDB)
err = f.checkConnection("source", f.SourceDB)
if err != nil {
f.logger.WithError(err).Error("source connection checking failed")
return err
}

err = checkConnectionForBinlogFormat(f.SourceDB)
err = f.checkConnectionForBinlogFormat(f.SourceDB)
if err != nil {
f.logger.WithError(err).Error("binlog format for source db is not compatible")
return err
Expand All @@ -126,12 +128,60 @@ func (f *Ferry) Initialize() (err error) {
return err
}

err = checkConnection(f.logger, "target", f.TargetDB)
err = f.checkConnection("target", f.TargetDB)
if err != nil {
f.logger.WithError(err).Error("target connection checking failed")
return err
}

if f.WaitUntilReplicaIsCaughtUpToMaster != nil {
f.WaitUntilReplicaIsCaughtUpToMaster.ReplicaDB = f.SourceDB

if f.WaitUntilReplicaIsCaughtUpToMaster.MasterDB == nil {
err = errors.New("must specify a MasterDB")
f.logger.WithError(err).Error("must specify a MasterDB")
return err
}

err = f.checkConnection("source_master", f.WaitUntilReplicaIsCaughtUpToMaster.MasterDB)
if err != nil {
f.logger.WithError(err).Error("source master connection checking failed")
return err
}

var zeroPosition siddontangmysql.Position
// Ensures the query to check for position is executable.
_, err = f.WaitUntilReplicaIsCaughtUpToMaster.IsCaughtUp(zeroPosition, 1)
if err != nil {
f.logger.WithError(err).Error("cannot check replicated master position on the source database")
return err
}

isReplica, err := f.checkDbIsAReplica(f.WaitUntilReplicaIsCaughtUpToMaster.MasterDB)
if err != nil {
f.logger.WithError(err).Error("cannot check if master is a read replica")
return err
}

if isReplica {
err = errors.New("source master is a read replica, not a master writer")
f.logger.WithError(err).Error("source master is a read replica")
return err
}
} else {
isReplica, err := f.checkDbIsAReplica(f.SourceDB)
if err != nil {
f.logger.WithError(err).Error("cannot check if source is a replica")
return err
}

if isReplica {
err = errors.New("source is a read replica. running Ghostferry with a source replica is unsafe unless WaitUntilReplicaIsCaughtUpToMaster is used")
f.logger.WithError(err).Error("source is a read replica")
return err
}
}

if f.ErrorHandler == nil {
f.ErrorHandler = &PanicErrorHandler{
Ferry: f,
Expand Down Expand Up @@ -321,7 +371,6 @@ func (f *Ferry) WaitUntilBinlogStreamerCatchesUp() {
// You will know that the BinlogStreamer finished when .Run() returns.
func (f *Ferry) FlushBinlogAndStopStreaming() {
if f.WaitUntilReplicaIsCaughtUpToMaster != nil {
f.WaitUntilReplicaIsCaughtUpToMaster.ReplicaDB = f.SourceDB
err := f.WaitUntilReplicaIsCaughtUpToMaster.Wait()
if err != nil {
f.ErrorHandler.Fatal("wait_replica", err)
Expand Down Expand Up @@ -349,7 +398,7 @@ func (f *Ferry) onFinishedIterations() error {
return nil
}

func checkConnection(logger *logrus.Entry, dbname string, db *sql.DB) error {
func (f *Ferry) checkConnection(dbname string, db *sql.DB) error {
row := db.QueryRow("SHOW STATUS LIKE 'Ssl_cipher'")
var name, cipher string
err := row.Scan(&name, &cipher)
Expand All @@ -359,7 +408,7 @@ func checkConnection(logger *logrus.Entry, dbname string, db *sql.DB) error {

hasSSL := cipher != ""

logger.WithFields(logrus.Fields{
f.logger.WithFields(logrus.Fields{
"hasSSL": hasSSL,
"ssl_cipher": cipher,
"dbname": dbname,
Expand All @@ -368,7 +417,7 @@ func checkConnection(logger *logrus.Entry, dbname string, db *sql.DB) error {
return nil
}

func checkConnectionForBinlogFormat(db *sql.DB) error {
func (f *Ferry) checkConnectionForBinlogFormat(db *sql.DB) error {
var name, value string

row := db.QueryRow("SHOW VARIABLES LIKE 'binlog_format'")
Expand All @@ -389,3 +438,10 @@ func checkConnectionForBinlogFormat(db *sql.DB) error {

return nil
}

// checkDbIsAReplica reports whether db has the read_only system variable
// enabled, which is the signal this package uses to treat a server as a
// replica. The error from running or scanning the query is returned as-is
// alongside whatever value was scanned.
func (f *Ferry) checkDbIsAReplica(db *sql.DB) (bool, error) {
	var readOnly bool
	err := db.QueryRow("SELECT @@read_only").Scan(&readOnly)
	return readOnly, err
}
Loading