diff --git a/dozer-cli/src/pipeline/builder.rs b/dozer-cli/src/pipeline/builder.rs index c71e71d53e..a5a0bb58de 100644 --- a/dozer-cli/src/pipeline/builder.rs +++ b/dozer-cli/src/pipeline/builder.rs @@ -104,7 +104,7 @@ impl<'a> PipelineBuilder<'a> { let mut connector_map = HashMap::new(); for connection in self.connections { - let connector = get_connector(runtime.clone(), connection.clone()) + let connector = get_connector(runtime.clone(), connection.clone(), None) .map_err(|e| ConnectorSourceFactoryError::Connector(e.into()))?; if let Some(info_table) = get_connector_info_table(connection) { diff --git a/dozer-cli/src/pipeline/connector_source.rs b/dozer-cli/src/pipeline/connector_source.rs index d2009ece70..93632ff8c2 100644 --- a/dozer-cli/src/pipeline/connector_source.rs +++ b/dozer-cli/src/pipeline/connector_source.rs @@ -1,9 +1,7 @@ use dozer_api::shutdown::ShutdownReceiver; -use dozer_core::node::{ - OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory, SourceState, -}; +use dozer_core::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory}; use dozer_ingestion::{ - get_connector, CdcType, Connector, IngestionIterator, TableIdentifier, TableInfo, TableToIngest, + get_connector, CdcType, Connector, IngestionIterator, TableIdentifier, TableInfo, }; use dozer_ingestion::{IngestionConfig, Ingestor}; @@ -12,7 +10,7 @@ use dozer_types::errors::internal::BoxedError; use dozer_types::log::{error, info}; use dozer_types::models::connection::Connection; use dozer_types::models::ingestion_types::IngestionMessage; -use dozer_types::parking_lot::Mutex; +use dozer_types::node::OpIdentifier; use dozer_types::thiserror::{self, Error}; use dozer_types::tracing::{span, Level}; use dozer_types::types::{Operation, Schema, SourceDefinition}; @@ -46,10 +44,9 @@ pub enum ConnectorSourceFactoryError { #[derive(Debug)] pub struct ConnectorSourceFactory { - connection_name: String, + connection: Connection, + runtime: Arc, tables: Vec, - /// Will be moved to `ConnectorSource` in `build`. - connector: Mutex>>, labels: LabelsAndProgress, shutdown: ShutdownReceiver, } @@ -66,9 +63,7 @@ impl ConnectorSourceFactory { labels: LabelsAndProgress, shutdown: ShutdownReceiver, ) -> Result { - let connection_name = connection.name.clone(); - - let connector = get_connector(runtime.clone(), connection) + let connector = get_connector(runtime.clone(), connection.clone(), None) .map_err(|e| ConnectorSourceFactoryError::Connector(e.into()))?; // Fill column names if not provided. @@ -116,9 +111,9 @@ impl ConnectorSourceFactory { } Ok(Self { - connection_name, + connection, + runtime, tables, - connector: Mutex::new(Some(connector)), labels, shutdown, }) @@ -138,7 +133,7 @@ impl SourceFactory for ConnectorSourceFactory { // Add source information to the schema. for field in &mut schema.fields { field.source = SourceDefinition::Table { - connection: self.connection_name.clone(), + connection: self.connection.name.clone(), name: table_name.clone(), }; } @@ -174,35 +169,27 @@ impl SourceFactory for ConnectorSourceFactory { fn build( &self, _output_schemas: HashMap, - mut last_checkpoint: SourceState, + state: Option>, ) -> Result, BoxedError> { - // Construct the tables to ingest. + // Construct table info. 
let tables = self .tables .iter() - .map(|table| { - let state = last_checkpoint.remove(&table.port).flatten(); - TableToIngest { - schema: table.schema_name.clone(), - name: table.name.clone(), - column_names: table.columns.clone(), - state, - } + .map(|table| TableInfo { + schema: table.schema_name.clone(), + name: table.name.clone(), + column_names: table.columns.clone(), }) .collect(); let ports = self.tables.iter().map(|table| table.port).collect(); - let connector = self - .connector - .lock() - .take() - .expect("ConnectorSource was already built"); + let connector = get_connector(self.runtime.clone(), self.connection.clone(), state)?; Ok(Box::new(ConnectorSource { tables, ports, connector, - connection_name: self.connection_name.clone(), + connection_name: self.connection.name.clone(), labels: self.labels.clone(), shutdown: self.shutdown.clone(), ingestion_config: IngestionConfig::default(), @@ -212,7 +199,7 @@ impl SourceFactory for ConnectorSourceFactory { #[derive(Debug)] pub struct ConnectorSource { - tables: Vec, + tables: Vec, ports: Vec, connector: Box, connection_name: String, @@ -223,9 +210,14 @@ pub struct ConnectorSource { #[async_trait] impl Source for ConnectorSource { + async fn serialize_state(&self) -> Result, BoxedError> { + self.connector.serialize_state().await + } + async fn start( &self, sender: Sender<(PortHandle, IngestionMessage)>, + last_checkpoint: Option, ) -> Result<(), BoxedError> { let (ingestor, iterator) = Ingestor::initialize_channel(self.ingestion_config.clone()); let connection_name = self.connection_name.clone(); @@ -254,7 +246,8 @@ impl Source for ConnectorSource { eprintln!("Aborted connector {}", name); }); let result = Abortable::new( - self.connector.start(&ingestor, self.tables.clone()), + self.connector + .start(&ingestor, self.tables.clone(), last_checkpoint), abort_registration, ) .await; @@ -281,7 +274,7 @@ async fn forward_message_to_pipeline( mut iterator: IngestionIterator, sender: Sender<(PortHandle, IngestionMessage)>, connection_name: String, - tables: Vec, + tables: Vec, ports: Vec, labels: LabelsAndProgress, ) { diff --git a/dozer-cli/src/simple/orchestrator.rs b/dozer-cli/src/simple/orchestrator.rs index 70f61415a0..ff3a49e5ec 100644 --- a/dozer-cli/src/simple/orchestrator.rs +++ b/dozer-cli/src/simple/orchestrator.rs @@ -372,7 +372,8 @@ impl SimpleOrchestrator { ) -> Result, Vec)>, OrchestrationError> { let mut schema_map = HashMap::new(); for connection in &self.config.connections { - let connector = get_connector(self.runtime.clone(), connection.clone()) + // We're not really going to start ingestion, so passing `None` as state here is OK. + let connector = get_connector(self.runtime.clone(), connection.clone(), None) .map_err(|e| ConnectorSourceFactoryError::Connector(e.into()))?; let schema_tuples = connector .list_all_schemas() diff --git a/dozer-core/src/builder_dag.rs b/dozer-core/src/builder_dag.rs index 071ea2faef..1645931378 100644 --- a/dozer-core/src/builder_dag.rs +++ b/dozer-core/src/builder_dag.rs @@ -4,7 +4,7 @@ use daggy::{ petgraph::visit::{IntoNodeIdentifiers, IntoNodeReferences}, NodeIndex, }; -use dozer_types::node::NodeHandle; +use dozer_types::node::{NodeHandle, OpIdentifier}; use crate::{ checkpoint::OptionCheckpoint, @@ -26,7 +26,10 @@ pub struct NodeType { #[derive(Debug)] /// Node kind, source, processor or sink. Source has a checkpoint to start from. 
pub enum NodeKind { - Source(Box), + Source { + source: Box, + last_checkpoint: Option, + }, Processor(Box), Sink(Box), } @@ -69,32 +72,22 @@ impl BuilderDag { let node = node.weight; let node = match node.kind { DagNodeKind::Source(source) => { - let mut last_checkpoint_by_name = checkpoint.get_source_state(&node.handle)?; - let mut last_checkpoint = HashMap::new(); - for port_def in source.get_output_ports() { - let port_name = source.get_output_port_name(&port_def.handle); - last_checkpoint.insert( - port_def.handle, - last_checkpoint_by_name - .as_mut() - .and_then(|last_checkpoint| { - last_checkpoint.remove(&port_name).flatten().cloned() - }), - ); - } - + let source_state = checkpoint.get_source_state(&node.handle)?; let source = source .build( output_schemas .remove(&node_index) .expect("we collected all output schemas"), - last_checkpoint, + source_state.map(|state| state.0.to_vec()), ) .map_err(ExecutionError::Factory)?; NodeType { handle: node.handle, - kind: NodeKind::Source(source), + kind: NodeKind::Source { + source, + last_checkpoint: source_state.map(|state| state.1), + }, } } DagNodeKind::Processor(processor) => { diff --git a/dozer-core/src/checkpoint/mod.rs b/dozer-core/src/checkpoint/mod.rs index 40e570da30..ef5a91fd26 100644 --- a/dozer-core/src/checkpoint/mod.rs +++ b/dozer-core/src/checkpoint/mod.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, ops::Deref, sync::Arc}; +use std::{ops::Deref, sync::Arc}; use dozer_log::{ camino::Utf8Path, @@ -12,7 +12,7 @@ use dozer_types::{ bincode, log::{error, info}, models::app_config::{DataStorage, RecordStore}, - node::{NodeHandle, RestartableState, SourceStates, TableState}, + node::{NodeHandle, OpIdentifier, SourceState, SourceStates}, parking_lot::Mutex, tonic::codegen::tokio_stream::StreamExt, types::Field, @@ -114,29 +114,21 @@ impl OptionCheckpoint { pub fn get_source_state( &self, node_handle: &NodeHandle, - ) -> Result>>, ExecutionError> { + ) -> Result, ExecutionError> { let Some(checkpoint) = self.checkpoint.as_ref() else { return Ok(None); }; - let Some(source_state) = checkpoint.source_states.get(node_handle) else { + let Some(state) = checkpoint.source_states.get(node_handle) else { return Ok(None); }; - let mut result = HashMap::new(); - for (table_name, state) in source_state { - let state = match state { - TableState::NotStarted => None, - TableState::NonRestartable => { - return Err(ExecutionError::SourceCannotRestart { - source_name: node_handle.clone(), - table_name: table_name.clone(), - }); - } - TableState::Restartable(state) => Some(state), - }; - result.insert(table_name.clone(), state); + match state { + SourceState::NotStarted => Ok(None), + SourceState::NonRestartable => { + Err(ExecutionError::SourceCannotRestart(node_handle.clone())) + } + SourceState::Restartable { state, checkpoint } => Ok(Some((state, *checkpoint))), } - Ok(Some(result)) } pub async fn load_processor_data( @@ -374,7 +366,7 @@ pub async fn create_checkpoint_factory_for_test( let epoch_id = 42; let source_states: SourceStates = [( NodeHandle::new(Some(1), "id".to_string()), - Default::default(), + SourceState::NotStarted, )] .into_iter() .collect(); diff --git a/dozer-core/src/epoch/manager.rs b/dozer-core/src/epoch/manager.rs index 563e79b623..93ba90f759 100644 --- a/dozer-core/src/epoch/manager.rs +++ b/dozer-core/src/epoch/manager.rs @@ -1,8 +1,7 @@ use dozer_recordstore::ProcessorRecordStore; use dozer_types::log::info; -use dozer_types::node::{NodeHandle, SourceStates, TableState}; +use dozer_types::node::{NodeHandle, 
SourceState, SourceStates}; use dozer_types::parking_lot::Mutex; -use std::collections::HashMap; use std::ops::DerefMut; use std::sync::{Arc, Barrier}; use std::thread::sleep; @@ -162,7 +161,7 @@ impl EpochManager { /// - `request_commit`: Whether the source wants to commit. The `EpochManager` checks if any source wants to commit and returns `Some` if so. pub fn wait_for_epoch_close( &self, - source_state: (NodeHandle, HashMap), + source_state: (NodeHandle, SourceState), request_termination: bool, request_commit: bool, ) -> ClosedEpoch { @@ -300,11 +299,9 @@ impl EpochManager { } fn is_restartable(source_states: &SourceStates) -> bool { - source_states.values().all(|table_states| { - table_states - .values() - .all(|table_state| table_state != &TableState::NonRestartable) - }) + source_states + .values() + .all(|source_state| source_state != &SourceState::NonRestartable) } #[cfg(test)] @@ -336,7 +333,7 @@ mod tests { epoch_manager: &EpochManager, termination_gen: &(impl Fn(u16) -> bool + Sync), commit_gen: &(impl Fn(u16) -> bool + Sync), - source_state_gen: &(impl Fn(u16) -> (NodeHandle, HashMap) + Sync), + source_state_gen: &(impl Fn(u16) -> (NodeHandle, SourceState) + Sync), ) -> ClosedEpoch { scope(|scope| { let handles = (0..NUM_THREADS) @@ -368,10 +365,10 @@ mod tests { }) } - fn generate_source_state(index: u16) -> (NodeHandle, HashMap) { + fn generate_source_state(index: u16) -> (NodeHandle, SourceState) { ( NodeHandle::new(Some(index), index.to_string()), - Default::default(), + SourceState::NotStarted, ) } diff --git a/dozer-core/src/errors.rs b/dozer-core/src/errors.rs index cf2c4fd8b9..3f7fc09806 100644 --- a/dozer-core/src/errors.rs +++ b/dozer-core/src/errors.rs @@ -49,11 +49,8 @@ pub enum ExecutionError { CheckpointedLogReader(#[from] CheckpointedLogReaderError), #[error("Cannot deserialize checkpoint: {0}")] CorruptedCheckpoint(#[source] bincode::error::DecodeError), - #[error("Table {table_name} of source {source_name} cannot restart. You have to clean data from previous runs by running `dozer clean`")] - SourceCannotRestart { - source_name: NodeHandle, - table_name: String, - }, + #[error("Source {0} cannot restart. 
You have to clean data from previous runs by running `dozer clean`")] + SourceCannotRestart(NodeHandle), #[error("Failed to create checkpoint: {0}")] FailedToCreateCheckpoint(BoxedError), #[error("Failed to serialize record writer: {0}")] diff --git a/dozer-core/src/executor/mod.rs b/dozer-core/src/executor/mod.rs index cde94ebd59..7148b7d1f6 100644 --- a/dozer-core/src/executor/mod.rs +++ b/dozer-core/src/executor/mod.rs @@ -129,7 +129,7 @@ impl DagExecutor { running.clone(), runtime.clone(), ) - .await; + .await?; join_handles.push(start_source(source_node)?); } NodeKind::Processor(_) => { diff --git a/dozer-core/src/executor/receiver_loop.rs b/dozer-core/src/executor/receiver_loop.rs index f0092acabd..d981328371 100644 --- a/dozer-core/src/executor/receiver_loop.rs +++ b/dozer-core/src/executor/receiver_loop.rs @@ -102,7 +102,7 @@ mod tests { use crossbeam::channel::{unbounded, Sender}; use dozer_types::{ - node::{NodeHandle, SourceStates}, + node::{NodeHandle, SourceState, SourceStates}, types::{Field, Record}, }; @@ -229,8 +229,14 @@ mod tests { fn receiver_loop_increases_epoch_id() { let (mut test_loop, senders) = TestReceiverLoop::new(2); let mut source_states = SourceStates::default(); - source_states.insert(NodeHandle::new(None, "0".to_string()), Default::default()); - source_states.insert(NodeHandle::new(None, "1".to_string()), Default::default()); + source_states.insert( + NodeHandle::new(None, "0".to_string()), + SourceState::NotStarted, + ); + source_states.insert( + NodeHandle::new(None, "1".to_string()), + SourceState::NotStarted, + ); let source_states = Arc::new(source_states); let decision_instant = SystemTime::now(); let mut epoch0 = Epoch::new(0, source_states.clone(), None, None, decision_instant); @@ -272,8 +278,14 @@ mod tests { fn receiver_loop_panics_on_inconsistent_commit_epoch() { let (mut test_loop, senders) = TestReceiverLoop::new(2); let mut source_states = SourceStates::new(); - source_states.insert(NodeHandle::new(None, "0".to_string()), Default::default()); - source_states.insert(NodeHandle::new(None, "1".to_string()), Default::default()); + source_states.insert( + NodeHandle::new(None, "0".to_string()), + SourceState::NotStarted, + ); + source_states.insert( + NodeHandle::new(None, "1".to_string()), + SourceState::NotStarted, + ); let source_states = Arc::new(source_states); let decision_instant = SystemTime::now(); let epoch0 = Epoch::new(0, source_states.clone(), None, None, decision_instant); diff --git a/dozer-core/src/executor/source_node.rs b/dozer-core/src/executor/source_node.rs index 86d998ae8d..bdcba2c5f2 100644 --- a/dozer-core/src/executor/source_node.rs +++ b/dozer-core/src/executor/source_node.rs @@ -12,8 +12,8 @@ use dozer_log::tokio::{ sync::mpsc::{channel, Receiver, Sender}, time::timeout, }; -use dozer_types::models::ingestion_types::IngestionMessage; use dozer_types::{log::debug, node::NodeHandle}; +use dozer_types::{models::ingestion_types::IngestionMessage, node::OpIdentifier}; use crate::{ builder_dag::NodeKind, @@ -34,6 +34,8 @@ pub struct SourceNode { source: Box, /// The sender that will be passed to the source for outputting data. sender: Sender<(PortHandle, IngestionMessage)>, + /// Checkpoint for the source. + last_checkpoint: Option, /// The receiver for receiving data from source. receiver: Receiver<(PortHandle, IngestionMessage)>, /// Receiving timeout. 
@@ -82,9 +84,10 @@ impl Node for SourceNode { fn run(mut self) -> Result<(), ExecutionError> { let source = self.source; let sender = self.sender; + let last_checkpoint = self.last_checkpoint; let mut handle = Some( self.runtime - .spawn(async move { source.start(sender).await }), + .spawn(async move { source.start(sender, last_checkpoint).await }), ); loop { @@ -142,13 +145,17 @@ pub async fn create_source_node( options: &ExecutorOptions, running: Arc, runtime: Arc, -) -> SourceNode { +) -> Result { // Get the source node. let Some(node) = dag.node_weight_mut(node_index).take() else { panic!("Must pass in a node") }; let node_handle = node.handle; - let NodeKind::Source(source) = node.kind else { + let NodeKind::Source { + source, + last_checkpoint, + } = node.kind + else { panic!("Must pass in a source node"); }; let port_names = dag @@ -168,8 +175,13 @@ pub async fn create_source_node( // Create channel manager. let (senders, record_writers) = dag.collect_senders_and_record_writers(node_index).await; + let source_state = source + .serialize_state() + .await + .map_err(ExecutionError::Source)?; let channel_manager = SourceChannelManager::new( node_handle.clone(), + source_state, port_names, record_writers, senders, @@ -179,14 +191,15 @@ pub async fn create_source_node( dag.error_manager().clone(), ); - SourceNode { + Ok(SourceNode { node_handle, source, sender, + last_checkpoint, receiver, timeout: options.commit_time_threshold, running, channel_manager, runtime, - } + }) } diff --git a/dozer-core/src/forwarder.rs b/dozer-core/src/forwarder.rs index 1538f8d9f8..b4bd072836 100644 --- a/dozer-core/src/forwarder.rs +++ b/dozer-core/src/forwarder.rs @@ -11,7 +11,7 @@ use crossbeam::channel::Sender; use dozer_recordstore::ProcessorRecordStore; use dozer_types::log::debug; use dozer_types::models::ingestion_types::IngestionMessage; -use dozer_types::node::{NodeHandle, TableState}; +use dozer_types::node::{NodeHandle, SourceState}; use dozer_types::types::Operation; use std::collections::HashMap; use std::ops::Deref; @@ -131,7 +131,8 @@ impl ChannelManager { pub(crate) struct SourceChannelManager { port_names: HashMap, manager: ChannelManager, - current_op_ids: HashMap, + source_level_state: Vec, + source_state: SourceState, commit_sz: u32, num_uncommitted_ops: u32, max_duration_between_commits: Duration, @@ -143,6 +144,7 @@ impl SourceChannelManager { #[allow(clippy::too_many_arguments)] pub fn new( owner: NodeHandle, + source_state: Vec, port_names: HashMap, record_writers: HashMap>, senders: HashMap>>, @@ -151,11 +153,9 @@ impl SourceChannelManager { epoch_manager: Arc, error_manager: Arc, ) -> Self { + let source_level_state = source_state; // FIXME: Read current_op_id from persisted state. 
-        let current_op_ids = port_names
-            .values()
-            .map(|n| (n.clone(), TableState::NotStarted))
-            .collect();
+        let source_state = SourceState::NotStarted;

         Self {
             manager: ChannelManager::new(
@@ -166,7 +166,8 @@ impl SourceChannelManager {
                 error_manager,
             ),
             port_names,
-            current_op_ids,
+            source_level_state,
+            source_state,
             commit_sz,
             num_uncommitted_ops: 0,
             max_duration_between_commits,
@@ -186,7 +187,7 @@ impl SourceChannelManager {

     fn commit(&mut self, request_termination: bool) -> Result<bool, ExecutionError> {
         let epoch = self.epoch_manager.wait_for_epoch_close(
-            (self.manager.owner.clone(), self.current_op_ids.clone()),
+            (self.manager.owner.clone(), self.source_state.clone()),
             request_termination,
             self.num_uncommitted_ops > 0,
         );
@@ -232,15 +233,14 @@ impl SourceChannelManager {
     ) -> Result<bool, ExecutionError> {
         match message {
             IngestionMessage::OperationEvent { op, state, .. } => {
-                let port_name = self.port_names[&port].clone();
-                self.current_op_ids.insert(
-                    port_name,
-                    if let Some(state) = state {
-                        TableState::Restartable(state)
-                    } else {
-                        TableState::NonRestartable
-                    },
-                );
+                self.source_state = if let Some(state) = state {
+                    SourceState::Restartable {
+                        state: self.source_level_state.clone(),
+                        checkpoint: state,
+                    }
+                } else {
+                    SourceState::NonRestartable
+                };

                 self.manager.send_op(op, port)?;
                 self.num_uncommitted_ops += 1;
diff --git a/dozer-core/src/node.rs b/dozer-core/src/node.rs
index 2a366977db..5b3a629d51 100644
--- a/dozer-core/src/node.rs
+++ b/dozer-core/src/node.rs
@@ -6,7 +6,7 @@ use dozer_log::storage::{Object, Queue};
 use dozer_log::tokio::sync::mpsc::Sender;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::models::ingestion_types::IngestionMessage;
-use dozer_types::node::RestartableState;
+use dozer_types::node::OpIdentifier;
 use dozer_types::serde::{Deserialize, Serialize};
 use dozer_types::tonic::async_trait;
 use dozer_types::types::{Operation, Schema};
@@ -52,16 +52,19 @@ pub trait SourceFactory: Send + Sync + Debug {
     fn build(
         &self,
         output_schemas: HashMap<PortHandle, Schema>,
-        last_checkpoint: SourceState,
+        state: Option<Vec<u8>>,
     ) -> Result<Box<dyn Source>, BoxedError>;
 }

-pub type SourceState = HashMap<PortHandle, Option<RestartableState>>;
-
 #[async_trait]
 pub trait Source: Send + Sync + Debug {
-    async fn start(&self, sender: Sender<(PortHandle, IngestionMessage)>)
-        -> Result<(), BoxedError>;
+    async fn serialize_state(&self) -> Result<Vec<u8>, BoxedError>;
+
+    async fn start(
+        &self,
+        sender: Sender<(PortHandle, IngestionMessage)>,
+        last_checkpoint: Option<OpIdentifier>,
+    ) -> Result<(), BoxedError>;
 }

 #[async_trait]
diff --git a/dozer-core/src/tests/app.rs b/dozer-core/src/tests/app.rs
index 40bcc4d9ee..f364fd67b1 100644
--- a/dozer-core/src/tests/app.rs
+++ b/dozer-core/src/tests/app.rs
@@ -1,7 +1,7 @@
 use super::run_dag;
 use crate::app::{App, AppPipeline, PipelineEntryPoint};
 use crate::appsource::{AppSourceManager, AppSourceMappings};
-use crate::node::{OutputPortDef, PortHandle, Source, SourceFactory, SourceState};
+use crate::node::{OutputPortDef, PortHandle, Source, SourceFactory};
 use crate::tests::dag_base_run::{
     NoopJoinProcessorFactory, NOOP_JOIN_LEFT_INPUT_PORT, NOOP_JOIN_RIGHT_INPUT_PORT,
 };
@@ -38,7 +38,7 @@ impl SourceFactory for NoneSourceFactory {
     fn build(
         &self,
         _output_schemas: HashMap<PortHandle, Schema>,
-        _last_checkpoint: SourceState,
+        _state: Option<Vec<u8>>,
     ) -> Result<Box<dyn Source>, BoxedError> {
         todo!()
     }
diff --git a/dozer-core/src/tests/dag_base_create_errors.rs b/dozer-core/src/tests/dag_base_create_errors.rs
index 08f7de3bb9..ea3c523d4a 100644
--- a/dozer-core/src/tests/dag_base_create_errors.rs
+++ b/dozer-core/src/tests/dag_base_create_errors.rs
@@ -1,6 +1,5 @@ use
crate::node::{ OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory, - SourceState, }; use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE}; @@ -60,7 +59,7 @@ impl SourceFactory for CreateErrSourceFactory { fn build( &self, _output_schemas: HashMap, - _last_checkpoint: SourceState, + _state: Option>, ) -> Result, BoxedError> { if self.panic { panic!("Generated error"); diff --git a/dozer-core/src/tests/dag_base_errors.rs b/dozer-core/src/tests/dag_base_errors.rs index 081e3fcf1b..883865cf35 100644 --- a/dozer-core/src/tests/dag_base_errors.rs +++ b/dozer-core/src/tests/dag_base_errors.rs @@ -2,7 +2,7 @@ use crate::channels::ProcessorChannelForwarder; use crate::epoch::Epoch; use crate::node::{ OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Sink, SinkFactory, - Source, SourceFactory, SourceState, + Source, SourceFactory, }; use crate::tests::dag_base_run::NoopProcessorFactory; use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT}; @@ -13,7 +13,7 @@ use dozer_log::tokio::sync::mpsc::Sender; use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer}; use dozer_types::errors::internal::BoxedError; use dozer_types::models::ingestion_types::IngestionMessage; -use dozer_types::node::NodeHandle; +use dozer_types::node::{NodeHandle, OpIdentifier}; use dozer_types::tonic::async_trait; use dozer_types::types::{ Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition, @@ -319,7 +319,7 @@ impl SourceFactory for ErrGeneratorSourceFactory { fn build( &self, _output_schemas: HashMap, - _last_checkpoint: SourceState, + _state: Option>, ) -> Result, BoxedError> { Ok(Box::new(ErrGeneratorSource { count: self.count, @@ -336,9 +336,14 @@ pub(crate) struct ErrGeneratorSource { #[async_trait] impl Source for ErrGeneratorSource { + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + async fn start( &self, sender: Sender<(PortHandle, IngestionMessage)>, + _last_checkpoint: Option, ) -> Result<(), BoxedError> { for n in 1..(self.count + 1) { if n == self.err_at { @@ -356,7 +361,7 @@ impl Source for ErrGeneratorSource { Field::String(format!("value_{n}")), ]), }, - state: Some(n.to_be_bytes().to_vec().into()), + state: Some(OpIdentifier::new(0, n)), }, )) .await?; diff --git a/dozer-core/src/tests/dag_ports.rs b/dozer-core/src/tests/dag_ports.rs index d1874d5194..1f6307fdac 100644 --- a/dozer-core/src/tests/dag_ports.rs +++ b/dozer-core/src/tests/dag_ports.rs @@ -1,6 +1,5 @@ use crate::node::{ OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory, - SourceState, }; use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE}; use dozer_recordstore::ProcessorRecordStoreDeserializer; @@ -39,7 +38,7 @@ impl SourceFactory for DynPortsSourceFactory { fn build( &self, _input_schemas: HashMap, - _last_checkpoint: SourceState, + _state: Option>, ) -> Result, BoxedError> { todo!() } diff --git a/dozer-core/src/tests/dag_schemas.rs b/dozer-core/src/tests/dag_schemas.rs index f2aef9902b..7312332c03 100644 --- a/dozer-core/src/tests/dag_schemas.rs +++ b/dozer-core/src/tests/dag_schemas.rs @@ -1,7 +1,7 @@ use crate::dag_schemas::{DagHaveSchemas, DagSchemas}; use crate::node::{ OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, SinkFactory, Source, - SourceFactory, SourceState, + SourceFactory, }; use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE}; @@ -63,7 +63,7 @@ impl SourceFactory for TestUsersSourceFactory { fn build( &self, 
_input_schemas: HashMap, - _last_checkpoint: SourceState, + _state: Option>, ) -> Result, BoxedError> { todo!() } @@ -110,7 +110,7 @@ impl SourceFactory for TestCountriesSourceFactory { fn build( &self, _input_schemas: HashMap, - _last_checkpoint: SourceState, + _state: Option>, ) -> Result, BoxedError> { todo!() } diff --git a/dozer-core/src/tests/sources.rs b/dozer-core/src/tests/sources.rs index 489ccdbe4e..5ace4845db 100644 --- a/dozer-core/src/tests/sources.rs +++ b/dozer-core/src/tests/sources.rs @@ -1,8 +1,9 @@ -use crate::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory, SourceState}; +use crate::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory}; use crate::DEFAULT_PORT_HANDLE; use dozer_log::tokio::{self, sync::mpsc::Sender}; use dozer_types::errors::internal::BoxedError; use dozer_types::models::ingestion_types::IngestionMessage; +use dozer_types::node::OpIdentifier; use dozer_types::tonic::async_trait; use dozer_types::types::{ Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition, @@ -75,16 +76,9 @@ impl SourceFactory for GeneratorSourceFactory { fn build( &self, _input_schemas: HashMap, - last_checkpoint: SourceState, + _state: Option>, ) -> Result, BoxedError> { - let state = last_checkpoint.values().next().and_then(|state| { - state - .as_ref() - .map(|state| u64::from_be_bytes(state.0.as_slice().try_into().unwrap())) - }); - let start = state.map(|state| state + 1).unwrap_or(0); Ok(Box::new(GeneratorSource { - start, count: self.count, running: self.running.clone(), })) @@ -93,18 +87,25 @@ impl SourceFactory for GeneratorSourceFactory { #[derive(Debug)] pub(crate) struct GeneratorSource { - start: u64, count: u64, running: Arc, } #[async_trait] impl Source for GeneratorSource { + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + async fn start( &self, sender: Sender<(PortHandle, IngestionMessage)>, + last_checkpoint: Option, ) -> Result<(), BoxedError> { - for n in self.start..(self.start + self.count) { + let start = last_checkpoint + .map(|checkpoint| checkpoint.seq_in_tx + 1) + .unwrap_or(0); + for n in start..(start + self.count) { sender .send(( GENERATOR_SOURCE_OUTPUT_PORT, @@ -116,7 +117,7 @@ impl Source for GeneratorSource { Field::String(format!("value_{n}")), ]), }, - state: Some(n.to_be_bytes().to_vec().into()), + state: Some(OpIdentifier::new(0, n)), }, )) .await?; @@ -209,7 +210,7 @@ impl SourceFactory for DualPortGeneratorSourceFactory { fn build( &self, _input_schemas: HashMap, - _last_checkpoint: SourceState, + _state: Option>, ) -> Result, BoxedError> { Ok(Box::new(DualPortGeneratorSource { count: self.count, @@ -226,9 +227,14 @@ pub(crate) struct DualPortGeneratorSource { #[async_trait] impl Source for DualPortGeneratorSource { + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + async fn start( &self, sender: Sender<(PortHandle, IngestionMessage)>, + _last_checkpoint: Option, ) -> Result<(), BoxedError> { for n in 1..(self.count + 1) { sender @@ -242,7 +248,7 @@ impl Source for DualPortGeneratorSource { Field::String(format!("value_{n}")), ]), }, - state: Some(n.to_be_bytes().to_vec().into()), + state: Some(OpIdentifier::new(0, n)), }, )) .await?; @@ -257,7 +263,7 @@ impl Source for DualPortGeneratorSource { Field::String(format!("value_{n}")), ]), }, - state: Some(n.to_be_bytes().to_vec().into()), + state: Some(OpIdentifier::new(0, n)), }, )) .await?; @@ -294,7 +300,7 @@ impl SourceFactory for ConnectivityTestSourceFactory { fn build( 
        &self,
        _output_schemas: HashMap<PortHandle, Schema>,
-        _last_checkpoint: SourceState,
+        _state: Option<Vec<u8>>,
    ) -> Result<Box<dyn Source>, BoxedError> {
        unimplemented!("This struct is for connectivity test, only output ports are defined")
    }
diff --git a/dozer-ingestion/benches/helper.rs b/dozer-ingestion/benches/helper.rs
index 9e4060c57d..3dd765d187 100644
--- a/dozer-ingestion/benches/helper.rs
+++ b/dozer-ingestion/benches/helper.rs
@@ -7,7 +7,7 @@ use dozer_ingestion_connector::{
         models::connection::Connection,
         serde::{Deserialize, Serialize},
     },
-    Connector, IngestionIterator, Ingestor, TableToIngest,
+    Connector, IngestionIterator, Ingestor, TableInfo,
 };
 use tokio::runtime::Runtime;

@@ -45,22 +45,19 @@ pub fn get_progress() -> ProgressBar {
 }

 pub fn get_connection_iterator(runtime: Arc<Runtime>, config: TestConfig) -> IngestionIterator {
-    let connector = dozer_ingestion::get_connector(runtime.clone(), config.connection).unwrap();
+    let connector =
+        dozer_ingestion::get_connector(runtime.clone(), config.connection, None).unwrap();
     let tables = runtime.block_on(list_tables(&*connector));

     let (ingestor, iterator) = Ingestor::initialize_channel(Default::default());

     runtime.clone().spawn_blocking(move || async move {
-        if let Err(e) = runtime.block_on(connector.start(&ingestor, tables)) {
+        if let Err(e) = runtime.block_on(connector.start(&ingestor, tables, None)) {
             error!("Error starting connector: {:?}", e);
         }
     });

     iterator
 }

-async fn list_tables(connector: &dyn Connector) -> Vec<TableToIngest> {
+async fn list_tables(connector: &dyn Connector) -> Vec<TableInfo> {
     let tables = connector.list_tables().await.unwrap();
-    let tables = connector.list_columns(tables).await.unwrap();
-    tables
-        .into_iter()
-        .map(TableToIngest::from_scratch)
-        .collect()
+    connector.list_columns(tables).await.unwrap()
 }
diff --git a/dozer-ingestion/connector/src/lib.rs b/dozer-ingestion/connector/src/lib.rs
index 71256f6e9b..1449300608 100644
--- a/dozer-ingestion/connector/src/lib.rs
+++ b/dozer-ingestion/connector/src/lib.rs
@@ -1,7 +1,7 @@
 use std::fmt::Debug;

 use dozer_types::errors::internal::BoxedError;
-use dozer_types::node::RestartableState;
+use dozer_types::node::OpIdentifier;
 use dozer_types::serde;
 use dozer_types::serde::{Deserialize, Serialize};
 pub use dozer_types::tonic::async_trait;
@@ -96,11 +96,15 @@ pub trait Connector: Send + Sync + Debug {
         Ok((table_infos, schemas))
     }

+    /// Serializes any state that's required to re-instantiate this connector. Should not be confused with `last_checkpoint`.
+    async fn serialize_state(&self) -> Result<Vec<u8>, BoxedError>;
+
     /// Starts outputting data from `tables` to `ingestor`. This method should never return unless there is an unrecoverable error.
     async fn start(
         &self,
         ingestor: &Ingestor,
-        tables: Vec<TableToIngest>,
+        tables: Vec<TableInfo>,
+        last_checkpoint: Option<OpIdentifier>,
     ) -> Result<(), BoxedError>;
 }

@@ -136,27 +140,3 @@ pub struct TableInfo {
     /// The column names to be mapped.
     pub column_names: Vec<String>,
 }
-
-#[derive(Debug, Clone)]
-/// `TableInfo` with an optional checkpoint info.
-pub struct TableToIngest {
-    /// The `schema` scope of the table.
-    pub schema: Option<String>,
-    /// The table name, must be unique under the `schema` scope, or global scope if `schema` is `None`.
-    pub name: String,
-    /// The column names to be mapped.
-    pub column_names: Vec<String>,
-    /// The state to restart after.
- pub state: Option, -} - -impl TableToIngest { - pub fn from_scratch(table_info: TableInfo) -> Self { - Self { - schema: table_info.schema, - name: table_info.name, - column_names: table_info.column_names, - state: None, - } - } -} diff --git a/dozer-ingestion/connector/src/test_util.rs b/dozer-ingestion/connector/src/test_util.rs index 2b11876e08..b08f839844 100644 --- a/dozer-ingestion/connector/src/test_util.rs +++ b/dozer-ingestion/connector/src/test_util.rs @@ -8,7 +8,7 @@ use dozer_types::{ use futures::stream::{AbortHandle, Abortable}; use tokio::runtime::Runtime; -use crate::{Connector, IngestionIterator, Ingestor, TableInfo, TableToIngest}; +use crate::{Connector, IngestionIterator, Ingestor, TableInfo}; pub fn create_test_runtime() -> Arc { Arc::new( @@ -26,14 +26,10 @@ pub fn spawn_connector( ) -> (IngestionIterator, AbortHandle) { let (ingestor, iterator) = Ingestor::initialize_channel(Default::default()); let (abort_handle, abort_registration) = AbortHandle::new_pair(); - let tables = tables - .into_iter() - .map(TableToIngest::from_scratch) - .collect(); runtime.clone().spawn_blocking(move || { runtime.block_on(async move { if let Ok(Err(e)) = - Abortable::new(connector.start(&ingestor, tables), abort_registration).await + Abortable::new(connector.start(&ingestor, tables, None), abort_registration).await { error!("Connector `start` returned error: {e}") } diff --git a/dozer-ingestion/deltalake/src/connector.rs b/dozer-ingestion/deltalake/src/connector.rs index a50cdb4aa4..b455ddfc0f 100644 --- a/dozer-ingestion/deltalake/src/connector.rs +++ b/dozer-ingestion/deltalake/src/connector.rs @@ -3,10 +3,11 @@ use crate::schema_helper::SchemaHelper; use dozer_ingestion_connector::{ async_trait, dozer_types::{ - errors::internal::BoxedError, models::ingestion_types::DeltaLakeConfig, types::FieldType, + errors::internal::BoxedError, models::ingestion_types::DeltaLakeConfig, node::OpIdentifier, + types::FieldType, }, utils::{ListOrFilterColumns, TableNotFound}, - Connector, Ingestor, SourceSchemaResult, TableIdentifier, TableInfo, TableToIngest, + Connector, Ingestor, SourceSchemaResult, TableIdentifier, TableInfo, }; #[derive(Debug)] @@ -108,11 +109,17 @@ impl Connector for DeltaLakeConnector { schema_helper.get_schemas(&table_infos).await } + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + async fn start( &self, ingestor: &Ingestor, - tables: Vec, + tables: Vec, + last_checkpoint: Option, ) -> Result<(), BoxedError> { + assert!(last_checkpoint.is_none()); let reader = DeltaLakeReader::new(self.config.clone()); reader.read(&tables, ingestor).await } diff --git a/dozer-ingestion/deltalake/src/reader.rs b/dozer-ingestion/deltalake/src/reader.rs index 3e25e91b1e..461aeb5132 100644 --- a/dozer-ingestion/deltalake/src/reader.rs +++ b/dozer-ingestion/deltalake/src/reader.rs @@ -11,7 +11,7 @@ use dozer_ingestion_connector::{ futures::StreamExt, tokio, utils::TableNotFound, - Ingestor, TableToIngest, + Ingestor, TableInfo, }; pub struct DeltaLakeReader { @@ -23,11 +23,7 @@ impl DeltaLakeReader { Self { config } } - pub async fn read( - &self, - table: &[TableToIngest], - ingestor: &Ingestor, - ) -> Result<(), BoxedError> { + pub async fn read(&self, table: &[TableInfo], ingestor: &Ingestor) -> Result<(), BoxedError> { for (table_index, table) in table.iter().enumerate() { self.read_impl(table_index, table, ingestor).await?; } @@ -37,11 +33,9 @@ impl DeltaLakeReader { async fn read_impl( &self, table_index: usize, - table: &TableToIngest, + table: &TableInfo, 
ingestor: &Ingestor, ) -> Result<(), BoxedError> { - assert!(table.state.is_none()); - let table_path = table_path(&self.config, &table.name)?; let ctx = SessionContext::new(); let delta_table = deltalake::open_table(table_path).await?; diff --git a/dozer-ingestion/dozer/src/connector.rs b/dozer-ingestion/dozer/src/connector.rs index 02b0092952..2ae207b016 100644 --- a/dozer-ingestion/dozer/src/connector.rs +++ b/dozer-ingestion/dozer/src/connector.rs @@ -12,7 +12,7 @@ use dozer_ingestion_connector::{ default_buffer_size, default_log_batch_size, default_timeout, IngestionMessage, NestedDozerConfig, NestedDozerLogOptions, }, - node::RestartableState, + node::OpIdentifier, serde_json, tonic::{async_trait, transport::Channel}, types::{FieldType, Operation, Record, Schema}, @@ -23,7 +23,6 @@ use dozer_ingestion_connector::{ }, utils::warn_dropped_primary_index, CdcType, Connector, Ingestor, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo, - TableToIngest, }; use dozer_log::{ reader::{LogReaderBuilder, LogReaderOptions}, @@ -123,17 +122,28 @@ impl Connector for NestedDozerConnector { Ok(schemas) } + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + async fn start( &self, ingestor: &Ingestor, - tables: Vec, + tables: Vec, + last_checkpoint: Option, ) -> Result<(), BoxedError> { let mut joinset = JoinSet::new(); let (sender, mut receiver) = channel(100); for (table_index, table) in tables.into_iter().enumerate() { let builder = self.get_reader_builder(table.name.clone()).await?; - joinset.spawn(read_table(table_index, table, builder, sender.clone())); + joinset.spawn(read_table( + table_index, + table, + last_checkpoint, + builder, + sender.clone(), + )); } let ingestor = ingestor.clone(); @@ -204,14 +214,12 @@ impl NestedDozerConnector { async fn read_table( table_index: usize, - table_info: TableToIngest, + table_info: TableInfo, + last_checkpoint: Option, reader_builder: LogReaderBuilder, sender: Sender, ) -> Result<(), NestedDozerConnectorError> { - let state = table_info - .state - .map(|state| decode_state(&state)) - .transpose()?; + let state = last_checkpoint.map(|state| state.seq_in_tx); let starting_point = state.map(|pos| pos + 1).unwrap_or(0); let mut reader = reader_builder.build(starting_point); let schema = reader.schema.schema.clone(); @@ -252,26 +260,12 @@ async fn read_table( .send(IngestionMessage::OperationEvent { table_index, op, - state: Some(encode_state(op_and_pos.pos)), + state: Some(OpIdentifier::new(0, op_and_pos.pos)), }) .await; } } -fn encode_state(pos: u64) -> RestartableState { - pos.to_be_bytes().to_vec().into() -} - -fn decode_state(state: &RestartableState) -> Result { - Ok(u64::from_be_bytes( - state - .0 - .as_slice() - .try_into() - .map_err(|_| NestedDozerConnectorError::CorruptedState)?, - )) -} - struct SchemaMapper { source_schema: Schema, fields: Vec, diff --git a/dozer-ingestion/dozer/src/lib.rs b/dozer-ingestion/dozer/src/lib.rs index 14a6fe6834..5620508c91 100644 --- a/dozer-ingestion/dozer/src/lib.rs +++ b/dozer-ingestion/dozer/src/lib.rs @@ -8,9 +8,6 @@ use dozer_log::errors::{ReaderBuilderError, ReaderError}; #[derive(Error, Debug)] enum NestedDozerConnectorError { - #[error("Failed to parse checkpoint state")] - CorruptedState, - #[error("Failed to connect to upstream dozer at {0}: {1:?}")] ConnectionError(String, #[source] dozer_types::tonic::transport::Error), diff --git a/dozer-ingestion/ethereum/src/log/connector.rs b/dozer-ingestion/ethereum/src/log/connector.rs index b23445ab02..0372465bb3 100644 --- 
a/dozer-ingestion/ethereum/src/log/connector.rs +++ b/dozer-ingestion/ethereum/src/log/connector.rs @@ -3,6 +3,7 @@ use std::{str::FromStr, sync::Arc}; use super::helper; use super::sender::{run, EthDetails}; +use dozer_ingestion_connector::dozer_types::node::OpIdentifier; use dozer_ingestion_connector::utils::TableNotFound; use dozer_ingestion_connector::{ async_trait, @@ -14,7 +15,6 @@ use dozer_ingestion_connector::{ types::FieldType, }, CdcType, Connector, Ingestor, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo, - TableToIngest, }; use web3::ethabi::{Contract, Event}; use web3::types::{Address, BlockNumber, Filter, FilterBuilder, H256, U64}; @@ -234,10 +234,15 @@ impl Connector for EthLogConnector { Ok(result) } + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + async fn start( &self, ingestor: &Ingestor, - tables: Vec, + tables: Vec, + _last_checkpoint: Option, ) -> Result<(), BoxedError> { // Start a new thread that interfaces with ETH node let wss_url = self.config.wss_url.to_owned(); diff --git a/dozer-ingestion/ethereum/src/log/helper.rs b/dozer-ingestion/ethereum/src/log/helper.rs index 7501c70f8f..9afecb8113 100644 --- a/dozer-ingestion/ethereum/src/log/helper.rs +++ b/dozer-ingestion/ethereum/src/log/helper.rs @@ -5,7 +5,7 @@ use dozer_ingestion_connector::dozer_types::log::error; use dozer_ingestion_connector::dozer_types::types::{ Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition, }; -use dozer_ingestion_connector::{CdcType, SourceSchema, TableToIngest}; +use dozer_ingestion_connector::{CdcType, SourceSchema, TableInfo}; use web3::ethabi::RawLog; use web3::types::Log; @@ -60,7 +60,7 @@ pub fn get_contract_event_schemas( pub fn decode_event( log: Log, contracts: HashMap, - tables: Vec, + tables: Vec, ) -> Option<(usize, Operation)> { let address = format!("{:?}", log.address); diff --git a/dozer-ingestion/ethereum/src/log/sender.rs b/dozer-ingestion/ethereum/src/log/sender.rs index b2de12777b..b887f4bc78 100644 --- a/dozer-ingestion/ethereum/src/log/sender.rs +++ b/dozer-ingestion/ethereum/src/log/sender.rs @@ -9,7 +9,7 @@ use dozer_ingestion_connector::dozer_types::models::ingestion_types::{ }; use dozer_ingestion_connector::futures::future::BoxFuture; use dozer_ingestion_connector::futures::{FutureExt, StreamExt}; -use dozer_ingestion_connector::{tokio, Ingestor, TableToIngest}; +use dozer_ingestion_connector::{tokio, Ingestor, TableInfo}; use web3::transports::WebSocket; use web3::types::{Log, H256}; use web3::Web3; @@ -27,7 +27,7 @@ pub struct EthDetails<'a> { filter: EthFilter, ingestor: &'a Ingestor, contracts: HashMap, - pub tables: Vec, + pub tables: Vec, pub schema_map: HashMap, pub conn_name: String, } @@ -39,7 +39,7 @@ impl<'a> EthDetails<'a> { filter: EthFilter, ingestor: &'a Ingestor, contracts: HashMap, - tables: Vec, + tables: Vec, schema_map: HashMap, conn_name: String, ) -> Self { diff --git a/dozer-ingestion/ethereum/src/trace/connector.rs b/dozer-ingestion/ethereum/src/trace/connector.rs index 9ca8525788..095ba2e98c 100644 --- a/dozer-ingestion/ethereum/src/trace/connector.rs +++ b/dozer-ingestion/ethereum/src/trace/connector.rs @@ -4,11 +4,11 @@ use dozer_ingestion_connector::{ errors::internal::BoxedError, log::{error, info, warn}, models::ingestion_types::{default_batch_size, EthTraceConfig, IngestionMessage}, + node::OpIdentifier, types::FieldType, }, utils::TableNotFound, CdcType, Connector, Ingestor, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo, - TableToIngest, }; 
use super::super::helper as conn_helper; @@ -98,10 +98,15 @@ impl Connector for EthTraceConnector { ))]) } + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + async fn start( &self, ingestor: &Ingestor, - _tables: Vec, + _tables: Vec, + _last_checkpoint: Option, ) -> Result<(), BoxedError> { let config = self.config.clone(); let conn_name = self.conn_name.clone(); diff --git a/dozer-ingestion/grpc/src/connector.rs b/dozer-ingestion/grpc/src/connector.rs index 5975a9ac13..b93f2a6a9c 100644 --- a/dozer-ingestion/grpc/src/connector.rs +++ b/dozer-ingestion/grpc/src/connector.rs @@ -5,6 +5,7 @@ use crate::Error; use super::adapter::{GrpcIngestor, IngestAdapter}; use super::ingest::IngestorServiceImpl; +use dozer_ingestion_connector::dozer_types::node::OpIdentifier; use dozer_ingestion_connector::utils::TableNotFound; use dozer_ingestion_connector::{ async_trait, dozer_types, @@ -19,7 +20,6 @@ use dozer_ingestion_connector::{ tracing::Level, }, Connector, Ingestor, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo, - TableToIngest, }; use tower_http::trace::{self, TraceLayer}; @@ -62,11 +62,7 @@ where Ok(schemas_str) } - pub async fn serve( - &self, - ingestor: &Ingestor, - tables: Vec, - ) -> Result<(), Error> { + pub async fn serve(&self, ingestor: &Ingestor, tables: Vec) -> Result<(), Error> { let host = self.config.host.clone().unwrap_or_else(default_ingest_host); let port = self.config.port.unwrap_or_else(default_ingest_port); @@ -216,10 +212,15 @@ where Ok(result) } + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + async fn start( &self, ingestor: &Ingestor, - tables: Vec, + tables: Vec, + _last_checkpoint: Option, ) -> Result<(), BoxedError> { self.serve(ingestor, tables).await.map_err(Into::into) } diff --git a/dozer-ingestion/grpc/src/ingest.rs b/dozer-ingestion/grpc/src/ingest.rs index 91697d39c5..21beed123d 100644 --- a/dozer-ingestion/grpc/src/ingest.rs +++ b/dozer-ingestion/grpc/src/ingest.rs @@ -9,7 +9,7 @@ use dozer_ingestion_connector::{ tonic::{self, Streaming}, }, futures::StreamExt, - tokio, Ingestor, TableToIngest, + tokio, Ingestor, TableInfo, }; use super::adapter::{GrpcIngestMessage, GrpcIngestor, IngestAdapter}; @@ -20,7 +20,7 @@ where { adapter: Arc>, ingestor: &'static Ingestor, - tables: Vec, + tables: Vec, } impl IngestorServiceImpl where @@ -29,7 +29,7 @@ where pub fn new( adapter: GrpcIngestor, ingestor: &'static Ingestor, - tables: Vec, + tables: Vec, ) -> Self { Self { adapter: Arc::new(adapter), diff --git a/dozer-ingestion/javascript/src/lib.rs b/dozer-ingestion/javascript/src/lib.rs index 6e2ff41866..b02711f52c 100644 --- a/dozer-ingestion/javascript/src/lib.rs +++ b/dozer-ingestion/javascript/src/lib.rs @@ -5,11 +5,11 @@ use dozer_ingestion_connector::{ dozer_types::{ errors::internal::BoxedError, models::ingestion_types::{default_bootstrap_path, JavaScriptConfig}, + node::OpIdentifier, types::{FieldDefinition, FieldType, Schema, SourceDefinition}, }, tokio::runtime::Runtime, CdcType, Connector, Ingestor, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo, - TableToIngest, }; use js_extension::JsExtension; @@ -74,10 +74,15 @@ impl Connector for JavaScriptConnector { })]) } + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + async fn start( &self, ingestor: &Ingestor, - _tables: Vec, + _tables: Vec, + _last_checkpoint: Option, ) -> Result<(), BoxedError> { let js_path = self .config diff --git a/dozer-ingestion/kafka/src/connector.rs 
b/dozer-ingestion/kafka/src/connector.rs index 7be348671e..d7dd633f44 100644 --- a/dozer-ingestion/kafka/src/connector.rs +++ b/dozer-ingestion/kafka/src/connector.rs @@ -1,6 +1,7 @@ use dozer_ingestion_connector::async_trait; use dozer_ingestion_connector::dozer_types::errors::internal::BoxedError; use dozer_ingestion_connector::dozer_types::models::ingestion_types::KafkaConfig; +use dozer_ingestion_connector::dozer_types::node::OpIdentifier; use dozer_ingestion_connector::dozer_types::types::FieldType; use dozer_ingestion_connector::Connector; use dozer_ingestion_connector::Ingestor; @@ -8,7 +9,6 @@ use dozer_ingestion_connector::SourceSchema; use dozer_ingestion_connector::SourceSchemaResult; use dozer_ingestion_connector::TableIdentifier; use dozer_ingestion_connector::TableInfo; -use dozer_ingestion_connector::TableToIngest; use rdkafka::consumer::BaseConsumer; use rdkafka::consumer::Consumer; use rdkafka::util::Timeout; @@ -127,21 +127,33 @@ impl Connector for KafkaConnector { .collect()) } + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + async fn start( &self, ingestor: &Ingestor, - tables: Vec, + tables: Vec, + last_checkpoint: Option, ) -> Result<(), BoxedError> { let broker = self.config.broker.to_owned(); - run(broker, tables, ingestor, &self.config.schema_registry_url) - .await - .map_err(Into::into) + run( + broker, + tables, + last_checkpoint, + ingestor, + &self.config.schema_registry_url, + ) + .await + .map_err(Into::into) } } async fn run( broker: String, - tables: Vec, + tables: Vec, + last_checkpoint: Option, ingestor: &Ingestor, schema_registry_url: &Option, ) -> Result<(), KafkaError> { @@ -153,6 +165,12 @@ async fn run( let consumer = StreamConsumerBasic::default(); consumer - .run(client_config, ingestor, tables, schema_registry_url) + .run( + client_config, + ingestor, + tables, + last_checkpoint, + schema_registry_url, + ) .await } diff --git a/dozer-ingestion/kafka/src/debezium/stream_consumer.rs b/dozer-ingestion/kafka/src/debezium/stream_consumer.rs index 1429436910..c461acfd07 100644 --- a/dozer-ingestion/kafka/src/debezium/stream_consumer.rs +++ b/dozer-ingestion/kafka/src/debezium/stream_consumer.rs @@ -4,6 +4,8 @@ use crate::stream_consumer::StreamConsumer; use crate::stream_consumer_helper::{is_network_failure, OffsetsMap, StreamConsumerHelper}; use crate::{KafkaError, KafkaStreamError}; +use dozer_ingestion_connector::dozer_types::node::OpIdentifier; +use dozer_ingestion_connector::TableInfo; use dozer_ingestion_connector::{ async_trait, dozer_types::{ @@ -13,7 +15,7 @@ use dozer_ingestion_connector::{ serde_json::Value, types::{Operation, Record}, }, - Ingestor, TableToIngest, + Ingestor, }; use rdkafka::{ClientConfig, Message}; @@ -88,16 +90,12 @@ impl StreamConsumer for DebeziumStreamConsumer { &self, client_config: ClientConfig, ingestor: &Ingestor, - tables: Vec, + tables: Vec, + last_checkpoint: Option, _schema_registry_url: &Option, ) -> Result<(), KafkaError> { - let topics: Vec<&str> = tables - .iter() - .map(|t| { - assert!(t.state.is_none()); - t.name.as_str() - }) - .collect(); + assert!(last_checkpoint.is_none()); + let topics: Vec<&str> = tables.iter().map(|t| t.name.as_str()).collect(); let mut con = StreamConsumerHelper::start(&client_config, &topics).await?; let mut offsets = OffsetsMap::new(); loop { diff --git a/dozer-ingestion/kafka/src/stream_consumer.rs b/dozer-ingestion/kafka/src/stream_consumer.rs index d7573710a8..00765fd2b2 100644 --- a/dozer-ingestion/kafka/src/stream_consumer.rs +++ 
b/dozer-ingestion/kafka/src/stream_consumer.rs @@ -1,6 +1,8 @@ use crate::KafkaError; -use dozer_ingestion_connector::{async_trait, Ingestor, TableToIngest}; +use dozer_ingestion_connector::{ + async_trait, dozer_types::node::OpIdentifier, Ingestor, TableInfo, +}; use rdkafka::ClientConfig; #[async_trait] @@ -9,7 +11,8 @@ pub trait StreamConsumer { &self, client_config: ClientConfig, ingestor: &Ingestor, - tables: Vec, + tables: Vec, + last_checkpoint: Option, schema_registry_url: &Option, ) -> Result<(), KafkaError>; } diff --git a/dozer-ingestion/kafka/src/stream_consumer_basic.rs b/dozer-ingestion/kafka/src/stream_consumer_basic.rs index f72f8db69d..f2cffa5421 100644 --- a/dozer-ingestion/kafka/src/stream_consumer_basic.rs +++ b/dozer-ingestion/kafka/src/stream_consumer_basic.rs @@ -4,11 +4,12 @@ use dozer_ingestion_connector::{ async_trait, dozer_types::{ models::ingestion_types::IngestionMessage, + node::OpIdentifier, serde::{Deserialize, Serialize}, serde_json::{self, Value}, types::{Field, Operation, Record}, }, - Ingestor, TableToIngest, + Ingestor, TableInfo, }; use rdkafka::{ClientConfig, Message}; @@ -81,15 +82,15 @@ impl StreamConsumer for StreamConsumerBasic { &self, client_config: ClientConfig, ingestor: &Ingestor, - tables: Vec, + tables: Vec, + last_checkpoint: Option, schema_registry_url: &Option, ) -> Result<(), KafkaError> { + assert!(last_checkpoint.is_none()); let topics: Vec = tables.iter().map(|t| t.name.clone()).collect(); let mut schemas = HashMap::new(); for (table_index, table) in tables.into_iter().enumerate() { - assert!(table.state.is_none()); - let schema = if let Some(url) = schema_registry_url { SchemaRegistryBasic::get_single_schema(&table.name, url).await? } else { diff --git a/dozer-ingestion/mongodb/src/lib.rs b/dozer-ingestion/mongodb/src/lib.rs index f2441f5632..2df4d4c2cf 100644 --- a/dozer-ingestion/mongodb/src/lib.rs +++ b/dozer-ingestion/mongodb/src/lib.rs @@ -8,6 +8,7 @@ use dozer_ingestion_connector::{ errors::{internal::BoxedError, types::DeserializationError}, json_types::{serde_json_to_json_value, JsonValue}, models::ingestion_types::IngestionMessage, + node::OpIdentifier, thiserror::{self, Error}, types::{Field, FieldDefinition, FieldType, Operation, Record, SourceDefinition}, }, @@ -17,7 +18,6 @@ use dozer_ingestion_connector::{ sync::mpsc::{channel, Sender}, }, CdcType, Connector, Ingestor, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo, - TableToIngest, }; use mongodb::{ change_stream::event::ChangeStreamEvent, @@ -589,10 +589,15 @@ impl Connector for MongodbConnector { Ok(()) } + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + async fn start( &self, ingestor: &Ingestor, - tables: Vec, + tables: Vec, + _last_checkpoint: Option, ) -> Result<(), BoxedError> { // Snapshot: find // diff --git a/dozer-ingestion/mysql/src/binlog.rs b/dozer-ingestion/mysql/src/binlog.rs index 27240dd521..a81a6a6fff 100644 --- a/dozer-ingestion/mysql/src/binlog.rs +++ b/dozer-ingestion/mysql/src/binlog.rs @@ -40,15 +40,15 @@ use std::{ #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct BinlogPosition { - pub filename: Vec, + pub binlog_id: u64, pub position: u64, pub seq_no: u64, } pub async fn get_master_binlog_position( conn: &mut Conn, -) -> Result { - let (filename, position) = { +) -> Result<(String, BinlogPosition), MySQLConnectorError> { + let (filename, position): (Vec, u64) = { let mut row: Row = conn .exec_first("SHOW MASTER STATUS", ()) .await @@ -57,11 +57,32 @@ pub async fn 
get_master_binlog_position( (row.take(0).unwrap(), row.take(1).unwrap()) }; - Ok(BinlogPosition { - filename, - position, - seq_no: 0, - }) + let binlog_id_with_prefix = String::from_utf8(filename.clone()).map_err(|err| { + MySQLConnectorError::BinlogError(format!( + "Unexpected binlog filename format: {filename:?}: {err}" + )) + })?; + + let Some((prefix, suffix)) = binlog_id_with_prefix.split_once('.') else { + return Err(MySQLConnectorError::BinlogError(format!( + "Unexpected binlog filename format: {binlog_id_with_prefix:?}" + ))); + }; + + let binlog_id = suffix.parse::().map_err(|err| { + MySQLConnectorError::BinlogError(format!( + "Unexpected binlog filename format: {filename:?}: {err}" + )) + })?; + + Ok(( + prefix.to_string(), + BinlogPosition { + binlog_id, + position, + seq_no: 0, + }, + )) } pub async fn get_binlog_format(conn: &mut Conn) -> Result { @@ -84,6 +105,7 @@ pub struct BinlogIngestor<'a, 'd, 'e> { conn_pool: &'d Pool, conn_url: &'e String, ingestion_start_position: Option, + binlog_prefix: String, } impl<'a, 'd, 'e> BinlogIngestor<'a, 'd, 'e> { @@ -93,6 +115,7 @@ impl<'a, 'd, 'e> BinlogIngestor<'a, 'd, 'e> { stop_position: Option, server_id: u32, (conn_pool, conn_url): (&'d Pool, &'e String), + binlog_prefix: String, ) -> Self { Self { ingestor, @@ -104,6 +127,7 @@ impl<'a, 'd, 'e> BinlogIngestor<'a, 'd, 'e> { server_id, conn_pool, conn_url, + binlog_prefix, } } } @@ -116,12 +140,17 @@ impl BinlogIngestor<'_, '_, '_> { } async fn open_binlog(&mut self) -> Result<(), MySQLConnectorError> { + let filename_formatted = format!( + "{}.{:0>6}", + self.binlog_prefix, self.next_position.binlog_id + ); + let filename = filename_formatted.as_bytes(); let binlog_stream = self .connect() .await? .get_binlog_stream( mysql_async::BinlogRequest::new(self.server_id) - .with_filename(self.next_position.filename.as_slice()) + .with_filename(filename) .with_pos(self.next_position.position), ) .await @@ -130,7 +159,7 @@ impl BinlogIngestor<'_, '_, '_> { self.binlog_stream = Some(binlog_stream); self.local_stop_position = self.stop_position.as_ref().and_then(|stop_position| { - if self.next_position.filename == stop_position.filename { + if self.next_position.binlog_id == stop_position.binlog_id { Some(stop_position.position) } else { None @@ -201,13 +230,28 @@ impl BinlogIngestor<'_, '_, '_> { _ => unreachable!(), }; - if rotate_event.name_raw() != self.next_position.filename.as_slice() { + let filename = rotate_event.name(); + let Some((prefix, suffix)) = filename.split_once('.') else { + Err(MySQLConnectorError::BinlogError(format!( + "Unexpected binlog filename format: {filename:?}" + )))? 
+ }; + let rotated_binlog_id = suffix.parse::().map_err(|err| { + MySQLConnectorError::BinlogError(format!( + "Unexpected binlog filename format: {filename:?}: {err}" + )) + })?; + + if rotated_binlog_id != self.next_position.binlog_id + || self.binlog_prefix != prefix + { self.next_position = BinlogPosition { - filename: rotate_event.name_raw().into(), + binlog_id: rotated_binlog_id, position: rotate_event.position(), seq_no: 0, }; + self.binlog_prefix = prefix.to_string(); self.open_binlog().await?; } @@ -225,7 +269,7 @@ impl BinlogIngestor<'_, '_, '_> { if query == b"BEGIN" { transaction_pos.seq_no = 0; - transaction_pos.filename = self.next_position.filename.clone(); + transaction_pos.binlog_id = self.next_position.binlog_id; transaction_pos.position = (binlog_event.header().log_pos() - binlog_event.header().event_size()) as u64; @@ -1262,7 +1306,7 @@ mod tests { fn test_ingestion_continue_condition() { let binlog_id = 18; let start_position = super::BinlogPosition { - filename: format!("mysql-bin.0000{binlog_id}").into(), + binlog_id, position: 1234, seq_no: 3, }; @@ -1308,7 +1352,7 @@ mod tests { // Operation from previous binlog let operation_position = super::BinlogPosition { - filename: format!("mysql-bin.0000{}", binlog_id - 1).into(), + binlog_id: binlog_id - 1, ..start_position.clone() }; @@ -1316,7 +1360,7 @@ mod tests { // Operation from next binlog let operation_position = super::BinlogPosition { - filename: format!("mysql-bin.0000{}", binlog_id + 1).into(), + binlog_id: binlog_id + 1, ..start_position }; diff --git a/dozer-ingestion/mysql/src/connection.rs b/dozer-ingestion/mysql/src/connection.rs index ac6775ad18..d8b1aa9d99 100644 --- a/dozer-ingestion/mysql/src/connection.rs +++ b/dozer-ingestion/mysql/src/connection.rs @@ -94,11 +94,15 @@ pub fn is_network_failure(err: &mysql_async::Error) -> bool { } fn add_query_offset(query: &str, offset: u64) -> String { - assert!(query - .trim_start() - .get(0..7) - .map(|s| s.to_uppercase() == "SELECT ") - .unwrap_or(false)); + assert!([(7, "SELECT ".to_string()), (5, "SHOW ".to_string())] + .iter() + .any(|(len, prefix)| { + query + .trim_start() + .get(0..*len) + .map(|s| s.to_uppercase() == *prefix) + .unwrap_or(false) + })); if offset == 0 { query.into() diff --git a/dozer-ingestion/mysql/src/connector.rs b/dozer-ingestion/mysql/src/connector.rs index 83ee42f44b..818df5bc6c 100644 --- a/dozer-ingestion/mysql/src/connector.rs +++ b/dozer-ingestion/mysql/src/connector.rs @@ -1,5 +1,4 @@ use crate::MySQLConnectorError; -use std::collections::HashMap; use super::{ binlog::{get_binlog_format, get_master_binlog_position, BinlogIngestor, BinlogPosition}, @@ -8,7 +7,8 @@ use super::{ helpers::{escape_identifier, qualify_table_name}, schema::{ColumnDefinition, SchemaHelper, TableDefinition}, }; -use dozer_ingestion_connector::dozer_types::log::info; +use crate::MySQLConnectorError::BinlogQueryError; +use dozer_ingestion_connector::dozer_types::{log::info, node::OpIdentifier}; use dozer_ingestion_connector::{ async_trait, dozer_types::{ @@ -18,7 +18,6 @@ use dozer_ingestion_connector::{ }, utils::TableNotFound, CdcType, Connector, Ingestor, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo, - TableToIngest, }; use mysql_async::{Opts, Pool}; use mysql_common::Row; @@ -190,12 +189,19 @@ impl Connector for MySQLConnector { Ok(schemas) } + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + async fn start( &self, ingestor: &Ingestor, - tables: Vec, + tables: Vec, + last_checkpoint: Option, ) -> Result<(), 
BoxedError> { - self.replicate(ingestor, tables).await.map_err(Into::into) + self.replicate(ingestor, tables, last_checkpoint) + .await + .map_err(Into::into) } } @@ -213,7 +219,8 @@ impl MySQLConnector { async fn replicate( &self, ingestor: &Ingestor, - table_infos: Vec, + table_infos: Vec, + last_checkpoint: Option, ) -> Result<(), MySQLConnectorError> { let mut table_definitions = self .schema_helper() @@ -230,36 +237,66 @@ impl MySQLConnector { ) .await?; - let binlog_positions_map: HashMap> = table_infos - .iter() - .map(|s| { - ( - s.name.clone(), - s.state.clone().map(|state| { - crate::binlog::BinlogPosition::try_from(state.clone()).unwrap() - }), - ) - }) - .collect(); + let binlog_position = last_checkpoint + .map(crate::binlog::BinlogPosition::try_from) + .transpose()?; let binlog_positions = self - .replicate_tables(ingestor, &table_definitions, binlog_positions_map) + .replicate_tables(ingestor, &table_definitions, binlog_position) .await?; let binlog_position = self.sync_with_binlog(ingestor, binlog_positions).await?; + let prefix = self.get_prefix(binlog_position.binlog_id).await?; + info!("Ingestion starting at {:?}", binlog_position); - self.ingest_binlog(ingestor, &mut table_definitions, binlog_position, None) - .await?; + self.ingest_binlog( + ingestor, + &mut table_definitions, + binlog_position, + None, + prefix, + ) + .await?; Ok(()) } + async fn get_prefix(&self, suffix: u64) -> Result { + let suffix_formatted = format!("{:0>6}", suffix); + let mut conn = self.connect().await?; + let mut rows = conn.exec_iter("SHOW BINARY LOGS".to_string(), vec![]); + + let mut prefix = None; + while let Some(result) = rows.next().await { + let binlog_id = result + .map_err(MySQLConnectorError::QueryResultError)? + .get::(0) + .ok_or(BinlogQueryError)?; + + let row_binlog_suffix = &binlog_id[binlog_id.len() - 6..]; + + if row_binlog_suffix == suffix_formatted { + if prefix.is_some() { + return Err(MySQLConnectorError::MultipleBinlogsWithSameSuffix); + } + + prefix = Some(binlog_id[..(binlog_id.len() - 7)].to_string()); + } + } + + if let Some(prefix) = prefix { + Ok(prefix) + } else { + Err(MySQLConnectorError::BinlogNotFound) + } + } + async fn replicate_tables( &self, ingestor: &Ingestor, table_definitions: &[TableDefinition], - binlog_positions_map: HashMap>, + binlog_position: Option, ) -> Result, MySQLConnectorError> { let mut binlog_position_per_table = Vec::new(); @@ -267,8 +304,8 @@ impl MySQLConnector { let mut snapshot_started = false; for (table_index, td) in table_definitions.iter().enumerate() { - let position = match binlog_positions_map.get(&td.table_name) { - Some(Some(position)) => position.clone(), + let position = match &binlog_position { + Some(position) => position.clone(), _ => { if !snapshot_started { if ingestor @@ -351,7 +388,7 @@ impl MySQLConnector { } } - let binlog_position = get_master_binlog_position(&mut conn).await?; + let (_prefix, binlog_position) = get_master_binlog_position(&mut conn).await?; conn.query_drop("UNLOCK TABLES") .await @@ -384,7 +421,7 @@ impl MySQLConnector { assert!(!binlog_positions.is_empty()); let position = { - let mut last_position = None; + let mut last_position: Option = None; let mut synced_tables = Vec::new(); for (table, position) in binlog_positions.into_iter() { @@ -393,11 +430,14 @@ impl MySQLConnector { if let Some(start_position) = last_position { let end_position = position.clone(); + let prefix = self.get_prefix(start_position.binlog_id).await?; + self.ingest_binlog( ingestor, &mut synced_tables, start_position, 
Some(end_position), + prefix, ) .await?; } @@ -417,6 +457,7 @@ impl MySQLConnector { tables: &mut [TableDefinition], start_position: BinlogPosition, stop_position: Option, + binlog_prefix: String, ) -> Result<(), MySQLConnectorError> { let server_id = self.server_id.unwrap_or_else(|| rand::thread_rng().gen()); @@ -426,6 +467,7 @@ impl MySQLConnector { stop_position, server_id, (&self.conn_pool, &self.conn_url), + binlog_prefix, ); binlog_ingestor.ingest(tables, self.schema_helper()).await @@ -438,7 +480,6 @@ mod tests { connection::Conn, tests::{create_test_table, mariadb_test_config, mysql_test_config, TestConfig}, }; - use std::collections::HashMap; use super::MySQLConnector; use dozer_ingestion_connector::{ @@ -449,7 +490,6 @@ mod tests { }, }, tokio, CdcType, Connector, IngestionIterator, Ingestor, SourceSchema, TableIdentifier, - TableToIngest, }; use serial_test::serial; use std::time::Duration; @@ -568,7 +608,7 @@ mod tests { .unwrap(); let result = connector - .replicate_tables(&ingestor, &table_definitions, HashMap::new()) + .replicate_tables(&ingestor, &table_definitions, None) .await; assert!(result.is_ok(), "unexpected error: {result:?}"); @@ -651,20 +691,7 @@ mod tests { .build() .unwrap() .block_on(async move { - let _ = connector - .replicate( - &ingestor, - table_infos - .iter() - .map(|t| TableToIngest { - schema: t.schema.clone(), - name: t.name.clone(), - column_names: t.column_names.clone(), - state: None, - }) - .collect::>(), - ) - .await; + let _ = connector.replicate(&ingestor, table_infos, None).await; }); }); diff --git a/dozer-ingestion/mysql/src/lib.rs b/dozer-ingestion/mysql/src/lib.rs index 86b5b97e2b..10ba9a2447 100644 --- a/dozer-ingestion/mysql/src/lib.rs +++ b/dozer-ingestion/mysql/src/lib.rs @@ -55,6 +55,18 @@ pub enum MySQLConnectorError { #[error("Failed to send snapshot completed ingestion message")] SnapshotIngestionMessageError, + + #[error("State error: {0}")] + State(#[from] MysqlStateError), + + #[error("Binlog not found")] + BinlogNotFound, + + #[error("Fetch of binlog query failed")] + BinlogQueryError, + + #[error("Multiple binlogs with the same suffix")] + MultipleBinlogsWithSameSuffix, } #[derive(Error, Debug)] diff --git a/dozer-ingestion/mysql/src/state.rs b/dozer-ingestion/mysql/src/state.rs index deaf7280d7..fd6ad21708 100644 --- a/dozer-ingestion/mysql/src/state.rs +++ b/dozer-ingestion/mysql/src/state.rs @@ -1,28 +1,47 @@ +use dozer_ingestion_connector::dozer_types::node::OpIdentifier; + use crate::binlog::BinlogPosition; use crate::MysqlStateError; -use dozer_ingestion_connector::dozer_types::node::RestartableState; -pub fn encode_state(pos: &BinlogPosition) -> RestartableState { - let mut state = vec![]; - state.extend_from_slice(&pos.position.to_be_bytes()); - state.extend_from_slice(&pos.seq_no.to_be_bytes()); - state.extend(pos.clone().filename); +pub fn encode_state(pos: &BinlogPosition) -> OpIdentifier { + let lsn = (pos.binlog_id << 32) | pos.position; - state.into() + OpIdentifier { + txid: lsn, + seq_in_tx: pos.seq_no, + } } -impl TryFrom for BinlogPosition { +impl TryFrom for BinlogPosition { type Error = MysqlStateError; - fn try_from(state: RestartableState) -> Result { - let position = u64::from_be_bytes(state.0[0..8].try_into()?); - let seq_no = u64::from_be_bytes(state.0[8..16].try_into()?); - let filename = state.0[16..].to_vec(); + fn try_from(state: OpIdentifier) -> Result { + let binlog_id = state.txid >> 32; + let position = state.txid & 0x00000000ffffffff; + let seq_no = state.seq_in_tx; Ok(BinlogPosition { + 
binlog_id, position, seq_no, - filename, }) } } + +#[cfg(test)] +mod tests { + #[test] + fn test_decode_encode() { + use super::*; + let pos = BinlogPosition { + binlog_id: 123, + position: 456, + seq_no: 789, + }; + + let state = encode_state(&pos); + let pos2 = BinlogPosition::try_from(state).unwrap(); + + assert_eq!(pos, pos2); + } +} diff --git a/dozer-ingestion/object-store/src/connector.rs b/dozer-ingestion/object-store/src/connector.rs index f51512df0e..f4dea28f44 100644 --- a/dozer-ingestion/object-store/src/connector.rs +++ b/dozer-ingestion/object-store/src/connector.rs @@ -1,6 +1,7 @@ use dozer_ingestion_connector::dozer_types::errors::internal::BoxedError; use dozer_ingestion_connector::dozer_types::log::error; use dozer_ingestion_connector::dozer_types::models::ingestion_types::IngestionMessage; +use dozer_ingestion_connector::dozer_types::node::OpIdentifier; use dozer_ingestion_connector::dozer_types::types::FieldType; use dozer_ingestion_connector::futures::future::try_join_all; use dozer_ingestion_connector::tokio::sync::mpsc::channel; @@ -8,7 +9,6 @@ use dozer_ingestion_connector::tokio::task::JoinSet; use dozer_ingestion_connector::utils::{ListOrFilterColumns, TableNotFound}; use dozer_ingestion_connector::{ async_trait, tokio, Connector, Ingestor, SourceSchemaResult, TableIdentifier, TableInfo, - TableToIngest, }; use crate::adapters::DozerObjectStore; @@ -93,11 +93,17 @@ impl Connector for ObjectStoreConnector { Ok(schema_mapper::get_schema(&self.config, &list_or_filter_columns).await) } + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + async fn start( &self, ingestor: &Ingestor, - tables: Vec, + tables: Vec, + last_checkpoint: Option, ) -> Result<(), BoxedError> { + assert!(last_checkpoint.is_none()); let (sender, mut receiver) = channel::, ObjectStoreConnectorError>>(100); // todo: increase buffer siz let ingestor_clone = ingestor.clone(); @@ -132,7 +138,6 @@ impl Connector for ObjectStoreConnector { let mut handles = vec![]; for (table_index, table_info) in tables.iter().enumerate() { - assert!(table_info.state.is_none()); let table_info = TableInfo { schema: table_info.schema.clone(), name: table_info.name.clone(), diff --git a/dozer-ingestion/postgres/src/connector.rs b/dozer-ingestion/postgres/src/connector.rs index 2851e7c20e..cd45f12c3b 100644 --- a/dozer-ingestion/postgres/src/connector.rs +++ b/dozer-ingestion/postgres/src/connector.rs @@ -1,9 +1,9 @@ -use dozer_ingestion_connector::dozer_types::log::warn; +use dozer_ingestion_connector::dozer_types::node::OpIdentifier; use dozer_ingestion_connector::{ async_trait, - dozer_types::{errors::internal::BoxedError, log::info, types::FieldType}, + dozer_types::{errors::internal::BoxedError, types::FieldType}, utils::ListOrFilterColumns, - Connector, Ingestor, SourceSchemaResult, TableIdentifier, TableInfo, TableToIngest, + Connector, Ingestor, SourceSchemaResult, TableIdentifier, TableInfo, }; use postgres_types::PgLsn; use rand::distributions::Alphanumeric; @@ -11,7 +11,6 @@ use rand::Rng; use tokio_postgres::config::ReplicationMode; use tokio_postgres::Config; -use crate::state::LsnWithSlot; use crate::{ connection::validator::validate_connection, iterator::PostgresIterator, @@ -32,7 +31,8 @@ pub struct PostgresConfig { #[derive(Debug)] pub struct PostgresConnector { - name: String, + pub name: String, + pub slot_name: String, replication_conn_config: Config, conn_config: Config, schema_helper: SchemaHelper, @@ -49,7 +49,10 @@ pub struct ReplicationSlotInfo { pub const 
REPLICATION_SLOT_PREFIX: &str = "dozer_slot"; impl PostgresConnector { - pub fn new(config: PostgresConfig) -> PostgresConnector { + pub fn new( + config: PostgresConfig, + state: Option>, + ) -> Result { let mut replication_conn_config = config.config.clone(); replication_conn_config.replication_mode(ReplicationMode::Logical); @@ -58,55 +61,20 @@ impl PostgresConnector { // conn_str - replication_conn_config // conn_str_plain- conn_config - PostgresConnector { + let slot_name = state + .map(String::from_utf8) + .transpose()? + .unwrap_or_else(|| get_slot_name(&config.name)); + + Ok(PostgresConnector { name: config.name, + slot_name, conn_config: config.config, replication_conn_config, schema_helper: helper, schema: config.schema, batch_size: config.batch_size, - } - } - - fn get_lsn_with_offset_from_seq( - conn_name: &str, - tables: Vec, - ) -> Option { - let m: Option = tables - .iter() - .filter_map(|table| { - if let Some(s) = &table.state { - match LsnWithSlot::try_from(s.clone()) { - Ok(state) => Some(state), - Err(e) => { - warn!( - "[{conn_name}] Failed to parse checkpoint: {error}", - conn_name = conn_name, - error = e - ); - None - } - } - } else { - None - } - }) - .collect::>() - .iter() - .max_by_key(|x| x.lsn) - .cloned(); - - if let Some(x) = &m { - info!( - "[{conn_name}] Last checkpoint {txid}({seq_in_tx}) in {slot_name}", - conn_name = conn_name, - txid = x.lsn.0, - seq_in_tx = x.lsn.1, - slot_name = x.slot_name - ); - } - - m + }) } } @@ -195,19 +163,17 @@ impl Connector for PostgresConnector { .collect()) } + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(self.slot_name.as_bytes().to_vec()) + } + async fn start( &self, ingestor: &Ingestor, - tables: Vec, + tables: Vec, + last_checkpoint: Option, ) -> Result<(), BoxedError> { - let lsn_with_slot = - PostgresConnector::get_lsn_with_offset_from_seq(&self.name, tables.clone()); - let slot_name = lsn_with_slot - .clone() - .map_or(self.get_slot_name(), |LsnWithSlot { slot_name, .. }| { - slot_name - }); - let lsn = lsn_with_slot.map(|LsnWithSlot { lsn, .. 
}| lsn); + let lsn = last_checkpoint.map(|checkpoint| (checkpoint.txid.into(), checkpoint.seq_in_tx)); if lsn.is_none() { let client = helper::connect(self.replication_conn_config.clone()).await?; @@ -215,8 +181,7 @@ impl Connector for PostgresConnector { .iter() .map(|table| TableIdentifier::new(table.schema.clone(), table.name.clone())) .collect::>(); - self.create_publication(client, Some(&table_identifiers)) - .await?; + create_publication(client, &self.name, Some(&table_identifiers)).await?; } let tables = tables @@ -229,8 +194,8 @@ impl Connector for PostgresConnector { .collect::>(); let iterator = PostgresIterator::new( self.name.clone(), - self.get_publication_name(), - slot_name, + get_publication_name(&self.name), + self.slot_name.clone(), self.schema_helper.get_tables(Some(&tables)).await?, self.replication_conn_config.clone(), ingestor, @@ -242,61 +207,59 @@ impl Connector for PostgresConnector { } } -impl PostgresConnector { - fn get_publication_name(&self) -> String { - format!("dozer_publication_{}", self.name) - } +fn get_publication_name(conn_name: &str) -> String { + format!("dozer_publication_{}", conn_name) +} - pub fn get_slot_name(&self) -> String { - let rand_name_suffix: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(7) - .map(char::from) - .collect(); +pub fn get_slot_name(conn_name: &str) -> String { + let rand_name_suffix: String = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(7) + .map(char::from) + .collect(); - format!( - "{REPLICATION_SLOT_PREFIX}_{}_{}", - self.name, - rand_name_suffix.to_lowercase() - ) - } + format!( + "{REPLICATION_SLOT_PREFIX}_{}_{}", + conn_name, + rand_name_suffix.to_lowercase() + ) +} - pub async fn create_publication( - &self, - mut client: Client, - table_identifiers: Option<&[TableIdentifier]>, - ) -> Result<(), PostgresConnectorError> { - let publication_name = self.get_publication_name(); - let table_str: String = match table_identifiers { - None => "ALL TABLES".to_string(), - Some(table_identifiers) => { - let table_names = table_identifiers - .iter() - .map(|table_identifier| { - format!( - r#""{}"."{}""#, - table_identifier - .schema - .as_deref() - .unwrap_or(DEFAULT_SCHEMA_NAME), - table_identifier.name - ) - }) - .collect::>(); - format!("TABLE {}", table_names.join(" , ")) - } - }; +pub async fn create_publication( + mut client: Client, + conn_name: &str, + table_identifiers: Option<&[TableIdentifier]>, +) -> Result<(), PostgresConnectorError> { + let publication_name = get_publication_name(conn_name); + let table_str: String = match table_identifiers { + None => "ALL TABLES".to_string(), + Some(table_identifiers) => { + let table_names = table_identifiers + .iter() + .map(|table_identifier| { + format!( + r#""{}"."{}""#, + table_identifier + .schema + .as_deref() + .unwrap_or(DEFAULT_SCHEMA_NAME), + table_identifier.name + ) + }) + .collect::>(); + format!("TABLE {}", table_names.join(" , ")) + } + }; - client - .simple_query(format!("DROP PUBLICATION IF EXISTS {publication_name}").as_str()) - .await - .map_err(PostgresConnectorError::DropPublicationError)?; + client + .simple_query(format!("DROP PUBLICATION IF EXISTS {publication_name}").as_str()) + .await + .map_err(PostgresConnectorError::DropPublicationError)?; - client - .simple_query(format!("CREATE PUBLICATION {publication_name} FOR {table_str}").as_str()) - .await - .map_err(PostgresConnectorError::CreatePublicationError)?; + client + .simple_query(format!("CREATE PUBLICATION {publication_name} FOR {table_str}").as_str()) + .await + 
.map_err(PostgresConnectorError::CreatePublicationError)?; - Ok(()) - } + Ok(()) } diff --git a/dozer-ingestion/postgres/src/lib.rs b/dozer-ingestion/postgres/src/lib.rs index 06a83f1fa8..85e7324117 100644 --- a/dozer-ingestion/postgres/src/lib.rs +++ b/dozer-ingestion/postgres/src/lib.rs @@ -16,7 +16,6 @@ mod replication_slot_helper; pub mod replicator; mod schema; pub mod snapshotter; -mod state; #[cfg(test)] pub mod test_utils; #[cfg(test)] @@ -33,6 +32,9 @@ pub enum PostgresConnectorError { #[error("Invalid SslMode: {0:?}")] InvalidSslError(SslMode), + #[error("Failed to convert slot name from state. Error: {0}")] + StringReadError(#[from] FromUtf8Error), + #[error("Query failed in connector: {0}")] InvalidQueryError(#[source] tokio_postgres::Error), @@ -144,15 +146,6 @@ pub enum PostgresConnectorError { UnexpectedQueryMessageError, } -#[derive(Error, Debug)] -pub enum PostgresStateError { - #[error("Failed to read lsn from state. Error: {0}")] - TrySliceError(#[from] std::array::TryFromSliceError), - - #[error("Failed to convert slot name from state. Error: {0}")] - StringReadError(#[from] FromUtf8Error), -} - #[derive(Error, Debug)] pub enum PostgresSchemaError { #[error("Schema's '{0}' doesn't have primary key")] diff --git a/dozer-ingestion/postgres/src/replicator.rs b/dozer-ingestion/postgres/src/replicator.rs index 516f453ac9..01059ebb44 100644 --- a/dozer-ingestion/postgres/src/replicator.rs +++ b/dozer-ingestion/postgres/src/replicator.rs @@ -2,6 +2,7 @@ use dozer_ingestion_connector::dozer_types::bytes; use dozer_ingestion_connector::dozer_types::chrono::{TimeZone, Utc}; use dozer_ingestion_connector::dozer_types::log::{error, info}; use dozer_ingestion_connector::dozer_types::models::ingestion_types::IngestionMessage; +use dozer_ingestion_connector::dozer_types::node::OpIdentifier; use dozer_ingestion_connector::futures::StreamExt; use dozer_ingestion_connector::Ingestor; use postgres_protocol::message::backend::ReplicationMessage::*; @@ -15,7 +16,6 @@ use std::time::SystemTime; use crate::connection::client::Client; use crate::connection::helper::{self, is_network_failure}; -use crate::state::encode_state; use crate::xlog_mapper::XlogMapper; use crate::PostgresConnectorError; @@ -132,11 +132,7 @@ impl<'a> CDCHandler<'a> { .handle_message(IngestionMessage::OperationEvent { table_index, op, - state: Some(encode_state( - self.begin_lsn, - self.seq_no, - self.slot_name.clone(), - )), + state: Some(OpIdentifier::new(self.begin_lsn, self.seq_no)), }) .await .is_err() diff --git a/dozer-ingestion/postgres/src/state.rs b/dozer-ingestion/postgres/src/state.rs deleted file mode 100644 index ce4a8514fe..0000000000 --- a/dozer-ingestion/postgres/src/state.rs +++ /dev/null @@ -1,35 +0,0 @@ -use crate::connector::REPLICATION_SLOT_PREFIX; -use crate::PostgresStateError; -use dozer_ingestion_connector::dozer_types::node::RestartableState; -use postgres_protocol::Lsn; -use postgres_types::PgLsn; - -#[derive(Clone)] -pub struct LsnWithSlot { - pub lsn: (PgLsn, u64), - pub slot_name: String, -} - -pub fn encode_state(lsn: Lsn, seq_no: u64, slot_name: String) -> RestartableState { - let slot_name_suffix = slot_name.replace(REPLICATION_SLOT_PREFIX, ""); - let mut state = vec![]; - state.extend_from_slice(&lsn.to_be_bytes()); - state.extend_from_slice(&seq_no.to_be_bytes()); - state.extend_from_slice(slot_name_suffix.as_bytes()); - state.into() -} - -impl TryFrom for LsnWithSlot { - type Error = PostgresStateError; - - fn try_from(state: RestartableState) -> Result { - let lsn = 
Lsn::from_be_bytes(state.0[0..8].try_into()?); - let seq_no = u64::from_be_bytes(state.0[8..16].try_into()?); - let slot_name = String::from_utf8(state.0[16..].into())?; - - Ok(LsnWithSlot { - lsn: (PgLsn::from(lsn), seq_no), - slot_name: format!("{REPLICATION_SLOT_PREFIX}{slot_name}"), - }) - } -} diff --git a/dozer-ingestion/postgres/src/tests/continue_replication_tests.rs b/dozer-ingestion/postgres/src/tests/continue_replication_tests.rs index cfcd75aedb..6771f5fb0f 100644 --- a/dozer-ingestion/postgres/src/tests/continue_replication_tests.rs +++ b/dozer-ingestion/postgres/src/tests/continue_replication_tests.rs @@ -11,7 +11,7 @@ mod tests { use crate::{ connection::helper::{self, map_connection_config}, - connector::{PostgresConfig, PostgresConnector}, + connector::{create_publication, PostgresConfig, PostgresConnector}, replication_slot_helper::ReplicationSlotHelper, test_utils::{create_slot, load_test_connection_config, retry_drop_active_slot}, tests::client::TestPostgresClient, @@ -30,7 +30,7 @@ mod tests { batch_size: 1000, }; - let connector = PostgresConnector::new(postgres_config); + let connector = PostgresConnector::new(postgres_config, None).unwrap(); // let result = connector.can_start_from((1, 0)).unwrap(); // assert!(!result, "Cannot continue, because slot doesnt exist"); @@ -42,20 +42,21 @@ mod tests { let client = helper::connect(replication_conn_config.clone()) .await .unwrap(); - connector.create_publication(client, None).await.unwrap(); + create_publication(client, &connector.name, None) + .await + .unwrap(); // Creating slot let mut client = helper::connect(replication_conn_config.clone()) .await .unwrap(); - let slot_name = connector.get_slot_name(); - let _parsed_lsn = create_slot(&mut client, &slot_name).await; + let _parsed_lsn = create_slot(&mut client, &connector.slot_name).await; // let result = connector // .can_start_from((u64::from(parsed_lsn), 0)) // .unwrap(); - ReplicationSlotHelper::drop_replication_slot(&mut client, &slot_name) + ReplicationSlotHelper::drop_replication_slot(&mut client, &connector.slot_name) .await .unwrap(); // assert!( @@ -84,7 +85,7 @@ mod tests { batch_size: 1000, }; - let connector = PostgresConnector::new(postgres_config); + let connector = PostgresConnector::new(postgres_config, None).unwrap(); let mut replication_conn_config = conn_config; replication_conn_config.replication_mode(ReplicationMode::Logical); @@ -97,8 +98,7 @@ mod tests { schema: Some("public".to_string()), name: table_name.clone(), }; - connector - .create_publication(client, Some(&[table_identifier])) + create_publication(client, &connector.name, Some(&[table_identifier])) .await .unwrap(); @@ -107,8 +107,7 @@ mod tests { .await .unwrap(); - let slot_name = connector.get_slot_name(); - let _parsed_lsn = create_slot(&mut client, &slot_name).await; + let _parsed_lsn = create_slot(&mut client, &connector.slot_name).await; // let config = IngestionConfig::default(); // let (ingestor, mut iterator) = Ingestor::initialize_channel(config); @@ -155,9 +154,10 @@ mod tests { // } // } - if let Err(e) = ReplicationSlotHelper::drop_replication_slot(&mut client, &slot_name).await + if let Err(e) = + ReplicationSlotHelper::drop_replication_slot(&mut client, &connector.slot_name).await { - retry_drop_active_slot(e, &mut client, &slot_name) + retry_drop_active_slot(e, &mut client, &connector.slot_name) .await .unwrap(); } diff --git a/dozer-ingestion/snowflake/src/connector/snowflake.rs b/dozer-ingestion/snowflake/src/connector/snowflake.rs index b5e6f547c7..5c7a606112 100644 
--- a/dozer-ingestion/snowflake/src/connector/snowflake.rs +++ b/dozer-ingestion/snowflake/src/connector/snowflake.rs @@ -4,17 +4,15 @@ use dozer_ingestion_connector::{ errors::internal::BoxedError, log::{info, warn}, models::ingestion_types::{default_snowflake_poll_interval, SnowflakeConfig}, + node::OpIdentifier, types::FieldType, }, tokio, Connector, Ingestor, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo, - TableToIngest, }; use odbc::create_environment_v3; use crate::{ - connection::client::Client, - schema_helper::SchemaHelper, - stream_consumer::{decode_state, StreamConsumer}, + connection::client::Client, schema_helper::SchemaHelper, stream_consumer::StreamConsumer, SnowflakeError, SnowflakeStreamError, }; @@ -117,16 +115,21 @@ impl Connector for SnowflakeConnector { .collect()) } + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + async fn start( &self, ingestor: &Ingestor, - tables: Vec, + tables: Vec, + last_checkpoint: Option, ) -> Result<(), BoxedError> { spawn_blocking({ let name = self.name.clone(); let config = self.config.clone(); let ingestor = ingestor.clone(); - move || run(name, config, tables, ingestor) + move || run(name, config, tables, last_checkpoint, ingestor) }) .await .map_err(Into::into) @@ -136,7 +139,8 @@ impl Connector for SnowflakeConnector { fn run( name: String, config: SnowflakeConfig, - tables: Vec, + tables: Vec, + last_checkpoint: Option, ingestor: Ingestor, ) -> Result<(), SnowflakeError> { // SNAPSHOT part - run it when stream table doesn't exist @@ -152,7 +156,8 @@ fn run( for (idx, table) in tables.iter().enumerate() { // We only check stream status on first iteration if iteration == 0 { - let state = table.state.as_ref().map(decode_state).transpose()?; + let state = + last_checkpoint.map(|checkpoint| (checkpoint.txid, checkpoint.seq_in_tx)); match state { None | Some((0, _)) => { info!("[{}][{}] Creating new stream", name, table.name); diff --git a/dozer-ingestion/snowflake/src/stream_consumer.rs b/dozer-ingestion/snowflake/src/stream_consumer.rs index 3272c1ee26..23edc0b962 100644 --- a/dozer-ingestion/snowflake/src/stream_consumer.rs +++ b/dozer-ingestion/snowflake/src/stream_consumer.rs @@ -1,7 +1,7 @@ use dozer_ingestion_connector::{ dozer_types::{ models::ingestion_types::IngestionMessage, - node::RestartableState, + node::OpIdentifier, types::{Field, Operation, Record}, }, Ingestor, @@ -126,7 +126,7 @@ impl StreamConsumer { .blocking_handle_message(IngestionMessage::OperationEvent { table_index, op, - state: Some(encode_state(iteration, idx as u64)), + state: Some(OpIdentifier::new(iteration, idx as u64)), }) .is_err() { @@ -141,20 +141,3 @@ impl StreamConsumer { client.exec(&query) } } - -fn encode_state(iteration: u64, index: u64) -> RestartableState { - let mut state = vec![]; - state.extend_from_slice(&iteration.to_be_bytes()); - state.extend_from_slice(&index.to_be_bytes()); - state.into() -} - -pub fn decode_state(state: &RestartableState) -> Result<(u64, u64), SnowflakeError> { - if state.0.len() != 16 { - return Err(SnowflakeError::CorruptedState); - } - let state = state.0.as_slice(); - let iteration = u64::from_be_bytes(state[0..8].try_into().unwrap()); - let index = u64::from_be_bytes(state[8..16].try_into().unwrap()); - Ok((iteration, index)) -} diff --git a/dozer-ingestion/src/lib.rs b/dozer-ingestion/src/lib.rs index bdb3a6b089..6d520cc3e0 100644 --- a/dozer-ingestion/src/lib.rs +++ b/dozer-ingestion/src/lib.rs @@ -39,6 +39,7 @@ const DEFAULT_POSTGRES_SNAPSHOT_BATCH_SIZE: u32 = 
100_000; pub fn get_connector( runtime: Arc, connection: Connection, + state: Option>, ) -> Result, ConnectorError> { let config = connection.config; match config.clone() { @@ -54,7 +55,7 @@ pub fn get_connector( if let Some(dbname) = postgres_config.config.get_dbname() { debug!("Connecting to postgres database - {}", dbname.to_string()); } - Ok(Box::new(PostgresConnector::new(postgres_config))) + Ok(Box::new(PostgresConnector::new(postgres_config, state)?)) } #[cfg(feature = "ethereum")] ConnectionConfig::Ethereum(eth_config) => match eth_config.provider { diff --git a/dozer-ingestion/tests/test_suite/connectors/postgres.rs b/dozer-ingestion/tests/test_suite/connectors/postgres.rs index 805470f982..f781f63d7a 100644 --- a/dozer-ingestion/tests/test_suite/connectors/postgres.rs +++ b/dozer-ingestion/tests/test_suite/connectors/postgres.rs @@ -128,12 +128,16 @@ async fn create_postgres_server() -> (Client, PostgresConnectorTest, PostgresCon .password(password) .dbname(dbname); - let connector = PostgresConnector::new(PostgresConfig { - name: "postgres_connector_test".to_string(), - config: config.clone(), - schema: None, - batch_size: 1000, - }); + let connector = PostgresConnector::new( + PostgresConfig { + name: "postgres_connector_test".to_string(), + config: config.clone(), + schema: None, + batch_size: 1000, + }, + None, + ) + .unwrap(); let client = connect(config.clone()).await.unwrap(); diff --git a/dozer-sql/src/tests/builder_test.rs b/dozer-sql/src/tests/builder_test.rs index c92687e2d1..bec44aa143 100644 --- a/dozer-sql/src/tests/builder_test.rs +++ b/dozer-sql/src/tests/builder_test.rs @@ -6,7 +6,6 @@ use dozer_core::epoch::Epoch; use dozer_core::executor::DagExecutor; use dozer_core::node::{ OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory, - SourceState, }; use dozer_core::DEFAULT_PORT_HANDLE; use dozer_recordstore::ProcessorRecordStore; @@ -14,6 +13,7 @@ use dozer_types::chrono::DateTime; use dozer_types::errors::internal::BoxedError; use dozer_types::log::debug; use dozer_types::models::ingestion_types::IngestionMessage; +use dozer_types::node::OpIdentifier; use dozer_types::ordered_float::OrderedFloat; use dozer_types::tonic::async_trait; use dozer_types::types::{ @@ -97,7 +97,7 @@ impl SourceFactory for TestSourceFactory { fn build( &self, _output_schemas: HashMap, - _last_checkpoint: SourceState, + _state: Option>, ) -> Result, BoxedError> { Ok(Box::new(TestSource {})) } @@ -108,9 +108,14 @@ pub struct TestSource {} #[async_trait] impl Source for TestSource { + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + async fn start( &self, sender: Sender<(PortHandle, IngestionMessage)>, + _last_checkpoint: Option, ) -> Result<(), BoxedError> { for _ in 0..10 { sender diff --git a/dozer-tests/src/sql_tests/helper/pipeline.rs b/dozer-tests/src/sql_tests/helper/pipeline.rs index dba14db880..29461f7dd2 100644 --- a/dozer-tests/src/sql_tests/helper/pipeline.rs +++ b/dozer-tests/src/sql_tests/helper/pipeline.rs @@ -8,7 +8,6 @@ use dozer_core::epoch::Epoch; use dozer_core::errors::ExecutionError; use dozer_core::node::{ OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory, - SourceState, }; use dozer_core::{Dag, DEFAULT_PORT_HANDLE}; @@ -21,6 +20,7 @@ use dozer_sql::builder::statement_to_pipeline; use dozer_types::errors::internal::BoxedError; use dozer_types::models::ingestion_types::IngestionMessage; +use dozer_types::node::OpIdentifier; use dozer_types::types::{Operation, Record, Schema, 
SourceDefinition}; use std::collections::HashMap; use tempdir::TempDir; @@ -106,7 +106,7 @@ impl SourceFactory for TestSourceFactory { fn build( &self, _output_schemas: HashMap, - _last_checkpoint: SourceState, + _state: Option>, ) -> Result, BoxedError> { Ok(Box::new(TestSource { name_to_port: self.name_to_port.to_owned(), @@ -123,9 +123,14 @@ pub struct TestSource { #[async_trait] impl Source for TestSource { + async fn serialize_state(&self) -> Result, BoxedError> { + Ok(vec![]) + } + async fn start( &self, sender: tokio::sync::mpsc::Sender<(PortHandle, IngestionMessage)>, + _last_checkpoint: Option, ) -> Result<(), BoxedError> { while let Ok(Some((schema_name, op))) = self.receiver.recv() { let port = self.name_to_port.get(&schema_name).expect("port not found"); diff --git a/dozer-types/src/models/ingestion_types.rs b/dozer-types/src/models/ingestion_types.rs index 0ed9eb674b..6e6982b376 100644 --- a/dozer-types/src/models/ingestion_types.rs +++ b/dozer-types/src/models/ingestion_types.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use crate::{ helper::{deserialize_duration_secs_f64, f64_schema, serialize_duration_secs_f64}, models::connection::SchemaExample, - node::RestartableState, + node::OpIdentifier, types::Operation, }; @@ -24,8 +24,8 @@ pub enum IngestionMessage { table_index: usize, /// The CDC event. op: Operation, - /// If this connector supports restarting from a specific CDC event, it should provide a `RestartableState`. - state: Option, + /// If this connector supports restarting from a specific CDC event, it should provide a `OpIdentifier`. + state: Option, }, /// A connector uses this message kind to notify Dozer that a initial snapshot of the source tables is started SnapshottingStarted, diff --git a/dozer-types/src/node.rs b/dozer-types/src/node.rs index 5682247148..b95c09916d 100644 --- a/dozer-types/src/node.rs +++ b/dozer-types/src/node.rs @@ -63,38 +63,71 @@ impl Display for NodeHandle { } #[derive( - Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, bincode::Encode, bincode::Decode, + Clone, + Debug, + Copy, + PartialEq, + Eq, + PartialOrd, + Ord, + Hash, + Default, + Serialize, + Deserialize, + bincode::Encode, + bincode::Decode, )] -/// A table's restartable state, any binary data. -pub struct RestartableState(pub Vec); +/// A identifier made of two `u64`s. +pub struct OpIdentifier { + /// High 64 bits of the identifier. + pub txid: u64, + /// Low 64 bits of the identifier. + pub seq_in_tx: u64, +} + +impl OpIdentifier { + pub fn new(txid: u64, seq_in_tx: u64) -> Self { + Self { txid, seq_in_tx } + } + + pub fn to_bytes(&self) -> [u8; 16] { + let mut result = [0_u8; 16]; + result[0..8].copy_from_slice(&self.txid.to_be_bytes()); + result[8..16].copy_from_slice(&self.seq_in_tx.to_be_bytes()); + result + } -impl From> for RestartableState { - fn from(value: Vec) -> Self { - Self(value) + pub fn from_bytes(bytes: [u8; 16]) -> Self { + let txid = u64::from_be_bytes(bytes[0..8].try_into().unwrap()); + let seq_in_tx = u64::from_be_bytes(bytes[8..16].try_into().unwrap()); + Self::new(txid, seq_in_tx) } } #[derive( Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, bincode::Encode, bincode::Decode, )] -/// A table's ingestion state. -pub enum TableState { - /// This table hasn't been ingested. +/// A source's ingestion state. +pub enum SourceState { + /// This source hasn't been ingested. NotStarted, - /// This table has some data ingested, and it can't be restarted. + /// This source has some data ingested, and it can't be restarted. 
     NonRestartable,
-    /// This table has some data ingested, and it can be restarted if it's given the state.
-    Restartable(RestartableState),
+    /// This source has some data ingested, and it can be restarted if it's given the state.
+    Restartable {
+        state: Vec<u8>,
+        checkpoint: OpIdentifier,
+    },
 }
 
-/// Map from a `Source` node's handle to its tables' states.
+/// Map from a `Source` node's handle to its state.
 ///
 /// This uniquely identifies the state of the Dozer pipeline.
 /// We generate this map on every commit, and it's:
 ///
 /// - Written to `Log` so consumers of log know where the pipeline is when pipeline restarts, and can rollback if some events were not persisted to checkpoints.
 /// - Written to checkpoints so when pipeline is restarted, we know where to tell the source to start from.
-pub type SourceStates = HashMap<NodeHandle, HashMap<String, TableState>>;
+pub type SourceStates = HashMap<NodeHandle, SourceState>;
 
 #[test]
 fn test_handle_to_from_bytes() {
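// ---------------------------------------------------------------------------
// Illustration (standalone sketch, not part of the diff): how the new
// `OpIdentifier` checkpoint type from dozer-types/src/node.rs is used by the
// MySQL connector's state encoding in dozer-ingestion/mysql/src/state.rs.
// The struct below is a local copy for demonstration only; the real type lives
// in dozer_types::node and derives the serde/bincode traits shown in the diff.
// ---------------------------------------------------------------------------

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct OpIdentifier {
    /// High 64 bits of the identifier.
    txid: u64,
    /// Low 64 bits of the identifier.
    seq_in_tx: u64,
}

impl OpIdentifier {
    fn new(txid: u64, seq_in_tx: u64) -> Self {
        Self { txid, seq_in_tx }
    }

    /// 16-byte big-endian encoding, mirroring `OpIdentifier::to_bytes` in the diff.
    fn to_bytes(&self) -> [u8; 16] {
        let mut result = [0_u8; 16];
        result[0..8].copy_from_slice(&self.txid.to_be_bytes());
        result[8..16].copy_from_slice(&self.seq_in_tx.to_be_bytes());
        result
    }

    /// Inverse of `to_bytes`.
    fn from_bytes(bytes: [u8; 16]) -> Self {
        let txid = u64::from_be_bytes(bytes[0..8].try_into().unwrap());
        let seq_in_tx = u64::from_be_bytes(bytes[8..16].try_into().unwrap());
        Self::new(txid, seq_in_tx)
    }
}

fn main() {
    // MySQL packs the binlog file's numeric suffix into the high 32 bits of
    // `txid` and the byte offset within that binlog into the low 32 bits, as
    // `encode_state` does above (this assumes both values fit in 32 bits).
    let (binlog_id, position, seq_no) = (18u64, 1234u64, 3u64);
    let checkpoint = OpIdentifier::new((binlog_id << 32) | position, seq_no);

    // `BinlogPosition::try_from(OpIdentifier)` reverses the shift and mask.
    assert_eq!(checkpoint.txid >> 32, binlog_id);
    assert_eq!(checkpoint.txid & 0x0000_0000_ffff_ffff, position);
    assert_eq!(checkpoint.seq_in_tx, seq_no);

    // The byte encoding round-trips losslessly; other connectors reuse the same
    // two fields differently (Postgres: LSN + sequence, Snowflake: iteration + index).
    assert_eq!(OpIdentifier::from_bytes(checkpoint.to_bytes()), checkpoint);
    println!("checkpoint round-trip OK: {checkpoint:?}");
}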