Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: grant lease to opening regions #2752

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 51 additions & 2 deletions src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,27 @@ use store_api::storage::RegionId;
use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::region::lease_keeper::RegionLeaseKeeperRef;
use crate::region::lease_keeper::{OpeningRegionKeeperRef, RegionLeaseKeeperRef};
use crate::region::RegionLeaseKeeper;

pub struct RegionLeaseHandler {
region_lease_seconds: u64,
region_lease_keeper: RegionLeaseKeeperRef,
opening_region_keeper: OpeningRegionKeeperRef,
}

impl RegionLeaseHandler {
pub fn new(region_lease_seconds: u64, table_metadata_manager: TableMetadataManagerRef) -> Self {
pub fn new(
region_lease_seconds: u64,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: OpeningRegionKeeperRef,
) -> Self {
let region_lease_keeper = RegionLeaseKeeper::new(table_metadata_manager);

Self {
region_lease_seconds,
region_lease_keeper: Arc::new(region_lease_keeper),
opening_region_keeper,
}
}
}
Expand Down Expand Up @@ -124,6 +130,11 @@ impl HeartbeatHandler for RegionLeaseHandler {
.find_staled_follower_regions(cluster_id, datanode_id, &followers)
.await?;

// If a region is opening, it will be filtered out from the closable regions set.
let closable = self
.opening_region_keeper
.filter_opening_regions(datanode_id, closable);

grant(
&mut granted_regions,
&upgradeable,
Expand Down Expand Up @@ -161,6 +172,7 @@ mod test {
use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
use crate::region::lease_keeper::OpeningRegionKeeper;

fn new_test_keeper() -> RegionLeaseKeeper {
let store = Arc::new(MemoryKvBackend::new());
Expand Down Expand Up @@ -230,9 +242,12 @@ mod test {
..Default::default()
};

let opening_region_keeper = Arc::new(OpeningRegionKeeper::default());

let handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
table_metadata_manager.clone(),
opening_region_keeper.clone(),
);

handler.handle(&req, ctx, acc).await.unwrap();
Expand Down Expand Up @@ -262,6 +277,39 @@ mod test {
acc,
vec![GrantedRegion::new(region_id, RegionRole::Follower)],
);

let opening_region_id = RegionId::new(table_id, region_number + 2);
let _guard = opening_region_keeper
.register(follower_peer.id, opening_region_id)
.unwrap();

let acc = &mut HeartbeatAccumulator::default();

acc.stat = Some(Stat {
cluster_id,
id: follower_peer.id,
region_stats: vec![
new_empty_region_stat(region_id, RegionRole::Follower),
new_empty_region_stat(another_region_id, RegionRole::Follower),
new_empty_region_stat(opening_region_id, RegionRole::Follower),
],
..Default::default()
});

handler.handle(&req, ctx, acc).await.unwrap();

assert_eq!(
acc.region_lease.as_ref().unwrap().lease_seconds,
distributed_time_constants::REGION_LEASE_SECS
);

assert_region_lease(
acc,
vec![
GrantedRegion::new(region_id, RegionRole::Follower),
GrantedRegion::new(opening_region_id, RegionRole::Follower),
],
);
}

#[tokio::test]
Expand Down Expand Up @@ -325,6 +373,7 @@ mod test {
let handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
table_metadata_manager.clone(),
Default::default(),
);

handler.handle(&req, ctx, acc).await.unwrap();
Expand Down
6 changes: 6 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::region::lease_keeper::OpeningRegionKeeperRef;
use crate::selector::{Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
Expand Down Expand Up @@ -241,6 +242,7 @@ pub struct MetaSrv {
mailbox: MailboxRef,
ddl_executor: DdlTaskExecutorRef,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: OpeningRegionKeeperRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,

plugins: Plugins,
Expand Down Expand Up @@ -403,6 +405,10 @@ impl MetaSrv {
&self.table_metadata_manager
}

pub fn opening_region_keeper(&self) -> &OpeningRegionKeeperRef {
&self.opening_region_keeper
}

pub fn publish(&self) -> Option<PublishRef> {
self.plugins.get::<PublishRef>()
}
Expand Down
4 changes: 4 additions & 0 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use crate::metasrv::{
};
use crate::procedure::region_failover::RegionFailoverManager;
use crate::pubsub::PublishRef;
use crate::region::lease_keeper::OpeningRegionKeeper;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvBackend};
Expand Down Expand Up @@ -197,6 +198,7 @@ impl MetaSrvBuilder {
&table_id_sequence,
);
let _ = ddl_manager.try_start();
let opening_region_keeper = Arc::new(OpeningRegionKeeper::default());

let handler_group = match handler_group {
Some(handler_group) => handler_group,
Expand Down Expand Up @@ -231,6 +233,7 @@ impl MetaSrvBuilder {
let region_lease_handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
table_metadata_manager.clone(),
opening_region_keeper.clone(),
);

let group = HeartbeatHandlerGroup::new(pushers);
Expand Down Expand Up @@ -283,6 +286,7 @@ impl MetaSrvBuilder {
)
.await,
plugins: plugins.unwrap_or_else(Plugins::default),
opening_region_keeper,
})
}
}
Expand Down
111 changes: 109 additions & 2 deletions src/meta-srv/src/region/lease_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ pub mod mito;
pub mod utils;

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use common_meta::DatanodeId;
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};

Expand Down Expand Up @@ -156,8 +157,84 @@ impl RegionLeaseKeeper {
}
}

#[derive(Debug, Clone)]
pub struct OpeningRegionGuard {
datanode_id: DatanodeId,
region_id: RegionId,
inner: Arc<RwLock<HashSet<(DatanodeId, RegionId)>>>,
}

impl Drop for OpeningRegionGuard {
fn drop(&mut self) {
let mut inner = self.inner.write().unwrap();
inner.remove(&(self.datanode_id, self.region_id));
}
}

pub type OpeningRegionKeeperRef = Arc<OpeningRegionKeeper>;

#[derive(Debug, Clone, Default)]
/// Tracks opening regions.
pub struct OpeningRegionKeeper {
inner: Arc<RwLock<HashSet<(DatanodeId, RegionId)>>>,
}

impl OpeningRegionKeeper {
pub fn new() -> Self {
Default::default()
}

/// Returns [OpeningRegionGuard] if Region(`region_id`) on Peer(`datanode_id`) does not exist.
pub fn register(
&self,
datanode_id: DatanodeId,
region_id: RegionId,
) -> Option<OpeningRegionGuard> {
let mut inner = self.inner.write().unwrap();

if inner.insert((datanode_id, region_id)) {
Some(OpeningRegionGuard {
datanode_id,
region_id,
inner: self.inner.clone(),
})
} else {
None
}
}

/// Returns true if the keeper contains a (`datanoe_id`, `region_id`) tuple.
pub fn contains(&self, datanode_id: DatanodeId, region_id: RegionId) -> bool {
let inner = self.inner.read().unwrap();
inner.contains(&(datanode_id, region_id))
}

/// Returns a set of filtered out regions that are opening.
pub fn filter_opening_regions(
&self,
datanode_id: DatanodeId,
mut region_ids: HashSet<RegionId>,
) -> HashSet<RegionId> {
let inner = self.inner.read().unwrap();
region_ids.retain(|region_id| !inner.contains(&(datanode_id, *region_id)));

region_ids
}

/// Returns number of element in tracking set.
pub fn len(&self) -> usize {
let inner = self.inner.read().unwrap();
inner.len()
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}
}

#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;

use common_meta::key::test_utils::new_test_table_info;
Expand All @@ -168,7 +245,7 @@ mod tests {
use store_api::storage::RegionId;
use table::metadata::RawTableInfo;

use super::RegionLeaseKeeper;
use super::{OpeningRegionKeeper, RegionLeaseKeeper};

fn new_test_keeper() -> RegionLeaseKeeper {
let store = Arc::new(MemoryKvBackend::new());
Expand Down Expand Up @@ -433,4 +510,34 @@ mod tests {
assert!(upgradable.is_empty());
assert!(closable.is_empty());
}

#[test]
fn test_opening_region_keeper() {
let keeper = OpeningRegionKeeper::new();

let guard = keeper.register(1, RegionId::from_u64(1)).unwrap();
assert!(keeper.register(1, RegionId::from_u64(1)).is_none());
let guard2 = keeper.register(1, RegionId::from_u64(2)).unwrap();

let output = keeper.filter_opening_regions(
1,
HashSet::from([
RegionId::from_u64(1),
RegionId::from_u64(2),
RegionId::from_u64(3),
]),
);
assert_eq!(output.len(), 1);
assert!(output.contains(&RegionId::from_u64(3)));

assert_eq!(keeper.len(), 2);
drop(guard);

assert_eq!(keeper.len(), 1);

assert!(keeper.contains(1, RegionId::from_u64(2)));
drop(guard2);

assert!(keeper.is_empty());
}
}
Loading