Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix MockInstance rebuild issue #3218

Merged
merged 2 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
68 changes: 45 additions & 23 deletions tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ use crate::test_util::{

pub struct GreptimeDbCluster {
pub storage_guards: Vec<StorageGuard>,
pub _dir_guards: Vec<FileDirGuard>,
pub dir_guards: Vec<FileDirGuard>,
pub datanode_options: Vec<DatanodeOptions>,

pub datanode_instances: HashMap<DatanodeId, Datanode>,
pub kv_backend: KvBackendRef,
pub meta_srv: MetaSrv,
pub frontend: Arc<FeInstance>,
}

#[derive(Clone)]
pub struct GreptimeDbClusterBuilder {
cluster_name: String,
kv_backend: KvBackendRef,
Expand Down Expand Up @@ -157,9 +157,13 @@ impl GreptimeDbClusterBuilder {
self
}

pub async fn build(self) -> GreptimeDbCluster {
let datanodes = self.datanodes.unwrap_or(4);

pub async fn build_with(
&self,
datanode_options: Vec<DatanodeOptions>,
storage_guards: Vec<StorageGuard>,
dir_guards: Vec<FileDirGuard>,
) -> GreptimeDbCluster {
let datanodes = datanode_options.len();
let channel_config = ChannelConfig::new().timeout(Duration::from_secs(20));
let datanode_clients = Arc::new(DatanodeClients::new(channel_config));

Expand All @@ -182,8 +186,9 @@ impl GreptimeDbClusterBuilder {
)
.await;

let (datanode_instances, storage_guards, dir_guards) =
self.build_datanodes(meta_srv.clone(), datanodes).await;
let datanode_instances = self
.build_datanodes_with_options(meta_srv.clone(), &datanode_options)
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
.await;

build_datanode_clients(datanode_clients.clone(), &datanode_instances, datanodes).await;

Expand All @@ -199,25 +204,29 @@ impl GreptimeDbClusterBuilder {
frontend.start().await.unwrap();

GreptimeDbCluster {
datanode_options,
storage_guards,
_dir_guards: dir_guards,
dir_guards,
datanode_instances,
kv_backend: self.kv_backend.clone(),
meta_srv: meta_srv.meta_srv,
frontend,
}
}

async fn build_datanodes(
pub async fn build(&self) -> GreptimeDbCluster {
let datanodes = self.datanodes.unwrap_or(4);
let (datanode_options, storage_guards, dir_guards) =
self.build_datanode_options(datanodes).await;
self.build_with(datanode_options, storage_guards, dir_guards)
.await
}

async fn build_datanode_options(
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
&self,
meta_srv: MockInfo,
datanodes: u32,
) -> (
HashMap<DatanodeId, Datanode>,
Vec<StorageGuard>,
Vec<FileDirGuard>,
) {
let mut instances = HashMap::with_capacity(datanodes as usize);
) -> (Vec<DatanodeOptions>, Vec<StorageGuard>, Vec<FileDirGuard>) {
let mut options = Vec::with_capacity(datanodes as usize);
let mut storage_guards = Vec::with_capacity(datanodes as usize);
let mut dir_guards = Vec::with_capacity(datanodes as usize);

Expand Down Expand Up @@ -258,28 +267,41 @@ impl GreptimeDbClusterBuilder {
};
opts.node_id = Some(datanode_id);

let datanode = self.create_datanode(opts, meta_srv.clone()).await;

instances.insert(datanode_id, datanode);
options.push(opts);
}
(
instances,
options,
storage_guards.into_iter().flatten().collect(),
dir_guards,
)
}

async fn build_datanodes_with_options(
&self,
meta_srv: MockInfo,
options: &[DatanodeOptions],
) -> HashMap<DatanodeId, Datanode> {
let mut instances = HashMap::with_capacity(options.len());

for opts in options {
let datanode = self.create_datanode(opts.clone(), meta_srv.clone()).await;
instances.insert(opts.node_id.unwrap(), datanode);
}

instances
}

async fn wait_datanodes_alive(
&self,
meta_peer_client: &MetaPeerClientRef,
expected_datanodes: u32,
expected_datanodes: usize,
) {
for _ in 0..10 {
let alive_datanodes =
meta_srv::lease::filter_datanodes(1000, meta_peer_client, |_, _| true)
.await
.unwrap()
.len() as u32;
.len();
if alive_datanodes == expected_datanodes {
return;
}
Expand Down Expand Up @@ -355,7 +377,7 @@ impl GreptimeDbClusterBuilder {
async fn build_datanode_clients(
clients: Arc<DatanodeClients>,
instances: &HashMap<DatanodeId, Datanode>,
datanodes: u32,
datanodes: usize,
) {
for i in 0..datanodes {
let datanode_id = i as u64 + 1;
Expand Down
105 changes: 61 additions & 44 deletions tests-integration/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl_manager::DdlManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::WalOptionsAllocator;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::LoggingOptions;
use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
use datanode::config::DatanodeOptions;
use datanode::datanode::DatanodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
Expand All @@ -39,12 +40,13 @@ use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, Test

pub struct GreptimeDbStandalone {
pub instance: Arc<Instance>,
pub datanode_opts: DatanodeOptions,
pub mix_options: MixOptions,
pub guard: TestGuard,
// Used in rebuild.
pub kv_backend: KvBackendRef,
pub procedure_manager: ProcedureManagerRef,
}

#[derive(Clone)]
pub struct GreptimeDbStandaloneBuilder {
instance_name: String,
wal_config: DatanodeWalConfig,
Expand Down Expand Up @@ -104,31 +106,16 @@ impl GreptimeDbStandaloneBuilder {
self
}

pub async fn build(self) -> GreptimeDbStandalone {
let default_store_type = self.default_store.unwrap_or(StorageType::File);
let store_types = self.store_providers.unwrap_or_default();

let (opts, guard) = create_tmp_dir_and_datanode_opts(
Mode::Standalone,
default_store_type,
store_types,
&self.instance_name,
self.wal_config.clone(),
);

let procedure_config = ProcedureConfig::default();
let kv_backend_config = KvBackendConfig::default();
let (kv_backend, procedure_manager) = Instance::try_build_standalone_components(
format!("{}/kv", &opts.storage.data_home),
kv_backend_config.clone(),
procedure_config.clone(),
)
.await
.unwrap();

let plugins = self.plugin.unwrap_or_default();
pub async fn build_with(
&self,
kv_backend: KvBackendRef,
procedure_manager: ProcedureManagerRef,
guard: TestGuard,
mix_options: MixOptions,
) -> GreptimeDbStandalone {
let plugins = self.plugin.clone().unwrap_or_default();

let datanode = DatanodeBuilder::new(opts.clone(), plugins.clone())
let datanode = DatanodeBuilder::new(mix_options.datanode.clone(), plugins.clone())
.with_kv_backend(kv_backend.clone())
.build()
.await
Expand All @@ -145,9 +132,8 @@ impl GreptimeDbStandaloneBuilder {
.step(10)
.build(),
);
let wal_meta = self.meta_wal_config.clone();
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
wal_meta.clone(),
mix_options.wal_meta.clone(),
kv_backend.clone(),
));
let table_meta_allocator = TableMetadataAllocator::new(
Expand All @@ -168,11 +154,12 @@ impl GreptimeDbStandaloneBuilder {
.unwrap(),
);

let instance = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor)
.with_plugin(plugins)
.try_build()
.await
.unwrap();
let instance =
FrontendBuilder::new(kv_backend.clone(), datanode_manager, ddl_task_executor)
.with_plugin(plugins)
.try_build()
.await
.unwrap();

procedure_manager.start().await.unwrap();
wal_options_allocator.start().await.unwrap();
Expand All @@ -183,17 +170,47 @@ impl GreptimeDbStandaloneBuilder {

GreptimeDbStandalone {
instance: Arc::new(instance),
datanode_opts: opts.clone(),
mix_options: MixOptions {
data_home: opts.storage.data_home.to_string(),
procedure: procedure_config,
metadata_store: kv_backend_config,
frontend: FrontendOptions::default(),
datanode: opts,
logging: LoggingOptions::default(),
wal_meta,
},
mix_options,
guard,
kv_backend,
procedure_manager,
}
}

pub async fn build(&self) -> GreptimeDbStandalone {
let default_store_type = self.default_store.unwrap_or(StorageType::File);
let store_types = self.store_providers.clone().unwrap_or_default();

let (opts, guard) = create_tmp_dir_and_datanode_opts(
Mode::Standalone,
default_store_type,
store_types,
&self.instance_name,
self.wal_config.clone(),
);

let kv_backend_config = KvBackendConfig::default();
let procedure_config = ProcedureConfig::default();
let (kv_backend, procedure_manager) = Instance::try_build_standalone_components(
format!("{}/kv", &opts.storage.data_home),
kv_backend_config.clone(),
procedure_config.clone(),
)
.await
.unwrap();

let wal_meta = self.meta_wal_config.clone();
let mix_options = MixOptions {
data_home: opts.storage.data_home.to_string(),
procedure: procedure_config,
metadata_store: kv_backend_config,
frontend: FrontendOptions::default(),
datanode: opts,
logging: LoggingOptions::default(),
wal_meta,
};

self.build_with(kv_backend, procedure_manager, guard, mix_options)
.await
}
}
4 changes: 2 additions & 2 deletions tests-integration/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
instance.instance.clone(),
))
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(instance.datanode_opts.to_toml_string())
.with_greptime_config_options(instance.mix_options.datanode.to_toml_string())
.build();
(http_server.build(http_server.make_app()), instance.guard)
}
Expand Down Expand Up @@ -463,7 +463,7 @@ pub async fn setup_test_prom_app_with_frontend(
.with_script_handler(frontend_ref.clone())
.with_prom_handler(frontend_ref.clone())
.with_prometheus_handler(frontend_ref)
.with_greptime_config_options(instance.datanode_opts.to_toml_string())
.with_greptime_config_options(instance.mix_options.datanode.to_toml_string())
.build();
let app = http_server.build(http_server.make_app());
(app, instance.guard)
Expand Down