Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(prom)!: rename prometheus(remote storage) to prom-store and promql(HTTP server) to prometheus #1931

Merged
merged 3 commits into from Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions config/frontend.example.toml
Expand Up @@ -47,12 +47,12 @@ runtime_size = 2
[influxdb_options]
enable = true

# Prometheus protocol options, see `standalone.example.toml`.
[prometheus_options]
# Prometheus remote storage options, see `standalone.example.toml`.
[prom_store_options]
enable = true

# Prometheus protocol options, see `standalone.example.toml`.
[prom_options]
[prometheus_options]
addr = "127.0.0.1:4004"

# Metasrv client options, see `datanode.example.toml`.
Expand Down
8 changes: 4 additions & 4 deletions config/standalone.example.toml
Expand Up @@ -69,13 +69,13 @@ runtime_size = 2
# Whether to enable InfluxDB protocol in HTTP API, true by default.
enable = true

# Prometheus protocol options.
[prometheus_options]
# Prometheus remote storage options
[prom_store_options]
# Whether to enable Prometheus remote write and read in HTTP API, true by default.
enable = true

# Prom protocol options.
[prom_options]
# Prometheus protocol options
[prometheus_options]
# Prometheus API server address, "127.0.0.1:4004" by default.
addr = "127.0.0.1:4004"

Expand Down
2 changes: 1 addition & 1 deletion src/api/src/lib.rs
Expand Up @@ -15,7 +15,7 @@
pub mod error;
pub mod helper;

pub mod prometheus {
pub mod prom_store {
pub mod remote {
pub use greptime_proto::prometheus::remote::*;
}
Expand Down
9 changes: 6 additions & 3 deletions src/cmd/src/frontend.rs
Expand Up @@ -19,7 +19,7 @@ use common_base::Plugins;
use common_telemetry::logging;
use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use frontend::service_config::{InfluxdbOptions, PromOptions};
use frontend::service_config::{InfluxdbOptions, PrometheusOptions};
use meta_client::MetaClientOptions;
use servers::auth::UserProviderRef;
use servers::tls::{TlsMode, TlsOption};
Expand Down Expand Up @@ -172,7 +172,7 @@ impl StartCommand {
}

if let Some(addr) = &self.prom_addr {
opts.prom_options = Some(PromOptions { addr: addr.clone() });
opts.prometheus_options = Some(PrometheusOptions { addr: addr.clone() });
}

if let Some(addr) = &self.postgres_addr {
Expand Down Expand Up @@ -274,7 +274,10 @@ mod tests {
opts.opentsdb_options.as_ref().unwrap().addr,
"127.0.0.1:4321"
);
assert_eq!(opts.prom_options.as_ref().unwrap().addr, "127.0.0.1:4444");
assert_eq!(
opts.prometheus_options.as_ref().unwrap().addr,
"127.0.0.1:4444"
);

let default_opts = FrontendOptions::default();
assert_eq!(
Expand Down
10 changes: 5 additions & 5 deletions src/cmd/src/standalone.rs
Expand Up @@ -23,7 +23,7 @@ use datanode::instance::InstanceRef;
use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromOptions,
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
PrometheusOptions,
};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -89,8 +89,8 @@ pub struct StandaloneOptions {
pub postgres_options: Option<PostgresOptions>,
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prom_store_options: Option<PromStoreOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub prom_options: Option<PromOptions>,
pub wal: WalConfig,
pub storage: StorageConfig,
pub procedure: ProcedureConfig,
Expand All @@ -108,8 +108,8 @@ impl Default for StandaloneOptions {
postgres_options: Some(PostgresOptions::default()),
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prom_store_options: Some(PromStoreOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
prom_options: Some(PromOptions::default()),
wal: WalConfig::default(),
storage: StorageConfig::default(),
procedure: ProcedureConfig::default(),
Expand All @@ -128,8 +128,8 @@ impl StandaloneOptions {
postgres_options: self.postgres_options,
opentsdb_options: self.opentsdb_options,
influxdb_options: self.influxdb_options,
prom_store_options: self.prom_store_options,
prometheus_options: self.prometheus_options,
prom_options: self.prom_options,
meta_client_options: None,
logging: self.logging,
..Default::default()
Expand Down Expand Up @@ -269,7 +269,7 @@ impl StartCommand {
}

if let Some(addr) = &self.prom_addr {
opts.prom_options = Some(PromOptions { addr: addr.clone() })
opts.prometheus_options = Some(PrometheusOptions { addr: addr.clone() })
}

if let Some(addr) = &self.postgres_addr {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/error.rs
Expand Up @@ -388,7 +388,7 @@ pub enum Error {
"Failed to create logical plan for prometheus query, source: {}",
source
))]
PrometheusRemoteQueryPlan {
PromStoreRemoteQueryPlan {
#[snafu(backtrace)]
source: servers::error::Error,
},
Expand Down Expand Up @@ -605,7 +605,7 @@ impl ErrorExt for Error {
Error::HandleHeartbeatResponse { source, .. } => source.status_code(),

Error::RuntimeResource { source, .. } => source.status_code(),
Error::PrometheusRemoteQueryPlan { source, .. }
Error::PromStoreRemoteQueryPlan { source, .. }
| Error::ExecutePromql { source, .. } => source.status_code(),

Error::SqlExecIntercepted { source, .. } => source.status_code(),
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/frontend.rs
Expand Up @@ -19,7 +19,7 @@ use servers::http::HttpOptions;
use servers::Mode;

use crate::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromOptions,
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
PrometheusOptions,
};

Expand All @@ -35,8 +35,8 @@ pub struct FrontendOptions {
pub postgres_options: Option<PostgresOptions>,
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prom_store_options: Option<PromStoreOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub prom_options: Option<PromOptions>,
pub meta_client_options: Option<MetaClientOptions>,
pub logging: LoggingOptions,
}
Expand All @@ -53,8 +53,8 @@ impl Default for FrontendOptions {
postgres_options: Some(PostgresOptions::default()),
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prom_store_options: Some(PromStoreOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
prom_options: Some(PromOptions::default()),
meta_client_options: None,
logging: LoggingOptions::default(),
}
Expand Down
14 changes: 7 additions & 7 deletions src/frontend/src/instance.rs
Expand Up @@ -16,7 +16,7 @@ pub mod distributed;
mod grpc;
mod influxdb;
mod opentsdb;
mod prometheus;
mod prom_store;
mod script;
mod standalone;

Expand Down Expand Up @@ -61,11 +61,11 @@ use servers::error::{ExecuteQuerySnafu, ParsePromQLSnafu};
use servers::interceptor::{
PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
};
use servers::prom::PromHandler;
use servers::prometheus::PrometheusHandler;
use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef};
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{
InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PrometheusProtocolHandler, ScriptHandler,
InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PromStoreProtocolHandler, ScriptHandler,
};
use session::context::QueryContextRef;
use snafu::prelude::*;
Expand Down Expand Up @@ -96,9 +96,9 @@ pub trait FrontendInstance:
+ SqlQueryHandler<Error = Error>
+ OpentsdbProtocolHandler
+ InfluxdbLineProtocolHandler
+ PrometheusProtocolHandler
+ PromStoreProtocolHandler
+ ScriptHandler
+ PromHandler
+ PrometheusHandler
+ Send
+ Sync
+ 'static
Expand Down Expand Up @@ -525,7 +525,7 @@ impl SqlQueryHandler for Instance {
query: &PromQuery,
query_ctx: QueryContextRef,
) -> Vec<Result<Output>> {
let result = PromHandler::do_query(self, query, query_ctx)
let result = PrometheusHandler::do_query(self, query, query_ctx)
.await
.with_context(|_| ExecutePromqlSnafu {
query: format!("{query:?}"),
Expand Down Expand Up @@ -567,7 +567,7 @@ impl SqlQueryHandler for Instance {
}

#[async_trait]
impl PromHandler for Instance {
impl PrometheusHandler for Instance {
async fn do_query(
&self,
query: &PromQuery,
Expand Down
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use api::prometheus::remote::read_request::ResponseType;
use api::prometheus::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
use api::prom_store::remote::read_request::ResponseType;
use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
use async_trait::async_trait;
use common_catalog::format_full_table_name;
use common_error::prelude::BoxedError;
Expand All @@ -23,17 +23,17 @@ use common_telemetry::logging;
use metrics::counter;
use prost::Message;
use servers::error::{self, Result as ServerResult};
use servers::prometheus::{self, Metrics};
use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
use servers::prom_store::{self, Metrics};
use servers::query_handler::{PromStoreProtocolHandler, PromStoreResponse};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};

use crate::error::{
CatalogSnafu, ExecLogicalPlanSnafu, PrometheusRemoteQueryPlanSnafu, ReadTableSnafu, Result,
CatalogSnafu, ExecLogicalPlanSnafu, PromStoreRemoteQueryPlanSnafu, ReadTableSnafu, Result,
TableNotFoundSnafu,
};
use crate::instance::Instance;
use crate::metrics::PROMETHEUS_REMOTE_WRITE_SAMPLES;
use crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES;

const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;

Expand Down Expand Up @@ -72,7 +72,7 @@ async fn to_query_result(table_name: &str, output: Output) -> ServerResult<Query
.await
.context(error::CollectRecordbatchSnafu)?;
Ok(QueryResult {
timeseries: prometheus::recordbatches_to_timeseries(table_name, recordbatches)?,
timeseries: prom_store::recordbatches_to_timeseries(table_name, recordbatches)?,
})
}

Expand Down Expand Up @@ -102,7 +102,7 @@ impl Instance {
})?;

let logical_plan =
prometheus::query_to_plan(dataframe, query).context(PrometheusRemoteQueryPlanSnafu)?;
prom_store::query_to_plan(dataframe, query).context(PromStoreRemoteQueryPlanSnafu)?;

logging::debug!(
"Prometheus remote read, table: {}, logical plan: {}",
Expand All @@ -127,7 +127,7 @@ impl Instance {
let schema_name = ctx.current_schema();

for query in queries {
let table_name = prometheus::table_name(query)?;
let table_name = prom_store::table_name(query)?;

let output = self
.handle_remote_query(&ctx, &catalog_name, &schema_name, &table_name, query)
Expand All @@ -144,24 +144,24 @@ impl Instance {
}

#[async_trait]
impl PrometheusProtocolHandler for Instance {
impl PromStoreProtocolHandler for Instance {
async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> ServerResult<()> {
let (requests, samples) = prometheus::to_grpc_insert_requests(request)?;
let (requests, samples) = prom_store::to_grpc_insert_requests(request)?;
let _ = self
.handle_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;

counter!(PROMETHEUS_REMOTE_WRITE_SAMPLES, samples as u64);
counter!(PROM_STORE_REMOTE_WRITE_SAMPLES, samples as u64);
Ok(())
}

async fn read(
&self,
request: ReadRequest,
ctx: QueryContextRef,
) -> ServerResult<PrometheusResponse> {
) -> ServerResult<PromStoreResponse> {
let response_type = negotiate_response_type(&request.accepted_response_types)?;

// TODO(dennis): use read_hints to speedup query if possible
Expand All @@ -179,10 +179,10 @@ impl PrometheusProtocolHandler for Instance {
};

// TODO(dennis): may consume too much memory, adds flow control
Ok(PrometheusResponse {
Ok(PromStoreResponse {
content_type: "application/x-protobuf".to_string(),
content_encoding: "snappy".to_string(),
body: prometheus::snappy_compress(&response.encode_to_vec())?,
body: prom_store::snappy_compress(&response.encode_to_vec())?,
})
}
ResponseType::StreamedXorChunks => error::NotSupportedSnafu {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/metrics.rs
Expand Up @@ -23,4 +23,4 @@ pub const DIST_CREATE_TABLE: &str = "frontend.dist.create_table";
pub const DIST_INGEST_ROW_COUNT: &str = "frontend.dist.ingest_rows";

/// The samples count of Prometheus remote write.
pub const PROMETHEUS_REMOTE_WRITE_SAMPLES: &str = "frontend.prometheus.remote_write.samples";
pub const PROM_STORE_REMOTE_WRITE_SAMPLES: &str = "frontend.prometheus.remote_write.samples";
14 changes: 7 additions & 7 deletions src/frontend/src/server.rs
Expand Up @@ -28,7 +28,7 @@ use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::opentsdb::OpentsdbServer;
use servers::postgres::PostgresServer;
use servers::prom::PromServer;
use servers::prometheus::PrometheusServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor;
use servers::server::Server;
Expand All @@ -38,7 +38,7 @@ use crate::error::Error::StartServer;
use crate::error::{self, Result};
use crate::frontend::FrontendOptions;
use crate::instance::FrontendInstance;
use crate::service_config::{InfluxdbOptions, PrometheusOptions};
use crate::service_config::{InfluxdbOptions, PromStoreOptions};

pub(crate) struct Services;

Expand Down Expand Up @@ -172,8 +172,8 @@ impl Services {
}

if matches!(
opts.prometheus_options,
Some(PrometheusOptions { enable: true })
opts.prom_store_options,
Some(PromStoreOptions { enable: true })
) {
let _ = http_server_builder.with_prom_handler(instance.clone());
}
Expand All @@ -186,10 +186,10 @@ impl Services {
result.push((Box::new(http_server), http_addr));
}

if let Some(prom_options) = &opts.prom_options {
let prom_addr = parse_addr(&prom_options.addr)?;
if let Some(prometheus_options) = &opts.prometheus_options {
let prom_addr = parse_addr(&prometheus_options.addr)?;

let mut prom_server = PromServer::create_server(instance);
let mut prom_server = PrometheusServer::create_server(instance);
if let Some(user_provider) = user_provider {
prom_server.set_user_provider(user_provider);
}
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/service_config.rs
Expand Up @@ -17,13 +17,13 @@ pub mod influxdb;
pub mod mysql;
pub mod opentsdb;
pub mod postgres;
pub mod prom;
pub mod prom_store;
pub mod prometheus;

pub use grpc::GrpcOptions;
pub use influxdb::InfluxdbOptions;
pub use mysql::MysqlOptions;
pub use opentsdb::OpentsdbOptions;
pub use postgres::PostgresOptions;
pub use prom::PromOptions;
pub use prom_store::PromStoreOptions;
pub use prometheus::PrometheusOptions;