diff --git a/agent/Cargo.lock b/agent/Cargo.lock index d2e00ef15a1..ded8f8bd877 100644 --- a/agent/Cargo.lock +++ b/agent/Cargo.lock @@ -974,6 +974,7 @@ dependencies = [ "public", "rand", "regex", + "reorder", "reqwest", "ring", "roxmltree", @@ -3164,6 +3165,13 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" +[[package]] +name = "reorder" +version = "0.1.0" +dependencies = [ + "public", +] + [[package]] name = "reqwest" version = "0.11.18" diff --git a/agent/Cargo.toml b/agent/Cargo.toml index dcfb7b921f9..f15a0355b89 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -90,6 +90,7 @@ cgroups-rs = "0.2.9" nix = "0.23" pcap = "0.9.1" procfs = { git = "https://github.com/deepflowio/procfs/" } +reorder = { path = "plugins/reorder" } [target.'cfg(target_os = "linux")'.dependencies] k8s-openapi = { version = "^0.15", features = ["v1_19", "schemars"] } diff --git a/agent/crates/public/src/l7_protocol.rs b/agent/crates/public/src/l7_protocol.rs index 37e1a8d55a1..9d675a828b9 100644 --- a/agent/crates/public/src/l7_protocol.rs +++ b/agent/crates/public/src/l7_protocol.rs @@ -150,3 +150,8 @@ impl L7ProtocolEnum { } } } + +pub trait L7ProtocolChecker { + fn is_disabled(&self, p: L7Protocol) -> bool; + fn is_enabled(&self, p: L7Protocol) -> bool; +} diff --git a/agent/plugins/reorder/Cargo.toml b/agent/plugins/reorder/Cargo.toml new file mode 100644 index 00000000000..ae79d8434a9 --- /dev/null +++ b/agent/plugins/reorder/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "reorder" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +public = { path = "../../crates/public"} diff --git a/agent/plugins/reorder/src/lib.rs b/agent/plugins/reorder/src/lib.rs new file mode 100644 index 00000000000..94189615f66 --- /dev/null +++ b/agent/plugins/reorder/src/lib.rs @@ -0,0 
+1,111 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::any::Any; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering::Relaxed}; +use std::sync::Arc; + +use public::{ + counter, + l7_protocol::{L7Protocol, L7ProtocolChecker}, +}; + +pub trait Downcast { + fn as_any_mut(&mut self) -> &mut dyn Any; + fn into_any(self: Box) -> Box; +} + +pub trait CacheItem: Downcast { + // Distinguish different flows + fn get_id(&self) -> u64; + // Used for sorting + fn get_seq(&self) -> u64; + // Time in seconds + fn get_timestmap(&self) -> u64; + fn get_l7_protocol(&self) -> L7Protocol; +} + +#[derive(Default)] +pub struct ReorderCounter { + drop_before_window: AtomicU64, + drop_out_of_order: AtomicU64, + flow_counter: AtomicU64, + packet_couter: AtomicU64, + closed: AtomicBool, +} + +pub struct StatsReorderCounter(Arc); + +impl StatsReorderCounter { + pub fn new(count: Arc) -> Self { + Self(count) + } +} + +impl counter::OwnedCountable for StatsReorderCounter { + fn closed(&self) -> bool { + self.0.closed.load(Relaxed) + } + + fn get_counters(&self) -> Vec { + vec![ + ( + "drop-before-window", + counter::CounterType::Counted, + counter::CounterValue::Unsigned(self.0.drop_before_window.swap(0, Relaxed)), + ), + ( + "drop-out-of-order", + counter::CounterType::Counted, + counter::CounterValue::Unsigned(self.0.drop_out_of_order.swap(0, Relaxed)), + ), + ( + "flow-counter", + counter::CounterType::Counted, + 
counter::CounterValue::Unsigned(self.0.flow_counter.load(Relaxed)), + ), + ( + "packet-counter", + counter::CounterType::Counted, + counter::CounterValue::Unsigned(self.0.packet_couter.load(Relaxed)), + ), + ] + } +} + +pub struct Reorder { + counter: Arc, +} + +impl Reorder { + pub fn new(_: Box, counter: Arc, _: usize) -> Self { + Self { counter } + } + + pub fn flush(&mut self, _: u64) -> Vec> { + vec![] + } + + pub fn inject_item(&mut self, item: Box) -> Vec> { + vec![item] + } +} + +impl Drop for Reorder { + fn drop(&mut self) { + self.counter.closed.store(true, Relaxed); + } +} diff --git a/agent/src/common/l7_protocol_log.rs b/agent/src/common/l7_protocol_log.rs index 11e9c2cac99..d8e9359daf5 100644 --- a/agent/src/common/l7_protocol_log.rs +++ b/agent/src/common/l7_protocol_log.rs @@ -48,7 +48,7 @@ use crate::plugin::c_ffi::SoPluginFunc; use crate::plugin::wasm::WasmVm; use public::enums::IpProtocol; -use public::l7_protocol::{CustomProtocol, L7Protocol, L7ProtocolEnum}; +use public::l7_protocol::{CustomProtocol, L7Protocol, L7ProtocolChecker, L7ProtocolEnum}; /* 所有协议都需要实现L7ProtocolLogInterface这个接口. @@ -514,12 +514,14 @@ impl L7ProtocolBitmap { pub fn set_disabled(&mut self, p: L7Protocol) { self.0 &= !(1 << (p as u128)); } +} - pub fn is_disabled(&self, p: L7Protocol) -> bool { +impl L7ProtocolChecker for L7ProtocolBitmap { + fn is_disabled(&self, p: L7Protocol) -> bool { self.0 & (1 << (p as u128)) == 0 } - pub fn is_enabled(&self, p: L7Protocol) -> bool { + fn is_enabled(&self, p: L7Protocol) -> bool { !self.is_disabled(p) } } diff --git a/agent/src/common/meta_packet.rs b/agent/src/common/meta_packet.rs index 5a06826f755..9d8b70620f9 100644 --- a/agent/src/common/meta_packet.rs +++ b/agent/src/common/meta_packet.rs @@ -14,6 +14,7 @@ * limitations under the License. 
*/ +use std::any::Any; use std::fmt; use std::net::{IpAddr, Ipv4Addr}; use std::ops::Deref; @@ -47,8 +48,9 @@ use crate::error; use crate::{ common::ebpf::{GO_HTTP2_UPROBE, GO_HTTP2_UPROBE_DATA}, ebpf::{ - MSG_REQUEST_END, MSG_RESPONSE_END, PACKET_KNAME_MAX_PADDING, SK_BPF_DATA, SOCK_DATA_HTTP2, - SOCK_DATA_TLS_HTTP2, SOCK_DIR_RCV, SOCK_DIR_SND, + MSG_REASM_SEG, MSG_REASM_START, MSG_REQUEST_END, MSG_RESPONSE_END, + PACKET_KNAME_MAX_PADDING, SK_BPF_DATA, SOCK_DATA_HTTP2, SOCK_DATA_TLS_HTTP2, SOCK_DIR_RCV, + SOCK_DIR_SND, }, }; use crate::{ @@ -61,6 +63,7 @@ use public::{ buffer::BatchedBuffer, utils::net::{is_unicast_link_local, MacAddr}, }; +use reorder::{CacheItem, Downcast}; #[derive(Clone, Debug)] pub enum RawPacket<'a> { @@ -108,6 +111,32 @@ bitflags! { } } +#[cfg(any(target_os = "linux", target_os = "android"))] +#[derive(PartialEq, Clone, Debug)] +pub enum SegmentFlags { + None, + Start, + Seg, +} + +#[cfg(any(target_os = "linux", target_os = "android"))] +impl Default for SegmentFlags { + fn default() -> Self { + SegmentFlags::None + } +} + +#[cfg(any(target_os = "linux", target_os = "android"))] +impl From for SegmentFlags { + fn from(value: u8) -> Self { + match value { + MSG_REASM_START => SegmentFlags::Start, + MSG_REASM_SEG => SegmentFlags::Seg, + _ => SegmentFlags::None, + } + } +} + #[derive(Clone, Debug, Default)] pub struct MetaPacket<'a> { // 主机序, 不因L2End1而颠倒, 端口会在查询策略时被修改 @@ -168,6 +197,8 @@ pub struct MetaPacket<'a> { pub is_request_end: bool, pub is_response_end: bool, pub ebpf_flags: EbpfFlags, + #[cfg(any(target_os = "linux", target_os = "android"))] + pub segment_flags: SegmentFlags, pub process_id: u32, pub pod_id: u32, @@ -901,6 +932,14 @@ impl<'a> MetaPacket<'a> { 0 } + #[cfg(any(target_os = "linux", target_os = "android"))] + pub fn merge(&mut self, packet: &mut MetaPacket) { + self.raw_from_ebpf.append(&mut packet.raw_from_ebpf); + self.packet_len += packet.packet_len - 54; + self.payload_len += packet.payload_len; + self.l4_payload_len 
+= packet.l4_payload_len; + } + #[cfg(any(target_os = "linux", target_os = "android"))] pub unsafe fn from_ebpf(data: *mut SK_BPF_DATA) -> Result, Box> { let data = &mut data.read_unaligned(); @@ -991,6 +1030,7 @@ impl<'a> MetaPacket<'a> { } else { EbpfFlags::NONE }; + packet.segment_flags = SegmentFlags::from(data.msg_type); // 目前只有 go uprobe http2 的方向判断能确保准确 if data.source == GO_HTTP2_UPROBE || data.source == GO_HTTP2_UPROBE_DATA { @@ -1105,6 +1145,34 @@ impl<'a> fmt::Display for MetaPacket<'a> { } } +impl Downcast for MetaPacket<'static> { + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + fn into_any(self: Box) -> Box { + self + } +} + +impl CacheItem for MetaPacket<'static> { + fn get_id(&self) -> u64 { + self.generate_ebpf_flow_id() + } + + fn get_seq(&self) -> u64 { + self.cap_seq + } + + fn get_timestmap(&self) -> u64 { + self.lookup_key.timestamp.as_secs() + } + + fn get_l7_protocol(&self) -> L7Protocol { + self.l7_protocol_from_ebpf + } +} + #[derive(Clone, Debug, Default)] pub struct MetaPacketTcpHeader { pub seq: u32, diff --git a/agent/src/config/config.rs b/agent/src/config/config.rs index 070eaf7e6eb..116bd1089f8 100644 --- a/agent/src/config/config.rs +++ b/agent/src/config/config.rs @@ -385,6 +385,9 @@ pub struct EbpfYamlConfig { pub java_symbol_file_refresh_defer_interval: Duration, pub on_cpu_profile: OnCpuProfile, pub off_cpu_profile: OffCpuProfile, + pub syscall_out_of_order_cache_size: usize, + pub syscall_out_of_order_reassembly: Vec, + pub syscall_segmentation_reassembly: Vec, } impl Default for EbpfYamlConfig { @@ -409,6 +412,9 @@ impl Default for EbpfYamlConfig { java_symbol_file_refresh_defer_interval: Duration::from_secs(600), on_cpu_profile: OnCpuProfile::default(), off_cpu_profile: OffCpuProfile::default(), + syscall_out_of_order_reassembly: vec![], + syscall_segmentation_reassembly: vec![], + syscall_out_of_order_cache_size: 16, } } } @@ -796,6 +802,9 @@ impl YamlConfig { .off_cpu_profile .min_block 
.clamp(Duration::from_micros(0), Duration::from_micros(3600000000)); + if !(8..=128).contains(&c.ebpf.syscall_out_of_order_cache_size) { + c.ebpf.syscall_out_of_order_cache_size = 16; + } if c.guard_interval < Duration::from_secs(1) || c.guard_interval > Duration::from_secs(3600) { diff --git a/agent/src/ebpf/mod.rs b/agent/src/ebpf/mod.rs index f233331ecaa..497802a1cf3 100644 --- a/agent/src/ebpf/mod.rs +++ b/agent/src/ebpf/mod.rs @@ -27,7 +27,7 @@ use std::fmt; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; // 最大长度 -pub const CAP_LEN_MAX: usize = 8192; +pub const CAP_LEN_MAX: usize = 16384; // process_kname is up to 16 bytes, if the length of process_kname exceeds 15, the ending char is '\0' pub const PACKET_KNAME_MAX_PADDING: usize = 15; diff --git a/agent/src/ebpf_dispatcher/ebpf_dispatcher.rs b/agent/src/ebpf_dispatcher/ebpf_dispatcher.rs index e1062957315..5ebf1949c97 100644 --- a/agent/src/ebpf_dispatcher/ebpf_dispatcher.rs +++ b/agent/src/ebpf_dispatcher/ebpf_dispatcher.rs @@ -28,11 +28,12 @@ use libc::{c_int, c_ulonglong}; use log::{debug, error, info, warn}; use super::{Error, Result}; +use crate::common::ebpf::EbpfType; use crate::common::flow::L7Stats; use crate::common::l7_protocol_log::{ get_all_protocol, L7ProtocolBitmap, L7ProtocolParserInterface, }; -use crate::common::meta_packet::MetaPacket; +use crate::common::meta_packet::{MetaPacket, SegmentFlags}; use crate::common::proc_event::{BoxedProcEvents, EventType, ProcEvent}; use crate::common::{FlowAclListener, FlowAclListenerId, TaggedFlow}; use crate::config::handler::{CollectorAccess, EbpfAccess, EbpfConfig, LogParserAccess}; @@ -42,18 +43,20 @@ use crate::exception::ExceptionHandler; use crate::flow_generator::{flow_map::Config, AppProto, FlowMap}; use crate::integration_collector::Profile; use crate::policy::PolicyGetter; +use crate::rpc::get_timestamp; use crate::utils::stats; use public::{ buffer::BatchedBox, counter::{Countable, Counter, CounterType, CounterValue, OwnedCountable}, 
debug::QueueDebugger, - l7_protocol::L7Protocol, + l7_protocol::{L7Protocol, L7ProtocolChecker}, leaky_bucket::LeakyBucket, proto::{common::TridentType, metric, trident::Exception}, queue::{bounded_with_debug, DebugSender, Receiver}, utils::bitmap::parse_u16_range_list_to_bitmap, }; +use reorder::{Reorder, ReorderCounter, StatsReorderCounter}; pub struct EbpfCounter { rx: AtomicU64, @@ -199,9 +202,97 @@ struct EbpfDispatcher { } impl EbpfDispatcher { - const FLOW_MAP_SIZE: usize = 1 << 14; + const MERGE_COUNT_MAX: usize = 2; + + fn segmentation_reassembly<'a>( + packets: &'a mut Vec>>, + ) -> Vec>> { + let mut merge_packets: Vec>> = vec![]; + let mut count = 0; + for mut p in packets.drain(..) { + if p.segment_flags != SegmentFlags::Seg { + count = 1; + merge_packets.push(p); + continue; + } + + let Some(last) = merge_packets.last_mut() else { + count = 1; + merge_packets.push(p); + continue; + }; + + if last.generate_ebpf_flow_id() == p.generate_ebpf_flow_id() + && last.segment_flags == SegmentFlags::Start + && last.cap_seq + 1 == p.cap_seq + && count < Self::MERGE_COUNT_MAX + { + last.merge(&mut p); + count += 1; + } else { + count = 1; + merge_packets.push(p); + } + } + + merge_packets + } + + fn inject_flush_ticker( + timestamp: Duration, + flow_map: &mut FlowMap, + config: &Config, + reorder: &mut Reorder, + ) { + flow_map.inject_flush_ticker(config, Duration::ZERO); + let mut packets = reorder.flush(timestamp.as_secs()); + let mut packets = packets + .drain(..) 
+ .map(|x| x.into_any().downcast::().unwrap()) + .collect::>>(); + let packets = Self::segmentation_reassembly(&mut packets); + for mut packet in packets { + flow_map.inject_meta_packet(&config, &mut packet); + } + } + + fn inject_meta_packet( + mut packet: Box>, + flow_map: &mut FlowMap, + config: &Config, + reorder: &mut Reorder, + ) { + let mut packets = match packet.ebpf_type { + EbpfType::GoHttp2Uprobe | EbpfType::GoHttp2UprobeData => { + flow_map.inject_meta_packet(config, &mut packet); + reorder.flush(packet.lookup_key.timestamp.as_secs()) + } + _ => reorder.inject_item(packet), + }; + let mut packets = packets + .drain(..) + .map(|x| x.into_any().downcast::().unwrap()) + .collect::>>(); + let packets = Self::segmentation_reassembly(&mut packets); + for mut packet in packets { + flow_map.inject_meta_packet(config, &mut packet); + } + } fn run(&self, counter: Arc, exception_handler: ExceptionHandler) { + let ebpf_config = self.config.load(); + let out_of_order_reassembly_bitmap = + L7ProtocolBitmap::from(&ebpf_config.ebpf.syscall_out_of_order_reassembly); + let reorder_counter = Arc::new(ReorderCounter::default()); + self.stats_collector.register_countable( + &stats::NoTagModule("ebpf-collector-reorder"), + Countable::Owned(Box::new(StatsReorderCounter::new(reorder_counter.clone()))), + ); + let mut reorder = Reorder::new( + Box::new(out_of_order_reassembly_bitmap), + reorder_counter, + ebpf_config.ebpf.syscall_out_of_order_cache_size, + ); let mut flow_map = FlowMap::new( self.dispatcher_id as u32, self.flow_output.clone(), @@ -214,7 +305,6 @@ impl EbpfDispatcher { self.stats_collector.clone(), true, // from_ebpf ); - let ebpf_config = self.config.load(); let leaky_bucket = LeakyBucket::new(Some(ebpf_config.ebpf.global_ebpf_pps_threshold)); const QUEUE_BATCH_SIZE: usize = 1024; let mut batch = Vec::with_capacity(QUEUE_BATCH_SIZE); @@ -231,7 +321,12 @@ impl EbpfDispatcher { .recv_all(&mut batch, Some(Duration::from_secs(1))) .is_err() { - 
flow_map.inject_flush_ticker(&config, Duration::ZERO); + Self::inject_flush_ticker( + get_timestamp(self.time_diff.load(Ordering::Relaxed)), + &mut flow_map, + &config, + &mut reorder, + ); continue; } @@ -250,7 +345,7 @@ impl EbpfDispatcher { packet.timestamp_adjust(self.time_diff.load(Ordering::Relaxed)); packet.set_loopback_mac(ebpf_config.ctrl_mac); - flow_map.inject_meta_packet(&config, &mut packet); + Self::inject_meta_packet(packet, &mut flow_map, &config, &mut reorder); } } } @@ -475,6 +570,18 @@ impl EbpfCollector { } } + let segmentation_reassembly_bitmap = + L7ProtocolBitmap::from(&config.ebpf.syscall_segmentation_reassembly); + for i in get_all_protocol().into_iter() { + if segmentation_reassembly_bitmap.is_enabled(i.protocol()) { + info!( + "l7 protocol {:?} segmentation reassembly enabled", + i.protocol() + ); + ebpf::enable_ebpf_seg_reasm_protocol(i.protocol() as ebpf::c_int); + } + } + let white_list = &config.ebpf.kprobe_whitelist; if !white_list.port_list.is_empty() { if let Some(b) = parse_u16_range_list_to_bitmap(&white_list.port_list, false) { diff --git a/agent/src/flow_generator/perf/mod.rs b/agent/src/flow_generator/perf/mod.rs index 7da39130a72..958d4a6ea60 100644 --- a/agent/src/flow_generator/perf/mod.rs +++ b/agent/src/flow_generator/perf/mod.rs @@ -28,7 +28,7 @@ use std::sync::Arc; use enum_dispatch::enum_dispatch; use public::bitmap::Bitmap; -use public::l7_protocol::L7ProtocolEnum; +use public::l7_protocol::{L7ProtocolChecker as L7ProtocolCheckerBitmap, L7ProtocolEnum}; use super::protocol_logs::sql::ObfuscateCache; use super::{ diff --git a/agent/src/flow_generator/protocol_logs/sql/mysql.rs b/agent/src/flow_generator/protocol_logs/sql/mysql.rs index c6e568f336f..2b7cf7da704 100644 --- a/agent/src/flow_generator/protocol_logs/sql/mysql.rs +++ b/agent/src/flow_generator/protocol_logs/sql/mysql.rs @@ -45,6 +45,7 @@ use crate::{ utils::bytes, }; use public::bytes::read_u32_le; +use public::l7_protocol::L7ProtocolChecker; const 
SERVER_STATUS_CODE_MIN: u16 = 1000; const CLIENT_STATUS_CODE_MIN: u16 = 2000; diff --git a/server/agent_config/config.go b/server/agent_config/config.go index 0ac8ef74133..eac78f7ad9a 100644 --- a/server/agent_config/config.go +++ b/server/agent_config/config.go @@ -276,6 +276,9 @@ type EbpfConfig struct { JavaSymbolFileRefreshDeferInterval *string `yaml:"java-symbol-file-refresh-defer-interval,omitempty"` OnCpuProfile *OnCpuProfile `yaml:"on-cpu-profile,omitempty"` OffCpuProfile *OffCpuProfile `yaml:"off-cpu-profile,omitempty"` + SyscallOutOfOrderReassembly []string `yaml:"syscall-out-of-order-reassembly,omitempty"` + SyscallSegmentationReassembly []string `yaml:"syscall-segmentation-reassembly,omitempty"` + SyscallOutOfOrderCacheSize *int `yaml:"syscall-out-of-order-cache-size,omitempty"` } type OsProcRegex struct { diff --git a/server/agent_config/example.yaml b/server/agent_config/example.yaml index 6c2e5638346..a7ae4a678ba 100644 --- a/server/agent_config/example.yaml +++ b/server/agent_config/example.yaml @@ -1098,7 +1098,7 @@ vtap_group_id: g-xxxxxx # ZMTP: [] # DNS: [] # TLS: [] - + ## L7 Protocol Advanced Features #l7-protocol-advanced-features: @@ -1438,6 +1438,52 @@ vtap_group_id: g-xxxxxx ## time exceeding 1 hour. #minblock: 50us + ## Ebpf reorder cache size + ## Default: 16. Range: [8, 128] + #syscall-out-of-order-cache-size: 16 + + ## List of Application Protocols + ## Note: After enabling the l7 protocol, the agent will perform sorting. 
+ #syscall-out-of-order-reassembly: + #- HTTP + #- HTTP2 ## for both HTTP2 and gRPC + #- SofaRPC + #- FastCGI + #- Dubbo + #- MySQL + #- PostgreSQL + #- Redis + #- MongoDB + #- Kafka + #- MQTT + #- AMQP + #- OpenWire + #- NATS + #- DNS + #- TLS + #- Custom ## custom protocol from plugin + + ## List of Application Protocols + ## Note: For each protocol listed here, the agent merges the segments of a split syscall message from the same flow back together (segmentation reassembly). + #syscall-segmentation-reassembly: + #- HTTP + #- HTTP2 ## for both HTTP2 and gRPC + #- SofaRPC + #- FastCGI + #- Dubbo + #- MySQL + #- PostgreSQL + #- Redis + #- MongoDB + #- Kafka + #- MQTT + #- AMQP + #- OpenWire + #- NATS + #- DNS + #- TLS + #- Custom ## custom protocol from plugin + ###################################### ## Agent Running in Standalone Mode ## ######################################