Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 89 additions & 22 deletions src/execution/live_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use std::time::Instant;
use crate::prelude::*;

use super::stats;
use futures::future::try_join_all;
use sqlx::PgPool;
use tokio::task::JoinSet;
use tokio::{task::JoinSet, time::MissedTickBehavior};

pub struct FlowLiveUpdater {
flow_ctx: Arc<FlowContext>,
Expand All @@ -22,6 +23,14 @@ pub struct FlowLiveUpdaterOptions {
pub print_stats: bool,
}

/// Bookkeeping for periodic stats reporting: remembers when the last report
/// happened and the stats snapshot it was based on, so each report can print
/// only the delta since the previous one.
struct StatsReportState {
    // Instant of the most recent report; `None` until the first report is emitted.
    last_report_time: Option<Instant>,
    // Snapshot of the stats at the last report; deltas are computed against this.
    last_stats: stats::UpdateStats,
}

// Reports arriving closer together than this are suppressed (rate limit).
const MIN_REPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
// Cadence of the background periodic stats-reporting loop.
const REPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);

async fn update_source(
flow_ctx: Arc<FlowContext>,
plan: Arc<plan::ExecutionPlan>,
Expand All @@ -35,44 +44,102 @@ async fn update_source(
.await?;

let import_op = &plan.import_ops[source_idx];
let maybe_print_stats = |stats: &stats::UpdateStats| {

let stats_report_state = Mutex::new(StatsReportState {
last_report_time: None,
last_stats: source_update_stats.as_ref().clone(),
});
let report_stats = || {
let new_stats = source_update_stats.as_ref().clone();
let now = Instant::now();
let delta = {
let mut state = stats_report_state.lock().unwrap();
if let Some(last_report_time) = state.last_report_time {
if now.duration_since(last_report_time) < MIN_REPORT_INTERVAL {
return;
}
}
let delta = new_stats.delta(&state.last_stats);
if delta.is_zero() {
return;
}
state.last_stats = new_stats;
state.last_report_time = Some(now);
delta
};
if options.print_stats {
println!(
"{}.{}: {}",
flow_ctx.flow.flow_instance.name, import_op.name, stats
flow_ctx.flow.flow_instance.name, import_op.name, delta
);
} else {
trace!(
"{}.{}: {}",
flow_ctx.flow.flow_instance.name,
import_op.name,
stats
delta
);
}
};

let mut update_start = Instant::now();
source_context.update(&pool, &source_update_stats).await?;
maybe_print_stats(&source_update_stats);

if let (true, Some(refresh_interval)) = (
options.live_mode,
import_op.refresh_options.refresh_interval,
) {
let mut last_stats = source_update_stats.as_ref().clone();
loop {
let elapsed = update_start.elapsed();
if elapsed < refresh_interval {
tokio::time::sleep(refresh_interval - elapsed).await;
let mut futs: Vec<BoxFuture<'_, Result<()>>> = Vec::new();

// Deal with change streams.
if let (true, Some(change_stream)) = (options.live_mode, import_op.executor.change_stream()) {
let pool = pool.clone();
let source_update_stats = source_update_stats.clone();
futs.push(
async move {
let mut change_stream = change_stream;
while let Some(change) = change_stream.next().await {
source_context
.process_change(change, &pool, &source_update_stats)
.map(tokio::spawn);
}
Ok(())
}
update_start = Instant::now();
.boxed(),
);
futs.push(
async move {
let mut interval = tokio::time::interval(REPORT_INTERVAL);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
interval.tick().await;
loop {
interval.tick().await;
report_stats();
}
}
.boxed(),
);
}

// The main update loop.
let source_update_stats = source_update_stats.clone();
futs.push(
async move {
source_context.update(&pool, &source_update_stats).await?;
report_stats();

let this_stats = source_update_stats.as_ref().clone();
maybe_print_stats(&this_stats.delta(&last_stats));
last_stats = this_stats;
if let (true, Some(refresh_interval)) = (
options.live_mode,
import_op.refresh_options.refresh_interval,
) {
let mut interval = tokio::time::interval(refresh_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
interval.tick().await;
loop {
interval.tick().await;
source_context.update(&pool, &source_update_stats).await?;
report_stats();
}
}
Ok(())
}
}
.boxed(),
);

try_join_all(futs).await?;
Ok(())
}

Expand Down
73 changes: 52 additions & 21 deletions src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::prelude::*;
use crate::{ops::interface::SourceValueChange, prelude::*};

use sqlx::PgPool;
use std::collections::{hash_map, HashMap};
use tokio::{sync::Semaphore, task::JoinSet};

use super::{
db_tracking,
row_indexer::{self, SkippedOr, SourceVersion},
row_indexer::{self, SkippedOr, SourceVersion, SourceVersionKind},
stats,
};
struct SourceRowIndexingState {
Expand Down Expand Up @@ -78,21 +78,23 @@ impl SourceIndexingContext {
})
}

fn process_source_key(
async fn process_source_key(
self: Arc<Self>,
key: value::KeyValue,
source_version: SourceVersion,
value: Option<value::FieldValues>,
update_stats: Arc<stats::UpdateStats>,
processing_sem: Arc<Semaphore>,
pool: PgPool,
join_set: &mut JoinSet<Result<()>>,
) {
let fut = async move {
let process = async move {
let permit = processing_sem.acquire().await?;
let plan = self.flow.get_execution_plan().await?;
let import_op = &plan.import_ops[self.source_idx];
let source_value = if source_version.kind == row_indexer::SourceVersionKind::Deleted {
None
} else if let Some(value) = value {
Some(value)
} else {
// Even if the source version kind is not Deleted, the source value might be gone one polling.
// In this case, we still use the current source version even if it's already stale - actually this version skew
Expand Down Expand Up @@ -154,17 +156,19 @@ impl SourceIndexingContext {
drop(permit);
anyhow::Ok(())
};
join_set.spawn(fut);
if let Err(e) = process.await {
error!("{:?}", e.context("Error in processing a source row"));
}
}

fn process_source_key_if_newer(
self: &Arc<Self>,
key: value::KeyValue,
source_version: SourceVersion,
value: Option<value::FieldValues>,
update_stats: &Arc<stats::UpdateStats>,
pool: &PgPool,
join_set: &mut JoinSet<Result<()>>,
) {
) -> Option<impl Future<Output = ()> + Send + 'static> {
let processing_sem = {
let mut state = self.state.lock().unwrap();
let scan_generation = state.scan_generation;
Expand All @@ -174,19 +178,19 @@ impl SourceIndexingContext {
.source_version
.should_skip(&source_version, Some(&update_stats))
{
return;
return None;
}
row_state.source_version = source_version.clone();
row_state.processing_sem.clone()
};
self.clone().process_source_key(
Some(self.clone().process_source_key(
key,
source_version,
value,
update_stats.clone(),
processing_sem,
pool.clone(),
join_set,
);
))
}

pub async fn update(
Expand All @@ -212,15 +216,18 @@ impl SourceIndexingContext {
self.process_source_key_if_newer(
row.key,
SourceVersion::from_current(row.ordinal),
None,
update_stats,
pool,
&mut join_set,
);
)
.map(|fut| join_set.spawn(fut));
}
}
while let Some(result) = join_set.join_next().await {
if let Err(e) = (|| anyhow::Ok(result??))() {
error!("{:?}", e.context("Error in indexing a source row"));
if let Err(e) = result {
if !e.is_cancelled() {
error!("{:?}", e);
}
}
}

Expand All @@ -239,21 +246,45 @@ impl SourceIndexingContext {
deleted_key_versions
};
for (key, source_version, processing_sem) in deleted_key_versions {
self.clone().process_source_key(
join_set.spawn(self.clone().process_source_key(
key,
source_version,
None,
update_stats.clone(),
processing_sem,
pool.clone(),
&mut join_set,
);
));
}
while let Some(result) = join_set.join_next().await {
if let Err(e) = (|| anyhow::Ok(result??))() {
error!("{:?}", e.context("Error in deleting a source row"));
if let Err(e) = result {
if !e.is_cancelled() {
error!("{:?}", e);
}
}
}

Ok(())
}

/// Translates one change-stream event into row-processing work.
///
/// Maps the change payload onto a `SourceVersion` (upsert → `CurrentLogic`,
/// delete → `Deleted`) and delegates to `process_source_key_if_newer`.
/// Returns `Some(future)` that performs the processing when the change is
/// newer than the tracked row state, or `None` when it is stale.
pub fn process_change(
    self: &Arc<Self>,
    change: interface::SourceChange,
    pool: &PgPool,
    update_stats: &Arc<stats::UpdateStats>,
) -> Option<impl Future<Output = ()> + Send + 'static> {
    // Split the change payload into a version kind and an optional new value.
    let version_kind;
    let new_value;
    match change.value {
        SourceValueChange::Upsert(v) => {
            version_kind = SourceVersionKind::CurrentLogic;
            new_value = v;
        }
        SourceValueChange::Delete => {
            version_kind = SourceVersionKind::Deleted;
            new_value = None;
        }
    }
    let version = SourceVersion {
        ordinal: change.ordinal,
        kind: version_kind,
    };
    self.process_source_key_if_newer(change.key, version, new_value, update_stats, pool)
}
}
12 changes: 12 additions & 0 deletions src/execution/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ impl UpdateStats {
num_errors: self.num_errors.delta(&base.num_errors),
}
}

/// Returns `true` when every tracked counter is still at zero, i.e. nothing
/// has been skipped, inserted, deleted, reprocessed, or errored since the
/// stats (or the delta base) were taken.
pub fn is_zero(&self) -> bool {
    // NOTE(review): `num_repreocesses` spelling comes from the field
    // declaration elsewhere in this file; kept as-is.
    let counters = [
        self.num_skipped.get(),
        self.num_insertions.get(),
        self.num_deletions.get(),
        self.num_repreocesses.get(),
        self.num_errors.get(),
    ];
    counters.iter().all(|&count| count == 0)
}
}

impl std::fmt::Display for UpdateStats {
Expand Down Expand Up @@ -97,6 +105,10 @@ impl std::fmt::Display for UpdateStats {
": {num_insertions} added, {num_deletions} removed, {num_reprocesses} repocessed",
)?;
}

if num_skipped == 0 && num_source_rows == 0 {
write!(f, "no changes")?;
}
Ok(())
}
}
Expand Down
13 changes: 9 additions & 4 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,17 @@ pub struct SourceRowMetadata {
pub ordinal: Option<Ordinal>,
}

pub struct SourceChange<'a> {
/// Payload of a source change event: either an upserted value or a deletion.
pub enum SourceValueChange {
    /// Row was created or updated.
    /// None means value unavailable in this change - needs a separate poll by get_value() API.
    Upsert(Option<FieldValues>),
    /// Row was removed from the source.
    Delete,
}

pub struct SourceChange {
/// Last update/deletion ordinal. None means unavailable.
pub ordinal: Option<Ordinal>,
pub key: KeyValue,
/// None means a deletion. None within the `BoxFuture` means the item is gone when polling.
pub value: Option<BoxFuture<'a, Result<Option<FieldValues>>>>,
pub value: SourceValueChange,
}

#[derive(Debug, Default)]
Expand All @@ -70,7 +75,7 @@ pub trait SourceExecutor: Send + Sync {
// Get the value for the given key.
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>>;

fn change_stream<'a>(&'a self) -> Option<BoxStream<'a, SourceChange<'a>>> {
fn change_stream<'a>(&'a self) -> Option<BoxStream<'a, SourceChange>> {
None
}
}
Expand Down