Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Make source state source wide instead of per table #2344

Merged
merged 5 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl<'a> PipelineBuilder<'a> {

let mut connector_map = HashMap::new();
for connection in self.connections {
let connector = get_connector(runtime.clone(), connection.clone())
let connector = get_connector(runtime.clone(), connection.clone(), None)
.map_err(|e| ConnectorSourceFactoryError::Connector(e.into()))?;

if let Some(info_table) = get_connector_info_table(connection) {
Expand Down
59 changes: 26 additions & 33 deletions dozer-cli/src/pipeline/connector_source.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use dozer_api::shutdown::ShutdownReceiver;
use dozer_core::node::{
OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory, SourceState,
};
use dozer_core::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
use dozer_ingestion::{
get_connector, CdcType, Connector, IngestionIterator, TableIdentifier, TableInfo, TableToIngest,
get_connector, CdcType, Connector, IngestionIterator, TableIdentifier, TableInfo,
};
use dozer_ingestion::{IngestionConfig, Ingestor};

Expand All @@ -12,7 +10,7 @@ use dozer_types::errors::internal::BoxedError;
use dozer_types::log::{error, info};
use dozer_types::models::connection::Connection;
use dozer_types::models::ingestion_types::IngestionMessage;
use dozer_types::parking_lot::Mutex;
use dozer_types::node::OpIdentifier;
use dozer_types::thiserror::{self, Error};
use dozer_types::tracing::{span, Level};
use dozer_types::types::{Operation, Schema, SourceDefinition};
Expand Down Expand Up @@ -46,10 +44,9 @@ pub enum ConnectorSourceFactoryError {

#[derive(Debug)]
pub struct ConnectorSourceFactory {
connection_name: String,
connection: Connection,
runtime: Arc<Runtime>,
tables: Vec<Table>,
/// Will be moved to `ConnectorSource` in `build`.
connector: Mutex<Option<Box<dyn Connector>>>,
labels: LabelsAndProgress,
shutdown: ShutdownReceiver,
}
Expand All @@ -66,9 +63,7 @@ impl ConnectorSourceFactory {
labels: LabelsAndProgress,
shutdown: ShutdownReceiver,
) -> Result<Self, ConnectorSourceFactoryError> {
let connection_name = connection.name.clone();

let connector = get_connector(runtime.clone(), connection)
let connector = get_connector(runtime.clone(), connection.clone(), None)
.map_err(|e| ConnectorSourceFactoryError::Connector(e.into()))?;

// Fill column names if not provided.
Expand Down Expand Up @@ -116,9 +111,9 @@ impl ConnectorSourceFactory {
}

Ok(Self {
connection_name,
connection,
runtime,
tables,
connector: Mutex::new(Some(connector)),
labels,
shutdown,
})
Expand All @@ -138,7 +133,7 @@ impl SourceFactory for ConnectorSourceFactory {
// Add source information to the schema.
for field in &mut schema.fields {
field.source = SourceDefinition::Table {
connection: self.connection_name.clone(),
connection: self.connection.name.clone(),
name: table_name.clone(),
};
}
Expand Down Expand Up @@ -174,35 +169,27 @@ impl SourceFactory for ConnectorSourceFactory {
fn build(
&self,
_output_schemas: HashMap<PortHandle, Schema>,
mut last_checkpoint: SourceState,
state: Option<Vec<u8>>,
) -> Result<Box<dyn Source>, BoxedError> {
// Construct the tables to ingest.
// Construct table info.
let tables = self
.tables
.iter()
.map(|table| {
let state = last_checkpoint.remove(&table.port).flatten();
TableToIngest {
schema: table.schema_name.clone(),
name: table.name.clone(),
column_names: table.columns.clone(),
state,
}
.map(|table| TableInfo {
schema: table.schema_name.clone(),
name: table.name.clone(),
column_names: table.columns.clone(),
})
.collect();
let ports = self.tables.iter().map(|table| table.port).collect();

let connector = self
.connector
.lock()
.take()
.expect("ConnectorSource was already built");
let connector = get_connector(self.runtime.clone(), self.connection.clone(), state)?;

Ok(Box::new(ConnectorSource {
tables,
ports,
connector,
connection_name: self.connection_name.clone(),
connection_name: self.connection.name.clone(),
labels: self.labels.clone(),
shutdown: self.shutdown.clone(),
ingestion_config: IngestionConfig::default(),
Expand All @@ -212,7 +199,7 @@ impl SourceFactory for ConnectorSourceFactory {

#[derive(Debug)]
pub struct ConnectorSource {
tables: Vec<TableToIngest>,
tables: Vec<TableInfo>,
ports: Vec<PortHandle>,
connector: Box<dyn Connector>,
connection_name: String,
Expand All @@ -223,9 +210,14 @@ pub struct ConnectorSource {

#[async_trait]
impl Source for ConnectorSource {
async fn serialize_state(&self) -> Result<Vec<u8>, BoxedError> {
self.connector.serialize_state().await
}

async fn start(
&self,
sender: Sender<(PortHandle, IngestionMessage)>,
last_checkpoint: Option<OpIdentifier>,
) -> Result<(), BoxedError> {
let (ingestor, iterator) = Ingestor::initialize_channel(self.ingestion_config.clone());
let connection_name = self.connection_name.clone();
Expand Down Expand Up @@ -254,7 +246,8 @@ impl Source for ConnectorSource {
eprintln!("Aborted connector {}", name);
});
let result = Abortable::new(
self.connector.start(&ingestor, self.tables.clone()),
self.connector
.start(&ingestor, self.tables.clone(), last_checkpoint),
abort_registration,
)
.await;
Expand All @@ -281,7 +274,7 @@ async fn forward_message_to_pipeline(
mut iterator: IngestionIterator,
sender: Sender<(PortHandle, IngestionMessage)>,
connection_name: String,
tables: Vec<TableToIngest>,
tables: Vec<TableInfo>,
ports: Vec<PortHandle>,
labels: LabelsAndProgress,
) {
Expand Down
3 changes: 2 additions & 1 deletion dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,8 @@ impl SimpleOrchestrator {
) -> Result<HashMap<String, (Vec<TableInfo>, Vec<SourceSchema>)>, OrchestrationError> {
let mut schema_map = HashMap::new();
for connection in &self.config.connections {
let connector = get_connector(self.runtime.clone(), connection.clone())
// We're not really going to start ingestion, so passing `None` as state here is OK.
let connector = get_connector(self.runtime.clone(), connection.clone(), None)
.map_err(|e| ConnectorSourceFactoryError::Connector(e.into()))?;
let schema_tuples = connector
.list_all_schemas()
Expand Down
29 changes: 11 additions & 18 deletions dozer-core/src/builder_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use daggy::{
petgraph::visit::{IntoNodeIdentifiers, IntoNodeReferences},
NodeIndex,
};
use dozer_types::node::NodeHandle;
use dozer_types::node::{NodeHandle, OpIdentifier};

use crate::{
checkpoint::OptionCheckpoint,
Expand All @@ -26,7 +26,10 @@ pub struct NodeType {
#[derive(Debug)]
/// Node kind, source, processor or sink. Source has a checkpoint to start from.
pub enum NodeKind {
Source(Box<dyn Source>),
Source {
source: Box<dyn Source>,
last_checkpoint: Option<OpIdentifier>,
},
Processor(Box<dyn Processor>),
Sink(Box<dyn Sink>),
}
Expand Down Expand Up @@ -69,32 +72,22 @@ impl BuilderDag {
let node = node.weight;
let node = match node.kind {
DagNodeKind::Source(source) => {
let mut last_checkpoint_by_name = checkpoint.get_source_state(&node.handle)?;
let mut last_checkpoint = HashMap::new();
for port_def in source.get_output_ports() {
let port_name = source.get_output_port_name(&port_def.handle);
last_checkpoint.insert(
port_def.handle,
last_checkpoint_by_name
.as_mut()
.and_then(|last_checkpoint| {
last_checkpoint.remove(&port_name).flatten().cloned()
}),
);
}

let source_state = checkpoint.get_source_state(&node.handle)?;
let source = source
.build(
output_schemas
.remove(&node_index)
.expect("we collected all output schemas"),
last_checkpoint,
source_state.map(|state| state.0.to_vec()),
)
.map_err(ExecutionError::Factory)?;

NodeType {
handle: node.handle,
kind: NodeKind::Source(source),
kind: NodeKind::Source {
source,
last_checkpoint: source_state.map(|state| state.1),
},
}
}
DagNodeKind::Processor(processor) => {
Expand Down
30 changes: 11 additions & 19 deletions dozer-core/src/checkpoint/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, ops::Deref, sync::Arc};
use std::{ops::Deref, sync::Arc};

use dozer_log::{
camino::Utf8Path,
Expand All @@ -12,7 +12,7 @@ use dozer_types::{
bincode,
log::{error, info},
models::app_config::{DataStorage, RecordStore},
node::{NodeHandle, RestartableState, SourceStates, TableState},
node::{NodeHandle, OpIdentifier, SourceState, SourceStates},
parking_lot::Mutex,
tonic::codegen::tokio_stream::StreamExt,
types::Field,
Expand Down Expand Up @@ -114,29 +114,21 @@ impl OptionCheckpoint {
pub fn get_source_state(
&self,
node_handle: &NodeHandle,
) -> Result<Option<HashMap<String, Option<&RestartableState>>>, ExecutionError> {
) -> Result<Option<(&[u8], OpIdentifier)>, ExecutionError> {
let Some(checkpoint) = self.checkpoint.as_ref() else {
return Ok(None);
};
let Some(source_state) = checkpoint.source_states.get(node_handle) else {
let Some(state) = checkpoint.source_states.get(node_handle) else {
return Ok(None);
};

let mut result = HashMap::new();
for (table_name, state) in source_state {
let state = match state {
TableState::NotStarted => None,
TableState::NonRestartable => {
return Err(ExecutionError::SourceCannotRestart {
source_name: node_handle.clone(),
table_name: table_name.clone(),
});
}
TableState::Restartable(state) => Some(state),
};
result.insert(table_name.clone(), state);
match state {
SourceState::NotStarted => Ok(None),
SourceState::NonRestartable => {
Err(ExecutionError::SourceCannotRestart(node_handle.clone()))
}
SourceState::Restartable { state, checkpoint } => Ok(Some((state, *checkpoint))),
}
Ok(Some(result))
}

pub async fn load_processor_data(
Expand Down Expand Up @@ -374,7 +366,7 @@ pub async fn create_checkpoint_factory_for_test(
let epoch_id = 42;
let source_states: SourceStates = [(
NodeHandle::new(Some(1), "id".to_string()),
Default::default(),
SourceState::NotStarted,
)]
.into_iter()
.collect();
Expand Down
19 changes: 8 additions & 11 deletions dozer-core/src/epoch/manager.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use dozer_recordstore::ProcessorRecordStore;
use dozer_types::log::info;
use dozer_types::node::{NodeHandle, SourceStates, TableState};
use dozer_types::node::{NodeHandle, SourceState, SourceStates};
use dozer_types::parking_lot::Mutex;
use std::collections::HashMap;
use std::ops::DerefMut;
use std::sync::{Arc, Barrier};
use std::thread::sleep;
Expand Down Expand Up @@ -162,7 +161,7 @@ impl EpochManager {
/// - `request_commit`: Whether the source wants to commit. The `EpochManager` checks if any source wants to commit and returns `Some` if so.
pub fn wait_for_epoch_close(
&self,
source_state: (NodeHandle, HashMap<String, TableState>),
source_state: (NodeHandle, SourceState),
request_termination: bool,
request_commit: bool,
) -> ClosedEpoch {
Expand Down Expand Up @@ -300,11 +299,9 @@ impl EpochManager {
}

fn is_restartable(source_states: &SourceStates) -> bool {
source_states.values().all(|table_states| {
table_states
.values()
.all(|table_state| table_state != &TableState::NonRestartable)
})
source_states
.values()
.all(|source_state| source_state != &SourceState::NonRestartable)
}

#[cfg(test)]
Expand Down Expand Up @@ -336,7 +333,7 @@ mod tests {
epoch_manager: &EpochManager,
termination_gen: &(impl Fn(u16) -> bool + Sync),
commit_gen: &(impl Fn(u16) -> bool + Sync),
source_state_gen: &(impl Fn(u16) -> (NodeHandle, HashMap<String, TableState>) + Sync),
source_state_gen: &(impl Fn(u16) -> (NodeHandle, SourceState) + Sync),
) -> ClosedEpoch {
scope(|scope| {
let handles = (0..NUM_THREADS)
Expand Down Expand Up @@ -368,10 +365,10 @@ mod tests {
})
}

fn generate_source_state(index: u16) -> (NodeHandle, HashMap<String, TableState>) {
fn generate_source_state(index: u16) -> (NodeHandle, SourceState) {
(
NodeHandle::new(Some(index), index.to_string()),
Default::default(),
SourceState::NotStarted,
)
}

Expand Down
7 changes: 2 additions & 5 deletions dozer-core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,8 @@ pub enum ExecutionError {
CheckpointedLogReader(#[from] CheckpointedLogReaderError),
#[error("Cannot deserialize checkpoint: {0}")]
CorruptedCheckpoint(#[source] bincode::error::DecodeError),
#[error("Table {table_name} of source {source_name} cannot restart. You have to clean data from previous runs by running `dozer clean`")]
SourceCannotRestart {
source_name: NodeHandle,
table_name: String,
},
#[error("Source {0} cannot restart. You have to clean data from previous runs by running `dozer clean`")]
SourceCannotRestart(NodeHandle),
#[error("Failed to create checkpoint: {0}")]
FailedToCreateCheckpoint(BoxedError),
#[error("Failed to serialize record writer: {0}")]
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl DagExecutor {
running.clone(),
runtime.clone(),
)
.await;
.await?;
join_handles.push(start_source(source_node)?);
}
NodeKind::Processor(_) => {
Expand Down
Loading
Loading