Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 61 additions & 58 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,32 +818,29 @@ impl AnalyzerContext<'_> {
Ok(result_fut)
}

fn analyze_export_op(
fn merge_export_op_states(
&self,
export_op: &NamedSpec<ExportOpSpec>,
target_kind: String,
setup_key: serde_json::Value,
setup_state: serde_json::Value,
setup_by_user: bool,
export_factory: &dyn ExportTargetFactory,
data_coll_output: ExportDataCollectionBuildOutput,
data_fields_info: ExportDataFieldsInfo,
flow_setup_state: &mut FlowSetupState<DesiredMode>,
existing_target_states: &HashMap<&ResourceIdentifier, Vec<&TargetSetupState>>,
) -> Result<impl Future<Output = Result<AnalyzedExportOp>> + Send> {
) -> Result<i32> {
let resource_id = ResourceIdentifier {
key: data_coll_output.setup_key.clone(),
target_kind: export_op.spec.target.kind.clone(),
key: setup_key,
target_kind,
};
let existing_target_states = existing_target_states.get(&resource_id);
let mut compatible_target_ids = HashSet::<Option<i32>>::new();
let mut reusable_schema_version_ids = HashSet::<Option<i32>>::new();
for existing_state in existing_target_states.iter().flat_map(|v| v.iter()) {
let compatibility =
if export_op.spec.setup_by_user == existing_state.common.setup_by_user {
export_factory.check_state_compatibility(
&data_coll_output.desired_setup_state,
&existing_state.state,
)?
} else {
SetupStateCompatibility::NotCompatible
};
let compatibility = if setup_by_user == existing_state.common.setup_by_user {
export_factory.check_state_compatibility(&setup_state, &existing_state.state)?
} else {
SetupStateCompatibility::NotCompatible
};
let compatible_target_id = if compatibility != SetupStateCompatibility::NotCompatible {
reusable_schema_version_ids.insert(
(compatibility == SetupStateCompatibility::Compatible)
Expand Down Expand Up @@ -897,36 +894,18 @@ impl AnalyzerContext<'_> {
target_id,
schema_version_id,
max_schema_version_id: max_schema_version_id.max(schema_version_id),
setup_by_user: export_op.spec.setup_by_user,
setup_by_user,
},
state: data_coll_output.desired_setup_state,
state: setup_state,
});
}
}
let op_name = export_op.name.clone();
Ok(async move {
trace!("Start building executor for export op `{op_name}`");
let executors = data_coll_output
.executors
.await
.with_context(|| format!("Analyzing export op: {op_name}"))?;
trace!("Finished building executor for export op `{op_name}`");
Ok(AnalyzedExportOp {
name: op_name,
target_id,
input: data_fields_info.local_collector_ref,
export_context: executors.export_context,
query_target: executors.query_target,
primary_key_def: data_fields_info.primary_key_def,
primary_key_type: data_fields_info.primary_key_type,
value_fields: data_fields_info.value_fields_idx,
value_stable: data_fields_info.value_stable,
})
})
Ok(target_id)
}

fn analyze_export_op_group(
&self,
target_kind: String,
scope: &mut DataScopeBuilder,
flow_inst: &FlowInstanceSpec,
export_op_group: &AnalyzedExportTargetOpGroup,
Expand Down Expand Up @@ -1006,38 +985,61 @@ impl AnalyzerContext<'_> {
});
data_fields_infos.push(data_collection_info);
}
let (data_collections_output, _) = export_op_group.target_factory.clone().build(
collection_specs,
declarations,
self.flow_ctx.clone(),
)?;
if data_collections_output.len() != data_fields_infos.len() {
api_bail!(
"Data collection output length mismatch: expect {}, got {}",
data_fields_infos.len(),
data_collections_output.len()
);
}

let result = export_op_group
let (data_collections_output, declarations_output) = export_op_group
.target_factory
.clone()
.build(collection_specs, declarations, self.flow_ctx.clone())?;
let analyzed_export_ops = export_op_group
.op_idx
.iter()
.zip(data_collections_output.into_iter())
.zip(data_fields_infos.into_iter())
.map(|((idx, data_coll_output), data_fields_info)| {
let export_op = &flow_inst.export_ops[*idx];
Ok(self.analyze_export_op(
export_op,
let target_id = self.merge_export_op_states(
export_op.spec.target.kind.clone(),
data_coll_output.setup_key,
data_coll_output.desired_setup_state,
export_op.spec.setup_by_user,
export_op_group.target_factory.as_ref(),
data_coll_output,
data_fields_info,
flow_setup_state,
existing_target_states,
)?)
)?;
let op_name = export_op.name.clone();
Ok(async move {
trace!("Start building executor for export op `{op_name}`");
let executors = data_coll_output
.executors
.await
.with_context(|| format!("Analyzing export op: {op_name}"))?;
trace!("Finished building executor for export op `{op_name}`");
Ok(AnalyzedExportOp {
name: op_name,
target_id,
input: data_fields_info.local_collector_ref,
export_context: executors.export_context,
query_target: executors.query_target,
primary_key_def: data_fields_info.primary_key_def,
primary_key_type: data_fields_info.primary_key_type,
value_fields: data_fields_info.value_fields_idx,
value_stable: data_fields_info.value_stable,
})
})
})
.collect::<Result<Vec<_>>>()?;
for (setup_key, setup_state) in declarations_output.into_iter() {
self.merge_export_op_states(
target_kind.clone(),
setup_key,
setup_state,
/*setup_by_user=*/ false,
export_op_group.target_factory.as_ref(),
flow_setup_state,
existing_target_states,
)?;
}

Ok(result)
Ok(analyzed_export_ops)
}

fn analyze_op_scope(
Expand Down Expand Up @@ -1198,6 +1200,7 @@ pub fn analyze_flow(
op_idx: op_ids.export_op_ids,
};
export_ops_futs.extend(analyzer_ctx.analyze_export_op_group(
target_kind,
root_exec_scope.data,
flow_inst,
&analyzed_target_op_group,
Expand Down