Skip to content

Commit

Permalink
feat(remote_wal): add skeleton for remote wal related to datanode (#2941
Browse files Browse the repository at this point in the history
)

* refactor: refactor wal config

* test: update tests related to wal

* feat: introduce kafka wal config

* chore: augment proto with wal options

* feat: augment region open request with wal options

* feat: augment mito region with wal options

* feat: augment region create request with wal options

* refactor: refactor log store trait

* feat: add skeleton for kafka log store

* feat: generalize building log store when starting datanode

* feat: integrate wal options to region write

* chore: minor update

* refactor: remove wal options from region create/open requests

* fix: compilation issues

* chore: insert wal options into region options upon initializing region server

* chore: integrate wal options into region options

* chore: fill in kafka wal config

* chore: reuse namespaces while writing to wal

* chore: minor update

* chore: fetch wal options from region while handling truncate/flush

* fix: region options test

* fix: resolve some review conversations

* refactor: serde with wal options

* fix: resolve some review conversations
  • Loading branch information
niebayes committed Dec 20, 2023
1 parent 6ac47e9 commit 9da1f23
Show file tree
Hide file tree
Showing 39 changed files with 930 additions and 251 deletions.
48 changes: 47 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 23 additions & 1 deletion config/datanode.example.toml
Expand Up @@ -29,16 +29,38 @@ connect_timeout = "1s"
# `TCP_NODELAY` option for accepted connections, true by default.
tcp_nodelay = true

# WAL options, see `standalone.example.toml`.
# WAL options.
# Currently, users are expected to choose the wal through the provider field.
# When a wal provider is chosen, the user should comment out all other wal config
# except those corresponding to the chosen one.
[wal]
# WAL provider. (The WAL data directory is set by the raft-engine `dir` option below.)
provider = "raft_engine"

# Raft-engine wal options, see `standalone.example.toml`
# dir = "/tmp/greptimedb/wal"
file_size = "256MB"
purge_threshold = "4GB"
purge_interval = "10m"
read_batch_size = 128
sync_write = false

# Kafka wal options.
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
# broker_endpoints = ["127.0.0.1:9090"]
# Number of topics to be created beforehand.
# num_topics = 64
# Topic name prefix.
# topic_name_prefix = "greptimedb_wal_kafka_topic"
# Number of partitions per topic.
# num_partitions = 1
# The maximum log size an rskafka batch producer could buffer.
# max_batch_size = "4MB"
# The linger duration of an rskafka batch producer.
# linger = "200ms"
# The maximum amount of time (a duration string, e.g. "100ms") to wait for Kafka records to be returned.
# max_wait_time = "100ms"

# Storage options, see `standalone.example.toml`.
[storage]
# The working home directory.
Expand Down
7 changes: 7 additions & 0 deletions config/standalone.example.toml
Expand Up @@ -82,6 +82,13 @@ enable = true

# WAL options.
[wal]
# Available wal providers:
# - "raft_engine" (default)
# - "kafka"
provider = "raft_engine"

# There's no kafka wal config for standalone mode.

# WAL data directory
# dir = "/tmp/greptimedb/wal"
# WAL file size in bytes.
Expand Down
49 changes: 37 additions & 12 deletions src/cmd/src/datanode.rs
Expand Up @@ -18,7 +18,8 @@ use std::time::Duration;
use async_trait::async_trait;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_telemetry::logging;
use common_config::WalConfig;
use common_telemetry::{info, logging};
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
use meta_client::MetaClientOptions;
Expand Down Expand Up @@ -166,8 +167,18 @@ impl StartCommand {
opts.storage.data_home = data_home.clone();
}

if let Some(wal_dir) = &self.wal_dir {
opts.wal.dir = Some(wal_dir.clone());
// `wal_dir` only affects raft-engine config.
if let Some(wal_dir) = &self.wal_dir
&& let WalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
{
if raft_engine_config
.dir
.as_ref()
.is_some_and(|original_dir| original_dir != wal_dir)
{
info!("The wal dir of raft-engine is altered to {wal_dir}");
}
raft_engine_config.dir.replace(wal_dir.clone());
}

if let Some(http_addr) = &self.http_addr {
Expand Down Expand Up @@ -256,6 +267,7 @@ mod tests {
tcp_nodelay = true
[wal]
provider = "raft_engine"
dir = "/other/wal"
file_size = "1GB"
purge_threshold = "50GB"
Expand Down Expand Up @@ -293,12 +305,18 @@ mod tests {

assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
assert_eq!(Some(42), options.node_id);
assert_eq!("/other/wal", options.wal.dir.unwrap());

assert_eq!(Duration::from_secs(600), options.wal.purge_interval);
assert_eq!(1024 * 1024 * 1024, options.wal.file_size.0);
assert_eq!(1024 * 1024 * 1024 * 50, options.wal.purge_threshold.0);
assert!(!options.wal.sync_write);
let WalConfig::RaftEngine(raft_engine_config) = options.wal else {
unreachable!()
};
assert_eq!("/other/wal", raft_engine_config.dir.unwrap());
assert_eq!(Duration::from_secs(600), raft_engine_config.purge_interval);
assert_eq!(1024 * 1024 * 1024, raft_engine_config.file_size.0);
assert_eq!(
1024 * 1024 * 1024 * 50,
raft_engine_config.purge_threshold.0
);
assert!(!raft_engine_config.sync_write);

let HeartbeatOptions {
interval: heart_beat_interval,
Expand Down Expand Up @@ -412,9 +430,10 @@ mod tests {
tcp_nodelay = true
[wal]
provider = "raft_engine"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
purge_interval = "5m"
sync_write = false
[storage]
Expand Down Expand Up @@ -475,7 +494,10 @@ mod tests {
};

// Should be read from env, env > default values.
assert_eq!(opts.wal.read_batch_size, 100,);
let WalConfig::RaftEngine(raft_engine_config) = opts.wal else {
unreachable!()
};
assert_eq!(raft_engine_config.read_batch_size, 100);
assert_eq!(
opts.meta_client.unwrap().metasrv_addrs,
vec![
Expand All @@ -486,10 +508,13 @@ mod tests {
);

// Should be read from config file, config file > env > default values.
assert_eq!(opts.wal.purge_interval, Duration::from_secs(60 * 10));
assert_eq!(
raft_engine_config.purge_interval,
Duration::from_secs(60 * 5)
);

// Should be read from cli, cli > config file > env > default values.
assert_eq!(opts.wal.dir.unwrap(), "/other/wal/dir");
assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir");

// Should be default value.
assert_eq!(opts.http.addr, DatanodeOptions::default().http.addr);
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/lib.rs
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(assert_matches)]
#![feature(assert_matches, let_chains)]

use async_trait::async_trait;
use clap::arg;
Expand Down
7 changes: 6 additions & 1 deletion src/cmd/src/options.rs
Expand Up @@ -171,6 +171,7 @@ impl Options {
mod tests {
use std::io::Write;

use common_config::WalConfig;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{DatanodeOptions, ObjectStoreConfig};

Expand All @@ -194,6 +195,7 @@ mod tests {
tcp_nodelay = true
[wal]
provider = "raft_engine"
dir = "/tmp/greptimedb/wal"
file_size = "1GB"
purge_threshold = "50GB"
Expand Down Expand Up @@ -277,7 +279,10 @@ mod tests {
);

// Should be the values from config file, not environment variables.
assert_eq!(opts.wal.dir.unwrap(), "/tmp/greptimedb/wal");
let WalConfig::RaftEngine(raft_engine_config) = opts.wal else {
unreachable!()
};
assert_eq!(raft_engine_config.dir.unwrap(), "/tmp/greptimedb/wal");

// Should be default values.
assert_eq!(opts.node_id, None);
Expand Down
6 changes: 5 additions & 1 deletion src/cmd/src/standalone.rs
Expand Up @@ -500,6 +500,7 @@ mod tests {
enable_memory_catalog = true
[wal]
provider = "raft_engine"
dir = "/tmp/greptimedb/test/wal"
file_size = "1GB"
purge_threshold = "50GB"
Expand Down Expand Up @@ -562,7 +563,10 @@ mod tests {
assert_eq!(None, fe_opts.mysql.reject_no_database);
assert!(fe_opts.influxdb.enable);

assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir.unwrap());
let WalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
unreachable!()
};
assert_eq!("/tmp/greptimedb/test/wal", raft_engine_config.dir.unwrap());

assert!(matches!(
&dn_opts.storage.store,
Expand Down
4 changes: 4 additions & 0 deletions src/common/config/Cargo.toml
Expand Up @@ -7,4 +7,8 @@ license.workspace = true
[dependencies]
common-base.workspace = true
humantime-serde.workspace = true
rskafka = "0.5"
serde.workspace = true
serde_json.workspace = true
serde_with = "3"
toml.workspace = true
33 changes: 2 additions & 31 deletions src/common/config/src/lib.rs
Expand Up @@ -12,41 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;
pub mod wal;

use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};

/// Configuration of the write-ahead log (WAL).
///
/// The container is marked `#[serde(default)]`, so fields missing from the
/// input are filled from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WalConfig {
/// WAL directory; `None` when no directory has been configured.
pub dir: Option<String>,
/// Size of a single WAL file.
pub file_size: ReadableSize,
/// WAL purge threshold.
pub purge_threshold: ReadableSize,
/// Interval between purges; (de)serialized through `humantime_serde`
/// rather than as a raw number of seconds.
#[serde(with = "humantime_serde")]
pub purge_interval: Duration,
/// Read batch size.
pub read_batch_size: usize,
/// Whether to sync the log file after every write.
pub sync_write: bool,
}

impl Default for WalConfig {
    /// Builds the default WAL configuration: no directory, 256MB log files,
    /// a 4GB purge threshold, a ten-minute purge interval, a read batch size
    /// of 128 and no sync after each write.
    fn default() -> Self {
        // Size-related defaults.
        let file_size = ReadableSize::mb(256);
        let purge_threshold = ReadableSize::gb(4);
        // Ten minutes between purges.
        let purge_interval = Duration::from_secs(600);

        Self {
            dir: None,
            file_size,
            purge_threshold,
            purge_interval,
            read_batch_size: 128,
            sync_write: false,
        }
    }
}
pub use crate::wal::{KafkaWalOptions, WalConfig, WalOptions, WAL_OPTIONS_KEY};

pub fn metadata_store_dir(store_dir: &str) -> String {
format!("{store_dir}/metadata")
Expand Down

0 comments on commit 9da1f23

Please sign in to comment.