Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: return request outdated error on handling alter #3239

Merged
merged 9 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/common/error/src/status_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub enum StatusCode {
// ====== Begin of storage related status code =====
/// Storage is temporarily unable to handle the request
StorageUnavailable = 5000,
/// Request is outdated, e.g., version mismatch
RequestOutdated = 5001,
// ====== End of storage related status code =======

// ====== Begin of server related status code =====
Expand Down Expand Up @@ -135,7 +137,8 @@ impl StatusCode {
| StatusCode::AuthHeaderNotFound
| StatusCode::InvalidAuthHeader
| StatusCode::AccessDenied
| StatusCode::PermissionDenied => false,
| StatusCode::PermissionDenied
| StatusCode::RequestOutdated => false,
}
}

Expand Down Expand Up @@ -172,7 +175,8 @@ impl StatusCode {
| StatusCode::AuthHeaderNotFound
| StatusCode::InvalidAuthHeader
| StatusCode::AccessDenied
| StatusCode::PermissionDenied => false,
| StatusCode::PermissionDenied
| StatusCode::RequestOutdated => false,
}
}

Expand Down
14 changes: 11 additions & 3 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use api::v1::region::{
};
use api::v1::{AlterExpr, RenameTable};
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_grpc_expr::alter_expr_to_request;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{
Expand Down Expand Up @@ -218,10 +220,16 @@ impl AlterTableProcedure {
let requester = requester.clone();

alter_region_tasks.push(async move {
if let Err(e) = requester.handle(request).await {
return Err(handle_operate_region_error(datanode)(e));
let result = requester.handle(request).await;
match result {
Ok(_) => Ok(()),
// Treat request outdated as success.
// The engine will throw this code when the schema version not match.
// As this procedure has locked the table, the only reason for this error
// is procedure is succeeded before and is retrying.
Err(e) if e.status_code() == StatusCode::RequestOutdated => Ok(()),
evenyag marked this conversation as resolved.
Show resolved Hide resolved
Err(e) => Err(handle_operate_region_error(datanode)(e)),
waynexia marked this conversation as resolved.
Show resolved Hide resolved
}
Ok(())
});
}
}
Expand Down
50 changes: 38 additions & 12 deletions src/metric-engine/src/data_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
// limitations under the License.

use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use mito2::engine::MitoEngine;
use snafu::ResultExt;
Expand All @@ -30,6 +33,8 @@ use crate::error::{
use crate::metrics::MITO_DDL_DURATION;
use crate::utils;

const MAX_RETRIES: usize = 5;

/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It
/// will handle all the data related operations across physical tables. Thus
/// every operation should be associated to a [RegionId], which is the physical
Expand Down Expand Up @@ -61,6 +66,35 @@ impl DataRegion {
) -> Result<()> {
let region_id = utils::to_data_region_id(region_id);

let mut retries = 0;
// submit alter request
while retries < MAX_RETRIES {
let request = self.assemble_alter_request(region_id, &columns).await?;

let _timer = MITO_DDL_DURATION.start_timer();

let result = self.mito.handle_request(region_id, request).await;
match result {
Ok(_) => return Ok(()),
Err(e) if e.status_code() == StatusCode::RequestOutdated => {
info!("Retrying alter {region_id} due to outdated schema version, times {retries}");
retries += 1;
continue;
}
Err(e) => {
return Err(e).context(MitoWriteOperationSnafu)?;
}
}
}

Ok(())
}

async fn assemble_alter_request(
&self,
region_id: RegionId,
columns: &[ColumnMetadata],
) -> Result<RegionRequest> {
// retrieve underlying version
let region_metadata = self
.mito
Expand All @@ -85,9 +119,10 @@ impl DataRegion {

// overwrite semantic type
let columns = columns
.into_iter()
.iter()
.enumerate()
.map(|(delta, mut c)| {
.map(|(delta, c)| {
let mut c = c.clone();
if c.semantic_type == SemanticType::Tag {
c.semantic_type = SemanticType::Field;
if !c.column_schema.data_type.is_string() {
Expand Down Expand Up @@ -120,16 +155,7 @@ impl DataRegion {
kind: AlterKind::AddColumns { columns },
});

// submit alter request
{
let _timer = MITO_DDL_DURATION.start_timer();
self.mito
.handle_request(region_id, alter_request)
.await
.context(MitoWriteOperationSnafu)?;
}

Ok(())
Ok(alter_request)
}

pub async fn write_data(
Expand Down
19 changes: 11 additions & 8 deletions src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,18 @@ impl MetricEngineInner {
}
}
}
info!("Found new columns {new_columns:?} to add to physical region {data_region_id}");

self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
logical_region_id,
new_columns,
)
.await?;
if !new_columns.is_empty() {
info!("Found new columns {new_columns:?} to add to physical region {data_region_id}");

self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
logical_region_id,
new_columns,
)
.await?;
}

// register logical region to metadata region
self.metadata_region
Expand Down
7 changes: 5 additions & 2 deletions src/mito2/src/engine/alter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::collections::HashMap;

use api::v1::value::ValueData;
use api::v1::{ColumnDataType, Row, Rows, SemanticType};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
Expand Down Expand Up @@ -277,10 +279,11 @@ async fn test_alter_region_retry() {
.unwrap();
// Retries request.
let request = add_tag1();
engine
let err = engine
.handle_request(region_id, RegionRequest::Alter(request))
.await
.unwrap();
.unwrap_err();
assert!(matches!(err.status_code(), StatusCode::RequestOutdated));
evenyag marked this conversation as resolved.
Show resolved Hide resolved

let expected = "\
+-------+-------+---------+---------------------+
Expand Down
14 changes: 14 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,17 @@ pub enum Error {
location: Location,
},

#[snafu(display(
"Schema version doesn't match. Expect greater than {} but gives {}",
evenyag marked this conversation as resolved.
Show resolved Hide resolved
expect,
actual
))]
InvalidRegionRequestSchemaVersion {
expect: u64,
actual: u64,
location: Location,
},

#[snafu(display("Region {} is read only", region_id))]
RegionReadonly {
region_id: RegionId,
Expand Down Expand Up @@ -591,6 +602,9 @@ impl ErrorExt for Error {
| ConvertColumnDataType { .. }
| ColumnNotFound { .. }
| InvalidMetadata { .. } => StatusCode::InvalidArguments,

InvalidRegionRequestSchemaVersion { .. } => StatusCode::RequestOutdated,

RegionMetadataNotFound { .. }
| Join { .. }
| WorkerStopped { .. }
Expand Down
14 changes: 11 additions & 3 deletions src/mito2/src/worker/handle_alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataR
use store_api::region_request::RegionAlterRequest;
use store_api::storage::RegionId;

use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, Result};
use crate::error::{
InvalidMetadataSnafu, InvalidRegionRequestSchemaVersionSnafu, InvalidRegionRequestSnafu, Result,
};
use crate::flush::FlushReason;
use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList};
use crate::memtable::MemtableBuilderRef;
Expand Down Expand Up @@ -52,8 +54,14 @@ impl<S> RegionWorkerLoop<S> {
"Ignores alter request, region id:{}, region schema version {} is greater than request schema version {}",
evenyag marked this conversation as resolved.
Show resolved Hide resolved
region_id, version.metadata.schema_version, request.schema_version
);
// Returns if it altered.
sender.send(Ok(0));
// Returns an error.
evenyag marked this conversation as resolved.
Show resolved Hide resolved
sender.send(
InvalidRegionRequestSchemaVersionSnafu {
evenyag marked this conversation as resolved.
Show resolved Hide resolved
expect: version.metadata.schema_version,
actual: request.schema_version,
}
.fail(),
);
return;
}
// Validate request.
Expand Down
4 changes: 3 additions & 1 deletion src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,9 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code {
| StatusCode::Internal
| StatusCode::PlanQuery
| StatusCode::EngineExecuteQuery => Code::Internal,
StatusCode::InvalidArguments | StatusCode::InvalidSyntax => Code::InvalidArgument,
StatusCode::InvalidArguments | StatusCode::InvalidSyntax | StatusCode::RequestOutdated => {
Code::InvalidArgument
}
StatusCode::Cancelled => Code::Cancelled,
StatusCode::TableAlreadyExists
| StatusCode::TableColumnExists
Expand Down