fix: Optimize export metric behavior #3047

Merged (4 commits, Jan 4, 2024)
7 changes: 3 additions & 4 deletions config/datanode.example.toml
@@ -129,11 +129,10 @@ parallel_scan_channel_size = 32
 # [export_metrics]
 # whether enable export metrics, default is false
 # enable = false
-# The url of metrics export endpoint, default is `frontend` default HTTP endpoint.
-# endpoint = "127.0.0.1:4000"
-# The database name of exported metrics stores, user needs to specify a valid database
-# db = ""
 # The interval of export metrics
 # write_interval = "30s"
+# [export_metrics.remote_write]
+# The url the metrics send to. The url is empty by default, url example: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`
+# url = ""
 # HTTP headers of Prometheus remote-write carry
 # headers = {}
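
With the flat `endpoint` and `db` keys removed, datanode and metasrv now point the exporter at a Prometheus remote-write endpoint through the `[export_metrics.remote_write]` sub-table. A minimal filled-in sketch of the new layout (the `enable = true` and URL values are illustrative, taken from the example comments, not defaults):

    [export_metrics]
    enable = true
    # The interval of export metrics
    write_interval = "30s"

    [export_metrics.remote_write]
    # Any Prometheus remote-write compatible endpoint should work; this is the
    # example URL from the comments above.
    url = "http://127.0.0.1:4000/v1/prometheus/write?db=information_schema"
    # Extra HTTP headers to attach to each remote-write request.
    headers = {}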
9 changes: 3 additions & 6 deletions config/frontend.example.toml
@@ -87,11 +87,8 @@ tcp_nodelay = true
 # [export_metrics]
 # whether enable export metrics, default is false
 # enable = false
-# The url of metrics export endpoint, default is `frontend` default HTTP endpoint.
-# endpoint = "127.0.0.1:4000"
-# The database name of exported metrics stores, user needs to specify a valid database
-# db = ""
 # The interval of export metrics
 # write_interval = "30s"
-# HTTP headers of Prometheus remote-write carry
-# headers = {}
+# for `frontend`, `self_import` is recommend to collect metrics generated by itself
+# [export_metrics.self_import]
+# db = "information_schema"
7 changes: 3 additions & 4 deletions config/metasrv.example.toml
@@ -86,11 +86,10 @@ provider = "raft_engine"
 # [export_metrics]
 # whether enable export metrics, default is false
 # enable = false
-# The url of metrics export endpoint, default is `frontend` default HTTP endpoint.
-# endpoint = "127.0.0.1:4000"
-# The database name of exported metrics stores, user needs to specify a valid database
-# db = ""
 # The interval of export metrics
 # write_interval = "30s"
+# [export_metrics.remote_write]
+# The url the metrics send to. The url is empty by default, url example: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`
+# url = ""
 # HTTP headers of Prometheus remote-write carry
 # headers = {}
9 changes: 3 additions & 6 deletions config/standalone.example.toml
@@ -230,11 +230,8 @@ parallel_scan_channel_size = 32
 # [export_metrics]
 # whether enable export metrics, default is false
 # enable = false
-# The url of metrics export endpoint, default is `frontend` default HTTP endpoint.
-# endpoint = "127.0.0.1:4000"
-# The database name of exported metrics stores, user needs to specify a valid database
-# db = ""
 # The interval of export metrics
 # write_interval = "30s"
-# HTTP headers of Prometheus remote-write carry
-# headers = {}
+# for `standalone`, `self_import` is recommend to collect metrics generated by itself
+# [export_metrics.self_import]
+# db = "information_schema"
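
For `frontend` and `standalone`, the hunks above instead recommend the `[export_metrics.self_import]` sub-table, which the Rust changes below wire to an in-process handler so these roles can store their own metrics without a network round trip. A minimal sketch, assuming the `information_schema` database from the example comments:

    [export_metrics]
    enable = true
    write_interval = "30s"

    # Only meaningful on roles that embed a frontend (`frontend`, `standalone`).
    [export_metrics.self_import]
    db = "information_schema"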
4 changes: 0 additions & 4 deletions src/cmd/src/frontend.rs
@@ -252,10 +252,6 @@ impl StartCommand {
             .await
             .context(StartFrontendSnafu)?;

-        instance
-            .build_export_metrics_task(&opts.export_metrics)
-            .context(StartFrontendSnafu)?;
-
         instance
             .build_servers(opts)
             .await
4 changes: 0 additions & 4 deletions src/cmd/src/standalone.rs
@@ -425,10 +425,6 @@ impl StartCommand {
             .await
             .context(StartFrontendSnafu)?;

-        frontend
-            .build_export_metrics_task(&opts.frontend.export_metrics)
-            .context(StartFrontendSnafu)?;
-
         frontend
             .build_servers(opts)
             .await
2 changes: 1 addition & 1 deletion src/datanode/src/datanode.rs
@@ -98,7 +98,7 @@ impl Datanode {
         self.start_telemetry();

         if let Some(t) = self.export_metrics_task.as_ref() {
-            t.start()
+            t.start(None).context(StartServerSnafu)?
         }

         self.start_services().await
23 changes: 15 additions & 8 deletions src/frontend/src/instance.rs
@@ -55,7 +55,7 @@ use query::QueryEngineRef;
 use raft_engine::{Config, ReadableSize, RecoveryMode};
 use servers::error as server_error;
 use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
-use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
+use servers::export_metrics::ExportMetricsTask;
 use servers::interceptor::{
     PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
 };
@@ -76,6 +76,7 @@ use sql::statements::statement::Statement;
 use sqlparser::ast::ObjectName;
 pub use standalone::StandaloneDatanodeManager;

+use self::prom_store::ExportMetricHandler;
 use crate::error::{
     self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, ParseSqlSnafu,
     PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, StartServerSnafu,
@@ -190,18 +191,16 @@ impl Instance {
         &mut self,
         opts: impl Into<FrontendOptions> + TomlSerializable,
     ) -> Result<()> {
+        let opts: FrontendOptions = opts.into();
+        self.export_metrics_task =
+            ExportMetricsTask::try_new(&opts.export_metrics, Some(&self.plugins))
+                .context(StartServerSnafu)?;
         let servers = Services::build(opts, Arc::new(self.clone()), self.plugins.clone()).await?;
         self.servers = Arc::new(servers);

         Ok(())
     }

-    pub fn build_export_metrics_task(&mut self, opts: &ExportMetricsOption) -> Result<()> {
-        self.export_metrics_task =
-            ExportMetricsTask::try_new(opts, Some(&self.plugins)).context(StartServerSnafu)?;
-        Ok(())
-    }
-
     pub fn catalog_manager(&self) -> &CatalogManagerRef {
         &self.catalog_manager
     }
@@ -232,7 +231,15 @@ impl FrontendInstance for Instance {
         self.script_executor.start(self)?;

         if let Some(t) = self.export_metrics_task.as_ref() {
-            t.start()
+            if t.send_by_handler {
+                let handler = ExportMetricHandler::new_handler(
+                    self.inserter.clone(),
+                    self.statement_executor.clone(),
+                );
+                t.start(Some(handler)).context(StartServerSnafu)?
+            } else {
+                t.start(None).context(StartServerSnafu)?;
+            }
         }

         futures::future::try_join_all(self.servers.iter().map(|(name, handler)| async move {
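
Every call site in this PR now passes an `Option` of a handler into `start` and propagates an error, so the task API has evidently changed from an infallible `t.start()` to a fallible `t.start(handler)`. The real definitions live in `servers::export_metrics` and `servers::query_handler` (not shown in this diff); the following is only a simplified sketch of the assumed shape, with stand-in types:

    use std::sync::Arc;

    // Stand-in for servers::query_handler::PromStoreProtocolHandler; the real
    // trait also declares the async write/read/ingest_metrics methods.
    pub trait PromStoreProtocolHandler: Send + Sync {}
    pub type PromStoreProtocolHandlerRef = Arc<dyn PromStoreProtocolHandler>;

    pub struct ExportMetricsTask {
        // True when `[export_metrics.self_import]` is configured: metrics are
        // written through an in-process handler instead of HTTP remote write.
        pub send_by_handler: bool,
    }

    impl ExportMetricsTask {
        // `start` now takes the optional in-process handler and is fallible,
        // which is why callers changed from `t.start()` to `t.start(None)?`-style
        // code with a snafu context.
        pub fn start(&self, handler: Option<PromStoreProtocolHandlerRef>) -> Result<(), String> {
            if self.send_by_handler {
                let _handler = handler
                    .ok_or_else(|| "self_import configured but no handler supplied".to_string())?;
                // ...spawn a background loop: every `write_interval`, encode this
                // process's metrics and hand them to the handler's write().
            } else {
                // ...spawn a loop that POSTs the encoded metrics to the
                // configured remote-write URL instead.
            }
            Ok(())
        }
    }

This matches the frontend path above, where `send_by_handler` selects `ExportMetricHandler::new_handler(...)`, while datanode and metasrv always pass `None`.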
54 changes: 53 additions & 1 deletion src/frontend/src/instance/prom_store.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::sync::Arc;
+
 use api::prom_store::remote::read_request::ResponseType;
 use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
 use async_trait::async_trait;
@@ -21,10 +23,14 @@ use common_error::ext::BoxedError;
 use common_query::Output;
 use common_recordbatch::RecordBatches;
 use common_telemetry::logging;
+use operator::insert::InserterRef;
+use operator::statement::StatementExecutor;
 use prost::Message;
 use servers::error::{self, AuthSnafu, Result as ServerResult};
 use servers::prom_store::{self, Metrics};
-use servers::query_handler::{PromStoreProtocolHandler, PromStoreResponse};
+use servers::query_handler::{
+    PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse,
+};
 use session::context::QueryContextRef;
 use snafu::{OptionExt, ResultExt};

@@ -209,3 +215,49 @@ impl PromStoreProtocolHandler for Instance {
         todo!();
     }
 }
+
+/// This handler is mainly used for `frontend` or `standalone` to directly import
+/// the metrics collected by itself, thereby avoiding importing metrics through the network,
+/// thus reducing compression and network transmission overhead,
+/// so only implement `PromStoreProtocolHandler::write` method.
+pub struct ExportMetricHandler {
+    inserter: InserterRef,
+    statement_executor: Arc<StatementExecutor>,
+}
+
+impl ExportMetricHandler {
+    pub fn new_handler(
+        inserter: InserterRef,
+        statement_executor: Arc<StatementExecutor>,
+    ) -> PromStoreProtocolHandlerRef {
+        Arc::new(Self {
+            inserter,
+            statement_executor,
+        })
+    }
+}
+
+#[async_trait]
+impl PromStoreProtocolHandler for ExportMetricHandler {
+    async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> ServerResult<()> {
+        let (requests, _) = prom_store::to_grpc_row_insert_requests(request)?;
+        self.inserter
+            .handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
+            .await
+            .map_err(BoxedError::new)
+            .context(error::ExecuteGrpcQuerySnafu)?;
+        Ok(())
+    }
+
+    async fn read(
+        &self,
+        _request: ReadRequest,
+        _ctx: QueryContextRef,
+    ) -> ServerResult<PromStoreResponse> {
+        unreachable!();
+    }
+
+    async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> {
+        unreachable!();
+    }
+}
2 changes: 1 addition & 1 deletion src/meta-srv/src/bootstrap.rs
@@ -94,7 +94,7 @@ impl MetaSrvInstance {
         self.meta_srv.try_start().await?;

         if let Some(t) = self.export_metrics_task.as_ref() {
-            t.start()
+            t.start(None).context(InitExportMetricsTaskSnafu)?
         }

         let (tx, rx) = mpsc::channel::<()>(1);