Skip to content

Commit

Permalink
feat: integrate remote wal to standalone
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Dec 25, 2023
1 parent ce7a470 commit 3fbeae7
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 18 deletions.
38 changes: 36 additions & 2 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,42 @@ enable = true
# Whether to enable Prometheus remote write and read in HTTP API, true by default.
enable = true

# WAL options.
[wal]
[wal_meta]
# Available wal providers:
# - "raft_engine" (default)
# - "kafka"
provider = "raft_engine"

# There're none raft-engine wal config since meta srv only involves in remote wal currently.

# Kafka wal config.
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
# broker_endpoints = ["127.0.0.1:9090"]
# Number of topics to be created upon start.
# num_topics = 64
# Topic selector type.
# Available selector types:
# - "round_robin" (default)
# selector_type = "round_robin"
# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
# topic_name_prefix = "greptimedb_wal_topic"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
# replication_factor = 3
# Above which a topic creation operation will be cancelled.
# create_topic_timeout = "30s"
# The initial backoff for kafka clients.
# backoff_init = "500ms"
# The maximum backoff for kafka clients.
# backoff_max = "10s"
# Exponential backoff rate, i.e. next backoff = base * current backoff.
# backoff_base = 2.0
# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
# backoff_deadline = "5mins"

# WAL options for datanode.
[wal_datanode]
# Available wal providers:
# - "RaftEngine" (default)
# - "Kafka"
Expand Down
2 changes: 2 additions & 0 deletions src/cmd/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use clap::ArgMatches;
use common_config::KvBackendConfig;
use common_meta::wal::WalConfig as MetaSrvWalConfig;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use config::{Config, Environment, File, FileFormat};
use datanode::config::{DatanodeOptions, ProcedureConfig};
Expand All @@ -37,6 +38,7 @@ pub struct MixOptions {
pub frontend: FrontendOptions,
pub datanode: DatanodeOptions,
pub logging: LoggingOptions,
pub wal_meta: MetaSrvWalConfig,
}

impl From<MixOptions> for FrontendOptions {
Expand Down
26 changes: 16 additions & 10 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{fs, path};
use async_trait::async_trait;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig as DatanodeWalConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef};
Expand All @@ -27,7 +27,9 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal::{WalOptionsAllocator, WalOptionsAllocatorRef};
use common_meta::wal::{
WalConfig as MetaSrvWalConfig, WalOptionsAllocator, WalOptionsAllocatorRef,
};
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
Expand Down Expand Up @@ -104,7 +106,8 @@ pub struct StandaloneOptions {
pub opentsdb: OpentsdbOptions,
pub influxdb: InfluxdbOptions,
pub prom_store: PromStoreOptions,
pub wal: WalConfig,
pub wal_meta: MetaSrvWalConfig,
pub wal_datanode: DatanodeWalConfig,
pub storage: StorageConfig,
pub metadata_store: KvBackendConfig,
pub procedure: ProcedureConfig,
Expand All @@ -127,7 +130,8 @@ impl Default for StandaloneOptions {
opentsdb: OpentsdbOptions::default(),
influxdb: InfluxdbOptions::default(),
prom_store: PromStoreOptions::default(),
wal: WalConfig::default(),
wal_meta: MetaSrvWalConfig::default(),
wal_datanode: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
metadata_store: KvBackendConfig::default(),
procedure: ProcedureConfig::default(),
Expand Down Expand Up @@ -166,7 +170,7 @@ impl StandaloneOptions {
DatanodeOptions {
node_id: Some(0),
enable_telemetry: self.enable_telemetry,
wal: self.wal,
wal: self.wal_datanode,
storage: self.storage,
region_engine: self.region_engine,
rpc_addr: self.grpc.addr,
Expand Down Expand Up @@ -338,7 +342,8 @@ impl StartCommand {
let procedure = opts.procedure.clone();
let frontend = opts.clone().frontend_options();
let logging = opts.logging.clone();
let datanode = opts.datanode_options();
let wal_meta = opts.wal_meta.clone();
let datanode = opts.datanode_options().clone();

Ok(Options::Standalone(Box::new(MixOptions {
procedure,
Expand All @@ -347,6 +352,7 @@ impl StartCommand {
frontend,
datanode,
logging,
wal_meta,
})))
}

Expand Down Expand Up @@ -392,9 +398,8 @@ impl StartCommand {
.step(10)
.build(),
);
// TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
common_meta::wal::WalConfig::default(),
opts.wal_meta.clone(),
kv_backend.clone(),
));
let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
Expand Down Expand Up @@ -585,7 +590,7 @@ mod tests {
assert_eq!(None, fe_opts.mysql.reject_no_database);
assert!(fe_opts.influxdb.enable);

let WalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
unreachable!()
};
assert_eq!("/tmp/greptimedb/test/wal", raft_engine_config.dir.unwrap());
Expand Down Expand Up @@ -731,7 +736,8 @@ mod tests {
assert_eq!(options.opentsdb, default_options.opentsdb);
assert_eq!(options.influxdb, default_options.influxdb);
assert_eq!(options.prom_store, default_options.prom_store);
assert_eq!(options.wal, default_options.wal);
assert_eq!(options.wal_meta, default_options.wal_meta);
assert_eq!(options.wal_datanode, default_options.wal_datanode);
assert_eq!(options.metadata_store, default_options.metadata_store);
assert_eq!(options.procedure, default_options.procedure);
assert_eq!(options.logging, default_options.logging);
Expand Down
2 changes: 0 additions & 2 deletions src/common/meta/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,4 @@ mod tests {
};
assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config));
}

// TODO(niebayes): the integrate test needs to test that the example config file can be successfully parsed.
}
2 changes: 1 addition & 1 deletion src/common/meta/src/wal/kafka/topic_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl TopicManager {
)
})
.collect::<Vec<_>>();
// TODO(niebayes): Determine how rskafka handles an already-exist topic. Check if an error would be raised.
// FIXME(niebayes): try to create an already-exist topic would raise an error.
futures::future::try_join_all(tasks)
.await
.context(CreateKafkaWalTopicSnafu)
Expand Down
7 changes: 4 additions & 3 deletions tests-integration/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use common_meta::ddl_manager::DdlManager;
use common_meta::key::TableMetadataManager;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal::WalOptionsAllocator;
use common_meta::wal::{WalConfig as MetaSrvWalConfig, WalOptionsAllocator};
use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::LoggingOptions;
use datanode::config::DatanodeOptions;
Expand Down Expand Up @@ -118,9 +118,9 @@ impl GreptimeDbStandaloneBuilder {
.step(10)
.build(),
);
// TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
let wal_meta = MetaSrvWalConfig::default();
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
common_meta::wal::WalConfig::default(),
wal_meta.clone(),
kv_backend.clone(),
));
let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
Expand Down Expand Up @@ -163,6 +163,7 @@ impl GreptimeDbStandaloneBuilder {
frontend: FrontendOptions::default(),
datanode: opts,
logging: LoggingOptions::default(),
wal_meta,
},
guard,
}
Expand Down

0 comments on commit 3fbeae7

Please sign in to comment.