Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into engine-traits-table…
Browse files Browse the repository at this point in the history
…-props-2
  • Loading branch information
brson committed Feb 21, 2020
2 parents 37d87b2 + 58ed7e1 commit 65ded0f
Show file tree
Hide file tree
Showing 61 changed files with 1,648 additions and 485 deletions.
26 changes: 20 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug
log_wrappers = { path = "components/log_wrappers" }
mime = "0.3.13"
more-asserts = "0.1"
murmur3 = "0.5.1"
nix = "0.11"
nom = "5.0.1"
pd_client = { path = "components/pd_client" }
Expand Down Expand Up @@ -129,9 +130,7 @@ toml = "0.4"
url = "2"
uuid = { version = "0.7", features = [ "serde", "v4" ] }
vlog = "0.1.4"

[dependencies.murmur3]
git = "https://github.com/pingcap/murmur3.git"
into_other = { path = "components/into_other" }

[dependencies.prometheus]
git = "https://github.com/tikv/rust-prometheus.git"
Expand Down Expand Up @@ -197,6 +196,7 @@ members = [
"components/batch-system",
"components/cdc",
"components/raftstore",
"components/into_other",
]
default-members = ["cmd"]

Expand Down
32 changes: 23 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -223,33 +223,35 @@ test:
# they require special compile-time and run-time setup
# Forturately rebuilding with the mem-profiling feature will only
# rebuild starting at jemalloc-sys.
# TODO: remove cd commands after https://github.com/rust-lang/cargo/issues/5364 is resolved.
export DYLD_LIBRARY_PATH="${DYLD_LIBRARY_PATH}:${LOCAL_DIR}/lib" && \
export LOG_LEVEL=DEBUG && \
export RUST_BACKTRACE=1 && \
cargo test --no-default-features --features "${ENABLE_FEATURES}" --all --exclude tests --exclude cdc ${EXTRA_CARGO_ARGS} -- --nocapture && \
cargo test --no-default-features --features "${ENABLE_FEATURES}" --all --exclude tests --exclude \
cdc --exclude fuzz-targets --exclude fuzzer-honggfuzz --exclude fuzzer-afl --exclude fuzzer-libfuzzer \
${EXTRA_CARGO_ARGS} -- --nocapture && \
cd tests && cargo test --features "${ENABLE_FEATURES}" ${EXTRA_CARGO_ARGS} -- --nocapture && cd .. && \
cargo test --no-default-features --features "${ENABLE_FEATURES}" -p tests --bench misc ${EXTRA_CARGO_ARGS} -- --nocapture && \
cd components/cdc && cargo test --no-default-features --features "${ENABLE_FEATURES}" -p cdc ${EXTRA_CARGO_ARGS} -- --nocapture && cd ../.. && \
cd components/cdc && cargo test --no-default-features --features "${ENABLE_FEATURES}" ${EXTRA_CARGO_ARGS} -- --nocapture && cd ../.. && \
if [[ "`uname`" == "Linux" ]]; then \
export MALLOC_CONF=prof:true,prof_active:false && \
cargo test --no-default-features --features "${ENABLE_FEATURES},mem-profiling" ${EXTRA_CARGO_ARGS} --bin tikv-server -- --nocapture --ignored; \
fi
bash scripts/check-bins-for-jemalloc.sh
bash scripts/check-udeps.sh
# TODO: remove the section after https://github.com/rust-lang/cargo/issues/5364 is resolved.
export DYLD_LIBRARY_PATH="${DYLD_LIBRARY_PATH}:${LOCAL_DIR}/lib" && \
export LOG_LEVEL=DEBUG && \
export RUST_BACKTRACE=1 && \
cd tests && cargo test --features "${ENABLE_FEATURES}" ${EXTRA_CARGO_ARGS} -- --nocapture && cd .. && \
cd components/cdc && cargo test --no-default-features --features "${ENABLE_FEATURES}" ${EXTRA_CARGO_ARGS} -- --nocapture

# This is used for CI test
ci_test: ci_doc_test
cargo test --no-default-features --features "${ENABLE_FEATURES}" --all --exclude tests --exclude cdc --exclude cdc --all-targets --no-run --message-format=json
cargo test --no-default-features --features "${ENABLE_FEATURES}" --all --exclude tests --exclude cdc \
--exclude fuzz-targets --exclude fuzzer-honggfuzz --exclude fuzzer-afl --exclude fuzzer-libfuzzer \
--all-targets --no-run --message-format=json
cd tests && cargo test --no-default-features --features "${ENABLE_FEATURES}" --no-run --message-format=json
cd components/cdc && cargo test --no-default-features --features "${ENABLE_FEATURES}" --no-run --message-format=json

ci_doc_test:
cargo test --no-default-features --features "${ENABLE_FEATURES}" --all --exclude tests --doc
cargo test --no-default-features --features "${ENABLE_FEATURES}" --all --exclude tests --exclude cdc \
--exclude fuzz-targets --exclude fuzzer-honggfuzz --exclude fuzzer-afl --exclude fuzzer-libfuzzer --doc

## Static analysis
## ---------------
Expand Down Expand Up @@ -280,6 +282,7 @@ ALLOWED_CLIPPY_LINTS=-A clippy::module_inception -A clippy::needless_pass_by_val
# PROST feature works differently in test cdc and backup package, they need to be checked under their folders.
clippy: pre-clippy
@cargo clippy --all --exclude cdc --exclude backup \
--exclude fuzz-targets --exclude fuzzer-honggfuzz --exclude fuzzer-afl --exclude fuzzer-libfuzzer \
--all-targets --no-default-features \
--features "${ENABLE_FEATURES}" -- $(ALLOWED_CLIPPY_LINTS)
@for pkg in "cdc" "backup"; do \
Expand All @@ -288,6 +291,11 @@ clippy: pre-clippy
--features "${ENABLE_FEATURES}" -- $(ALLOWED_CLIPPY_LINTS) && \
cd ../.. ;\
done
@for pkg in "fuzz" "fuzz/fuzzer-afl" "fuzz/fuzzer-honggfuzz" "fuzz/fuzzer-libfuzzer"; do \
cd $$pkg && \
cargo clippy --all-targets -- $(ALLOWED_CLIPPY_LINTS) && \
cd - >/dev/null; \
done

# TODO fix tests warnings
# @cd tests && \
Expand All @@ -306,6 +314,12 @@ pre-audit:
audit: pre-audit
cargo audit

FUZZER ?= Honggfuzz

.PHONY: fuzz
fuzz:
@cargo run --package fuzz --no-default-features --features "${ENABLE_FEATURES}" -- run ${FUZZER} ${FUZZ_TARGET} \
|| echo "" && echo "Set the target for fuzzing using FUZZ_TARGET and the fuzzer using FUZZER (default is Honggfuzz)"

## Special targets
## ---------------
Expand Down
24 changes: 12 additions & 12 deletions cmd/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,21 +418,21 @@ impl TiKVServer {
None
};

let storage_read_pool = if self.config.readpool.unify_read_pool {
ReadPool::from(unified_read_pool.as_ref().unwrap().remote().clone())
let storage_read_pool_handle = if self.config.readpool.unify_read_pool {
unified_read_pool.as_ref().unwrap().handle()
} else {
let storage_read_pools = storage::build_read_pool(
let storage_read_pools = ReadPool::from(storage::build_read_pool(
&self.config.readpool.storage,
pd_sender.clone(),
engines.engine.clone(),
);
ReadPool::from(storage_read_pools)
));
storage_read_pools.handle()
};

let storage = create_raft_storage(
engines.engine.clone(),
&self.config.storage,
storage_read_pool,
storage_read_pool_handle,
lock_mgr.clone(),
)
.unwrap_or_else(|e| fatal!("failed to create raft storage: {}", e));
Expand All @@ -454,15 +454,15 @@ impl TiKVServer {
.build(snap_path, Some(self.router.clone()));

// Create coprocessor endpoint.
let cop_read_pool = if self.config.readpool.unify_read_pool {
ReadPool::from(unified_read_pool.as_ref().unwrap().remote().clone())
let cop_read_pool_handle = if self.config.readpool.unify_read_pool {
unified_read_pool.as_ref().unwrap().handle()
} else {
let cop_read_pools = coprocessor::readpool_impl::build_read_pool(
let cop_read_pools = ReadPool::from(coprocessor::readpool_impl::build_read_pool(
&self.config.readpool.coprocessor,
pd_sender.clone(),
engines.engine.clone(),
);
ReadPool::from(cop_read_pools)
));
cop_read_pools.handle()
};

let server_config = Arc::new(self.config.server.clone());
Expand All @@ -472,7 +472,7 @@ impl TiKVServer {
&server_config,
&self.security_mgr,
storage,
coprocessor::Endpoint::new(&server_config, cop_read_pool),
coprocessor::Endpoint::new(&server_config, cop_read_pool_handle),
engines.raft_router.clone(),
self.resolver.clone(),
snap_mgr.clone(),
Expand Down
2 changes: 1 addition & 1 deletion components/backup/tests/integrations/test_backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl TestSuite {
let mut response = self.tikv_cli.raw_put(&request).unwrap();
retry_req!(
self.tikv_cli.raw_put(&request).unwrap(),
!response.has_region_error() && !response.error.is_empty(),
!response.has_region_error() && response.error.is_empty(),
response,
10, // retry 10 times
1000 // 1s timeout
Expand Down
11 changes: 10 additions & 1 deletion components/batch-system/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,18 @@ where
let name_prefix = self.name_prefix.take().unwrap();
info!("shutdown batch system {}", name_prefix);
self.router.broadcast_shutdown();
let mut last_error = None;
for h in self.workers.drain(..) {
debug!("waiting for {}", h.thread().name().unwrap());
h.join().unwrap();
if let Err(e) = h.join() {
error!("failed to join worker thread: {:?}", e);
last_error = Some(e);
}
}
if let Some(e) = last_error {
if !thread::panicking() {
panic!("failed to join worker thread: {:?}", e);
}
}
info!("batch system {} is stopped.", name_prefix);
}
Expand Down
15 changes: 14 additions & 1 deletion components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use futures::future::{lazy, Future};
use kvproto::cdcpb::*;
use kvproto::metapb::Region;
use pd_client::PdClient;
use raftstore::store::fsm::{ApplyRouter, ApplyTask};
use raftstore::coprocessor::{Cmd, CmdBatch, CmdObserver, ObserverContext, RoleObserver};
use raftstore::store::fsm::{ApplyRouter, ApplyTask, ChangeCmd};
use raftstore::store::msg::{Callback, ReadResponse};
use resolved_ts::Resolver;
use tikv::storage::kv::Snapshot;
Expand All @@ -35,6 +36,9 @@ pub enum Task {
downstream_id: Option<DownstreamID>,
err: Option<Error>,
},
MultiBatch {
multi: Vec<CmdBatch>,
},
MinTS {
min_ts: TimeStamp,
},
Expand Down Expand Up @@ -79,6 +83,7 @@ impl fmt::Debug for Task {
.field("err", err)
.field("downstream_id", downstream_id)
.finish(),
Task::MultiBatch { multi } => de.field("multibatch", &multi.len()).finish(),
Task::MinTS { ref min_ts } => de.field("min_ts", min_ts).finish(),
Task::ResolverReady { ref region_id, .. } => de.field("region_id", region_id).finish(),
Task::IncrementalScan {
Expand All @@ -99,6 +104,7 @@ pub struct Endpoint {
capture_regions: HashMap<u64, Delegate>,
scheduler: Scheduler<Task>,
apply_router: ApplyRouter,
observer: CdcObserver,

pd_client: Arc<dyn PdClient>,
timer: SteadyTimer,
Expand All @@ -113,6 +119,7 @@ impl Endpoint {
pd_client: Arc<dyn PdClient>,
scheduler: Scheduler<Task>,
apply_router: ApplyRouter,
observer: CdcObserver,
) -> Endpoint {
let workers = Builder::new().name_prefix("cdcwkr").pool_size(4).build();
Endpoint {
Expand All @@ -122,6 +129,7 @@ impl Endpoint {
timer: SteadyTimer::default(),
workers,
apply_router,
observer,
scan_batch_size: 1024,
min_ts_interval: Duration::from_secs(10),
}
Expand All @@ -141,6 +149,10 @@ impl Endpoint {
unimplemented!()
}

pub fn on_multi_batch(&mut self, _multi: Vec<CmdBatch>) {
unimplemented!()
}

pub fn on_incremental_scan(
&mut self,
_region_id: u64,
Expand Down Expand Up @@ -339,6 +351,7 @@ impl Runnable<Task> for Endpoint {
} => {
self.on_incremental_scan(region_id, downstream_id, entries);
}
Task::MultiBatch { multi } => self.on_multi_batch(multi),
Task::Validate(region_id, validate) => {
validate(self.capture_regions.get(&region_id));
}
Expand Down
Loading

0 comments on commit 65ded0f

Please sign in to comment.