Skip to content

Commit

Permalink
fix: fix MockInstance rebuild issue
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jan 23, 2024
1 parent d75cf86 commit 7b595e7
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 82 deletions.
68 changes: 45 additions & 23 deletions tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ use crate::test_util::{

pub struct GreptimeDbCluster {
pub storage_guards: Vec<StorageGuard>,
pub _dir_guards: Vec<FileDirGuard>,
pub dir_guards: Vec<FileDirGuard>,
pub datanode_options: Vec<DatanodeOptions>,

pub datanode_instances: HashMap<DatanodeId, Datanode>,
pub kv_backend: KvBackendRef,
pub meta_srv: MetaSrv,
pub frontend: Arc<FeInstance>,
}

#[derive(Clone)]
pub struct GreptimeDbClusterBuilder {
cluster_name: String,
kv_backend: KvBackendRef,
Expand Down Expand Up @@ -157,9 +157,13 @@ impl GreptimeDbClusterBuilder {
self
}

pub async fn build(self) -> GreptimeDbCluster {
let datanodes = self.datanodes.unwrap_or(4);

pub async fn build_with(
&self,
datanode_options: Vec<DatanodeOptions>,
storage_guards: Vec<StorageGuard>,
dir_guards: Vec<FileDirGuard>,
) -> GreptimeDbCluster {
let datanodes = datanode_options.len();
let channel_config = ChannelConfig::new().timeout(Duration::from_secs(20));
let datanode_clients = Arc::new(DatanodeClients::new(channel_config));

Expand All @@ -182,8 +186,9 @@ impl GreptimeDbClusterBuilder {
)
.await;

let (datanode_instances, storage_guards, dir_guards) =
self.build_datanodes(meta_srv.clone(), datanodes).await;
let datanode_instances = self
.build_datanodes_with_options(meta_srv.clone(), &datanode_options)
.await;

build_datanode_clients(datanode_clients.clone(), &datanode_instances, datanodes).await;

Expand All @@ -199,25 +204,29 @@ impl GreptimeDbClusterBuilder {
frontend.start().await.unwrap();

GreptimeDbCluster {
datanode_options,
storage_guards,
_dir_guards: dir_guards,
dir_guards,
datanode_instances,
kv_backend: self.kv_backend.clone(),
meta_srv: meta_srv.meta_srv,
frontend,
}
}

async fn build_datanodes(
pub async fn build(&self) -> GreptimeDbCluster {
let datanodes = self.datanodes.unwrap_or(4);
let (datanode_options, storage_guards, dir_guards) =
self.build_datanode_options(datanodes).await;
self.build_with(datanode_options, storage_guards, dir_guards)
.await
}

async fn build_datanode_options(
&self,
meta_srv: MockInfo,
datanodes: u32,
) -> (
HashMap<DatanodeId, Datanode>,
Vec<StorageGuard>,
Vec<FileDirGuard>,
) {
let mut instances = HashMap::with_capacity(datanodes as usize);
) -> (Vec<DatanodeOptions>, Vec<StorageGuard>, Vec<FileDirGuard>) {
let mut options = Vec::with_capacity(datanodes as usize);
let mut storage_guards = Vec::with_capacity(datanodes as usize);
let mut dir_guards = Vec::with_capacity(datanodes as usize);

Expand Down Expand Up @@ -258,28 +267,41 @@ impl GreptimeDbClusterBuilder {
};
opts.node_id = Some(datanode_id);

let datanode = self.create_datanode(opts, meta_srv.clone()).await;

instances.insert(datanode_id, datanode);
options.push(opts);
}
(
instances,
options,
storage_guards.into_iter().flatten().collect(),
dir_guards,
)
}

async fn build_datanodes_with_options(
&self,
meta_srv: MockInfo,
options: &[DatanodeOptions],
) -> HashMap<DatanodeId, Datanode> {
let mut instances = HashMap::with_capacity(options.len());

for opts in options {
let datanode = self.create_datanode(opts.clone(), meta_srv.clone()).await;
instances.insert(opts.node_id.unwrap(), datanode);
}

instances
}

async fn wait_datanodes_alive(
&self,
meta_peer_client: &MetaPeerClientRef,
expected_datanodes: u32,
expected_datanodes: usize,
) {
for _ in 0..10 {
let alive_datanodes =
meta_srv::lease::filter_datanodes(1000, meta_peer_client, |_, _| true)
.await
.unwrap()
.len() as u32;
.len();
if alive_datanodes == expected_datanodes {
return;
}
Expand Down Expand Up @@ -355,7 +377,7 @@ impl GreptimeDbClusterBuilder {
async fn build_datanode_clients(
clients: Arc<DatanodeClients>,
instances: &HashMap<DatanodeId, Datanode>,
datanodes: u32,
datanodes: usize,
) {
for i in 0..datanodes {
let datanode_id = i as u64 + 1;
Expand Down
105 changes: 61 additions & 44 deletions tests-integration/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl_manager::DdlManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::WalOptionsAllocator;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::LoggingOptions;
use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
use datanode::config::DatanodeOptions;
use datanode::datanode::DatanodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
Expand All @@ -39,12 +40,13 @@ use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, Test

pub struct GreptimeDbStandalone {
pub instance: Arc<Instance>,
pub datanode_opts: DatanodeOptions,
pub mix_options: MixOptions,
pub guard: TestGuard,
// Used in rebuild.
pub kv_backend: KvBackendRef,
pub procedure_manager: ProcedureManagerRef,
}

#[derive(Clone)]
pub struct GreptimeDbStandaloneBuilder {
instance_name: String,
wal_config: DatanodeWalConfig,
Expand Down Expand Up @@ -104,31 +106,16 @@ impl GreptimeDbStandaloneBuilder {
self
}

pub async fn build(self) -> GreptimeDbStandalone {
let default_store_type = self.default_store.unwrap_or(StorageType::File);
let store_types = self.store_providers.unwrap_or_default();

let (opts, guard) = create_tmp_dir_and_datanode_opts(
Mode::Standalone,
default_store_type,
store_types,
&self.instance_name,
self.wal_config.clone(),
);

let procedure_config = ProcedureConfig::default();
let kv_backend_config = KvBackendConfig::default();
let (kv_backend, procedure_manager) = Instance::try_build_standalone_components(
format!("{}/kv", &opts.storage.data_home),
kv_backend_config.clone(),
procedure_config.clone(),
)
.await
.unwrap();

let plugins = self.plugin.unwrap_or_default();
pub async fn build_with(
&self,
kv_backend: KvBackendRef,
procedure_manager: ProcedureManagerRef,
guard: TestGuard,
mix_options: MixOptions,
) -> GreptimeDbStandalone {
let plugins = self.plugin.clone().unwrap_or_default();

let datanode = DatanodeBuilder::new(opts.clone(), plugins.clone())
let datanode = DatanodeBuilder::new(mix_options.datanode.clone(), plugins.clone())
.with_kv_backend(kv_backend.clone())
.build()
.await
Expand All @@ -145,9 +132,8 @@ impl GreptimeDbStandaloneBuilder {
.step(10)
.build(),
);
let wal_meta = self.meta_wal_config.clone();
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
wal_meta.clone(),
mix_options.wal_meta.clone(),
kv_backend.clone(),
));
let table_meta_allocator = TableMetadataAllocator::new(
Expand All @@ -168,11 +154,12 @@ impl GreptimeDbStandaloneBuilder {
.unwrap(),
);

let instance = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor)
.with_plugin(plugins)
.try_build()
.await
.unwrap();
let instance =
FrontendBuilder::new(kv_backend.clone(), datanode_manager, ddl_task_executor)
.with_plugin(plugins)
.try_build()
.await
.unwrap();

procedure_manager.start().await.unwrap();
wal_options_allocator.start().await.unwrap();
Expand All @@ -183,17 +170,47 @@ impl GreptimeDbStandaloneBuilder {

GreptimeDbStandalone {
instance: Arc::new(instance),
datanode_opts: opts.clone(),
mix_options: MixOptions {
data_home: opts.storage.data_home.to_string(),
procedure: procedure_config,
metadata_store: kv_backend_config,
frontend: FrontendOptions::default(),
datanode: opts,
logging: LoggingOptions::default(),
wal_meta,
},
mix_options,
guard,
kv_backend,
procedure_manager,
}
}

pub async fn build(&self) -> GreptimeDbStandalone {
let default_store_type = self.default_store.unwrap_or(StorageType::File);
let store_types = self.store_providers.clone().unwrap_or_default();

let (opts, guard) = create_tmp_dir_and_datanode_opts(
Mode::Standalone,
default_store_type,
store_types,
&self.instance_name,
self.wal_config.clone(),
);

let kv_backend_config = KvBackendConfig::default();
let procedure_config = ProcedureConfig::default();
let (kv_backend, procedure_manager) = Instance::try_build_standalone_components(
format!("{}/kv", &opts.storage.data_home),
kv_backend_config.clone(),
procedure_config.clone(),
)
.await
.unwrap();

let wal_meta = self.meta_wal_config.clone();
let mix_options = MixOptions {
data_home: opts.storage.data_home.to_string(),
procedure: procedure_config,
metadata_store: kv_backend_config,
frontend: FrontendOptions::default(),
datanode: opts,
logging: LoggingOptions::default(),
wal_meta,
};

self.build_with(kv_backend, procedure_manager, guard, mix_options)
.await
}
}
4 changes: 2 additions & 2 deletions tests-integration/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
instance.instance.clone(),
))
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(instance.datanode_opts.to_toml_string())
.with_greptime_config_options(instance.mix_options.datanode.to_toml_string())
.build();
(http_server.build(http_server.make_app()), instance.guard)
}
Expand Down Expand Up @@ -463,7 +463,7 @@ pub async fn setup_test_prom_app_with_frontend(
.with_script_handler(frontend_ref.clone())
.with_prom_handler(frontend_ref.clone())
.with_prometheus_handler(frontend_ref)
.with_greptime_config_options(instance.datanode_opts.to_toml_string())
.with_greptime_config_options(instance.mix_options.datanode.to_toml_string())
.build();
let app = http_server.build(http_server.make_app());
(app, instance.guard)
Expand Down

0 comments on commit 7b595e7

Please sign in to comment.