Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/builder/analyzed_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ impl AnalyzedFlow {
flow_instance: crate::base::spec::FlowInstanceSpec,
existing_flow_ss: Option<&setup::FlowSetupState<setup::ExistingMode>>,
registry: &ExecutorFactoryRegistry,
auth_registry: Arc<AuthRegistry>,
auth_registry: &Arc<AuthRegistry>,
) -> Result<Self> {
let ctx = analyzer::build_flow_instance_context(&flow_instance.name, auth_registry);
let (data_schema, execution_plan_fut, desired_state) =
analyzer::analyze_flow(&flow_instance, &ctx, existing_flow_ss, registry)?;
let setup_status_check =
setup::check_flow_setup_status(Some(&desired_state), existing_flow_ss)?;
setup::check_flow_setup_status(Some(&desired_state), existing_flow_ss, auth_registry)?;
let execution_plan = if setup_status_check.is_up_to_date() {
Some(
async move {
Expand Down Expand Up @@ -73,7 +73,7 @@ impl AnalyzedTransientFlow {
pub async fn from_transient_flow(
transient_flow: spec::TransientFlowSpec,
registry: &ExecutorFactoryRegistry,
auth_registry: Arc<AuthRegistry>,
auth_registry: &Arc<AuthRegistry>,
) -> Result<Self> {
let ctx = analyzer::build_flow_instance_context(&transient_flow.name, auth_registry);
let (output_type, data_schema, execution_plan_fut) =
Expand Down
4 changes: 2 additions & 2 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1029,11 +1029,11 @@ impl AnalyzerContext<'_> {

pub fn build_flow_instance_context(
flow_inst_name: &str,
auth_registry: Arc<AuthRegistry>,
auth_registry: &Arc<AuthRegistry>,
) -> Arc<FlowInstanceContext> {
Arc::new(FlowInstanceContext {
flow_instance_name: flow_inst_name.to_string(),
auth_registry,
auth_registry: auth_registry.clone(),
})
}

Expand Down
7 changes: 3 additions & 4 deletions src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,7 @@ impl FlowBuilder {
.get(name)
.cloned();
let root_data_scope = Arc::new(Mutex::new(DataScopeBuilder::new()));
let flow_inst_context =
build_flow_instance_context(name, lib_context.auth_registry.clone());
let flow_inst_context = build_flow_instance_context(name, &lib_context.auth_registry);
let result = Self {
lib_context,
flow_inst_context,
Expand Down Expand Up @@ -650,7 +649,7 @@ impl FlowBuilder {
spec,
self.existing_flow_ss.as_ref(),
&crate::ops::executor_factory_registry(),
self.lib_context.auth_registry.clone(),
&self.lib_context.auth_registry,
))
})
.into_py_result()?;
Expand Down Expand Up @@ -691,7 +690,7 @@ impl FlowBuilder {
get_runtime().block_on(super::AnalyzedTransientFlow::from_transient_flow(
spec,
&crate::ops::executor_factory_registry(),
self.lib_context.auth_registry.clone(),
&self.lib_context.auth_registry,
))
})
.into_py_result()?;
Expand Down
11 changes: 9 additions & 2 deletions src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
key: Self::Key,
desired_state: Option<Self::SetupState>,
existing_states: setup::CombinedState<Self::SetupState>,
auth_registry: &Arc<AuthRegistry>,
) -> Result<impl setup::ResourceSetupStatusCheck<Self::Key, Self::SetupState> + 'static>;

fn check_state_compatibility(
Expand Down Expand Up @@ -402,6 +403,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
key: &serde_json::Value,
desired_state: Option<serde_json::Value>,
existing_states: setup::CombinedState<serde_json::Value>,
auth_registry: &Arc<AuthRegistry>,
) -> Result<
Box<
dyn setup::ResourceSetupStatusCheck<serde_json::Value, serde_json::Value> + Send + Sync,
Expand All @@ -412,8 +414,13 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
.map(|v| serde_json::from_value(v.clone()))
.transpose()?;
let existing_states = from_json_combined_state(existing_states)?;
let status_check =
StorageFactoryBase::check_setup_status(self, key, desired_state, existing_states)?;
let status_check = StorageFactoryBase::check_setup_status(
self,
key,
desired_state,
existing_states,
auth_registry,
)?;
Ok(Box::new(ResourceSetupStatusCheckWrapper::<T>::new(
Box::new(status_check),
)?))
Expand Down
1 change: 1 addition & 0 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ pub trait ExportTargetFactory {
key: &serde_json::Value,
desired_state: Option<serde_json::Value>,
existing_states: setup::CombinedState<serde_json::Value>,
auth_registry: &Arc<AuthRegistry>,
) -> Result<
Box<
dyn setup::ResourceSetupStatusCheck<serde_json::Value, serde_json::Value> + Send + Sync,
Expand Down
3 changes: 3 additions & 0 deletions src/ops/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ fn register_executor_factories(registry: &mut ExecutorFactoryRegistry) -> Result

Arc::new(storages::postgres::Factory::default()).register(registry)?;

let neo4j_pool = Arc::new(storages::neo4j::GraphPool::default());
storages::neo4j::RelationshipFactory::new(neo4j_pool).register(registry)?;

Ok(())
}

Expand Down
Loading