refactor: Send source state and op id to sink
chubei committed Jan 30, 2024
1 parent 2f526c8 commit b279c89
Showing 51 changed files with 639 additions and 250 deletions.
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/connector_source.rs
@@ -339,7 +339,7 @@ async fn forward_message_to_pipeline(
break;
}
}
IngestionMessage::SnapshottingDone | IngestionMessage::SnapshottingStarted => {
IngestionMessage::SnapshottingDone { .. } | IngestionMessage::SnapshottingStarted => {
for port in &ports {
if sender.send((*port, message.clone())).await.is_err() {
break;
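The pattern change from a unit variant to `SnapshottingDone { .. }` implies the variant gained at least one field. A plausible post-commit shape, inferred from the new sink hook later in this commit (the `id` field is an inference, not shown in this diff):

```rust
// Presumed post-commit shape of the variant. The diff only shows the
// `{ .. }` wildcard pattern; the field below is inferred from the new
// `on_source_snapshotting_done(.., id: Option<OpIdentifier>)` sink hook.
use dozer_types::node::OpIdentifier;

pub enum IngestionMessage {
    SnapshottingStarted,
    SnapshottingDone { id: Option<OpIdentifier> },
    // ...operation variants omitted.
}
```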
25 changes: 21 additions & 4 deletions dozer-cli/src/pipeline/dummy_sink.rs
@@ -11,7 +11,8 @@ use dozer_types::{
chrono::Local,
errors::internal::BoxedError,
log::{info, warn},
types::{FieldType, Operation, Schema},
node::OpIdentifier,
types::{FieldType, Operation, OperationWithId, Schema},
};

#[derive(Debug)]
@@ -57,10 +58,10 @@ impl Sink for DummySink {
&mut self,
_from_port: PortHandle,
_record_store: &ProcessorRecordStore,
op: Operation,
op: OperationWithId,
) -> Result<(), BoxedError> {
if let Some(inserted_at_index) = self.inserted_at_index {
if let Operation::Insert { new } = op {
if let Operation::Insert { new } = op.op {
let value = &new.values[inserted_at_index];
if let Some(inserted_at) = value.to_timestamp() {
let latency = Local::now().naive_utc() - inserted_at.naive_utc();
@@ -90,7 +91,11 @@ impl Sink for DummySink {
Ok(())
}

fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError> {
fn on_source_snapshotting_done(
&mut self,
connection_name: String,
_id: Option<OpIdentifier>,
) -> Result<(), BoxedError> {
if let Some(started_instant) = self.snapshotting_started_instant.remove(&connection_name) {
info!(
"Snapshotting for connection {} took {:?}",
@@ -105,4 +110,16 @@
}
Ok(())
}

fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
Ok(())
}

fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
Ok(None)
}

fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
Ok(None)
}
}
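Taken together, the two sink impls in this commit imply the following additions to the `Sink` trait. This is a sketch reconstructed from the call sites above, not the actual `dozer_core::node` source; methods this commit does not touch are omitted:

```rust
// Sketch of the `Sink` trait surface implied by this commit, reconstructed
// from the `DummySink`/`LogSink` impls. The real trait in dozer-core may
// declare additional methods.
use dozer_core::node::PortHandle;
use dozer_recordstore::ProcessorRecordStore;
use dozer_types::{errors::internal::BoxedError, node::OpIdentifier, types::OperationWithId};

pub trait Sink {
    // Operations now arrive paired with their source op id.
    fn process(
        &mut self,
        from_port: PortHandle,
        record_store: &ProcessorRecordStore,
        op: OperationWithId,
    ) -> Result<(), BoxedError>;

    // Snapshotting-done now also reports the op id the snapshot finished at.
    fn on_source_snapshotting_done(
        &mut self,
        connection_name: String,
        id: Option<OpIdentifier>,
    ) -> Result<(), BoxedError>;

    // New: the builder pushes the source's serialized state into the sink.
    fn set_source_state(&mut self, source_state: &[u8]) -> Result<(), BoxedError>;

    // New: a sink that persisted source state earlier hands it back so the
    // source can be rebuilt from it.
    fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError>;

    // New: the newest op id the sink has durably applied, used as the
    // source's resume point.
    fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError>;
}
```

`DummySink` and `LogSink` opt out of resumability by returning `Ok(None)` from both getters, as shown above; a durable sink would persist the op id atomically with its data and return it here.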
26 changes: 21 additions & 5 deletions dozer-cli/src/pipeline/log_sink.rs
@@ -11,9 +11,9 @@ use dozer_core::{
};
use dozer_recordstore::ProcessorRecordStore;
use dozer_tracing::LabelsAndProgress;
use dozer_types::indicatif::ProgressBar;
use dozer_types::types::Schema;
use dozer_types::{errors::internal::BoxedError, types::Operation};
use dozer_types::{errors::internal::BoxedError, node::OpIdentifier};
use dozer_types::{indicatif::ProgressBar, types::OperationWithId};
use tokio::{runtime::Runtime, sync::Mutex};

#[derive(Debug)]
@@ -87,12 +87,12 @@ impl Sink for LogSink {
&mut self,
_from_port: PortHandle,
_record_store: &ProcessorRecordStore,
op: Operation,
op: OperationWithId,
) -> Result<(), BoxedError> {
let end = self
.runtime
.block_on(self.log.lock())
.write(dozer_cache::dozer_log::replication::LogOperation::Op { op });
.write(dozer_cache::dozer_log::replication::LogOperation::Op { op: op.op });
self.pb.set_position(end as u64);
Ok(())
}
@@ -131,12 +131,28 @@ impl Sink for LogSink {
Ok(())
}

fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError> {
fn on_source_snapshotting_done(
&mut self,
connection_name: String,
_id: Option<OpIdentifier>,
) -> Result<(), BoxedError> {
let end = self
.runtime
.block_on(self.log.lock())
.write(LogOperation::SnapshottingDone { connection_name });
self.pb.set_position(end as u64);
Ok(())
}

fn set_source_state(&mut self, _source_state: &[u8]) -> Result<(), BoxedError> {
Ok(())
}

fn get_source_state(&mut self) -> Result<Option<Vec<u8>>, BoxedError> {
Ok(None)
}

fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError> {
Ok(None)
}
}
31 changes: 7 additions & 24 deletions dozer-cli/src/simple/build/contract/mod.rs
@@ -1,5 +1,5 @@
use std::{
collections::{BTreeMap, HashMap, HashSet},
collections::{BTreeMap, HashMap},
fs::OpenOptions,
path::Path,
};
@@ -11,7 +11,7 @@ use dozer_core::{
node::PortHandle,
petgraph::{
algo::is_isomorphic_matching,
visit::{EdgeRef, IntoEdgesDirected, IntoNodeReferences},
visit::{IntoEdgesDirected, IntoNodeReferences},
Direction,
},
};
@@ -94,7 +94,11 @@ impl Contract {
api,
)?;

let connections = collect_ancestor_sources(dag_schemas, node_index);
let connections = dag_schemas
.collect_ancestor_sources(node_index)
.into_iter()
.map(|handle| handle.id)
.collect();

let schema = EndpointSchema {
path: api.path.clone(),
@@ -195,27 +199,6 @@ fn sink_input_schema(dag: &DagSchemas, node_index: NodeIndex) -> &Schema {
&edge.weight().schema
}

fn collect_ancestor_sources(dag: &DagSchemas, node_index: NodeIndex) -> HashSet<String> {
let mut sources = HashSet::new();
collect_ancestor_sources_recursive(dag, node_index, &mut sources);
sources
}

fn collect_ancestor_sources_recursive(
dag: &DagSchemas,
node_index: NodeIndex,
sources: &mut HashSet<String>,
) {
for edge in dag.graph().edges_directed(node_index, Direction::Incoming) {
let source_node_index = edge.source();
let source_node = &dag.graph()[source_node_index];
if matches!(source_node.kind, dozer_core::NodeKind::Source(_)) {
sources.insert(source_node.handle.id.clone());
}
collect_ancestor_sources_recursive(dag, source_node_index, sources);
}
}

fn serde_json_to_path(path: impl AsRef<Path>, value: &impl Serialize) -> Result<(), BuildError> {
let file = OpenOptions::new()
.create(true)
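The deleted free function was evidently moved into dozer-core as a `DagSchemas` method: both this file and `builder_dag.rs` now call `dag_schemas.collect_ancestor_sources(node_index)`, and the `.id` projection at the call site suggests it now returns full `NodeHandle`s rather than id strings. A sketch of the relocated method, adapted from the code deleted above (the real implementation is not part of this diff, and import paths are approximations):

```rust
// Sketch of the relocated helper, adapted from the free function deleted
// above. Returning `NodeHandle` (not `String`) matches the `handle.id`
// projection at the call site; internals of the real method may differ.
use std::collections::HashSet;

use daggy::petgraph::{visit::EdgeRef, Direction};
use dozer_types::node::NodeHandle;

impl DagSchemas {
    pub fn collect_ancestor_sources(&self, node_index: NodeIndex) -> HashSet<NodeHandle> {
        let mut sources = HashSet::new();
        self.collect_ancestor_sources_recursive(node_index, &mut sources);
        sources
    }

    fn collect_ancestor_sources_recursive(
        &self,
        node_index: NodeIndex,
        sources: &mut HashSet<NodeHandle>,
    ) {
        // Walk incoming edges toward the roots, recording every source node
        // encountered along the way.
        for edge in self.graph().edges_directed(node_index, Direction::Incoming) {
            let source_node_index = edge.source();
            let source_node = &self.graph()[source_node_index];
            if matches!(source_node.kind, NodeKind::Source(_)) {
                sources.insert(source_node.handle.clone());
            }
            self.collect_ancestor_sources_recursive(source_node_index, sources);
        }
    }
}
```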
148 changes: 123 additions & 25 deletions dozer-core/src/builder_dag.rs
@@ -1,16 +1,22 @@
use std::{collections::HashMap, fmt::Debug};
use std::{
collections::{hash_map::Entry, HashMap},
fmt::Debug,
};

use daggy::{
petgraph::visit::{IntoNodeIdentifiers, IntoNodeReferences},
NodeIndex,
};
use dozer_types::node::{NodeHandle, OpIdentifier};
use dozer_types::{
log::warn,
node::{NodeHandle, OpIdentifier},
};

use crate::{
checkpoint::OptionCheckpoint,
dag_schemas::{DagHaveSchemas, DagSchemas, EdgeType},
errors::ExecutionError,
node::{Processor, Sink, Source},
node::{Processor, Sink, SinkFactory, Source},
NodeKind as DagNodeKind,
};

@@ -64,29 +70,118 @@ impl BuilderDag {
}
}

// Build the nodes.
let mut graph = daggy::Dag::new();
// Collect sources that may affect a node.
let mut affecting_sources = dag_schemas
.graph()
.node_identifiers()
.map(|node_index| dag_schemas.collect_ancestor_sources(node_index))
.collect::<Vec<_>>();

// Prepare nodes and edges for consuming.
let (nodes, edges) = dag_schemas.into_graph().into_graph().into_nodes_edges();
for (node_index, node) in nodes.into_iter().enumerate() {
let mut nodes = nodes
.into_iter()
.map(|node| Some(node.weight))
.collect::<Vec<_>>();

// Build the sinks and load checkpoint.
let mut graph = daggy::Dag::new();
let mut source_states = HashMap::new();
let mut source_op_ids = HashMap::new();
let mut source_id_to_sinks = HashMap::<NodeHandle, Vec<NodeIndex>>::new();
let mut node_index_map: HashMap<NodeIndex, NodeIndex> = HashMap::new();
for (node_index, node) in nodes.iter_mut().enumerate() {
if let Some((handle, sink)) = take_sink(node) {
let sources = std::mem::take(&mut affecting_sources[node_index]);
if sources.len() > 1 {
warn!("Multiple sources ({sources:?}) connected to same sink: {handle}");
}
let source = sources.into_iter().next().expect("sink must have a source");

let node_index = NodeIndex::new(node_index);
let mut sink = sink
.build(
input_schemas
.remove(&node_index)
.expect("we collected all input schemas"),
)
.map_err(ExecutionError::Factory)?;

let state = sink.get_source_state().map_err(ExecutionError::Sink)?;
if let Some(state) = state {
match source_states.entry(handle.clone()) {
Entry::Occupied(entry) => {
if entry.get() != &state {
return Err(ExecutionError::SourceStateConflict(handle));
}
}
Entry::Vacant(entry) => {
entry.insert(state);
}
}
}

let op_id = sink.get_latest_op_id().map_err(ExecutionError::Sink)?;
if let Some(op_id) = op_id {
match source_op_ids.entry(handle.clone()) {
Entry::Occupied(mut entry) => {
*entry.get_mut() = op_id.min(*entry.get());
}
Entry::Vacant(entry) => {
entry.insert(op_id);
}
}
}

let new_node_index = graph.add_node(NodeType {
handle,
kind: NodeKind::Sink(sink),
});
node_index_map.insert(node_index, new_node_index);
source_id_to_sinks
.entry(source)
.or_default()
.push(new_node_index);
}
}

// Build sources, processors, and collect source states.
for (node_index, node) in nodes.iter_mut().enumerate() {
let Some(node) = node.take() else {
continue;
};
let node_index = NodeIndex::new(node_index);
let node = node.weight;
let node = match node.kind {
DagNodeKind::Source(source) => {
let source_state = checkpoint.get_source_state(&node.handle)?;
let source = source
.build(
output_schemas
.remove(&node_index)
.expect("we collected all output schemas"),
source_state.map(|state| state.0.to_vec()),
source_states.remove(&node.handle),
)
.map_err(ExecutionError::Factory)?;

// Write state to relevant sink.
let state = source
.serialize_state()
.await
.map_err(ExecutionError::Source)?;
for sink in source_id_to_sinks.remove(&node.handle).unwrap_or_default() {
let sink = &mut graph[sink];
let NodeKind::Sink(sink) = &mut sink.kind else {
unreachable!()
};
sink.set_source_state(&state)
.map_err(ExecutionError::Sink)?;
}

let last_checkpoint = source_op_ids.remove(&node.handle);
NodeType {
handle: node.handle,
kind: NodeKind::Source {
source,
last_checkpoint: source_state.map(|state| state.1),
last_checkpoint,
},
}
}
@@ -111,27 +206,20 @@
kind: NodeKind::Processor(processor),
}
}
DagNodeKind::Sink(sink) => {
let sink = sink
.build(
input_schemas
.remove(&node_index)
.expect("we collected all input schemas"),
)
.map_err(ExecutionError::Factory)?;
NodeType {
handle: node.handle,
kind: NodeKind::Sink(sink),
}
}
DagNodeKind::Sink(_) => unreachable!(),
};
graph.add_node(node);
let new_node_index = graph.add_node(node);
node_index_map.insert(node_index, new_node_index);
}

// Connect the edges.
for edge in edges {
graph
.add_edge(edge.source(), edge.target(), edge.weight)
.add_edge(
node_index_map[&edge.source()],
node_index_map[&edge.target()],
edge.weight,
)
.expect("we know there's no loop");
}

@@ -146,3 +234,13 @@ impl BuilderDag {
self.graph
}
}

fn take_sink(node: &mut Option<super::NodeType>) -> Option<(NodeHandle, Box<dyn SinkFactory>)> {
let super::NodeType { handle, kind } = node.take()?;
if let super::NodeKind::Sink(sink) = kind {
Some((handle, sink))
} else {
*node = Some(super::NodeType { handle, kind });
None
}
}
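The build is now two passes: sinks are built first (via `take_sink`) so their persisted source state and latest op ids can be collected, then each source is constructed with that state and resumed from the collected op id. Note the `min` when several sinks share one source: replay must start from the sink that is furthest behind, otherwise a lagging sink would miss operations. A minimal standalone illustration of that merge step, assuming `OpIdentifier: Ord + Copy` (which the `*entry.get()` in the diff implies):

```rust
// Standalone illustration of the op-id merge in the first pass above:
// per source handle, keep the *minimum* op id reported across its sinks,
// so the resume point covers the sink that is furthest behind.
use std::collections::{hash_map::Entry, HashMap};

use dozer_types::node::{NodeHandle, OpIdentifier};

fn merge_latest_op_id(
    source_op_ids: &mut HashMap<NodeHandle, OpIdentifier>,
    handle: NodeHandle,
    op_id: OpIdentifier,
) {
    match source_op_ids.entry(handle) {
        Entry::Occupied(mut entry) => {
            // Keep the smaller op id so no sink's replay window is skipped.
            *entry.get_mut() = op_id.min(*entry.get());
        }
        Entry::Vacant(entry) => {
            entry.insert(op_id);
        }
    }
}
```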
4 changes: 2 additions & 2 deletions dozer-core/src/channels.rs
@@ -1,10 +1,10 @@
use crate::node::PortHandle;
use dozer_types::types::Operation;
use dozer_types::types::OperationWithId;

pub trait ProcessorChannelForwarder {
/// Sends an operation to downstream nodes. Panics if the operation cannot be sent.
///
/// We must panic instead of returning an error because this method will be called by `Processor::process`,
/// which only returns recoverable errors.
fn send(&mut self, op: Operation, port: PortHandle);
fn send(&mut self, op: OperationWithId, port: PortHandle);
}

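`OperationWithId` itself is not defined in this commit; from the `op.op` accesses in the sink impls it evidently pairs an `Operation` with its identifier. A plausible definition (the `id` field name and its `Option` wrapper are assumptions):

```rust
// Inferred shape of `OperationWithId`. Only the `op` field is confirmed by
// this diff (`op.op`); the `id` field is an assumption based on the commit's
// purpose of forwarding op ids from sources to sinks.
use dozer_types::{node::OpIdentifier, types::Operation};

pub struct OperationWithId {
    pub id: Option<OpIdentifier>,
    pub op: Operation,
}
```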