Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 36 additions & 26 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -816,20 +816,11 @@ impl AnalyzerContext<'_> {
&self,
scope: &mut DataScopeBuilder,
export_op: NamedSpec<ExportOpSpec>,
export_factory: Arc<dyn ExportTargetFactory>,
setup_state: Option<&mut FlowSetupState<DesiredMode>>,
existing_target_states: &HashMap<&ResourceIdentifier, Vec<&TargetSetupState>>,
) -> Result<impl Future<Output = Result<AnalyzedExportOp>> + Send> {
let export_target = export_op.spec.target;
let export_factory = match self.registry.get(&export_target.kind) {
Some(ExecutorFactory::ExportTarget(export_executor)) => export_executor,
_ => {
return Err(anyhow::anyhow!(
"Export target kind not found: {}",
export_target.kind
))
}
};

let spec = serde_json::Value::Object(export_target.spec.clone());
let (local_collector_ref, collector_schema) =
scope.consume_collector(&export_op.spec.collector_name)?;
Expand Down Expand Up @@ -986,8 +977,8 @@ impl AnalyzerContext<'_> {
.unwrap_or(false);
Ok(async move {
trace!("Start building executor for export op `{}`", export_op.name);
let (executor, query_target) = setup_output
.executor
let executors = setup_output
.executors
.await
.with_context(|| format!("Analyzing export op: {}", export_op.name))?;
trace!(
Expand All @@ -999,8 +990,8 @@ impl AnalyzerContext<'_> {
name,
target_id: target_id.unwrap_or_default(),
input: local_collector_ref,
executor,
query_target,
export_context: executors.export_context,
query_target: executors.query_target,
primary_key_def,
primary_key_type,
value_fields: value_fields_idx,
Expand Down Expand Up @@ -1127,18 +1118,36 @@ pub fn analyze_flow(
&flow_inst.reactive_ops,
RefList::Nil,
)?;
let export_ops_futs = flow_inst
.export_ops
.iter()
.map(|export_op| {
analyzer_ctx.analyze_export_op(
root_exec_scope.data,
export_op.clone(),
Some(&mut setup_state),
&target_states_by_name_type,
)
})
.collect::<Result<Vec<_>>>()?;

let mut target_groups = IndexMap::<String, AnalyzedExportTargetOpGroup>::new();
let mut export_ops_futs = vec![];
for (idx, export_op) in flow_inst.export_ops.iter().enumerate() {
let target_kind = export_op.spec.target.kind.clone();
let export_factory = match registry.get(&target_kind) {
Some(ExecutorFactory::ExportTarget(export_executor)) => export_executor,
_ => {
return Err(anyhow::anyhow!(
"Export target kind not found: {}",
export_op.spec.target.kind
))
}
};
export_ops_futs.push(analyzer_ctx.analyze_export_op(
root_exec_scope.data,
export_op.clone(),
export_factory.clone(),
Some(&mut setup_state),
&target_states_by_name_type,
)?);
target_groups
.entry(target_kind)
.or_insert_with(|| AnalyzedExportTargetOpGroup {
target_factory: export_factory.clone(),
op_idx: vec![],
})
.op_idx
.push(idx);
}

let tracking_table_setup = setup_state.tracking_table.clone();
let data_schema = root_data_scope.into_data_schema()?;
Expand All @@ -1160,6 +1169,7 @@ pub fn analyze_flow(
import_ops,
op_scope,
export_ops,
export_op_groups: target_groups.into_values().collect(),
})
};

Expand Down
8 changes: 7 additions & 1 deletion src/builder/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub struct AnalyzedExportOp {
pub name: String,
pub target_id: i32,
pub input: AnalyzedLocalCollectorReference,
pub executor: Arc<dyn ExportTargetExecutor>,
pub export_context: Arc<dyn Any + Send + Sync>,
pub query_target: Option<Arc<dyn QueryTarget>>,
pub primary_key_def: AnalyzedPrimaryKeyDef,
pub primary_key_type: schema::ValueType,
Expand All @@ -111,6 +111,11 @@ pub struct AnalyzedExportOp {
pub value_stable: bool,
}

pub struct AnalyzedExportTargetOpGroup {
pub target_factory: Arc<dyn ExportTargetFactory + Send + Sync>,
pub op_idx: Vec<usize>,
}

pub enum AnalyzedReactiveOp {
Transform(AnalyzedTransformOp),
ForEach(AnalyzedForEachOp),
Expand All @@ -128,6 +133,7 @@ pub struct ExecutionPlan {
pub import_ops: Vec<AnalyzedImportOp>,
pub op_scope: AnalyzedOpScope,
pub export_ops: Vec<AnalyzedExportOp>,
pub export_op_groups: Vec<AnalyzedExportTargetOpGroup>,
}

pub struct TransientExecutionPlan {
Expand Down
28 changes: 19 additions & 9 deletions src/execution/row_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,16 +554,26 @@ pub async fn update_source_row(

// Phase 3: Apply changes to the target storage, including upserting new target records and removing existing ones.
let mut target_mutations = precommit_output.target_mutations;
let apply_futs = plan.export_ops.iter().filter_map(|export_op| {
target_mutations
.remove(&export_op.target_id)
.and_then(|mutation| {
if !mutation.is_empty() {
Some(export_op.executor.apply_mutation(mutation))
} else {
None
}
let apply_futs = plan.export_op_groups.iter().filter_map(|export_op_group| {
let mutations_w_ctx: Vec<_> = export_op_group
.op_idx
.iter()
.filter_map(|export_op_idx| {
let export_op = &plan.export_ops[*export_op_idx];
target_mutations
.remove(&export_op.target_id)
.filter(|m| !m.is_empty())
.map(|mutation| interface::ExportTargetMutationWithContext {
mutation,
export_context: export_op.export_context.as_ref(),
})
})
.collect();
(!mutations_w_ctx.is_empty()).then(|| {
export_op_group
.target_factory
.apply_mutation(mutations_w_ctx)
})
});

// TODO: Handle errors.
Expand Down
48 changes: 43 additions & 5 deletions src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,17 +264,23 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
}
}

pub struct ExportTargetBuildOutput<F: StorageFactoryBase + ?Sized> {
pub executor:
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
pub struct TypedExportTargetExecutors<F: StorageFactoryBase + ?Sized> {
pub export_context: Arc<F::ExportContext>,
pub query_target: Option<Arc<dyn QueryTarget>>,
}

pub struct TypedExportTargetBuildOutput<F: StorageFactoryBase + ?Sized> {
pub executors: BoxFuture<'static, Result<TypedExportTargetExecutors<F>>>,
pub setup_key: F::Key,
pub desired_setup_state: F::SetupState,
}

#[async_trait]
pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
type Spec: DeserializeOwned + Send + Sync;
type Key: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync;
type ExportContext: Send + Sync + 'static;

fn name(&self) -> &str;

Expand All @@ -286,7 +292,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
value_fields_schema: Vec<FieldSchema>,
storage_options: IndexOptions,
context: Arc<FlowInstanceContext>,
) -> Result<ExportTargetBuildOutput<Self>>;
) -> Result<TypedExportTargetBuildOutput<Self>>;

/// Will not be called if it's setup by user.
/// It returns an error if the target only supports setup by user.
Expand Down Expand Up @@ -315,8 +321,14 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
ExecutorFactory::ExportTarget(Arc::new(self)),
)
}

async fn apply_mutation(
&self,
mutations: Vec<ExportTargetMutationWithContext<'async_trait, Self::ExportContext>>,
) -> Result<()>;
}

#[async_trait]
impl<T: StorageFactoryBase> ExportTargetFactory for T {
fn build(
self: Arc<Self>,
Expand All @@ -337,10 +349,17 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
storage_options,
context,
)?;
let executors = async move {
let executors = build_output.executors.await?;
Ok(interface::ExportTargetExecutors {
export_context: executors.export_context,
query_target: executors.query_target,
})
};
Ok(interface::ExportTargetBuildOutput {
executor: build_output.executor,
setup_key: serde_json::to_value(build_output.setup_key)?,
desired_setup_state: serde_json::to_value(build_output.desired_setup_state)?,
executors: executors.boxed(),
})
}

Expand Down Expand Up @@ -383,6 +402,25 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
)?;
Ok(result)
}

async fn apply_mutation(
&self,
mutations: Vec<ExportTargetMutationWithContext<'async_trait, dyn Any + Send + Sync>>,
) -> Result<()> {
let mutations = mutations
.into_iter()
.map(|m| {
anyhow::Ok(ExportTargetMutationWithContext {
mutation: m.mutation,
export_context: m
.export_context
.downcast_ref::<T::ExportContext>()
.ok_or_else(|| anyhow!("Unexpected export context type"))?,
})
})
.collect::<Result<_>>()?;
StorageFactoryBase::apply_mutation(self, mutations).await
}
}

fn from_json_combined_state<T: Debug + Clone + Serialize + DeserializeOwned>(
Expand Down
22 changes: 16 additions & 6 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,10 @@ impl ExportTargetMutation {
}
}

#[async_trait]
pub trait ExportTargetExecutor: Send + Sync {
async fn apply_mutation(&self, mutation: ExportTargetMutation) -> Result<()>;
#[derive(Debug)]
pub struct ExportTargetMutationWithContext<'ctx, T: ?Sized + Send + Sync> {
pub mutation: ExportTargetMutation,
pub export_context: &'ctx T,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand All @@ -156,14 +157,18 @@ pub enum SetupStateCompatibility {
NotCompatible,
}

pub struct ExportTargetExecutors {
pub export_context: Arc<dyn Any + Send + Sync>,
pub query_target: Option<Arc<dyn QueryTarget>>,
}
pub struct ExportTargetBuildOutput {
pub executor:
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
pub executors: BoxFuture<'static, Result<ExportTargetExecutors>>,
pub setup_key: serde_json::Value,
pub desired_setup_state: serde_json::Value,
}

pub trait ExportTargetFactory {
#[async_trait]
pub trait ExportTargetFactory: Send + Sync {
fn build(
self: Arc<Self>,
name: String,
Expand Down Expand Up @@ -191,6 +196,11 @@ pub trait ExportTargetFactory {
) -> Result<SetupStateCompatibility>;

fn describe_resource(&self, key: &serde_json::Value) -> Result<String>;

async fn apply_mutation(
&self,
mutations: Vec<ExportTargetMutationWithContext<'async_trait, dyn Any + Send + Sync>>,
) -> Result<()>;
}

#[derive(Clone)]
Expand Down
2 changes: 1 addition & 1 deletion src/ops/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub use crate::base::spec::*;
pub use crate::base::value::*;

// Disambiguate the ExportTargetBuildOutput type.
pub use super::factory_bases::ExportTargetBuildOutput;
pub use super::factory_bases::TypedExportTargetBuildOutput;
/// Defined for all types convertible to ValueType, to ease creation for ValueType in various operation factories.
pub trait TypeCore {
fn into_type(self) -> ValueType;
Expand Down
Loading