Skip to content

Commit

Permalink
[Agent] Support reorder ebpf packet
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanchaoa committed May 29, 2024
1 parent 6eb9a4c commit 71de62a
Show file tree
Hide file tree
Showing 14 changed files with 328 additions and 125 deletions.
8 changes: 8 additions & 0 deletions agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ cgroups-rs = "0.2.9"
nix = "0.23"
pcap = "0.9.1"
procfs = { git = "https://github.com/deepflowio/procfs/" }
reorder = { path = "plugins/reorder" }

[target.'cfg(target_os = "linux")'.dependencies]
k8s-openapi = { version = "^0.15", features = ["v1_19", "schemars"] }
Expand Down
5 changes: 5 additions & 0 deletions agent/crates/public/src/l7_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,8 @@ impl L7ProtocolEnum {
}
}
}

pub trait L7ProtocolChecker {
fn is_disabled(&self, p: L7Protocol) -> bool;
fn is_enabled(&self, p: L7Protocol) -> bool;
}
2 changes: 1 addition & 1 deletion agent/plugins/reorder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
public = { path = "../../crates/public"}
public = { path = "../../crates/public"}
173 changes: 63 additions & 110 deletions agent/plugins/reorder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,145 +14,98 @@
* limitations under the License.
*/

use std::collections::{HashMap, VecDeque, HashSet};
use std::time::Duration;
use std::any::Any;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering::Relaxed};
use std::sync::Arc;

const CACHE_SIZE: usize = 16;
use public::{
counter,
l7_protocol::{L7Protocol, L7ProtocolChecker},
};

pub trait Downcast {
fn as_any_mut(&mut self) -> &mut dny Any;
fn as_any_mut(&mut self) -> &mut dyn Any;
fn into_any(self: Box<Self>) -> Box<dyn Any>;
}

pub trait CacheItem: Downcast {
// Distinguish different flows
fn get_id(&self) -> u64;
// Used for sorting
fn get_seq(&self) -> u64;
fn get_timestmap(&self) -> Duration;
// Time in seconds
fn get_timestmap(&self) -> u64;
fn get_l7_protocol(&self) -> L7Protocol;
}

struct CacheNode {
cache: [Option<Box<dyn CacheItem>>; CACHE_SIZE],
start: u64,
last_timestamp: u64,
#[derive(Default)]
pub struct ReorderCounter {
drop_before_window: AtomicU64,
drop_out_of_order: AtomicU64,
flow_counter: AtomicU64,
packet_couter: AtomicU64,
closed: AtomicBool,
}

impl CacheNode {
fn new(item: Box<dyn CacheItem>) -> Self {
let mut node = Self {
cache: [None; CACHE_SIZE],
start: item.get_seq(),
last_timestamp: item.get_timestmap(),
};
node.cache[0] = Some(item);
node
}

fn flush(&mut self, count: usize) -> Vec<Option<Box<dyn CacheItem>>> {
self.start += count;
pub struct StatsReorderCounter(Arc<ReorderCounter>);

let count = count.min(CACHE_SIZE);
let out = self.cache[0..count].to_vec();
self.cache[0..count].iter_mut().for_each(|x| *x = None);
if count < CACHE_SIZE {
let remain = CACHE_SIZE - count;
self.cache[0..reamin].copy_from_slice(&self.cache[count..]);
self.cache[count..].iter_mut().for_each(|x| *x = None);
}
impl StatsReorderCounter {
pub fn new(count: Arc<ReorderCounter>) -> Self {
Self(count)
}
}

out
impl counter::OwnedCountable for StatsReorderCounter {
fn closed(&self) -> bool {
self.0.closed.load(Relaxed)
}

fn add(&mut self, item: Box<dyn CacheItem>) -> (u64, Vec<Option<Box<dyn CacheItem>>>) {
let seq = item.get_seq();
if seq < self.start {
// TODO
return (self.last_timestamp, vec![]);
}
let mut offset = seq - self.start;
let mut out = vec![];
if offset >= CACHE_SIZE {
out = self.flush(offset as usize - CACHE_SIZE + 1);
offset = item.get_seq() - self.start;
}
let last_timestmap = self.last_timestamp;
self.cache[offset] = item;
self.last_timestamp = item.get_timestmap().as_secs();
(last_timestmap, out)
fn get_counters(&self) -> Vec<counter::Counter> {
vec![
(
"drop-before-window",
counter::CounterType::Counted,
counter::CounterValue::Unsigned(self.0.drop_before_window.swap(0, Relaxed)),
),
(
"drop-out-of-order",
counter::CounterType::Counted,
counter::CounterValue::Unsigned(self.0.drop_out_of_order.swap(0, Relaxed)),
),
(
"flow-counter",
counter::CounterType::Counted,
counter::CounterValue::Unsigned(self.0.flow_counter.load(Relaxed)),
),
(
"packet-counter",
counter::CounterType::Counted,
counter::CounterValue::Unsigned(self.0.packet_couter.load(Relaxed)),
),
]
}
}

struct Reorder {
cache: HashMap<u64, CacheNode>,
windows: VecDeque<HashSet<u64>>, // time in seconds
window_start: u64,
pub struct Reorder {
counter: Arc<ReorderCounter>,
}

impl Reorder {
const TIMEOUT: u64 = 5;
const WINDOW_SIZE: u64 = Self::TIMEOUT;

fn new() ->Self {
let mut windows = VecDeque::with_capacity(Self::WINDOW_SIZE as usize);
for _ in 0..Self::TIMEOUT as usize {
windows.push_back(HashSet::new());
}
Self {
cache: HashMap::new(),
windows,
window_start: 0,
}
}

fn window_change(&mut self, last: u64, now: u64, id: u64) {
self.window_delete(last, id);
self.window_delete(now, id);
}

fn window_delete(&mut self, timestamp: u64, id: u64) {
let offset = timestamp - self.window_start;
self.windows[offset].remove(id);
pub fn new(_: Box<dyn L7ProtocolChecker>, counter: Arc<ReorderCounter>, _: usize) -> Self {
Self { counter }
}

fn window_add(&mut self, timestamp: u64, id: u64) {
let offset = timestamp - self.window_start;
self.windows[offset].insert(id);
}

fn flush(&mut self, timestamp: Duration) -> Vec<Option<Box<dyn CacheItem>>> {


pub fn flush(&mut self, _: u64) -> Vec<Box<dyn CacheItem>> {
vec![]
}

fn flush(&mut self, timestamp: Duration) -> Vec<Option<Box<dyn CacheItem>>> {
let now = timestamp.as_secs();
if now < self.window_start {
// TODO
return vec![];
}
let offset = now - self.window_start;
let mut items = vec![];
if offset >= Self::WINDOW_SIZE {
let count = offset - Self::WINDOW_SIZE + 1;
for i in 0..count.min(Self::WINDOW_SIZE) {
let ids = self.windows[i]
}
}
pub fn inject_item(&mut self, item: Box<dyn CacheItem>) -> Vec<Box<dyn CacheItem>> {
vec![item]
}
}

fn inject_item(&mut self, item: Box<dyn CacheItem>) -> Vec<Option<Box<dyn CacheItem>>> {
let id = item.get_id();
if let Some(node) = self.cache.get_mut(&id) {
let (last_timestamp, out) = node.add(item);
self.window_change(last_timestamp, item.get_timestmap().as_secs(), id);
return out;
} else {
let node = CacheNode::new(item);
let mut timestmap = item.get_timestmap();
self.cache.insert(id, node);
self.window_add(item.get_timestmap().as_secs(), id);
}
vec![]
impl Drop for Reorder {
fn drop(&mut self) {
self.counter.closed.store(true, Relaxed);
}
}

8 changes: 5 additions & 3 deletions agent/src/common/l7_protocol_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::plugin::c_ffi::SoPluginFunc;
use crate::plugin::wasm::WasmVm;

use public::enums::IpProtocol;
use public::l7_protocol::{CustomProtocol, L7Protocol, L7ProtocolEnum};
use public::l7_protocol::{CustomProtocol, L7Protocol, L7ProtocolChecker, L7ProtocolEnum};

/*
所有协议都需要实现L7ProtocolLogInterface这个接口.
Expand Down Expand Up @@ -514,12 +514,14 @@ impl L7ProtocolBitmap {
pub fn set_disabled(&mut self, p: L7Protocol) {
self.0 &= !(1 << (p as u128));
}
}

pub fn is_disabled(&self, p: L7Protocol) -> bool {
impl L7ProtocolChecker for L7ProtocolBitmap {
fn is_disabled(&self, p: L7Protocol) -> bool {
self.0 & (1 << (p as u128)) == 0
}

pub fn is_enabled(&self, p: L7Protocol) -> bool {
fn is_enabled(&self, p: L7Protocol) -> bool {
!self.is_disabled(p)
}
}
Expand Down
Loading

0 comments on commit 71de62a

Please sign in to comment.