diff --git a/.gitignore b/.gitignore index d609d3b19..b4e313bde 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,4 @@ bin/ dist .vagrant .scannerwork -.vscode/launch.json +.vscode/ diff --git a/driver/oracle/config/db_config.go b/driver/oracle/config/db_config.go index e1853a964..7959d9135 100644 --- a/driver/oracle/config/db_config.go +++ b/driver/oracle/config/db_config.go @@ -22,6 +22,7 @@ type OracleDB struct { _db *sql.DB LogMinerConn *sql.Conn MetaDataConn *sql.Conn + SCN int64 } func (m *OracleConfig) ConnectString() string { @@ -110,13 +111,18 @@ func (o *OracleDB) NLS_DATE_FORMAT() error { return nil } func (o *OracleDB) GetTables(schema string) ([]string, error) { + asOfSCN := "" + // if o.SCN != 0 { + // asOfSCN = fmt.Sprintf("AS OF SCN %d", o.SCN) + // } query := fmt.Sprintf(` SELECT table_name FROM all_tables +%s WHERE - owner = '%s'`, schema) + owner = '%s'`, asOfSCN, schema) rows, err := o.MetaDataConn.QueryContext(context.TODO(), query) if err != nil { @@ -137,12 +143,17 @@ WHERE } func (o *OracleDB) GetSchemas() ([]string, error) { - query := `SELECT + asOfSCN := "" + // if o.SCN != 0 { + // asOfSCN = fmt.Sprintf("AS OF SCN %d", o.SCN) + // } + query := fmt.Sprintf(`SELECT USERNAME FROM DBA_USERS + %s WHERE - USERNAME NOT IN ( 'SYS', 'SYSTEM', 'ANONYMOUS', 'APEX_PUBLIC_USER', 'APEX_040000', 'OUTLN', 'XS$NULL', 'FLOWS_FILES', 'MDSYS', 'CTXSYS', 'XDB', 'HR' )` + USERNAME NOT IN ( 'SYS', 'SYSTEM', 'ANONYMOUS', 'APEX_PUBLIC_USER', 'APEX_040000', 'OUTLN', 'XS$NULL', 'FLOWS_FILES', 'MDSYS', 'CTXSYS', 'XDB', 'HR' )`, asOfSCN) rows, err := o.MetaDataConn.QueryContext(context.TODO(), query) if err != nil { @@ -163,11 +174,16 @@ func (o *OracleDB) GetSchemas() ([]string, error) { } func (o *OracleDB) GetColumns(schema, table string) ([]string, error) { + asOfSCN := "" + // if o.SCN != 0 { + // asOfSCN = fmt.Sprintf("AS OF SCN %d", o.SCN) + // } query := fmt.Sprintf(`SELECT column_name FROM all_tab_cols + %s WHERE table_name = '%s' AND owner = '%s' - ORDER BY COLUMN_ID`, table, schema) + ORDER BY COLUMN_ID`, asOfSCN, table, schema) rows, err := o.MetaDataConn.QueryContext(context.TODO(), query) if err != nil { @@ -201,3 +217,32 @@ SELECT dbms_metadata.get_ddl('TABLE','%s','%s') FROM dual`, table, schema)) } return query, nil } + +func (o *OracleDB) NewTx(ctx context.Context) (*sql.Tx, error) { + tx, err := o._db.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + return nil, err + } + return tx, nil +} + +func (o *OracleDB) GetCurrentSnapshotSCN() (int64, error) { + var globalSCN int64 + // 获取当前 SCN 号 + err := o.MetaDataConn.QueryRowContext(context.TODO(), "SELECT CURRENT_SCN FROM V$DATABASE").Scan(&globalSCN) + if err != nil { + return 0, err + } + return globalSCN, nil +} + +func (o *OracleDB) InitSCN(scn int64) (err error) { + if scn == 0 { + scn, err = o.GetCurrentSnapshotSCN() + if err != nil { + return err + } + } + o.SCN = scn + return nil +} diff --git a/driver/oracle/extractor/extractor_oracle.go b/driver/oracle/extractor/extractor_oracle.go index 8e6a033bb..374951162 100644 --- a/driver/oracle/extractor/extractor_oracle.go +++ b/driver/oracle/extractor/extractor_oracle.go @@ -7,6 +7,8 @@ package extractor import ( + "context" + "database/sql" gosql "database/sql" "encoding/binary" "fmt" @@ -408,6 +410,10 @@ func (e *ExtractorOracle) Finish1() (err error) { } func (e *ExtractorOracle) getSchemaTablesAndMeta() error { + err := e.oracleDB.InitSCN(e.mysqlContext.OracleConfig.Scn) + if err != nil { + return err + } if err := e.inspectTables(); err != nil { return err } @@ -864,22 +870,35 @@ func (e *ExtractorOracle) CheckAndApplyLowerCaseTableNames() { } func (e *ExtractorOracle) oracleDump() error { - // step 1 : todo lock row - // query : lock table schema.table in row share mode; if err := e.getSchemaTablesAndMeta(); err != nil { e.onError(common.TaskStateDead, err) return err } - // step 2 : get current scn for d.Dump() - currentSCN, err := (&LogMinerStream{oracleDB: e.oracleDB}).GetCurrentSnapshotSCN() + // step 1 : lock table schema.table in row share mode; + tx, err := e.oracleDB.NewTx(context.TODO()) if err != nil { + e.onError(common.TaskStateDead, err) return err } - // step 3 : defer unlock row + err = e.LockTablesForSchema(tx, context.TODO()) + if err != nil { + e.onError(common.TaskStateDead, err) + return err + } + defer func() { + if tx != nil { + tx.Rollback() + } + }() + // step 2 : get current scn for d.Dump() + // currentSCN, err := e.oracleDB.GetCurrentSnapshotSCN() + // if err != nil { + // return err + // } - // step 4 : create table ddl + // step 3 : create table ddl if !e.mysqlContext.SkipCreateDbTable { e.logger.Info("generating DROP and CREATE statements to reflect current database schemas", "replicateDoDb", e.replicateDoDb) @@ -931,11 +950,17 @@ func (e *ExtractorOracle) oracleDump() error { } } } + + // step 4: rollback for unlock table + if tx != nil { + tx.Rollback() + } + // step 5: Dump all of the tables and generate source records ... // todo need merged dumper with mysql dumper for _, db := range e.replicateDoDb { for _, t := range db.Tables { - d := NewDumper(e.oracleDB, t, e.mysqlContext.ChunkSize, e.logger.ResetNamed("dumper"), e.memory1, currentSCN) + d := NewDumper(e.oracleDB, t, e.mysqlContext.ChunkSize, e.logger.ResetNamed("dumper"), e.memory1, e.oracleDB.SCN) if err := d.Dump(); err != nil { e.onError(common.TaskStateDead, err) } @@ -971,3 +996,24 @@ func (e *ExtractorOracle) encodeAndSendDumpEntry(entry *common.DumpEntry) error } return nil } + +func (e *ExtractorOracle) LockTablesForSchema(tx *sql.Tx, ctx context.Context) error { + for _, db := range e.replicateDoDb { + for _, table := range db.Tables { + query := fmt.Sprintf("LOCK TABLE %s.%s IN ROW SHARE MODE", db.TableSchema, table.TableName) + _, err := tx.Exec(query) + if err != nil { + return err + } + } + } + return nil +} + +func releaseSchemaSnapshotLock(tx *sql.Tx) { + // roll back connect + for err := tx.Rollback(); err != nil; { + // log roll back failed + continue + } +} diff --git a/driver/oracle/extractor/log_miner.go b/driver/oracle/extractor/log_miner.go index 5759ef965..a94133fbe 100644 --- a/driver/oracle/extractor/log_miner.go +++ b/driver/oracle/extractor/log_miner.go @@ -28,16 +28,6 @@ import ( "github.com/thinkeridea/go-extend/exbytes" ) -func (l *LogMinerStream) GetCurrentSnapshotSCN() (int64, error) { - var globalSCN int64 - // 获取当前 SCN 号 - err := l.oracleDB.LogMinerConn.QueryRowContext(context.TODO(), "SELECT CURRENT_SCN FROM V$DATABASE").Scan(&globalSCN) - if err != nil { - return 0, err - } - return globalSCN, nil -} - type LogFile struct { Name string FirstChange int64 @@ -532,7 +522,7 @@ func (e *ExtractorOracle) DataStreamEvents(entriesChannel chan<- *common.EntryCo e.logger.Debug("start oracle. DataStreamEvents") if e.LogMinerStream.startScn == 0 { - scn, err := e.LogMinerStream.GetCurrentSnapshotSCN() + scn, err := e.LogMinerStream.oracleDB.GetCurrentSnapshotSCN() if err != nil { e.logger.Error("GetCurrentSnapshotSCN", "err", err) return err @@ -726,7 +716,7 @@ func (l *LogMinerStream) stopLogMiner() error { //} func (l *LogMinerStream) getEndScn() (int64, error) { - latestScn, err := l.GetCurrentSnapshotSCN() + latestScn, err := l.oracleDB.GetCurrentSnapshotSCN() if err != nil { l.logger.Error("GetCurrentSnapshotSCN", "err", err) return 0, err