Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/memory leaks #846

Merged
merged 24 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ace10ab
break memory leaks due to cyclic references
DenisBiryukov91 Mar 15, 2024
301ab46
terminate tasks to prevent corresponding memory leaks
DenisBiryukov91 Mar 15, 2024
8b81210
fix infinite recursion if session.close() is called explicitly
DenisBiryukov91 Mar 15, 2024
2e7c911
add function to force-drop ZRUNTIME_POOL static variable
DenisBiryukov91 Mar 18, 2024
adb7da7
make TaskController copyable
DenisBiryukov91 Mar 18, 2024
44dfef4
make TaskController use tokio_util TaskTracker and CancellationTokens
DenisBiryukov91 Mar 19, 2024
e12eedf
terminate more tasks
DenisBiryukov91 Mar 19, 2024
15b1c92
clippy, fmt
DenisBiryukov91 Mar 19, 2024
757cd48
- add ci bash script to run valgrind test;
DenisBiryukov91 Mar 21, 2024
a1beb28
-fix fmt
DenisBiryukov91 Mar 21, 2024
f45df41
-fix fmt; update valgrind-check script
DenisBiryukov91 Mar 21, 2024
04aeeed
clippy
DenisBiryukov91 Mar 21, 2024
e7134e5
drop runtimes instead of shutdown_background
DenisBiryukov91 Mar 21, 2024
5c672cd
include memory-leaks test in ci
DenisBiryukov91 Mar 21, 2024
ca05313
add sudo before valgrind install in ci
DenisBiryukov91 Mar 21, 2024
a94e86f
replace ZRuntime drop with shutdown_timeout to prevent infinite wait …
DenisBiryukov91 Mar 21, 2024
918b1f8
docs update
DenisBiryukov91 Mar 22, 2024
1b4622f
- rename spawn -> spawn_abortable, spawn_cancellable -> spawn, for mo…
DenisBiryukov91 Mar 25, 2024
9d293a7
merge
DenisBiryukov91 Mar 25, 2024
13ae47b
Simplify the drop of ZRUNTIME_POOL
YuanYuYuan Mar 26, 2024
93eeacf
Update the doc
YuanYuYuan Mar 26, 2024
9841bea
- use ZRuntime executor instead of futures::executor
DenisBiryukov91 Mar 26, 2024
f593283
Merge pull request #3 from YuanYuYuan/feat/memory_leaks/drop-zruntime…
DenisBiryukov91 Mar 26, 2024
8520a24
terminate scouting task
DenisBiryukov91 Mar 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ jobs:
- name: Run doctests
run: cargo test --doc

- name: Install valgrind
if: ${{ matrix.os == 'ubuntu-latest' }}
run: sudo apt-get install -y valgrind
shell: bash

- name: Run memory leaks check
if: ${{ matrix.os == 'ubuntu-latest' }}
run: ci/valgrind-check/run.sh
shell: bash

# NOTE: In GitHub repository settings, the "Require status checks to pass
# before merging" branch protection rule ensures that commits are only merged
# from branches where specific status checks have passed. These checks are
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@
.vscode

cargo-timing*.html

ci/valgrind-check/*.log
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ members = [
"commons/zenoh-result",
"commons/zenoh-shm",
"commons/zenoh-sync",
"commons/zenoh-task",
"commons/zenoh-util",
"commons/zenoh-runtime",
"examples",
Expand All @@ -51,7 +52,7 @@ members = [
"zenoh-ext",
"zenohd",
]
exclude = ["ci/nostd-check"]
exclude = ["ci/nostd-check", "ci/valgrind-check"]

[workspace.package]
rust-version = "1.66.1"
Expand Down Expand Up @@ -197,6 +198,7 @@ zenoh-link = { version = "0.11.0-dev", path = "io/zenoh-link" }
zenoh-link-commons = { version = "0.11.0-dev", path = "io/zenoh-link-commons" }
zenoh = { version = "0.11.0-dev", path = "zenoh", default-features = false }
zenoh-runtime = { version = "0.11.0-dev", path = "commons/zenoh-runtime" }
zenoh-task = { version = "0.11.0-dev", path = "commons/zenoh-task" }

[profile.dev]
debug = true
Expand All @@ -215,4 +217,4 @@ debug = false # If you want debug symbol in release mode, set the env variab
lto = "fat"
codegen-units = 1
opt-level = 3
panic = "abort"
panic = "abort"
37 changes: 37 additions & 0 deletions ci/valgrind-check/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#
# Copyright (c) 2024 ZettaScale Technology
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# ZettaScale Zenoh Team, <zenoh@zettascale.tech>
#
[package]
name = "valgrind-check"
version = "0.1.0"
repository = "https://github.com/eclipse-zenoh/zenoh"
homepage = "http://zenoh.io"
license = "EPL-2.0 OR Apache-2.0"
edition = "2021"
categories = ["network-programming"]
description = "Internal crate for zenoh."

[dependencies]
tokio = { version = "1.35.1", features = ["rt-multi-thread", "time", "io-std"] }
env_logger = "0.11.0"
futures = "0.3.25"
zenoh = { path = "../../zenoh/" }
zenoh-runtime = { path = "../../commons/zenoh-runtime/" }

[[bin]]
name = "pub_sub"
path = "src/pub_sub/bin/z_pub_sub.rs"

[[bin]]
name = "queryable_get"
path = "src/queryable_get/bin/z_queryable_get.rs"
21 changes: 21 additions & 0 deletions ci/valgrind-check/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env bash
set -e
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )

function check_leaks {
echo "Checking $1 for memory leaks"
valgrind --leak-check=full --num-callers=50 --log-file="$SCRIPT_DIR/$1_leaks.log" $SCRIPT_DIR/target/debug/$1
num_leaks=$(grep 'ERROR SUMMARY: [0-9]+' -Eo "$SCRIPT_DIR/$1_leaks.log" | grep '[0-9]+' -Eo)
echo "Detected $num_leaks memory leaks"
if (( num_leaks == 0 ))
then
return 0
else
cat $SCRIPT_DIR/$1_leaks.log
return -1
fi
}

cargo build --manifest-path=$SCRIPT_DIR/Cargo.toml
check_leaks "queryable_get"
check_leaks "pub_sub"
58 changes: 58 additions & 0 deletions ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;

#[tokio::main]
async fn main() {
let _z = zenoh_runtime::ZRuntimePoolGuard;
env_logger::init();

let pub_key_expr = KeyExpr::try_from("test/valgrind/data").unwrap();
let sub_key_expr = KeyExpr::try_from("test/valgrind/**").unwrap();

println!("Declaring Publisher on '{pub_key_expr}'...");
let pub_session = zenoh::open(Config::default()).res().await.unwrap();
let publisher = pub_session
.declare_publisher(&pub_key_expr)
.res()
.await
.unwrap();

println!("Declaring Subscriber on '{sub_key_expr}'...");
let sub_session = zenoh::open(Config::default()).res().await.unwrap();
let _subscriber = sub_session
.declare_subscriber(&sub_key_expr)
.callback(|sample| {
println!(
">> [Subscriber] Received {} ('{}': '{}')",
sample.kind,
sample.key_expr.as_str(),
sample.value
);
})
.res()
.await
.unwrap();

for idx in 0..5 {
tokio::time::sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] data");
println!("Putting Data ('{}': '{}')...", &pub_key_expr, buf);
publisher.put(buf).res().await.unwrap();
}

tokio::time::sleep(Duration::from_secs(1)).await;
}
71 changes: 71 additions & 0 deletions ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::convert::TryFrom;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;

#[tokio::main]
async fn main() {
let _z = zenoh_runtime::ZRuntimePoolGuard;
env_logger::init();

let queryable_key_expr = KeyExpr::try_from("test/valgrind/data").unwrap();
let get_selector = Selector::try_from("test/valgrind/**").unwrap();

println!("Declaring Queryable on '{queryable_key_expr}'...");
let queryable_session = zenoh::open(Config::default()).res().await.unwrap();
let _queryable = queryable_session
.declare_queryable(&queryable_key_expr.clone())
.callback(move |query| {
println!(">> Handling query '{}'", query.selector());
let reply = Ok(Sample::new(
queryable_key_expr.clone(),
query.value().unwrap().clone(),
));
zenoh_runtime::ZRuntime::Application.block_in_place(
async move { query.reply(reply).res().await.unwrap(); }
);
})
.complete(true)
.res()
.await
.unwrap();

println!("Declaring Get session for '{get_selector}'...");
let get_session = zenoh::open(Config::default()).res().await.unwrap();

for idx in 0..5 {
tokio::time::sleep(Duration::from_secs(1)).await;
println!("Sending Query '{get_selector}'...");
let replies = get_session
.get(&get_selector)
.with_value(idx)
.target(QueryTarget::All)
.res()
.await
.unwrap();
while let Ok(reply) = replies.recv_async().await {
match reply.sample {
Ok(sample) => println!(
">> Received ('{}': '{}')",
sample.key_expr.as_str(),
sample.value,
),
Err(err) => println!(">> Received (ERROR: '{}')", String::try_from(&err).unwrap()),
}
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
1 change: 1 addition & 0 deletions commons/zenoh-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ description = { workspace = true }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures = { workspace = true }
lazy_static = { workspace = true }
zenoh-result = { workspace = true, features = ["std"] }
zenoh-collections = { workspace = true, features = ["std"] }
Expand Down
36 changes: 36 additions & 0 deletions commons/zenoh-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{
atomic::{AtomicUsize, Ordering},
OnceLock,
},
time::Duration,
};
use tokio::runtime::{Handle, Runtime, RuntimeFlavor};
use zenoh_collections::Properties;
Expand Down Expand Up @@ -147,6 +148,41 @@ impl ZRuntimePool {
}
}

// If there are any blocking tasks spawned by ZRuntimes, the function will block until they return.
impl Drop for ZRuntimePool {
fn drop(&mut self) {
let handles: Vec<_> = self
.0
.drain()
.map(|(name, mut rt)| {
std::thread::spawn(move || {
rt.take()
.unwrap_or_else(|| panic!("ZRuntime {name:?} failed to shutdown."))
.shutdown_timeout(Duration::from_secs(1))
})
})
.collect();

for hd in handles {
let _ = hd.join();
}
}
}

/// In order to prevent valgrind reporting memory leaks,
/// we use this guard to force drop ZRUNTIME_POOL since Rust does not drop static variables.
#[doc(hidden)]
pub struct ZRuntimePoolGuard;

impl Drop for ZRuntimePoolGuard {
fn drop(&mut self) {
unsafe {
let ptr = &(*ZRUNTIME_POOL) as *const ZRuntimePool;
std::mem::drop(ptr.read());
}
}
}

#[derive(Debug, Copy, Clone)]
pub struct ZRuntimeConfig {
pub application_threads: usize,
Expand Down
33 changes: 33 additions & 0 deletions commons/zenoh-task/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#
# Copyright (c) 2024 ZettaScale Technology
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# ZettaScale Zenoh Team, <zenoh@zettascale.tech>
#
[package]
rust-version = { workspace = true }
name = "zenoh-task"
version = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
authors = {workspace = true }
edition = { workspace = true }
license = { workspace = true }
categories = { workspace = true }
description = "Internal crate for zenoh."
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { workspace = true, features = ["default", "sync"] }
futures = { workspace = true }
log = { workspace = true }
zenoh-core = { workspace = true }
zenoh-runtime = { workspace = true }
tokio-util = { workspace = true, features = ["rt"] }
Loading
Loading