Commit

fix: Avoid sending repeated messages when error happens in Oracle log miner (#2373)
chubei committed Feb 4, 2024
1 parent 43a949e commit 17836d6
Showing 2 changed files with 51 additions and 33 deletions.
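
The substance of the change: operations mined from a log are now buffered per transaction and only forwarded to the ingestor once LogMiner reports the commit SCN, so an error in the middle of a log no longer leaves half-delivered operations that would be sent again on retry. Below is a minimal sketch of that buffer-until-commit idea; `Event`, `forward_committed`, and the `Vec<String>` sink are placeholders for illustration, not the connector's actual types.

// A simplified model of the fix: operations are held back until their
// transaction's commit SCN arrives, so an error mid-log never leaves
// half of a transaction already delivered downstream.
enum Event {
    Op(String),
    Commit(u64), // commit SCN reported by LogMiner
}

fn forward_committed(
    events: impl IntoIterator<Item = Result<Event, String>>,
    sink: &mut Vec<String>,
    mut checkpoint: u64,
) -> u64 {
    let mut uncommitted: Vec<String> = Vec::new();
    for event in events {
        match event {
            Err(e) => {
                // Buffered operations are simply dropped; replication restarts
                // from `checkpoint`, so nothing is sent twice.
                eprintln!("error during log mining: {e}, retrying from {checkpoint}");
                break;
            }
            Ok(Event::Op(op)) => uncommitted.push(op),
            Ok(Event::Commit(scn)) => {
                checkpoint = scn;
                // Only a commit flushes the buffer to the downstream sink.
                sink.extend(std::mem::take(&mut uncommitted));
            }
        }
    }
    checkpoint
}

fn main() {
    let mut sink = Vec::new();
    let events = vec![
        Ok(Event::Op("insert a".to_string())),
        Ok(Event::Commit(100)),
        Ok(Event::Op("insert b".to_string())),
        Err("log file was archived".to_string()),
    ];
    let checkpoint = forward_committed(events, &mut sink, 0);
    // "insert a" was delivered at commit 100; "insert b" was never sent.
    assert_eq!(sink, vec!["insert a"]);
    assert_eq!(checkpoint, 100);
}

On a retry, mining resumes from the returned checkpoint, which is exactly the behavior the real connector gets from buffering into `uncommited_messages` in the first hunk below.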
61 changes: 34 additions & 27 deletions dozer-ingestion/oracle/src/connector/mod.rs
@@ -364,7 +364,7 @@ impl Connector {
                     continue;
                 }

-                while !logs.is_empty() {
+                'replicate_logs: while !logs.is_empty() {
                     let log = logs.remove(0);
                     info!(
                         "Replicating log {} ({}, {}), starting from {}",
@@ -391,36 +391,36 @@
                     })
                 };

+                let mut uncommited_messages = vec![];
                 for content in receiver {
                     let content = match content {
                         Ok(content) => content,
                         Err(e) => {
-                            println!("Error: {e}");
-                            logs.clear();
-                            break;
+                            error!("Error during log mining: {}. Retrying.", e);
+                            break 'replicate_logs;
                         }
                     };
                     match content {
-                        MappedLogManagerContent::Commit(scn) => checkpoint = scn,
-                        MappedLogManagerContent::Op { table_index, op } => {
-                            if ingestor
-                                .blocking_handle_message(IngestionMessage::OperationEvent {
-                                    table_index,
-                                    op,
-                                    id: Some(OpIdentifier::new(checkpoint, 0)),
-                                })
-                                .is_err()
-                            {
-                                return Ok(());
+                        MappedLogManagerContent::Commit(scn) => {
+                            checkpoint = scn;
+                            if !uncommited_messages.is_empty() {
+                                for message in std::mem::take(&mut uncommited_messages) {
+                                    if ingestor.blocking_handle_message(message).is_err() {
+                                        return Ok(());
+                                    }
+                                }
                             }
                         }
+                        MappedLogManagerContent::Op { table_index, op } => {
+                            uncommited_messages.push(IngestionMessage::OperationEvent {
+                                table_index,
+                                op,
+                                id: Some(OpIdentifier::new(checkpoint, 0)),
+                            });
+                        }
                     }
                 }
-                if let Err(e) = handle.join().unwrap() {
-                    // Perhaps the log was archived.
-                    error!("Log manager couldn't be started: {}", e);
-                    break;
-                }
+                handle.join().unwrap();

                 if logs.is_empty() {
                     info!("Replicated all logs, retrying after {:?}", poll_interval);
@@ -497,7 +497,7 @@ mod tests {
     fn estimate_throughput(iterator: IngestionIterator) {
         let mut tic = None;
         let mut count = 0;
-        let print_count_interval = 100_000;
+        let print_count_interval = 10_000;
         let mut count_mod_interval = 0;
         for message in iterator {
             if tic.is_none() {
@@ -520,16 +520,22 @@

         env_logger::init();

+        let replicate_user = "DOZER";
+        let data_user = "DOZER";
+        let host = "database-1.cxtwfj9nkwtu.ap-southeast-1.rds.amazonaws.com";
+        let sid = "ORCL";
+        let pid = None;
+
         let mut connector = super::Connector::new(
             "oracle".into(),
-            "DOZER".into(),
+            replicate_user.into(),
             "123",
-            "database-1.cxtwfj9nkwtu.ap-southeast-1.rds.amazonaws.com:1521/ORCL",
+            &format!("{}:{}/{}", host, 1521, pid.unwrap_or(sid)),
             100_000,
             OracleReplicator::DozerLogReader,
         )
         .unwrap();
-        let tables = connector.list_tables(&["DOZER".into()]).unwrap();
+        let tables = connector.list_tables(&[data_user.into()]).unwrap();
         let tables = connector.list_columns(tables).unwrap();
         let schemas = connector.get_schemas(&tables).unwrap();
         let schemas = schemas.into_iter().map(Result::unwrap).collect::<Vec<_>>();
@@ -545,9 +545,9 @@

         let mut connector = super::Connector::new(
             "oracle".into(),
-            "DOZER".into(),
+            replicate_user.into(),
             "123",
-            "database-1.cxtwfj9nkwtu.ap-southeast-1.rds.amazonaws.com:1521/ORCL",
+            &format!("{}:{}/{}", host, 1521, sid),
             1,
             OracleReplicator::LogMiner {
                 poll_interval_in_milliseconds: 1000,
@@ -556,8 +562,9 @@
         .unwrap();
         let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());
         let schemas = schemas.into_iter().map(|schema| schema.schema).collect();
+        let con_id = pid.map(|pid| connector.get_con_id(pid).unwrap());
         let handle = std::thread::spawn(move || {
-            connector.replicate(&ingestor, tables, schemas, checkpoint, None)
+            connector.replicate(&ingestor, tables, schemas, checkpoint, con_id)
         });

         estimate_throughput(iterator);
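
The test now assembles its connect string from `host`, the port, and either the SID or a pluggable-database name, and only resolves a container id when a PDB is configured. A small hypothetical helper showing the same `pid.unwrap_or(sid)` construction; the function name and values here are illustrative, not part of the connector API:

// Hypothetical helper mirroring how the test builds its connect string.
fn connect_string(host: &str, port: u16, sid: &str, pid: Option<&str>) -> String {
    // A pluggable database (service) name takes precedence over the SID.
    format!("{}:{}/{}", host, port, pid.unwrap_or(sid))
}

fn main() {
    assert_eq!(
        connect_string("db.example.com", 1521, "ORCL", None),
        "db.example.com:1521/ORCL"
    );
    assert_eq!(
        connect_string("db.example.com", 1521, "ORCL", Some("PDB1")),
        "db.example.com:1521/PDB1"
    );
}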
23 changes: 17 additions & 6 deletions dozer-ingestion/oracle/src/connector/replicate/log_miner/mod.rs
@@ -42,29 +42,40 @@ impl LogMiner {
         schemas: &[Schema],
         con_id: Option<u32>,
         output: SyncSender<Result<MappedLogManagerContent, Error>>,
-    ) -> Result<(), Error> {
-        let _guard = start_log_manager(connection, log, start_scn)?;
+    ) {
+        let _guard = match start_log_manager(connection, log, start_scn) {
+            Ok(guard) => guard,
+            Err(e) => {
+                let _ = output.send(Err(e));
+                return;
+            }
+        };

-        let contents = listing::LogManagerContent::list(connection, con_id)?;
+        let contents = match listing::LogManagerContent::list(connection, con_id) {
+            Ok(contents) => contents,
+            Err(e) => {
+                let _ = output.send(Err(e));
+                return;
+            }
+        };
         for content in contents {
             match parse_and_map(&self.parser, content, table_pair_to_index, schemas) {
                 Ok(Some(mapped)) => {
                     if output.send(Ok(mapped)).is_err() {
-                        return Ok(());
+                        return;
                     }
                 }
                 Ok(None) => {}
                 Err(err) => {
                     let _ = output.send(Err(err));
-                    return Ok(());
+                    return;
                 }
             }
         }

         if let Err(e) = end_log_manager(connection) {
             let _ = output.send(Err(e));
         }
-        Ok(())
     }
 }
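
With this change the mining routine returns `()` and reports failures through the same channel as data, so the consumer in connector/mod.rs observes them as `Err` items on the receiver instead of having to inspect the worker's return value after joining. A minimal sketch of that report-errors-through-the-data-channel pattern, assuming a toy `mine` worker in place of `LogMiner`:

use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

// Stand-in for the mining routine: it never returns an error itself,
// it only forwards errors as messages and then stops.
fn mine(output: SyncSender<Result<u64, String>>) {
    for scn in 0..3u64 {
        if output.send(Ok(scn)).is_err() {
            return; // receiver hung up
        }
    }
    // An error is just another message on the channel.
    let _ = output.send(Err("log file was archived".to_string()));
}

fn main() {
    let (sender, receiver) = sync_channel(100);
    let handle = thread::spawn(move || mine(sender));
    for item in receiver {
        match item {
            Ok(scn) => println!("content at SCN {scn}"),
            Err(e) => {
                // The consumer decides what an error means, e.g. retry the log.
                println!("mining error: {e}, will retry");
                break;
            }
        }
    }
    handle.join().unwrap();
}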
