diff --git a/driver/oracle/extractor/extractor_oracle.go b/driver/oracle/extractor/extractor_oracle.go index 8dca118ea..2b1c73cf3 100644 --- a/driver/oracle/extractor/extractor_oracle.go +++ b/driver/oracle/extractor/extractor_oracle.go @@ -203,39 +203,50 @@ func (e *ExtractorOracle) Run() { return } - startSCN, committedSCN, err := e.calculateSCNPos() - if err != nil { - e.onError(common.TaskStateDead, errors.Wrap(err, "calculateSCNPos")) - return + e.initDBConnections() + + fullCopy := true + { + startSCN, committedSCN, err := e.calculateSCNPos() + if err != nil { + e.onError(common.TaskStateDead, errors.Wrap(err, "calculateSCNPos")) + return + } + // todo + notFullCopy := true + if notFullCopy { + fullCopy = false + } else if startSCN == 0 && committedSCN == 0 { + fullCopy = false + } } - e.initDBConnections() - e.getSchemaTablesAndMeta() - - e.LogMinerStream = NewLogMinerStream(e.oracleDB, e.logger, e.mysqlContext.ReplicateDoDb, e.mysqlContext.ReplicateIgnoreDb, - startSCN, committedSCN, 100000) - //e.logger.Info("CheckAndApplyLowerCaseTableNames") - //e.CheckAndApplyLowerCaseTableNames() - // 字符集同步 todo - fullCopy := false if fullCopy { - e.logger.Debug("mysqlDump. before") - } else { // no full copy - // Will not get consistent table meta-info for an incremental only job. - // https://github.com/actiontech/dtle/issues/321#issuecomment-441191534 - // 获取需要同步的表结构数据 - //if err := e.getSchemaTablesAndMeta(); err != nil { - // e.onError(common.TaskStateDead, err) - // return - //} - } - //err = e.sendFullComplete() + e.logger.Debug("oracleDump. before") + if err := e.oracleDump(); err != nil { + e.onError(common.TaskStateDead, err) + return + } + err = e.sendFullComplete() + if err != nil { + e.onError(common.TaskStateDead, errors.Wrap(err, "sendFullComplete")) + return + } + } else { + if err := e.getSchemaTablesAndMeta(); err != nil { + e.onError(common.TaskStateDead, err) + return + } + } + { - //if err != nil { - // e.logger.Error("error after streamerReadyCh", "err", err) - // e.onError(common.TaskStateDead, err) - // return - //} + startSCN, committedSCN, err := e.calculateSCNPos() + if err != nil { + e.onError(common.TaskStateDead, errors.Wrap(err, "calculateSCNPos")) + return + } + e.LogMinerStream = NewLogMinerStream(e.oracleDB, e.logger, e.mysqlContext.ReplicateDoDb, e.mysqlContext.ReplicateIgnoreDb, + startSCN, committedSCN, 100000) e.logger.Debug("start .initiateStreaming before") if err := e.initiateStreaming(); err != nil { e.logger.Error("error at initiateStreaming", "err", err) @@ -851,3 +862,115 @@ func (e *ExtractorOracle) CheckAndApplyLowerCaseTableNames() { lowerConfigItem(e.mysqlContext.ReplicateIgnoreDb) } } + +func (e *ExtractorOracle) oracleDump() error { + // step 1 : todo lock row + // query : lock table schema.table in row share mode; + if err := e.getSchemaTablesAndMeta(); err != nil { + e.onError(common.TaskStateDead, err) + return err + } + + // step 2 : get current scn for d.Dump() + _, err := (&LogMinerStream{oracleDB: e.oracleDB}).GetCurrentSnapshotSCN() + if err != nil { + return err + } + + // step 3 : defer unlock row + + // step 4 : create table ddl + if !e.mysqlContext.SkipCreateDbTable { + e.logger.Info("generating DROP and CREATE statements to reflect current database schemas", + "replicateDoDb", e.replicateDoDb) + + for _, db := range e.replicateDoDb { + var dbSQL string + // rename schema + if db.TableSchemaRename != "" { + dbSQL = fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", mysqlconfig.EscapeName(db.TableSchemaRename)) + } else { + dbSQL = fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", mysqlconfig.EscapeName(db.TableSchema)) + } + + entry := &common.DumpEntry{ + DbSQL: dbSQL, + } + if err := e.encodeAndSendDumpEntry(entry); err != nil { + e.onError(common.TaskStateRestart, err) + } + + for _, tb := range db.Tables { + tbSQL := make([]string, 0) + + targetSchema := g.StringElse(db.TableSchemaRename, tb.TableSchema) + targetTable := g.StringElse(tb.TableRename, tb.TableName) + if e.mysqlContext.DropTableIfExists { + tbSQL = append(tbSQL, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", + mysqlconfig.EscapeName(targetSchema), mysqlconfig.EscapeName(targetTable))) + } + // Get the specified table's create DDL statement. + createTbSQL, err := e.oracleDB.GetTableDDL(tb.TableSchema, tb.TableName) + if err != nil { + return err + } + + // parse ddl + for i := range createTbSQL { + dataEvent, err := e.parseDDLSQL(entry.TbSQL[i], tb.TableSchema) + if err != nil { + return err + } + tbSQL = append(tbSQL, dataEvent.Query) + } + + entry := &common.DumpEntry{ + TbSQL: tbSQL, + // TotalCount: tb.Counter, + } + if err := e.encodeAndSendDumpEntry(entry); err != nil { + e.onError(common.TaskStateRestart, err) + } + } + } + } + // step 5: Dump all of the tables and generate source records ... + // todo need merged dumper with mysql dumper + // for _, db := range e.replicateDoDb { + // for _, t := range db.Tables { + // d := NewDumper(e.oracleDB, t, e.mysqlContext.ChunkSize, e.logger.ResetNamed("dumper"), e.memory1) + // if err := d.Dump(); err != nil { + // e.onError(common.TaskStateDead, err) + // } + // // todo close when shutdown + // // e.dumpers = append(e.dumpers, d) + + // // Scan the rows in the table ... + // for entry := range d.resultsChannel { + // if entry.Err != "" { + // e.onError(common.TaskStateDead, fmt.Errorf(entry.Err)) + // } else { + // if err := e.encodeAndSendDumpEntry(entry); err != nil { + // e.onError(common.TaskStateRestart, err) + // } + // } + // } + // } + // } + return nil +} + +func (e *ExtractorOracle) encodeAndSendDumpEntry(entry *common.DumpEntry) error { + bs, err := entry.Marshal(nil) + if err != nil { + return err + } + txMsg, err := common.Compress(bs) + if err != nil { + return errors.Wrap(err, "common.Compress") + } + if err := e.publish(fmt.Sprintf("%s_full", e.subject), txMsg, 0); err != nil { + return err + } + return nil +}