feat: Implement nested queries and CTE. #680

Merged Jan 20, 2023 (22 commits)

1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

24 changes: 24 additions & 0 deletions dozer-core/src/dag/errors.rs
@@ -93,6 +93,12 @@ pub enum ExecutionError {
"Channel returned empty message in processor. Might be an issue with the sender: {0}, {1}"
)]
ProcessorReceiverError(usize, #[source] BoxedError),

#[error(transparent)]
JoinError(JoinError),

#[error(transparent)]
SourceError(SourceError),
}

#[derive(Error, Debug)]
@@ -118,3 +124,21 @@ pub enum SinkError {
#[error("Failed to initialize schema in Sink: {0}")]
CacheCountFailed(#[source] BoxedError),
}

#[derive(Error, Debug)]
pub enum JoinError {
#[error("Failed to find table in Join during Insert: {0}")]
InsertPortError(PortHandle),
#[error("Failed to find table in Join during Delete: {0}")]
DeletePortError(PortHandle),
#[error("Failed to find table in Join during Update: {0}")]
UpdatePortError(PortHandle),
#[error("Join ports are not properly initialized")]
PortNotConnected(PortHandle),
}

#[derive(Error, Debug)]
pub enum SourceError {
#[error("Failed to find table in Source: {0:?}")]
PortError(String),
}
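
The two new top-level variants delegate to the dedicated JoinError and SourceError enums through thiserror's transparent attribute, which forwards both the Display message and the error source to the wrapped error. A minimal, self-contained sketch of that behavior (it assumes only the thiserror crate; the u16 PortHandle is a stand-in for dozer-core's alias):

use thiserror::Error;

type PortHandle = u16; // stand-in for dozer-core's PortHandle alias

#[derive(Error, Debug)]
pub enum JoinError {
    #[error("Failed to find table in Join during Insert: {0}")]
    InsertPortError(PortHandle),
}

#[derive(Error, Debug)]
pub enum ExecutionError {
    #[error(transparent)]
    JoinError(JoinError),
}

fn main() {
    let err = ExecutionError::JoinError(JoinError::InsertPortError(3));
    // `transparent` makes the outer enum print the inner message verbatim.
    println!("{err}"); // "Failed to find table in Join during Insert: 3"
}
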
4 changes: 3 additions & 1 deletion dozer-core/src/dag/executor.rs
@@ -251,7 +251,9 @@ impl<'a> DagExecutor<'a> {
epoch_manager: Arc<EpochManager>,
start_barrier: Arc<Barrier>,
) -> Result<JoinHandle<()>, ExecutionError> {
let (sender, receiver) = bounded(self.options.channel_buffer_sz);
// let (sender, receiver) = bounded(self.options.channel_buffer_sz);
let (sender, receiver) = bounded(1);

let start_seq = *self
.consistency_metadata
.get(&handle)
8 changes: 7 additions & 1 deletion dozer-core/src/dag/executor_utils.rs
@@ -13,6 +13,8 @@ use dozer_types::types::{Operation, Schema};
use std::collections::HashMap;
use std::path::Path;

use super::dag::NodeType;

pub(crate) struct StorageMetadata {
pub env: LmdbEnvironmentManager,
pub meta_db: Database,
@@ -96,7 +98,11 @@ pub(crate) fn index_edges(
receivers.insert(edge.to.node.clone(), HashMap::new());
}

let (tx, rx) = bounded(channel_buf_sz);
// let (tx, rx) = bounded(channel_buf_sz);
let (tx, rx) = match dag.nodes.get(&edge.from.node).unwrap() {
NodeType::Source(_) => bounded(1),
_ => bounded(channel_buf_sz),
};

let rcv_port: PortHandle = edge.to.port;
if receivers
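
Both hunks above (executor.rs and executor_utils.rs) replace the configurable channel_buffer_sz with a hard-coded bounded(1) channel on source edges, so a source can have at most one message in flight and a slow downstream node throttles ingestion. A rough sketch of that backpressure with the crossbeam-channel crate (the producer/consumer names are illustrative, not dozer's):

use crossbeam_channel::bounded;
use std::thread;
use std::time::Duration;

fn main() {
    // Capacity 1: at most one message buffered between the two sides.
    let (tx, rx) = bounded::<u64>(1);

    let producer = thread::spawn(move || {
        for i in 0..5 {
            tx.send(i).unwrap(); // blocks until the single slot is free
            println!("sent {i}");
        }
    });

    for msg in rx.iter() {
        thread::sleep(Duration::from_millis(50)); // slow consumer
        println!("received {msg}");
    }
    producer.join().unwrap();
}
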
12 changes: 7 additions & 5 deletions dozer-orchestrator/src/pipeline/connector_source.rs
@@ -1,6 +1,6 @@
use dozer_core::dag::channels::SourceChannelForwarder;
use dozer_core::dag::errors::ExecutionError;
use dozer_core::dag::errors::ExecutionError::ReplicationTypeNotFound;
use dozer_core::dag::errors::{ExecutionError, SourceError};
use dozer_core::dag::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
use dozer_ingestion::connectors::{get_connector, TableInfo};
use dozer_ingestion::errors::ConnectorError;
@@ -209,10 +209,12 @@ impl Source for ConnectorSource {
Operation::Update { old: _, new } => new.schema_id.to_owned(),
};
let schema_id = get_schema_id(identifier.as_ref())?;
let port = self
.schema_port_map
.get(&schema_id)
.map_or(Err(ExecutionError::PortNotFound(schema_id.to_string())), Ok)?;
let port = self.schema_port_map.get(&schema_id).map_or(
Err(ExecutionError::SourceError(SourceError::PortError(
schema_id.to_string(),
))),
Ok,
)?;
fw.send(lsn, seq_no, op.operation.to_owned(), port.to_owned())?
}
}
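
The rewritten lookup surfaces the new SourceError::PortError instead of the generic ExecutionError::PortNotFound. As a side note, map_or(Err(...), Ok) on an Option is equivalent to ok_or_else; a self-contained sketch of the same lookup pattern, with a plain HashMap standing in for schema_port_map:

use std::collections::HashMap;

#[derive(Debug)]
enum SourceError {
    PortError(String),
}

fn main() {
    let schema_port_map: HashMap<u32, u16> = HashMap::from([(1, 100)]);

    let schema_id = 2u32; // no entry for this id
    let port = schema_port_map
        .get(&schema_id)
        .ok_or_else(|| SourceError::PortError(schema_id.to_string()));
    println!("{port:?}"); // Err(PortError("2"))
}
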
25 changes: 14 additions & 11 deletions dozer-orchestrator/src/simple/executor.rs
@@ -1,5 +1,6 @@
use dozer_api::grpc::internal_grpc::PipelineResponse;
use dozer_core::dag::app::{App, AppPipeline};
use dozer_sql::pipeline::builder::{self, statement_to_pipeline};
use dozer_types::indicatif::MultiProgress;
use dozer_types::types::{Operation, SchemaWithChangesType};
use std::collections::HashMap;
@@ -17,7 +18,6 @@ use dozer_ingestion::connectors::{get_connector, get_connector_info_table, Table

use dozer_ingestion::ingestion::{IngestionIterator, Ingestor};

use dozer_sql::pipeline::builder::PipelineBuilder;
use dozer_types::crossbeam;
use dozer_types::log::{error, info};
use dozer_types::models::api_security::ApiSecurity;
@@ -163,17 +163,16 @@ impl Executor {
) -> Result<dozer_core::dag::dag::Dag, OrchestrationError> {
let grouped_connections = self.get_connection_groups();

let mut pipeline = PipelineBuilder {}
.build_pipeline(&sql)
.map_err(OrchestrationError::PipelineError)?;
let (mut pipeline, (query_name, query_port)) =
statement_to_pipeline(&sql).map_err(OrchestrationError::PipelineError)?;
pipeline.add_sink(
Arc::new(StreamingSinkFactory::new(sender)),
"streaming_sink",
);
pipeline
.connect_nodes(
"aggregation",
Some(DEFAULT_PORT_HANDLE),
&query_name,
Some(query_port),
"streaming_sink",
Some(DEFAULT_PORT_HANDLE),
)
@@ -222,9 +221,13 @@ impl Executor {
let _api_endpoint_name = api_endpoint.name.clone();
let cache = cache_endpoint.cache;

let mut pipeline = PipelineBuilder {}
.build_pipeline(&api_endpoint.sql)
.map_err(OrchestrationError::PipelineError)?;
// let mut pipeline = PipelineBuilder {}
// .build_pipeline(&api_endpoint.sql)
// .map_err(OrchestrationError::PipelineError)?;

let (mut pipeline, (query_name, query_port)) =
builder::statement_to_pipeline(&api_endpoint.sql)
.map_err(OrchestrationError::PipelineError)?;

pipeline.add_sink(
Arc::new(CacheSinkFactory::new(
@@ -241,8 +244,8 @@

pipeline
.connect_nodes(
"aggregation",
Some(DEFAULT_PORT_HANDLE),
&query_name,
Some(query_port),
cache_endpoint.endpoint.name.as_str(),
Some(DEFAULT_PORT_HANDLE),
)
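
Both call sites above stop hard-coding an "aggregation" node as the pipeline's output: with nested queries and CTEs the final node varies per statement, so statement_to_pipeline also returns the (name, port) of whichever node produces the query's output, and the sink is wired to that. A stub sketch of the new wiring (stand-in types and a hypothetical node name, not dozer's real signatures):

struct Pipeline;

impl Pipeline {
    fn connect_nodes(&mut self, from: &str, from_port: Option<u16>, to: &str, to_port: Option<u16>) {
        println!("connect {from}:{from_port:?} -> {to}:{to_port:?}");
    }
}

// Stub: returns the pipeline plus the name and port of the node that
// produces the statement's final output.
fn statement_to_pipeline(_sql: &str) -> (Pipeline, (String, u16)) {
    (Pipeline, ("projection_2".to_string(), 0))
}

fn main() {
    let (mut pipeline, (query_name, query_port)) =
        statement_to_pipeline("WITH cte AS (SELECT a FROM t) SELECT * FROM cte");
    pipeline.connect_nodes(&query_name, Some(query_port), "streaming_sink", Some(0));
}
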
5 changes: 2 additions & 3 deletions dozer-orchestrator/src/simple/orchestrator.rs
@@ -22,7 +22,7 @@ use dozer_core::dag::dag_schemas::DagSchemaManager;
use dozer_core::dag::errors::ExecutionError::InternalError;
use dozer_ingestion::ingestion::IngestionConfig;
use dozer_ingestion::ingestion::Ingestor;
use dozer_sql::pipeline::builder::PipelineBuilder;
use dozer_sql::pipeline::builder::statement_to_pipeline;
use dozer_types::crossbeam::channel::{self, unbounded, Sender};
use dozer_types::log::{info, warn};
use dozer_types::models::api_config::ApiConfig;
@@ -402,8 +402,7 @@ impl SimpleOrchestrator {
pub fn validate_endpoints(endpoints: &Vec<ApiEndpoint>) -> Result<(), OrchestrationError> {
let mut is_all_valid = true;
for endpoint in endpoints {
let builder = PipelineBuilder {};
builder.build_pipeline(&endpoint.sql).map_or_else(
statement_to_pipeline(&endpoint.sql).map_or_else(
|e| {
is_all_valid = false;
error!(
2 changes: 1 addition & 1 deletion dozer-sql/Cargo.toml
@@ -12,7 +12,7 @@ dyn-clone = "1.0.10"
like = "0.3.1"
lmdb-rkv = "0.14.0"
lmdb-rkv-sys = "0.11.2"

uuid = {version = "1.1.2", features = ["v1", "v4", "fast-rng"]}
dozer-types = {path = "../dozer-types"}
dozer-core = {path = "../dozer-core"}
dozer-tracing = {path = "../dozer-tracing"}
27 changes: 21 additions & 6 deletions dozer-sql/src/pipeline/aggregation/factory.rs
@@ -27,12 +27,17 @@ use super::{
pub struct AggregationProcessorFactory {
select: Vec<SelectItem>,
groupby: Vec<SqlExpr>,
stateful: bool,
}

impl AggregationProcessorFactory {
/// Creates a new [`AggregationProcessorFactory`].
pub fn new(select: Vec<SelectItem>, groupby: Vec<SqlExpr>) -> Self {
Self { select, groupby }
pub fn new(select: Vec<SelectItem>, groupby: Vec<SqlExpr>, stateful: bool) -> Self {
Self {
select,
groupby,
stateful,
}
}
}

@@ -42,10 +47,20 @@ impl ProcessorFactory for AggregationProcessorFactory {
}

fn get_output_ports(&self) -> Vec<OutputPortDef> {
vec![OutputPortDef::new(
DEFAULT_PORT_HANDLE,
OutputPortType::Stateless,
)]
if self.stateful {
vec![OutputPortDef::new(
DEFAULT_PORT_HANDLE,
OutputPortType::StatefulWithPrimaryKeyLookup {
retr_old_records_for_deletes: true,
retr_old_records_for_updates: true,
},
)]
} else {
vec![OutputPortDef::new(
DEFAULT_PORT_HANDLE,
OutputPortType::Stateless,
)]
}
}

fn get_output_schema(
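
The new stateful flag decides whether the aggregation's output port must support primary-key lookups of old records, which matters when the output feeds an outer query that has to retract or replace previous results on deletes and updates. A reduced sketch of the branch, with simplified stand-ins for dozer-core's port types:

#[derive(Debug)]
enum OutputPortType {
    Stateless,
    StatefulWithPrimaryKeyLookup {
        retr_old_records_for_deletes: bool,
        retr_old_records_for_updates: bool,
    },
}

// Mirrors the factory's get_output_ports logic for a single port.
fn output_port_type(stateful: bool) -> OutputPortType {
    if stateful {
        OutputPortType::StatefulWithPrimaryKeyLookup {
            retr_old_records_for_deletes: true,
            retr_old_records_for_updates: true,
        }
    } else {
        OutputPortType::Stateless
    }
}

fn main() {
    println!("{:?}", output_port_type(true));
    println!("{:?}", output_port_type(false));
}
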
4 changes: 2 additions & 2 deletions dozer-sql/src/pipeline/aggregation/tests/aggregation_null.rs
@@ -3,7 +3,7 @@ use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
use crate::pipeline::aggregation::tests::aggregation_tests_utils::{
init_input_schema, init_processor, FIELD_100_INT,
};
use crate::pipeline::builder::get_select;
use crate::pipeline::tests::utils::get_select;
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
use dozer_core::dag::node::ProcessorFactory;
use dozer_types::types::FieldType::Int;
@@ -61,7 +61,7 @@ fn test_aggregation_alias() {

let select = get_select("SELECT ID, SUM(Salary) as Salaries FROM Users GROUP BY ID").unwrap();

let factory = AggregationProcessorFactory::new(select.projection, select.group_by);
let factory = AggregationProcessorFactory::new(select.projection, select.group_by, false);
let out_schema = factory
.get_output_schema(
&DEFAULT_PORT_HANDLE,
@@ -12,8 +12,8 @@ use std::collections::HashMap;

use crate::pipeline::{
aggregation::{factory::get_aggregation_rules, processor::AggregationProcessor},
builder::get_select,
errors::PipelineError,
tests::utils::get_select,
};

use dozer_types::chrono::{DateTime, NaiveDate, TimeZone, Utc};