Commit 19bce71

Merge 535ae51 into 5d27a9b
mediuminvader committed Jan 20, 2023
2 parents 5d27a9b + 535ae51
Showing 26 changed files with 637 additions and 428 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

24 changes: 24 additions & 0 deletions dozer-core/src/dag/errors.rs
@@ -93,6 +93,12 @@ pub enum ExecutionError {
"Channel returned empty message in processor. Might be an issue with the sender: {0}, {1}"
)]
ProcessorReceiverError(usize, #[source] BoxedError),

#[error(transparent)]
JoinError(JoinError),

#[error(transparent)]
SourceError(SourceError),
}

#[derive(Error, Debug)]
@@ -118,3 +124,21 @@ pub enum SinkError {
#[error("Failed to initialize schema in Sink: {0}")]
CacheCountFailed(#[source] BoxedError),
}

#[derive(Error, Debug)]
pub enum JoinError {
#[error("Failed to find table in Join during Insert: {0}")]
InsertPortError(PortHandle),
#[error("Failed to find table in Join during Delete: {0}")]
DeletePortError(PortHandle),
#[error("Failed to find table in Join during Update: {0}")]
UpdatePortError(PortHandle),
#[error("Join ports are not properly initialized")]
PortNotConnected(PortHandle),
}

#[derive(Error, Debug)]
pub enum SourceError {
#[error("Failed to find table in Source: {0:?}")]
PortError(String),
}
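
A note on the new error types: both `JoinError` and `SourceError` are wrapped into `ExecutionError` with `#[error(transparent)]`, which forwards `Display` and `source()` straight to the inner error instead of adding another message layer. A minimal, self-contained sketch of the same pattern (assuming the `thiserror` crate, which these `derive(Error)` attributes appear to come from):

```rust
use thiserror::Error;

#[derive(Error, Debug)]
pub enum SourceError {
    #[error("Failed to find table in Source: {0:?}")]
    PortError(String),
}

#[derive(Error, Debug)]
pub enum ExecutionError {
    // `transparent` delegates Display and source() to the wrapped error,
    // so callers see the SourceError message with no extra wrapping.
    #[error(transparent)]
    SourceError(SourceError),
}

fn main() {
    let err = ExecutionError::SourceError(SourceError::PortError("schema_1".into()));
    assert_eq!(err.to_string(), r#"Failed to find table in Source: "schema_1""#);
}
```
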
4 changes: 3 additions & 1 deletion dozer-core/src/dag/executor.rs
@@ -251,7 +251,9 @@ impl<'a> DagExecutor<'a> {
epoch_manager: Arc<EpochManager>,
start_barrier: Arc<Barrier>,
) -> Result<JoinHandle<()>, ExecutionError> {
let (sender, receiver) = bounded(self.options.channel_buffer_sz);
// let (sender, receiver) = bounded(self.options.channel_buffer_sz);
let (sender, receiver) = bounded(1);

let start_seq = *self
.consistency_metadata
.get(&handle)
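
The change above hard-codes the source channel to capacity 1, leaving the configurable `channel_buffer_sz` line commented out. With a crossbeam-style bounded channel (dozer imports `dozer_types::crossbeam`, visible later in this diff), capacity 1 means the producer blocks as soon as one message is in flight. A standalone sketch of that backpressure behavior, assuming the `crossbeam-channel` crate:

```rust
use crossbeam_channel::bounded;
use std::thread;

fn main() {
    // Capacity 1: at most one message buffered; further send() calls block.
    let (tx, rx) = bounded(1);

    let producer = thread::spawn(move || {
        for i in 0..5 {
            // Every send after the first waits until the consumer has
            // drained the single buffered slot: built-in backpressure.
            tx.send(i).unwrap();
        }
        // tx is dropped here, which ends the consumer's iteration below.
    });

    for msg in rx {
        println!("got {msg}");
    }
    producer.join().unwrap();
}
```
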
8 changes: 7 additions & 1 deletion dozer-core/src/dag/executor_utils.rs
@@ -13,6 +13,8 @@ use dozer_types::types::{Operation, Schema};
use std::collections::HashMap;
use std::path::Path;

use super::dag::NodeType;

pub(crate) struct StorageMetadata {
pub env: LmdbEnvironmentManager,
pub meta_db: Database,
@@ -96,7 +98,11 @@ pub(crate) fn index_edges(
receivers.insert(edge.to.node.clone(), HashMap::new());
}

let (tx, rx) = bounded(channel_buf_sz);
// let (tx, rx) = bounded(channel_buf_sz);
let (tx, rx) = match dag.nodes.get(&edge.from.node).unwrap() {
NodeType::Source(_) => bounded(1),
_ => bounded(channel_buf_sz),
};

let rcv_port: PortHandle = edge.to.port;
if receivers
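
Together with the executor change above, `index_edges` now sizes each edge's channel by the type of its upstream node: edges leaving a source get capacity 1, everything else keeps the configured buffer. A sketch of that rule in isolation, with a stand-in `NodeType` and assuming `crossbeam-channel`:

```rust
use crossbeam_channel::{bounded, Receiver, Sender};

// Stand-in for dozer's NodeType; the real enum carries the node itself.
#[allow(dead_code)]
enum NodeType {
    Source,
    Processor,
    Sink,
}

fn edge_channel(from: &NodeType, channel_buf_sz: usize) -> (Sender<u64>, Receiver<u64>) {
    match from {
        // Sources are throttled to one in-flight message so ingestion
        // cannot run far ahead of downstream processors.
        NodeType::Source => bounded(1),
        // All other edges keep the configured buffer size.
        _ => bounded(channel_buf_sz),
    }
}

fn main() {
    let (_tx, rx) = edge_channel(&NodeType::Source, 1024);
    assert_eq!(rx.capacity(), Some(1));
}
```
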
12 changes: 7 additions & 5 deletions dozer-orchestrator/src/pipeline/connector_source.rs
@@ -1,6 +1,6 @@
use dozer_core::dag::channels::SourceChannelForwarder;
use dozer_core::dag::errors::ExecutionError;
use dozer_core::dag::errors::ExecutionError::ReplicationTypeNotFound;
use dozer_core::dag::errors::{ExecutionError, SourceError};
use dozer_core::dag::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
use dozer_ingestion::connectors::{get_connector, TableInfo};
use dozer_ingestion::errors::ConnectorError;
@@ -209,10 +209,12 @@ impl Source for ConnectorSource {
Operation::Update { old: _, new } => new.schema_id.to_owned(),
};
let schema_id = get_schema_id(identifier.as_ref())?;
let port = self
.schema_port_map
.get(&schema_id)
.map_or(Err(ExecutionError::PortNotFound(schema_id.to_string())), Ok)?;
let port = self.schema_port_map.get(&schema_id).map_or(
Err(ExecutionError::SourceError(SourceError::PortError(
schema_id.to_string(),
))),
Ok,
)?;
fw.send(lsn, seq_no, op.operation.to_owned(), port.to_owned())?
}
}
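
The reworked lookup converts an `Option` into a `Result` via `map_or(Err(..), Ok)`. For comparison, the standard library's `Option::ok_or_else` expresses the same conversion more directly and builds the error only on the miss path. A sketch of the equivalence, with a hypothetical map mirroring the diff's `SourceError::PortError`:

```rust
use std::collections::HashMap;

#[derive(Debug)]
enum SourceError {
    PortError(String),
}

type PortHandle = u16;

fn port_for(
    schema_port_map: &HashMap<u32, PortHandle>,
    schema_id: u32,
) -> Result<PortHandle, SourceError> {
    // Same effect as the diff's `map.get(..).map_or(Err(..), Ok)?`,
    // with the error value constructed lazily.
    schema_port_map
        .get(&schema_id)
        .copied()
        .ok_or_else(|| SourceError::PortError(schema_id.to_string()))
}

fn main() {
    let map = HashMap::from([(1u32, 80u16)]);
    assert_eq!(port_for(&map, 1).unwrap(), 80);
    assert!(matches!(port_for(&map, 2), Err(SourceError::PortError(_))));
}
```
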
25 changes: 14 additions & 11 deletions dozer-orchestrator/src/simple/executor.rs
@@ -1,5 +1,6 @@
use dozer_api::grpc::internal_grpc::PipelineResponse;
use dozer_core::dag::app::{App, AppPipeline};
use dozer_sql::pipeline::builder::{self, statement_to_pipeline};
use dozer_types::indicatif::MultiProgress;
use dozer_types::types::{Operation, SchemaWithChangesType};
use std::collections::HashMap;
@@ -17,7 +18,6 @@ use dozer_ingestion::connectors::{get_connector, get_connector_info_table, Table

use dozer_ingestion::ingestion::{IngestionIterator, Ingestor};

use dozer_sql::pipeline::builder::PipelineBuilder;
use dozer_types::crossbeam;
use dozer_types::log::{error, info};
use dozer_types::models::api_security::ApiSecurity;
@@ -163,17 +163,16 @@ impl Executor {
) -> Result<dozer_core::dag::dag::Dag, OrchestrationError> {
let grouped_connections = self.get_connection_groups();

let mut pipeline = PipelineBuilder {}
.build_pipeline(&sql)
.map_err(OrchestrationError::PipelineError)?;
let (mut pipeline, (query_name, query_port)) =
statement_to_pipeline(&sql).map_err(OrchestrationError::PipelineError)?;
pipeline.add_sink(
Arc::new(StreamingSinkFactory::new(sender)),
"streaming_sink",
);
pipeline
.connect_nodes(
"aggregation",
Some(DEFAULT_PORT_HANDLE),
&query_name,
Some(query_port),
"streaming_sink",
Some(DEFAULT_PORT_HANDLE),
)
@@ -222,9 +221,13 @@ impl Executor {
let _api_endpoint_name = api_endpoint.name.clone();
let cache = cache_endpoint.cache;

let mut pipeline = PipelineBuilder {}
.build_pipeline(&api_endpoint.sql)
.map_err(OrchestrationError::PipelineError)?;
// let mut pipeline = PipelineBuilder {}
// .build_pipeline(&api_endpoint.sql)
// .map_err(OrchestrationError::PipelineError)?;

let (mut pipeline, (query_name, query_port)) =
builder::statement_to_pipeline(&api_endpoint.sql)
.map_err(OrchestrationError::PipelineError)?;

pipeline.add_sink(
Arc::new(CacheSinkFactory::new(
@@ -241,8 +244,8 @@ impl Executor {

pipeline
.connect_nodes(
"aggregation",
Some(DEFAULT_PORT_HANDLE),
&query_name,
Some(query_port),
cache_endpoint.endpoint.name.as_str(),
Some(DEFAULT_PORT_HANDLE),
)
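
Both call sites now follow the same shape: `statement_to_pipeline` returns the pipeline together with the name and port of its output node, so `connect_nodes` no longer hard-codes `"aggregation"`. A compilable sketch of that pattern with stand-in types (dozer's real signatures are not shown in this diff, so everything below is illustrative):

```rust
type PortHandle = u16;
const DEFAULT_PORT_HANDLE: PortHandle = 0xffff;

#[derive(Default)]
struct AppPipeline {
    edges: Vec<(String, PortHandle, String, PortHandle)>,
}

impl AppPipeline {
    fn connect_nodes(
        &mut self,
        from: &str,
        from_port: Option<PortHandle>,
        to: &str,
        to_port: Option<PortHandle>,
    ) {
        self.edges.push((
            from.to_owned(),
            from_port.unwrap_or(DEFAULT_PORT_HANDLE),
            to.to_owned(),
            to_port.unwrap_or(DEFAULT_PORT_HANDLE),
        ));
    }
}

// Stand-in: the real builder parses SQL into processors; here we model only
// the part that matters to callers, namely where the pipeline's output exits.
fn statement_to_pipeline(_sql: &str) -> (AppPipeline, (String, PortHandle)) {
    (AppPipeline::default(), ("aggregation".to_owned(), DEFAULT_PORT_HANDLE))
}

fn main() {
    let (mut pipeline, (query_name, query_port)) =
        statement_to_pipeline("SELECT id, SUM(salary) FROM users GROUP BY id");
    // Callers wire the sink to whatever node the builder reported.
    pipeline.connect_nodes(&query_name, Some(query_port), "streaming_sink", Some(DEFAULT_PORT_HANDLE));
    assert_eq!(pipeline.edges.len(), 1);
}
```
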
5 changes: 2 additions & 3 deletions dozer-orchestrator/src/simple/orchestrator.rs
@@ -22,7 +22,7 @@ use dozer_core::dag::dag_schemas::DagSchemaManager;
use dozer_core::dag::errors::ExecutionError::InternalError;
use dozer_ingestion::ingestion::IngestionConfig;
use dozer_ingestion::ingestion::Ingestor;
use dozer_sql::pipeline::builder::PipelineBuilder;
use dozer_sql::pipeline::builder::statement_to_pipeline;
use dozer_types::crossbeam::channel::{self, unbounded, Sender};
use dozer_types::log::{info, warn};
use dozer_types::models::api_config::ApiConfig;
@@ -402,8 +402,7 @@ impl SimpleOrchestrator {
pub fn validate_endpoints(endpoints: &Vec<ApiEndpoint>) -> Result<(), OrchestrationError> {
let mut is_all_valid = true;
for endpoint in endpoints {
let builder = PipelineBuilder {};
builder.build_pipeline(&endpoint.sql).map_or_else(
statement_to_pipeline(&endpoint.sql).map_or_else(
|e| {
is_all_valid = false;
error!(
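
`validate_endpoints` now parses every endpoint's SQL with `statement_to_pipeline`, using `map_or_else` to log each failure while still checking the rest before reporting an overall result. The shape of that loop, sketched with stand-in types:

```rust
struct ApiEndpoint {
    name: String,
    sql: String,
}

// Stand-in for statement_to_pipeline's validation side: Ok if it parses.
fn check_sql(sql: &str) -> Result<(), String> {
    if sql.trim_start().to_ascii_lowercase().starts_with("select") {
        Ok(())
    } else {
        Err(format!("not a SELECT statement: {sql}"))
    }
}

fn validate_endpoints(endpoints: &[ApiEndpoint]) -> Result<(), String> {
    let mut is_all_valid = true;
    for endpoint in endpoints {
        // map_or_else lets the loop record and log a failure without
        // bailing out, so every endpoint is validated in one pass.
        check_sql(&endpoint.sql).map_or_else(
            |e| {
                is_all_valid = false;
                eprintln!("endpoint {} failed validation: {e}", endpoint.name);
            },
            |()| {},
        );
    }
    if is_all_valid {
        Ok(())
    } else {
        Err("one or more endpoints failed SQL validation".to_owned())
    }
}

fn main() {
    let endpoints = [
        ApiEndpoint { name: "users".into(), sql: "SELECT * FROM users".into() },
        ApiEndpoint { name: "bad".into(), sql: "DELETE FROM users".into() },
    ];
    assert!(validate_endpoints(&endpoints).is_err());
}
```
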
2 changes: 1 addition & 1 deletion dozer-sql/Cargo.toml
@@ -12,7 +12,7 @@ dyn-clone = "1.0.10"
like = "0.3.1"
lmdb-rkv = "0.14.0"
lmdb-rkv-sys = "0.11.2"

uuid = {version = "1.1.2", features = ["v1", "v4", "fast-rng"]}
dozer-types = {path = "../dozer-types"}
dozer-core = {path = "../dozer-core"}
dozer-tracing = {path = "../dozer-tracing"}
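
The manifest change swaps a blank line for a `uuid` dependency with the `v1`, `v4`, and `fast-rng` features. A one-liner showing what those features enable (the `fast-rng` feature only swaps in a faster random source for `new_v4`):

```rust
use uuid::Uuid;

fn main() {
    // Uuid::new_v4() is gated behind the "v4" feature enabled in Cargo.toml.
    let id = Uuid::new_v4();
    println!("{id}");
}
```
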
27 changes: 21 additions & 6 deletions dozer-sql/src/pipeline/aggregation/factory.rs
@@ -27,12 +27,17 @@ use super::{
pub struct AggregationProcessorFactory {
select: Vec<SelectItem>,
groupby: Vec<SqlExpr>,
stateful: bool,
}

impl AggregationProcessorFactory {
/// Creates a new [`AggregationProcessorFactory`].
pub fn new(select: Vec<SelectItem>, groupby: Vec<SqlExpr>) -> Self {
Self { select, groupby }
pub fn new(select: Vec<SelectItem>, groupby: Vec<SqlExpr>, stateful: bool) -> Self {
Self {
select,
groupby,
stateful,
}
}
}

@@ -42,10 +47,20 @@ impl ProcessorFactory for AggregationProcessorFactory {
}

fn get_output_ports(&self) -> Vec<OutputPortDef> {
vec![OutputPortDef::new(
DEFAULT_PORT_HANDLE,
OutputPortType::Stateless,
)]
if self.stateful {
vec![OutputPortDef::new(
DEFAULT_PORT_HANDLE,
OutputPortType::StatefulWithPrimaryKeyLookup {
retr_old_records_for_deletes: true,
retr_old_records_for_updates: true,
},
)]
} else {
vec![OutputPortDef::new(
DEFAULT_PORT_HANDLE,
OutputPortType::Stateless,
)]
}
}

fn get_output_schema(
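
`get_output_ports` now branches on the new `stateful` flag: a stateful aggregation asks the framework for a primary-key-lookup output port that can retrieve old records on deletes and updates, while the stateless path is unchanged. The branch in isolation, with stand-in types mirroring the `OutputPortType` variants visible in this diff:

```rust
type PortHandle = u16;
const DEFAULT_PORT_HANDLE: PortHandle = 0xffff;

// Stand-ins for dozer-core's port types, shaped after what the diff shows.
#[allow(dead_code)]
enum OutputPortType {
    Stateless,
    StatefulWithPrimaryKeyLookup {
        retr_old_records_for_deletes: bool,
        retr_old_records_for_updates: bool,
    },
}

#[allow(dead_code)]
struct OutputPortDef {
    handle: PortHandle,
    typ: OutputPortType,
}

impl OutputPortDef {
    fn new(handle: PortHandle, typ: OutputPortType) -> Self {
        Self { handle, typ }
    }
}

fn get_output_ports(stateful: bool) -> Vec<OutputPortDef> {
    // Computing the port type first avoids duplicating the vec![] shell.
    let typ = if stateful {
        OutputPortType::StatefulWithPrimaryKeyLookup {
            // Old records are needed so downstream operators can reverse
            // a delete or compute the delta of an update.
            retr_old_records_for_deletes: true,
            retr_old_records_for_updates: true,
        }
    } else {
        OutputPortType::Stateless
    };
    vec![OutputPortDef::new(DEFAULT_PORT_HANDLE, typ)]
}

fn main() {
    assert_eq!(get_output_ports(false).len(), 1);
    assert_eq!(get_output_ports(true).len(), 1);
}
```

Whether this duplication-free form is preferable to the diff's explicit two-arm `if` is a matter of taste; the behavior is identical.
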
4 changes: 2 additions & 2 deletions dozer-sql/src/pipeline/aggregation/tests/aggregation_null.rs
@@ -3,7 +3,7 @@ use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
use crate::pipeline::aggregation::tests::aggregation_tests_utils::{
init_input_schema, init_processor, FIELD_100_INT,
};
use crate::pipeline::builder::get_select;
use crate::pipeline::tests::utils::get_select;
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
use dozer_core::dag::node::ProcessorFactory;
use dozer_types::types::FieldType::Int;
@@ -61,7 +61,7 @@ fn test_aggregation_alias() {

let select = get_select("SELECT ID, SUM(Salary) as Salaries FROM Users GROUP BY ID").unwrap();

let factory = AggregationProcessorFactory::new(select.projection, select.group_by);
let factory = AggregationProcessorFactory::new(select.projection, select.group_by, false);
let out_schema = factory
.get_output_schema(
&DEFAULT_PORT_HANDLE,
@@ -12,8 +12,8 @@ use std::collections::HashMap;

use crate::pipeline::{
aggregation::{factory::get_aggregation_rules, processor::AggregationProcessor},
builder::get_select,
errors::PipelineError,
tests::utils::get_select,
};

use dozer_types::chrono::{DateTime, NaiveDate, TimeZone, Utc};
(Diff truncated: the remaining changed files are not rendered here.)