diff --git a/dozer-cli/src/pipeline/connector_source.rs b/dozer-cli/src/pipeline/connector_source.rs
index 93632ff8c2..7f7283d1fb 100644
--- a/dozer-cli/src/pipeline/connector_source.rs
+++ b/dozer-cli/src/pipeline/connector_source.rs
@@ -339,7 +339,7 @@ async fn forward_message_to_pipeline(
                     break;
                 }
             }
-            IngestionMessage::SnapshottingDone | IngestionMessage::SnapshottingStarted => {
+            IngestionMessage::SnapshottingDone { .. } | IngestionMessage::SnapshottingStarted => {
                 for port in &ports {
                     if sender.send((*port, message.clone())).await.is_err() {
                         break;
diff --git a/dozer-cli/src/pipeline/dummy_sink.rs b/dozer-cli/src/pipeline/dummy_sink.rs
index 1a08995dc8..a42bd4115e 100644
--- a/dozer-cli/src/pipeline/dummy_sink.rs
+++ b/dozer-cli/src/pipeline/dummy_sink.rs
@@ -11,7 +11,8 @@ use dozer_types::{
     chrono::Local,
     errors::internal::BoxedError,
     log::{info, warn},
-    types::{FieldType, Operation, Schema},
+    node::OpIdentifier,
+    types::{FieldType, Operation, OperationWithId, Schema},
 };
 
 #[derive(Debug)]
@@ -57,10 +58,10 @@ impl Sink for DummySink {
         &mut self,
         _from_port: PortHandle,
         _record_store: &ProcessorRecordStore,
-        op: Operation,
+        op: OperationWithId,
     ) -> Result<(), BoxedError> {
         if let Some(inserted_at_index) = self.inserted_at_index {
-            if let Operation::Insert { new } = op {
+            if let Operation::Insert { new } = op.op {
                 let value = &new.values[inserted_at_index];
                 if let Some(inserted_at) = value.to_timestamp() {
                     let latency = Local::now().naive_utc() - inserted_at.naive_utc();
@@ -90,7 +91,11 @@ impl Sink for DummySink {
         Ok(())
     }
 
-    fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError> {
+    fn on_source_snapshotting_done(
+        &mut self,
+        connection_name: String,
+        _id: Option<OpIdentifier>,
+    ) -> Result<(), BoxedError> {
         if let Some(started_instant) = self.snapshotting_started_instant.remove(&connection_name) {
             info!(
                 "Snapshotting for connection {} took {:?}",
@@ -105,4 +110,16 @@ impl Sink for DummySink {
         }
         Ok(())
     }
+
+    fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
+        Ok(())
+    }
+
+    fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
+        Ok(None)
+    }
+
+    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
+        Ok(None)
+    }
 }
diff --git a/dozer-cli/src/pipeline/log_sink.rs b/dozer-cli/src/pipeline/log_sink.rs
index 26fa524fd2..e108cf0573 100644
--- a/dozer-cli/src/pipeline/log_sink.rs
+++ b/dozer-cli/src/pipeline/log_sink.rs
@@ -11,9 +11,9 @@ use dozer_core::{
 };
 use dozer_recordstore::ProcessorRecordStore;
 use dozer_tracing::LabelsAndProgress;
-use dozer_types::indicatif::ProgressBar;
 use dozer_types::types::Schema;
-use dozer_types::{errors::internal::BoxedError, types::Operation};
+use dozer_types::{errors::internal::BoxedError, node::OpIdentifier};
+use dozer_types::{indicatif::ProgressBar, types::OperationWithId};
 use tokio::{runtime::Runtime, sync::Mutex};
 
 #[derive(Debug)]
@@ -87,12 +87,12 @@ impl Sink for LogSink {
         &mut self,
         _from_port: PortHandle,
         _record_store: &ProcessorRecordStore,
-        op: Operation,
+        op: OperationWithId,
     ) -> Result<(), BoxedError> {
         let end = self
             .runtime
             .block_on(self.log.lock())
-            .write(dozer_cache::dozer_log::replication::LogOperation::Op { op });
+            .write(dozer_cache::dozer_log::replication::LogOperation::Op { op: op.op });
         self.pb.set_position(end as u64);
         Ok(())
     }
@@ -131,7 +131,11 @@ impl Sink for LogSink {
         Ok(())
     }
 
-    fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError> {
+    fn on_source_snapshotting_done(
+        &mut self,
+        connection_name: String,
+        _id: Option<OpIdentifier>,
+    ) -> Result<(), BoxedError> {
         let end = self
             .runtime
             .block_on(self.log.lock())
@@ -139,4 +143,16 @@
         self.pb.set_position(end as u64);
         Ok(())
     }
+
+    fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
+        Ok(())
+    }
+
+    fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
+        Ok(None)
+    }
+
+    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
+        Ok(None)
+    }
 }
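Note: the patch threads a new `OperationWithId` wrapper through every channel, processor, and sink. Its definition lives in `dozer-types` and is not part of this diff; judging from the call sites (`op.id`, `op.op`, `OperationWithId::without_id(..)`), it is roughly the following — a sketch, not the authoritative definition:

```rust
// Assumed shape of the wrapper, reconstructed from usage in this patch.
use dozer_types::node::OpIdentifier;
use dozer_types::types::Operation;

#[derive(Clone, Debug, PartialEq)]
pub struct OperationWithId {
    /// Position of this operation in the source, if the source is restartable.
    pub id: Option<OpIdentifier>,
    /// The actual CDC operation.
    pub op: Operation,
}

impl OperationWithId {
    /// Wraps an operation that has no source-assigned identifier,
    /// e.g. one synthesized inside the pipeline.
    pub fn without_id(op: Operation) -> Self {
        Self { id: None, op }
    }
}
```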
diff --git a/dozer-cli/src/simple/build/contract/mod.rs b/dozer-cli/src/simple/build/contract/mod.rs
index b4c851b85d..5041a491f4 100644
--- a/dozer-cli/src/simple/build/contract/mod.rs
+++ b/dozer-cli/src/simple/build/contract/mod.rs
@@ -1,5 +1,5 @@
 use std::{
-    collections::{BTreeMap, HashMap, HashSet},
+    collections::{BTreeMap, HashMap},
     fs::OpenOptions,
     path::Path,
 };
@@ -11,7 +11,7 @@ use dozer_core::{
     node::PortHandle,
     petgraph::{
         algo::is_isomorphic_matching,
-        visit::{EdgeRef, IntoEdgesDirected, IntoNodeReferences},
+        visit::{IntoEdgesDirected, IntoNodeReferences},
         Direction,
     },
 };
@@ -94,7 +94,11 @@ impl Contract {
             api,
         )?;
 
-        let connections = collect_ancestor_sources(dag_schemas, node_index);
+        let connections = dag_schemas
+            .collect_ancestor_sources(node_index)
+            .into_iter()
+            .map(|handle| handle.id)
+            .collect();
 
         let schema = EndpointSchema {
             path: api.path.clone(),
@@ -195,27 +199,6 @@ fn sink_input_schema(dag: &DagSchemas, node_index: NodeIndex) -> &Schema {
     &edge.weight().schema
 }
 
-fn collect_ancestor_sources(dag: &DagSchemas, node_index: NodeIndex) -> HashSet<String> {
-    let mut sources = HashSet::new();
-    collect_ancestor_sources_recursive(dag, node_index, &mut sources);
-    sources
-}
-
-fn collect_ancestor_sources_recursive(
-    dag: &DagSchemas,
-    node_index: NodeIndex,
-    sources: &mut HashSet<String>,
-) {
-    for edge in dag.graph().edges_directed(node_index, Direction::Incoming) {
-        let source_node_index = edge.source();
-        let source_node = &dag.graph()[source_node_index];
-        if matches!(source_node.kind, dozer_core::NodeKind::Source(_)) {
-            sources.insert(source_node.handle.id.clone());
-        }
-        collect_ancestor_sources_recursive(dag, source_node_index, sources);
-    }
-}
-
 fn serde_json_to_path(path: impl AsRef<Path>, value: &impl Serialize) -> Result<(), BuildError> {
     let file = OpenOptions::new()
         .create(true)
diff --git a/dozer-core/src/builder_dag.rs b/dozer-core/src/builder_dag.rs
index 1645931378..fe67eb1966 100644
--- a/dozer-core/src/builder_dag.rs
+++ b/dozer-core/src/builder_dag.rs
@@ -1,16 +1,22 @@
-use std::{collections::HashMap, fmt::Debug};
+use std::{
+    collections::{hash_map::Entry, HashMap},
+    fmt::Debug,
+};
 
 use daggy::{
     petgraph::visit::{IntoNodeIdentifiers, IntoNodeReferences},
     NodeIndex,
 };
-use dozer_types::node::{NodeHandle, OpIdentifier};
+use dozer_types::{
+    log::warn,
+    node::{NodeHandle, OpIdentifier},
+};
 
 use crate::{
     checkpoint::OptionCheckpoint,
     dag_schemas::{DagHaveSchemas, DagSchemas, EdgeType},
     errors::ExecutionError,
-    node::{Processor, Sink, Source},
+    node::{Processor, Sink, SinkFactory, Source},
     NodeKind as DagNodeKind,
 };
@@ -64,29 +70,118 @@ impl BuilderDag {
             }
         }
 
-        // Build the nodes.
-        let mut graph = daggy::Dag::new();
+        // Collect sources that may affect a node.
+        let mut affecting_sources = dag_schemas
+            .graph()
+            .node_identifiers()
+            .map(|node_index| dag_schemas.collect_ancestor_sources(node_index))
+            .collect::<Vec<_>>();
+
+        // Prepare nodes and edges for consuming.
         let (nodes, edges) = dag_schemas.into_graph().into_graph().into_nodes_edges();
-        for (node_index, node) in nodes.into_iter().enumerate() {
+        let mut nodes = nodes
+            .into_iter()
+            .map(|node| Some(node.weight))
+            .collect::<Vec<_>>();
+
+        // Build the sinks and load checkpoint.
+        let mut graph = daggy::Dag::new();
+        let mut source_states = HashMap::new();
+        let mut source_op_ids = HashMap::new();
+        let mut source_id_to_sinks = HashMap::<NodeHandle, Vec<NodeIndex>>::new();
+        let mut node_index_map: HashMap<NodeIndex, NodeIndex> = HashMap::new();
+        for (node_index, node) in nodes.iter_mut().enumerate() {
+            if let Some((handle, sink)) = take_sink(node) {
+                let sources = std::mem::take(&mut affecting_sources[node_index]);
+                if sources.len() > 1 {
+                    warn!("Multiple sources ({sources:?}) connected to same sink: {handle}");
+                }
+                let source = sources.into_iter().next().expect("sink must have a source");
+
+                let node_index = NodeIndex::new(node_index);
+                let mut sink = sink
+                    .build(
+                        input_schemas
+                            .remove(&node_index)
+                            .expect("we collected all input schemas"),
+                    )
+                    .map_err(ExecutionError::Factory)?;
+
+                let state = sink.get_source_state().map_err(ExecutionError::Sink)?;
+                if let Some(state) = state {
+                    match source_states.entry(handle.clone()) {
+                        Entry::Occupied(entry) => {
+                            if entry.get() != &state {
+                                return Err(ExecutionError::SourceStateConflict(handle));
+                            }
+                        }
+                        Entry::Vacant(entry) => {
+                            entry.insert(state);
+                        }
+                    }
+                }
+
+                let op_id = sink.get_latest_op_id().map_err(ExecutionError::Sink)?;
+                if let Some(op_id) = op_id {
+                    match source_op_ids.entry(handle.clone()) {
+                        Entry::Occupied(mut entry) => {
+                            *entry.get_mut() = op_id.min(*entry.get());
+                        }
+                        Entry::Vacant(entry) => {
+                            entry.insert(op_id);
+                        }
+                    }
+                }
+
+                let new_node_index = graph.add_node(NodeType {
+                    handle,
+                    kind: NodeKind::Sink(sink),
+                });
+                node_index_map.insert(node_index, new_node_index);
+                source_id_to_sinks
+                    .entry(source)
+                    .or_default()
+                    .push(new_node_index);
+            }
+        }
+
+        // Build sources, processors, and collect source states.
+        for (node_index, node) in nodes.iter_mut().enumerate() {
+            let Some(node) = node.take() else {
+                continue;
+            };
             let node_index = NodeIndex::new(node_index);
-            let node = node.weight;
             let node = match node.kind {
                 DagNodeKind::Source(source) => {
-                    let source_state = checkpoint.get_source_state(&node.handle)?;
                     let source = source
                         .build(
                             output_schemas
                                 .remove(&node_index)
                                 .expect("we collected all output schemas"),
-                            source_state.map(|state| state.0.to_vec()),
+                            source_states.remove(&node.handle),
                        )
                        .map_err(ExecutionError::Factory)?;
+
+                    // Write state to relevant sink.
+                    let state = source
+                        .serialize_state()
+                        .await
+                        .map_err(ExecutionError::Source)?;
+                    for sink in source_id_to_sinks.remove(&node.handle).unwrap_or_default() {
+                        let sink = &mut graph[sink];
+                        let NodeKind::Sink(sink) = &mut sink.kind else {
+                            unreachable!()
+                        };
+                        sink.set_source_state(&state)
+                            .map_err(ExecutionError::Sink)?;
+                    }
+
+                    let last_checkpoint = source_op_ids.remove(&node.handle);
                     NodeType {
                         handle: node.handle,
                         kind: NodeKind::Source {
                             source,
-                            last_checkpoint: source_state.map(|state| state.1),
+                            last_checkpoint,
                         },
                     }
                 }
@@ -111,27 +206,20 @@
                         kind: NodeKind::Processor(processor),
                     }
                 }
-                DagNodeKind::Sink(sink) => {
-                    let sink = sink
-                        .build(
-                            input_schemas
-                                .remove(&node_index)
-                                .expect("we collected all input schemas"),
-                        )
-                        .map_err(ExecutionError::Factory)?;
-                    NodeType {
-                        handle: node.handle,
-                        kind: NodeKind::Sink(sink),
-                    }
-                }
+                DagNodeKind::Sink(_) => unreachable!(),
             };
-            graph.add_node(node);
+            let new_node_index = graph.add_node(node);
+            node_index_map.insert(node_index, new_node_index);
         }
 
         // Connect the edges.
         for edge in edges {
             graph
-                .add_edge(edge.source(), edge.target(), edge.weight)
+                .add_edge(
+                    node_index_map[&edge.source()],
+                    node_index_map[&edge.target()],
+                    edge.weight,
+                )
                 .expect("we know there's no loop");
         }
@@ -146,3 +234,13 @@
         self.graph
     }
 }
+
+fn take_sink(node: &mut Option<super::NodeType>) -> Option<(NodeHandle, Box<dyn SinkFactory>)> {
+    let super::NodeType { handle, kind } = node.take()?;
+    if let super::NodeKind::Sink(sink) = kind {
+        Some((handle, sink))
+    } else {
+        *node = Some(super::NodeType { handle, kind });
+        None
+    }
+}
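The rewritten `BuilderDag::new` above builds sinks first, asks each one how far it got (`get_latest_op_id`) and what source state it stored (`get_source_state`), then resumes each source from the minimum op id reported across its downstream sinks, erroring out on conflicting states. The minimum is the only safe choice: any sink that is behind the chosen checkpoint would otherwise permanently miss the skipped operations. A sketch of that rule as a hypothetical helper (the patch does this inline via the `Entry` API):

```rust
use dozer_types::node::OpIdentifier;

// Resume from the *least* advanced sink; `OpIdentifier` is ordered, which is
// also what `op_id.min(*entry.get())` in the patch relies on.
fn combine_checkpoints(op_ids: impl IntoIterator<Item = OpIdentifier>) -> Option<OpIdentifier> {
    op_ids.into_iter().min()
}
```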
diff --git a/dozer-core/src/channels.rs b/dozer-core/src/channels.rs
index e7dd84485a..1d1d918a96 100644
--- a/dozer-core/src/channels.rs
+++ b/dozer-core/src/channels.rs
@@ -1,10 +1,10 @@
 use crate::node::PortHandle;
-use dozer_types::types::Operation;
+use dozer_types::types::OperationWithId;
 
 pub trait ProcessorChannelForwarder {
     /// Sends a operation to downstream nodes. Panics if the operation cannot be sent.
     ///
     /// We must panic instead of returning an error because this method will be called by `Processor::process`,
     /// which only returns recoverable errors.
-    fn send(&mut self, op: Operation, port: PortHandle);
+    fn send(&mut self, op: OperationWithId, port: PortHandle);
 }
diff --git a/dozer-core/src/dag_schemas.rs b/dozer-core/src/dag_schemas.rs
index 0b3f6b82bf..cd90f1cf8b 100644
--- a/dozer-core/src/dag_schemas.rs
+++ b/dozer-core/src/dag_schemas.rs
@@ -7,9 +7,10 @@ use daggy::petgraph::visit::{EdgeRef, IntoEdges, IntoEdgesDirected, IntoNodeRefe
 use daggy::petgraph::Direction;
 use daggy::{NodeIndex, Walker};
 use dozer_types::log::{error, info};
+use dozer_types::node::NodeHandle;
 use dozer_types::serde::{Deserialize, Serialize};
 use dozer_types::types::Schema;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
 
 use super::node::OutputPortDef;
@@ -103,6 +104,27 @@ impl DagSchemas {
             }
         }
     }
+
+    pub fn collect_ancestor_sources(&self, node_index: NodeIndex) -> HashSet<NodeHandle> {
+        let mut sources = HashSet::new();
+        collect_ancestor_sources_recursive(self, node_index, &mut sources);
+        sources
+    }
+}
+
+fn collect_ancestor_sources_recursive(
+    dag: &DagSchemas,
+    node_index: NodeIndex,
+    sources: &mut HashSet<NodeHandle>,
+) {
+    for edge in dag.graph().edges_directed(node_index, Direction::Incoming) {
+        let source_node_index = edge.source();
+        let source_node = &dag.graph()[source_node_index];
+        if matches!(source_node.kind, NodeKind::Source(_)) {
+            sources.insert(source_node.handle.clone());
+        }
+        collect_ancestor_sources_recursive(dag, source_node_index, sources);
+    }
 }
 
 pub trait DagHaveSchemas {
diff --git a/dozer-core/src/errors.rs b/dozer-core/src/errors.rs
index 3f7fc09806..2ae0b58926 100644
--- a/dozer-core/src/errors.rs
+++ b/dozer-core/src/errors.rs
@@ -37,6 +37,10 @@ pub enum ExecutionError {
     RestoreRecordWriter(#[source] DeserializationError),
     #[error("Source error: {0}")]
     Source(#[source] BoxedError),
+    #[error("Sink error: {0}")]
+    Sink(#[source] BoxedError),
+    #[error("State of {0} is not consistent across sinks")]
+    SourceStateConflict(NodeHandle),
     #[error("File system error {0:?}: {1}")]
     FileSystemError(PathBuf, #[source] std::io::Error),
     #[error("Recordstore error: {0}")]
diff --git a/dozer-core/src/executor/processor_node.rs b/dozer-core/src/executor/processor_node.rs
index e8465ce902..ade7db6c05 100644
--- a/dozer-core/src/executor/processor_node.rs
+++ b/dozer-core/src/executor/processor_node.rs
@@ -3,8 +3,8 @@ use std::{borrow::Cow, mem::swap};
 
 use crossbeam::channel::Receiver;
 use daggy::NodeIndex;
-use dozer_types::node::NodeHandle;
-use dozer_types::types::Operation;
+use dozer_types::node::{NodeHandle, OpIdentifier};
+use dozer_types::types::OperationWithId;
 
 use crate::epoch::Epoch;
 use crate::error_manager::ErrorManager;
@@ -100,7 +100,7 @@ impl ReceiverLoop for ProcessorNode {
         Cow::Owned(self.port_handles[index].to_string())
     }
 
-    fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError> {
+    fn on_op(&mut self, index: usize, op: OperationWithId) -> Result<(), ExecutionError> {
         if let Err(e) = self.processor.process(
             self.port_handles[index],
             &self.record_store,
@@ -136,7 +136,12 @@ impl ReceiverLoop for ProcessorNode {
             .send_snapshotting_started(connection_name)
     }
 
-    fn on_snapshotting_done(&mut self, connection_name: String) -> Result<(), ExecutionError> {
-        self.channel_manager.send_snapshotting_done(connection_name)
+    fn on_snapshotting_done(
+        &mut self,
+        connection_name: String,
+        id: Option<OpIdentifier>,
+    ) -> Result<(), ExecutionError> {
+        self.channel_manager
+            .send_snapshotting_done(connection_name, id)
     }
 }
diff --git a/dozer-core/src/executor/receiver_loop.rs b/dozer-core/src/executor/receiver_loop.rs
index d981328371..a8e0e5ed5c 100644
--- a/dozer-core/src/executor/receiver_loop.rs
+++ b/dozer-core/src/executor/receiver_loop.rs
@@ -1,7 +1,7 @@
 use std::borrow::Cow;
 
 use crossbeam::channel::{Receiver, Select};
-use dozer_types::{log::debug, types::Operation};
+use dozer_types::{log::debug, node::OpIdentifier, types::OperationWithId};
 
 use crate::{epoch::Epoch, errors::ExecutionError, executor_operation::ExecutorOperation};
 
@@ -18,7 +18,7 @@ pub trait ReceiverLoop: Name {
     /// Returns the name of the receiver at `index`. Used for logging.
     fn receiver_name(&self, index: usize) -> Cow<str>;
     /// Responds to `op` from the receiver at `index`.
-    fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError>;
+    fn on_op(&mut self, index: usize, op: OperationWithId) -> Result<(), ExecutionError>;
     /// Responds to `commit` of `epoch`.
     fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError>;
     /// Responds to `terminate`.
@@ -26,7 +26,11 @@ pub trait ReceiverLoop: Name {
     /// Responds to `SnapshottingStarted`.
     fn on_snapshotting_started(&mut self, connection_name: String) -> Result<(), ExecutionError>;
     /// Responds to `SnapshottingDone`.
-    fn on_snapshotting_done(&mut self, connection_name: String) -> Result<(), ExecutionError>;
+    fn on_snapshotting_done(
+        &mut self,
+        connection_name: String,
+        id: Option<OpIdentifier>,
+    ) -> Result<(), ExecutionError>;
 
     /// The loop implementation, calls [`on_op`], [`on_commit`] and [`on_terminate`] at appropriate times.
     fn receiver_loop(&mut self, initial_epoch_id: u64) -> Result<(), ExecutionError> {
@@ -80,8 +84,11 @@ pub trait ReceiverLoop: Name {
                 ExecutorOperation::SnapshottingStarted { connection_name } => {
                     self.on_snapshotting_started(connection_name)?;
                 }
-                ExecutorOperation::SnapshottingDone { connection_name } => {
-                    self.on_snapshotting_done(connection_name)?;
+                ExecutorOperation::SnapshottingDone {
+                    connection_name,
+                    id,
+                } => {
+                    self.on_snapshotting_done(connection_name, id)?;
                 }
             }
         }
@@ -103,17 +110,17 @@ mod tests {
     use crossbeam::channel::{unbounded, Sender};
     use dozer_types::{
         node::{NodeHandle, SourceState, SourceStates},
-        types::{Field, Record},
+        types::{Field, Operation, Record},
     };
 
     use super::*;
 
     struct TestReceiverLoop {
         receivers: Vec<Receiver<ExecutorOperation>>,
-        ops: Vec<(usize, Operation)>,
+        ops: Vec<(usize, OperationWithId)>,
         commits: Vec<Epoch>,
         snapshotting_started: Vec<String>,
-        snapshotting_done: Vec<String>,
+        snapshotting_done: Vec<(String, Option<OpIdentifier>)>,
         num_terminations: usize,
     }
@@ -138,7 +145,7 @@
             Cow::Owned(format!("receiver_{index}"))
         }
 
-        fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError> {
+        fn on_op(&mut self, index: usize, op: OperationWithId) -> Result<(), ExecutionError> {
             self.ops.push((index, op));
             Ok(())
         }
@@ -161,8 +168,12 @@
             Ok(())
         }
 
-        fn on_snapshotting_done(&mut self, connection_name: String) -> Result<(), ExecutionError> {
-            self.snapshotting_done.push(connection_name);
+        fn on_snapshotting_done(
+            &mut self,
+            connection_name: String,
+            state: Option<OpIdentifier>,
+        ) -> Result<(), ExecutionError> {
+            self.snapshotting_done.push((connection_name, state));
             Ok(())
         }
     }
@@ -200,12 +211,13 @@
         senders[0]
             .send(ExecutorOperation::SnapshottingDone {
                 connection_name: connection_name.clone(),
+                id: None,
             })
             .unwrap();
         senders[0].send(ExecutorOperation::Terminate).unwrap();
         senders[1].send(ExecutorOperation::Terminate).unwrap();
         test_loop.receiver_loop(0).unwrap();
-        assert_eq!(test_loop.snapshotting_done, vec![connection_name])
+        assert_eq!(test_loop.snapshotting_done, vec![(connection_name, None)])
     }
 
     #[test]
@@ -214,15 +226,21 @@
         let record = Record::new(vec![Field::Int(1)]);
         senders[0]
             .send(ExecutorOperation::Op {
-                op: Operation::Insert {
+                op: OperationWithId::without_id(Operation::Insert {
                     new: record.clone(),
-                },
+                }),
             })
             .unwrap();
         senders[0].send(ExecutorOperation::Terminate).unwrap();
         senders[1].send(ExecutorOperation::Terminate).unwrap();
         test_loop.receiver_loop(0).unwrap();
-        assert_eq!(test_loop.ops, vec![(0, Operation::Insert { new: record })]);
+        assert_eq!(
+            test_loop.ops,
+            vec![(
+                0,
+                OperationWithId::without_id(Operation::Insert { new: record })
+            )]
+        );
     }
 
     #[test]
diff --git a/dozer-core/src/executor/sink_node.rs b/dozer-core/src/executor/sink_node.rs
index 47c37a1d89..114afe799a 100644
--- a/dozer-core/src/executor/sink_node.rs
+++ b/dozer-core/src/executor/sink_node.rs
@@ -3,7 +3,10 @@ use std::{borrow::Cow, mem::swap, sync::Arc};
 
 use crossbeam::channel::Receiver;
 use daggy::NodeIndex;
 use dozer_tracing::LabelsAndProgress;
-use dozer_types::{node::NodeHandle, types::Operation};
+use dozer_types::{
+    node::{NodeHandle, OpIdentifier},
+    types::{Operation, OperationWithId},
+};
 use metrics::{describe_counter, describe_histogram, histogram, increment_counter};
 
 use crate::{
@@ -101,11 +104,11 @@ impl ReceiverLoop for SinkNode {
         Cow::Owned(self.port_handles[index].to_string())
     }
 
-    fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError> {
+    fn on_op(&mut self, index: usize, op: OperationWithId) -> Result<(), ExecutionError> {
         let mut labels = self.labels.labels().clone();
         labels.push("table", self.node_handle.id.clone());
         const OPERATION_TYPE_LABEL: &str = "operation_type";
-        match &op {
+        match &op.op {
             Operation::Insert { .. } => {
                 labels.push(OPERATION_TYPE_LABEL, "insert");
             }
@@ -165,8 +168,12 @@
         Ok(())
     }
 
-    fn on_snapshotting_done(&mut self, connection_name: String) -> Result<(), ExecutionError> {
-        if let Err(e) = self.sink.on_source_snapshotting_done(connection_name) {
+    fn on_snapshotting_done(
+        &mut self,
+        connection_name: String,
+        id: Option<OpIdentifier>,
+    ) -> Result<(), ExecutionError> {
+        if let Err(e) = self.sink.on_source_snapshotting_done(connection_name, id) {
             self.error_manager.report(e);
         }
         Ok(())
diff --git a/dozer-core/src/executor_operation.rs b/dozer-core/src/executor_operation.rs
index 16ac199c0d..4c7a349ac6 100644
--- a/dozer-core/src/executor_operation.rs
+++ b/dozer-core/src/executor_operation.rs
@@ -1,5 +1,8 @@
 use dozer_recordstore::{ProcessorRecord, StoreRecord};
-use dozer_types::types::Operation;
+use dozer_types::{
+    node::OpIdentifier,
+    types::{Operation, OperationWithId},
+};
 
 use crate::{epoch::Epoch, errors::ExecutionError};
 
@@ -67,9 +70,18 @@ impl ProcessorOperation {
 
 #[derive(Clone, Debug)]
 pub enum ExecutorOperation {
-    Op { op: Operation },
-    Commit { epoch: Epoch },
+    Op {
+        op: OperationWithId,
+    },
+    Commit {
+        epoch: Epoch,
+    },
     Terminate,
-    SnapshottingStarted { connection_name: String },
-    SnapshottingDone { connection_name: String },
+    SnapshottingStarted {
+        connection_name: String,
+    },
+    SnapshottingDone {
+        connection_name: String,
+        id: Option<OpIdentifier>,
+    },
 }
diff --git a/dozer-core/src/forwarder.rs b/dozer-core/src/forwarder.rs
index b4bd072836..61fa1ac9cc 100644
--- a/dozer-core/src/forwarder.rs
+++ b/dozer-core/src/forwarder.rs
@@ -11,8 +11,8 @@ use crossbeam::channel::Sender;
 use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::log::debug;
 use dozer_types::models::ingestion_types::IngestionMessage;
-use dozer_types::node::{NodeHandle, SourceState};
-use dozer_types::types::Operation;
+use dozer_types::node::{NodeHandle, OpIdentifier, SourceState};
+use dozer_types::types::OperationWithId;
 use std::collections::HashMap;
 use std::ops::Deref;
 use std::sync::Arc;
@@ -29,10 +29,14 @@ pub struct ChannelManager {
 
 impl ChannelManager {
     #[inline]
-    fn send_op(&mut self, mut op: Operation, port_id: PortHandle) -> Result<(), ExecutionError> {
+    fn send_op(
+        &mut self,
+        mut op: OperationWithId,
+        port_id: PortHandle,
+    ) -> Result<(), ExecutionError> {
         if let Some(writer) = self.record_writers.get_mut(&port_id) {
-            match writer.write(&self.record_store, op) {
-                Ok(new_op) => op = new_op,
+            match writer.write(&self.record_store, op.op) {
+                Ok(new_op) => op.op = new_op,
                 Err(e) => {
                     self.error_manager.report(e.into());
                     return Ok(());
@@ -79,11 +83,16 @@ impl ChannelManager {
         Ok(())
     }
 
-    pub fn send_snapshotting_done(&self, connection_name: String) -> Result<(), ExecutionError> {
+    pub fn send_snapshotting_done(
+        &self,
+        connection_name: String,
+        id: Option<OpIdentifier>,
+    ) -> Result<(), ExecutionError> {
         for senders in self.senders.values() {
             for sender in senders {
                 sender.send(ExecutorOperation::SnapshottingDone {
                     connection_name: connection_name.clone(),
+                    id,
                 })?;
             }
         }
@@ -232,17 +241,17 @@ impl SourceChannelManager {
         request_termination: bool,
     ) -> Result<bool, ExecutionError> {
         match message {
-            IngestionMessage::OperationEvent { op, state, .. } => {
-                self.source_state = if let Some(state) = state {
+            IngestionMessage::OperationEvent { op, id, .. } => {
+                self.source_state = if let Some(id) = id {
                     SourceState::Restartable {
                         state: self.source_level_state.clone(),
-                        checkpoint: state,
+                        checkpoint: id,
                     }
                 } else {
                     SourceState::NonRestartable
                 };
-                self.manager.send_op(op, port)?;
+                self.manager.send_op(OperationWithId { id, op }, port)?;
                 self.num_uncommitted_ops += 1;
                 self.trigger_commit_if_needed(request_termination)
             }
@@ -252,10 +261,10 @@ impl SourceChannelManager {
                     .send_snapshotting_started(self.manager.owner.id.clone())?;
                 self.trigger_commit_if_needed(request_termination)
             }
-            IngestionMessage::SnapshottingDone => {
+            IngestionMessage::SnapshottingDone { id } => {
                 self.num_uncommitted_ops += 1;
                 self.manager
-                    .send_snapshotting_done(self.manager.owner.id.clone())?;
+                    .send_snapshotting_done(self.manager.owner.id.clone(), id)?;
                 self.commit(request_termination)
             }
         }
@@ -267,7 +276,7 @@ impl SourceChannelManager {
 }
 
 impl ProcessorChannelForwarder for ChannelManager {
-    fn send(&mut self, op: Operation, port: PortHandle) {
+    fn send(&mut self, op: OperationWithId, port: PortHandle) {
         self.send_op(op, port)
             .unwrap_or_else(|e| panic!("Failed to send operation: {e}"))
     }
diff --git a/dozer-core/src/node.rs b/dozer-core/src/node.rs
index 5b3a629d51..e372222b31 100644
--- a/dozer-core/src/node.rs
+++ b/dozer-core/src/node.rs
@@ -9,7 +9,7 @@ use dozer_types::models::ingestion_types::IngestionMessage;
 use dozer_types::node::OpIdentifier;
 use dozer_types::serde::{Deserialize, Serialize};
 use dozer_types::tonic::async_trait;
-use dozer_types::types::{Operation, Schema};
+use dozer_types::types::{OperationWithId, Schema};
 
 use std::collections::HashMap;
 use std::fmt::{Debug, Display, Formatter};
@@ -93,7 +93,7 @@ pub trait Processor: Send + Sync + Debug {
         &mut self,
         from_port: PortHandle,
         record_store: &ProcessorRecordStore,
-        op: Operation,
+        op: OperationWithId,
         fw: &mut dyn ProcessorChannelForwarder,
     ) -> Result<(), BoxedError>;
     fn serialize(
@@ -118,11 +118,20 @@ pub trait Sink: Send + Sync + Debug {
         &mut self,
         from_port: PortHandle,
         record_store: &ProcessorRecordStore,
-        op: Operation,
+        op: OperationWithId,
     ) -> Result<(), BoxedError>;
     fn persist(&mut self, epoch: &Epoch, queue: &Queue) -> Result<(), BoxedError>;
 
     fn on_source_snapshotting_started(&mut self, connection_name: String) -> Result<(), BoxedError>;
-    fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError>;
+    fn on_source_snapshotting_done(
+        &mut self,
+        connection_name: String,
+        id: Option<OpIdentifier>,
+    ) -> Result<(), BoxedError>;
+
+    // Pipeline state management.
+    fn set_source_state(&mut self, source_state: &[u8]) -> Result<(), BoxedError>;
+    fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError>;
+    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError>;
 }
diff --git a/dozer-core/src/tests/dag_base_errors.rs b/dozer-core/src/tests/dag_base_errors.rs
index 883865cf35..aec8e7b17b 100644
--- a/dozer-core/src/tests/dag_base_errors.rs
+++ b/dozer-core/src/tests/dag_base_errors.rs
@@ -16,7 +16,7 @@ use dozer_types::models::ingestion_types::IngestionMessage;
 use dozer_types::node::{NodeHandle, OpIdentifier};
 use dozer_types::tonic::async_trait;
 use dozer_types::types::{
-    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
+    Field, FieldDefinition, FieldType, Operation, OperationWithId, Record, Schema, SourceDefinition,
 };
 use std::collections::HashMap;
@@ -92,7 +92,7 @@ impl Processor for ErrorProcessor {
         &mut self,
         _from_port: PortHandle,
         _record_store: &ProcessorRecordStore,
-        op: Operation,
+        op: OperationWithId,
         fw: &mut dyn ProcessorChannelForwarder,
     ) -> Result<(), BoxedError> {
         self.count += 1;
@@ -361,7 +361,7 @@ impl Source for ErrGeneratorSource {
                         Field::String(format!("value_{n}")),
                     ]),
                 },
-                state: Some(OpIdentifier::new(0, n)),
+                id: Some(OpIdentifier::new(0, n)),
             },
         ))
         .await?;
@@ -455,7 +455,7 @@ impl Sink for ErrSink {
         &mut self,
         _from_port: PortHandle,
         _record_store: &ProcessorRecordStore,
-        _op: Operation,
+        _op: OperationWithId,
     ) -> Result<(), BoxedError> {
         self.current += 1;
         if self.current == self.err_at {
@@ -479,9 +479,25 @@ impl Sink for ErrSink {
         Ok(())
     }
 
-    fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> {
+    fn on_source_snapshotting_done(
+        &mut self,
+        _connection_name: String,
+        _id: Option<OpIdentifier>,
+    ) -> Result<(), BoxedError> {
+        Ok(())
+    }
+
+    fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
         Ok(())
     }
+
+    fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
+        Ok(None)
+    }
+
+    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
+        Ok(None)
+    }
 }
 
 #[test]
diff --git a/dozer-core/src/tests/dag_base_run.rs b/dozer-core/src/tests/dag_base_run.rs
index 78ae7994c2..b9b84772a0 100644
--- a/dozer-core/src/tests/dag_base_run.rs
+++ b/dozer-core/src/tests/dag_base_run.rs
@@ -15,7 +15,7 @@ use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer};
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::node::NodeHandle;
 use dozer_types::tonic::async_trait;
-use dozer_types::types::{Operation, Schema};
+use dozer_types::types::{OperationWithId, Schema};
 use std::collections::HashMap;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -77,7 +77,7 @@ impl Processor for NoopProcessor {
         &mut self,
         _from_port: PortHandle,
         _record_store: &ProcessorRecordStore,
-        op: Operation,
+        op: OperationWithId,
         fw: &mut dyn ProcessorChannelForwarder,
     ) -> Result<(), BoxedError> {
         fw.send(op, DEFAULT_PORT_HANDLE);
@@ -237,7 +237,7 @@ impl Processor for NoopJoinProcessor {
         &mut self,
         _from_port: PortHandle,
         _record_store: &ProcessorRecordStore,
-        op: Operation,
+        op: OperationWithId,
         fw: &mut dyn ProcessorChannelForwarder,
     ) -> Result<(), BoxedError> {
         fw.send(op, DEFAULT_PORT_HANDLE);
diff --git a/dozer-core/src/tests/sinks.rs b/dozer-core/src/tests/sinks.rs
index c6ba8f53d6..66b45417b6 100644
--- a/dozer-core/src/tests/sinks.rs
+++ b/dozer-core/src/tests/sinks.rs
@@ -4,7 +4,8 @@ use crate::DEFAULT_PORT_HANDLE;
 use dozer_log::storage::Queue;
 use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::errors::internal::BoxedError;
-use dozer_types::types::{Operation, Schema};
+use dozer_types::node::OpIdentifier;
+use dozer_types::types::{OperationWithId, Schema};
 
 use dozer_types::log::debug;
 use std::collections::HashMap;
@@ -72,7 +73,7 @@ impl Sink for CountingSink {
         &mut self,
         _from_port: PortHandle,
         _record_store: &ProcessorRecordStore,
-        _op: Operation,
+        _op: OperationWithId,
     ) -> Result<(), BoxedError> {
         self.current += 1;
         if self.current == self.expected {
@@ -96,9 +97,25 @@ impl Sink for CountingSink {
         Ok(())
     }
 
-    fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> {
+    fn on_source_snapshotting_done(
+        &mut self,
+        _connection_name: String,
+        _id: Option<OpIdentifier>,
+    ) -> Result<(), BoxedError> {
+        Ok(())
+    }
+
+    fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
         Ok(())
     }
+
+    fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
+        Ok(None)
+    }
+
+    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
+        Ok(None)
+    }
 }
 
 #[derive(Debug)]
diff --git a/dozer-core/src/tests/sources.rs b/dozer-core/src/tests/sources.rs
index 5ace4845db..3f5082e07a 100644
--- a/dozer-core/src/tests/sources.rs
+++ b/dozer-core/src/tests/sources.rs
@@ -117,7 +117,7 @@ impl Source for GeneratorSource {
                         Field::String(format!("value_{n}")),
                     ]),
                 },
-                state: Some(OpIdentifier::new(0, n)),
+                id: Some(OpIdentifier::new(0, n)),
             },
         ))
         .await?;
@@ -248,7 +248,7 @@ impl Source for DualPortGeneratorSource {
                         Field::String(format!("value_{n}")),
                     ]),
                 },
-                state: Some(OpIdentifier::new(0, n)),
+                id: Some(OpIdentifier::new(0, n)),
             },
         ))
         .await?;
@@ -263,7 +263,7 @@
                         Field::String(format!("value_{n}")),
                     ]),
                 },
-                state: Some(OpIdentifier::new(0, n)),
+                id: Some(OpIdentifier::new(0, n)),
             },
         ))
         .await?;
diff --git a/dozer-ingestion/connector/src/ingestor.rs b/dozer-ingestion/connector/src/ingestor.rs
index 140fd240cb..f89fbcc930 100644
--- a/dozer-ingestion/connector/src/ingestor.rs
+++ b/dozer-ingestion/connector/src/ingestor.rs
@@ -96,7 +96,7 @@ mod tests {
             .handle_message(IngestionMessage::OperationEvent {
                 table_index: 0,
                 op: operation.clone(),
-                state: None,
+                id: None,
             })
             .await
             .unwrap();
@@ -104,12 +104,12 @@ mod tests {
             .handle_message(IngestionMessage::OperationEvent {
                 table_index: 0,
                 op: operation2.clone(),
-                state: None,
+                id: None,
             })
             .await
             .unwrap();
         ingestor
-            .handle_message(IngestionMessage::SnapshottingDone)
+            .handle_message(IngestionMessage::SnapshottingDone { id: None })
             .await
             .unwrap();
 
@@ -121,7 +121,7 @@ mod tests {
             IngestionMessage::OperationEvent {
                 table_index: 0,
                 op,
-                state: None
+                id: None
             },
             msg
         );
diff --git a/dozer-ingestion/deltalake/src/reader.rs b/dozer-ingestion/deltalake/src/reader.rs
index 461aeb5132..d55e066a52 100644
--- a/dozer-ingestion/deltalake/src/reader.rs
+++ b/dozer-ingestion/deltalake/src/reader.rs
@@ -69,7 +69,7 @@ impl DeltaLakeReader {
                         lifetime: None,
                     },
                 },
-                state: None,
+                id: None,
             })
             .await
             .unwrap();
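On the connector side, `state:` is renamed to `id:` everywhere and `SnapshottingDone` gains a payload. The implied post-patch shape of `IngestionMessage` (defined in `dozer-types`, not shown in this diff) is approximately:

```rust
use dozer_types::node::OpIdentifier;
use dozer_types::types::Operation;

// Assumed variants, inferred from the connector changes in this patch;
// the `table_index` type is an assumption.
pub enum IngestionMessage {
    OperationEvent {
        table_index: usize,
        op: Operation,
        /// Position of this operation in the source, if restartable.
        id: Option<OpIdentifier>,
    },
    SnapshottingStarted,
    /// The optional id lets a whole snapshot be checkpointed as a unit.
    SnapshottingDone { id: Option<OpIdentifier> },
}
```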
diff --git a/dozer-ingestion/dozer/src/connector.rs b/dozer-ingestion/dozer/src/connector.rs
index 2ae207b016..2f4443b76c 100644
--- a/dozer-ingestion/dozer/src/connector.rs
+++ b/dozer-ingestion/dozer/src/connector.rs
@@ -260,7 +260,7 @@ async fn read_table(
             .send(IngestionMessage::OperationEvent {
                 table_index,
                 op,
-                state: Some(OpIdentifier::new(0, op_and_pos.pos)),
+                id: Some(OpIdentifier::new(0, op_and_pos.pos)),
             })
             .await;
     }
diff --git a/dozer-ingestion/ethereum/src/log/sender.rs b/dozer-ingestion/ethereum/src/log/sender.rs
index b887f4bc78..bb922acb7d 100644
--- a/dozer-ingestion/ethereum/src/log/sender.rs
+++ b/dozer-ingestion/ethereum/src/log/sender.rs
@@ -222,7 +222,7 @@ async fn process_log(details: Arc>, msg: Log) {
            .handle_message(IngestionMessage::OperationEvent {
                 table_index,
                 op,
-                state: None,
+                id: None,
             })
             .await
             .is_err()
@@ -245,7 +245,7 @@ async fn process_log(details: Arc>, msg: Log) {
                 .handle_message(IngestionMessage::OperationEvent {
                     table_index,
                     op,
-                    state: None,
+                    id: None,
                 })
                 .await;
         } else {
diff --git a/dozer-ingestion/ethereum/src/trace/connector.rs b/dozer-ingestion/ethereum/src/trace/connector.rs
index 095ba2e98c..04c8b8f0d0 100644
--- a/dozer-ingestion/ethereum/src/trace/connector.rs
+++ b/dozer-ingestion/ethereum/src/trace/connector.rs
@@ -160,7 +160,7 @@ pub async fn run(
                 .handle_message(IngestionMessage::OperationEvent {
                     table_index: 0, // We have only one table
                     op,
-                    state: None,
+                    id: None,
                 })
                 .await
                 .is_err()
diff --git a/dozer-ingestion/grpc/src/adapter/arrow.rs b/dozer-ingestion/grpc/src/adapter/arrow.rs
index a42f6b7301..72ff83f150 100644
--- a/dozer-ingestion/grpc/src/adapter/arrow.rs
+++ b/dozer-ingestion/grpc/src/adapter/arrow.rs
@@ -115,7 +115,7 @@ pub async fn handle_message(
             .handle_message(IngestionMessage::OperationEvent {
                 table_index,
                 op,
-                state: None,
+                id: None,
             })
             .await
             .is_err()
diff --git a/dozer-ingestion/grpc/src/adapter/default.rs b/dozer-ingestion/grpc/src/adapter/default.rs
index 8bbc1092b5..1614ffb79b 100644
--- a/dozer-ingestion/grpc/src/adapter/default.rs
+++ b/dozer-ingestion/grpc/src/adapter/default.rs
@@ -88,7 +88,7 @@ pub async fn handle_message(
         .handle_message(IngestionMessage::OperationEvent {
             table_index,
             op,
-            state: None,
+            id: None,
         })
         .await;
     Ok(())
diff --git a/dozer-ingestion/javascript/src/js_extension/mod.rs b/dozer-ingestion/javascript/src/js_extension/mod.rs
index e9bd5f5535..5a3e2021f7 100644
--- a/dozer-ingestion/javascript/src/js_extension/mod.rs
+++ b/dozer-ingestion/javascript/src/js_extension/mod.rs
@@ -114,13 +114,13 @@ impl JsExtension {
 async fn send(ingestor: Ingestor, val: JsMessage) -> Result<(), Error> {
     let msg = match val.typ {
         MsgType::SnapshottingStarted => IngestionMessage::SnapshottingStarted,
-        MsgType::SnapshottingDone => IngestionMessage::SnapshottingDone,
+        MsgType::SnapshottingDone => IngestionMessage::SnapshottingDone { id: None },
         MsgType::Insert | MsgType::Delete | MsgType::Update => {
             let op = map_operation(val)?;
             IngestionMessage::OperationEvent {
                 table_index: 0,
                 op,
-                state: None,
+                id: None,
             }
         }
     };
diff --git a/dozer-ingestion/kafka/src/debezium/stream_consumer.rs b/dozer-ingestion/kafka/src/debezium/stream_consumer.rs
index c461acfd07..d59f8d790a 100644
--- a/dozer-ingestion/kafka/src/debezium/stream_consumer.rs
+++ b/dozer-ingestion/kafka/src/debezium/stream_consumer.rs
@@ -152,7 +152,7 @@ impl StreamConsumer for DebeziumStreamConsumer {
                             lifetime: None,
                         },
                     },
-                    state: None,
+                    id: None,
                 })
                 .await
                 .is_err()
@@ -174,7 +174,7 @@ impl StreamConsumer for DebeziumStreamConsumer {
                             lifetime: None,
                         },
                     },
-                    state: None,
+                    id: None,
                 })
                 .await
                 .is_err()
@@ -196,7 +196,7 @@ impl StreamConsumer for DebeziumStreamConsumer {
                             lifetime: None,
                         },
                    },
-                    state: None,
+                    id: None,
                 })
                 .await
                 .is_err()
diff --git a/dozer-ingestion/kafka/src/stream_consumer_basic.rs b/dozer-ingestion/kafka/src/stream_consumer_basic.rs
index f2cffa5421..fceade9724 100644
--- a/dozer-ingestion/kafka/src/stream_consumer_basic.rs
+++ b/dozer-ingestion/kafka/src/stream_consumer_basic.rs
@@ -159,7 +159,7 @@ impl StreamConsumer for StreamConsumerBasic {
                                 lifetime: None,
                             },
                         },
-                        state: None,
+                        id: None,
                     })
                     .await
                     .is_err()
diff --git a/dozer-ingestion/mongodb/src/lib.rs b/dozer-ingestion/mongodb/src/lib.rs
index 2df4d4c2cf..9511d01e40 100644
--- a/dozer-ingestion/mongodb/src/lib.rs
+++ b/dozer-ingestion/mongodb/src/lib.rs
@@ -631,7 +631,7 @@ impl Connector for MongodbConnector {
                     .handle_message(IngestionMessage::OperationEvent {
                         table_index,
                         op,
-                        state: None,
+                        id: None,
                     })
                     .await
                     .is_err()
@@ -641,7 +641,7 @@ impl Connector for MongodbConnector {
                 }
             }
             if snapshot_ingestor
-                .handle_message(IngestionMessage::SnapshottingDone)
+                .handle_message(IngestionMessage::SnapshottingDone { id: None })
                 .await
                 .is_err()
             {
@@ -678,7 +678,7 @@ impl Connector for MongodbConnector {
                     .handle_message(IngestionMessage::OperationEvent {
                         table_index,
                        op,
-                        state: None,
+                        id: None,
                    })
                    .await
                    .is_err()
diff --git a/dozer-ingestion/mysql/src/binlog.rs b/dozer-ingestion/mysql/src/binlog.rs
index a81a6a6fff..89de529015 100644
--- a/dozer-ingestion/mysql/src/binlog.rs
+++ b/dozer-ingestion/mysql/src/binlog.rs
@@ -542,7 +542,7 @@ impl BinlogIngestor<'_, '_, '_> {
                 .handle_message(IngestionMessage::OperationEvent {
                     table_index: table.def.table_index,
                     op: op?,
-                    state: Some(encode_state(pos)),
+                    id: Some(encode_state(pos)),
                 })
                 .await
                 .is_err()
diff --git a/dozer-ingestion/mysql/src/connector.rs b/dozer-ingestion/mysql/src/connector.rs
index 818df5bc6c..27932cefae 100644
--- a/dozer-ingestion/mysql/src/connector.rs
+++ b/dozer-ingestion/mysql/src/connector.rs
@@ -378,7 +378,7 @@ impl MySQLConnector {
                     .handle_message(IngestionMessage::OperationEvent {
                         table_index,
                         op,
-                        state: None,
+                        id: None,
                     })
                     .await
                     .is_err()
@@ -403,7 +403,7 @@ impl MySQLConnector {
 
         if snapshot_started
             && ingestor
-                .handle_message(IngestionMessage::SnapshottingDone)
+                .handle_message(IngestionMessage::SnapshottingDone { id: None })
                 .await
                 .is_err()
         {
@@ -623,7 +623,7 @@ mod tests {
                     Field::Float(1.0.into()),
                 ]),
             },
-            state: None,
+            id: None,
         },
         IngestionMessage::OperationEvent {
             table_index: 0,
@@ -634,7 +634,7 @@ mod tests {
                     Field::Float(2.0.into()),
                 ]),
             },
-            state: None,
+            id: None,
         },
         IngestionMessage::OperationEvent {
             table_index: 0,
@@ -645,9 +645,9 @@ mod tests {
                     Field::Float(3.0.into()),
                 ]),
             },
-            state: None,
+            id: None,
         },
-        IngestionMessage::SnapshottingDone,
+        IngestionMessage::SnapshottingDone { id: None },
     ];
 
     check_ingestion_messages(&mut iterator, expected_ingestion_messages).await;
@@ -711,16 +711,16 @@ mod tests {
             op: Insert {
                 new: Record::new(vec![Field::Int(4), Field::Float(4.0.into())]),
             },
-            state: None,
+            id: None,
         },
         IngestionMessage::OperationEvent {
             table_index: 1,
             op: Insert {
                 new: Record::new(vec![Field::Int(1), Field::Json(true.into())]),
             },
-            state: None,
+            id: None,
         },
-        IngestionMessage::SnapshottingDone,
+        IngestionMessage::SnapshottingDone { id: None },
     ];
 
     check_ingestion_messages(&mut iterator, expected_ingestion_messages).await;
@@ -736,7 +736,7 @@ mod tests {
                 old: Record::new(vec![Field::Int(4), Field::Float(4.0.into())]),
                 new: Record::new(vec![Field::Int(4), Field::Float(5.0.into())]),
             },
-            state: None,
+            id: None,
         }];
 
     check_ingestion_messages(&mut iterator, expected_ingestion_messages).await;
@@ -751,7 +751,7 @@ mod tests {
             op: Delete {
                 old: Record::new(vec![Field::Int(4), Field::Float(5.0.into())]),
            },
-            state: None,
+            id: None,
        }];
 
     check_ingestion_messages(&mut iterator, expected_ingestion_messages).await;
diff --git a/dozer-ingestion/object-store/src/connector.rs b/dozer-ingestion/object-store/src/connector.rs
index f4dea28f44..99fda3c63a 100644
--- a/dozer-ingestion/object-store/src/connector.rs
+++ b/dozer-ingestion/object-store/src/connector.rs
@@ -177,7 +177,7 @@ impl Connector for ObjectStoreConnector {
         let updated_state = try_join_all(handles).await.unwrap();
 
         sender
-            .send(Ok(Some(IngestionMessage::SnapshottingDone)))
+            .send(Ok(Some(IngestionMessage::SnapshottingDone { id: None })))
             .await
             .unwrap();
diff --git a/dozer-ingestion/object-store/src/table_reader.rs b/dozer-ingestion/object-store/src/table_reader.rs
index d5fa1a94a7..846a9c9b5f 100644
--- a/dozer-ingestion/object-store/src/table_reader.rs
+++ b/dozer-ingestion/object-store/src/table_reader.rs
@@ -114,7 +114,7 @@ pub async fn read(
             .send(Ok(Some(IngestionMessage::OperationEvent {
                 table_index,
                 op: evt,
-                state: None,
+                id: None,
             })))
             .await
             .is_err()
diff --git a/dozer-ingestion/object-store/src/tests/local_storage_tests.rs b/dozer-ingestion/object-store/src/tests/local_storage_tests.rs
index d5e5088511..c3aad47977 100644
--- a/dozer-ingestion/object-store/src/tests/local_storage_tests.rs
+++ b/dozer-ingestion/object-store/src/tests/local_storage_tests.rs
@@ -102,7 +102,7 @@ fn test_read_parquet_file() {
     }
 
     let row = iterator.next();
-    if let Some(IngestionMessage::SnapshottingDone) = row {
+    if let Some(IngestionMessage::SnapshottingDone { .. }) = row {
     } else {
         panic!("Unexpected message");
     }
@@ -151,7 +151,7 @@ fn test_read_parquet_file_marker() {
     }
 
     let row = iterator.next();
-    if let Some(IngestionMessage::SnapshottingDone) = row {
+    if let Some(IngestionMessage::SnapshottingDone { .. }) = row {
     } else {
         panic!("Unexpected message");
     }
@@ -172,7 +172,7 @@ fn test_read_parquet_file_no_marker() {
     }
 
     let row = iterator.next();
-    if let Some(IngestionMessage::SnapshottingDone) = row {
+    if let Some(IngestionMessage::SnapshottingDone { .. }) = row {
     } else {
         panic!("Unexpected message");
     }
@@ -228,7 +228,7 @@ fn test_csv_read() {
     }
 
     let row = iterator.next();
-    if let Some(IngestionMessage::SnapshottingDone) = row {
+    if let Some(IngestionMessage::SnapshottingDone { .. }) = row {
     } else {
         panic!("Unexpected message");
     }
@@ -284,7 +284,7 @@ fn test_csv_read_marker() {
     }
 
     let row = iterator.next();
-    if let Some(IngestionMessage::SnapshottingDone) = row {
+    if let Some(IngestionMessage::SnapshottingDone { .. }) = row {
     } else {
         panic!("Unexpected message");
     }
@@ -342,7 +342,7 @@ fn test_csv_read_only_one_marker() {
 
     // No data to be snapshotted
     let row = iterator.next();
-    if let Some(IngestionMessage::SnapshottingDone) = row {
+    if let Some(IngestionMessage::SnapshottingDone { .. }) = row {
     } else {
         panic!("Unexpected message");
     }
@@ -365,7 +365,7 @@ fn test_csv_read_no_marker() {
 
     // No data to be snapshotted
     let row = iterator.next();
-    if let Some(IngestionMessage::SnapshottingDone) = row {
+    if let Some(IngestionMessage::SnapshottingDone { .. }) = row {
     } else {
         panic!("Unexpected message");
     }
diff --git a/dozer-ingestion/postgres/src/replicator.rs b/dozer-ingestion/postgres/src/replicator.rs
index 01059ebb44..26f19628d4 100644
--- a/dozer-ingestion/postgres/src/replicator.rs
+++ b/dozer-ingestion/postgres/src/replicator.rs
@@ -132,7 +132,7 @@ impl<'a> CDCHandler<'a> {
                 .handle_message(IngestionMessage::OperationEvent {
                     table_index,
                     op,
-                    state: Some(OpIdentifier::new(self.begin_lsn, self.seq_no)),
+                    id: Some(OpIdentifier::new(self.begin_lsn, self.seq_no)),
                 })
                 .await
                 .is_err()
diff --git a/dozer-ingestion/postgres/src/snapshotter.rs b/dozer-ingestion/postgres/src/snapshotter.rs
index 4b86f80807..573341ba6c 100644
--- a/dozer-ingestion/postgres/src/snapshotter.rs
+++ b/dozer-ingestion/postgres/src/snapshotter.rs
@@ -161,7 +161,7 @@ impl<'a> PostgresSnapshotter<'a> {
                     .handle_message(IngestionMessage::OperationEvent {
                         table_index,
                         op: evt,
-                        state: None,
+                        id: None,
                     })
                     .await
                     .is_err()
@@ -173,7 +173,7 @@ impl<'a> PostgresSnapshotter<'a> {
 
         if self
             .ingestor
-            .handle_message(IngestionMessage::SnapshottingDone)
+            .handle_message(IngestionMessage::SnapshottingDone { id: None })
             .await
             .is_err()
         {
diff --git a/dozer-ingestion/snowflake/src/stream_consumer.rs b/dozer-ingestion/snowflake/src/stream_consumer.rs
index 23edc0b962..b1b4ed4dfd 100644
--- a/dozer-ingestion/snowflake/src/stream_consumer.rs
+++ b/dozer-ingestion/snowflake/src/stream_consumer.rs
@@ -126,7 +126,7 @@ impl StreamConsumer {
                 .blocking_handle_message(IngestionMessage::OperationEvent {
                     table_index,
                     op,
-                    state: Some(OpIdentifier::new(iteration, idx as u64)),
+                    id: Some(OpIdentifier::new(iteration, idx as u64)),
                 })
                 .is_err()
             {
diff --git a/dozer-sink-aerospike/src/lib.rs b/dozer-sink-aerospike/src/lib.rs
index beaa696d98..7119bf0848 100644
--- a/dozer-sink-aerospike/src/lib.rs
+++ b/dozer-sink-aerospike/src/lib.rs
@@ -1,4 +1,5 @@
 use crossbeam_channel::{bounded, Receiver, Sender};
+use dozer_types::node::OpIdentifier;
 use std::alloc::{handle_alloc_error, Layout};
 use std::ffi::{c_char, c_void, CStr, CString, NulError};
 use std::fmt::Display;
@@ -34,7 +35,7 @@ use dozer_types::errors::internal::BoxedError;
 use dozer_types::geo::{Coord, Point};
 use dozer_types::log::{error, info};
 use dozer_types::ordered_float::OrderedFloat;
-use dozer_types::types::DozerPoint;
+use dozer_types::types::{DozerPoint, OperationWithId};
 use dozer_types::{
     errors::types::TypeError,
     log::warn,
@@ -338,7 +339,7 @@ impl Drop for AsRecord<'_> {
 
 #[derive(Debug)]
 struct AerospikeSink {
-    sender: Sender<Operation>,
+    sender: Sender<OperationWithId>,
     snapshotting_started_instant: HashMap<String, Instant>,
 }
@@ -377,7 +378,7 @@ impl AerospikeSink {
 
 struct AerospikeSinkWorker {
     client: Arc<Client>,
-    receiver: Receiver<Operation>,
+    receiver: Receiver<OperationWithId>,
     namespace: CString,
     set_name: CString,
     primary_index: usize,
@@ -709,14 +710,14 @@ impl AerospikeSinkWorker {
         Ok(())
     }
 
-    fn process_impl(&mut self, op: Operation) -> Result<(), AerospikeSinkError> {
+    fn process_impl(&mut self, op: OperationWithId) -> Result<(), AerospikeSinkError> {
         // XXX: We know from the schema how many strings we have to allocate,
         // so we could optimize this to allocate the correct amount ahead
         // of time. Furthermore, we also know (an upper bound of) the total size of the strings we
         // have to allocate, so we could just allocate one large Vec, and
         // use that for all string allocations, like an arena
         let mut allocated_strings = Vec::new();
-        match op {
+        match op.op {
             Operation::Insert { new } => {
                 // We create the key and record on the stack, because we can
                 // and it saves an allocation. These structs are self-referential
@@ -876,7 +877,7 @@ impl Sink for AerospikeSink {
         &mut self,
         from_port: PortHandle,
         _record_store: &dozer_recordstore::ProcessorRecordStore,
-        op: dozer_types::types::Operation,
+        op: OperationWithId,
     ) -> Result<(), BoxedError> {
         debug_assert_eq!(from_port, DEFAULT_PORT_HANDLE);
         self.sender.send(op)?;
@@ -900,7 +901,11 @@ impl Sink for AerospikeSink {
         Ok(())
     }
 
-    fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError> {
+    fn on_source_snapshotting_done(
+        &mut self,
+        connection_name: String,
+        _id: Option<OpIdentifier>,
+    ) -> Result<(), BoxedError> {
         if let Some(started_instant) = self.snapshotting_started_instant.remove(&connection_name) {
             info!(
                 "Snapshotting for connection {} took {:?}",
@@ -915,6 +920,18 @@ impl Sink for AerospikeSink {
         }
         Ok(())
     }
+
+    fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
+        Ok(())
+    }
+
+    fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
+        Ok(None)
+    }
+
+    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
+        Ok(None)
+    }
 }
 
 #[cfg(test)]
@@ -954,9 +971,9 @@ mod tests {
         sink.process(
             DEFAULT_PORT_HANDLE,
             &rs,
-            Operation::Insert {
+            OperationWithId::without_id(Operation::Insert {
                 new: record(i as u64),
-            },
+            }),
         )
         .unwrap();
     }
@@ -980,7 +997,7 @@ mod tests {
         sink.process(
             DEFAULT_PORT_HANDLE,
             &rs,
-            Operation::BatchInsert { new: batch },
+            OperationWithId::without_id(Operation::BatchInsert { new: batch }),
         )
         .unwrap()
     }
diff --git a/dozer-sql/src/aggregation/processor.rs b/dozer-sql/src/aggregation/processor.rs
index 9722a1cf66..ab150c07b6 100644
--- a/dozer-sql/src/aggregation/processor.rs
+++ b/dozer-sql/src/aggregation/processor.rs
@@ -12,7 +12,7 @@ use dozer_recordstore::ProcessorRecordStore;
 use dozer_sql_expression::execution::Expression;
 use dozer_types::bincode;
 use dozer_types::errors::internal::BoxedError;
-use dozer_types::types::{Field, FieldType, Operation, Record, Schema};
+use dozer_types::types::{Field, FieldType, Operation, OperationWithId, Record, Schema};
 use std::collections::HashMap;
 
 use crate::aggregation::aggregator::{
@@ -604,12 +604,12 @@ impl Processor for AggregationProcessor {
         &mut self,
         _from_port: PortHandle,
         _record_store: &ProcessorRecordStore,
-        op: Operation,
+        op: OperationWithId,
         fw: &mut dyn ProcessorChannelForwarder,
     ) -> Result<(), BoxedError> {
-        let ops = self.aggregate(op)?;
+        let ops = self.aggregate(op.op)?;
         for output_op in ops {
-            fw.send(output_op, DEFAULT_PORT_HANDLE);
+            fw.send(OperationWithId::without_id(output_op), DEFAULT_PORT_HANDLE);
         }
         Ok(())
     }
diff --git a/dozer-sql/src/expression/tests/test_common.rs b/dozer-sql/src/expression/tests/test_common.rs
index bd887fb75e..af8197bc3d 100644
--- a/dozer-sql/src/expression/tests/test_common.rs
+++ b/dozer-sql/src/expression/tests/test_common.rs
@@ -4,16 +4,16 @@ use dozer_core::channels::ProcessorChannelForwarder;
 use dozer_core::node::ProcessorFactory;
 use dozer_core::DEFAULT_PORT_HANDLE;
 use dozer_recordstore::ProcessorRecordStoreDeserializer;
-use dozer_types::types::{Field, Schema};
+use dozer_types::types::{Field, OperationWithId, Schema};
 use dozer_types::types::{Operation, Record};
 use std::collections::HashMap;
 
 struct TestChannelForwarder {
-    operations: Vec<Operation>,
+    operations: Vec<OperationWithId>,
 }
 
 impl ProcessorChannelForwarder for TestChannelForwarder {
-    fn send(&mut self, op: Operation, _port: dozer_core::node::PortHandle) {
+    fn send(&mut self, op: OperationWithId, _port: dozer_core::node::PortHandle) {
         self.operations.push(op);
     }
 }
@@ -57,10 +57,15 @@ pub(crate) fn run_fct(sql: &str, schema: Schema, input: Vec<Field>) -> Field {
     let op = Operation::Insert { new: rec };
 
     processor
-        .process(DEFAULT_PORT_HANDLE, &record_store, op, &mut fw)
+        .process(
+            DEFAULT_PORT_HANDLE,
+            &record_store,
+            OperationWithId::without_id(op),
+            &mut fw,
+        )
         .unwrap();
 
-    match &mut fw.operations[0] {
+    match &mut fw.operations[0].op {
         Operation::Insert { new } => new.values.remove(0),
         _ => panic!("Unable to find result value"),
     }
diff --git a/dozer-sql/src/product/join/processor.rs b/dozer-sql/src/product/join/processor.rs
index e3cd4474f2..8a7e447436 100644
--- a/dozer-sql/src/product/join/processor.rs
+++ b/dozer-sql/src/product/join/processor.rs
@@ -6,7 +6,7 @@ use dozer_core::DEFAULT_PORT_HANDLE;
 use dozer_recordstore::ProcessorRecordStore;
 use dozer_tracing::Labels;
 use dozer_types::errors::internal::BoxedError;
-use dozer_types::types::{Lifetime, Operation};
+use dozer_types::types::{Lifetime, Operation, OperationWithId};
 use metrics::{
     counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram,
     increment_counter,
@@ -76,7 +76,7 @@ impl Processor for ProductProcessor {
         &mut self,
         from_port: PortHandle,
         _record_store: &ProcessorRecordStore,
-        op: Operation,
+        op: OperationWithId,
         fw: &mut dyn ProcessorChannelForwarder,
     ) -> Result<(), BoxedError> {
         let from_branch = match from_port {
@@ -86,7 +86,7 @@ impl Processor for ProductProcessor {
         };
 
         let now = std::time::Instant::now();
-        let records = match op {
+        let records = match op.op {
             Operation::Delete { old } => {
                 if let Some(lifetime) = old.get_lifetime() {
                     self.update_eviction_index(lifetime);
@@ -123,9 +123,9 @@ impl Processor for ProductProcessor {
                     self.process(
                         from_port,
                         _record_store,
-                        Operation::Insert {
+                        OperationWithId::without_id(Operation::Insert {
                             new: record.clone(),
-                        },
+                        }),
                         fw,
                     )?;
                 }
@@ -157,10 +157,16 @@ impl Processor for ProductProcessor {
         for (action, record) in records {
             match action {
                 JoinAction::Insert => {
-                    fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
+                    fw.send(
+                        OperationWithId::without_id(Operation::Insert { new: record }),
+                        DEFAULT_PORT_HANDLE,
+                    );
                 }
                 JoinAction::Delete => {
-                    fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
+                    fw.send(
+                        OperationWithId::without_id(Operation::Delete { old: record }),
+                        DEFAULT_PORT_HANDLE,
+                    );
                 }
             }
         }
@@ -201,11 +207,11 @@ mod tests {
     use super::*;
 
     struct TestChannelForwarder {
-        operations: Vec<Operation>,
+        operations: Vec<OperationWithId>,
     }
 
     impl ProcessorChannelForwarder for TestChannelForwarder {
-        fn send(&mut self, op: Operation, _port: dozer_core::node::PortHandle) {
+        fn send(&mut self, op: OperationWithId, _port: dozer_core::node::PortHandle) {
             self.operations.push(op);
         }
     }
@@ -307,11 +313,16 @@ mod tests {
             JoinSide::Right => RIGHT_JOIN_PORT,
         };
         self.processor
-            .process(port, &self.record_store, operation, &mut self.forwarder)
+            .process(
+                port,
+                &self.record_store,
+                OperationWithId::without_id(operation),
+                &mut self.forwarder,
+            )
             .unwrap();
         let output_ops = self.forwarder.operations.clone();
         self.forwarder.operations.clear();
-        output_ops
+        output_ops.into_iter().map(|op| op.op).collect()
     }
 
     fn insert(&mut self, side: JoinSide, values: &[Field]) -> (Record, Vec<Operation>) {
diff --git a/dozer-sql/src/product/set/set_processor.rs b/dozer-sql/src/product/set/set_processor.rs
index 5d85b8819f..bfe18c8941 100644
--- a/dozer-sql/src/product/set/set_processor.rs
+++ b/dozer-sql/src/product/set/set_processor.rs
@@ -12,7 +12,7 @@ use dozer_core::node::{PortHandle, Processor};
 use dozer_core::DEFAULT_PORT_HANDLE;
 use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer};
 use dozer_types::errors::internal::BoxedError;
-use dozer_types::types::{Operation, Record};
+use dozer_types::types::{Operation, OperationWithId, Record};
 use std::fmt::{Debug, Formatter};
 
 pub struct SetProcessor {
@@ -102,20 +102,26 @@ impl Processor for SetProcessor {
         &mut self,
         _from_port: PortHandle,
         _record_store: &ProcessorRecordStore,
-        op: Operation,
+        op: OperationWithId,
         fw: &mut dyn ProcessorChannelForwarder,
     ) -> Result<(), BoxedError> {
-        match op {
+        match op.op {
             Operation::Delete { old } => {
                 let records = self.delete(old).map_err(PipelineError::ProductError)?;
 
                 for (action, record) in records.into_iter() {
                     match action {
                         SetAction::Insert => {
-                            fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
+                            fw.send(
+                                OperationWithId::without_id(Operation::Insert { new: record }),
+                                DEFAULT_PORT_HANDLE,
+                            );
                         }
                         SetAction::Delete => {
-                            fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
+                            fw.send(
+                                OperationWithId::without_id(Operation::Delete { old: record }),
+                                DEFAULT_PORT_HANDLE,
+                            );
                        }
                    }
                }
@@ -126,10 +132,16 @@ impl Processor for SetProcessor {
                 for (action, record) in records.into_iter() {
                     match action {
                         SetAction::Insert => {
-                            fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
+                            fw.send(
+                                OperationWithId::without_id(Operation::Insert { new: record }),
+                                DEFAULT_PORT_HANDLE,
+                            );
                         }
                         SetAction::Delete => {
-                            fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
+                            fw.send(
+                                OperationWithId::without_id(Operation::Delete { old: record }),
+                                DEFAULT_PORT_HANDLE,
+                            );
                         }
                     }
                 }
@@ -141,10 +153,16 @@ impl Processor for SetProcessor {
                 for (action, old) in old_records.into_iter() {
                     match action {
                         SetAction::Insert => {
-                            fw.send(Operation::Insert { new: old }, DEFAULT_PORT_HANDLE);
+                            fw.send(
+                                OperationWithId::without_id(Operation::Insert { new: old }),
+                                DEFAULT_PORT_HANDLE,
+                            );
                         }
                         SetAction::Delete => {
-                            fw.send(Operation::Delete { old }, DEFAULT_PORT_HANDLE);
+                            fw.send(
+                                OperationWithId::without_id(Operation::Delete { old }),
+                                DEFAULT_PORT_HANDLE,
+                            );
                         }
                     }
                 }
@@ -152,10 +170,16 @@ impl Processor for SetProcessor {
                 for (action, new) in new_records.into_iter() {
                     match action {
                         SetAction::Insert => {
-                            fw.send(Operation::Insert { new }, DEFAULT_PORT_HANDLE);
+                            fw.send(
+                                OperationWithId::without_id(Operation::Insert { new }),
+                                DEFAULT_PORT_HANDLE,
+                            );
                         }
                         SetAction::Delete => {
-                            fw.send(Operation::Delete { old: new }, DEFAULT_PORT_HANDLE);
+                            fw.send(
+                                OperationWithId::without_id(Operation::Delete { old: new }),
+                                DEFAULT_PORT_HANDLE,
+                            );
                         }
                     }
                }
@@ -165,7 +189,7 @@ impl Processor for SetProcessor {
                 self.process(
                     _from_port,
                     _record_store,
-                    Operation::Insert { new: record },
+                    OperationWithId::without_id(Operation::Insert { new: record }),
                     fw,
                 )?;
             }
diff --git a/dozer-sql/src/product/table/processor.rs b/dozer-sql/src/product/table/processor.rs
index c9a5960329..7bed836a1a 100644
--- a/dozer-sql/src/product/table/processor.rs
+++ b/dozer-sql/src/product/table/processor.rs
@@ -5,7 +5,7 @@ use dozer_core::node::{PortHandle, Processor};
 use dozer_core::DEFAULT_PORT_HANDLE;
 use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::errors::internal::BoxedError;
-use dozer_types::types::Operation;
+use dozer_types::types::OperationWithId;
 
 #[derive(Debug)]
 pub struct TableProcessor {
@@ -27,7 +27,7 @@ impl Processor for TableProcessor {
         &mut self,
         _from_port: PortHandle,
         _record_store: &ProcessorRecordStore,
-        op: Operation,
+        op: OperationWithId,
         fw: &mut dyn ProcessorChannelForwarder,
     ) -> Result<(), BoxedError> {
         fw.send(op, DEFAULT_PORT_HANDLE);
diff --git a/dozer-sql/src/projection/processor.rs b/dozer-sql/src/projection/processor.rs
index bffebeb559..338c823b4c 100644
--- a/dozer-sql/src/projection/processor.rs
+++ b/dozer-sql/src/projection/processor.rs
@@ -9,7 +9,7 @@ use dozer_core::node::{PortHandle, Processor};
 use dozer_core::DEFAULT_PORT_HANDLE;
 use dozer_recordstore::ProcessorRecordStore;
 use dozer_types::errors::internal::BoxedError;
-use dozer_types::types::{Operation, Record, Schema};
+use dozer_types::types::{Operation, OperationWithId, Record, Schema};
 
 #[derive(Debug)]
 pub struct ProjectionProcessor {
@@ -85,10 +85,10 @@ impl Processor for ProjectionProcessor {
         &mut self,
         _from_port: PortHandle,
         _record_store: &ProcessorRecordStore,
-        op: Operation,
+        op: OperationWithId,
         fw: &mut dyn ProcessorChannelForwarder,
     ) -> Result<(), BoxedError> {
-        let output_op = match op {
+        let output_op = match op.op {
             Operation::Delete { ref old } => self.delete(old)?,
             Operation::Insert { ref new } => Operation::Insert {
                 new: self.insert(new)?,
@@ -102,7 +102,13 @@ impl Processor for ProjectionProcessor {
                 Operation::BatchInsert { new: records }
             }
         };
-        fw.send(output_op, DEFAULT_PORT_HANDLE);
+        fw.send(
+            OperationWithId {
+                id: op.id,
+                op: output_op,
+            },
+            DEFAULT_PORT_HANDLE,
+        );
         Ok(())
     }
{ fw.send(op, DEFAULT_PORT_HANDLE); @@ -68,15 +68,33 @@ impl Processor for SelectionProcessor { match (old_fulfilled, new_fulfilled) { (true, true) => { // both records fulfills the WHERE condition, forward the operation - fw.send(Operation::Update { old, new }, DEFAULT_PORT_HANDLE); + fw.send( + OperationWithId { + id: op.id, + op: Operation::Update { old, new }, + }, + DEFAULT_PORT_HANDLE, + ); } (true, false) => { // the old record fulfills the WHERE condition while then new one doesn't, forward a delete operation - fw.send(Operation::Delete { old }, DEFAULT_PORT_HANDLE); + fw.send( + OperationWithId { + id: op.id, + op: Operation::Delete { old }, + }, + DEFAULT_PORT_HANDLE, + ); } (false, true) => { // the old record doesn't fulfill the WHERE condition while then new one does, forward an insert operation - fw.send(Operation::Insert { new }, DEFAULT_PORT_HANDLE); + fw.send( + OperationWithId { + id: op.id, + op: Operation::Insert { new }, + }, + DEFAULT_PORT_HANDLE, + ); } (false, false) => { // both records doesn't fulfill the WHERE condition, don't forward the operation @@ -93,7 +111,13 @@ impl Processor for SelectionProcessor { }) .collect::, _>>()?; if !records.is_empty() { - fw.send(Operation::BatchInsert { new: records }, DEFAULT_PORT_HANDLE); + fw.send( + OperationWithId { + id: op.id, + op: Operation::BatchInsert { new: records }, + }, + DEFAULT_PORT_HANDLE, + ); } } } diff --git a/dozer-sql/src/table_operator/processor.rs b/dozer-sql/src/table_operator/processor.rs index 161d803be7..a040367504 100644 --- a/dozer-sql/src/table_operator/processor.rs +++ b/dozer-sql/src/table_operator/processor.rs @@ -5,7 +5,7 @@ use dozer_core::node::{PortHandle, Processor}; use dozer_core::DEFAULT_PORT_HANDLE; use dozer_recordstore::ProcessorRecordStore; use dozer_types::errors::internal::BoxedError; -use dozer_types::types::{Operation, Schema}; +use dozer_types::types::{Operation, OperationWithId, Schema}; use crate::errors::PipelineError; @@ -42,17 +42,20 @@ impl Processor for TableOperatorProcessor { &mut self, _from_port: PortHandle, record_store: &ProcessorRecordStore, - op: Operation, + op: OperationWithId, fw: &mut dyn ProcessorChannelForwarder, ) -> Result<(), BoxedError> { - match op { + match op.op { Operation::Delete { ref old } => { let records = self .operator .execute(record_store, old, &self.input_schema) .map_err(PipelineError::TableOperatorError)?; for record in records { - fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE); + fw.send( + OperationWithId::without_id(Operation::Delete { old: record }), + DEFAULT_PORT_HANDLE, + ); } } Operation::Insert { ref new } => { @@ -61,7 +64,10 @@ impl Processor for TableOperatorProcessor { .execute(record_store, new, &self.input_schema) .map_err(PipelineError::TableOperatorError)?; for record in records { - fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE); + fw.send( + OperationWithId::without_id(Operation::Insert { new: record }), + DEFAULT_PORT_HANDLE, + ); } } Operation::Update { ref old, ref new } => { @@ -70,7 +76,10 @@ impl Processor for TableOperatorProcessor { .execute(record_store, old, &self.input_schema) .map_err(PipelineError::TableOperatorError)?; for record in old_records { - fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE); + fw.send( + OperationWithId::without_id(Operation::Delete { old: record }), + DEFAULT_PORT_HANDLE, + ); } let new_records = self @@ -78,7 +87,10 @@ impl Processor for TableOperatorProcessor { .execute(record_store, new, &self.input_schema) 
.map_err(PipelineError::TableOperatorError)?; for record in new_records { - fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE); + fw.send( + OperationWithId::without_id(Operation::Insert { new: record }), + DEFAULT_PORT_HANDLE, + ); } } Operation::BatchInsert { new } => { @@ -90,7 +102,10 @@ impl Processor for TableOperatorProcessor { .map_err(PipelineError::TableOperatorError)?, ); } - fw.send(Operation::BatchInsert { new: records }, DEFAULT_PORT_HANDLE); + fw.send( + OperationWithId::without_id(Operation::BatchInsert { new: records }), + DEFAULT_PORT_HANDLE, + ); } } Ok(()) diff --git a/dozer-sql/src/tests/builder_test.rs b/dozer-sql/src/tests/builder_test.rs index bec44aa143..f225e157f9 100644 --- a/dozer-sql/src/tests/builder_test.rs +++ b/dozer-sql/src/tests/builder_test.rs @@ -17,7 +17,7 @@ use dozer_types::node::OpIdentifier; use dozer_types::ordered_float::OrderedFloat; use dozer_types::tonic::async_trait; use dozer_types::types::{ - Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition, + Field, FieldDefinition, FieldType, Operation, OperationWithId, Record, Schema, SourceDefinition, }; use tokio::sync::mpsc::Sender; @@ -133,7 +133,7 @@ impl Source for TestSource { ), ]), }, - state: None, + id: None, }, )) .await @@ -179,7 +179,7 @@ impl Sink for TestSink { &mut self, _from_port: PortHandle, _record_store: &ProcessorRecordStore, - op: Operation, + op: OperationWithId, ) -> Result<(), BoxedError> { println!("Sink: {:?}", op); Ok(()) @@ -200,9 +200,25 @@ impl Sink for TestSink { Ok(()) } - fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> { + fn on_source_snapshotting_done( + &mut self, + _connection_name: String, + _id: Option, + ) -> Result<(), BoxedError> { + Ok(()) + } + + fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> { Ok(()) } + + fn get_source_state(&mut self) -> Result>, BoxedError> { + Ok(None) + } + + fn get_latest_op_id(&mut self) -> Result, BoxedError> { + Ok(None) + } } #[test] diff --git a/dozer-sql/src/window/processor.rs b/dozer-sql/src/window/processor.rs index 24f3111b4c..ef7f405b9e 100644 --- a/dozer-sql/src/window/processor.rs +++ b/dozer-sql/src/window/processor.rs @@ -6,7 +6,7 @@ use dozer_core::node::{PortHandle, Processor}; use dozer_core::DEFAULT_PORT_HANDLE; use dozer_recordstore::ProcessorRecordStore; use dozer_types::errors::internal::BoxedError; -use dozer_types::types::Operation; +use dozer_types::types::{Operation, OperationWithId}; use super::operator::WindowType; @@ -31,17 +31,20 @@ impl Processor for WindowProcessor { &mut self, _from_port: PortHandle, record_store: &ProcessorRecordStore, - op: Operation, + op: OperationWithId, fw: &mut dyn ProcessorChannelForwarder, ) -> Result<(), BoxedError> { - match op { + match op.op { Operation::Delete { old } => { let records = self .window .execute(record_store, old) .map_err(PipelineError::WindowError)?; for record in records { - fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE); + fw.send( + OperationWithId::without_id(Operation::Delete { old: record }), + DEFAULT_PORT_HANDLE, + ); } } Operation::Insert { new } => { @@ -50,21 +53,24 @@ impl Processor for WindowProcessor { .execute(record_store, new) .map_err(PipelineError::WindowError)?; for record in records { - fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE); + fw.send( + OperationWithId::without_id(Operation::Insert { new: record }), + DEFAULT_PORT_HANDLE, + ); } } Operation::Update { old, new } => { 
self.process( DEFAULT_PORT_HANDLE, record_store, - Operation::Delete { old }, + OperationWithId::without_id(Operation::Delete { old }), fw, )?; self.process( DEFAULT_PORT_HANDLE, record_store, - Operation::Insert { new }, + OperationWithId::without_id(Operation::Insert { new }), fw, )?; } @@ -77,7 +83,10 @@ impl Processor for WindowProcessor { .map_err(PipelineError::WindowError)?, ); } - fw.send(Operation::BatchInsert { new: records }, DEFAULT_PORT_HANDLE); + fw.send( + OperationWithId::without_id(Operation::BatchInsert { new: records }), + DEFAULT_PORT_HANDLE, + ); } } Ok(()) diff --git a/dozer-tests/src/sql_tests/helper/pipeline.rs b/dozer-tests/src/sql_tests/helper/pipeline.rs index 29461f7dd2..abe2269986 100644 --- a/dozer-tests/src/sql_tests/helper/pipeline.rs +++ b/dozer-tests/src/sql_tests/helper/pipeline.rs @@ -21,7 +21,7 @@ use dozer_sql::builder::statement_to_pipeline; use dozer_types::errors::internal::BoxedError; use dozer_types::models::ingestion_types::IngestionMessage; use dozer_types::node::OpIdentifier; -use dozer_types::types::{Operation, Record, Schema, SourceDefinition}; +use dozer_types::types::{Operation, OperationWithId, Record, Schema, SourceDefinition}; use std::collections::HashMap; use tempdir::TempDir; use tokio::{self, runtime::Runtime}; @@ -140,7 +140,7 @@ impl Source for TestSource { IngestionMessage::OperationEvent { table_index: 0, op, - state: None, + id: None, }, )) .await @@ -255,9 +255,9 @@ impl Sink for TestSink { &mut self, _from_port: PortHandle, record_store: &ProcessorRecordStore, - op: Operation, + op: OperationWithId, ) -> Result<(), BoxedError> { - self.update_result(record_store, op) + self.update_result(record_store, op.op) } fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> { @@ -275,9 +275,25 @@ impl Sink for TestSink { Ok(()) } - fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> { + fn on_source_snapshotting_done( + &mut self, + _connection_name: String, + _id: Option, + ) -> Result<(), BoxedError> { Ok(()) } + + fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> { + Ok(()) + } + + fn get_source_state(&mut self) -> Result>, BoxedError> { + Ok(None) + } + + fn get_latest_op_id(&mut self) -> Result, BoxedError> { + Ok(None) + } } pub struct TestPipeline { diff --git a/dozer-types/src/models/ingestion_types.rs b/dozer-types/src/models/ingestion_types.rs index 6e6982b376..1589dcf4c0 100644 --- a/dozer-types/src/models/ingestion_types.rs +++ b/dozer-types/src/models/ingestion_types.rs @@ -25,13 +25,13 @@ pub enum IngestionMessage { /// The CDC event. op: Operation, /// If this connector supports restarting from a specific CDC event, it should provide a `OpIdentifier`. - state: Option, + id: Option, }, /// A connector uses this message kind to notify Dozer that a initial snapshot of the source tables is started SnapshottingStarted, /// A connector uses this message kind to notify Dozer that a initial snapshot of the source tables is done, /// and the data is up-to-date until next CDC event. 
- SnapshottingDone, + SnapshottingDone { id: Option }, } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Hash, JsonSchema, Default)] diff --git a/dozer-types/src/types/mod.rs b/dozer-types/src/types/mod.rs index 70baca098a..ecc04bebcc 100644 --- a/dozer-types/src/types/mod.rs +++ b/dozer-types/src/types/mod.rs @@ -8,6 +8,7 @@ use std::hash::Hash; use std::str::FromStr; use crate::errors::types::TypeError; +use crate::node::OpIdentifier; use prettytable::{Cell, Row, Table}; use serde::{self, Deserialize, Serialize}; @@ -294,6 +295,18 @@ pub enum Operation { BatchInsert { new: Vec }, } +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, bincode::Encode, bincode::Decode)] +pub struct OperationWithId { + pub id: Option, + pub op: Operation, +} + +impl OperationWithId { + pub fn without_id(op: Operation) -> Self { + Self { id: None, op } + } +} + // Helpful in interacting with external systems during ingestion and querying // For example, nanoseconds can overflow. #[derive(