From aeffcef0c2e0f1f90936780edf348078cca0a18c Mon Sep 17 00:00:00 2001 From: huanchao Date: Mon, 6 May 2024 07:25:40 +0000 Subject: [PATCH] [Agent] Support for the Agent to modify its own container resource limits --- agent/Cargo.lock | 365 +++++++++++++--- agent/Cargo.toml | 1 + agent/src/config/config.rs | 15 +- agent/src/config/handler.rs | 53 ++- agent/src/monitor.rs | 4 +- agent/src/trident.rs | 5 +- agent/src/utils/cgroups/linux.rs | 14 +- agent/src/utils/environment.rs | 408 ++---------------- agent/src/utils/environment/linux.rs | 400 +++++++++++++++++ agent/src/utils/environment/windows.rs | 77 ++++ agent/src/utils/guard.rs | 4 +- .../docker-compose.yaml | 1 + 12 files changed, 868 insertions(+), 479 deletions(-) create mode 100644 agent/src/utils/environment/linux.rs create mode 100644 agent/src/utils/environment/windows.rs diff --git a/agent/Cargo.lock b/agent/Cargo.lock index 0132bd08d40..d2e00ef15a1 100644 --- a/agent/Cargo.lock +++ b/agent/Cargo.lock @@ -8,7 +8,16 @@ version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4fa78e18c64fce05e902adecd7a5eed15a5e0a3439f7b0e169f0252214865e3" dependencies = [ - "gimli", + "gimli 0.27.2", +] + +[[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli 0.28.1", ] [[package]] @@ -164,9 +173,9 @@ dependencies = [ "bitflags 1.3.2", "bytes 1.4.0", "futures-util", - "http", - "http-body", - "hyper", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.26", "itoa", "matchit", "memchr", @@ -190,8 +199,8 @@ dependencies = [ "async-trait", "bytes 1.4.0", "futures-util", - "http", - "http-body", + "http 0.2.9", + "http-body 0.4.5", "mime", "rustversion", "tower-layer", @@ -209,6 +218,21 @@ dependencies = [ "rand", ] +[[package]] +name = "backtrace" +version = "0.3.71" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d" +dependencies = [ + "addr2line 0.21.0", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object 0.32.2", + "rustc-demangle", +] + [[package]] name = "base64" version = "0.12.3" @@ -227,6 +251,12 @@ version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d" +[[package]] +name = "base64" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51" + [[package]] name = "bincode" version = "1.3.3" @@ -288,6 +318,50 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bollard" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0aed08d3adb6ebe0eff737115056652670ae290f177759aac19c30456135f94c" +dependencies = [ + "base64 0.22.0", + "bollard-stubs", + "bytes 1.4.0", + "futures-core", + "futures-util", + "hex", + "http 1.1.0", + "http-body-util", + "hyper 1.3.1", + "hyper-named-pipe", + "hyper-util", + "hyperlocal-next", + "log 0.4.18", + "pin-project-lite", + "serde", + "serde_derive", + "serde_json", + "serde_repr", + "serde_urlencoded", + "thiserror", + "tokio", + "tokio-util", + "tower-service", + "url", + "winapi", +] + +[[package]] +name = "bollard-stubs" +version = "1.44.0-rc.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "709d9aa1c37abb89d40f19f5d0ad6f0d88cb1581264e571c9350fc5bb89cf1c5" +dependencies = [ + "serde", + "serde_repr", + "serde_with", +] + [[package]] name = "bson" version = "2.7.0" @@ -419,11 +493,12 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.79" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7" dependencies = [ "jobserver", + "libc", ] [[package]] @@ -570,7 +645,7 @@ dependencies = [ "cranelift-control", "cranelift-entity", "cranelift-isle", - "gimli", + "gimli 0.27.2", "hashbrown 0.13.2", "log 0.4.18", "regalloc2", @@ -842,6 +917,7 @@ dependencies = [ "base64 0.21.2", "bincode 2.0.0-rc.3", "bitflags 1.3.2", + "bollard", "bson", "bytesize", "cadence", @@ -864,10 +940,10 @@ dependencies = [ "hex", "hostname", "hpack", - "http", + "http 0.2.9", "http2", "humantime-serde", - "hyper", + "hyper 0.14.26", "ipnet", "ipnetwork", "k8s-openapi", @@ -906,7 +982,7 @@ dependencies = [ "serde_json", "serde_yaml", "signal-hook", - "socket2", + "socket2 0.4.9", "special_recv_engine", "sysinfo", "tempfile", @@ -1002,7 +1078,7 @@ checksum = "53ecafc952c4528d9b51a458d1a8904b81783feff9fde08ab6ed2545ff396872" dependencies = [ "cfg-if", "libc", - "socket2", + "socket2 0.4.9", "winapi", ] @@ -1372,6 +1448,12 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "gimli" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" + [[package]] name = "glob" version = "0.3.1" @@ -1397,7 +1479,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.9", "indexmap 2.0.0", "slab", "tokio", @@ -1514,6 +1596,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes 1.4.0", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.5" @@ -1521,7 +1614,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes 1.4.0", - "http", + "http 0.2.9", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes 1.4.0", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" +dependencies = [ + "bytes 1.4.0", + "futures-core", + "http 1.1.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -1574,27 +1690,61 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.9", + "http-body 0.4.5", "httparse", "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.4.9", "tokio", "tower-service", "tracing", "want", ] +[[package]] +name = "hyper" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +dependencies = [ + "bytes 1.4.0", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + +[[package]] +name = "hyper-named-pipe" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278" +dependencies = [ + "hex", + "hyper 1.3.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", + "winapi", +] + [[package]] name = "hyper-rustls" version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" dependencies = [ - "http", - "hyper", + "http 0.2.9", + "hyper 0.14.26", "log 0.4.18", "rustls 0.20.9", "rustls-native-certs", @@ -1609,8 +1759,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97" dependencies = [ "futures-util", - "http", - "hyper", + "http 0.2.9", + "hyper 0.14.26", "rustls 0.21.7", "tokio", "tokio-rustls 0.24.1", @@ -1622,12 +1772,47 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.26", "pin-project-lite", "tokio", "tokio-io-timeout", ] +[[package]] +name = "hyper-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" +dependencies = [ + "bytes 1.4.0", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.3.1", + "pin-project-lite", + "socket2 0.5.6", + "tokio", + "tower", + "tower-service", + "tracing", +] + +[[package]] +name = "hyperlocal-next" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acf569d43fa9848e510358c07b80f4adf34084ddc28c6a4a651ee8474c070dcc" +dependencies = [ + "hex", + "http-body-util", + "hyper 1.3.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "iana-time-zone" version = "0.1.56" @@ -1681,6 +1866,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -1841,7 +2027,7 @@ dependencies = [ "base64 0.13.1", "bytes 1.4.0", "chrono", - "http", + "http 0.2.9", "percent-encoding", "schemars", "serde", @@ -1873,9 +2059,9 @@ dependencies = [ "dirs-next", "either", "futures", - "http", - "http-body", - "hyper", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.26", "hyper-rustls 0.23.2", "hyper-timeout", "jsonpath_lib", @@ -1904,7 +2090,7 @@ source = "git+https://github.com/deepflowio/kube?tag=0.74.2#c423734b258393beb101 dependencies = [ "chrono", "form_urlencoded", - "http", + "http 0.2.9", "json-patch", "k8s-openapi", "once_cell", @@ -1980,9 +2166,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "libc" -version = "0.2.149" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libloading" @@ -2153,9 +2339,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.8" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", "wasi 0.11.0+wasi-snapshot-preview1", @@ -2314,6 +2500,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "object" +version = "0.32.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.17.2" @@ -2334,7 +2529,7 @@ dependencies = [ "base64 0.12.3", "bytes 0.5.6", "chrono", - "http", + "http 0.2.9", "k8s-openapi", "percent-encoding", "serde", @@ -2534,9 +2729,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.9" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" [[package]] name = "pin-utils" @@ -2825,7 +3020,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", - "socket2", + "socket2 0.4.9", "thiserror", "tonic", "tonic-build", @@ -2981,9 +3176,9 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", - "hyper", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.26", "hyper-rustls 0.24.1", "ipnet", "js-sys", @@ -3323,6 +3518,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_repr" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3081f5ffbb02284dda55132aa26daecedd7372a42417bbbab6f14ab7d6bb9145" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.32", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -3335,6 +3541,23 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c85f8e96d1d6857f13768fcbd895fcb06225510022a2774ed8b5150581847b0" +dependencies = [ + "base64 0.22.0", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.0.0", + "serde", + "serde_derive", + "serde_json", + "time 0.3.21", +] + [[package]] name = "serde_yaml" version = "0.8.26" @@ -3403,9 +3626,9 @@ checksum = "826167069c09b99d56f31e9ae5c99049e932a98c9dc2dac47645b08dbbf76ba7" [[package]] name = "smallvec" -version = "1.10.0" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "socket2" @@ -3417,6 +3640,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "special_recv_engine" version = "0.1.0" @@ -3645,11 +3878,11 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.28.2" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ - "autocfg", + "backtrace", "bytes 1.4.0", "libc", "mio", @@ -3657,7 +3890,7 @@ dependencies = [ "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.6", "tokio-macros", "windows-sys 0.48.0", ] @@ -3674,9 +3907,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", @@ -3770,9 +4003,9 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", - "hyper", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.26", "hyper-timeout", "percent-encoding", "pin-project", @@ -3832,8 +4065,8 @@ dependencies = [ "bytes 1.4.0", "futures-core", "futures-util", - "http", - "http-body", + "http 0.2.9", + "http-body 0.4.5", "http-range-header", "pin-project-lite", "tower-layer", @@ -4204,7 +4437,7 @@ dependencies = [ "indexmap 2.0.0", "libc", "log 0.4.18", - "object", + "object 0.31.1", "once_cell", "paste", "psm", @@ -4290,9 +4523,9 @@ dependencies = [ "cranelift-frontend", "cranelift-native", "cranelift-wasm", - "gimli", + "gimli 0.27.2", "log 0.4.18", - "object", + "object 0.31.1", "target-lexicon", "thiserror", "wasmparser 0.110.0", @@ -4311,8 +4544,8 @@ dependencies = [ "cranelift-codegen", "cranelift-control", "cranelift-native", - "gimli", - "object", + "gimli 0.27.2", + "object 0.31.1", "target-lexicon", "wasmtime-environ", ] @@ -4325,10 +4558,10 @@ checksum = "ad336809866b743410ac86ec0bdc34899d6f1af5d3deed97188e90503ff527d7" dependencies = [ "anyhow", "cranelift-entity", - "gimli", + "gimli 0.27.2", "indexmap 2.0.0", "log 0.4.18", - "object", + "object 0.31.1", "serde", "target-lexicon", "thiserror", @@ -4359,15 +4592,15 @@ version = "12.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2004b30ea1ad9fd288bce54af19ef08281250e1087f0b5ffc6ca06bacd821edb" dependencies = [ - "addr2line", + "addr2line 0.20.0", "anyhow", "bincode 1.3.3", "cfg-if", "cpp_demangle", - "gimli", + "gimli 0.27.2", "ittapi", "log 0.4.18", - "object", + "object 0.31.1", "rustc-demangle", "rustix 0.38.21", "serde", @@ -4385,7 +4618,7 @@ version = "12.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54aa8081162b13a96f47ab40f9aa03fc02dad38ee10b1418243ac8517c5af6d3" dependencies = [ - "object", + "object 0.31.1", "once_cell", "rustix 0.38.21", "wasmtime-versioned-export-macros", @@ -4493,8 +4726,8 @@ checksum = "f9d016c3f1d0c8ac905bfda51936cb6dae040e0d8edc75b7a1ef9f21773a19f6" dependencies = [ "anyhow", "cranelift-codegen", - "gimli", - "object", + "gimli 0.27.2", + "object 0.31.1", "target-lexicon", "wasmparser 0.110.0", "wasmtime-cranelift-shared", @@ -4672,7 +4905,7 @@ checksum = "38e6f2f344ec89998f047d0aa3aec77088eb8e33c91f5efdd191b140fda6fa40" dependencies = [ "anyhow", "cranelift-codegen", - "gimli", + "gimli 0.27.2", "regalloc2", "smallvec", "target-lexicon", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 23d5e268dec..c1ebfed50b2 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -14,6 +14,7 @@ arc-swap = "1.5.0" base64 = "0.21" bincode = "2.0.0-rc.1" bitflags = "1.3.2" +bollard = "0.16.1" bson = "2.7.0" bytesize = "1.1.0" cadence = "0.27.0" diff --git a/agent/src/config/config.rs b/agent/src/config/config.rs index 1a59f3aa642..70f5b703442 100644 --- a/agent/src/config/config.rs +++ b/agent/src/config/config.rs @@ -1159,7 +1159,7 @@ pub struct RuntimeConfig { pub vtap_group_id: String, #[serde(skip)] pub enabled: bool, - pub max_cpus: u32, + pub max_millicpus: u32, pub max_memory: u64, pub sync_interval: u64, // unit(second) pub platform_sync_interval: u64, // unit(second) @@ -1307,7 +1307,7 @@ impl RuntimeConfig { Self { vtap_group_id: Default::default(), enabled: true, - max_cpus: 1, + max_millicpus: 1000, max_memory: 768, sync_interval: 60, platform_sync_interval: 10, @@ -1487,7 +1487,16 @@ impl TryFrom for RuntimeConfig { let rc = Self { vtap_group_id: Default::default(), enabled: conf.enabled(), - max_cpus: conf.max_cpus(), + max_millicpus: { + // Compatible with max_cpus and max_millicpus, take the smaller value in milli-cores. + let max_cpus = conf.max_cpus() * 1000; + let max_millicpus = conf.max_millicpus.unwrap_or(max_cpus); // conf.max_millicpus may be None, handle the case where conf.max_millicpus is None + if max_cpus != 0 && max_millicpus != 0 { + max_cpus.min(max_millicpus) + } else { + max_cpus | max_millicpus + } + }, max_memory: (conf.max_memory() as u64) << 20, sync_interval: conf.sync_interval() as u64, platform_sync_interval: conf.platform_sync_interval() as u64, diff --git a/agent/src/config/handler.rs b/agent/src/config/handler.rs index a70ccb58ec8..5f33fe7d07d 100755 --- a/agent/src/config/handler.rs +++ b/agent/src/config/handler.rs @@ -65,14 +65,17 @@ use crate::{ handler::PacketHandlerBuilder, metric::document::TapSide, trident::{AgentComponents, RunningMode}, - utils::environment::{free_memory_check, get_container_mem_limit, running_in_container}, + utils::environment::{free_memory_check, running_in_container}, }; #[cfg(any(target_os = "linux", target_os = "android"))] use crate::{ dispatcher::recv_engine::af_packet::OptTpacketVersion, ebpf::CAP_LEN_MAX, platform::ProcRegRewrite, - utils::environment::{get_ctrl_ip_and_mac, is_tt_workload}, + utils::environment::{ + get_container_resource_limits, get_ctrl_ip_and_mac, is_tt_workload, + set_container_resource_limit, + }, }; #[cfg(target_os = "linux")] use crate::{ @@ -194,7 +197,7 @@ impl fmt::Debug for CollectorConfig { #[derive(Clone, Debug, PartialEq)] pub struct EnvironmentConfig { pub max_memory: u64, - pub max_cpus: u32, + pub max_millicpus: u32, pub process_threshold: u32, pub thread_threshold: u32, pub sys_free_memory_limit: u32, @@ -1202,10 +1205,7 @@ impl TryFrom<(Config, RuntimeConfig)> for ModuleConfig { type Error = ConfigError; fn try_from(conf: (Config, RuntimeConfig)) -> Result { - let (static_config, mut conf) = conf; - if running_in_container() { - conf.max_memory = get_container_mem_limit().unwrap_or(conf.max_memory); - } + let (static_config, conf) = conf; let controller_ip = static_config.controller_ips[0].parse::().unwrap(); let dest_ip = if conf.analyzer_ip.len() > 0 { conf.analyzer_ip.clone() @@ -1231,7 +1231,7 @@ impl TryFrom<(Config, RuntimeConfig)> for ModuleConfig { }, environment: EnvironmentConfig { max_memory: conf.max_memory, - max_cpus: conf.max_cpus, + max_millicpus: conf.max_millicpus, process_threshold: conf.process_threshold, thread_threshold: conf.thread_threshold, sys_free_memory_limit: conf.sys_free_memory_limit, @@ -1574,6 +1574,8 @@ impl TryFrom<(Config, RuntimeConfig)> for ModuleConfig { pub struct ConfigHandler { pub ctrl_ip: IpAddr, pub ctrl_mac: MacAddr, + pub container_cpu_limit: u32, // unit: milli-core + pub container_mem_limit: u64, // unit: bytes pub logger_handle: Option, // need update pub static_config: Config, @@ -1587,10 +1589,17 @@ impl ConfigHandler { ModuleConfig::try_from((config.clone(), RuntimeConfig::default())).unwrap(); let current_config = Arc::new(ArcSwap::from_pointee(candidate_config.clone())); + #[cfg(any(target_os = "linux", target_os = "android"))] + let (container_cpu_limit, container_mem_limit) = get_container_resource_limits(); + #[cfg(target_os = "windows")] + let (container_cpu_limit, container_mem_limit) = (0, 0); + Self { static_config: config, ctrl_ip, ctrl_mac, + container_cpu_limit, + container_mem_limit, candidate_config, current_config, logger_handle: None, @@ -2194,9 +2203,26 @@ impl ConfigHandler { candidate_config.environment.max_memory = new_config.environment.max_memory; } - if candidate_config.environment.max_cpus != new_config.environment.max_cpus { - info!("cpu limit set to {}", new_config.environment.max_cpus); - candidate_config.environment.max_cpus = new_config.environment.max_cpus; + if candidate_config.environment.max_millicpus != new_config.environment.max_millicpus { + info!( + "cpu limit set to {}", + new_config.environment.max_millicpus as f64 / 1000.0 + ); + candidate_config.environment.max_millicpus = new_config.environment.max_millicpus; + } + #[cfg(target_os = "linux")] + if running_in_container() { + if self.container_cpu_limit != candidate_config.environment.max_millicpus + || self.container_mem_limit != candidate_config.environment.max_memory + { + info!("current container cpu limit: {}, memory limit: {}bytes, set cpu limit {} and memory limit {}bytes", self.container_cpu_limit as f64 / 1000.0, self.container_mem_limit, candidate_config.environment.max_millicpus as f64 / 1000.0, candidate_config.environment.max_memory); + if let Err(e) = runtime.block_on(set_container_resource_limit( + candidate_config.environment.max_millicpus, + candidate_config.environment.max_memory, + )) { + warn!("set container resources limit failed: {:?}", e); + }; + } } } else { let mut system = sysinfo::System::new(); @@ -2204,15 +2230,16 @@ impl ConfigHandler { let max_memory = system.total_memory(); system.refresh_cpu(); let max_cpus = 1.max(system.cpus().len()) as u32; + let max_millicpus = max_cpus * 1000; if candidate_config.environment.max_memory != max_memory { info!("memory set ulimit when tap_mode=analyzer"); candidate_config.environment.max_memory = max_memory; } - if candidate_config.environment.max_cpus != max_cpus { + if candidate_config.environment.max_millicpus != max_millicpus { info!("cpu set ulimit when tap_mode=analyzer"); - candidate_config.environment.max_cpus = max_cpus; + candidate_config.environment.max_millicpus = max_millicpus; } } diff --git a/agent/src/monitor.rs b/agent/src/monitor.rs index 263268f99cb..f2c033b64af 100644 --- a/agent/src/monitor.rs +++ b/agent/src/monitor.rs @@ -224,9 +224,9 @@ impl RefCountable for SysStatusBroker { CounterValue::Unsigned(self.config.load().max_memory as u64), )); metrics.push(( - "max_cpus", + "max_millicpus", CounterType::Gauged, - CounterValue::Unsigned(self.config.load().max_cpus as u64), + CounterValue::Unsigned(self.config.load().max_millicpus as u64), )); metrics.push(( "system_free_memory_limit", diff --git a/agent/src/trident.rs b/agent/src/trident.rs index 3640400c70e..639b0b91f92 100644 --- a/agent/src/trident.rs +++ b/agent/src/trident.rs @@ -90,7 +90,7 @@ use crate::{ command::get_hostname, environment::{ check, controller_ip_check, free_memory_check, free_space_checker, get_ctrl_ip_and_mac, - get_env, kernel_check, running_in_container, tap_interface_check, + get_env, kernel_check, running_in_container, running_in_k8s, tap_interface_check, trident_process_check, }, guard::Guard, @@ -428,7 +428,6 @@ impl Trident { "use K8S_NODE_IP_FOR_DEEPFLOW env ip as destination_ip({})", ctrl_ip ); - warn!("When running in a container, the cpu and memory limits notified by deepflow-server will be ignored, please make sure to use K8s or docker for resource limits."); } #[cfg(target_os = "linux")] @@ -498,7 +497,7 @@ impl Trident { if matches!( config_handler.static_config.agent_mode, RunningMode::Managed - ) && running_in_container() + ) && running_in_k8s() && config_handler .static_config .kubernetes_cluster_id diff --git a/agent/src/utils/cgroups/linux.rs b/agent/src/utils/cgroups/linux.rs index 9527e1366b7..2a18054716d 100644 --- a/agent/src/utils/cgroups/linux.rs +++ b/agent/src/utils/cgroups/linux.rs @@ -133,7 +133,7 @@ impl Cgroups { let environment_config = self.config.clone(); let running = self.running.clone(); - let mut last_cpu = 0; + let mut last_millicpus = 0; let mut last_memory = 0; let cgroup = self.cgroup.clone(); let thread = thread::Builder::new() @@ -141,10 +141,10 @@ impl Cgroups { .spawn(move || { loop { let environment = environment_config.load(); - let max_cpus = environment.max_cpus; + let max_millicpus = environment.max_millicpus; let max_memory = environment.max_memory; - if max_cpus != last_cpu || max_memory != last_memory { - if let Err(e) = Self::apply(cgroup.clone(), max_cpus, max_memory) { + if max_millicpus != last_millicpus || max_memory != last_memory { + if let Err(e) = Self::apply(cgroup.clone(), max_millicpus, max_memory) { warn!( "apply cgroups resource failed, {}, deepflow-agent restart...", e @@ -153,7 +153,7 @@ impl Cgroups { break; } } - last_cpu = max_cpus; + last_millicpus = max_millicpus; last_memory = max_memory; let (running, timer) = &*running; @@ -175,9 +175,9 @@ impl Cgroups { } /// 更改资源限制 - pub fn apply(cgroup: Cgroup, max_cpus: u32, max_memory: u64) -> Result<(), Error> { + pub fn apply(cgroup: Cgroup, max_millicpus: u32, max_memory: u64) -> Result<(), Error> { let mut resources = Resources::default(); - let cpu_quota = max_cpus * DEFAULT_CPU_CFS_PERIOD_US; + let cpu_quota = max_millicpus * 100; // The unit of cpu_quota is 100_000 us. Convert max_millicpus to the unit of cpu_quota let cpu_resources = CpuResources { quota: Some(cpu_quota as i64), period: Some(DEFAULT_CPU_CFS_PERIOD_US as u64), diff --git a/agent/src/utils/environment.rs b/agent/src/utils/environment.rs index 1af85bfbba2..bac370f613e 100644 --- a/agent/src/utils/environment.rs +++ b/agent/src/utils/environment.rs @@ -16,71 +16,54 @@ use std::{ env::{self, VarError}, - fs, io, + fs, iter::Iterator, net::{IpAddr, Ipv4Addr, Ipv6Addr}, - path::{Path, PathBuf}, + path::Path, thread, time::Duration, }; -#[cfg(target_os = "windows")] -use std::{ffi::OsString, os::windows::ffi::OsStringExt, ptr}; -#[cfg(any(target_os = "linux", target_os = "android"))] -use std::{io::Read, os::unix::fs::MetadataExt}; use bytesize::ByteSize; -#[cfg(any(target_os = "linux"))] -use k8s_openapi::api::apps::v1::DaemonSet; -#[cfg(any(target_os = "linux"))] -use kube::{api::Api, Client, Config}; -#[cfg(any(target_os = "linux", target_os = "android"))] -use log::info; use log::{error, warn}; -#[cfg(any(target_os = "linux", target_os = "android"))] -use nom::AsBytes; use sysinfo::{DiskExt, System, SystemExt}; -#[cfg(target_os = "windows")] -use winapi::{ - shared::minwindef::{DWORD, MAX_PATH}, - um::libloaderapi::GetModuleFileNameW, + +use crate::{ + common::PROCESS_NAME, + config::K8S_CA_CRT_PATH, + error::{Error, Result}, + exception::ExceptionHandler, + utils::process::get_process_num_by_name, }; -use crate::common::PROCESS_NAME; -use crate::config::K8S_CA_CRT_PATH; -use crate::error::{Error, Result}; -use crate::exception::ExceptionHandler; -use public::proto::{common::TridentType, trident::Exception}; +use public::{ + proto::{common::TridentType, trident::Exception}, + utils::net::{ + addr_list, get_mac_by_ip, get_route_src_ip_and_mac, is_global, link_by_name, link_list, + LinkFlags, MacAddr, + }, +}; -#[cfg(target_os = "windows")] -use super::process::get_memory_rss; -use super::process::get_process_num_by_name; #[cfg(any(target_os = "linux", target_os = "android"))] -use public::utils::net::get_link_enabled_features; -use public::utils::net::{ - addr_list, get_mac_by_ip, get_route_src_ip_and_mac, is_global, link_by_name, link_list, - LinkFlags, MacAddr, -}; +mod linux; +#[cfg(any(target_os = "linux", target_os = "android"))] +pub use linux::*; +#[cfg(target_os = "windows")] +mod windows; +#[cfg(target_os = "windows")] +pub use self::windows::*; pub type Checker = Box Result<()>>; +const IN_CONTAINER: &str = "IN_CONTAINER"; // K8S environment node ip environment variable const K8S_NODE_IP_FOR_DEEPFLOW: &str = "K8S_NODE_IP_FOR_DEEPFLOW"; const ENV_INTERFACE_NAME: &str = "CTRL_NETWORK_INTERFACE"; const K8S_POD_IP_FOR_DEEPFLOW: &str = "K8S_POD_IP_FOR_DEEPFLOW"; -const IN_CONTAINER: &str = "IN_CONTAINER"; -pub const K8S_MEM_LIMIT_FOR_DEEPFLOW: &str = "K8S_MEM_LIMIT_FOR_DEEPFLOW"; -pub const K8S_NODE_NAME_FOR_DEEPFLOW: &str = "K8S_NODE_NAME_FOR_DEEPFLOW"; +const K8S_NODE_NAME_FOR_DEEPFLOW: &str = "K8S_NODE_NAME_FOR_DEEPFLOW"; const ONLY_WATCH_K8S_RESOURCE: &str = "ONLY_WATCH_K8S_RESOURCE"; const K8S_NAMESPACE_FOR_DEEPFLOW: &str = "K8S_NAMESPACE_FOR_DEEPFLOW"; -const BYTES_PER_MEGABYTE: u64 = 1024 * 1024; -const MIN_MEMORY_LIMIT_MEGABYTE: u64 = 128; // uint: Megabyte -const MAX_MEMORY_LIMIT_MEGABYTE: u64 = 100000; // uint: Megabyte - -#[cfg(any(target_os = "linux", target_os = "android"))] -const CORE_FILE_CONFIG: &str = "/proc/sys/kernel/core_pattern"; -#[cfg(any(target_os = "linux", target_os = "android"))] -const CORE_FILE_LIMIT: usize = 3; const DNS_HOST_IPV4: IpAddr = IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)); const DNS_HOST_IPV6: IpAddr = IpAddr::V6(Ipv6Addr::new(0x240c, 0, 0, 0, 0, 0, 0, 0x6666)); @@ -99,91 +82,6 @@ pub fn check(f: Checker) { } } -pub fn kernel_check() { - if cfg!(target_os = "windows") { - return; - } - - #[cfg(any(target_os = "linux", target_os = "android"))] - { - use nix::sys::utsname::uname; - const RECOMMENDED_KERNEL_VERSION: &str = "4.19.17"; - // kernel_version 形如 5.4.0-13格式 - let sys_uname = uname(); - if sys_uname - .release() - .trim() - .split_once('-') // `-` 后面数字是修改版本号的次数,可以用 `-` 分隔 - .unwrap_or_default() - .0 - .ne(RECOMMENDED_KERNEL_VERSION) - { - warn!( - "kernel version is not recommended({})", - RECOMMENDED_KERNEL_VERSION - ); - } - } -} - -pub fn tap_interface_check(tap_interfaces: &[String]) { - if cfg!(target_os = "windows") { - return; - } - - if tap_interfaces.is_empty() { - return error!("static-config: tap-interfaces is none in analyzer-mode"); - } - - #[cfg(any(target_os = "linux", target_os = "android"))] - for name in tap_interfaces { - let features = match get_link_enabled_features(name) { - Ok(f) => f, - Err(e) => { - warn!("{}, please check rx-vlan-offload manually", e); - continue; - } - }; - if features.contains("rx-vlan-hw-parse") { - warn!( - "NIC {} feature rx-vlan-offload is on, turn off if packet has vlan", - name - ); - } - } -} - -#[cfg(any(target_os = "linux", target_os = "android"))] -pub fn free_memory_check(_required: u64, _exception_handler: &ExceptionHandler) -> Result<()> { - return Ok(()); // fixme: The way to obtain free memory is different in earlier versions of Linux, which requires adaptation -} - -#[cfg(target_os = "windows")] -pub fn free_memory_check(required: u64, exception_handler: &ExceptionHandler) -> Result<()> { - get_memory_rss() - .map_err(|e| Error::Environment(e.to_string())) - .and_then(|memory_usage| { - if required < memory_usage { - return Ok(()); - } - - let still_need = required - memory_usage; - let mut system = System::new(); - system.refresh_memory(); - - if still_need <= system.available_memory() { - exception_handler.clear(Exception::MemNotEnough); - Ok(()) - } else { - exception_handler.set(Exception::MemNotEnough); - Err(Error::Environment(format!( - "need {} more memory to run", - ByteSize::b(still_need).to_string_as(true) - ))) - } - }) -} - pub fn free_memory_checker(required: u64, exception_handler: ExceptionHandler) -> Checker { Box::new(move || free_memory_check(required, &exception_handler)) } @@ -261,141 +159,6 @@ pub fn controller_ip_check(ips: &[String]) { crate::utils::notify_exit(-1); } -#[cfg(any(target_os = "linux", target_os = "android"))] -pub fn core_file_check() { - let core_path = fs::read(CORE_FILE_CONFIG); - if core_path.is_err() { - warn!( - "Core file read {} error: {}", - CORE_FILE_CONFIG, - core_path.unwrap_err() - ); - return; - } - let core_path = String::from_utf8(core_path.unwrap()); - if core_path.is_err() { - warn!( - "Core file parse {} error: {}", - CORE_FILE_CONFIG, - core_path.unwrap_err() - ); - return; - } - // core_path example: - // 1. "|/usr/libexec/abrt-hook-ccpp %s %c %p %u %g %t e %P %I %h" - let core_path = core_path.unwrap(); - if core_path.as_bytes()[0] == '|' as u8 { - warn!("The core file is configured with pipeline operation, failed to check."); - return; - } - - // core_path example: - // 1. "/" - // 1. "/core" - // 1. "/core%/" - // 1. "/core/core-%t-%p-%h" - let parts = core_path.split("/").collect::>(); - let core_path = if parts.len() <= 1 { - "/".to_string() - } else { - if parts[parts.len() - 1].find("%").is_none() { - parts.join("/") - } else { - parts[..parts.len() - 1].join("/") - } - }; - - info!("Check core-files in dir: {}", core_path); - - let context = fs::read_dir(core_path.clone()); - if context.is_err() { - warn!( - "Core file read dir {} error: {}.", - core_path, - context.unwrap_err() - ); - return; - } - - let mut core_files = vec![]; - // Traverse the directory to get the core file in the directory - for entry in context.unwrap() { - if entry.is_err() || !entry.as_ref().unwrap().path().is_file() { - continue; - } - let entry = entry.as_ref().unwrap(); - let file = fs::File::open(entry.path()); - if file.is_err() { - continue; - } - let mut file = file.unwrap(); - let mut elf_data = [0u8; 128]; - let n = file.read(&mut elf_data); - if n.is_err() { - continue; - } - let elf_data = &mut elf_data[..n.unwrap()]; - - // Check whether the file is a core file - let elf_header = elf::file::FileHeader::parse(&mut elf_data.as_bytes()); - if elf_header.is_err() { - continue; - } - let elf_header = elf_header.unwrap(); - if elf_header.elftype.0 != elf::gabi::ET_CORE { - continue; - } - - // Check whether the core file is generated by PROCESS_NAME - let mut elf_data = [0u8; 80000]; - let n = file.read(&mut elf_data); - if n.is_err() { - continue; - } - let elf_data = &mut elf_data[..n.unwrap()]; - unsafe { - if String::from_utf8_unchecked(elf_data.to_vec()) - .find(PROCESS_NAME) - .is_none() - { - continue; - } - } - - let meta_data = entry.metadata(); - if meta_data.is_err() { - continue; - } - let meta_data = meta_data.unwrap(); - let item = { - let last_modify_time = meta_data.mtime(); - let path = entry.file_name(); - if path.to_str().is_none() { - continue; - } - ( - last_modify_time, - format!("{}/{}", core_path, path.to_str().unwrap().to_string()), - ) - }; - - info!("Core file: {} {}.", item.0, item.1); - core_files.push(item); - } - - if core_files.len() > CORE_FILE_LIMIT { - core_files.sort_by(|a, b| b.0.cmp(&a.0)); - core_files[CORE_FILE_LIMIT..].iter().for_each(|x| { - let result = fs::remove_file(&x.1); - if result.is_err() { - warn!("Remove core file({}) error: {}.", x.1, result.unwrap_err()) - } else { - info!("Remove core file: {} {}.", x.0, x.1) - } - }); - } -} - pub fn trident_process_check(process_threshold: u32) { let process_num = get_process_num_by_name(PROCESS_NAME); @@ -471,49 +234,12 @@ pub fn running_in_k8s() -> bool { fs::metadata(K8S_CA_CRT_PATH).is_ok() } -fn container_mem_limit() -> Option { - let limit_files = [ - "/sys/fs/cgroup/memory.max", // If the docker image uses cgroups v2 - "/sys/fs/cgroup/memory/memory.limit_in_bytes", // If the docker image uses cgroups v1 - ]; - - limit_files.iter().find_map(|limit_file| { - fs::read_to_string(limit_file) - .ok() - .and_then(|content| content.trim().parse().ok()) - }) -} - -fn k8s_mem_limit() -> Option { - // Environment variable "K8S_MEM_LIMIT_FOR_DEEPFLOW" is set from container fields - // https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information/#use-container-fields-as-values-for-environment-variables - env::var(K8S_MEM_LIMIT_FOR_DEEPFLOW).ok().and_then(|v| { - v.parse::().ok().and_then(|v| { - if v < MIN_MEMORY_LIMIT_MEGABYTE || v > MAX_MEMORY_LIMIT_MEGABYTE { - warn!("the K8S_MEM_LIMIT_FOR_DEEPFLOW: {} Mi is out of [{} Mi, {} Mi], use the limit value from server instead", v, MIN_MEMORY_LIMIT_MEGABYTE, MAX_MEMORY_LIMIT_MEGABYTE); - None - } else { - Some(v * BYTES_PER_MEGABYTE) - } - }) - }) -} - -pub fn get_container_mem_limit() -> Option { - if running_in_k8s() { - k8s_mem_limit() - } else { - container_mem_limit() - } -} - pub fn get_env() -> String { let items = vec![ K8S_NODE_IP_FOR_DEEPFLOW, ENV_INTERFACE_NAME, K8S_POD_IP_FOR_DEEPFLOW, IN_CONTAINER, - K8S_MEM_LIMIT_FOR_DEEPFLOW, ONLY_WATCH_K8S_RESOURCE, K8S_NAMESPACE_FOR_DEEPFLOW, ]; @@ -532,90 +258,6 @@ pub fn get_k8s_namespace() -> String { env::var(K8S_NAMESPACE_FOR_DEEPFLOW).unwrap_or("deepflow".to_owned()) } -#[cfg(any(target_os = "linux"))] -pub async fn get_current_k8s_image() -> Option { - if !running_in_k8s() { - return None; - } - let Ok(mut config) = Config::infer().await else { - warn!("failed to infer kubernetes config"); - return None; - }; - config.accept_invalid_certs = true; - - let Ok(client) = Client::try_from(config) else { - warn!("failed to create kubernetes client"); - return None; - }; - - let daemonsets: Api = Api::namespaced(client, &get_k8s_namespace()); - - let Ok(daemonset) = daemonsets.get(public::consts::DAEMONSET_NAME).await else { - warn!("failed to get daemonsets"); - return None; - }; - - // Referer: https://kubernetes.io/zh-cn/docs/reference/kubernetes-api/workload-resources/pod-v1/#Container - // The deepflow-agent DaemonSet.spec format is as follows: - // { - // "spec":{ - // "template":{ - // "spec":{ - // "containers":[{ - // "name":"deepflow-agent", - // "image":"deepflow-agent:latest", - // }] - // } - // } - // } - // } - if let Some(spec) = daemonset.spec { - if let Some(s) = spec.template.spec { - for container in s.containers { - return Some(container.image.unwrap_or_default()); - } - } - } - None -} - -#[cfg(any(target_os = "linux", target_os = "android"))] -pub fn get_executable_path() -> Result { - let possible_paths = vec![ - "/proc/self/exe".to_owned(), - "/proc/curproc/exe".to_owned(), - "/proc/curproc/file".to_owned(), - format!("/proc/{}/path/a.out", std::process::id()), - ]; - for path in possible_paths { - if let Ok(path) = fs::read_link(path) { - return Ok(path); - } - } - Err(io::Error::new( - io::ErrorKind::NotFound, - "executable path not found", - )) -} - -#[cfg(target_os = "windows")] -pub fn get_executable_path() -> Result { - let mut buf = Vec::with_capacity(MAX_PATH); - unsafe { - let ret = GetModuleFileNameW(ptr::null_mut(), buf.as_mut_ptr(), MAX_PATH as DWORD) as usize; - if ret > 0 && ret < MAX_PATH { - buf.set_len(ret); - let s = OsString::from_wide(&buf); - Ok(s.into()) - } else { - Err(io::Error::new( - io::ErrorKind::NotFound, - "executable path not found", - )) - } - } -} - pub fn get_mac_by_name(src_interface: String) -> u32 { if src_interface.is_empty() { return 0; diff --git a/agent/src/utils/environment/linux.rs b/agent/src/utils/environment/linux.rs new file mode 100644 index 00000000000..1f679272533 --- /dev/null +++ b/agent/src/utils/environment/linux.rs @@ -0,0 +1,400 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::{ + fs, + io::{self, Read}, + iter::Iterator, + os::unix::fs::MetadataExt, + path::PathBuf, +}; + +use bollard::{container::UpdateContainerOptions, Docker}; +use k8s_openapi::{api::apps::v1::DaemonSet, apimachinery::pkg::api::resource::Quantity}; +use kube::{ + api::{Api, Patch, PatchParams}, + Client, Config, +}; +use log::{error, info, warn}; +use nom::AsBytes; + +use public::utils::net::get_link_enabled_features; + +use super::{get_k8s_namespace, running_in_container, running_in_k8s}; +use crate::{ + common::{CONTAINER_NAME, DAEMONSET_NAME, PROCESS_NAME}, + error::{Error, Result}, + exception::ExceptionHandler, +}; + +const CORE_FILE_CONFIG: &str = "/proc/sys/kernel/core_pattern"; +const CORE_FILE_LIMIT: usize = 3; + +pub fn free_memory_check(_required: u64, _exception_handler: &ExceptionHandler) -> Result<()> { + return Ok(()); // fixme: The way to obtain free memory is different in earlier versions of Linux, which requires adaptation +} + +pub fn kernel_check() { + use nix::sys::utsname::uname; + const RECOMMENDED_KERNEL_VERSION: &str = "4.19.17"; + // The `kernel_version` is in the format of 5.4.0-13 + let sys_uname = uname(); + if sys_uname + .release() + .trim() + .split_once('-') // The number after "-" represents the number of times the version has been modified, and it is separated by "-" + .unwrap_or_default() + .0 + .ne(RECOMMENDED_KERNEL_VERSION) + { + warn!( + "kernel version is not recommended({})", + RECOMMENDED_KERNEL_VERSION + ); + } +} + +pub fn tap_interface_check(tap_interfaces: &[String]) { + if tap_interfaces.is_empty() { + return error!("static-config: tap-interfaces is none in analyzer-mode"); + } + + for name in tap_interfaces { + let features = match get_link_enabled_features(name) { + Ok(f) => f, + Err(e) => { + warn!("{}, please check rx-vlan-offload manually", e); + continue; + } + }; + if features.contains("rx-vlan-hw-parse") { + warn!( + "NIC {} feature rx-vlan-offload is on, turn off if packet has vlan", + name + ); + } + } +} + +pub fn core_file_check() { + let core_path = fs::read(CORE_FILE_CONFIG); + if core_path.is_err() { + warn!( + "Core file read {} error: {}", + CORE_FILE_CONFIG, + core_path.unwrap_err() + ); + return; + } + let core_path = String::from_utf8(core_path.unwrap()); + if core_path.is_err() { + warn!( + "Core file parse {} error: {}", + CORE_FILE_CONFIG, + core_path.unwrap_err() + ); + return; + } + // core_path example: + // 1. "|/usr/libexec/abrt-hook-ccpp %s %c %p %u %g %t e %P %I %h" + let core_path = core_path.unwrap(); + if core_path.as_bytes()[0] == '|' as u8 { + warn!("The core file is configured with pipeline operation, failed to check."); + return; + } + + // core_path example: + // 1. "/" + // 1. "/core" + // 1. "/core%/" + // 1. "/core/core-%t-%p-%h" + let parts = core_path.split("/").collect::>(); + let core_path = if parts.len() <= 1 { + "/".to_string() + } else { + if parts[parts.len() - 1].find("%").is_none() { + parts.join("/") + } else { + parts[..parts.len() - 1].join("/") + } + }; + + info!("Check core-files in dir: {}", core_path); + + let context = fs::read_dir(core_path.clone()); + if context.is_err() { + warn!( + "Core file read dir {} error: {}.", + core_path, + context.unwrap_err() + ); + return; + } + + let mut core_files = vec![]; + // Traverse the directory to get the core file in the directory + for entry in context.unwrap() { + if entry.is_err() || !entry.as_ref().unwrap().path().is_file() { + continue; + } + let entry = entry.as_ref().unwrap(); + let file = fs::File::open(entry.path()); + if file.is_err() { + continue; + } + let mut file = file.unwrap(); + let mut elf_data = [0u8; 128]; + let n = file.read(&mut elf_data); + if n.is_err() { + continue; + } + let elf_data = &mut elf_data[..n.unwrap()]; + + // Check whether the file is a core file + let elf_header = elf::file::FileHeader::parse(&mut elf_data.as_bytes()); + if elf_header.is_err() { + continue; + } + let elf_header = elf_header.unwrap(); + if elf_header.elftype.0 != elf::gabi::ET_CORE { + continue; + } + + // Check whether the core file is generated by PROCESS_NAME + let mut elf_data = [0u8; 80000]; + let n = file.read(&mut elf_data); + if n.is_err() { + continue; + } + let elf_data = &mut elf_data[..n.unwrap()]; + unsafe { + if String::from_utf8_unchecked(elf_data.to_vec()) + .find(PROCESS_NAME) + .is_none() + { + continue; + } + } + + let meta_data = entry.metadata(); + if meta_data.is_err() { + continue; + } + let meta_data = meta_data.unwrap(); + let item = { + let last_modify_time = meta_data.mtime(); + let path = entry.file_name(); + if path.to_str().is_none() { + continue; + } + ( + last_modify_time, + format!("{}/{}", core_path, path.to_str().unwrap().to_string()), + ) + }; + + info!("Core file: {} {}.", item.0, item.1); + core_files.push(item); + } + + if core_files.len() > CORE_FILE_LIMIT { + core_files.sort_by(|a, b| b.0.cmp(&a.0)); + core_files[CORE_FILE_LIMIT..].iter().for_each(|x| { + let result = fs::remove_file(&x.1); + if result.is_err() { + warn!("Remove core file({}) error: {}.", x.1, result.unwrap_err()) + } else { + info!("Remove core file: {} {}.", x.0, x.1) + } + }); + } +} + +pub fn get_executable_path() -> Result { + let possible_paths = vec![ + "/proc/self/exe".to_owned(), + "/proc/curproc/exe".to_owned(), + "/proc/curproc/file".to_owned(), + format!("/proc/{}/path/a.out", std::process::id()), + ]; + for path in possible_paths { + if let Ok(path) = fs::read_link(path) { + return Ok(path); + } + } + Err(io::Error::new( + io::ErrorKind::NotFound, + "executable path not found", + )) +} + +pub async fn get_current_k8s_image() -> Option { + if !running_in_k8s() { + return None; + } + + let daemonsets = get_k8s_daemonsets().await.ok()?; + + let Ok(daemonset) = daemonsets.get(DAEMONSET_NAME).await else { + warn!("failed to get daemonsets"); + return None; + }; + + // Referer: https://kubernetes.io/zh-cn/docs/reference/kubernetes-api/workload-resources/pod-v1/#Container + // The deepflow-agent DaemonSet.spec format is as follows: + // { + // "spec":{ + // "template":{ + // "spec":{ + // "containers":[{ + // "name":"deepflow-agent", + // "image":"deepflow-agent:latest", + // }] + // } + // } + // } + // } + if let Some(spec) = daemonset.spec { + if let Some(s) = spec.template.spec { + for container in s.containers { + return Some(container.image.unwrap_or_default()); + } + } + } + None +} + +pub fn get_container_resource_limits() -> (u32, u64) { + if !running_in_container() { + return (0, 0); + } + let cpu_cgroups_files = [ + "/sys/fs/cgroup/cpu.max", // If the container image uses cgroups v2, the format of the file is: {cfs_quota_us} {cfs_period_us}, for example: 100000 100000 + "/sys/fs/cgroup/cpu/cpu.cfs_quota_us", // If the container image uses cgroups v1, the format of the file is: {cfs_quota_us}, for example: 100000 + ]; + let mem_cgroups_files = [ + "/sys/fs/cgroup/memory.max", // unit: bytes + "/sys/fs/cgroup/memory/memory.limit_in_bytes", + ]; + + let milli_cpu_limit = cpu_cgroups_files + .iter() + .find_map(|f| { + fs::read_to_string(f) + .ok()? + .split_whitespace() + .next() + .and_then(|value| value.parse::().map(|m| m / 100).ok()) // convert to milli-core + }) + .unwrap_or_default(); + let memory_limit = mem_cgroups_files + .iter() + .find_map(|f| { + fs::read_to_string(f) + .ok() + .and_then(|content| content.trim().parse::().ok()) + }) + .unwrap_or_default(); + + (milli_cpu_limit, memory_limit) +} + +pub async fn set_docker_resource_limits( + milli_cpu_limit: u32, // unit: milli-core + memory_limit: u64, +) -> Result<(), Error> { + let docker = Docker::connect_with_local_defaults() + .map_err(|e| Error::Environment(format!("connet docker failed: {:?}", e)))?; + let update_options = UpdateContainerOptions:: { + nano_cp_us: Some((milli_cpu_limit * 1_000_000) as i64), + memory: Some(memory_limit as i64), + memory_swap: Some((memory_limit * 2) as i64), + ..Default::default() + }; + docker + .update_container(CONTAINER_NAME, update_options) + .await + .map(|_| ()) + .map_err(|e| { + Error::Environment(format!( + "set milli_cpu_limit: {}, set memory_limit: {}bytes, update docker container failed: {:?}", + milli_cpu_limit as f64 / 1000.0, memory_limit, e + )) + }) +} + +pub async fn get_k8s_daemonsets() -> Result, Error> { + let mut config = Config::infer() + .await + .map_err(|e| Error::Environment(format!("get k8s config failed: {:?}", e)))?; + config.accept_invalid_certs = true; + let client = Client::try_from(config) + .map(|c| c) + .map_err(|e| Error::Environment(format!("get k8s client failed: {:?}", e)))?; + Ok(Api::namespaced(client, &get_k8s_namespace())) +} + +pub async fn set_k8s_resource_limits(milli_cpu_limit: u32, memory_limit: u64) -> Result<(), Error> { + let daemonsets = get_k8s_daemonsets().await?; + + let mut resource_limits = std::collections::BTreeMap::new(); + resource_limits.insert("cpu".to_string(), Quantity(format!("{}m", milli_cpu_limit))); + resource_limits.insert( + "memory".to_string(), + Quantity(format!("{}Mi", (memory_limit / (1024 * 1024)))), + ); + let patch = serde_json::json!({ + "apiVersion": "apps/v1", + "kind": "DaemonSet", + "spec": { + "template": { + "spec": { + "containers": [ + { + "name": CONTAINER_NAME, + "resources": { + "limits": resource_limits, + }, + } + ] + } + } + } + }); + let params = PatchParams::default(); + let patch = Patch::Strategic(&patch); + daemonsets + .patch(DAEMONSET_NAME, ¶ms, &patch) + .await + .map(|_| ()) + .map_err(|e| { + Error::Environment(format!( + "patch k8s daemonset {} failed: {:?}, patch value: {:?}", + DAEMONSET_NAME, e, patch + )) + }) +} + +pub async fn set_container_resource_limit( + milli_cpu_limit: u32, + memory_limit: u64, +) -> Result<(), Error> { + if running_in_k8s() { + set_k8s_resource_limits(milli_cpu_limit, memory_limit).await + } else { + set_docker_resource_limits(milli_cpu_limit, memory_limit).await + } +} diff --git a/agent/src/utils/environment/windows.rs b/agent/src/utils/environment/windows.rs new file mode 100644 index 00000000000..f4a95198188 --- /dev/null +++ b/agent/src/utils/environment/windows.rs @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::{ffi::OsString, io, os::windows::ffi::OsStringExt, path::PathBuf, ptr}; + +use bytesize::ByteSize; +use sysinfo::{System, SystemExt}; +use winapi::{ + shared::minwindef::{DWORD, MAX_PATH}, + um::libloaderapi::GetModuleFileNameW, +}; + +use crate::{ + error::{Error, Result}, + exception::ExceptionHandler, + utils::process::get_memory_rss, +}; +use public::proto::trident::Exception; + +pub fn free_memory_check(required: u64, exception_handler: &ExceptionHandler) -> Result<()> { + get_memory_rss() + .map_err(|e| Error::Environment(e.to_string())) + .and_then(|memory_usage| { + if required < memory_usage { + return Ok(()); + } + + let still_need = required - memory_usage; + let mut system = System::new(); + system.refresh_memory(); + + if still_need <= system.available_memory() { + exception_handler.clear(Exception::MemNotEnough); + Ok(()) + } else { + exception_handler.set(Exception::MemNotEnough); + Err(Error::Environment(format!( + "need {} more memory to run", + ByteSize::b(still_need).to_string_as(true) + ))) + } + }) +} + +pub fn kernel_check() {} + +pub fn tap_interface_check(_tap_interfaces: &[String]) {} + +pub fn get_executable_path() -> Result { + let mut buf = Vec::with_capacity(MAX_PATH); + unsafe { + let ret = GetModuleFileNameW(ptr::null_mut(), buf.as_mut_ptr(), MAX_PATH as DWORD) as usize; + if ret > 0 && ret < MAX_PATH { + buf.set_len(ret); + let s = OsString::from_wide(&buf); + Ok(s.into()) + } else { + Err(io::Error::new( + io::ErrorKind::NotFound, + "executable path not found", + )) + } + } +} diff --git a/agent/src/utils/guard.rs b/agent/src/utils/guard.rs index d5596df1905..725ab7062e4 100644 --- a/agent/src/utils/guard.rs +++ b/agent/src/utils/guard.rs @@ -264,7 +264,7 @@ impl Guard { return false; } }; - (cpu_limit * 100) as f32 > cpu_usage + (cpu_limit / 10) as f32 > cpu_usage // The cpu_usage is in percentage, and the unit of cpu_limit is milli-cores. Divide cpu_limit by 10 to align the units } pub fn start(&self) { @@ -300,7 +300,7 @@ impl Guard { loop { let config = config.load(); let tap_mode = config.tap_mode; - let cpu_limit = config.max_cpus; + let cpu_limit = config.max_millicpus; let mut system_guard = system.lock().unwrap(); if !system_guard.refresh_process_specifics(pid, ProcessRefreshKind::new().with_cpu()) { warn!("refresh process with cpu failed"); diff --git a/manifests/deepflow-agent-docker-compose/docker-compose.yaml b/manifests/deepflow-agent-docker-compose/docker-compose.yaml index 7e710931f54..95b09d8ae15 100644 --- a/manifests/deepflow-agent-docker-compose/docker-compose.yaml +++ b/manifests/deepflow-agent-docker-compose/docker-compose.yaml @@ -23,5 +23,6 @@ services: volumes: - /etc/deepflow-agent.yaml:/etc/deepflow-agent/deepflow-agent.yaml:ro - /sys/kernel/debug:/sys/kernel/debug:ro + - /var/run/docker.sock:/var/run/docker.sock:ro network_mode: "host" pid: "host" \ No newline at end of file