Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
http_addr = '0.0.0.0:3000'
rpc_addr = '0.0.0.0:3001'
wal_dir = '/tmp/greptimedb/wal'
rpc_runtime_size = 8

mysql_addr = '0.0.0.0:3306'
mysql_runtime_size = 4
Expand Down
11 changes: 9 additions & 2 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use clap::Parser;
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::grpc::GrpcOptions;
use frontend::influxdb::InfluxdbOptions;
use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
Expand Down Expand Up @@ -74,7 +75,10 @@ impl TryFrom<StartCommand> for FrontendOptions {
opts.http_addr = Some(addr);
}
if let Some(addr) = cmd.grpc_addr {
opts.grpc_addr = Some(addr);
opts.grpc_options = Some(GrpcOptions {
addr,
..Default::default()
});
}
if let Some(addr) = cmd.mysql_addr {
opts.mysql_options = Some(MysqlOptions {
Expand Down Expand Up @@ -130,7 +134,10 @@ mod tests {
);

let default_opts = FrontendOptions::default();
assert_eq!(opts.grpc_addr, default_opts.grpc_addr);
assert_eq!(
opts.grpc_options.unwrap().addr,
default_opts.grpc_options.unwrap().addr
);
assert_eq!(
opts.mysql_options.as_ref().unwrap().runtime_size,
default_opts.mysql_options.as_ref().unwrap().runtime_size
Expand Down
6 changes: 4 additions & 2 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ impl Default for ObjectStoreConfig {
pub struct DatanodeOptions {
pub http_addr: String,
pub rpc_addr: String,
pub rpc_runtime_size: usize,
pub mysql_addr: String,
pub mysql_runtime_size: u32,
pub mysql_runtime_size: usize,
pub postgres_addr: String,
pub postgres_runtime_size: u32,
pub postgres_runtime_size: usize,
pub wal_dir: String,
pub storage: ObjectStoreConfig,
}
Expand All @@ -37,6 +38,7 @@ impl Default for DatanodeOptions {
Self {
http_addr: "0.0.0.0:3000".to_string(),
rpc_addr: "0.0.0.0:3001".to_string(),
rpc_runtime_size: 8,
mysql_addr: "0.0.0.0:3306".to_string(),
mysql_runtime_size: 2,
postgres_addr: "0.0.0.0:5432".to_string(),
Expand Down
9 changes: 8 additions & 1 deletion src/datanode/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,16 @@ impl Services {
.build()
.context(error::RuntimeResourceSnafu)?,
);
let grpc_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.rpc_runtime_size as usize)
.thread_name("grpc-io-handlers")
.build()
.context(error::RuntimeResourceSnafu)?,
);
Ok(Self {
http_server: HttpServer::new(instance.clone()),
grpc_server: GrpcServer::new(instance.clone(), instance.clone()),
grpc_server: GrpcServer::new(instance.clone(), instance.clone(), grpc_runtime),
mysql_server: MysqlServer::create_server(instance.clone(), mysql_io_runtime),
postgres_server: Box::new(PostgresServer::new(instance, postgres_io_runtime)),
})
Expand Down
11 changes: 10 additions & 1 deletion src/datanode/src/tests/grpc_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use api::v1::{
};
use client::admin::Admin;
use client::{Client, Database, ObjectResult};
use common_runtime::Builder as RuntimeBuilder;
use servers::grpc::GrpcServer;
use servers::server::Server;

Expand All @@ -27,9 +28,17 @@ async fn setup_grpc_server(name: &str, port: usize) -> (String, TestGuard, Arc<G
instance.start().await.unwrap();

let addr_cloned = addr.clone();
let grpc_server = Arc::new(GrpcServer::new(instance.clone(), instance));
let runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(2)
.thread_name("grpc-handlers")
.build()
.unwrap(),
);

let grpc_server = Arc::new(GrpcServer::new(instance.clone(), instance, runtime));
let grpc_server_clone = grpc_server.clone();

tokio::spawn(async move {
let addr = addr_cloned.parse::<SocketAddr>().unwrap();
grpc_server_clone.start(addr).await.unwrap()
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
use snafu::prelude::*;

use crate::error::{self, Result};
use crate::grpc::GrpcOptions;
use crate::influxdb::InfluxdbOptions;
use crate::instance::Instance;
use crate::mysql::MysqlOptions;
Expand All @@ -15,7 +16,7 @@ use crate::server::Services;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FrontendOptions {
pub http_addr: Option<String>,
pub grpc_addr: Option<String>,
pub grpc_options: Option<GrpcOptions>,
pub mysql_options: Option<MysqlOptions>,
pub postgres_options: Option<PostgresOptions>,
pub opentsdb_options: Option<OpentsdbOptions>,
Expand All @@ -27,7 +28,7 @@ impl Default for FrontendOptions {
fn default() -> Self {
Self {
http_addr: Some("0.0.0.0:4000".to_string()),
grpc_addr: Some("0.0.0.0:4001".to_string()),
grpc_options: Some(GrpcOptions::default()),
mysql_options: Some(MysqlOptions::default()),
postgres_options: Some(PostgresOptions::default()),
opentsdb_options: Some(OpentsdbOptions::default()),
Expand Down
16 changes: 16 additions & 0 deletions src/frontend/src/grpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GrpcOptions {
pub addr: String,
pub runtime_size: usize,
}

impl Default for GrpcOptions {
fn default() -> Self {
Self {
addr: "0.0.0.0:4001".to_string(),
runtime_size: 8,
}
}
}
1 change: 1 addition & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pub mod error;
pub mod frontend;
pub mod grpc;
pub mod influxdb;
pub mod instance;
pub mod mysql;
Expand Down
14 changes: 11 additions & 3 deletions src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,18 @@ pub(crate) struct Services;

impl Services {
pub(crate) async fn start(opts: &FrontendOptions, instance: InstanceRef) -> Result<()> {
let grpc_server_and_addr = if let Some(grpc_addr) = &opts.grpc_addr {
let grpc_addr = parse_addr(grpc_addr)?;
let grpc_server_and_addr = if let Some(opts) = &opts.grpc_options {
let grpc_addr = parse_addr(&opts.addr)?;

let grpc_server = GrpcServer::new(instance.clone(), instance.clone());
let grpc_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.runtime_size)
.thread_name("grpc-handlers")
.build()
.context(error::RuntimeResourceSnafu)?,
);

let grpc_server = GrpcServer::new(instance.clone(), instance.clone(), grpc_runtime);

Some((Box::new(grpc_server) as _, grpc_addr))
} else {
Expand Down
11 changes: 10 additions & 1 deletion src/frontend/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use client::Client;
use common_grpc::channel_manager::ChannelManager;
use common_runtime::Builder as RuntimeBuilder;
use datanode::instance::Instance as DatanodeInstance;
use servers::grpc::GrpcServer;
use tonic::transport::Server;
Expand All @@ -22,10 +23,18 @@ pub(crate) async fn create_frontend_instance() -> Arc<Instance> {

let (client, server) = tokio::io::duplex(1024);

let runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(2)
.thread_name("grpc-handlers")
.build()
.unwrap(),
);

// create a mock datanode grpc service, see example here:
// https://github.com/hyperium/tonic/blob/master/examples/src/mock/mock.rs
let datanode_service =
GrpcServer::new(datanode_instance.clone(), datanode_instance).create_service();
GrpcServer::new(datanode_instance.clone(), datanode_instance, runtime).create_service();
tokio::spawn(async move {
Server::builder()
.add_service(datanode_service)
Expand Down
16 changes: 14 additions & 2 deletions src/servers/src/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
pub mod handler;

use std::net::SocketAddr;
use std::sync::Arc;

use api::v1::{greptime_server, BatchRequest, BatchResponse};
use async_trait::async_trait;
use common_runtime::Runtime;
use common_telemetry::logging::info;
use futures::FutureExt;
use snafu::ensure;
Expand All @@ -23,20 +25,30 @@ pub struct GrpcServer {
query_handler: GrpcQueryHandlerRef,
admin_handler: GrpcAdminHandlerRef,
shutdown_tx: Mutex<Option<Sender<()>>>,
runtime: Arc<Runtime>,
}

impl GrpcServer {
pub fn new(query_handler: GrpcQueryHandlerRef, admin_handler: GrpcAdminHandlerRef) -> Self {
pub fn new(
query_handler: GrpcQueryHandlerRef,
admin_handler: GrpcAdminHandlerRef,
runtime: Arc<Runtime>,
) -> Self {
Self {
query_handler,
admin_handler,
shutdown_tx: Mutex::new(None),
runtime,
}
}

pub fn create_service(&self) -> greptime_server::GreptimeServer<GrpcService> {
let service = GrpcService {
handler: BatchHandler::new(self.query_handler.clone(), self.admin_handler.clone()),
handler: BatchHandler::new(
self.query_handler.clone(),
self.admin_handler.clone(),
self.runtime.clone(),
),
};
greptime_server::GreptimeServer::new(service)
}
Expand Down
34 changes: 28 additions & 6 deletions src/servers/src/grpc/handler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::sync::Arc;

use api::v1::{AdminResponse, BatchRequest, BatchResponse, DatabaseResponse};
use common_runtime::Runtime;
use tokio::sync::oneshot;

use crate::error::Result;
use crate::query_handler::{GrpcAdminHandlerRef, GrpcQueryHandlerRef};
Expand All @@ -7,13 +11,19 @@ use crate::query_handler::{GrpcAdminHandlerRef, GrpcQueryHandlerRef};
pub struct BatchHandler {
query_handler: GrpcQueryHandlerRef,
admin_handler: GrpcAdminHandlerRef,
runtime: Arc<Runtime>,
}

impl BatchHandler {
pub fn new(query_handler: GrpcQueryHandlerRef, admin_handler: GrpcAdminHandlerRef) -> Self {
pub fn new(
query_handler: GrpcQueryHandlerRef,
admin_handler: GrpcAdminHandlerRef,
runtime: Arc<Runtime>,
) -> Self {
Self {
query_handler,
admin_handler,
runtime,
}
}

Expand All @@ -30,12 +40,24 @@ impl BatchHandler {
}
batch_resp.admins.push(admin_resp);

for db_req in batch_req.databases {
for obj_expr in db_req.exprs {
let object_resp = self.query_handler.do_query(obj_expr).await?;
db_resp.results.push(object_resp);
let (tx, rx) = oneshot::channel();
let query_handler = self.query_handler.clone();
let _ = self.runtime.spawn(async move {
// execute request in another runtime to prevent the execution from being cancelled unexpected by tonic runtime.
let mut result = vec![];
for db_req in batch_req.databases {
for obj_expr in db_req.exprs {
let object_resp = query_handler.do_query(obj_expr).await;

result.push(object_resp);
}
}
}
// Ignore send result. Usually an error indicates the rx is dropped (request timeouted).
let _ = tx.send(result);
});
// Safety: An early-dropped tx usually indicates a serious problem (like panic). This unwrap
// is used to poison the upper layer.
db_resp.results = rx.await.unwrap().into_iter().collect::<Result<_>>()?;
batch_resp.databases.push(db_resp);
Ok(batch_resp)
}
Expand Down