Skip to content

Commit

Permalink
fix: Oracle source skips uncommitted operations that are before anoth…
Browse files Browse the repository at this point in the history
…er commit (#2413)

* fix: Oracle source start scn not updated on starting a new log

* fix: Oracle log miner should not filter redo record by SCN. The filtering should be done on COMMIT_SCN instead.
  • Loading branch information
chubei committed Feb 21, 2024
1 parent 456416c commit eb61d81
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 14 deletions.
7 changes: 4 additions & 3 deletions dozer-ingestion/oracle/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ impl Connector {
con_id: Option<u32>,
poll_interval: Duration,
) -> Result<(), Error> {
let miner = replicate::log_miner::LogMiner::new();
let table_pair_to_index = tables
.into_iter()
.enumerate()
Expand All @@ -344,7 +345,7 @@ impl Connector {
.collect::<HashMap<_, _>>();

loop {
let start_scn = checkpoint + 1;
let mut start_scn = checkpoint + 1;
let mut logs = replicate::merge::list_and_join_online_log(&self.connection, start_scn)?;
if !replicate::log_contains_scn(logs.first(), start_scn) {
info!(
Expand Down Expand Up @@ -373,12 +374,12 @@ impl Connector {

let (sender, receiver) = std::sync::mpsc::sync_channel(100);
let handle = {
let miner = miner.clone();
let connection = self.connection.clone();
let log = log.clone();
let table_pair_to_index = table_pair_to_index.clone();
let schemas = schemas.clone();
std::thread::spawn(move || {
let miner = replicate::log_miner::LogMiner::new();
miner.mine(
&connection,
&log,
Expand Down Expand Up @@ -437,7 +438,7 @@ impl Connector {
std::thread::sleep(poll_interval);
} else {
// If there are more logs, we need to start from the next log's first change.
checkpoint = log.next_change - 1;
start_scn = log.next_change;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct LogManagerContent {
impl LogManagerContent {
pub fn list(
connection: &Connection,
start_scn: Scn,
con_id: Option<u32>,
) -> Result<impl Iterator<Item = Result<LogManagerContent, Error>> + '_, Error> {
type Row = (
Expand All @@ -27,11 +28,11 @@ impl LogManagerContent {
String,
);
let rows = if let Some(con_id) = con_id {
let sql = "SELECT COMMIT_SCN, COMMIT_TIMESTAMP, OPERATION_CODE, SEG_OWNER, TABLE_NAME, SQL_REDO FROM V$LOGMNR_CONTENTS WHERE SRC_CON_ID = :con_id";
connection.query_as::<Row>(sql, &[&con_id])
let sql = "SELECT COMMIT_SCN, COMMIT_TIMESTAMP, OPERATION_CODE, SEG_OWNER, TABLE_NAME, SQL_REDO FROM V$LOGMNR_CONTENTS WHERE COMMIT_SCN >= :start_scn AND SRC_CON_ID = :con_id";
connection.query_as::<Row>(sql, &[&start_scn, &con_id])
} else {
let sql = "SELECT COMMIT_SCN, COMMIT_TIMESTAMP, OPERATION_CODE, SEG_OWNER, TABLE_NAME, SQL_REDO FROM V$LOGMNR_CONTENTS";
connection.query_as::<Row>(sql, &[])
let sql = "SELECT COMMIT_SCN, COMMIT_TIMESTAMP, OPERATION_CODE, SEG_OWNER, TABLE_NAME, SQL_REDO FROM V$LOGMNR_CONTENTS WHERE COMMIT_SCN >= :start_scn";
connection.query_as::<Row>(sql, &[&start_scn])
}?;

Ok(rows.into_iter().map(|row| {
Expand Down
10 changes: 3 additions & 7 deletions dozer-ingestion/oracle/src/connector/replicate/log_miner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ impl LogMiner {
}
}

/// Returns `Err` if the log manager can not be started.
/// In that case `output` is useless.
#[allow(clippy::too_many_arguments)]
pub fn mine(
&self,
Expand All @@ -43,15 +41,15 @@ impl LogMiner {
con_id: Option<u32>,
output: SyncSender<Result<MappedLogManagerContent, Error>>,
) {
let _guard = match start_log_manager(connection, log, start_scn) {
let _guard = match start_log_manager(connection, log) {
Ok(guard) => guard,
Err(e) => {
let _ = output.send(Err(e));
return;
}
};

let contents = match listing::LogManagerContent::list(connection, con_id) {
let contents = match listing::LogManagerContent::list(connection, start_scn, con_id) {
Ok(contents) => contents,
Err(e) => {
let _ = output.send(Err(e));
Expand Down Expand Up @@ -86,23 +84,21 @@ mod parse;
fn start_log_manager<'a>(
connection: &'a Connection,
log: &ArchivedLog,
start_scn: Scn,
) -> Result<LogManagerGuard<'a>, Error> {
let sql =
"BEGIN DBMS_LOGMNR.ADD_LOGFILE(LOGFILENAME => :name, OPTIONS => DBMS_LOGMNR.NEW); END;";
connection.execute(sql, &[&str_to_sql!(log.name)])?;
let sql = "
BEGIN
DBMS_LOGMNR.START_LOGMNR(
STARTSCN => :start_scn,
OPTIONS =>
DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG +
DBMS_LOGMNR.COMMITTED_DATA_ONLY +
DBMS_LOGMNR.PRINT_PRETTY_SQL +
DBMS_LOGMNR.NO_ROWID_IN_STMT
);
END;";
connection.execute(sql, &[&start_scn])?;
connection.execute(sql, &[])?;
Ok(LogManagerGuard { connection })
}

Expand Down

0 comments on commit eb61d81

Please sign in to comment.