Skip to content

Commit

Permalink
chore: fix clippy complain too_many_arguments by introduce CacheSinkS…
Browse files Browse the repository at this point in the history
…ettings
  • Loading branch information
duonganhthu43 committed Jan 19, 2023
1 parent a25505e commit 70da95c
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 56 deletions.
2 changes: 1 addition & 1 deletion dozer-api/src/generator/protoc/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{errors::GenerationError, PipelineDetails};
use dozer_cache::cache::Cache;
use dozer_types::log::error;
use dozer_types::models::api_security::ApiSecurity;
use dozer_types::models::app_config::Flags;
use dozer_types::models::flags::Flags;
use dozer_types::serde::{self, Deserialize, Serialize};
use dozer_types::types::FieldType;
use handlebars::Handlebars;
Expand Down
3 changes: 2 additions & 1 deletion dozer-api/src/generator/protoc/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::generator::ProtoGenerator;
use crate::generator::protoc::utils::{create_descriptor_set, get_proto_descriptor};
use crate::{test_utils, CacheEndpoint, PipelineDetails};
use dozer_types::models::{api_security::ApiSecurity, app_config::Flags};
use dozer_types::models::api_security::ApiSecurity;
use dozer_types::models::flags::Flags;
use prost_reflect::{MethodDescriptor, ServiceDescriptor};
use std::collections::HashMap;
use tempdir::TempDir;
Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/grpc/client_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use dozer_types::{
models::{
api_config::{ApiGrpc, ApiPipelineInternal},
api_security::ApiSecurity,
app_config::Flags,
flags::Flags,
},
types::Schema,
};
Expand Down
2 changes: 1 addition & 1 deletion dozer-orchestrator/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ pub mod connector_source;
mod sinks;
pub mod source_builder;
mod streaming_sink;
pub use sinks::{CacheSink, CacheSinkFactory};
pub use sinks::{CacheSink, CacheSinkFactory, CacheSinkSettings};
pub(crate) use streaming_sink::StreamingSinkFactory;
29 changes: 19 additions & 10 deletions dozer-orchestrator/src/pipeline/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use dozer_types::indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use dozer_types::log::debug;
use dozer_types::models::api_endpoint::{ApiEndpoint, ApiIndex};
use dozer_types::models::api_security::ApiSecurity;
use dozer_types::models::app_config::Flags;
use dozer_types::models::flags::Flags;
use dozer_types::types::FieldType;
use dozer_types::types::{IndexDefinition, Operation, Schema, SchemaIdentifier};
use std::collections::hash_map::DefaultHasher;
Expand Down Expand Up @@ -48,17 +48,28 @@ pub fn attach_progress(multi_pb: Option<MultiProgress>) -> ProgressBar {
);
pb
}

#[derive(Debug, Clone)]
pub struct CacheSinkSettings {
flags: Option<Flags>,
api_security: Option<ApiSecurity>,
}
impl CacheSinkSettings {
pub fn new(flags: Option<Flags>, api_security: Option<ApiSecurity>) -> Self {
Self {
flags,
api_security,
}
}
}
#[derive(Debug)]
pub struct CacheSinkFactory {
input_ports: Vec<PortHandle>,
cache: Arc<LmdbCache>,
api_endpoint: ApiEndpoint,
notifier: Option<Sender<PipelineResponse>>,
generated_path: PathBuf,
api_security: Option<ApiSecurity>,
multi_pb: MultiProgress,
flags: Option<Flags>,
settings: CacheSinkSettings,
}

impl CacheSinkFactory {
Expand All @@ -68,19 +79,17 @@ impl CacheSinkFactory {
api_endpoint: ApiEndpoint,
notifier: Option<Sender<PipelineResponse>>,
generated_path: PathBuf,
api_security: Option<ApiSecurity>,
multi_pb: MultiProgress,
flags: Option<Flags>,
settings: CacheSinkSettings,
) -> Self {
Self {
input_ports,
cache,
api_endpoint,
notifier,
generated_path,
api_security,
multi_pb,
flags,
settings,
}
}

Expand Down Expand Up @@ -222,8 +231,8 @@ impl SinkFactory for CacheSinkFactory {
endpoint: self.api_endpoint.to_owned(),
},
},
&self.api_security,
&self.flags,
&self.settings.api_security,
&self.settings.flags,
)
.map_err(|e| ExecutionError::InternalError(Box::new(e)))?;

Expand Down
16 changes: 6 additions & 10 deletions dozer-orchestrator/src/simple/executor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use dozer_api::grpc::internal_grpc::PipelineResponse;
use dozer_core::dag::app::{App, AppPipeline};
use dozer_types::indicatif::MultiProgress;
use dozer_types::models::app_config::Flags;
use dozer_types::types::{Operation, SchemaWithChangesType};
use std::collections::HashMap;
use std::path::PathBuf;
Expand All @@ -11,7 +10,7 @@ use std::sync::Arc;
use dozer_api::CacheEndpoint;
use dozer_types::models::source::Source;

use crate::pipeline::{CacheSinkFactory, StreamingSinkFactory};
use crate::pipeline::{CacheSinkFactory, CacheSinkSettings, StreamingSinkFactory};
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
use dozer_core::dag::executor::{DagExecutor, ExecutorOptions};
use dozer_ingestion::connectors::{get_connector, get_connector_info_table, TableInfo};
Expand All @@ -21,7 +20,7 @@ use dozer_ingestion::ingestion::{IngestionIterator, Ingestor};
use dozer_sql::pipeline::builder::PipelineBuilder;
use dozer_types::crossbeam;
use dozer_types::log::{error, info};
use dozer_types::models::api_security::ApiSecurity;

use dozer_types::models::connection::Connection;
use dozer_types::parking_lot::RwLock;
use OrchestrationError::ExecutionError;
Expand Down Expand Up @@ -210,8 +209,7 @@ impl Executor {
&self,
notifier: Option<crossbeam::channel::Sender<PipelineResponse>>,
api_dir: PathBuf,
api_security: Option<ApiSecurity>,
flags: Option<Flags>,
settings: CacheSinkSettings,
) -> Result<dozer_core::dag::dag::Dag, OrchestrationError> {
let grouped_connections = self.get_connection_groups();

Expand All @@ -235,9 +233,8 @@ impl Executor {
api_endpoint,
notifier.clone(),
api_dir.clone(),
api_security.clone(),
self.progress.clone(),
flags.clone(),
settings.to_owned(),
)),
cache_endpoint.endpoint.name.as_str(),
);
Expand Down Expand Up @@ -303,12 +300,11 @@ impl Executor {
pub fn run(
&self,
notifier: Option<crossbeam::channel::Sender<PipelineResponse>>,
api_security: Option<ApiSecurity>,
flags: Option<Flags>,
settings: CacheSinkSettings,
) -> Result<(), OrchestrationError> {
let running_wait = self.running.clone();

let parent_dag = self.build_pipeline(notifier, PathBuf::default(), api_security, flags)?;
let parent_dag = self.build_pipeline(notifier, PathBuf::default(), settings)?;
let path = &self.pipeline_dir;

if !path.exists() {
Expand Down
19 changes: 10 additions & 9 deletions dozer-orchestrator/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use super::executor::Executor;
use crate::console_helper::get_colored_text;
use crate::errors::OrchestrationError;
use crate::pipeline::CacheSinkSettings;
use crate::utils::{
get_api_dir, get_api_security_config, get_cache_dir, get_grpc_config, get_pipeline_config,
get_pipeline_dir, get_rest_config,
get_api_dir, get_api_security_config, get_cache_dir, get_flags, get_grpc_config,
get_pipeline_config, get_pipeline_dir, get_rest_config,
};
use crate::{flatten_joinhandle, Orchestrator};
use dozer_api::auth::{Access, Authorizer};
Expand Down Expand Up @@ -217,11 +218,10 @@ impl Orchestrator for SimpleOrchestrator {
running,
pipeline_home_dir,
);
executor.run(
Some(sender),
self.config.api.to_owned().unwrap_or_default().api_security,
self.config.flags.to_owned(),
)
let flags = get_flags(self.config.clone());
let api_security = get_api_security_config(self.config.clone());
let settings = CacheSinkSettings::new(flags, api_security);
executor.run(Some(sender), settings)
}

fn list_connectors(
Expand Down Expand Up @@ -343,9 +343,10 @@ impl Orchestrator for SimpleOrchestrator {
e,
)
})?;

let api_security = get_api_security_config(self.config.clone());
let dag = executor.build_pipeline(None, generated_path.clone(), api_security, self.config.flags.to_owned())?;
let flags = get_flags(self.config.clone());
let settings = CacheSinkSettings::new(flags, api_security);
let dag = executor.build_pipeline(None, generated_path.clone(), settings)?;
let schema_manager = DagSchemaManager::new(&dag)?;
// Every sink will initialize its schema in sink and also in a proto file.
schema_manager.prepare()?;
Expand Down
6 changes: 4 additions & 2 deletions dozer-orchestrator/src/simple/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ use dozer_types::{
models::{
self,
api_endpoint::{ApiEndpoint, ApiIndex},
app_config::Flags,
connection::EventsAuthentication,
flags::Flags,
},
types::{Field, OperationEvent, Record, Schema},
};
use serde_json::{json, Value};
use tempdir::TempDir;

use crate::pipeline::CacheSinkSettings;

use super::executor::Executor;

fn single_source_sink_impl(schema: Schema) {
Expand Down Expand Up @@ -96,7 +98,7 @@ fn single_source_sink_impl(schema: Schema) {
tmp_path,
);
let flags = Flags::default();
match executor.run(None, None, Some(flags)) {
match executor.run(None, CacheSinkSettings::new(Some(flags), None)) {
Ok(_) => {}
Err(e) => warn!("Exiting: {:?}", e),
}
Expand Down
4 changes: 4 additions & 0 deletions dozer-orchestrator/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub fn get_api_security_config(config: Config) -> Option<ApiSecurity> {
get_api_config(config).api_security
}

pub fn get_flags(config: Config) -> Option<dozer_types::models::flags::Flags> {
config.flags
}

pub fn get_repl_history_path(config: &Config) -> PathBuf {
PathBuf::from(format!("{:}/history.txt", config.home_dir))
}
Expand Down
23 changes: 3 additions & 20 deletions dozer-types/src/models/app_config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{
api_config::ApiConfig, api_endpoint::ApiEndpoint, connection::Connection, source::Source,
api_config::ApiConfig, api_endpoint::ApiEndpoint, connection::Connection, flags::Flags,
source::Source,
};
use crate::{constants::DEFAULT_HOME_DIR, models::api_config::default_api_config};
use serde::{
Expand Down Expand Up @@ -38,24 +39,6 @@ pub struct Config {
pub flags: Option<Flags>,
}

#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, prost::Message)]
pub struct Flags {
/// dynamic grpc enabled; Default: true
#[prost(bool, tag = "1", default = true)]
pub dynamic: bool,
/// http1 + web support for grpc. This is required for browser clients.; Default: true
#[prost(bool, tag = "2", default = true)]
pub grpc_web: bool,

/// push events enabled. Currently unstable.; Default: false
#[prost(bool, tag = "3", default = false)]
pub push_events: bool,

/// require authentication to access grpc server reflection service if true.; Default: false
#[prost(bool, tag = "4", default = false)]
pub authenticate_server_reflection: bool,
}

pub fn default_home_dir() -> String {
DEFAULT_HOME_DIR.to_owned()
}
Expand All @@ -78,7 +61,7 @@ impl<'de> Deserialize<'de> for Config {
A: serde::de::MapAccess<'de>,
{
let mut api: Option<ApiConfig> = Some(default_api_config());
let mut flags: Option<Flags> = None;
let mut flags: Option<Flags> = Some(Flags::default());
let mut connections: Vec<Connection> = vec![];
let mut sources_value: Vec<serde_yaml::Value> = vec![];
let mut endpoints: Vec<ApiEndpoint> = vec![];
Expand Down
2 changes: 1 addition & 1 deletion dozer-types/src/tests/flags_config_yaml_deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ fn test_partial_flag_config_input() {
let input_config_with_flag = r#"
app_name: working_app
flags:
dynamic: true
grpc_web: false
push_events: false
"#;
Expand All @@ -20,6 +19,7 @@ fn test_partial_flag_config_input() {
flags_deserialize.authenticate_server_reflection,
default_flags.authenticate_server_reflection,
);
assert_eq!(flags_deserialize.dynamic, default_flags.dynamic);
}

#[test]
Expand Down

0 comments on commit 70da95c

Please sign in to comment.