fix: Optimize export metric behavior #3047

Merged · 4 commits · Jan 4, 2024
Changes from 2 commits
4 changes: 2 additions & 2 deletions config/datanode.example.toml
@@ -131,8 +131,8 @@ parallel_scan_channel_size = 32
 # enable = false
 # The url of metrics export endpoint, default is `frontend` default HTTP endpoint.
 # endpoint = "127.0.0.1:4000"
-# The database name of exported metrics stores, user needs to specify a valid database
-# db = ""
+# The database name of exported metrics stores, default is `information_schema`
+# db = "information_schema"
 # The interval of export metrics
 # write_interval = "30s"
 # HTTP headers of Prometheus remote-write carry
4 changes: 2 additions & 2 deletions config/frontend.example.toml
@@ -89,8 +89,8 @@ tcp_nodelay = true
 # enable = false
 # The url of metrics export endpoint, default is `frontend` default HTTP endpoint.
 # endpoint = "127.0.0.1:4000"
-# The database name of exported metrics stores, user needs to specify a valid database
-# db = ""
+# The database name of exported metrics stores, default is `information_schema`
+# db = "information_schema"
 # The interval of export metrics
 # write_interval = "30s"
 # HTTP headers of Prometheus remote-write carry
4 changes: 2 additions & 2 deletions config/metasrv.example.toml
@@ -88,8 +88,8 @@ provider = "raft_engine"
 # enable = false
 # The url of metrics export endpoint, default is `frontend` default HTTP endpoint.
 # endpoint = "127.0.0.1:4000"
-# The database name of exported metrics stores, user needs to specify a valid database
-# db = ""
+# The database name of exported metrics stores, default is `information_schema`
+# db = "information_schema"
 # The interval of export metrics
 # write_interval = "30s"
 # HTTP headers of Prometheus remote-write carry
4 changes: 2 additions & 2 deletions config/standalone.example.toml
@@ -232,8 +232,8 @@ parallel_scan_channel_size = 32
 # enable = false
 # The url of metrics export endpoint, default is `frontend` default HTTP endpoint.
 # endpoint = "127.0.0.1:4000"
-# The database name of exported metrics stores, user needs to specify a valid database
-# db = ""
+# The database name of exported metrics stores, default is `information_schema`
+# db = "information_schema"
 # The interval of export metrics
 # write_interval = "30s"
 # HTTP headers of Prometheus remote-write carry
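All four example configs document the same behavioral change: `db` now ships with the default `information_schema` instead of an empty value the user had to fill in. As a rough illustration of how a default like this is typically wired on the Rust side, here is a minimal sketch; the field names and types are assumptions inferred from the TOML keys above, and the real `ExportMetricsOption` in `servers::export_metrics` may differ:

```rust
// Sketch only — not this PR's code. Field names/types are inferred from the
// TOML keys in the example configs above.
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
#[serde(default)] // missing keys fall back to Default::default()
pub struct ExportMetricsOption {
    pub enable: bool,
    pub endpoint: String,
    pub db: String,
    pub write_interval: String, // e.g. "30s"; the real type may be a duration
    pub headers: HashMap<String, String>,
}

impl Default for ExportMetricsOption {
    fn default() -> Self {
        Self {
            enable: false,
            endpoint: "127.0.0.1:4000".to_string(),
            // The behavioral change in this PR: default to `information_schema`
            // instead of requiring the user to pick a database.
            db: "information_schema".to_string(),
            write_interval: "30s".to_string(),
            headers: HashMap::new(),
        }
    }
}
```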
4 changes: 0 additions & 4 deletions src/cmd/src/frontend.rs
@@ -252,10 +252,6 @@ impl StartCommand {
             .await
             .context(StartFrontendSnafu)?;
 
-        instance
-            .build_export_metrics_task(&opts.export_metrics)
-            .context(StartFrontendSnafu)?;
-
         instance
             .build_servers(opts)
             .await
4 changes: 0 additions & 4 deletions src/cmd/src/standalone.rs
@@ -425,10 +425,6 @@ impl StartCommand {
             .await
             .context(StartFrontendSnafu)?;
 
-        frontend
-            .build_export_metrics_task(&opts.frontend.export_metrics)
-            .context(StartFrontendSnafu)?;
-
         frontend
             .build_servers(opts)
             .await
4 changes: 2 additions & 2 deletions src/datanode/src/datanode.rs
@@ -98,7 +98,7 @@ impl Datanode {
         self.start_telemetry();
 
         if let Some(t) = self.export_metrics_task.as_ref() {
-            t.start()
+            t.start(None)
         }
 
         self.start_services().await
@@ -284,7 +284,7 @@ impl DatanodeBuilder {
         };
 
         let export_metrics_task =
-            ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
+            ExportMetricsTask::try_new(&self.opts.export_metrics, None, Some(&self.plugins))
                 .context(StartServerSnafu)?;
 
         Ok(Datanode {
26 changes: 18 additions & 8 deletions src/frontend/src/instance.rs
@@ -55,7 +55,7 @@ use query::QueryEngineRef;
 use raft_engine::{Config, ReadableSize, RecoveryMode};
 use servers::error as server_error;
 use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
-use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
+use servers::export_metrics::ExportMetricsTask;
 use servers::interceptor::{
     PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
 };
@@ -76,6 +76,7 @@ use sql::statements::statement::Statement;
 use sqlparser::ast::ObjectName;
 pub use standalone::StandaloneDatanodeManager;
 
+use self::prom_store::ExportMetricHandler;
 use crate::error::{
     self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, ParseSqlSnafu,
     PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, StartServerSnafu,
@@ -190,18 +191,19 @@ impl Instance {
         &mut self,
         opts: impl Into<FrontendOptions> + TomlSerializable,
     ) -> Result<()> {
+        let opts: FrontendOptions = opts.into();
+        self.export_metrics_task = ExportMetricsTask::try_new(
+            &opts.export_metrics,
+            Some(&opts.http.addr),
+            Some(&self.plugins),
+        )
+        .context(StartServerSnafu)?;
         let servers = Services::build(opts, Arc::new(self.clone()), self.plugins.clone()).await?;
         self.servers = Arc::new(servers);
 
         Ok(())
     }
 
-    pub fn build_export_metrics_task(&mut self, opts: &ExportMetricsOption) -> Result<()> {
-        self.export_metrics_task =
-            ExportMetricsTask::try_new(opts, Some(&self.plugins)).context(StartServerSnafu)?;
-        Ok(())
-    }
-
     pub fn catalog_manager(&self) -> &CatalogManagerRef {
         &self.catalog_manager
     }
@@ -232,7 +234,15 @@ impl FrontendInstance for Instance {
         self.script_executor.start(self)?;
 
         if let Some(t) = self.export_metrics_task.as_ref() {
-            t.start()
+            if t.send_by_handler {
+                let handler = ExportMetricHandler::new_handler(
+                    self.inserter.clone(),
+                    self.statement_executor.clone(),
+                );
+                t.start(Some(handler))
+            } else {
+                t.start(None)
+            }
         }
 
         futures::future::try_join_all(self.servers.iter().map(|(name, handler)| async move {
54 changes: 53 additions & 1 deletion src/frontend/src/instance/prom_store.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
 use api::prom_store::remote::read_request::ResponseType;
 use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
 use async_trait::async_trait;
@@ -21,10 +23,14 @@ use common_error::ext::BoxedError;
 use common_query::Output;
 use common_recordbatch::RecordBatches;
 use common_telemetry::logging;
+use operator::insert::InserterRef;
+use operator::statement::StatementExecutor;
 use prost::Message;
 use servers::error::{self, AuthSnafu, Result as ServerResult};
 use servers::prom_store::{self, Metrics};
-use servers::query_handler::{PromStoreProtocolHandler, PromStoreResponse};
+use servers::query_handler::{
+    PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse,
+};
 use session::context::QueryContextRef;
 use snafu::{OptionExt, ResultExt};
@@ -209,3 +215,49 @@ impl PromStoreProtocolHandler for Instance {
         todo!();
     }
 }
+
+/// This handler is mainly used by `frontend` or `standalone` instances to
+/// directly import the metrics they collect themselves, avoiding a round trip
+/// through the network and thus the compression and transmission overhead;
+/// accordingly, only the `PromStoreProtocolHandler::write` method is implemented.
+pub struct ExportMetricHandler {
+    inserter: InserterRef,
+    statement_executor: Arc<StatementExecutor>,
+}
+
+impl ExportMetricHandler {
+    pub fn new_handler(
+        inserter: InserterRef,
+        statement_executor: Arc<StatementExecutor>,
+    ) -> PromStoreProtocolHandlerRef {
+        Arc::new(Self {
+            inserter,
+            statement_executor,
+        })
+    }
+}
+
+#[async_trait]
+impl PromStoreProtocolHandler for ExportMetricHandler {
+    async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> ServerResult<()> {
+        let (requests, _) = prom_store::to_grpc_row_insert_requests(request)?;
+        self.inserter
+            .handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
+            .await
+            .map_err(BoxedError::new)
+            .context(error::ExecuteGrpcQuerySnafu)?;
+        Ok(())
+    }
+
+    async fn read(
+        &self,
+        _request: ReadRequest,
+        _ctx: QueryContextRef,
+    ) -> ServerResult<PromStoreResponse> {
+        unreachable!();
+    }
+
+    async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> {
+        unreachable!();
+    }
+}
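The doc comment above captures the core optimization: a frontend (or standalone) node exporting its own metrics can hand rows straight to its inserter instead of compressing them into a Prometheus remote-write request and POSTing them to its own HTTP endpoint. The following self-contained sketch illustrates that dispatch pattern with hypothetical types; it is not GreptimeDB's actual API, which passes a `PromStoreProtocolHandlerRef` into the export task's `start` as shown in the diffs above:

```rust
// Hypothetical sketch of the "optional in-process handler" pattern.
use std::sync::Arc;

// Stand-in for `PromStoreProtocolHandler`, reduced to the one method that
// `ExportMetricHandler` actually implements.
trait MetricsSink: Send + Sync {
    fn write(&self, samples: &[(String, f64)]) -> Result<(), String>;
}

// Analogue of `ExportMetricHandler`: hands samples straight to storage,
// skipping compression and the network round trip.
struct InProcessSink;

impl MetricsSink for InProcessSink {
    fn write(&self, samples: &[(String, f64)]) -> Result<(), String> {
        for (name, value) in samples {
            // Real code would call inserter.handle_row_inserts(...) here.
            println!("insert {name} = {value}");
        }
        Ok(())
    }
}

// Analogue of `ExportMetricsTask::start(Option<handler>)`.
fn start_export(sink: Option<Arc<dyn MetricsSink>>, samples: &[(String, f64)]) {
    match sink {
        // Frontend/standalone path: write in-process.
        Some(s) => s.write(samples).expect("in-process write failed"),
        // Datanode/metasrv path: serialize and POST to the configured
        // endpoint via Prometheus remote-write (omitted here).
        None => println!("would remote-write {} samples over HTTP", samples.len()),
    }
}

fn main() {
    let samples = vec![("process_cpu_seconds_total".to_string(), 0.42)];
    start_export(Some(Arc::new(InProcessSink)), &samples); // send_by_handler == true
    start_export(None, &samples); // no handler; fall back to HTTP
}
```

This also explains the asymmetry across the diffs: datanode and metasrv have no inserter of their own, so they pass `None` and keep the remote-write path, while only the frontend branch constructs a handler.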
7 changes: 4 additions & 3 deletions src/meta-srv/src/bootstrap.rs
@@ -78,8 +78,9 @@ impl MetaSrvInstance {
         );
         // put meta_srv into plugins for later use
         plugins.insert::<Arc<MetaSrv>>(Arc::new(meta_srv.clone()));
-        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
-            .context(InitExportMetricsTaskSnafu)?;
+        let export_metrics_task =
+            ExportMetricsTask::try_new(&opts.export_metrics, None, Some(&plugins))
+                .context(InitExportMetricsTaskSnafu)?;
         Ok(MetaSrvInstance {
             meta_srv,
             http_srv,
@@ -94,7 +95,7 @@ impl MetaSrvInstance {
         self.meta_srv.try_start().await?;
 
         if let Some(t) = self.export_metrics_task.as_ref() {
-            t.start()
+            t.start(None)
         }
 
         let (tx, rx) = mpsc::channel::<()>(1);