diff --git a/dozer-ingestion/oracle/src/connector/mod.rs b/dozer-ingestion/oracle/src/connector/mod.rs index 54a5e5a864..efe6e9337b 100644 --- a/dozer-ingestion/oracle/src/connector/mod.rs +++ b/dozer-ingestion/oracle/src/connector/mod.rs @@ -364,7 +364,7 @@ impl Connector { continue; } - while !logs.is_empty() { + 'replicate_logs: while !logs.is_empty() { let log = logs.remove(0); info!( "Replicating log {} ({}, {}), starting from {}", @@ -391,36 +391,36 @@ impl Connector { }) }; + let mut uncommited_messages = vec![]; for content in receiver { let content = match content { Ok(content) => content, Err(e) => { - println!("Error: {e}"); - logs.clear(); - break; + error!("Error during log mining: {}. Retrying.", e); + break 'replicate_logs; } }; match content { - MappedLogManagerContent::Commit(scn) => checkpoint = scn, - MappedLogManagerContent::Op { table_index, op } => { - if ingestor - .blocking_handle_message(IngestionMessage::OperationEvent { - table_index, - op, - id: Some(OpIdentifier::new(checkpoint, 0)), - }) - .is_err() - { - return Ok(()); + MappedLogManagerContent::Commit(scn) => { + checkpoint = scn; + if !uncommited_messages.is_empty() { + for message in std::mem::take(&mut uncommited_messages) { + if ingestor.blocking_handle_message(message).is_err() { + return Ok(()); + } + } } } + MappedLogManagerContent::Op { table_index, op } => { + uncommited_messages.push(IngestionMessage::OperationEvent { + table_index, + op, + id: Some(OpIdentifier::new(checkpoint, 0)), + }); + } } } - if let Err(e) = handle.join().unwrap() { - // Perhaps the log was archived. - error!("Log manager couldn't be started: {}", e); - break; - } + handle.join().unwrap(); if logs.is_empty() { info!("Replicated all logs, retrying after {:?}", poll_interval); @@ -497,7 +497,7 @@ mod tests { fn estimate_throughput(iterator: IngestionIterator) { let mut tic = None; let mut count = 0; - let print_count_interval = 100_000; + let print_count_interval = 10_000; let mut count_mod_interval = 0; for message in iterator { if tic.is_none() { @@ -520,16 +520,22 @@ mod tests { env_logger::init(); + let replicate_user = "DOZER"; + let data_user = "DOZER"; + let host = "database-1.cxtwfj9nkwtu.ap-southeast-1.rds.amazonaws.com"; + let sid = "ORCL"; + let pid = None; + let mut connector = super::Connector::new( "oracle".into(), - "DOZER".into(), + replicate_user.into(), "123", - "database-1.cxtwfj9nkwtu.ap-southeast-1.rds.amazonaws.com:1521/ORCL", + &format!("{}:{}/{}", host, 1521, pid.unwrap_or(sid)), 100_000, OracleReplicator::DozerLogReader, ) .unwrap(); - let tables = connector.list_tables(&["DOZER".into()]).unwrap(); + let tables = connector.list_tables(&[data_user.into()]).unwrap(); let tables = connector.list_columns(tables).unwrap(); let schemas = connector.get_schemas(&tables).unwrap(); let schemas = schemas.into_iter().map(Result::unwrap).collect::>(); @@ -545,9 +551,9 @@ mod tests { let mut connector = super::Connector::new( "oracle".into(), - "DOZER".into(), + replicate_user.into(), "123", - "database-1.cxtwfj9nkwtu.ap-southeast-1.rds.amazonaws.com:1521/ORCL", + &format!("{}:{}/{}", host, 1521, sid), 1, OracleReplicator::LogMiner { poll_interval_in_milliseconds: 1000, @@ -556,8 +562,9 @@ mod tests { .unwrap(); let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default()); let schemas = schemas.into_iter().map(|schema| schema.schema).collect(); + let con_id = pid.map(|pid| connector.get_con_id(pid).unwrap()); let handle = std::thread::spawn(move || { - connector.replicate(&ingestor, tables, schemas, checkpoint, None) + connector.replicate(&ingestor, tables, schemas, checkpoint, con_id) }); estimate_throughput(iterator); diff --git a/dozer-ingestion/oracle/src/connector/replicate/log_miner/mod.rs b/dozer-ingestion/oracle/src/connector/replicate/log_miner/mod.rs index 51f070dc09..44220aeca4 100644 --- a/dozer-ingestion/oracle/src/connector/replicate/log_miner/mod.rs +++ b/dozer-ingestion/oracle/src/connector/replicate/log_miner/mod.rs @@ -42,21 +42,33 @@ impl LogMiner { schemas: &[Schema], con_id: Option, output: SyncSender>, - ) -> Result<(), Error> { - let _guard = start_log_manager(connection, log, start_scn)?; + ) { + let _guard = match start_log_manager(connection, log, start_scn) { + Ok(guard) => guard, + Err(e) => { + let _ = output.send(Err(e)); + return; + } + }; - let contents = listing::LogManagerContent::list(connection, con_id)?; + let contents = match listing::LogManagerContent::list(connection, con_id) { + Ok(contents) => contents, + Err(e) => { + let _ = output.send(Err(e)); + return; + } + }; for content in contents { match parse_and_map(&self.parser, content, table_pair_to_index, schemas) { Ok(Some(mapped)) => { if output.send(Ok(mapped)).is_err() { - return Ok(()); + return; } } Ok(None) => {} Err(err) => { let _ = output.send(Err(err)); - return Ok(()); + return; } } } @@ -64,7 +76,6 @@ impl LogMiner { if let Err(e) = end_log_manager(connection) { let _ = output.send(Err(e)); } - Ok(()) } }