Skip to content

Commit

Permalink
refactor: use bounded channel anyway
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed Aug 13, 2024
1 parent 61b309b commit dc14c84
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 23 deletions.
3 changes: 2 additions & 1 deletion src/flow/src/adapter/flownode_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use store_api::storage::RegionId;

use crate::adapter::FlowWorkerManager;
use crate::error::InternalSnafu;
use crate::repr::{self, DiffRow};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};

fn to_meta_err(err: crate::error::Error) -> common_meta::error::Error {
// TODO(discord9): refactor this
Expand Down Expand Up @@ -94,6 +94,7 @@ impl Flownode for FlowWorkerManager {
self.remove_flow(flow_id.id as u64)
.await
.map_err(to_meta_err)?;
METRIC_FLOW_TASK_COUNT.dec();
Ok(Default::default())
}
Some(flow_request::Body::Flush(FlushFlow {
Expand Down
27 changes: 6 additions & 21 deletions src/flow/src/adapter/node_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;

use common_telemetry::debug;
use session::context::QueryContext;
Expand Down Expand Up @@ -70,14 +69,14 @@ pub struct FlownodeContext {
pub struct SourceSender {
// TODO(discord9): make it all Vec<DiffRow>?
sender: broadcast::Sender<DiffRow>,
send_buf_tx: mpsc::UnboundedSender<Vec<DiffRow>>,
send_buf_rx: RwLock<mpsc::UnboundedReceiver<Vec<DiffRow>>>,
send_buf_tx: mpsc::Sender<Vec<DiffRow>>,
send_buf_rx: RwLock<mpsc::Receiver<Vec<DiffRow>>>,
send_buf_row_cnt: AtomicUsize,
}

impl Default for SourceSender {
fn default() -> Self {
let (send_buf_tx, send_buf_rx) = mpsc::unbounded_channel();
let (send_buf_tx, send_buf_rx) = mpsc::channel(SEND_BUF_CAP);
Self {
// TODO(discord9): find a better way than increasing this to prevent lagging and hence missing input data
sender: broadcast::Sender::new(BROADCAST_CAP * 2),
Expand Down Expand Up @@ -139,12 +138,7 @@ impl SourceSender {

/// return number of rows it actual send(including what's in the buffer)
pub async fn send_rows(&self, rows: Vec<DiffRow>) -> Result<usize, Error> {
let len = rows.len();
self.send_buf_row_cnt
.fetch_add(len, std::sync::atomic::Ordering::SeqCst);
self.send_buf_tx.send(rows).map_err(|e| {
self.send_buf_row_cnt
.fetch_sub(len, std::sync::atomic::Ordering::SeqCst);
self.send_buf_tx.send(rows).await.map_err(|e| {
crate::error::InternalSnafu {
reason: format!("Failed to send row, error = {:?}", e),
}
Expand All @@ -165,17 +159,8 @@ impl FlownodeContext {
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})?;
// wait until send buf is not full
while sender
.send_buf_row_cnt
.load(std::sync::atomic::Ordering::SeqCst)
>= SEND_BUF_CAP
{
// TODO: maybe just using bounded sender instead of this
common_telemetry::debug!("send buf is full, waiting for flow worker to process");
tokio::time::sleep(Duration::from_millis(10)).await;
}
// debug!("FlownodeContext::send: trying to send {} rows", rows.len());

debug!("FlownodeContext::send: trying to send {} rows", rows.len());
sender.send_rows(rows).await
}

Expand Down
2 changes: 1 addition & 1 deletion src/flow/src/repr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
/// TODO(discord9): add config for this, so cpu&mem usage can be balanced and configured by this
pub const BROADCAST_CAP: usize = 65535;

/// The maxmium capacity of the send buffer, to prevent the buffer from growing too large
/// The maximum capacity of the send buffer, to prevent the buffer from growing too large
pub const SEND_BUF_CAP: usize = BROADCAST_CAP * 2;

pub const BATCH_SIZE: usize = BROADCAST_CAP / 2;
Expand Down

0 comments on commit dc14c84

Please sign in to comment.