src/execution/indexer.rs: 126 changes (77 additions, 49 deletions)
@@ -1,9 +1,11 @@
 use anyhow::Result;
-use futures::future::{join_all, try_join, try_join_all};
+use futures::future::{join, join_all, try_join, try_join_all};
+use itertools::Itertools;
 use log::error;
 use serde::Serialize;
 use sqlx::PgPool;
 use std::collections::{HashMap, HashSet};
+use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

 use super::db_tracking::{self, read_source_tracking_info, TrackedTargetKey};
 use super::db_tracking_setup;
@@ -20,23 +22,27 @@ use super::evaluator::{evaluate_source_entry, ScopeValueBuilder};

 #[derive(Debug, Serialize, Default)]
 pub struct UpdateStats {
-    pub num_insertions: usize,
-    pub num_deletions: usize,
-    pub num_already_exists: usize,
-    pub num_errors: usize,
+    pub num_insertions: AtomicUsize,
+    pub num_deletions: AtomicUsize,
+    pub num_already_exists: AtomicUsize,
+    pub num_errors: AtomicUsize,
 }

 impl std::fmt::Display for UpdateStats {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let num_source_rows = self.num_insertions + self.num_deletions + self.num_already_exists;
+        let num_source_rows = self.num_insertions.load(Relaxed)
+            + self.num_deletions.load(Relaxed)
+            + self.num_already_exists.load(Relaxed);
         write!(f, "{num_source_rows} source rows processed",)?;
-        if self.num_errors > 0 {
-            write!(f, " with {} ERRORS", self.num_errors)?;
+        if self.num_errors.load(Relaxed) > 0 {
+            write!(f, " with {} ERRORS", self.num_errors.load(Relaxed))?;
         }
         write!(
             f,
             ": {} added, {} removed, {} already exists",
-            self.num_insertions, self.num_deletions, self.num_already_exists
+            self.num_insertions.load(Relaxed),
+            self.num_deletions.load(Relaxed),
+            self.num_already_exists.load(Relaxed)
         )?;
         Ok(())
     }
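
Why the counters turned into atomics: after this change a single UpdateStats is shared by reference across many row updates running concurrently, so plain usize fields would need &mut access or a lock. A minimal sketch of the pattern outside this PR's code, assuming the futures crate and any async executor (Stats, process, and run are illustrative names, not from the PR):

use futures::future::join_all;
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

#[derive(Default)]
struct Stats {
    num_insertions: AtomicUsize,
}

async fn process(stats: &Stats) {
    // Work for one source row would happen here.
    stats.num_insertions.fetch_add(1, Relaxed);
}

async fn run() {
    let stats = Stats::default();
    // Every future shares `&stats`; no lock is needed for independent tallies.
    join_all((0..10).map(|_| process(&stats))).await;
    assert_eq!(stats.num_insertions.load(Relaxed), 10);
}

Relaxed ordering suffices because each counter is an independent tally; nothing reads a cross-counter invariant that would require stronger ordering.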
@@ -442,7 +448,9 @@ pub async fn update_source_entry(
     source_op_idx: usize,
     schema: &schema::DataSchema,
     key: &value::KeyValue,
+    only_for_deletion: bool,
     pool: &PgPool,
+    stats: &UpdateStats,
 ) -> Result<()> {
     let source_id = plan.source_ops[source_op_idx].source_id;
     let source_key_json = serde_json::to_value(key)?;
@@ -464,11 +472,22 @@ pub async fn update_source_entry(
         .flatten();
     let evaluation_cache =
         EvaluationCache::new(process_timestamp, memoization_info.map(|info| info.cache));
-    let value_builder =
-        evaluate_source_entry(plan, source_op_idx, schema, key, Some(&evaluation_cache)).await?;
+    let value_builder = if !only_for_deletion {
+        evaluate_source_entry(plan, source_op_idx, schema, key, Some(&evaluation_cache)).await?
+    } else {
+        None
+    };
+    let exists = value_builder.is_some();

-    // Didn't exist and still doesn't exist. No need to apply any changes.
-    if !already_exists && value_builder.is_none() {
+    if already_exists {
+        if exists {
+            stats.num_already_exists.fetch_add(1, Relaxed);
+        } else {
+            stats.num_deletions.fetch_add(1, Relaxed);
+        }
+    } else if exists {
+        stats.num_insertions.fetch_add(1, Relaxed);
+    } else {
         return Ok(());
     }

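The rewritten branch replaces the old early return with full bookkeeping over the (already_exists, exists) pair. A compact restatement of the four cases (classify is a hypothetical helper for illustration only, not code from the PR):

// (already_exists, exists) picks exactly one counter; (false, false) is the no-op early return.
fn classify(already_exists: bool, exists: bool) -> Option<&'static str> {
    match (already_exists, exists) {
        (true, true) => Some("num_already_exists"), // still present: unchanged or updated in place
        (true, false) => Some("num_deletions"),     // tracked before, gone now
        (false, true) => Some("num_insertions"),    // brand-new row
        (false, false) => None,                     // never existed: nothing to apply
    }
}
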
@@ -548,6 +567,31 @@ pub async fn update_source_entry(
     Ok(())
 }

+async fn update_source_entry_with_err_handling(
+    plan: &ExecutionPlan,
+    source_op_idx: usize,
+    schema: &schema::DataSchema,
+    key: &value::KeyValue,
+    only_for_deletion: bool,
+    pool: &PgPool,
+    stats: &UpdateStats,
+) {
+    let r = update_source_entry(
+        plan,
+        source_op_idx,
+        schema,
+        key,
+        only_for_deletion,
+        pool,
+        stats,
+    )
+    .await;
+    if let Err(e) = r {
+        stats.num_errors.fetch_add(1, Relaxed);
+        error!("{:?}", e.context("Error in indexing a source row"));
+    }
+}
+
 async fn update_source(
     source_name: &str,
     plan: &ExecutionPlan,
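
The new wrapper converts a per-row failure into a logged, counted event instead of propagating it, so one bad row no longer aborts the rest of the batch. The same policy as a generic sketch, assuming the anyhow and log crates (with_err_handling is an illustrative name, not the PR's function):

use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

async fn with_err_handling<F>(fut: F, errors: &AtomicUsize)
where
    F: std::future::Future<Output = anyhow::Result<()>>,
{
    if let Err(e) = fut.await {
        // Count the failure and keep going; callers never see the error.
        errors.fetch_add(1, Relaxed);
        log::error!("{:?}", e.context("Error in indexing a source row"));
    }
}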
@@ -566,45 +610,29 @@ async fn update_source(
     )
     .await?;

-    let num_new_keys = keys.len();
-    let mut num_updates = 0;
-    let mut num_deletions = 0;
-
-    let mut all_keys_set = keys.into_iter().collect::<HashSet<_>>();
-    for existing_key_json in existing_keys_json.into_iter() {
-        let existing_key = value::Value::<value::ScopeValue>::from_json(
-            existing_key_json.source_key,
-            &source_op.primary_key_type,
-        )?;
-        let inserted = all_keys_set.insert(existing_key.as_key()?);
-        if inserted {
-            num_deletions += 1;
-        } else {
-            num_updates += 1;
-        }
-    }
-
-    let num_errors = join_all(all_keys_set.into_iter().map(|key| async move {
-        let result = update_source_entry(plan, source_op_idx, schema, &key, pool).await;
-        if let Err(e) = result {
-            error!("{:?}", e.context("Error in indexing a source row"));
-            1
-        } else {
-            0
-        }
-    }))
-    .await
-    .iter()
-    .sum();
+    let stats = UpdateStats::default();
+    let upsert_futs = join_all(keys.iter().map(|key| {
+        update_source_entry_with_err_handling(plan, source_op_idx, schema, key, false, pool, &stats)
+    }));
+    let deleted_keys = existing_keys_json
+        .into_iter()
+        .map(|existing_key_json| {
+            value::Value::<value::ScopeValue>::from_json(
+                existing_key_json.source_key,
+                &source_op.primary_key_type,
+            )?
+            .as_key()
+        })
+        .filter_ok(|existing_key| !keys.contains(existing_key))
+        .collect::<Result<Vec<_>>>()?;
+    let delete_futs = join_all(deleted_keys.iter().map(|key| {
+        update_source_entry_with_err_handling(plan, source_op_idx, schema, key, true, pool, &stats)
+    }));
+    join(upsert_futs, delete_futs).await;

     Ok(SourceUpdateInfo {
         source_name: source_name.to_string(),
-        stats: UpdateStats {
-            num_insertions: num_new_keys - num_updates,
-            num_deletions,
-            num_already_exists: num_updates,
-            num_errors,
-        },
+        stats,
     })
 }

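Overall shape of the new update_source: the freshly listed keys drive upserts (only_for_deletion = false), tracked keys that are no longer listed drive deletions (only_for_deletion = true, which skips evaluation entirely), and the two batches run concurrently. A simplified sketch of that concurrency shape, assuming the futures crate (upsert, delete, and run are placeholder names with simplified signatures):

use futures::future::{join, join_all};

async fn upsert(_key: u32) { /* evaluate the row and write it */ }
async fn delete(_key: u32) { /* drop the tracked row */ }

async fn run(current_keys: Vec<u32>, stale_keys: Vec<u32>) {
    let upserts = join_all(current_keys.into_iter().map(upsert));
    let deletes = join_all(stale_keys.into_iter().map(delete));
    // `join` polls both batches concurrently and waits for both to finish.
    join(upserts, deletes).await;
}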