feat(flow): add some metrics #4539

Merged · 5 commits · Aug 14, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions src/flow/Cargo.toml
@@ -44,12 +44,14 @@ greptime-proto.workspace = true
# otherwise it is the same as the upstream repo
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
lazy_static.workspace = true
meta-client.workspace = true
minstant = "0.1.7"
nom = "7.1.3"
num-traits = "0.2"
operator.workspace = true
partition.workspace = true
prometheus.workspace = true
prost.workspace = true
query.workspace = true
serde.workspace = true
42 changes: 31 additions & 11 deletions src/flow/src/adapter.rs
@@ -51,6 +51,9 @@ use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
use crate::expr::GlobalId;
use crate::metrics::{
METRIC_FLOW_INPUT_BUF_SIZE, METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS,
};
use crate::repr::{self, DiffRow, Row, BATCH_SIZE};
use crate::transform::sql_to_flow_plan;

@@ -193,6 +196,15 @@ pub enum DiffRequest {
Delete(Vec<(Row, repr::Timestamp)>),
}

impl DiffRequest {
pub fn len(&self) -> usize {
match self {
Self::Insert(v) => v.len(),
Self::Delete(v) => v.len(),
}
}
}

/// iterate through the diff rows and group consecutive rows of the same diff type into one request
pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
let mut reqs = Vec::new();
@@ -544,6 +556,7 @@ impl FlowWorkerManager {
let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); //in ms
let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
since_last_run = tokio::time::Instant::now();
tokio::time::sleep(new_wait).await;
}
@@ -575,7 +588,7 @@ impl FlowWorkerManager {
}
}
// check row send and rows remain in send buf
let (flush_res, buf_len) = if blocking {
let (flush_res, _buf_len) = if blocking {
let ctx = self.node_context.read().await;
(ctx.flush_all_sender().await, ctx.get_send_buf_size().await)
} else {
@@ -585,16 +598,19 @@
}
};
match flush_res {
Ok(r) => row_cnt += r,
Ok(r) => {
common_telemetry::trace!("Flushed {} rows", r);
row_cnt += r;
// send buf is likely to be mostly empty now, wait
if r < BATCH_SIZE / 2 {
break;
}
}
Err(err) => {
common_telemetry::error!("Flush send buf errors: {:?}", err);
break;
}
};
// if not enough rows, break
if buf_len < BATCH_SIZE {
break;
}
}

Ok(row_cnt)
@@ -606,13 +622,17 @@ impl FlowWorkerManager {
region_id: RegionId,
rows: Vec<DiffRow>,
) -> Result<(), Error> {
debug!(
"Handling write request for region_id={:?} with {} rows",
region_id,
rows.len()
);
let rows_len = rows.len();
let table_id = region_id.table_id();
METRIC_FLOW_INPUT_BUF_SIZE.add(rows_len as _);
let _timer = METRIC_FLOW_INSERT_ELAPSED
.with_label_values(&[table_id.to_string().as_str()])
.start_timer();
self.node_context.read().await.send(table_id, rows).await?;
debug!(
"Handling write request for table_id={} with {} rows",
table_id, rows_len
);
Ok(())
}
}
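For context only (not part of this diff): the insert path above relies on two prometheus idioms — an `IntGauge` that is `add()`-ed as rows arrive, and a per-table `HistogramVec` whose `start_timer()` guard observes the elapsed time when it is dropped at the end of the scope. A minimal standalone sketch; the metric names here are made up for illustration and are not the ones registered in src/flow/src/metrics.rs:

```rust
use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, register_int_gauge, HistogramVec, IntGauge};

lazy_static! {
    // Illustrative metrics; the real ones live in src/flow/src/metrics.rs.
    static ref DEMO_INSERT_ELAPSED: HistogramVec = register_histogram_vec!(
        "demo_insert_elapsed",
        "demo insert elapsed",
        &["table_id"]
    )
    .unwrap();
    static ref DEMO_INPUT_BUF_SIZE: IntGauge =
        register_int_gauge!("demo_input_buf_size", "demo input buf size").unwrap();
}

fn handle_write(table_id: u32, rows: &[String]) {
    // Track how many rows are currently sitting in the input buffer.
    DEMO_INPUT_BUF_SIZE.add(rows.len() as i64);
    // The timer records the elapsed duration when `_timer` is dropped at the
    // end of this scope, mirroring `_timer` in handle_write_request above.
    let _timer = DEMO_INSERT_ELAPSED
        .with_label_values(&[table_id.to_string().as_str()])
        .start_timer();
    // ... hand the rows off to the flow workers here ...
}

fn main() {
    handle_write(42, &["row-1".to_string(), "row-2".to_string()]);
}
```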
3 changes: 3 additions & 0 deletions src/flow/src/adapter/flownode_impl.rs
@@ -30,6 +30,7 @@ use store_api::storage::RegionId;

use crate::adapter::FlowWorkerManager;
use crate::error::InternalSnafu;
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};

fn to_meta_err(err: crate::error::Error) -> common_meta::error::Error {
@@ -78,6 +79,7 @@ impl Flownode for FlowWorkerManager {
)
.await
.map_err(to_meta_err)?;
METRIC_FLOW_TASK_COUNT.inc();
Ok(FlowResponse {
affected_flows: ret
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
@@ -92,6 +94,7 @@
self.remove_flow(flow_id.id as u64)
.await
.map_err(to_meta_err)?;
METRIC_FLOW_TASK_COUNT.dec();
Ok(Default::default())
}
Some(flow_request::Body::Flush(FlushFlow {
28 changes: 18 additions & 10 deletions src/flow/src/adapter/node_context.rs
@@ -15,6 +15,7 @@
//! Node context, prone to change with every incoming request

use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use common_telemetry::debug;
@@ -27,7 +28,8 @@ use crate::adapter::{FlowId, TableName, TableSource};
use crate::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::repr::{DiffRow, RelationDesc, BROADCAST_CAP};
use crate::metrics::METRIC_FLOW_INPUT_BUF_SIZE;
use crate::repr::{DiffRow, RelationDesc, BROADCAST_CAP, SEND_BUF_CAP};

/// A context that holds the information of the dataflow
#[derive(Default, Debug)]
@@ -67,18 +69,20 @@ pub struct FlownodeContext {
pub struct SourceSender {
// TODO(discord9): make it all Vec<DiffRow>?
sender: broadcast::Sender<DiffRow>,
send_buf_tx: mpsc::UnboundedSender<Vec<DiffRow>>,
send_buf_rx: RwLock<mpsc::UnboundedReceiver<Vec<DiffRow>>>,
send_buf_tx: mpsc::Sender<Vec<DiffRow>>,
send_buf_rx: RwLock<mpsc::Receiver<Vec<DiffRow>>>,
send_buf_row_cnt: AtomicUsize,
}

impl Default for SourceSender {
fn default() -> Self {
let (send_buf_tx, send_buf_rx) = mpsc::unbounded_channel();
let (send_buf_tx, send_buf_rx) = mpsc::channel(SEND_BUF_CAP);
Self {
// TODO(discord9): find a better way than increasing this to prevent lagging and hence missing input data
sender: broadcast::Sender::new(BROADCAST_CAP * 2),
send_buf_tx,
send_buf_rx: RwLock::new(send_buf_rx),
send_buf_row_cnt: AtomicUsize::new(0),
}
}
}
@@ -94,15 +98,18 @@ impl SourceSender {
/// until send buf is empty or the broadcast channel is full
pub async fn try_flush(&self) -> Result<usize, Error> {
let mut row_cnt = 0;
let mut iterations = 0;
while iterations < Self::MAX_ITERATIONS {
loop {
let mut send_buf = self.send_buf_rx.write().await;
// if inner sender channel is empty or send buf is empty, there
// is nothing to do for now, just break
if self.sender.len() >= BROADCAST_CAP || send_buf.is_empty() {
break;
}
// TODO(discord9): send rows instead so it's just moving a point
if let Some(rows) = send_buf.recv().await {
let len = rows.len();
self.send_buf_row_cnt
.fetch_sub(len, std::sync::atomic::Ordering::SeqCst);
for row in rows {
self.sender
.send(row)
@@ -116,10 +123,10 @@
row_cnt += 1;
}
}
iterations += 1;
}
if row_cnt > 0 {
debug!("Send {} rows", row_cnt);
METRIC_FLOW_INPUT_BUF_SIZE.sub(row_cnt as _);
debug!(
"Remaining Send buf.len() = {}",
self.send_buf_rx.read().await.len()
@@ -131,13 +138,12 @@

/// return the number of rows it actually sends (including what's in the buffer)
pub async fn send_rows(&self, rows: Vec<DiffRow>) -> Result<usize, Error> {
self.send_buf_tx.send(rows).map_err(|e| {
self.send_buf_tx.send(rows).await.map_err(|e| {
crate::error::InternalSnafu {
reason: format!("Failed to send row, error = {:?}", e),
}
.build()
})?;

Ok(0)
}
}
@@ -153,7 +159,8 @@ impl FlownodeContext {
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})?;
// debug!("FlownodeContext::send: trying to send {} rows", rows.len());

debug!("FlownodeContext::send: trying to send {} rows", rows.len());
sender.send_rows(rows).await
}

@@ -169,6 +176,7 @@
}

/// Return the total number of rows across all send buffers
/// TODO(discord9): remove this since we can't get correct row cnt anyway
pub async fn get_send_buf_size(&self) -> usize {
let mut sum = 0;
for sender in self.source_sender.values() {
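For context only: the SourceSender change above swaps an unbounded mpsc channel for a bounded one, so `send_rows()` now exerts backpressure once `SEND_BUF_CAP` batches are queued instead of letting the buffer grow without limit. A minimal sketch of that behaviour with a tiny illustrative capacity (names and numbers are not from this diff):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel, in the spirit of SEND_BUF_CAP (capacity shrunk for the demo).
    let (tx, mut rx) = mpsc::channel::<Vec<u64>>(4);

    // The producer parks on `send().await` once 4 batches are in flight; an
    // unbounded_channel() sender would never block, so a slow consumer could
    // let the buffer grow arbitrarily large.
    let producer = tokio::spawn(async move {
        for i in 0..16u64 {
            tx.send(vec![i]).await.expect("receiver dropped");
        }
    });

    // Drain batches; the loop ends once the producer finishes and drops `tx`.
    while let Some(batch) = rx.recv().await {
        println!("got batch {:?}", batch);
    }
    producer.await.unwrap();
}
```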
6 changes: 2 additions & 4 deletions src/flow/src/expr/func.rs
@@ -278,14 +278,12 @@ impl UnaryFunc {
}
Self::Cast(to) => {
let arg_ty = arg.data_type();
let res = cast(arg, to).context({
cast(arg, to).context({
CastValueSnafu {
from: arg_ty,
to: to.clone(),
}
});
debug!("Cast to type: {to:?}, result: {:?}", res);
res
})
}
Self::TumbleWindowFloor {
window_size,
1 change: 1 addition & 0 deletions src/flow/src/lib.rs
@@ -28,6 +28,7 @@ mod compute;
pub mod error;
mod expr;
pub mod heartbeat;
mod metrics;
mod plan;
mod repr;
mod server;
33 changes: 33 additions & 0 deletions src/flow/src/metrics.rs
@@ -0,0 +1,33 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Some of the metrics used in the flow module.

use lazy_static::lazy_static;
use prometheus::*;

lazy_static! {
pub static ref METRIC_FLOW_TASK_COUNT: IntGauge =
register_int_gauge!("greptime_flow_task_count", "flow task count").unwrap();
pub static ref METRIC_FLOW_INPUT_BUF_SIZE: IntGauge =
register_int_gauge!("greptime_flow_input_buf_size", "flow input buf size").unwrap();
pub static ref METRIC_FLOW_INSERT_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_flow_insert_elapsed",
"flow insert elapsed",
&["table_id"]
)
.unwrap();
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
}
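For context only: the `register_*!` macros above put every metric into the prometheus crate's default registry, so they can be rendered with `prometheus::gather()` and a `TextEncoder`. How GreptimeDB actually exposes its metrics endpoint is outside this diff; the sketch below only illustrates the crate-level mechanics:

```rust
use prometheus::{Encoder, TextEncoder};

// Renders every metric registered through the default registry (which is
// where the register_*! macros put them) in the Prometheus text format.
fn render_metrics() -> String {
    let metric_families = prometheus::gather();
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&metric_families, &mut buf)
        .expect("text encoding of gathered metrics should not fail");
    String::from_utf8(buf).expect("text format is valid UTF-8")
}

fn main() {
    println!("{}", render_metrics());
}
```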
3 changes: 3 additions & 0 deletions src/flow/src/repr.rs
@@ -56,6 +56,9 @@ pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
/// TODO(discord9): add a config for this so that cpu & mem usage can be balanced and tuned
pub const BROADCAST_CAP: usize = 65535;

/// The maximum capacity of the send buffer, to prevent the buffer from growing too large
pub const SEND_BUF_CAP: usize = BROADCAST_CAP * 2;

pub const BATCH_SIZE: usize = BROADCAST_CAP / 2;

/// Convert a value that is or can be converted to Datetime to internal timestamp
16 changes: 9 additions & 7 deletions src/flow/src/server.rs
@@ -398,13 +398,15 @@ impl FlownodeBuilder {
let (tx, rx) = oneshot::channel();

let node_id = self.opts.node_id.map(|id| id as u32);
let _handle = std::thread::spawn(move || {
let (flow_node_manager, mut worker) =
FlowWorkerManager::new_with_worker(node_id, query_engine, table_meta);
let _ = tx.send(flow_node_manager);
info!("Flow Worker started in new thread");
worker.run();
});
let _handle = std::thread::Builder::new()
.name("flow-worker".to_string())
.spawn(move || {
let (flow_node_manager, mut worker) =
FlowWorkerManager::new_with_worker(node_id, query_engine, table_meta);
let _ = tx.send(flow_node_manager);
info!("Flow Worker started in new thread");
worker.run();
});
let man = rx.await.map_err(|_e| {
UnexpectedSnafu {
reason: "sender is dropped, failed to create flow node manager",
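For context only: unlike `std::thread::spawn`, `std::thread::Builder::spawn` returns `io::Result<JoinHandle<T>>` (bound to `_handle` and left unchecked in the diff above) and lets the OS thread be named, which is what makes "flow-worker" show up in debuggers and profilers. A minimal sketch:

```rust
use std::thread;

fn main() {
    // Builder::spawn reports a failure to create the OS thread as an error
    // instead of panicking, and attaches a name to the thread.
    let handle = thread::Builder::new()
        .name("flow-worker".to_string())
        .spawn(|| {
            println!("running on thread {:?}", thread::current().name());
        })
        .expect("failed to spawn flow worker thread");

    handle.join().expect("worker panicked");
}
```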