Skip to content

Commit

Permalink
feat: impl migrate_region and query_procedure_state for procedure ser…
Browse files Browse the repository at this point in the history
…vice/client
  • Loading branch information
killme2008 committed Jan 26, 2024
1 parent 00dd9b8 commit 2cd56fe
Show file tree
Hide file tree
Showing 15 changed files with 315 additions and 109 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/killme2008/greptime-proto.git", rev = "761941e906116271ce319c00d7062049ddda1718" }
greptime-proto = { git = "https://github.com/killme2008/greptime-proto.git", rev = "e114e7db7e82192a098d242f6767c551fbf9c28d" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
9 changes: 8 additions & 1 deletion src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ pub enum Error {
source: common_procedure::Error,
},

#[snafu(display("Failed to parse procedure id"))]
ParseProcedureId {
location: Location,
#[snafu(source)]
error: common_procedure::ParseIdError,
},

#[snafu(display("Unsupported operation {}", operation))]
Unsupported {
operation: String,
Expand Down Expand Up @@ -422,7 +429,7 @@ impl ErrorExt for Error {
InvalidCatalogValue { source, .. } => source.status_code(),
ConvertAlterTableRequest { source, .. } => source.status_code(),

InvalidNumTopics { .. } => StatusCode::InvalidArguments,
ParseProcedureId { .. } | InvalidNumTopics { .. } => StatusCode::InvalidArguments,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

pub mod ddl;
pub mod lock;
pub mod procedure;
pub mod router;
pub mod store;
pub mod util;
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
let table_id = resp.table_id.map(|t| t.id);
Ok(Self {
key: resp.pid.map(|pid| pid.key).unwrap_or(vec![]),
key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
table_id,
})
}
Expand Down
4 changes: 2 additions & 2 deletions src/common/procedure/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod watcher;

pub use crate::error::{Error, Result};
pub use crate::procedure::{
BoxedProcedure, Context, ContextProvider, LockKey, Procedure, ProcedureId, ProcedureManager,
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
BoxedProcedure, Context, ContextProvider, LockKey, ParseIdError, Procedure, ProcedureId,
ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
};
pub use crate::watcher::Watcher;
10 changes: 5 additions & 5 deletions src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
// limitations under the License.

mod ask_leader;
mod ddl;
mod heartbeat;
mod load_balance;
mod lock;
mod procedure;

mod store;

Expand All @@ -33,9 +33,9 @@ use common_meta::rpc::store::{
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_telemetry::info;
use ddl::Client as DdlClient;
use heartbeat::Client as HeartbeatClient;
use lock::Client as LockClient;
use procedure::Client as ProcedureClient;
use snafu::{OptionExt, ResultExt};
use store::Client as StoreClient;

Expand Down Expand Up @@ -157,7 +157,7 @@ impl MetaClientBuilder {
}
if self.enable_ddl {
let mgr = self.ddl_channel_manager.unwrap_or(mgr);
client.ddl = Some(DdlClient::new(
client.ddl = Some(ProcedureClient::new(
self.id,
self.role,
mgr,
Expand All @@ -176,7 +176,7 @@ pub struct MetaClient {
heartbeat: Option<HeartbeatClient>,
store: Option<StoreClient>,
lock: Option<LockClient>,
ddl: Option<DdlClient>,
ddl: Option<ProcedureClient>,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -365,7 +365,7 @@ impl MetaClient {
}

#[inline]
pub fn ddl_client(&self) -> Result<DdlClient> {
pub fn ddl_client(&self) -> Result<ProcedureClient> {
self.ddl
.clone()
.context(error::NotStartedSnafu { name: "ddl_client" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::procedure_client::ProcedureClient;
use api::v1::meta::{DdlTaskRequest, DdlTaskResponse, ErrorCode, ResponseHeader, Role};
use api::v1::meta::procedure_service_client::ProcedureServiceClient;
use api::v1::meta::{
DdlTaskRequest, DdlTaskResponse, ErrorCode, MigrateRegionRequest, MigrateRegionResponse,
ProcedureId, ProcedureStateResponse, QueryProcedureRequest, ResponseHeader, Role,
};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{info, warn};
Expand Down Expand Up @@ -65,10 +70,33 @@ impl Client {
let inner = self.inner.read().await;
inner.submit_ddl_task(req).await
}

/// Queries the procedure's state by its id.
///
/// Acquires read access to the shared inner client via `inner.read()` and
/// delegates to its `query_procedure_state`.
pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
    // The value returned by `read().await` is a temporary of this tail
    // expression, so it stays alive until the delegated call has completed.
    self.inner.read().await.query_procedure_state(pid).await
}

/// Migrates a region from one datanode to another.
///
/// - `region_id`: id of the region being migrated
/// - `from_peer`: id of the source datanode
/// - `to_peer`: id of the target datanode
/// - `replay_timeout`: timeout for replaying the WAL after the migration
///
/// Acquires read access to the shared inner client via `inner.read()` and
/// delegates to its `migrate_region` with the arguments unchanged.
pub async fn migrate_region(
    &self,
    region_id: u64,
    from_peer: u64,
    to_peer: u64,
    replay_timeout: Duration,
) -> Result<MigrateRegionResponse> {
    // The value returned by `read().await` is a temporary of this tail
    // expression, so it stays alive until the delegated call has completed.
    self.inner
        .read()
        .await
        .migrate_region(region_id, from_peer, to_peer, replay_timeout)
        .await
}
}

#[derive(Debug)]

struct Inner {
id: Id,
role: Role,
Expand Down Expand Up @@ -106,44 +134,47 @@ impl Inner {
Ok(())
}

fn make_client(&self, addr: impl AsRef<str>) -> Result<ProcedureClient<Channel>> {
fn make_client(&self, addr: impl AsRef<str>) -> Result<ProcedureServiceClient<Channel>> {
let channel = self
.channel_manager
.get(addr)
.context(error::CreateChannelSnafu)?;

Ok(ProcedureClient::new(channel))
Ok(ProcedureServiceClient::new(channel))
}

/// Returns `true` when the `ask_leader` field is `Some`.
///
/// `ask_leader()` uses this as its precondition check and reports an
/// illegal-client-state error when it returns `false`.
#[inline]
fn is_started(&self) -> bool {
self.ask_leader.is_some()
}

pub async fn submit_ddl_task(&self, mut req: DdlTaskRequest) -> Result<DdlTaskResponse> {
fn ask_leader(&self) -> Result<&AskLeader> {
ensure!(
self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "DDL client not start"
}
);

req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
let ask_leader = self.ask_leader.as_ref().unwrap();
Ok(self.ask_leader.as_ref().unwrap())
}

async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T>
where
R: Future<Output = std::result::Result<T, Status>>,
F: Fn(ProcedureServiceClient<Channel>) -> R,
H: Fn(&T) -> &Option<ResponseHeader>,
{
let ask_leader = self.ask_leader()?;
let mut times = 0;

while times < self.max_retry {
if let Some(leader) = &ask_leader.get_leader() {
let mut client = self.make_client(leader)?;
match client.ddl(req.clone()).await {
let client = self.make_client(leader)?;
match body_fn(client).await {
Ok(res) => {
let res = res.into_inner();
if is_not_leader(&res.header) {
warn!("Failed to submitting ddl to {leader}, not a leader");
if is_not_leader(get_header(&res)) {
warn!("Failed to {task} to {leader}, not a leader");
let leader = ask_leader.ask_leader().await?;
info!("DDL client updated to new leader addr: {leader}");
times += 1;
Expand All @@ -154,9 +185,9 @@ impl Inner {
Err(status) => {
// The leader may be unreachable.
if is_unreachable(&status) {
warn!("Failed to submitting ddl to {leader}, source: {status}");
warn!("Failed to {task} to {leader}, source: {status}");
let leader = ask_leader.ask_leader().await?;
info!("DDL client updated to new leader addr: {leader}");
info!("Procedure client updated to new leader addr: {leader}");
times += 1;
continue;
} else {
Expand All @@ -170,11 +201,86 @@ impl Inner {
}

error::RetryTimesExceededSnafu {
msg: "Failed to submit DDL task",
msg: "Failed to {task}",
times: self.max_retry,
}
.fail()
}

/// Builds a `MigrateRegionRequest` and sends it through `with_retry`.
///
/// - `region_id`: id of the region being migrated
/// - `from_peer`: id of the source datanode
/// - `to_peer`: id of the target datanode
/// - `replay_timeout`: sent as whole seconds in `replay_timeout_secs`;
///   sub-second precision is dropped, and durations longer than `u32::MAX`
///   seconds are clamped to `u32::MAX` instead of wrapping around.
///
/// Returns the `MigrateRegionResponse` produced by the call, or the error
/// reported by `with_retry`.
async fn migrate_region(
    &self,
    region_id: u64,
    from_peer: u64,
    to_peer: u64,
    replay_timeout: Duration,
) -> Result<MigrateRegionResponse> {
    // A bare `as u32` would wrap (e.g. 2^32 seconds would become 0), turning a
    // very long timeout into a very short one. Clamp first so the narrowing
    // cast below is lossless.
    let replay_timeout_secs = replay_timeout.as_secs().min(u64::from(u32::MAX)) as u32;

    let mut req = MigrateRegionRequest {
        region_id,
        from_peer,
        to_peer,
        replay_timeout_secs,
        ..Default::default()
    };

    req.set_header(
        self.id,
        self.role,
        TracingContext::from_current_span().to_w3c(),
    );

    self.with_retry(
        "migrate_region",
        move |mut client| {
            // `with_retry` takes an `Fn` and may invoke it once per attempt,
            // so every attempt sends its own copy of the request.
            let req = req.clone();

            async move { client.migrate(req).await.map(|res| res.into_inner()) }
        },
        |resp: &MigrateRegionResponse| &resp.header,
    )
    .await
}

/// Builds a `QueryProcedureRequest` for `pid` and sends it through `with_retry`.
///
/// `pid` is converted into the `key` of the request's `ProcedureId`.
/// Returns the `ProcedureStateResponse` produced by the call, or the error
/// reported by `with_retry`.
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
    let procedure_id = ProcedureId { key: pid.into() };
    let mut request = QueryProcedureRequest {
        pid: Some(procedure_id),
        ..Default::default()
    };

    let tracing_context = TracingContext::from_current_span().to_w3c();
    request.set_header(self.id, self.role, tracing_context);

    self.with_retry(
        "query procedure state",
        move |mut client| {
            // Each invocation by `with_retry` sends its own copy of the request.
            let attempt = request.clone();

            async move {
                client
                    .query(attempt)
                    .await
                    .map(|reply| reply.into_inner())
            }
        },
        |reply: &ProcedureStateResponse| &reply.header,
    )
    .await
}

/// Stamps the header on `req` and submits it as a DDL task through `with_retry`.
///
/// Returns the `DdlTaskResponse` produced by the call, or the error reported
/// by `with_retry`.
async fn submit_ddl_task(&self, mut req: DdlTaskRequest) -> Result<DdlTaskResponse> {
    let tracing_context = TracingContext::from_current_span().to_w3c();
    req.set_header(self.id, self.role, tracing_context);

    self.with_retry(
        "submit ddl task",
        move |mut client| {
            // Each invocation by `with_retry` sends its own copy of the request.
            let attempt = req.clone();

            async move {
                client
                    .ddl(attempt)
                    .await
                    .map(|reply| reply.into_inner())
            }
        },
        |reply: &DdlTaskResponse| &reply.header,
    )
    .await
}
}

fn is_unreachable(status: &Status) -> bool {
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use api::v1::meta::cluster_server::ClusterServer;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::lock_server::LockServer;
use api::v1::meta::procedure_server::ProcedureServer;
use api::v1::meta::procedure_service_server::ProcedureServiceServer;
use api::v1::meta::store_server::StoreServer;
use common_base::Plugins;
use common_meta::kv_backend::chroot::ChrootKvBackend;
Expand Down Expand Up @@ -172,7 +172,7 @@ pub fn router(meta_srv: MetaSrv) -> Router {
.add_service(StoreServer::new(meta_srv.clone()))
.add_service(ClusterServer::new(meta_srv.clone()))
.add_service(LockServer::new(meta_srv.clone()))
.add_service(ProcedureServer::new(meta_srv.clone()))
.add_service(ProcedureServiceServer::new(meta_srv.clone()))
.add_service(admin::make_admin_service(meta_srv))
}

Expand Down
16 changes: 13 additions & 3 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,15 @@ pub enum Error {
source: common_procedure::Error,
},

#[snafu(display("Failed to query procedure state"))]
QueryProcedure {
location: Location,
source: common_procedure::Error,
},

#[snafu(display("Procedure not found: {pid}"))]
ProcedureNotFound { location: Location, pid: String },

#[snafu(display("Failed to submit procedure"))]
SubmitProcedure {
location: Location,
Expand Down Expand Up @@ -706,6 +715,7 @@ impl ErrorExt for Error {
| Error::InvalidArguments { .. }
| Error::InitExportMetricsTask { .. }
| Error::InvalidHeartbeatRequest { .. }
| Error::ProcedureNotFound { .. }
| Error::TooManyPartitions { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. }
| Error::LeaseValueFromUtf8 { .. }
Expand All @@ -731,9 +741,9 @@ impl ErrorExt for Error {
Error::RequestDatanode { source, .. } => source.status_code(),
Error::InvalidCatalogValue { source, .. }
| Error::InvalidFullTableName { source, .. } => source.status_code(),
Error::SubmitProcedure { source, .. } | Error::WaitProcedure { source, .. } => {
source.status_code()
}
Error::SubmitProcedure { source, .. }
| Error::WaitProcedure { source, .. }
| Error::QueryProcedure { source, .. } => source.status_code(),
Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
source.status_code()
}
Expand Down

0 comments on commit 2cd56fe

Please sign in to comment.