10 changes: 6 additions & 4 deletions src/execution/dumper.rs
@@ -11,8 +11,8 @@ use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use yaml_rust2::YamlEmitter;

use super::indexer;
use super::memoization::EvaluationMemoryOptions;
use super::row_indexer;
use crate::base::{schema, value};
use crate::builder::plan::{AnalyzedSourceOp, ExecutionPlan};
use crate::ops::interface::SourceExecutorListOptions;
@@ -76,7 +76,7 @@ impl&lt;'a&gt; Dumper&lt;'a&gt; {
where
'a: 'b,
{
let data_builder = indexer::evaluate_source_entry_with_memory(
let data_builder = row_indexer::evaluate_source_entry_with_memory(
self.plan,
source_op,
self.schema,
@@ -113,8 +113,10 @@ impl&lt;'a&gt; Dumper&lt;'a&gt; {
data: collected_values_buffer[collector_idx]
.iter()
.map(|v| -> Result<_> {
let key =
indexer::extract_primary_key(&export_op.primary_key_def, v)?;
let key = row_indexer::extract_primary_key(
&export_op.primary_key_def,
v,
)?;
Ok((key, v))
})
.collect::<Result<_>>()?,
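For context on the call being reshaped here, a minimal sketch of what an `extract_primary_key`-style helper does: pull the key fields named by a key definition out of a record of field values. `PrimaryKeyDef`, `FieldValues`, and `KeyValue` below are simplified stand-ins for the crate's `AnalyzedPrimaryKeyDef` and `value` types, not the real definitions:

```rust
use anyhow::{anyhow, Result};

#[derive(Clone, Debug, PartialEq)]
enum KeyValue {
    Str(String),
    Int(i64),
}

struct FieldValues {
    fields: Vec<KeyValue>,
}

// Simplified key definition: the indices of the fields forming the key.
struct PrimaryKeyDef {
    field_indices: Vec<usize>,
}

fn extract_primary_key(def: &PrimaryKeyDef, record: &FieldValues) -> Result<Vec<KeyValue>> {
    def.field_indices
        .iter()
        .map(|&i| {
            record
                .fields
                .get(i)
                .cloned()
                .ok_or_else(|| anyhow!("missing key field at index {i}"))
        })
        .collect()
}

fn main() -> Result<()> {
    let def = PrimaryKeyDef { field_indices: vec![0] };
    let record = FieldValues {
        fields: vec![KeyValue::Str("doc-1".into()), KeyValue::Int(42)],
    };
    // The extracted key and the full record are what the dumper pairs up per row.
    let key = extract_primary_key(&def, &record)?;
    assert_eq!(key, vec![KeyValue::Str("doc-1".into())]);
    Ok(())
}
```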
9 changes: 5 additions & 4 deletions src/execution/mod.rs
@@ -1,9 +1,10 @@
pub(crate) mod db_tracking_setup;
pub(crate) mod dumper;
pub(crate) mod evaluator;
pub(crate) mod indexer;
pub(crate) mod memoization;
pub(crate) mod query;
pub(crate) mod row_indexer;
pub(crate) mod source_indexer;
pub(crate) mod stats;

mod db_tracking;
pub mod db_tracking_setup;

pub(crate) mod memoization;
157 changes: 11 additions & 146 deletions src/execution/indexer.rs → src/execution/row_indexer.rs
@@ -1,92 +1,24 @@
use crate::prelude::*;

use futures::future::{join, join_all, try_join_all};
use itertools::Itertools;
use futures::future::try_join_all;
use log::error;
use serde::Serialize;
use sqlx::PgPool;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::sync::atomic::Ordering::Relaxed;

use super::db_tracking::{self, read_source_tracking_info_for_processing, TrackedTargetKey};
use super::db_tracking_setup;
use super::evaluator::{evaluate_source_entry, ScopeValueBuilder};
use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoizationInfo};
use super::stats;

use crate::base::schema;
use crate::base::value::{self, FieldValues, KeyValue};
use crate::builder::plan::*;
use crate::ops::interface::{
ExportTargetMutation, ExportTargetUpsertEntry, Ordinal, SourceExecutorListOptions,
};
use crate::ops::interface::{ExportTargetMutation, ExportTargetUpsertEntry, Ordinal};
use crate::utils::db::WriteAction;
use crate::utils::fingerprint::{Fingerprint, Fingerprinter};

use super::evaluator::{evaluate_source_entry, ScopeValueBuilder};

#[derive(Debug, Serialize, Default)]
pub struct UpdateStats {
pub num_skipped: AtomicUsize,
pub num_insertions: AtomicUsize,
pub num_deletions: AtomicUsize,
pub num_reprocesses: AtomicUsize,
pub num_errors: AtomicUsize,
}

impl std::fmt::Display for UpdateStats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let num_skipped = self.num_skipped.load(Relaxed);
if num_skipped > 0 {
write!(f, "{} rows skipped", num_skipped)?;
}

let num_insertions = self.num_insertions.load(Relaxed);
let num_deletions = self.num_deletions.load(Relaxed);
let num_reprocesses = self.num_reprocesses.load(Relaxed);
let num_source_rows = num_insertions + num_deletions + num_reprocesses;
if num_source_rows > 0 {
if num_skipped > 0 {
write!(f, ", ")?;
}
write!(f, "{num_source_rows} source rows processed",)?;

let num_errors = self.num_errors.load(Relaxed);
if num_errors > 0 {
write!(f, " with {num_errors} ERRORS",)?;
}
write!(
f,
": {num_insertions} added, {num_deletions} removed, {num_reprocesses} reprocessed",
)?;
}
Ok(())
}
}

#[derive(Debug, Serialize)]
pub struct SourceUpdateInfo {
pub source_name: String,
pub stats: UpdateStats,
}

impl std::fmt::Display for SourceUpdateInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}: {}", self.source_name, self.stats)
}
}

#[derive(Debug, Serialize)]
pub struct IndexUpdateInfo {
pub sources: Vec<SourceUpdateInfo>,
}

impl std::fmt::Display for IndexUpdateInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for source in self.sources.iter() {
writeln!(f, "{}", source)?;
}
Ok(())
}
}

pub fn extract_primary_key(
primary_key_def: &AnalyzedPrimaryKeyDef,
record: &FieldValues,
@@ -470,14 +402,14 @@ pub async fn evaluate_source_entry_with_memory(
Ok(Some(output))
}

pub async fn update_source_entry(
pub async fn update_source_row(
plan: &ExecutionPlan,
source_op: &AnalyzedSourceOp,
schema: &schema::DataSchema,
key: &value::KeyValue,
only_for_deletion: bool,
pool: &PgPool,
stats: &UpdateStats,
stats: &stats::UpdateStats,
) -> Result<()> {
let source_key_json = serde_json::to_value(key)?;
let process_timestamp = chrono::Utc::now();
@@ -617,85 +549,18 @@ pub async fn update_source_entry(
Ok(())
}

async fn update_source_entry_with_err_handling(
pub(super) async fn update_source_row_with_err_handling(
plan: &ExecutionPlan,
source_op: &AnalyzedSourceOp,
schema: &schema::DataSchema,
key: &value::KeyValue,
only_for_deletion: bool,
pool: &PgPool,
stats: &UpdateStats,
stats: &stats::UpdateStats,
) {
let r = update_source_entry(plan, source_op, schema, key, only_for_deletion, pool, stats).await;
let r = update_source_row(plan, source_op, schema, key, only_for_deletion, pool, stats).await;
if let Err(e) = r {
stats.num_errors.fetch_add(1, Relaxed);
error!("{:?}", e.context("Error in indexing a source row"));
}
}

async fn update_source(
source_name: &str,
plan: &ExecutionPlan,
source_op: &AnalyzedSourceOp,
schema: &schema::DataSchema,
pool: &PgPool,
) -> Result<SourceUpdateInfo> {
let existing_keys_json = db_tracking::list_source_tracking_keys(
source_op.source_id,
&plan.tracking_table_setup,
pool,
)
.await?;

let mut keys = Vec::new();
let mut rows_stream = source_op.executor.list(SourceExecutorListOptions {
include_ordinal: false,
});
while let Some(rows) = rows_stream.next().await {
keys.extend(rows?.into_iter().map(|row| row.key));
}

let stats = UpdateStats::default();
let upsert_futs = join_all(keys.iter().map(|key| {
update_source_entry_with_err_handling(plan, source_op, schema, key, false, pool, &stats)
}));
let deleted_keys = existing_keys_json
.into_iter()
.map(|existing_key_json| {
value::Value::<value::ScopeValue>::from_json(
existing_key_json.source_key,
&source_op.primary_key_type,
)?
.as_key()
})
.filter_ok(|existing_key| !keys.contains(existing_key))
.collect::<Result<Vec<_>>>()?;
let delete_futs = join_all(deleted_keys.iter().map(|key| {
update_source_entry_with_err_handling(plan, source_op, schema, key, true, pool, &stats)
}));
join(upsert_futs, delete_futs).await;

Ok(SourceUpdateInfo {
source_name: source_name.to_string(),
stats,
})
}

pub async fn update(
plan: &ExecutionPlan,
schema: &schema::DataSchema,
pool: &PgPool,
) -> Result<IndexUpdateInfo> {
let source_update_stats = try_join_all(
plan.source_ops
.iter()
.map(|source_op| async move {
update_source(source_op.name.as_str(), plan, source_op, schema, pool).await
})
.collect::<Vec<_>>(),
)
.await?;
Ok(IndexUpdateInfo {
sources: source_update_stats,
})
}
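The `update_source_row_with_err_handling` wrapper above applies a standard error-isolation pattern: a failed row is counted and logged rather than propagated, so one bad row cannot abort a whole batch of concurrent updates. A self-contained sketch of the pattern under illustrative names (`update_row` stands in for the real row update):

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

use anyhow::{anyhow, Result};
use log::error;

async fn update_row(key: u32) -> Result<()> {
    // Stand-in for the real, fallible row update.
    if key == 3 {
        return Err(anyhow!("bad row {key}"));
    }
    Ok(())
}

async fn update_row_logging_errors(key: u32, num_errors: &AtomicUsize) {
    if let Err(e) = update_row(key).await {
        // Record and log the failure instead of returning it.
        num_errors.fetch_add(1, Relaxed);
        error!("{:?}", e.context("Error in indexing a source row"));
    }
}

#[tokio::main]
async fn main() {
    let num_errors = AtomicUsize::new(0);
    for key in 0..5 {
        update_row_logging_errors(key, &num_errors).await;
    }
    assert_eq!(num_errors.load(Relaxed), 1);
}
```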
78 changes: 78 additions & 0 deletions src/execution/source_indexer.rs
@@ -0,0 +1,78 @@
use crate::prelude::*;

use super::{db_tracking, row_indexer, stats};
use futures::future::{join, join_all, try_join_all};
use sqlx::PgPool;

async fn update_source(
source_name: &str,
plan: &plan::ExecutionPlan,
source_op: &plan::AnalyzedSourceOp,
schema: &schema::DataSchema,
pool: &PgPool,
) -> Result<stats::SourceUpdateInfo> {
let existing_keys_json = db_tracking::list_source_tracking_keys(
source_op.source_id,
&plan.tracking_table_setup,
pool,
)
.await?;

let mut keys = Vec::new();
let mut rows_stream = source_op
.executor
.list(interface::SourceExecutorListOptions {
include_ordinal: false,
});
while let Some(rows) = rows_stream.next().await {
keys.extend(rows?.into_iter().map(|row| row.key));
}

let stats = stats::UpdateStats::default();
let upsert_futs = join_all(keys.iter().map(|key| {
row_indexer::update_source_row_with_err_handling(
plan, source_op, schema, key, false, pool, &stats,
)
}));
let deleted_keys = existing_keys_json
.into_iter()
.map(|existing_key_json| {
value::Value::<value::ScopeValue>::from_json(
existing_key_json.source_key,
&source_op.primary_key_type,
)?
.as_key()
})
.filter_ok(|existing_key| !keys.contains(existing_key))
.collect::<Result<Vec<_>>>()?;
let delete_futs = join_all(deleted_keys.iter().map(|key| {
row_indexer::update_source_row_with_err_handling(
plan, source_op, schema, key, true, pool, &stats,
)
}));
join(upsert_futs, delete_futs).await;

Ok(stats::SourceUpdateInfo {
source_name: source_name.to_string(),
stats,
})
}

pub async fn update(
plan: &plan::ExecutionPlan,
schema: &schema::DataSchema,
pool: &PgPool,
) -> Result<stats::IndexUpdateInfo> {
let source_update_stats = try_join_all(
plan.source_ops
.iter()
.map(|source_op| async move {
update_source(source_op.name.as_str(), plan, source_op, schema, pool).await
})
.collect::<Vec<_>>(),
)
.await?;
Ok(stats::IndexUpdateInfo {
sources: source_update_stats,
})
}
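`update_source` above launches every upsert and every deletion concurrently: `join_all` turns each batch of per-key futures into one future, and `join` drives both batches at once. A runnable sketch of that pattern, assuming a `tokio` runtime and using a trivial `process` stand-in for `update_source_row_with_err_handling`:

```rust
use futures::future::{join, join_all};

async fn process(key: u32, delete: bool) {
    // Stand-in for the per-row upsert/delete work.
    println!("{} row {key}", if delete { "deleting" } else { "upserting" });
}

#[tokio::main]
async fn main() {
    let live_keys = vec![1, 2, 3];
    let stale_keys = vec![4, 5];

    let upserts = join_all(live_keys.iter().map(|&k| process(k, false)));
    let deletes = join_all(stale_keys.iter().map(|&k| process(k, true)));

    // Both batches make progress concurrently; nothing here is fallible
    // because per-row errors were already absorbed (see the wrapper above).
    join(upserts, deletes).await;
}
```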
68 changes: 68 additions & 0 deletions src/execution/stats.rs
@@ -0,0 +1,68 @@
use crate::prelude::*;

use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

#[derive(Debug, Serialize, Default)]
pub struct UpdateStats {
pub num_skipped: AtomicUsize,
pub num_insertions: AtomicUsize,
pub num_deletions: AtomicUsize,
pub num_reprocesses: AtomicUsize,
pub num_errors: AtomicUsize,
}

impl std::fmt::Display for UpdateStats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let num_skipped = self.num_skipped.load(Relaxed);
if num_skipped > 0 {
write!(f, "{} rows skipped", num_skipped)?;
}

let num_insertions = self.num_insertions.load(Relaxed);
let num_deletions = self.num_deletions.load(Relaxed);
let num_reprocesses = self.num_reprocesses.load(Relaxed);
let num_source_rows = num_insertions + num_deletions + num_reprocesses;
if num_source_rows > 0 {
if num_skipped > 0 {
write!(f, ", ")?;
}
write!(f, "{num_source_rows} source rows processed",)?;

let num_errors = self.num_errors.load(Relaxed);
if num_errors > 0 {
write!(f, " with {num_errors} ERRORS",)?;
}
write!(
f,
": {num_insertions} added, {num_deletions} removed, {num_reprocesses} reprocessed",
)?;
}
Ok(())
}
}

#[derive(Debug, Serialize)]
pub struct SourceUpdateInfo {
pub source_name: String,
pub stats: UpdateStats,
}

impl std::fmt::Display for SourceUpdateInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}: {}", self.source_name, self.stats)
}
}

#[derive(Debug, Serialize)]
pub struct IndexUpdateInfo {
pub sources: Vec<SourceUpdateInfo>,
}

impl std::fmt::Display for IndexUpdateInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for source in self.sources.iter() {
writeln!(f, "{}", source)?;
}
Ok(())
}
}
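`UpdateStats` holds `AtomicUsize` counters so many concurrent row updates can record outcomes through a shared reference without locking; `Relaxed` ordering is enough because the counters are independent and only read after the writers are joined. A minimal sketch of that usage, with a simplified stand-in struct rather than the real `UpdateStats`:

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::sync::Arc;

#[derive(Default)]
struct Stats {
    num_insertions: AtomicUsize,
    num_errors: AtomicUsize,
}

#[tokio::main]
async fn main() {
    let stats = Arc::new(Stats::default());
    let tasks: Vec<_> = (0..8)
        .map(|i| {
            let stats = Arc::clone(&stats);
            tokio::spawn(async move {
                // Each task bumps a counter; no lock is needed.
                if i % 4 == 0 {
                    stats.num_errors.fetch_add(1, Relaxed);
                } else {
                    stats.num_insertions.fetch_add(1, Relaxed);
                }
            })
        })
        .collect();
    for task in tasks {
        task.await.unwrap();
    }
    // Reads happen only after all writers have been joined.
    println!(
        "{} added, {} errors",
        stats.num_insertions.load(Relaxed),
        stats.num_errors.load(Relaxed)
    );
}
```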