Skip to content

Commit

Permalink
feat: introduce the region role (#2640)
Browse files Browse the repository at this point in the history
* feat: introduce region role

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Oct 31, 2023
1 parent 465c8f7 commit 54ed752
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ derive_builder = "0.12"
etcd-client = "0.11"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "20cdc57c3f320345b122eea43bc549a19d342e51" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3137cd184770e03f6a4dc191deaf02beb11fae7d" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
11 changes: 6 additions & 5 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, Role};
use api::v1::meta::{HeartbeatRequest, Peer, RegionRole, RegionStat, Role};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
Expand Down Expand Up @@ -282,14 +282,15 @@ impl HeartbeatTask {
let regions = region_server.opened_regions();

let mut region_stats = Vec::new();
for (region_id, engine) in regions {
for stat in regions {
let approximate_bytes = region_server
.region_disk_usage(region_id)
.region_disk_usage(stat.region_id)
.await
.unwrap_or(0);
let region_stat = RegionStat {
region_id: region_id.as_u64(),
engine,
region_id: stat.region_id.as_u64(),
engine: stat.engine,
role: RegionRole::from(stat.role).into(),
approximate_bytes,
// TODO(ruihang): scratch more info
..Default::default()
Expand Down
20 changes: 17 additions & 3 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use servers::grpc::region_server::RegionServerHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::RegionEngineRef;
use store_api::region_engine::{RegionEngineRef, RegionRole};
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
Expand All @@ -66,6 +66,12 @@ pub struct RegionServer {
inner: Arc<RegionServerInner>,
}

pub struct RegionStat {
pub region_id: RegionId,
pub engine: String,
pub role: RegionRole,
}

impl RegionServer {
pub fn new(
query_engine: QueryEngineRef,
Expand Down Expand Up @@ -97,11 +103,19 @@ impl RegionServer {
self.inner.handle_read(request).await
}

pub fn opened_regions(&self) -> Vec<(RegionId, String)> {
pub fn opened_regions(&self) -> Vec<RegionStat> {
self.inner
.region_map
.iter()
.map(|e| (*e.key(), e.value().name().to_string()))
.filter_map(|e| {
let region_id = *e.key();
// Filters out any regions whose role equals None.
e.role(region_id).map(|role| RegionStat {
region_id,
engine: e.value().name().to_string(),
role,
})
})
.collect()
}

Expand Down
6 changes: 5 additions & 1 deletion src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use query::query_engine::DescribeResult;
use query::QueryEngine;
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::RegionEngine;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
use table::TableRef;
Expand Down Expand Up @@ -190,4 +190,8 @@ impl RegionEngine for MockRegionEngine {
fn set_writable(&self, _region_id: RegionId, _writable: bool) -> Result<(), BoxedError> {
Ok(())
}

fn role(&self, _region_id: RegionId) -> Option<RegionRole> {
Some(RegionRole::Leader)
}
}
32 changes: 22 additions & 10 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use common_catalog::consts::FILE_ENGINE;
Expand All @@ -24,12 +24,12 @@ use common_telemetry::{error, info};
use object_store::ObjectStore;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::RegionEngine;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{
RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::{Mutex, RwLock};
use tokio::sync::Mutex;

use crate::config::EngineConfig;
use crate::error::{
Expand Down Expand Up @@ -102,6 +102,10 @@ impl RegionEngine for FileRegionEngine {
.set_writable(region_id, writable)
.map_err(BoxedError::new)
}

fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.state(region_id)
}
}

struct EngineInner {
Expand Down Expand Up @@ -147,14 +151,22 @@ impl EngineInner {

async fn stop(&self) -> EngineResult<()> {
let _lock = self.region_mutex.lock().await;
self.regions.write().await.clear();
self.regions.write().unwrap().clear();
Ok(())
}

fn set_writable(&self, _region_id: RegionId, _writable: bool) -> EngineResult<()> {
// TODO(zhongzc): Improve the semantics and implementation of this API.
Ok(())
}

fn state(&self, region_id: RegionId) -> Option<RegionRole> {
if self.regions.read().unwrap().get(&region_id).is_some() {
Some(RegionRole::Leader)
} else {
None
}
}
}

impl EngineInner {
Expand Down Expand Up @@ -189,7 +201,7 @@ impl EngineInner {
region_id, err
);
})?;
self.regions.write().await.insert(region_id, region);
self.regions.write().unwrap().insert(region_id, region);

info!("A new region is created, region_id: {}", region_id);
Ok(Output::AffectedRows(0))
Expand Down Expand Up @@ -219,7 +231,7 @@ impl EngineInner {
region_id, err
);
})?;
self.regions.write().await.insert(region_id, region);
self.regions.write().unwrap().insert(region_id, region);

info!("Region opened, region_id: {}", region_id);
Ok(Output::AffectedRows(0))
Expand All @@ -232,7 +244,7 @@ impl EngineInner {
) -> EngineResult<Output> {
let _lock = self.region_mutex.lock().await;

let mut regions = self.regions.write().await;
let mut regions = self.regions.write().unwrap();
if regions.remove(&region_id).is_some() {
info!("Region closed, region_id: {}", region_id);
}
Expand Down Expand Up @@ -263,17 +275,17 @@ impl EngineInner {
);
})?;
}
let _ = self.regions.write().await.remove(&region_id);
let _ = self.regions.write().unwrap().remove(&region_id);

info!("Region dropped, region_id: {}", region_id);
Ok(Output::AffectedRows(0))
}

async fn get_region(&self, region_id: RegionId) -> Option<FileRegionRef> {
self.regions.read().await.get(&region_id).cloned()
self.regions.read().unwrap().get(&region_id).cloned()
}

async fn exists(&self, region_id: RegionId) -> bool {
self.regions.read().await.contains_key(&region_id)
self.regions.read().unwrap().contains_key(&region_id)
}
}
16 changes: 15 additions & 1 deletion src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use object_store::manager::ObjectStoreManagerRef;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::RegionEngine;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};

Expand Down Expand Up @@ -184,6 +184,16 @@ impl EngineInner {
region.set_writable(writable);
Ok(())
}

fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.workers.get_region(region_id).map(|region| {
if region.is_writable() {
RegionRole::Leader
} else {
RegionRole::Follower
}
})
}
}

#[async_trait]
Expand Down Expand Up @@ -247,6 +257,10 @@ impl RegionEngine for MitoEngine {
.set_writable(region_id, writable)
.map_err(BoxedError::new)
}

fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.role(region_id)
}
}

// Tests methods.
Expand Down
7 changes: 6 additions & 1 deletion src/mito2/src/engine/open_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::time::Duration;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::RegionEngine;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{
RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest,
};
Expand Down Expand Up @@ -49,6 +49,8 @@ async fn test_engine_open_empty() {
assert_eq!(StatusCode::RegionNotFound, err.status_code());
let err = engine.set_writable(region_id, true).unwrap_err();
assert_eq!(StatusCode::RegionNotFound, err.status_code());
let role = engine.role(region_id);
assert_eq!(role, None);
}

#[tokio::test]
Expand Down Expand Up @@ -124,8 +126,11 @@ async fn test_engine_open_readonly() {
.unwrap_err();
assert_eq!(StatusCode::RegionReadonly, err.status_code());

assert_eq!(Some(RegionRole::Follower), engine.role(region_id));
// Set writable and write.
engine.set_writable(region_id, true).unwrap();
assert_eq!(Some(RegionRole::Leader), engine.role(region_id));

put_rows(&engine, region_id, rows).await;
}

Expand Down
23 changes: 23 additions & 0 deletions src/store-api/src/region_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use std::sync::Arc;

use api::greptime_proto::v1::meta::RegionRole as PbRegionRole;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_query::Output;
Expand All @@ -25,6 +26,23 @@ use crate::metadata::RegionMetadataRef;
use crate::region_request::RegionRequest;
use crate::storage::{RegionId, ScanRequest};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRole {
// Readonly region(mito2), Readonly region(file).
Follower,
// Writable region(mito2).
Leader,
}

impl From<RegionRole> for PbRegionRole {
fn from(value: RegionRole) -> Self {
match value {
RegionRole::Follower => PbRegionRole::Follower,
RegionRole::Leader => PbRegionRole::Leader,
}
}
}

#[async_trait]
pub trait RegionEngine: Send + Sync {
/// Name of this engine
Expand Down Expand Up @@ -61,6 +79,11 @@ pub trait RegionEngine: Send + Sync {
/// the region as readonly doesn't guarantee that write operations in progress will not
/// take effect.
fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError>;

/// Indicates region role.
///
/// Returns the `None` if the region is not found.
fn role(&self, region_id: RegionId) -> Option<RegionRole>;
}

pub type RegionEngineRef = Arc<dyn RegionEngine>;

0 comments on commit 54ed752

Please sign in to comment.