From b414f36ff02a3f98019bc1d74f5e480f22403127 Mon Sep 17 00:00:00 2001
From: LJ
Date: Wed, 23 Apr 2025 09:10:05 -0700
Subject: [PATCH] feat(declaration): merge states from declarations in analyzer

---
 src/builder/analyzer.rs | 119 ++++++++++++++++++++--------------------
 1 file changed, 61 insertions(+), 58 deletions(-)

diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs
index bc6bb25b..3d047518 100644
--- a/src/builder/analyzer.rs
+++ b/src/builder/analyzer.rs
@@ -818,32 +818,29 @@ impl AnalyzerContext<'_> {
         Ok(result_fut)
     }
 
-    fn analyze_export_op(
+    fn merge_export_op_states(
         &self,
-        export_op: &NamedSpec<ExportOpSpec>,
+        target_kind: String,
+        setup_key: serde_json::Value,
+        setup_state: serde_json::Value,
+        setup_by_user: bool,
         export_factory: &dyn ExportTargetFactory,
-        data_coll_output: ExportDataCollectionBuildOutput,
-        data_fields_info: ExportDataFieldsInfo,
         flow_setup_state: &mut FlowSetupState<DesiredMode>,
         existing_target_states: &HashMap<&ResourceIdentifier, Vec<&TargetSetupState>>,
-    ) -> Result<impl Future<Output = Result<AnalyzedExportOp>> + Send> {
+    ) -> Result<i32> {
         let resource_id = ResourceIdentifier {
-            key: data_coll_output.setup_key.clone(),
-            target_kind: export_op.spec.target.kind.clone(),
+            key: setup_key,
+            target_kind,
         };
         let existing_target_states = existing_target_states.get(&resource_id);
         let mut compatible_target_ids = HashSet::<Option<i32>>::new();
         let mut reusable_schema_version_ids = HashSet::<Option<i32>>::new();
         for existing_state in existing_target_states.iter().flat_map(|v| v.iter()) {
-            let compatibility =
-                if export_op.spec.setup_by_user == existing_state.common.setup_by_user {
-                    export_factory.check_state_compatibility(
-                        &data_coll_output.desired_setup_state,
-                        &existing_state.state,
-                    )?
-                } else {
-                    SetupStateCompatibility::NotCompatible
-                };
+            let compatibility = if setup_by_user == existing_state.common.setup_by_user {
+                export_factory.check_state_compatibility(&setup_state, &existing_state.state)?
+            } else {
+                SetupStateCompatibility::NotCompatible
+            };
             let compatible_target_id = if compatibility != SetupStateCompatibility::NotCompatible {
                 reusable_schema_version_ids.insert(
                     (compatibility == SetupStateCompatibility::Compatible)
@@ -897,36 +894,18 @@ impl AnalyzerContext<'_> {
                         target_id,
                         schema_version_id,
                         max_schema_version_id: max_schema_version_id.max(schema_version_id),
-                        setup_by_user: export_op.spec.setup_by_user,
+                        setup_by_user,
                     },
-                    state: data_coll_output.desired_setup_state,
+                    state: setup_state,
                 });
             }
         }
-        let op_name = export_op.name.clone();
-        Ok(async move {
-            trace!("Start building executor for export op `{op_name}`");
-            let executors = data_coll_output
-                .executors
-                .await
-                .with_context(|| format!("Analyzing export op: {op_name}"))?;
-            trace!("Finished building executor for export op `{op_name}`");
-            Ok(AnalyzedExportOp {
-                name: op_name,
-                target_id,
-                input: data_fields_info.local_collector_ref,
-                export_context: executors.export_context,
-                query_target: executors.query_target,
-                primary_key_def: data_fields_info.primary_key_def,
-                primary_key_type: data_fields_info.primary_key_type,
-                value_fields: data_fields_info.value_fields_idx,
-                value_stable: data_fields_info.value_stable,
-            })
-        })
+        Ok(target_id)
     }
 
     fn analyze_export_op_group(
         &self,
+        target_kind: String,
         scope: &mut DataScopeBuilder,
         flow_inst: &FlowInstanceSpec,
         export_op_group: &AnalyzedExportTargetOpGroup,
@@ -1006,38 +985,61 @@ impl AnalyzerContext<'_> {
             });
             data_fields_infos.push(data_collection_info);
         }
-        let (data_collections_output, _) = export_op_group.target_factory.clone().build(
-            collection_specs,
-            declarations,
-            self.flow_ctx.clone(),
-        )?;
-        if data_collections_output.len() != data_fields_infos.len() {
-            api_bail!(
-                "Data collection output length mismatch: expect {}, got {}",
-                data_fields_infos.len(),
-                data_collections_output.len()
-            );
-        }
-
-        let result = export_op_group
+        let (data_collections_output, declarations_output) = export_op_group
+            .target_factory
+            .clone()
+            .build(collection_specs, declarations, self.flow_ctx.clone())?;
+        let analyzed_export_ops = export_op_group
             .op_idx
             .iter()
             .zip(data_collections_output.into_iter())
             .zip(data_fields_infos.into_iter())
             .map(|((idx, data_coll_output), data_fields_info)| {
                 let export_op = &flow_inst.export_ops[*idx];
-                Ok(self.analyze_export_op(
-                    export_op,
+                let target_id = self.merge_export_op_states(
+                    export_op.spec.target.kind.clone(),
+                    data_coll_output.setup_key,
+                    data_coll_output.desired_setup_state,
+                    export_op.spec.setup_by_user,
                     export_op_group.target_factory.as_ref(),
-                    data_coll_output,
-                    data_fields_info,
                     flow_setup_state,
                     existing_target_states,
-                )?)
+                )?;
+                let op_name = export_op.name.clone();
+                Ok(async move {
+                    trace!("Start building executor for export op `{op_name}`");
+                    let executors = data_coll_output
+                        .executors
+                        .await
+                        .with_context(|| format!("Analyzing export op: {op_name}"))?;
+                    trace!("Finished building executor for export op `{op_name}`");
+                    Ok(AnalyzedExportOp {
+                        name: op_name,
+                        target_id,
+                        input: data_fields_info.local_collector_ref,
+                        export_context: executors.export_context,
+                        query_target: executors.query_target,
+                        primary_key_def: data_fields_info.primary_key_def,
+                        primary_key_type: data_fields_info.primary_key_type,
+                        value_fields: data_fields_info.value_fields_idx,
+                        value_stable: data_fields_info.value_stable,
+                    })
+                })
             })
             .collect::<Result<Vec<_>>>()?;
+        for (setup_key, setup_state) in declarations_output.into_iter() {
+            self.merge_export_op_states(
+                target_kind.clone(),
+                setup_key,
+                setup_state,
+                /*setup_by_user=*/ false,
+                export_op_group.target_factory.as_ref(),
+                flow_setup_state,
+                existing_target_states,
+            )?;
+        }
 
-        Ok(result)
+        Ok(analyzed_export_ops)
     }
 
     fn analyze_op_scope(
@@ -1198,6 +1200,7 @@ pub fn analyze_flow(
             op_idx: op_ids.export_op_ids,
         };
        export_ops_futs.extend(analyzer_ctx.analyze_export_op_group(
+            target_kind,
            root_exec_scope.data,
            flow_inst,
            &analyzed_target_op_group,
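
--
For readers skimming the diff: the substantive change is that the second
output of `build()` (the declarations output, previously discarded as `_`)
now flows through the same merge path as each collection's output, via the
extracted `merge_export_op_states` helper, with `setup_by_user` fixed to
`false`. Below the signature marker, so `git am` ignores it, is a minimal
self-contained Rust sketch of that pattern. Every type and name in the
sketch is an illustrative stand-in, not the actual cocoindex API.

use std::collections::HashMap;

// Hypothetical stand-ins for the patch's ResourceIdentifier / TargetSetupState.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ResourceId {
    target_kind: String,
    key: String,
}

#[derive(Debug)]
struct TargetState {
    setup_by_user: bool,
    state: String,
}

// Shared merge routine: both call sites below funnel through here, mirroring
// how per-op outputs and declarations now share merge_export_op_states.
fn merge_state(
    states: &mut HashMap<ResourceId, Vec<TargetState>>,
    target_kind: String,
    key: String,
    state: String,
    setup_by_user: bool,
) {
    states
        .entry(ResourceId { target_kind, key })
        .or_default()
        .push(TargetState {
            setup_by_user,
            state,
        });
}

fn main() {
    let mut states = HashMap::new();
    // Per-export-op state: setup_by_user comes from the op's spec.
    merge_state(
        &mut states,
        "postgres".to_string(),
        "table_a".to_string(),
        "schema_v1".to_string(),
        true,
    );
    // Declaration-produced state: merged with setup_by_user = false,
    // mirroring the `/*setup_by_user=*/ false` argument in the patch.
    merge_state(
        &mut states,
        "postgres".to_string(),
        "shared_index".to_string(),
        "index_v1".to_string(),
        false,
    );
    println!("{} target resource(s) tracked", states.len());
}

Funneling both sources through one helper keeps the compatibility checks and
target-id bookkeeping identical for op-backed and declaration-backed targets,
rather than growing a second, divergent code path.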