Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/builder/analyzed_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl AnalyzedFlow {
registry,
)?;
let setup_status_check =
setup::check_flow_setup_status(Some(&desired_state), existing_flow_ss)?;
setup::check_flow_setup_status(Some(&desired_state), existing_flow_ss).await?;
let execution_plan = if setup_status_check.is_up_to_date() {
Some(
async move {
Expand Down
7 changes: 4 additions & 3 deletions src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {

/// Will not be called if it's setup by user.
/// It returns an error if the target only supports setup by user.
fn check_setup_status(
async fn check_setup_status(
&self,
key: Self::Key,
desired_state: Option<Self::SetupState>,
Expand Down Expand Up @@ -392,7 +392,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
Ok((data_coll_output, decl_output))
}

fn check_setup_status(
async fn check_setup_status(
&self,
key: &serde_json::Value,
desired_state: Option<serde_json::Value>,
Expand All @@ -410,7 +410,8 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
desired_state,
existing_states,
auth_registry,
)?;
)
.await?;
Ok(Box::new(status_check))
}

Expand Down
2 changes: 1 addition & 1 deletion src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ pub trait ExportTargetFactory: Send + Sync {

/// Will not be called if it's setup by user.
/// It returns an error if the target only supports setup by user.
fn check_setup_status(
async fn check_setup_status(
&self,
key: &serde_json::Value,
desired_state: Option<serde_json::Value>,
Expand Down
2 changes: 1 addition & 1 deletion src/ops/storages/neo4j.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,7 @@ impl StorageFactoryBase for Factory {
Ok((data_coll_output, decl_output))
}

fn check_setup_status(
async fn check_setup_status(
&self,
key: GraphElement,
desired: Option<SetupState>,
Expand Down
2 changes: 1 addition & 1 deletion src/ops/storages/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ impl StorageFactoryBase for Arc<Factory> {
Ok((data_coll_output, vec![]))
}

fn check_setup_status(
async fn check_setup_status(
&self,
key: TableId,
desired: Option<SetupState>,
Expand Down
2 changes: 1 addition & 1 deletion src/ops/storages/qdrant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ impl StorageFactoryBase for Arc<Factory> {
Ok((data_coll_output, vec![]))
}

fn check_setup_status(
async fn check_setup_status(
&self,
_key: String,
_desired: Option<()>,
Expand Down
24 changes: 18 additions & 6 deletions src/py/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,20 +304,32 @@ impl SetupStatusCheck {
}

#[pyfunction]
fn sync_setup() -> PyResult<SetupStatusCheck> {
fn sync_setup(py: Python<'_>) -> PyResult<SetupStatusCheck> {
let lib_context = get_lib_context().into_py_result()?;
let flows = lib_context.flows.lock().unwrap();
let all_setup_states = lib_context.all_setup_states.read().unwrap();
let setup_status = setup::sync_setup(&flows, &all_setup_states).into_py_result()?;
Ok(SetupStatusCheck(setup_status))
py.allow_threads(|| {
get_runtime()
.block_on(async {
let setup_status = setup::sync_setup(&flows, &all_setup_states).await?;
anyhow::Ok(SetupStatusCheck(setup_status))
})
.into_py_result()
})
}

#[pyfunction]
fn drop_setup(flow_names: Vec<String>) -> PyResult<SetupStatusCheck> {
fn drop_setup(py: Python<'_>, flow_names: Vec<String>) -> PyResult<SetupStatusCheck> {
let lib_context = get_lib_context().into_py_result()?;
let all_setup_states = lib_context.all_setup_states.read().unwrap();
let setup_status = setup::drop_setup(flow_names, &all_setup_states).into_py_result()?;
Ok(SetupStatusCheck(setup_status))
py.allow_threads(|| {
get_runtime()
.block_on(async {
let setup_status = setup::drop_setup(flow_names, &all_setup_states).await?;
anyhow::Ok(SetupStatusCheck(setup_status))
})
.into_py_result()
})
}

#[pyfunction]
Expand Down
26 changes: 15 additions & 11 deletions src/setup/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ fn group_resource_states<'a>(
Ok(grouped)
}

pub fn check_flow_setup_status(
pub async fn check_flow_setup_status(
desired_state: Option<&FlowSetupState<DesiredMode>>,
existing_state: Option<&FlowSetupState<ExistingMode>>,
) -> Result<FlowSetupStatusCheck> {
Expand Down Expand Up @@ -286,12 +286,16 @@ pub fn check_flow_setup_status(
let status_check = if never_setup_by_sys {
None
} else {
Some(factory.check_setup_status(
&resource_id.key,
target_state,
existing_without_setup_by_user,
get_auth_registry(),
)?)
Some(
factory
.check_setup_status(
&resource_id.key,
target_state,
existing_without_setup_by_user,
get_auth_registry(),
)
.await?,
)
};
target_resources.push(ResourceSetupInfo {
key: resource_id.clone(),
Expand All @@ -310,7 +314,7 @@ pub fn check_flow_setup_status(
})
}

pub fn sync_setup(
pub async fn sync_setup(
flows: &BTreeMap<String, Arc<FlowContext>>,
all_setup_state: &AllSetupState<ExistingMode>,
) -> Result<AllSetupStatusCheck> {
Expand All @@ -319,7 +323,7 @@ pub fn sync_setup(
let existing_state = all_setup_state.flows.get(flow_name);
flow_status_checks.insert(
flow_name.clone(),
check_flow_setup_status(Some(&flow_context.flow.desired_state), existing_state)?,
check_flow_setup_status(Some(&flow_context.flow.desired_state), existing_state).await?,
);
}
Ok(AllSetupStatusCheck {
Expand All @@ -331,7 +335,7 @@ pub fn sync_setup(
})
}

pub fn drop_setup(
pub async fn drop_setup(
flow_names: impl IntoIterator<Item = String>,
all_setup_state: &AllSetupState<ExistingMode>,
) -> Result<AllSetupStatusCheck> {
Expand All @@ -343,7 +347,7 @@ pub fn drop_setup(
if let Some(existing_state) = all_setup_state.flows.get(&flow_name) {
flow_status_checks.insert(
flow_name,
check_flow_setup_status(None, Some(existing_state))?,
check_flow_setup_status(None, Some(existing_state)).await?,
);
}
}
Expand Down