Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Agent] Supoort reorder ebpf #5733

Merged
merged 4 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ cgroups-rs = "0.2.9"
nix = "0.23"
pcap = "0.9.1"
procfs = { git = "https://github.com/deepflowio/procfs/" }
reorder = { path = "plugins/reorder" }

[target.'cfg(target_os = "linux")'.dependencies]
k8s-openapi = { version = "^0.15", features = ["v1_19", "schemars"] }
Expand Down
5 changes: 5 additions & 0 deletions agent/crates/public/src/l7_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,8 @@ impl L7ProtocolEnum {
}
}
}

pub trait L7ProtocolChecker {
fn is_disabled(&self, p: L7Protocol) -> bool;
fn is_enabled(&self, p: L7Protocol) -> bool;
}
9 changes: 9 additions & 0 deletions agent/plugins/reorder/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "reorder"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
public = { path = "../../crates/public"}
111 changes: 111 additions & 0 deletions agent/plugins/reorder/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright (c) 2024 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::any::Any;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering::Relaxed};
use std::sync::Arc;

use public::{
counter,
l7_protocol::{L7Protocol, L7ProtocolChecker},
};

pub trait Downcast {
fn as_any_mut(&mut self) -> &mut dyn Any;
fn into_any(self: Box<Self>) -> Box<dyn Any>;
}

pub trait CacheItem: Downcast {
// Distinguish different flows
fn get_id(&self) -> u64;
// Used for sorting
fn get_seq(&self) -> u64;
// Time in seconds
fn get_timestmap(&self) -> u64;
fn get_l7_protocol(&self) -> L7Protocol;
}

#[derive(Default)]
pub struct ReorderCounter {
drop_before_window: AtomicU64,
drop_out_of_order: AtomicU64,
flow_counter: AtomicU64,
packet_couter: AtomicU64,
closed: AtomicBool,
}

pub struct StatsReorderCounter(Arc<ReorderCounter>);

impl StatsReorderCounter {
pub fn new(count: Arc<ReorderCounter>) -> Self {
Self(count)
}
}

impl counter::OwnedCountable for StatsReorderCounter {
fn closed(&self) -> bool {
self.0.closed.load(Relaxed)
}

fn get_counters(&self) -> Vec<counter::Counter> {
vec![
(
"drop-before-window",
counter::CounterType::Counted,
counter::CounterValue::Unsigned(self.0.drop_before_window.swap(0, Relaxed)),
),
(
"drop-out-of-order",
counter::CounterType::Counted,
counter::CounterValue::Unsigned(self.0.drop_out_of_order.swap(0, Relaxed)),
),
(
"flow-counter",
counter::CounterType::Counted,
counter::CounterValue::Unsigned(self.0.flow_counter.load(Relaxed)),
),
(
"packet-counter",
counter::CounterType::Counted,
counter::CounterValue::Unsigned(self.0.packet_couter.load(Relaxed)),
),
]
}
}

pub struct Reorder {
counter: Arc<ReorderCounter>,
}

impl Reorder {
pub fn new(_: Box<dyn L7ProtocolChecker>, counter: Arc<ReorderCounter>, _: usize) -> Self {
Self { counter }
}

pub fn flush(&mut self, _: u64) -> Vec<Box<dyn CacheItem>> {
vec![]
}

pub fn inject_item(&mut self, item: Box<dyn CacheItem>) -> Vec<Box<dyn CacheItem>> {
vec![item]
}
}

impl Drop for Reorder {
fn drop(&mut self) {
self.counter.closed.store(true, Relaxed);
}
}
8 changes: 5 additions & 3 deletions agent/src/common/l7_protocol_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::plugin::c_ffi::SoPluginFunc;
use crate::plugin::wasm::WasmVm;

use public::enums::IpProtocol;
use public::l7_protocol::{CustomProtocol, L7Protocol, L7ProtocolEnum};
use public::l7_protocol::{CustomProtocol, L7Protocol, L7ProtocolChecker, L7ProtocolEnum};

/*
所有协议都需要实现L7ProtocolLogInterface这个接口.
Expand Down Expand Up @@ -514,12 +514,14 @@ impl L7ProtocolBitmap {
pub fn set_disabled(&mut self, p: L7Protocol) {
self.0 &= !(1 << (p as u128));
}
}

pub fn is_disabled(&self, p: L7Protocol) -> bool {
impl L7ProtocolChecker for L7ProtocolBitmap {
fn is_disabled(&self, p: L7Protocol) -> bool {
self.0 & (1 << (p as u128)) == 0
}

pub fn is_enabled(&self, p: L7Protocol) -> bool {
fn is_enabled(&self, p: L7Protocol) -> bool {
!self.is_disabled(p)
}
}
Expand Down
72 changes: 70 additions & 2 deletions agent/src/common/meta_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

use std::any::Any;
use std::fmt;
use std::net::{IpAddr, Ipv4Addr};
use std::ops::Deref;
Expand Down Expand Up @@ -47,8 +48,9 @@ use crate::error;
use crate::{
common::ebpf::{GO_HTTP2_UPROBE, GO_HTTP2_UPROBE_DATA},
ebpf::{
MSG_REQUEST_END, MSG_RESPONSE_END, PACKET_KNAME_MAX_PADDING, SK_BPF_DATA, SOCK_DATA_HTTP2,
SOCK_DATA_TLS_HTTP2, SOCK_DIR_RCV, SOCK_DIR_SND,
MSG_REASM_SEG, MSG_REASM_START, MSG_REQUEST_END, MSG_RESPONSE_END,
PACKET_KNAME_MAX_PADDING, SK_BPF_DATA, SOCK_DATA_HTTP2, SOCK_DATA_TLS_HTTP2, SOCK_DIR_RCV,
SOCK_DIR_SND,
},
};
use crate::{
Expand All @@ -61,6 +63,7 @@ use public::{
buffer::BatchedBuffer,
utils::net::{is_unicast_link_local, MacAddr},
};
use reorder::{CacheItem, Downcast};

#[derive(Clone, Debug)]
pub enum RawPacket<'a> {
Expand Down Expand Up @@ -108,6 +111,32 @@ bitflags! {
}
}

#[cfg(any(target_os = "linux", target_os = "android"))]
#[derive(PartialEq, Clone, Debug)]
pub enum SegmentFlags {
None,
Start,
Seg,
}

#[cfg(any(target_os = "linux", target_os = "android"))]
impl Default for SegmentFlags {
fn default() -> Self {
SegmentFlags::None
}
}

#[cfg(any(target_os = "linux", target_os = "android"))]
impl From<u8> for SegmentFlags {
fn from(value: u8) -> Self {
match value {
MSG_REASM_START => SegmentFlags::Start,
MSG_REASM_SEG => SegmentFlags::Seg,
_ => SegmentFlags::None,
}
}
}

#[derive(Clone, Debug, Default)]
pub struct MetaPacket<'a> {
// 主机序, 不因L2End1而颠倒, 端口会在查询策略时被修改
Expand Down Expand Up @@ -168,6 +197,8 @@ pub struct MetaPacket<'a> {
pub is_request_end: bool,
pub is_response_end: bool,
pub ebpf_flags: EbpfFlags,
#[cfg(any(target_os = "linux", target_os = "android"))]
pub segment_flags: SegmentFlags,

pub process_id: u32,
pub pod_id: u32,
Expand Down Expand Up @@ -901,6 +932,14 @@ impl<'a> MetaPacket<'a> {
0
}

#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn merge(&mut self, packet: &mut MetaPacket) {
self.raw_from_ebpf.append(&mut packet.raw_from_ebpf);
self.packet_len += packet.packet_len - 54;
self.payload_len += packet.payload_len;
self.l4_payload_len += packet.l4_payload_len;
}

#[cfg(any(target_os = "linux", target_os = "android"))]
pub unsafe fn from_ebpf(data: *mut SK_BPF_DATA) -> Result<MetaPacket<'a>, Box<dyn Error>> {
let data = &mut data.read_unaligned();
Expand Down Expand Up @@ -991,6 +1030,7 @@ impl<'a> MetaPacket<'a> {
} else {
EbpfFlags::NONE
};
packet.segment_flags = SegmentFlags::from(data.msg_type);

// 目前只有 go uprobe http2 的方向判断能确保准确
if data.source == GO_HTTP2_UPROBE || data.source == GO_HTTP2_UPROBE_DATA {
Expand Down Expand Up @@ -1105,6 +1145,34 @@ impl<'a> fmt::Display for MetaPacket<'a> {
}
}

impl Downcast for MetaPacket<'static> {
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}

fn into_any(self: Box<Self>) -> Box<dyn Any> {
self
}
}

impl CacheItem for MetaPacket<'static> {
fn get_id(&self) -> u64 {
self.generate_ebpf_flow_id()
}

fn get_seq(&self) -> u64 {
self.cap_seq
}

fn get_timestmap(&self) -> u64 {
self.lookup_key.timestamp.as_secs()
}

fn get_l7_protocol(&self) -> L7Protocol {
self.l7_protocol_from_ebpf
}
}

#[derive(Clone, Debug, Default)]
pub struct MetaPacketTcpHeader {
pub seq: u32,
Expand Down
9 changes: 9 additions & 0 deletions agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,9 @@ pub struct EbpfYamlConfig {
pub java_symbol_file_refresh_defer_interval: Duration,
pub on_cpu_profile: OnCpuProfile,
pub off_cpu_profile: OffCpuProfile,
pub syscall_out_of_order_cache_size: usize,
pub syscall_out_of_order_reassembly: Vec<String>,
pub syscall_segmentation_reassembly: Vec<String>,
}

impl Default for EbpfYamlConfig {
Expand All @@ -409,6 +412,9 @@ impl Default for EbpfYamlConfig {
java_symbol_file_refresh_defer_interval: Duration::from_secs(600),
on_cpu_profile: OnCpuProfile::default(),
off_cpu_profile: OffCpuProfile::default(),
syscall_out_of_order_reassembly: vec![],
syscall_segmentation_reassembly: vec![],
syscall_out_of_order_cache_size: 16,
}
}
}
Expand Down Expand Up @@ -796,6 +802,9 @@ impl YamlConfig {
.off_cpu_profile
.min_block
.clamp(Duration::from_micros(0), Duration::from_micros(3600000000));
if !(8..=128).contains(&c.ebpf.syscall_out_of_order_cache_size) {
c.ebpf.syscall_out_of_order_cache_size = 16;
}

if c.guard_interval < Duration::from_secs(1) || c.guard_interval > Duration::from_secs(3600)
{
Expand Down
2 changes: 1 addition & 1 deletion agent/src/ebpf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::fmt;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

// 最大长度
pub const CAP_LEN_MAX: usize = 8192;
pub const CAP_LEN_MAX: usize = 16384;

// process_kname is up to 16 bytes, if the length of process_kname exceeds 15, the ending char is '\0'
pub const PACKET_KNAME_MAX_PADDING: usize = 15;
Expand Down