Skip to content

Commit

Permalink
Merge pull request #846 from DenisBiryukov91/fix/memory_leaks
Browse files Browse the repository at this point in the history
Fix/memory leaks
  • Loading branch information
milyin committed Mar 29, 2024
2 parents e04c861 + 8520a24 commit 888d340
Show file tree
Hide file tree
Showing 34 changed files with 996 additions and 338 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ jobs:
- name: Run doctests
run: cargo test --doc

- name: Install valgrind
if: ${{ matrix.os == 'ubuntu-latest' }}
run: sudo apt-get install -y valgrind
shell: bash

- name: Run memory leaks check
if: ${{ matrix.os == 'ubuntu-latest' }}
run: ci/valgrind-check/run.sh
shell: bash

# NOTE: In GitHub repository settings, the "Require status checks to pass
# before merging" branch protection rule ensures that commits are only merged
# from branches where specific status checks have passed. These checks are
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@
.vscode

cargo-timing*.html

ci/valgrind-check/*.log
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ members = [
"commons/zenoh-result",
"commons/zenoh-shm",
"commons/zenoh-sync",
"commons/zenoh-task",
"commons/zenoh-util",
"commons/zenoh-runtime",
"examples",
Expand All @@ -51,7 +52,7 @@ members = [
"zenoh-ext",
"zenohd",
]
exclude = ["ci/nostd-check"]
exclude = ["ci/nostd-check", "ci/valgrind-check"]

[workspace.package]
rust-version = "1.66.1"
Expand Down Expand Up @@ -197,6 +198,7 @@ zenoh-link = { version = "0.11.0-dev", path = "io/zenoh-link" }
zenoh-link-commons = { version = "0.11.0-dev", path = "io/zenoh-link-commons" }
zenoh = { version = "0.11.0-dev", path = "zenoh", default-features = false }
zenoh-runtime = { version = "0.11.0-dev", path = "commons/zenoh-runtime" }
zenoh-task = { version = "0.11.0-dev", path = "commons/zenoh-task" }

[profile.dev]
debug = true
Expand All @@ -215,4 +217,4 @@ debug = false # If you want debug symbol in release mode, set the env variab
lto = "fat"
codegen-units = 1
opt-level = 3
panic = "abort"
panic = "abort"
37 changes: 37 additions & 0 deletions ci/valgrind-check/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#
# Copyright (c) 2024 ZettaScale Technology
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# ZettaScale Zenoh Team, <zenoh@zettascale.tech>
#
[package]
name = "valgrind-check"
version = "0.1.0"
repository = "https://github.com/eclipse-zenoh/zenoh"
homepage = "http://zenoh.io"
license = "EPL-2.0 OR Apache-2.0"
edition = "2021"
categories = ["network-programming"]
description = "Internal crate for zenoh."

[dependencies]
tokio = { version = "1.35.1", features = ["rt-multi-thread", "time", "io-std"] }
env_logger = "0.11.0"
futures = "0.3.25"
zenoh = { path = "../../zenoh/" }
zenoh-runtime = { path = "../../commons/zenoh-runtime/" }

[[bin]]
name = "pub_sub"
path = "src/pub_sub/bin/z_pub_sub.rs"

[[bin]]
name = "queryable_get"
path = "src/queryable_get/bin/z_queryable_get.rs"
21 changes: 21 additions & 0 deletions ci/valgrind-check/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env bash
set -e
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )

function check_leaks {
echo "Checking $1 for memory leaks"
valgrind --leak-check=full --num-callers=50 --log-file="$SCRIPT_DIR/$1_leaks.log" $SCRIPT_DIR/target/debug/$1
num_leaks=$(grep 'ERROR SUMMARY: [0-9]+' -Eo "$SCRIPT_DIR/$1_leaks.log" | grep '[0-9]+' -Eo)
echo "Detected $num_leaks memory leaks"
if (( num_leaks == 0 ))
then
return 0
else
cat $SCRIPT_DIR/$1_leaks.log
return -1
fi
}

cargo build --manifest-path=$SCRIPT_DIR/Cargo.toml
check_leaks "queryable_get"
check_leaks "pub_sub"
58 changes: 58 additions & 0 deletions ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;

#[tokio::main]
async fn main() {
let _z = zenoh_runtime::ZRuntimePoolGuard;
env_logger::init();

let pub_key_expr = KeyExpr::try_from("test/valgrind/data").unwrap();
let sub_key_expr = KeyExpr::try_from("test/valgrind/**").unwrap();

println!("Declaring Publisher on '{pub_key_expr}'...");
let pub_session = zenoh::open(Config::default()).res().await.unwrap();
let publisher = pub_session
.declare_publisher(&pub_key_expr)
.res()
.await
.unwrap();

println!("Declaring Subscriber on '{sub_key_expr}'...");
let sub_session = zenoh::open(Config::default()).res().await.unwrap();
let _subscriber = sub_session
.declare_subscriber(&sub_key_expr)
.callback(|sample| {
println!(
">> [Subscriber] Received {} ('{}': '{}')",
sample.kind,
sample.key_expr.as_str(),
sample.value
);
})
.res()
.await
.unwrap();

for idx in 0..5 {
tokio::time::sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] data");
println!("Putting Data ('{}': '{}')...", &pub_key_expr, buf);
publisher.put(buf).res().await.unwrap();
}

tokio::time::sleep(Duration::from_secs(1)).await;
}
71 changes: 71 additions & 0 deletions ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::convert::TryFrom;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;

#[tokio::main]
async fn main() {
let _z = zenoh_runtime::ZRuntimePoolGuard;
env_logger::init();

let queryable_key_expr = KeyExpr::try_from("test/valgrind/data").unwrap();
let get_selector = Selector::try_from("test/valgrind/**").unwrap();

println!("Declaring Queryable on '{queryable_key_expr}'...");
let queryable_session = zenoh::open(Config::default()).res().await.unwrap();
let _queryable = queryable_session
.declare_queryable(&queryable_key_expr.clone())
.callback(move |query| {
println!(">> Handling query '{}'", query.selector());
let reply = Ok(Sample::new(
queryable_key_expr.clone(),
query.value().unwrap().clone(),
));
zenoh_runtime::ZRuntime::Application.block_in_place(
async move { query.reply(reply).res().await.unwrap(); }
);
})
.complete(true)
.res()
.await
.unwrap();

println!("Declaring Get session for '{get_selector}'...");
let get_session = zenoh::open(Config::default()).res().await.unwrap();

for idx in 0..5 {
tokio::time::sleep(Duration::from_secs(1)).await;
println!("Sending Query '{get_selector}'...");
let replies = get_session
.get(&get_selector)
.with_value(idx)
.target(QueryTarget::All)
.res()
.await
.unwrap();
while let Ok(reply) = replies.recv_async().await {
match reply.sample {
Ok(sample) => println!(
">> Received ('{}': '{}')",
sample.key_expr.as_str(),
sample.value,
),
Err(err) => println!(">> Received (ERROR: '{}')", String::try_from(&err).unwrap()),
}
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
1 change: 1 addition & 0 deletions commons/zenoh-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ description = { workspace = true }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures = { workspace = true }
lazy_static = { workspace = true }
zenoh-result = { workspace = true, features = ["std"] }
zenoh-collections = { workspace = true, features = ["std"] }
Expand Down
36 changes: 36 additions & 0 deletions commons/zenoh-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{
atomic::{AtomicUsize, Ordering},
OnceLock,
},
time::Duration,
};
use tokio::runtime::{Handle, Runtime, RuntimeFlavor};
use zenoh_collections::Properties;
Expand Down Expand Up @@ -147,6 +148,41 @@ impl ZRuntimePool {
}
}

// If there are any blocking tasks spawned by ZRuntimes, the function will block until they return.
impl Drop for ZRuntimePool {
fn drop(&mut self) {
let handles: Vec<_> = self
.0
.drain()
.map(|(name, mut rt)| {
std::thread::spawn(move || {
rt.take()
.unwrap_or_else(|| panic!("ZRuntime {name:?} failed to shutdown."))
.shutdown_timeout(Duration::from_secs(1))
})
})
.collect();

for hd in handles {
let _ = hd.join();
}
}
}

/// In order to prevent valgrind reporting memory leaks,
/// we use this guard to force drop ZRUNTIME_POOL since Rust does not drop static variables.
#[doc(hidden)]
pub struct ZRuntimePoolGuard;

impl Drop for ZRuntimePoolGuard {
fn drop(&mut self) {
unsafe {
let ptr = &(*ZRUNTIME_POOL) as *const ZRuntimePool;
std::mem::drop(ptr.read());
}
}
}

#[derive(Debug, Copy, Clone)]
pub struct ZRuntimeConfig {
pub application_threads: usize,
Expand Down
33 changes: 33 additions & 0 deletions commons/zenoh-task/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#
# Copyright (c) 2024 ZettaScale Technology
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# ZettaScale Zenoh Team, <zenoh@zettascale.tech>
#
[package]
rust-version = { workspace = true }
name = "zenoh-task"
version = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
authors = {workspace = true }
edition = { workspace = true }
license = { workspace = true }
categories = { workspace = true }
description = "Internal crate for zenoh."
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { workspace = true, features = ["default", "sync"] }
futures = { workspace = true }
log = { workspace = true }
zenoh-core = { workspace = true }
zenoh-runtime = { workspace = true }
tokio-util = { workspace = true, features = ["rt"] }
Loading

0 comments on commit 888d340

Please sign in to comment.