Skip to content

Commit

Permalink
merge pingcap/release-3.1 until (tikv#6617) (#10)
Browse files Browse the repository at this point in the history
* [3.1] external_storage: add S3 support (tikv#6209) (tikv#6536)

* external_storage: add S3 support (tikv#6209)

Signed-off-by: Yi Wu <yiwu@pingcap.com>
Signed-off-by: kennytm <kennytm@gmail.com>

* lock_manager: more metrics (tikv#6392) (tikv#6422) (tikv#6444)

Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>

* lock_manager: update default config (tikv#6426) (tikv#6429) (tikv#6446)

Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>

* Update rocksdb and titan. Update rust-rocksdb url (tikv#6554)

Signed-off-by: Yi Wu <yiwu@pingcap.com>

* raft_client: connect to other stores using peer_address (tikv#5569) (tikv#6491)

* *: pick threads statistics for regions (tikv#6605)

Co-authored-by: jiyingtk <1039793452@qq.com>
Co-authored-by: disksing <i@disksing.com>

* fix test_replica_read::test_read_after_cleanup_range_for_snap (tikv#6493)

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* deadlock: more solid role change observer (tikv#6415) (tikv#6431) (tikv#6445)

Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>

* raftstore: read index message count metric (tikv#6579) (tikv#6610)

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* raftstore: speed up conf change (tikv#6421) (tikv#6432)

Signed-off-by: Jay Lee <busyjaylee@gmail.com>

* raftstore: fix a panic in read index queue (tikv#6609) (tikv#6613)

Signed-off-by: qupeng <qupeng@pingcap.com>

* pick tikv#6414, tikv#6487, tikv#6477 and tikv#6539 to release-3.1 (tikv#6564)

* raftstore: extract batch system (tikv#6414)
* apply: support yield (tikv#6487)
* batch-system: add benchmark (tikv#6477)
* Introduce pre transfer leader (tikv#6539)
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>

* raftstore: set wait_merge_state to none after resuming pending state (tikv#6621)

Signed-off-by: Liqi Geng <gengliqiii@gmail.com>

* raftstore: learner load merge target & fix a merge network recovery bug (tikv#6623)

Signed-off-by: Liqi Geng <gengliqiii@gmail.com>

* change merge flow path (tikv#6617)

Signed-off-by: Liqi Geng <gengliqiii@gmail.com>

Co-authored-by: kennytm <kennytm@gmail.com>
Co-authored-by: Lei Zhao <zlwgx1023@gmail.com>
Co-authored-by: yiwu-arbug <yiwu@pingcap.com>
Co-authored-by: disksing <i@disksing.com>
Co-authored-by: ShuNing <nolouch@gmail.com>
Co-authored-by: jiyingtk <1039793452@qq.com>
Co-authored-by: pingcap-github-bot <sre-bot@pingcap.com>
Co-authored-by: 5kbpers <20279863+5kbpers@users.noreply.github.com>
Co-authored-by: Jay <BusyJay@users.noreply.github.com>
Co-authored-by: gengliqi <gengliqiii@gmail.com>
  • Loading branch information
11 people committed Feb 21, 2020
1 parent e26e209 commit 9e74683
Show file tree
Hide file tree
Showing 64 changed files with 5,713 additions and 3,165 deletions.
3,521 changes: 2,076 additions & 1,445 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -105,6 +105,7 @@ engine = { path = "components/engine" }
tikv_util = { path = "components/tikv_util" }
farmhash = "1.1.5"
external_storage = { path = "components/external_storage" }
batch-system = { path = "components/batch-system" }

[dependencies.murmur3]
git = "https://github.com/pingcap/murmur3.git"
Expand All @@ -122,7 +123,6 @@ default-features = false
[replace]
"log:0.3.9" = { git = "https://github.com/busyjay/log", branch = "use-static-module" }
"log:0.4.6" = { git = "https://github.com/busyjay/log", branch = "revert-to-static" }
"https://github.com/pingcap/kvproto.git#kvproto:0.0.1" = { git = "https://github.com/solotzg/kvproto.git", branch = "learner-merge" }
"prometheus:0.4.2" = { git = "https://github.com/birdstorm/rust-prometheus", branch = "0.4.2" }

[dev-dependencies]
Expand Down
18 changes: 11 additions & 7 deletions components/backup/src/writer.rs
Expand Up @@ -69,14 +69,18 @@ impl Writer {
.observe(sst_info.file_size() as f64);
let file_name = format!("{}_{}.sst", name, cf);

let reader = Sha256Reader::new(sst_reader)
let (reader, hasher) = Sha256Reader::new(sst_reader)
.map_err(|e| Error::Other(box_err!("Sha256 error: {:?}", e)))?;
let mut reader = limiter.limit(AllowStdIo::new(reader));
storage.write(&file_name, &mut reader)?;
let sha256 = reader
.into_inner()
.into_inner()
.hash()
storage.write(
&file_name,
Box::new(limiter.limit(AllowStdIo::new(reader))),
sst_info.file_size(),
)?;
let sha256 = hasher
.lock()
.unwrap()
.finish()
.map(|digest| digest.to_vec())
.map_err(|e| Error::Other(box_err!("Sha256 error: {:?}", e)))?;

let mut file = File::new();
Expand Down
29 changes: 29 additions & 0 deletions components/batch-system/Cargo.toml
@@ -0,0 +1,29 @@
[package]
name = "batch-system"
version = "0.1.0"
edition = "2018"

[dependencies]
crossbeam = "0.7"
tikv_util = { path = "../tikv_util" }
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "91904ade" }
derive_more = "0.15.0"

[dev-dependencies]
criterion = "0.3"

[[test]]
name = "tests"
path = "tests/cases/mod.rs"
required-features = ["test-runner"]

[[bench]]
name = "router"
harness = false
required-features = ["test-runner"]

[[bench]]
name = "batch-system"
harness = false
required-features = ["test-runner"]
142 changes: 142 additions & 0 deletions components/batch-system/benches/batch-system.rs
@@ -0,0 +1,142 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

#![feature(test)]

extern crate test;

use batch_system::test_runner::*;
use batch_system::*;
use criterion::*;
use std::sync::atomic::*;
use std::sync::Arc;

fn end_hook(tx: &std::sync::mpsc::Sender<()>) -> Message {
let tx = tx.clone();
Message::Callback(Box::new(move |_| {
tx.send(()).unwrap();
}))
}

/// Benches how it performs when many messages are sent to the bench system.
///
/// A better router and lightweight batch scheduling can lead to better result.
fn bench_spawn_many(c: &mut Criterion) {
let (control_tx, control_fsm) = Runner::new(100000);
let (router, mut system) = batch_system::create_system(2, 2, control_tx, control_fsm);
system.spawn("test".to_owned(), Builder::new());
const ID_LIMIT: u64 = 32;
const MESSAGE_LIMIT: usize = 256;
for id in 0..ID_LIMIT {
let (normal_tx, normal_fsm) = Runner::new(100000);
let normal_box = BasicMailbox::new(normal_tx, normal_fsm);
router.register(id, normal_box);
}

let (tx, rx) = std::sync::mpsc::channel();
c.bench_function("spawn_many", |b| {
b.iter(|| {
for id in 0..ID_LIMIT {
for i in 0..MESSAGE_LIMIT {
router.send(id, Message::Loop(i)).unwrap();
}
router.send(id, end_hook(&tx)).unwrap();
}
for _ in 0..ID_LIMIT {
rx.recv().unwrap();
}
})
});
system.shutdown();
}

/// Bench how it performs if two hot FSMs are shown up at the same time.
///
/// A good scheduling algorithm should be able to spread the hot FSMs to
/// all available threads as soon as possible.
fn bench_imbalance(c: &mut Criterion) {
let (control_tx, control_fsm) = Runner::new(100000);
let (router, mut system) = batch_system::create_system(2, 2, control_tx, control_fsm);
system.spawn("test".to_owned(), Builder::new());
const ID_LIMIT: u64 = 10;
const MESSAGE_LIMIT: usize = 512;
for id in 0..ID_LIMIT {
let (normal_tx, normal_fsm) = Runner::new(100000);
let normal_box = BasicMailbox::new(normal_tx, normal_fsm);
router.register(id, normal_box);
}

let (tx, rx) = std::sync::mpsc::channel();
c.bench_function("imbalance", |b| {
b.iter(|| {
for i in 0..MESSAGE_LIMIT {
for id in 0..2 {
router.send(id, Message::Loop(i)).unwrap();
}
}
for id in 0..2 {
router.send(id, end_hook(&tx)).unwrap();
}
for _ in 0..2 {
rx.recv().unwrap();
}
})
});
system.shutdown();
}

/// Bench how it performs when scheduling a lot of quick tasks during an long-polling
/// tasks.
///
/// A good scheduling algorithm should not starve the quick tasks.
fn bench_fairness(c: &mut Criterion) {
let (control_tx, control_fsm) = Runner::new(100000);
let (router, mut system) = batch_system::create_system(2, 2, control_tx, control_fsm);
system.spawn("test".to_owned(), Builder::new());
for id in 0..10 {
let (normal_tx, normal_fsm) = Runner::new(100000);
let normal_box = BasicMailbox::new(normal_tx, normal_fsm);
router.register(id, normal_box);
}

let (tx, _rx) = std::sync::mpsc::channel();
let running = Arc::new(AtomicBool::new(true));
let router1 = router.clone();
let running1 = running.clone();
let handle = std::thread::spawn(move || {
while running1.load(Ordering::SeqCst) {
// Using 4 to ensure all worker threads are busy spinning.
for id in 0..4 {
let _ = router1.send(id, Message::Loop(16));
}
}
tx.send(()).unwrap();
});

let (tx2, rx2) = std::sync::mpsc::channel();
c.bench_function("fairness", |b| {
b.iter(|| {
for _ in 0..10 {
for id in 4..6 {
router.send(id, Message::Loop(10)).unwrap();
}
}
for id in 4..6 {
router.send(id, end_hook(&tx2)).unwrap();
}
for _ in 4..6 {
rx2.recv().unwrap();
}
})
});
running.store(false, Ordering::SeqCst);
system.shutdown();
let _ = handle.join();
}

criterion_group!(fair, bench_fairness);
criterion_group!(
name = load;
config = Criterion::default().sample_size(30);
targets = bench_imbalance, bench_spawn_many
);
criterion_main!(fair, load);
24 changes: 24 additions & 0 deletions components/batch-system/benches/router.rs
@@ -0,0 +1,24 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use batch_system::test_runner::*;
use batch_system::*;
use criterion::*;

fn bench_send(c: &mut Criterion) {
let (control_tx, control_fsm) = Runner::new(100000);
let (router, mut system) = batch_system::create_system(2, 2, control_tx, control_fsm);
system.spawn("test".to_owned(), Builder::new());
let (normal_tx, normal_fsm) = Runner::new(100000);
let normal_box = BasicMailbox::new(normal_tx, normal_fsm);
router.register(1, normal_box);

c.bench_function("router::send", |b| {
b.iter(|| {
router.send(1, Message::Loop(0)).unwrap();
})
});
system.shutdown();
}

criterion_group!(benches, bench_send);
criterion_main!(benches);

0 comments on commit 9e74683

Please sign in to comment.