Skip to content

Commit

Permalink
feat: Oracle replication (unfinished)
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Feb 3, 2024
1 parent 452f964 commit dd9e464
Show file tree
Hide file tree
Showing 12 changed files with 708 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions dozer-ingestion/oracle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ edition = "2021"
[dependencies]
dozer-ingestion-connector = { path = "../connector" }
oracle = { version = "0.5.7", features = ["chrono", "stmt_without_lifetime"] }

[dev-dependencies]
env_logger = "0.11.1"
183 changes: 173 additions & 10 deletions dozer-ingestion/oracle/src/connector/mod.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};

use dozer_ingestion_connector::{
dozer_types::{
models::ingestion_types::IngestionMessage, rust_decimal, thiserror, types::Operation,
log::info,
models::ingestion_types::{IngestionMessage, OracleReplicator},
node::OpIdentifier,
rust_decimal, thiserror,
types::Operation,
},
Ingestor, SourceSchema, TableIdentifier, TableInfo,
};
use oracle::{
sql_type::{Collection, ObjectType, OracleType},
sql_type::{Collection, ObjectType},
Connection,
};

use replicate::log_miner::MappedLogManagerContent;

#[derive(Debug, Clone)]
pub struct Connector {
    // Name identifying this connection — presumably used for logging/error
    // context; usage not visible in this diff, confirm against callers.
    connection_name: String,
    // Shared Oracle connection; `Arc` so background threads (e.g. the log
    // mining thread in `replicate_log_miner`) can hold a clone.
    connection: Arc<Connection>,
    // Connected user; also used as the default schema for tables that don't
    // specify one (see `replicate_log_miner`'s table-index map).
    username: String,
    // Batch size for ingestion — NOTE(review): consuming code is outside this
    // view; confirm whether this bounds snapshot query batches.
    batch_size: usize,
    // Strategy for change-data capture: LogMiner polling or the (unimplemented)
    // Dozer log reader.
    replicator: OracleReplicator,
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -50,17 +58,23 @@ pub enum Error {
/// Pairs a string expression with an `OracleType::Varchar2` bind descriptor
/// sized to the string's byte length.
///
/// The type path is fully qualified (`::oracle::sql_type::OracleType`) so call
/// sites don't need `OracleType` imported.
///
/// Fix: the displayed macro contained diff residue — the pre-change expansion
/// line was still present alongside its replacement, yielding two expressions;
/// only the fully-qualified form is kept.
macro_rules! str_to_sql {
    ($s:expr) => {
        // `$s.len()` (byte length) is an upper bound of `$s.chars().count()`,
        // so the declared Varchar2 capacity is always sufficient.
        (
            &$s,
            &::oracle::sql_type::OracleType::Varchar2($s.len() as u32),
        )
    };
}

pub type Scn = u64;

impl Connector {
pub fn new(
connection_name: String,
username: String,
password: &str,
connect_string: &str,
batch_size: usize,
replicator: OracleReplicator,
) -> Result<Self, Error> {
let connection = Connection::connect(&username, password, connect_string)?;

Expand All @@ -69,6 +83,7 @@ impl Connector {
connection: Arc::new(connection),
username,
batch_size,
replicator,
})
}

Expand Down Expand Up @@ -206,7 +221,7 @@ impl Connector {
Ok(result)
}

pub fn snapshot(&mut self, ingestor: &Ingestor, tables: Vec<TableInfo>) -> Result<(), Error> {
pub fn snapshot(&mut self, ingestor: &Ingestor, tables: Vec<TableInfo>) -> Result<Scn, Error> {
let schemas = self
.get_schemas(&tables)?
.into_iter()
Expand Down Expand Up @@ -235,7 +250,7 @@ impl Connector {
})
.is_err()
{
return Ok(());
return self.get_scn_and_commit();
}
}

Expand All @@ -248,18 +263,138 @@ impl Connector {
})
.is_err()
{
return Ok(());
return self.get_scn_and_commit();
}
}

self.get_scn_and_commit()
}

/// Reads the current system change number (SCN) via
/// `DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER`, then commits the open
/// transaction (ending the snapshot's read-consistent view).
///
/// Returns the SCN to use as the checkpoint from which log-based replication
/// resumes.
///
/// Fix: the displayed body contained diff residue — the removed `Ok(())`
/// return was still present next to the added `Ok(scn)`, leaving two trailing
/// expressions; the stale line is dropped.
fn get_scn_and_commit(&mut self) -> Result<Scn, Error> {
    let sql = "SELECT DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER() FROM DUAL";
    let scn = self.connection.query_row_as::<Scn>(sql, &[])?;
    // Commit only after capturing the SCN — presumably so the returned
    // checkpoint is never newer than the data the snapshot observed.
    self.connection.commit()?;
    Ok(scn)
}

/// Runs change-data-capture replication for `tables`, resuming from the
/// `checkpoint` SCN, dispatching on the configured replication strategy.
///
/// `con_id` optionally restricts mining to a single container (CDB/PDB
/// setups). The `DozerLogReader` strategy is not implemented yet and panics.
pub fn replicate(
    &mut self,
    ingestor: &Ingestor,
    tables: Vec<TableInfo>,
    checkpoint: Scn,
    con_id: Option<u32>,
) -> Result<(), Error> {
    match self.replicator {
        OracleReplicator::DozerLogReader => unimplemented!("dozer log reader"),
        OracleReplicator::LogMiner {
            poll_interval_in_milliseconds,
        } => {
            let poll_interval = Duration::from_millis(poll_interval_in_milliseconds);
            self.replicate_log_miner(ingestor, tables, checkpoint, con_id, poll_interval)
        }
    }
}

/// Tails Oracle redo logs with LogMiner, forwarding mapped operations to
/// `ingestor` and advancing `checkpoint` on each mined commit.
///
/// Loops forever, sleeping `poll_interval` when no (new) logs are available;
/// returns `Ok(())` only when the ingestor has been closed (its handle call
/// errors). `checkpoint` is the last SCN already ingested, so mining resumes
/// at `checkpoint + 1`. `con_id` optionally filters to one container
/// (CDB/PDB) — NOTE(review): filter semantics live in `log_miner::mine`,
/// outside this view.
fn replicate_log_miner(
    &mut self,
    ingestor: &Ingestor,
    tables: Vec<TableInfo>,
    mut checkpoint: Scn,
    con_id: Option<u32>,
    poll_interval: Duration,
) -> Result<(), Error> {
    // Map (schema, table name) -> index in `tables`, so mined rows can be
    // tagged with the table index the ingestor expects. A table without an
    // explicit schema defaults to the connection's username.
    let table_pair_to_index = tables
        .into_iter()
        .enumerate()
        .map(|(index, table)| {
            let schema = table.schema.unwrap_or_else(|| self.username.clone());
            ((schema, table.name), index)
        })
        .collect::<HashMap<_, _>>();

    loop {
        // Resume just past the last ingested SCN. Note: `start_scn` is fixed
        // for the whole pass below, even as `checkpoint` advances per log.
        let start_scn = checkpoint + 1;
        // Prefer the online redo log; fall back to merging archived logs when
        // the online log doesn't (yet) contain `start_scn`.
        let mut logs = replicate::merge::list_and_join_online_log(&self.connection, start_scn)?;
        if !replicate::log_contains_scn(logs.first(), start_scn) {
            info!(
                "Online log is empty or doesn't contain start scn {}, listing and merging archived logs",
                start_scn
            );
            logs = replicate::merge::list_and_merge_archived_log(
                &self.connection,
                start_scn,
                logs,
            )?;
        }

        if logs.is_empty() {
            info!("No logs found, retrying after {:?}", poll_interval);
            std::thread::sleep(poll_interval);
            continue;
        }

        // Drain logs front-to-back, mining each on a dedicated thread that
        // streams results through a bounded channel (capacity 100, so the
        // miner blocks when the consumer falls behind).
        while !logs.is_empty() {
            let log = logs.remove(0);
            info!(
                "Replicating log {} ({}, {}), starting from {}",
                log.name, log.first_change, log.next_change, start_scn
            );

            let (sender, receiver) = std::sync::mpsc::sync_channel(100);
            let handle = {
                let connection = self.connection.clone();
                let log = log.clone();
                let table_pair_to_index = table_pair_to_index.clone();
                std::thread::spawn(move || {
                    replicate::log_miner::mine(
                        &connection,
                        &log,
                        start_scn,
                        &table_pair_to_index,
                        con_id,
                        sender,
                    )
                })
            };

            // The loop ends when the miner drops `sender` (log exhausted).
            for content in receiver {
                match content? {
                    // Commits advance the checkpoint; `checkpoint` therefore
                    // holds the SCN of the last *committed* transaction, and
                    // each op below is tagged with that prior-commit SCN —
                    // NOTE(review): confirm this is the intended op identity
                    // (commit message says this feature is unfinished).
                    MappedLogManagerContent::Commit(scn) => checkpoint = scn,
                    MappedLogManagerContent::Op { table_index, op } => {
                        if ingestor
                            .blocking_handle_message(IngestionMessage::OperationEvent {
                                table_index,
                                op,
                                id: Some(OpIdentifier::new(checkpoint, 0)),
                            })
                            .is_err()
                        {
                            // Ingestor closed: stop replication. NOTE(review):
                            // this path drops `receiver` and never joins
                            // `handle`; the miner thread is left to fail its
                            // next send and exit detached — confirm acceptable.
                            return Ok(());
                        }
                    }
                }
            }
            // Surfaces miner-thread panics only; the thread's return value is
            // discarded here (errors are expected to arrive via `content?`).
            handle.join().unwrap();

            if logs.is_empty() {
                info!("Replicated all logs, retrying after {:?}", poll_interval);
                std::thread::sleep(poll_interval);
            } else {
                // If there are more logs, we need to start from the next log's first change.
                // NOTE(review): this assumes the next log's `first_change`
                // equals this log's `next_change` — TODO confirm for merged
                // archived-log sequences.
                checkpoint = log.next_change - 1;
            }
        }
    }
}
}

mod join;
mod listing;
mod mapping;
mod replicate;

// Name of a temporary Oracle object type — NOTE(review): its creation/usage is
// outside this diff view (likely in the snapshot/mapping modules); confirm.
const TEMP_DOZER_TYPE_NAME: &str = "TEMP_DOZER_TYPE";

Expand Down Expand Up @@ -295,24 +430,52 @@ mod tests {
#[test]
#[ignore]
fn test_connector() {
use dozer_ingestion_connector::{IngestionConfig, Ingestor};
use dozer_ingestion_connector::{
dozer_types::models::ingestion_types::OracleReplicator, IngestionConfig, Ingestor,
};

env_logger::init();

let mut connector = super::Connector::new(
"oracle".into(),
"C##DOZER".into(),
"123",
"localhost:1521/ORCLPDB1",
1,
OracleReplicator::DozerLogReader,
)
.unwrap();
let _con_id = connector.get_con_id("ORCLPDB1").unwrap();
let tables = connector.list_tables(&["CHUBEI".into()]).unwrap();
let tables = connector.list_columns(tables).unwrap();
let schemas = connector.get_schemas(&tables).unwrap();
let schemas = schemas.into_iter().map(Result::unwrap).collect::<Vec<_>>();
dbg!(schemas);
let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());
let handle = std::thread::spawn(move || connector.snapshot(&ingestor, tables));
let handle = {
let tables = tables.clone();
std::thread::spawn(move || connector.snapshot(&ingestor, tables))
};
for message in iterator {
dbg!(message);
}
let checkpoint = handle.join().unwrap().unwrap();

let mut connector = super::Connector::new(
"oracle".into(),
"C##DOZER".into(),
"123",
"localhost:1521/ORCLCDB",
1,
OracleReplicator::LogMiner {
poll_interval_in_milliseconds: 1000,
},
)
.unwrap();
let con_id = connector.get_con_id("ORCLPDB1").unwrap();
let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());
let handle = std::thread::spawn(move || {
connector.replicate(&ingestor, tables, checkpoint, Some(con_id))
});
for message in iterator {
dbg!(message);
}
Expand Down
Loading

0 comments on commit dd9e464

Please sign in to comment.