Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 57 additions & 53 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1094,63 +1094,67 @@ pub fn analyze_flow(
// TODO: Fill it with a meaningful value.
targets: IndexMap::new(),
};
let plan_fut = {
let analyzer_ctx = AnalyzerContext { registry, flow_ctx };
let mut root_exec_scope = ExecutionScope {
name: ROOT_SCOPE_NAME,
data: &mut root_data_scope,
};
let source_ops_futs = flow_inst
.source_ops
.iter()
.map(|source_op| {
let existing_source_states = source_states_by_name.get(source_op.name.as_str());
analyzer_ctx.analyze_source_op(
root_exec_scope.data,
source_op.clone(),
Some(&mut setup_state.metadata),
existing_source_states,
)
})
.collect::<Result<Vec<_>>>()?;
let op_scope_fut = analyzer_ctx.analyze_op_scope(
&mut root_exec_scope,
&flow_inst.reactive_ops,
RefList::Nil,
)?;
let export_ops_futs = flow_inst
.export_ops
.iter()
.map(|export_op| {
analyzer_ctx.analyze_export_op(
root_exec_scope.data,
export_op.clone(),
Some(&mut setup_state),
&target_states_by_name_type,
)
})
.collect::<Result<Vec<_>>>()?;

let tracking_table_setup = setup_state.tracking_table.clone();
async move {
let (source_ops, op_scope, export_ops) = try_join3(
try_join_all(source_ops_futs),
op_scope_fut,
try_join_all(export_ops_futs),
let analyzer_ctx = AnalyzerContext { registry, flow_ctx };
let mut root_exec_scope = ExecutionScope {
name: ROOT_SCOPE_NAME,
data: &mut root_data_scope,
};
let source_ops_futs = flow_inst
.source_ops
.iter()
.map(|source_op| {
let existing_source_states = source_states_by_name.get(source_op.name.as_str());
analyzer_ctx.analyze_source_op(
root_exec_scope.data,
source_op.clone(),
Some(&mut setup_state.metadata),
existing_source_states,
)
.await?;

Ok(ExecutionPlan {
tracking_table_setup,
logic_fingerprint: vec![0; 8], // TODO: Fill it with a meaningful value automatically
source_ops,
op_scope,
export_ops,
})
}
})
.collect::<Result<Vec<_>>>()?;
let op_scope_fut = analyzer_ctx.analyze_op_scope(
&mut root_exec_scope,
&flow_inst.reactive_ops,
RefList::Nil,
)?;
let export_ops_futs = flow_inst
.export_ops
.iter()
.map(|export_op| {
analyzer_ctx.analyze_export_op(
root_exec_scope.data,
export_op.clone(),
Some(&mut setup_state),
&target_states_by_name_type,
)
})
.collect::<Result<Vec<_>>>()?;

let tracking_table_setup = setup_state.tracking_table.clone();
let data_schema = root_data_scope.into_data_schema()?;
let logic_fingerprint = Fingerprinter::default()
.with(&flow_inst)?
.with(&data_schema)?
.into_fingerprint();
let plan_fut = async move {
let (source_ops, op_scope, export_ops) = try_join3(
try_join_all(source_ops_futs),
op_scope_fut,
try_join_all(export_ops_futs),
)
.await?;

Ok(ExecutionPlan {
tracking_table_setup,
logic_fingerprint,
source_ops,
op_scope,
export_ops,
})
};

Ok((root_data_scope.into_data_schema()?, plan_fut, setup_state))
Ok((data_schema, plan_fut, setup_state))
}

pub fn analyze_transient_flow<'a>(
Expand Down
4 changes: 2 additions & 2 deletions src/builder/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::base::schema::ValueType;
use crate::base::value;
use crate::execution::db_tracking_setup;
use crate::ops::interface::*;
use crate::utils::fingerprint::Fingerprinter;
use crate::utils::fingerprint::{Fingerprint, Fingerprinter};

#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct AnalyzedLocalFieldReference {
Expand Down Expand Up @@ -126,7 +126,7 @@ pub struct AnalyzedOpScope {

pub struct ExecutionPlan {
pub tracking_table_setup: db_tracking_setup::TrackingTableSetupState,
pub logic_fingerprint: Vec<u8>,
pub logic_fingerprint: Fingerprint,

pub source_ops: Vec<AnalyzedSourceOp>,
pub op_scope: AnalyzedOpScope,
Expand Down
48 changes: 31 additions & 17 deletions src/execution/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,39 @@ use super::evaluator::{evaluate_source_entry, ScopeValueBuilder};

#[derive(Debug, Serialize, Default)]
pub struct UpdateStats {
pub num_skipped: AtomicUsize,
pub num_insertions: AtomicUsize,
pub num_deletions: AtomicUsize,
pub num_already_exists: AtomicUsize,
pub num_repreocesses: AtomicUsize,
pub num_errors: AtomicUsize,
}

impl std::fmt::Display for UpdateStats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let num_source_rows = self.num_insertions.load(Relaxed)
+ self.num_deletions.load(Relaxed)
+ self.num_already_exists.load(Relaxed);
write!(f, "{num_source_rows} source rows processed",)?;
if self.num_errors.load(Relaxed) > 0 {
write!(f, " with {} ERRORS", self.num_errors.load(Relaxed))?;
let num_skipped = self.num_skipped.load(Relaxed);
if num_skipped > 0 {
write!(f, "{} rows skipped", num_skipped)?;
}

let num_insertions = self.num_insertions.load(Relaxed);
let num_deletions = self.num_deletions.load(Relaxed);
let num_reprocesses = self.num_repreocesses.load(Relaxed);
let num_source_rows = num_insertions + num_deletions + num_reprocesses;
if num_source_rows > 0 {
if num_skipped > 0 {
write!(f, ", ")?;
}
write!(f, "{num_source_rows} source rows processed",)?;

let num_errors = self.num_errors.load(Relaxed);
if num_errors > 0 {
write!(f, " with {num_errors} ERRORS",)?;
}
write!(
f,
": {num_insertions} added, {num_deletions} removed, {num_reprocesses} repocessed",
)?;
}
write!(
f,
": {} added, {} removed, {} already exists",
self.num_insertions.load(Relaxed),
self.num_deletions.load(Relaxed),
self.num_already_exists.load(Relaxed)
)?;
Ok(())
}
}
Expand Down Expand Up @@ -495,8 +506,11 @@ pub async fn update_source_entry(
(Some(source_ordinal), Some(existing_source_ordinal)) => {
if source_ordinal < existing_source_ordinal
|| (source_ordinal == existing_source_ordinal
&& existing_logic_fingerprint == source_op.)
&& existing_logic_fingerprint.as_ref().map(|v| v.as_slice())
== Some(plan.logic_fingerprint.0.as_slice()))
{
// TODO: We should detect based on finer grain fingerprint.
stats.num_skipped.fetch_add(1, Relaxed);
return Ok(());
}
}
Expand Down Expand Up @@ -538,7 +552,7 @@ pub async fn update_source_entry(
};
if already_exists {
if output.is_some() {
stats.num_already_exists.fetch_add(1, Relaxed);
stats.num_repreocesses.fetch_add(1, Relaxed);
} else {
stats.num_deletions.fetch_add(1, Relaxed);
}
Expand Down Expand Up @@ -590,7 +604,7 @@ pub async fn update_source_entry(
source_op.source_id,
&source_key_json,
source_ordinal.map(|o| o.into()),
&plan.logic_fingerprint,
&plan.logic_fingerprint.0,
precommit_output.metadata,
&process_timestamp,
&plan.tracking_table_setup,
Expand Down