Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Patch] fix 2.3 bugs #1739

Merged
merged 6 commits into from Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions client/src/ctx.rs
Expand Up @@ -301,10 +301,9 @@ impl SessionContext {

Ok(ResultSet::Bytes((body.to_vec(), 0)))
}
_ => {
code => {
let body = resp.text().await?;

Ok(ResultSet::Bytes((body.into(), 0)))
Err(anyhow!("{}, body: {}", code, body))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion common/models/src/predicate/domain.rs
Expand Up @@ -872,7 +872,7 @@ impl Domain {
ranges.sort_by(|l, r| l.low.cmp(&r.low));

// at least one element
let mut current = ranges.get(0).unwrap().to_owned();
let mut current = ranges.first().unwrap().to_owned();

let mut low_indexed_ranges: BTreeMap<Marker, Range> = BTreeMap::default();

Expand Down
3 changes: 2 additions & 1 deletion common/protos/Cargo.toml
Expand Up @@ -12,7 +12,8 @@ flatbuffers = { workspace = true }
prost = { workspace = true }
rand = { workspace = true }
snafu = { workspace = true }
tonic = { workspace = true }
tonic = { workspace = true, features = ["transport", "tls"] }
tower = { workspace = true }
protobuf = { workspace = true }
async-backtrace = { workspace = true, optional = true }

Expand Down
21 changes: 20 additions & 1 deletion common/protos/src/lib.rs
Expand Up @@ -7,15 +7,23 @@ pub mod test_helper;

use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::time::Duration;

use flatbuffers::{FlatBufferBuilder, ForwardsUOffset, WIPOffset};
use flatbuffers::{FlatBufferBuilder, WIPOffset};
use generated::models::Point;
use snafu::Snafu;
use utils::{bitset::BitSet, BkdrHasher};

use crate::kv_service::tskv_service_client::TskvServiceClient;
use crate::models::{
Field, FieldBuilder, FieldType, Points, Schema, SchemaBuilder, Table, Tag, TagBuilder,
};
use flatbuffers::ForwardsUOffset;
use tonic::transport::Channel;
use tower::timeout::Timeout;

// Default 100 MB
pub const DEFAULT_GRPC_SERVER_MESSAGE_LEN: usize = 100 * 1024 * 1024;

type PointsResult<T> = Result<T, PointsError>;

Expand Down Expand Up @@ -655,6 +663,17 @@ pub fn insert_field<'a, 'fbb: 'mut_fbb, 'mut_fbb>(
field_nullbits.set(field_index);
}

pub fn tskv_service_time_out_client(
channel: Channel,
time_out: Duration,
max_message_size: usize,
) -> TskvServiceClient<Timeout<Channel>> {
let timeout_channel = Timeout::new(channel, time_out);
let client = TskvServiceClient::<Timeout<Channel>>::new(timeout_channel);
let client = TskvServiceClient::max_decoding_message_size(client, max_message_size);
TskvServiceClient::max_encoding_message_size(client, max_message_size)
}

#[cfg(test)]
pub mod test {
use utils::bitset::BitSet;
Expand Down
12 changes: 6 additions & 6 deletions config/src/deployment_config.rs
Expand Up @@ -24,7 +24,7 @@ impl DeploymentConfig {
}

pub fn default_memory() -> usize {
get_sys_mem()
get_sys_mem_gb()
}
}

Expand Down Expand Up @@ -87,11 +87,11 @@ fn get_sys_cpu() -> usize {
num_cpus
}

fn get_sys_mem() -> usize {
let mut mem: usize = 16 * 1024 * 1024;
fn get_sys_mem_gb() -> usize {
if let Ok(mem_info) = sys_info::mem_info() {
mem = mem_info.total as usize;
//mem_info.total unit is KB
mem_info.total as usize / 1024 / 1024
} else {
16
}

mem / 1024 / 1024
}
12 changes: 6 additions & 6 deletions coordinator/src/reader/table_scan/opener.rs
Expand Up @@ -5,10 +5,8 @@ use config::QueryConfig;
use futures::TryStreamExt;
use meta::model::MetaRef;
use models::meta_data::VnodeInfo;
use protos::kv_service::tskv_service_client::TskvServiceClient;
use protos::{tskv_service_time_out_client, DEFAULT_GRPC_SERVER_MESSAGE_LEN};
use tokio::runtime::Runtime;
use tonic::transport::Channel;
use tower::timeout::Timeout;
use trace::{SpanContext, SpanExt, SpanRecorder};
use trace_http::ctx::append_trace_context;
use tskv::reader::status_listener::VnodeStatusListener;
Expand Down Expand Up @@ -103,9 +101,11 @@ impl VnodeOpener for TemporaryTableScanOpener {
error: error.to_string(),
}
})?;
let timeout_channel =
Timeout::new(channel, Duration::from_millis(config.read_timeout_ms));
let mut client = TskvServiceClient::<Timeout<Channel>>::new(timeout_channel);
let mut client = tskv_service_time_out_client(
channel,
Duration::from_millis(config.read_timeout_ms),
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
);
client
.query_record_batch(request)
.await
Expand Down
12 changes: 6 additions & 6 deletions coordinator/src/reader/tag_scan/opener.rs
Expand Up @@ -4,9 +4,7 @@ use config::QueryConfig;
use futures::TryStreamExt;
use meta::model::MetaRef;
use models::meta_data::VnodeInfo;
use protos::kv_service::tskv_service_client::TskvServiceClient;
use tonic::transport::Channel;
use tower::timeout::Timeout;
use protos::{tskv_service_time_out_client, DEFAULT_GRPC_SERVER_MESSAGE_LEN};
use trace::{SpanContext, SpanExt, SpanRecorder};
use trace_http::ctx::append_trace_context;
use tskv::reader::tag_scan::LocalTskvTagScanStream;
Expand Down Expand Up @@ -91,9 +89,11 @@ impl VnodeOpener for TemporaryTagScanOpener {
error: error.to_string(),
}
})?;
let timeout_channel =
Timeout::new(channel, Duration::from_millis(config.read_timeout_ms));
let mut client = TskvServiceClient::<Timeout<Channel>>::new(timeout_channel);
let mut client = tskv_service_time_out_client(
channel,
Duration::from_millis(config.read_timeout_ms),
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
);
client
.tag_scan(request)
.await
Expand Down
21 changes: 11 additions & 10 deletions coordinator/src/service.rs
Expand Up @@ -18,13 +18,10 @@ use models::predicate::domain::{ResolvedPredicateRef, TimeRanges};
use models::record_batch_decode;
use models::schema::{Precision, DEFAULT_CATALOG};
use protos::kv_service::admin_command_request::Command::*;
use protos::kv_service::tskv_service_client::TskvServiceClient;
use protos::kv_service::{WritePointsRequest, *};
use protos::models::Points;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tonic::transport::Channel;
use tower::timeout::Timeout;
use trace::{debug, error, info, SpanContext, SpanExt, SpanRecorder};
use tskv::EngineRef;

Expand All @@ -43,6 +40,7 @@ use crate::{

pub type CoordinatorRef = Arc<dyn Coordinator>;

use protos::{tskv_service_time_out_client, DEFAULT_GRPC_SERVER_MESSAGE_LEN};
#[derive(Clone)]
pub struct CoordService {
node_id: u64,
Expand Down Expand Up @@ -264,9 +262,11 @@ impl CoordService {
) -> CoordinatorResult<()> {
let channel = self.meta.get_node_conn(node_id).await?;

let timeout_channel = Timeout::new(channel, Duration::from_secs(60 * 60));

let mut client = TskvServiceClient::<Timeout<Channel>>::new(timeout_channel);
let mut client = tskv_service_time_out_client(
channel,
Duration::from_secs(60 * 60),
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
);
let request = tonic::Request::new(req.clone());

let response = client
Expand Down Expand Up @@ -317,11 +317,12 @@ impl CoordService {
) -> CoordinatorResult<RecordBatch> {
let channel = self.meta.get_node_conn(node_id).await?;

let timeout_channel = Timeout::new(channel, Duration::from_secs(60 * 60));

let mut client = TskvServiceClient::<Timeout<Channel>>::new(timeout_channel);
let request = tonic::Request::new(req.clone());

let mut client = tskv_service_time_out_client(
channel,
Duration::from_secs(60 * 60),
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
);
let response = client
.exec_admin_fetch_command(request)
.await
Expand Down
15 changes: 11 additions & 4 deletions coordinator/src/vnode_mgr.rs
Expand Up @@ -9,6 +9,7 @@ use protos::kv_service::{
AdminCommandRequest, DeleteVnodeRequest, DownloadFileRequest, FetchVnodeSummaryRequest,
GetVnodeFilesMetaRequest, GetVnodeFilesMetaResponse,
};
use protos::{tskv_service_time_out_client, DEFAULT_GRPC_SERVER_MESSAGE_LEN};
use tokio::io::AsyncWriteExt;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
Expand Down Expand Up @@ -79,8 +80,11 @@ impl VnodeManager {
let path = self.kv_inst.get_storage_options().move_dir(&owner, new_id);

let channel = self.meta.get_node_conn(all_info.node_id).await?;
let timeout_channel = Timeout::new(channel, Duration::from_secs(60 * 60));
let mut client = TskvServiceClient::<Timeout<Channel>>::new(timeout_channel);
let mut client = tskv_service_time_out_client(
channel,
Duration::from_secs(60 * 60 * 60),
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
);

if let Err(err) = self
.download_vnode_files(&all_info, &path, &mut client)
Expand Down Expand Up @@ -171,8 +175,11 @@ impl VnodeManager {
};

let channel = self.meta.get_node_conn(node_id).await?;
let timeout_channel = Timeout::new(channel, Duration::from_secs(60 * 60));
let mut client = TskvServiceClient::<Timeout<Channel>>::new(timeout_channel);
let mut client = tskv_service_time_out_client(
channel,
Duration::from_secs(60 * 60),
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
);
let request = tonic::Request::new(cmd);

let response = client
Expand Down
14 changes: 6 additions & 8 deletions coordinator/src/writer.rs
Expand Up @@ -7,19 +7,16 @@ use meta::model::{MetaClientRef, MetaRef};
use models::meta_data::*;
use models::schema::{timestamp_convert, Precision};
use models::utils::{now_timestamp_millis, now_timestamp_nanos};
use protos::kv_service::tskv_service_client::TskvServiceClient;
use protos::kv_service::{Meta, WritePointsRequest, WriteVnodeRequest};
use protos::models as fb_models;
use protos::models::{
FieldBuilder, Point, PointBuilder, Points, PointsArgs, Schema, SchemaBuilder, TableBuilder,
TagBuilder,
};
use protos::{models as fb_models, tskv_service_time_out_client, DEFAULT_GRPC_SERVER_MESSAGE_LEN};
use snafu::ResultExt;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tonic::transport::Channel;
use tonic::Code;
use tower::timeout::Timeout;
use trace::{debug, info, SpanContext, SpanExt, SpanRecorder};
use trace_http::ctx::append_trace_context;
use tskv::EngineRef;
Expand Down Expand Up @@ -254,7 +251,7 @@ impl<'a> Default for VnodeMapping<'a> {
#[derive(Debug)]
pub struct PointWriter {
node_id: u64,
timeout_ms: u64,
timeout: Duration,
kv_inst: Option<EngineRef>,
meta_manager: MetaRef,
hh_sender: Sender<HintedOffWriteReq>,
Expand All @@ -268,10 +265,11 @@ impl PointWriter {
meta_manager: MetaRef,
hh_sender: Sender<HintedOffWriteReq>,
) -> Self {
let timeout = Duration::from_millis(timeout_ms);
Self {
node_id,
kv_inst,
timeout_ms,
timeout,
meta_manager,
hh_sender,
}
Expand Down Expand Up @@ -473,8 +471,8 @@ impl PointWriter {
id: node_id,
error: error.to_string(),
})?;
let timeout_channel = Timeout::new(channel, Duration::from_millis(self.timeout_ms));
let mut client = TskvServiceClient::<Timeout<Channel>>::new(timeout_channel);
let mut client =
tskv_service_time_out_client(channel, self.timeout, DEFAULT_GRPC_SERVER_MESSAGE_LEN);

let mut cmd = tonic::Request::new(WriteVnodeRequest {
vnode_id,
Expand Down
12 changes: 8 additions & 4 deletions main/src/http/mod.rs
Expand Up @@ -150,12 +150,16 @@ impl From<&Error> for Response {
Error::Query { .. }
| Error::FetchResult { .. }
| Error::Tskv { .. }
| Error::Coordinator { .. } => {
| Error::Coordinator { .. }
| Error::Meta { .. }
| Error::NotFoundTenant { .. } => {
ResponseBuilder::new(UNPROCESSABLE_ENTITY).json(&error_resp)
}
Error::InvalidHeader { .. } | Error::ParseAuth { .. } | Error::TraceHttp { .. } => {
ResponseBuilder::bad_request(&error_resp)
}
Error::InvalidHeader { .. }
| Error::ParseAuth { .. }
| Error::TraceHttp { .. }
| Error::ParseOpentsdbProtocol { .. }
| Error::ParseOpentsdbJsonProtocol { .. } => ResponseBuilder::bad_request(&error_resp),
_ => ResponseBuilder::internal_server_error(),
}
}
Expand Down
3 changes: 2 additions & 1 deletion main/src/rpc/grpc_service.rs
Expand Up @@ -5,6 +5,7 @@ use config::TLSConfig;
use coordinator::service::CoordinatorRef;
use metrics::metric_register::MetricsRegister;
use protos::kv_service::tskv_service_server::TskvServiceServer;
use protos::DEFAULT_GRPC_SERVER_MESSAGE_LEN;
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use tonic::transport::{Identity, Server, ServerTlsConfig};
Expand Down Expand Up @@ -81,7 +82,7 @@ impl Service for GrpcService {
coord: self.coord.clone(),
metrics_register: self.metrics_register.clone(),
})
.max_decoding_message_size(100 * 1024 * 1024);
.max_decoding_message_size(DEFAULT_GRPC_SERVER_MESSAGE_LEN);

let mut grpc_builder =
build_grpc_server!(&self.tls_config, self.span_context_extractor.clone());
Expand Down
2 changes: 1 addition & 1 deletion meta/src/model/meta_admin.rs
Expand Up @@ -508,7 +508,7 @@ impl AdminMeta {
pub async fn drop_user(&self, name: &str) -> MetaResult<bool> {
let req = command::WriteCommand::DropUser(self.cluster(), name.to_string());

self.client.write::<()>(&req).await?;
self.client.write::<bool>(&req).await?;
Ok(true)
}

Expand Down
18 changes: 14 additions & 4 deletions meta/src/store/state_machine.rs
Expand Up @@ -1027,10 +1027,20 @@ impl StateMachine {
})
}

fn process_drop_user(&self, cluster: &str, user_name: &str) -> MetaResult<()> {
let key = KeyPath::user(cluster, user_name);

Ok(self.remove(&key)?)
fn process_drop_user(&self, cluster: &str, user_name: &str) -> MetaResult<bool> {
let user_key = KeyPath::user(cluster, user_name);
let tenants_key = KeyPath::tenants(cluster);
if let Some(user) = self.get_struct::<UserDesc>(&user_key)? {
// first delete member of tenant
for tenant in self.children_data::<Tenant>(&tenants_key)?.into_values() {
let member_key = KeyPath::member(tenant.name(), tenant.name(), user.id());
self.remove(&member_key)?;
}
self.remove(&user_key)?;
Ok(true)
} else {
Ok(false)
}
}

fn set_tenant_limiter(
Expand Down