Skip to content

Commit

Permalink
dump: use tx for misc operations #1052-1
Browse files Browse the repository at this point in the history
Before
- use e.db for
  - getting table structures
  - counting rows
After
- use the tx (start transaction with consistent snapshot)
  • Loading branch information
ffffwh committed Apr 10, 2023
1 parent f87876d commit 2e38652
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 20 deletions.
33 changes: 16 additions & 17 deletions driver/mysql/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func (e *Extractor) Run() {
} else { // no full copy
// Will not get consistent table meta-info for an incremental only job.
// https://github.com/actiontech/dtle/issues/321#issuecomment-441191534
if err := e.getSchemaTablesAndMeta(); err != nil {
if err := e.getSchemaTablesAndMeta(e.db); err != nil {
e.onError(common.TaskStateDead, err)
return
}
Expand Down Expand Up @@ -380,9 +380,9 @@ func (e *Extractor) initiateInspector() (err error) {
return nil
}

func (e *Extractor) inspectTables() (err error) {
func (e *Extractor) inspectTables(db sql.QueryAble) (err error) {
// Creates a MYSQL Dump based on the options supplied through the dumper.
dbsExisted, err := sql.ShowDatabases(e.db)
dbsExisted, err := sql.ShowDatabases(db)
if err != nil {
return err
}
Expand Down Expand Up @@ -434,11 +434,11 @@ func (e *Extractor) inspectTables() (err error) {
schemaCtx.TableSchemaRename = doDb.TableSchemaRename
e.replicateDoDb[doDb.TableSchema] = schemaCtx

schemaCtx.CreateSchemaString, err = sql.ShowCreateSchema(e.ctx, e.db, doDb.TableSchema)
schemaCtx.CreateSchemaString, err = sql.ShowCreateSchema(e.ctx, db, doDb.TableSchema)
if err != nil {
return err
}
existedTables, err := sql.ShowTables(e.db, doDb.TableSchema, e.mysqlContext.ExpandSyntaxSupport)
existedTables, err := sql.ShowTables(db, doDb.TableSchema, e.mysqlContext.ExpandSyntaxSupport)
if err != nil {
return err
}
Expand Down Expand Up @@ -534,12 +534,12 @@ func (e *Extractor) inspectTables() (err error) {
schemaCtx := common.NewSchemaContext(dbName)
e.replicateDoDb[dbName] = schemaCtx

schemaCtx.CreateSchemaString, err = sql.ShowCreateSchema(e.ctx, e.db, dbName)
schemaCtx.CreateSchemaString, err = sql.ShowCreateSchema(e.ctx, db, dbName)
if err != nil {
return err
}

tbs, err := sql.ShowTables(e.db, dbName, e.mysqlContext.ExpandSyntaxSupport)
tbs, err := sql.ShowTables(db, dbName, e.mysqlContext.ExpandSyntaxSupport)
if err != nil {
return err
}
Expand Down Expand Up @@ -753,8 +753,8 @@ func (e *Extractor) initDBConnections() (err error) {
return nil
}

func (e *Extractor) getSchemaTablesAndMeta() error {
if err := e.inspectTables(); err != nil {
func (e *Extractor) getSchemaTablesAndMeta(queryable sql.QueryAble) error {
if err := e.inspectTables(queryable); err != nil {
return err
}

Expand All @@ -774,7 +774,7 @@ func (e *Extractor) getSchemaTablesAndMeta() error {
continue
}

stmt, err := base.ShowCreateTable(e.db, db.TableSchema, tb.TableName)
stmt, err := base.ShowCreateTable(queryable, db.TableSchema, tb.TableName)
if err != nil {
e.logger.Error("error at ShowCreateTable.", "err", err)
return err
Expand Down Expand Up @@ -856,15 +856,14 @@ func (e *Extractor) setInitialBinlogCoordinates() error {
}

// CountTableRows counts exact number of rows on the original table
func (e *Extractor) CountTableRows(table *common.Table) (int64, error) {
func (e *Extractor) CountTableRows(db sql.QueryAble, table *common.Table) (int64, error) {
//e.logger.Debug("As instructed, I'm issuing a SELECT COUNT(*) on the table. This may take a while")

var query string
// It only requires select privilege on target table to select its information_schema item.
query = fmt.Sprintf(`select table_rows from information_schema.tables where table_schema = '%s' and table_name = '%s'`,
sql.EscapeValue(table.TableSchema), sql.EscapeValue(table.TableName))
query = fmt.Sprintf(`select table_rows from information_schema.tables where table_schema = ? and table_name = ?`)
var rowsEstimate int64
err := e.db.QueryRow(query).Scan(&rowsEstimate)
err := db.QueryRow(query, table.TableSchema, table.TableName).Scan(&rowsEstimate)
if err != nil {
e.logger.Error("error when getting estimated row number (using information_schema)", "err", err,
"schema", table.TableSchema, "table", table.TableName)
Expand Down Expand Up @@ -1295,7 +1294,7 @@ func (e *Extractor) mysqlDump() (retErr error) {
// we are reading the database names from the database and not taking them from the user ...
e.logger.Info("Step: read list of available tables in each database", "n", step)

err = e.getSchemaTablesAndMeta()
err = e.getSchemaTablesAndMeta(tx)
if err != nil {
return err
}
Expand Down Expand Up @@ -1328,7 +1327,7 @@ func (e *Extractor) mysqlDump() (retErr error) {
// Create the tables.
for _, tbCtx := range db.TableMap {
tb := tbCtx.Table
tb.Counter, err = e.CountTableRows(tb)
tb.Counter, err = e.CountTableRows(tx, tb)
if err != nil {
return errors.Wrapf(err, "CountTableRows %v.%v", tb.TableSchema, tb.TableName)
}
Expand All @@ -1345,7 +1344,7 @@ func (e *Extractor) mysqlDump() (retErr error) {
return err
}*/
} else {
ctStmt, err := base.ShowCreateTable(e.singletonDB, tb.TableSchema, tb.TableName)
ctStmt, err := base.ShowCreateTable(tx, tb.TableSchema, tb.TableName)
if err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions driver/mysql/sql/sqlutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ type QueryAble interface {
Prepare(query string) (*gosql.Stmt, error)
Query(query string, args ...interface{}) (*gosql.Rows, error)
QueryRow(query string, args ...interface{}) *gosql.Row
QueryRowContext(ctx context.Context, query string, args ...interface{}) *gosql.Row
}

func GetServerUUID(db QueryAble) (result string, err error) {
Expand Down Expand Up @@ -291,7 +292,7 @@ func QueryResultData(db *gosql.DB, query string, args ...interface{}) (ResultDat
//INSERT INTO {{ .Name }} VALUES {{ .Values }};
//UNLOCK TABLES;

func ShowDatabases(db *gosql.DB) ([]string, error) {
func ShowDatabases(db QueryAble) ([]string, error) {
dbs := make([]string, 0)

// Get table list
Expand All @@ -317,7 +318,7 @@ func ShowDatabases(db *gosql.DB) ([]string, error) {
return dbs, rows.Err()
}

func ShowCreateSchema(ctx context.Context, db *gosql.DB, dbName string) (r string, err error) {
func ShowCreateSchema(ctx context.Context, db QueryAble, dbName string) (r string, err error) {
query := fmt.Sprintf("SHOW CREATE SCHEMA IF NOT EXISTS %s", mysqlconfig.EscapeName(dbName))
g.Logger.Debug("ShowCreateSchema", "query", query)
row := db.QueryRowContext(ctx, query)
Expand All @@ -330,7 +331,7 @@ func ShowCreateSchema(ctx context.Context, db *gosql.DB, dbName string) (r strin
return r, nil
}

func ShowTables(db *gosql.DB, dbName string, showType bool) (tables []*common.Table, err error) {
func ShowTables(db QueryAble, dbName string, showType bool) (tables []*common.Table, err error) {
// Get table list
var query string
escapedDbName := mysqlconfig.EscapeName(dbName)
Expand Down

0 comments on commit 2e38652

Please sign in to comment.