Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 75 additions & 84 deletions src/execution/live_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::prelude::*;
use super::stats;
use futures::future::try_join_all;
use sqlx::PgPool;
use std::time::Instant;
use tokio::{task::JoinSet, time::MissedTickBehavior};

pub struct FlowLiveUpdater {
Expand All @@ -22,12 +21,6 @@ pub struct FlowLiveUpdaterOptions {
pub print_stats: bool,
}

struct StatsReportState {
last_report_time: Option<Instant>,
last_stats: stats::UpdateStats,
}

const MIN_REPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
const REPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);

struct SharedAckFn {
Expand Down Expand Up @@ -74,32 +67,18 @@ async fn update_source(

let import_op = &plan.import_ops[source_idx];

let stats_report_state = Mutex::new(StatsReportState {
last_report_time: None,
last_stats: source_update_stats.as_ref().clone(),
});
let report_stats = || {
let new_stats = source_update_stats.as_ref().clone();
let now = Instant::now();
let delta = {
let mut state = stats_report_state.lock().unwrap();
if let Some(last_report_time) = state.last_report_time {
if now.duration_since(last_report_time) < MIN_REPORT_INTERVAL {
return;
}
}
let delta = new_stats.delta(&state.last_stats);
if delta.is_zero() {
return;
}
state.last_stats = new_stats;
state.last_report_time = Some(now);
delta
};
let report_stats = |stats: &stats::UpdateStats, kind: &str| {
source_update_stats.merge(stats);
if options.print_stats {
println!("{}.{}: {}", flow.flow_instance.name, import_op.name, delta);
println!(
"{}.{} ({kind}): {}",
flow.flow_instance.name, import_op.name, stats
);
} else {
trace!("{}.{}: {}", flow.flow_instance.name, import_op.name, delta);
trace!(
"{}.{} ({kind}): {}",
flow.flow_instance.name, import_op.name, stats
);
}
};

Expand All @@ -108,68 +87,78 @@ async fn update_source(
// Deal with change streams.
if options.live_mode {
if let Some(change_stream) = import_op.executor.change_stream().await? {
let pool = pool.clone();
let source_update_stats = source_update_stats.clone();
let change_stream_stats = Arc::new(stats::UpdateStats::default());
futs.push(
async move {
let mut change_stream = change_stream;
let retry_options = retryable::RetryOptions {
max_retries: None,
initial_backoff: std::time::Duration::from_secs(5),
max_backoff: std::time::Duration::from_secs(60),
};
loop {
// Workaround as AsyncFnMut isn't mature yet.
// Should be changed to use AsyncFnMut once it is.
let change_stream = tokio::sync::Mutex::new(&mut change_stream);
let change_msg = retryable::run(
|| async {
let mut change_stream = change_stream.lock().await;
change_stream
.next()
.await
.transpose()
.map_err(retryable::Error::always_retryable)
},
&retry_options,
)
.await?;
let change_msg = if let Some(change_msg) = change_msg {
change_msg
} else {
break;
{
let change_stream_stats = change_stream_stats.clone();
let pool = pool.clone();
async move {
let mut change_stream = change_stream;
let retry_options = retryable::RetryOptions {
max_retries: None,
initial_backoff: std::time::Duration::from_secs(5),
max_backoff: std::time::Duration::from_secs(60),
};
let ack_fn = change_msg.ack_fn.map(|ack_fn| {
Arc::new(Mutex::new(SharedAckFn::new(
change_msg.changes.iter().len(),
ack_fn,
)))
});
for change in change_msg.changes {
let ack_fn = ack_fn.clone();
tokio::spawn(source_context.clone().process_source_key(
change.key,
change.data,
source_update_stats.clone(),
ack_fn.map(|ack_fn| {
move || async move { SharedAckFn::ack(&ack_fn).await }
}),
pool.clone(),
));
loop {
// Workaround as AsyncFnMut isn't mature yet.
// Should be changed to use AsyncFnMut once it is.
let change_stream = tokio::sync::Mutex::new(&mut change_stream);
let change_msg = retryable::run(
|| async {
let mut change_stream = change_stream.lock().await;
change_stream
.next()
.await
.transpose()
.map_err(retryable::Error::always_retryable)
},
&retry_options,
)
.await?;
let change_msg = if let Some(change_msg) = change_msg {
change_msg
} else {
break;
};
let ack_fn = change_msg.ack_fn.map(|ack_fn| {
Arc::new(Mutex::new(SharedAckFn::new(
change_msg.changes.iter().len(),
ack_fn,
)))
});
for change in change_msg.changes {
let ack_fn = ack_fn.clone();
tokio::spawn(source_context.clone().process_source_key(
change.key,
change.data,
change_stream_stats.clone(),
ack_fn.map(|ack_fn| {
move || async move { SharedAckFn::ack(&ack_fn).await }
}),
pool.clone(),
));
}
}
Ok(())
}
Ok(())
}
.boxed(),
);

futs.push(
async move {
let mut interval = tokio::time::interval(REPORT_INTERVAL);
let mut last_change_stream_stats = change_stream_stats.as_ref().clone();
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
interval.tick().await;
loop {
interval.tick().await;
report_stats();
let curr_change_stream_stats = change_stream_stats.as_ref().clone();
let delta = curr_change_stream_stats.delta(&last_change_stream_stats);
if !delta.has_any_change() {
report_stats(&delta, "change stream");
last_change_stream_stats = curr_change_stream_stats;
}
}
}
.boxed(),
Expand All @@ -178,11 +167,11 @@ async fn update_source(
}

// The main update loop.
let source_update_stats = source_update_stats.clone();
futs.push(
async move {
source_context.update(&pool, &source_update_stats).await?;
report_stats();
let update_stats = Arc::new(stats::UpdateStats::default());
source_context.update(&pool, &update_stats).await?;
report_stats(&update_stats, "batch update");

if let (true, Some(refresh_interval)) = (
options.live_mode,
Expand All @@ -193,8 +182,10 @@ async fn update_source(
interval.tick().await;
loop {
interval.tick().await;
source_context.update(&pool, &source_update_stats).await?;
report_stats();

let update_stats = Arc::new(stats::UpdateStats::default());
source_context.update(&pool, &update_stats).await?;
report_stats(&update_stats, "interval refresh");
}
}
Ok(())
Expand Down
26 changes: 19 additions & 7 deletions src/execution/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ impl Counter {
pub fn into_inner(self) -> i64 {
self.0.into_inner()
}

pub fn merge(&self, delta: &Self) {
self.0.fetch_add(delta.get(), Relaxed);
}
}

impl AddAssign for Counter {
Expand Down Expand Up @@ -74,13 +78,21 @@ impl UpdateStats {
}
}

pub fn is_zero(&self) -> bool {
self.num_no_change.get() == 0
&& self.num_insertions.get() == 0
&& self.num_deletions.get() == 0
&& self.num_updates.get() == 0
&& self.num_reprocesses.get() == 0
&& self.num_errors.get() == 0
pub fn merge(&self, delta: &Self) {
self.num_no_change.merge(&delta.num_no_change);
self.num_insertions.merge(&delta.num_insertions);
self.num_deletions.merge(&delta.num_deletions);
self.num_updates.merge(&delta.num_updates);
self.num_reprocesses.merge(&delta.num_reprocesses);
self.num_errors.merge(&delta.num_errors);
}

pub fn has_any_change(&self) -> bool {
self.num_insertions.get() > 0
|| self.num_deletions.get() > 0
|| self.num_updates.get() > 0
|| self.num_reprocesses.get() > 0
|| self.num_errors.get() > 0
}
}

Expand Down