Skip to content

Commit

Permalink
refactor: disable region failover on local WAL implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jun 24, 2024
1 parent 117ea03 commit 5a41445
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 35 deletions.
5 changes: 5 additions & 0 deletions src/common/meta/src/wal_options_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ impl WalOptionsAllocator {
}
}
}

/// Returns true if it's the remote WAL.
pub fn is_remote_wal(&self) -> bool {
matches!(&self, WalOptionsAllocator::Kafka(_))
}
}

/// Allocates a wal options for each region. The allocated wal options is encoded immediately.
Expand Down
6 changes: 3 additions & 3 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ pub struct MetaStateHandler {
}

impl MetaStateHandler {
pub async fn on_become_leader(&mut self) {
pub async fn on_become_leader(&self) {
self.state.write().unwrap().next_state(become_leader(false));

if let Err(e) = self.leader_cached_kv_backend.load().await {
Expand All @@ -296,7 +296,7 @@ impl MetaStateHandler {
self.greptimedb_telemetry_task.should_report(true);
}

pub async fn on_become_follower(&mut self) {
pub async fn on_become_follower(&self) {
self.state.write().unwrap().next_state(become_follower());

// Stops the procedures.
Expand Down Expand Up @@ -381,7 +381,7 @@ impl Metasrv {
.start()
.context(StartTelemetryTaskSnafu)?;
let region_supervisor_ticker = self.region_supervisor_ticker.clone();
let mut state_handler = MetaStateHandler {
let state_handler = MetaStateHandler {
greptimedb_telemetry_task,
subscribe_manager,
procedure_manager,
Expand Down
70 changes: 38 additions & 32 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use common_meta::state_store::KvStateStore;
use common_meta::wal_options_allocator::WalOptionsAllocator;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use common_telemetry::warn;
use snafu::ResultExt;

use super::{SelectTarget, FLOW_ID_SEQ};
Expand Down Expand Up @@ -223,6 +224,7 @@ impl MetasrvBuilder {
options.wal.clone(),
kv_backend.clone(),
));
let is_remote_wal = wal_options_allocator.is_remote_wal();
let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
let sequence = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
Expand Down Expand Up @@ -294,38 +296,42 @@ impl MetasrvBuilder {
));
region_migration_manager.try_start()?;

let (region_failover_handler, region_supervisor_ticker) = if options.enable_region_failover
{
let region_supervisor = RegionSupervisor::new(
options.failure_detector,
DEFAULT_TICK_INTERVAL,
// Requires to select MUST alive nodes.
SelectorContext {
server_addr: options.server_addr.clone(),
datanode_lease_secs: Duration::from_millis(
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
)
.as_secs(),
flownode_lease_secs: Duration::from_millis(
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
)
.as_secs(),
kv_backend: kv_backend.clone(),
meta_peer_client: meta_peer_client.clone(),
table_id: None,
},
selector.clone(),
region_migration_manager.clone(),
leader_cached_kv_backend.clone() as _,
);
let region_supervisor_ticker = region_supervisor.ticker();
(
Some(RegionFailureHandler::new(region_supervisor)),
Some(region_supervisor_ticker),
)
} else {
(None, None)
};
if !is_remote_wal && options.enable_region_failover {
warn!("Region failover is not supported in the local WAL implementation!");
}

let (region_failover_handler, region_supervisor_ticker) =
if options.enable_region_failover && is_remote_wal {
let region_supervisor = RegionSupervisor::new(
options.failure_detector,
DEFAULT_TICK_INTERVAL,
// Requires to select MUST alive nodes.
SelectorContext {
server_addr: options.server_addr.clone(),
datanode_lease_secs: Duration::from_millis(
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
)
.as_secs(),
flownode_lease_secs: Duration::from_millis(
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
)
.as_secs(),
kv_backend: kv_backend.clone(),
meta_peer_client: meta_peer_client.clone(),
table_id: None,
},
selector.clone(),
region_migration_manager.clone(),
leader_cached_kv_backend.clone() as _,
);
let region_supervisor_ticker = region_supervisor.ticker();
(
Some(RegionFailureHandler::new(region_supervisor)),
Some(region_supervisor_ticker),
)
} else {
(None, None)
};

let handler_group = match handler_group {
Some(handler_group) => handler_group,
Expand Down

0 comments on commit 5a41445

Please sign in to comment.