feat: make probabilistic optimizations optional and tunable in the YAML config

Probabilistic optimizations sacrifice accuracy to reduce memory consumption. In some parts of the pipeline a Bloom filter is used ([set_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/product/set/set_processor.rs#L20)), while in other parts hash tables store only the hashes of keys instead of the full keys ([aggregation_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/aggregation/processor.rs#L59) and [join_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/product/join/operator.rs#L57-L58)).
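
To make the trade-off concrete, here is a minimal, illustrative Rust sketch of the hash-only-keys technique (not Dozer's actual code): keying a table by a 64-bit hash of the key instead of the key itself keeps per-entry memory fixed, at the cost of treating colliding keys as equal.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Illustrative only: collapse an arbitrary-length key to a u64.
fn key_hash(key: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Store counts keyed by the hash, not the full key. Memory per entry
    // is fixed (8 bytes), but two distinct keys that hash to the same u64
    // would be merged -- the (rare) accuracy loss described above.
    let mut counts: HashMap<u64, u64> = HashMap::new();
    for key in [b"alice".as_slice(), b"bob", b"alice"] {
        *counts.entry(key_hash(key)).or_insert(0) += 1;
    }
    assert_eq!(counts[&key_hash(b"alice")], 2);
}
```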

This commit disables these optimizations by default and adds user-configurable flags to enable each optimization separately.

Here is an example of turning on probabilistic optimizations for each processor in the Dozer configuration:

```yaml
flags:
  enable_probabilistic_optimizations:
    in_sets: true # enable probabilistic optimizations in set operations (UNION, EXCEPT, INTERSECT); Default: false
    in_joins: true # enable probabilistic optimizations in JOIN operations; Default: false
    in_aggregations: true # enable probabilistic optimizations in aggregations (SUM, COUNT, MIN, etc.); Default: false
```
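
For reference, a minimal sketch of the flag model this YAML implies, under the `flags:` key. The field names follow the config keys above and the `unwrap_or_default()` calls in the diff below; the derives, `Option` wrapper, and use of serde_yaml are assumptions, not Dozer's exact definitions (those live in `dozer_types::models::flags`).

```rust
use serde::Deserialize;

// Sketch only: the value of the `flags:` key in the YAML config.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct Flags {
    pub enable_probabilistic_optimizations: Option<EnableProbabilisticOptimizations>,
}

// All three flags default to false when omitted.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct EnableProbabilisticOptimizations {
    #[serde(default)]
    pub in_sets: bool, // UNION, EXCEPT, INTERSECT
    #[serde(default)]
    pub in_joins: bool, // JOIN operations
    #[serde(default)]
    pub in_aggregations: bool, // SUM, COUNT, MIN, ...
}

fn main() {
    let yaml = r#"
enable_probabilistic_optimizations:
  in_sets: true
  in_joins: true
  in_aggregations: true
"#;
    let flags: Flags = serde_yaml::from_str(yaml).unwrap();
    let opts = flags.enable_probabilistic_optimizations.unwrap_or_default();
    assert!(opts.in_sets && opts.in_joins && opts.in_aggregations);
}
```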
abcpro1 committed Aug 24, 2023
1 parent d8f230c commit 737d77e
Showing 31 changed files with 853 additions and 193 deletions.
2 changes: 1 addition & 1 deletion dozer-cli/src/lib.rs
@@ -64,7 +64,7 @@ pub use dozer_ingestion::{
 pub use dozer_sql::pipeline::builder::QueryContext;
 pub use ui_helper::config_to_ui_dag;
 pub fn wrapped_statement_to_pipeline(sql: &str) -> Result<QueryContext, PipelineError> {
-    let mut pipeline = AppPipeline::new();
+    let mut pipeline = AppPipeline::new_with_default_flags();
     statement_to_pipeline(sql, &mut pipeline, None)
 }
10 changes: 8 additions & 2 deletions dozer-cli/src/live/state.rs
@@ -14,6 +14,7 @@ use dozer_types::{
     models::{
         api_config::{ApiConfig, AppGrpcOptions},
         api_endpoint::ApiEndpoint,
+        flags::Flags,
     },
 };
 use tokio::{runtime::Runtime, sync::RwLock};
@@ -292,6 +293,7 @@ async fn create_dag(
         dozer.config.sql.as_deref(),
         endpoint_and_logs,
         MultiProgress::new(),
+        Flags::default(),
     );
     let (_shutdown_sender, shutdown_receiver) = shutdown::new(&dozer.runtime);
     builder.build(&dozer.runtime, shutdown_receiver).await
@@ -324,8 +326,12 @@ fn get_dozer_run_instance(
 ) -> Result<SimpleOrchestrator, LiveError> {
     match req.request {
         Some(dozer_types::grpc_types::live::run_request::Request::Sql(req)) => {
-            let context = statement_to_pipeline(&req.sql, &mut AppPipeline::new(), None)
-                .map_err(LiveError::PipelineError)?;
+            let context = statement_to_pipeline(
+                &req.sql,
+                &mut AppPipeline::new(dozer.config.flags.clone().unwrap_or_default().into()),
+                None,
+            )
+            .map_err(LiveError::PipelineError)?;

             //overwrite sql
             dozer.config.sql = Some(req.sql);
8 changes: 6 additions & 2 deletions dozer-cli/src/pipeline/builder.rs
@@ -16,6 +16,7 @@ use dozer_types::indicatif::MultiProgress;
 use dozer_types::log::debug;
 use dozer_types::models::api_endpoint::ApiEndpoint;
 use dozer_types::models::connection::Connection;
+use dozer_types::models::flags::Flags;
 use dozer_types::models::source::Source;
 use dozer_types::parking_lot::Mutex;
 use std::hash::Hash;
@@ -57,6 +58,7 @@ pub struct PipelineBuilder<'a> {
     /// `ApiEndpoint` and its log.
     endpoint_and_logs: Vec<(ApiEndpoint, OptionLog)>,
     progress: MultiProgress,
+    flags: Flags,
 }

 impl<'a> PipelineBuilder<'a> {
@@ -66,13 +68,15 @@ impl<'a> PipelineBuilder<'a> {
         sql: Option<&'a str>,
         endpoint_and_logs: Vec<(ApiEndpoint, OptionLog)>,
         progress: MultiProgress,
+        flags: Flags,
     ) -> Self {
         Self {
             connections,
             sources,
             sql,
             endpoint_and_logs,
             progress,
+            flags,
         }
     }

@@ -148,7 +152,7 @@ impl<'a> PipelineBuilder<'a> {
         let mut original_sources = vec![];

         let mut query_ctx = None;
-        let mut pipeline = AppPipeline::new();
+        let mut pipeline = AppPipeline::new((&self.flags).into());

         let mut transformed_sources = vec![];

@@ -205,7 +209,7 @@

         let mut pipelines: Vec<AppPipeline<SchemaSQLContext>> = vec![];

-        let mut pipeline = AppPipeline::new();
+        let mut pipeline = AppPipeline::new(self.flags.into());

         let mut available_output_tables: HashMap<String, OutputTableInfo> = HashMap::new();
2 changes: 2 additions & 0 deletions dozer-cli/src/pipeline/tests/builder.rs
@@ -7,6 +7,7 @@ use dozer_types::models::config::Config;

 use dozer_types::indicatif::MultiProgress;
 use dozer_types::models::connection::{Connection, ConnectionConfig};
+use dozer_types::models::flags::Flags;
 use dozer_types::models::source::Source;

 fn get_default_config() -> Config {
@@ -66,6 +67,7 @@ fn load_multi_sources() {
             .map(|endpoint| (endpoint, None))
             .collect(),
         MultiProgress::new(),
+        Flags::default(),
     );

     let runtime = tokio::runtime::Builder::new_current_thread()
2 changes: 2 additions & 0 deletions dozer-cli/src/simple/executor.rs
@@ -5,6 +5,7 @@ use dozer_cache::dozer_log::replication::Log;
 use dozer_core::checkpoint::{CheckpointFactory, CheckpointFactoryOptions};
 use dozer_core::processor_record::ProcessorRecordStore;
 use dozer_types::models::api_endpoint::ApiEndpoint;
+use dozer_types::models::flags::Flags;
 use dozer_types::parking_lot::Mutex;
 use tokio::runtime::Runtime;

@@ -94,6 +95,7 @@ impl<'a> Executor<'a> {
                 .map(|(endpoint, log)| (endpoint.clone(), Some(log.log.clone())))
                 .collect(),
             self.multi_pb.clone(),
+            Flags::default(),
         );

         let dag = builder.build(runtime, shutdown).await?;
3 changes: 2 additions & 1 deletion dozer-cli/src/simple/orchestrator.rs
@@ -289,6 +289,7 @@ impl SimpleOrchestrator {
             self.config.sql.as_deref(),
             endpoint_and_logs,
             self.multi_pb.clone(),
+            self.config.flags.clone().unwrap_or_default(),
         );
         let dag = self
             .runtime
@@ -374,7 +375,7 @@ impl SimpleOrchestrator {
 }

 pub fn validate_sql(sql: String) -> Result<(), PipelineError> {
-    statement_to_pipeline(&sql, &mut AppPipeline::new(), None).map_or_else(
+    statement_to_pipeline(&sql, &mut AppPipeline::new_with_default_flags(), None).map_or_else(
         |e| {
             error!(
                 "[sql][{}] Transforms validation error: {}",
12 changes: 9 additions & 3 deletions dozer-cli/src/ui_helper.rs
@@ -10,7 +10,7 @@ use dozer_core::{
 use dozer_sql::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
 use dozer_types::{
     grpc_types::cloud::{QueryEdge, QueryGraph, QueryNode, QueryNodeType},
-    models::{config::Config, connection::Connection, source::Source},
+    models::{config::Config, connection::Connection, flags::Flags, source::Source},
 };

 use crate::{errors::OrchestrationError, pipeline::source_builder::SourceBuilder};
@@ -53,8 +53,9 @@ fn prepare_pipeline_dag(
     sql: String,
     connection_sources: HashMap<Connection, Vec<Source>>,
     connection_source_ports: HashMap<(&str, &str), u16>,
+    flags: Flags,
 ) -> Result<Dag<SchemaSQLContext>, OrchestrationError> {
-    let mut pipeline = AppPipeline::new();
+    let mut pipeline = AppPipeline::new(flags.into());
     let mut asm: AppSourceManager<dozer_sql::pipeline::builder::SchemaSQLContext> =
         AppSourceManager::new();
     connection_sources.iter().for_each(|cs| {
@@ -169,6 +170,11 @@ pub fn config_to_ui_dag(config: Config) -> Result<QueryGraph, OrchestrationError
     }
     let source_builder = SourceBuilder::new(connection_sources.clone(), None);
     let connection_source_ports = source_builder.get_ports();
-    let sql_dag = prepare_pipeline_dag(sql, connection_sources, connection_source_ports)?;
+    let sql_dag = prepare_pipeline_dag(
+        sql,
+        connection_sources,
+        connection_source_ports,
+        config.flags.unwrap_or_default(),
+    )?;
     Ok(transform_to_ui_graph(&sql_dag))
 }
47 changes: 40 additions & 7 deletions dozer-core/src/app.rs
@@ -1,3 +1,4 @@
+use dozer_types::models::flags::{EnableProbabilisticOptimizations, Flags};
 use dozer_types::node::NodeHandle;

 use crate::appsource::{self, AppSourceManager};
@@ -28,12 +29,7 @@ pub struct AppPipeline<T> {
     processors: Vec<(NodeHandle, Box<dyn ProcessorFactory<T>>)>,
     sinks: Vec<(NodeHandle, Box<dyn SinkFactory<T>>)>,
     entry_points: Vec<(NodeHandle, PipelineEntryPoint)>,
-}
-
-impl<T> Default for AppPipeline<T> {
-    fn default() -> Self {
-        Self::new()
-    }
+    flags: PipelineFlags,
 }

 impl<T> AppPipeline<T> {
@@ -79,21 +75,58 @@ impl<T> AppPipeline<T> {
         self.edges.push(edge);
     }

-    pub fn new() -> Self {
+    pub fn new(flags: PipelineFlags) -> Self {
         Self {
             processors: Vec::new(),
             sinks: Vec::new(),
             edges: Vec::new(),
             entry_points: Vec::new(),
+            flags,
         }
     }

+    pub fn new_with_default_flags() -> Self {
+        Self::new(Default::default())
+    }
+
     pub fn get_entry_points_sources_names(&self) -> Vec<String> {
         self.entry_points
             .iter()
             .map(|(_, p)| p.source_name().to_string())
             .collect()
     }
+
+    pub fn flags(&self) -> &PipelineFlags {
+        &self.flags
+    }
 }

+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct PipelineFlags {
+    pub enable_probabilistic_optimizations: EnableProbabilisticOptimizations,
+}
+
+impl From<&Flags> for PipelineFlags {
+    fn from(flags: &Flags) -> Self {
+        Self {
+            enable_probabilistic_optimizations: flags
+                .enable_probabilistic_optimizations
+                .clone()
+                .unwrap_or_default(),
+        }
+    }
+}
+
+impl From<Flags> for PipelineFlags {
+    fn from(flags: Flags) -> Self {
+        Self::from(&flags)
+    }
+}
+
+impl Default for PipelineFlags {
+    fn default() -> Self {
+        Flags::default().into()
+    }
+}
+
 pub struct App<T> {
4 changes: 2 additions & 2 deletions dozer-core/src/tests/app.rs
@@ -174,7 +174,7 @@ async fn test_app_dag() {

     let mut app = App::new(asm);

-    let mut p1 = AppPipeline::new();
+    let mut p1 = AppPipeline::new_with_default_flags();
     p1.add_processor(
         Box::new(NoopJoinProcessorFactory {}),
         "join",
@@ -197,7 +197,7 @@

     app.add_pipeline(p1);

-    let mut p2 = AppPipeline::new();
+    let mut p2 = AppPipeline::new_with_default_flags();
     p2.add_processor(
         Box::new(NoopJoinProcessorFactory {}),
         "join",
10 changes: 9 additions & 1 deletion dozer-sql/src/pipeline/aggregation/factory.rs
@@ -17,14 +17,21 @@ pub struct AggregationProcessorFactory {
     id: String,
     projection: Select,
     _stateful: bool,
+    enable_probabilistic_optimizations: bool,
 }

 impl AggregationProcessorFactory {
-    pub fn new(id: String, projection: Select, stateful: bool) -> Self {
+    pub fn new(
+        id: String,
+        projection: Select,
+        stateful: bool,
+        enable_probabilistic_optimizations: bool,
+    ) -> Self {
         Self {
             id,
             projection,
             _stateful: stateful,
+            enable_probabilistic_optimizations,
         }
     }

@@ -90,6 +97,7 @@ impl ProcessorFactory<SchemaSQLContext> for AggregationProcessorFactory {
                 planner.having,
                 input_schema.clone(),
                 planner.post_aggregation_schema,
+                self.enable_probabilistic_optimizations,
             )?)
         };
         Ok(processor)