Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: return new added columns in region server's extension response #3533

Merged
merged 13 commits into from
Mar 23, 2024
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "96f1f0404f421ee560a4310c73c5071e49168168" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "349cb385583697f41010dabeb3c106d58f9599b4" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
36 changes: 17 additions & 19 deletions src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::sync::Arc;

use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::ResponseHeader;
use arc_swap::ArcSwapOption;
use arrow_flight::Ticket;
Expand All @@ -23,7 +23,7 @@ use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_meta::datanode_manager::{AffectedRows, Datanode};
use common_meta::datanode_manager::{Datanode, HandleResponse};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
Expand All @@ -46,7 +46,7 @@ pub struct RegionRequester {

#[async_trait]
impl Datanode for RegionRequester {
async fn handle(&self, request: RegionRequest) -> MetaResult<AffectedRows> {
async fn handle(&self, request: RegionRequest) -> MetaResult<HandleResponse> {
self.handle_inner(request).await.map_err(|err| {
if err.should_retry() {
meta_error::Error::RetryLater {
Expand Down Expand Up @@ -165,7 +165,7 @@ impl RegionRequester {
Ok(Box::pin(record_batch_stream))
}

async fn handle_inner(&self, request: RegionRequest) -> Result<AffectedRows> {
async fn handle_inner(&self, request: RegionRequest) -> Result<HandleResponse> {
let request_type = request
.body
.as_ref()
Expand All @@ -178,10 +178,7 @@ impl RegionRequester {

let mut client = self.client.raw_region_client()?;

let RegionResponse {
header,
affected_rows,
} = client
let response = client
.handle(request)
.await
.map_err(|e| {
Expand All @@ -195,19 +192,20 @@ impl RegionRequester {
})?
.into_inner();

check_response_header(header)?;
check_response_header(&response.header)?;

Ok(affected_rows as _)
Ok(HandleResponse::from_region_response(response))
}

pub async fn handle(&self, request: RegionRequest) -> Result<AffectedRows> {
pub async fn handle(&self, request: RegionRequest) -> Result<HandleResponse> {
self.handle_inner(request).await
}
}

pub fn check_response_header(header: Option<ResponseHeader>) -> Result<()> {
pub fn check_response_header(header: &Option<ResponseHeader>) -> Result<()> {
let status = header
.and_then(|header| header.status)
.as_ref()
.and_then(|header| header.status.as_ref())
.context(IllegalDatabaseResponseSnafu {
err_msg: "either response header or status is missing",
})?;
Expand All @@ -221,7 +219,7 @@ pub fn check_response_header(header: Option<ResponseHeader>) -> Result<()> {
})?;
ServerSnafu {
code,
msg: status.err_msg,
msg: status.err_msg.clone(),
}
.fail()
}
Expand All @@ -236,27 +234,27 @@ mod test {

#[test]
fn test_check_response_header() {
let result = check_response_header(None);
let result = check_response_header(&None);
assert!(matches!(
result.unwrap_err(),
IllegalDatabaseResponse { .. }
));

let result = check_response_header(Some(ResponseHeader { status: None }));
let result = check_response_header(&Some(ResponseHeader { status: None }));
assert!(matches!(
result.unwrap_err(),
IllegalDatabaseResponse { .. }
));

let result = check_response_header(Some(ResponseHeader {
let result = check_response_header(&Some(ResponseHeader {
status: Some(PbStatus {
status_code: StatusCode::Success as u32,
err_msg: String::default(),
}),
}));
assert!(result.is_ok());

let result = check_response_header(Some(ResponseHeader {
let result = check_response_header(&Some(ResponseHeader {
status: Some(PbStatus {
status_code: u32::MAX,
err_msg: String::default(),
Expand All @@ -267,7 +265,7 @@ mod test {
IllegalDatabaseResponse { .. }
));

let result = check_response_header(Some(ResponseHeader {
let result = check_response_header(&Some(ResponseHeader {
status: Some(PbStatus {
status_code: StatusCode::Internal as u32,
err_msg: "blabla".to_string(),
Expand Down
29 changes: 27 additions & 2 deletions src/common/meta/src/datanode_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;

Expand All @@ -25,7 +26,7 @@ use crate::peer::Peer;
#[async_trait::async_trait]
pub trait Datanode: Send + Sync {
/// Handles DML, and DDL requests.
async fn handle(&self, request: RegionRequest) -> Result<AffectedRows>;
async fn handle(&self, request: RegionRequest) -> Result<HandleResponse>;

/// Handles query requests
async fn handle_query(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
Expand All @@ -41,3 +42,27 @@ pub trait DatanodeManager: Send + Sync {
}

pub type DatanodeManagerRef = Arc<dyn DatanodeManager>;

/// This result struct is derived from [RegionResponse]
#[derive(Debug)]
pub struct HandleResponse {
pub affected_rows: AffectedRows,
pub extension: HashMap<String, Vec<u8>>,
}

impl HandleResponse {
pub fn from_region_response(region_response: RegionResponse) -> Self {
Self {
affected_rows: region_response.affected_rows as _,
extension: region_response.extension,
}
}

/// Creates one response without extension
pub fn new(affected_rows: AffectedRows) -> Self {
Self {
affected_rows,
extension: Default::default(),
}
}
}
8 changes: 7 additions & 1 deletion src/common/meta/src/ddl/create_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl CreateLogicalTablesProcedure {
/// - Checks whether physical table exists.
/// - Checks whether logical tables exist.
/// - Allocates the table ids.
/// - Modify tasks to sort logical columns on their names.
///
/// Abort(non-retry):
/// - The physical table does not exist.
Expand Down Expand Up @@ -130,7 +131,7 @@ impl CreateLogicalTablesProcedure {
));
}

// Allocates table ids
// Allocates table ids and sort columns on their names.
fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
for (task, table_id) in tasks.iter_mut().zip(already_exists_tables_ids.iter()) {
let table_id = if let Some(table_id) = table_id {
*table_id
Expand All @@ -141,6 +142,11 @@ impl CreateLogicalTablesProcedure {
.await?
};
task.set_table_id(table_id);

// sort columns in task
task.sort_columns();

common_telemetry::info!("[DEBUG] sorted task {:?}", task);
}

self.creator
Expand Down
7 changes: 4 additions & 3 deletions src/common/meta/src/ddl/tests/create_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_telemetry::debug;
use store_api::storage::RegionId;
use table::metadata::RawTableInfo;

use crate::datanode_manager::HandleResponse;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::test_util::create_table::build_raw_table_info_from_expr;
use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
Expand All @@ -36,7 +37,7 @@ use crate::error::{Error, Result};
use crate::key::table_route::TableRouteValue;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::test_util::{new_ddl_context, AffectedRows, MockDatanodeHandler, MockDatanodeManager};
use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};

// Note: this code may be duplicated with others.
// However, it's by design, ensures the tests are easy to be modified or added.
Expand Down Expand Up @@ -332,9 +333,9 @@ pub struct NaiveDatanodeHandler;

#[async_trait::async_trait]
impl MockDatanodeHandler for NaiveDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
Ok(0)
Ok(HandleResponse::new(0))
}

async fn handle_query(
Expand Down
13 changes: 7 additions & 6 deletions src/common/meta/src/ddl/tests/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use common_procedure_test::MockContextProvider;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;

use crate::datanode_manager::HandleResponse;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::test_util::create_table::build_raw_table_info_from_expr;
use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
Expand All @@ -34,11 +35,11 @@ use crate::error::{Error, Result};
use crate::key::table_route::TableRouteValue;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::test_util::{new_ddl_context, AffectedRows, MockDatanodeHandler, MockDatanodeManager};
use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};

#[async_trait::async_trait]
impl MockDatanodeHandler for () {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<AffectedRows> {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<HandleResponse> {
unreachable!()
}

Expand Down Expand Up @@ -176,7 +177,7 @@ pub struct RetryErrorDatanodeHandler;

#[async_trait::async_trait]
impl MockDatanodeHandler for RetryErrorDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
Err(Error::RetryLater {
source: BoxedError::new(
Expand Down Expand Up @@ -220,7 +221,7 @@ pub struct UnexpectedErrorDatanodeHandler;

#[async_trait::async_trait]
impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
error::UnexpectedSnafu {
err_msg: "mock error",
Expand Down Expand Up @@ -260,9 +261,9 @@ pub struct NaiveDatanodeHandler;

#[async_trait::async_trait]
impl MockDatanodeHandler for NaiveDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
Ok(0)
Ok(HandleResponse::new(0))
}

async fn handle_query(
Expand Down