Skip to content

Commit

Permalink
Merge branch 'main' into feature-test-export-zero
Browse files (browse the repository at this point in the history)
  • Loading branch information
sharang authored May 11, 2024
2 parents b0fd051 + 7824a56 commit e7e150a
Show file tree
Hide file tree
Showing 62 changed files with 1,969 additions and 831 deletions.
365 changes: 299 additions & 66 deletions agent/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ arc-swap = "1.5.0"
base64 = "0.21"
bincode = "2.0.0-rc.1"
bitflags = "1.3.2"
bollard = "0.16.1"
bson = "2.7.0"
bytesize = "1.1.0"
cadence = "0.27.0"
Expand Down
3 changes: 0 additions & 3 deletions agent/benches/lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,6 @@ fn rrt_lru(c: &mut Criterion) {
LogCache {
msg_type: LogMessageType::Request,
time: item.duration.as_micros() as u64,
kafka_info: None,
multi_merge_info: None,
},
);
Expand All @@ -461,7 +460,6 @@ fn rrt_lru(c: &mut Criterion) {
LogCache {
msg_type: LogMessageType::Request,
time: item.duration.as_micros() as u64,
kafka_info: None,
multi_merge_info: None,
},
);
Expand Down Expand Up @@ -494,7 +492,6 @@ fn rrt_lru(c: &mut Criterion) {
LogCache {
msg_type: LogMessageType::Request,
time: item.duration.as_micros() as u64,
kafka_info: None,
multi_merge_info: None,
},
);
Expand Down
10 changes: 10 additions & 0 deletions agent/crates/public/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ pub fn read_u32_le(bs: &[u8]) -> u32 {
u32::from_le_bytes(bs[..4].try_into().unwrap())
}

/// Decodes a big-endian `i32` from the first four bytes of `bs`.
///
/// # Panics
/// Panics if `bs` contains fewer than 4 bytes.
pub fn read_i32_be(bs: &[u8]) -> i32 {
    assert!(bs.len() >= 4);
    let head: [u8; 4] = bs[..4].try_into().unwrap();
    i32::from_be_bytes(head)
}

pub fn read_i32_le(bs: &[u8]) -> i32 {
assert!(bs.len() >= 4);
i32::from_le_bytes(bs[..4].try_into().unwrap())
Expand All @@ -63,6 +68,11 @@ pub fn read_u64_le(bs: &[u8]) -> u64 {
u64::from_le_bytes(bs[..8].try_into().unwrap())
}

/// Decodes a big-endian `i64` from the first eight bytes of `bs`.
///
/// # Panics
/// Panics if `bs` contains fewer than 8 bytes.
pub fn read_i64_be(bs: &[u8]) -> i64 {
    assert!(bs.len() >= 8);
    let head: [u8; 8] = bs[..8].try_into().unwrap();
    i64::from_be_bytes(head)
}

pub fn read_i64_le(bs: &[u8]) -> i64 {
assert!(bs.len() >= 8);
i64::from_le_bytes(bs[..8].try_into().unwrap())
Expand Down
8 changes: 4 additions & 4 deletions agent/resources/test/flow_generator/kafka/kafka-sw8.result
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
KafkaInfo { msg_type: Request, is_tls: false, correlation_id: 1, trace_id: "bb79fa34-d03f-4bc2-8fa9-979af7877426", span_id: "0d8ce61a-ef2a-4e23-b8f4-2c59e3f44d48-0", req_msg_size: Some(308), api_version: 7, api_key: 0, client_id: "kafka-python-producer-1", topic_name: "quickstart-events", resp_msg_size: None, status: Ok, status_code: None, captured_request_byte: 312, captured_response_byte: 0, rrt: 0 } is_kafka: true
KafkaInfo { msg_type: Other, is_tls: false, correlation_id: 0, trace_id: "", span_id: "", req_msg_size: None, api_version: 0, api_key: 0, client_id: "", topic_name: "", resp_msg_size: None, status: Ok, status_code: None, captured_request_byte: 0, captured_response_byte: 0, rrt: 0 } is_kafka: false
KafkaInfo { msg_type: Response, is_tls: false, correlation_id: 1, trace_id: "", span_id: "", req_msg_size: None, api_version: 0, api_key: 0, client_id: "", topic_name: "", resp_msg_size: Some(65), status: Ok, status_code: None, captured_request_byte: 0, captured_response_byte: 69, rrt: 28415 } is_kafka: true
KafkaInfo { msg_type: Other, is_tls: false, correlation_id: 0, trace_id: "", span_id: "", req_msg_size: None, api_version: 0, api_key: 0, client_id: "", topic_name: "", resp_msg_size: None, status: Ok, status_code: None, captured_request_byte: 0, captured_response_byte: 0, rrt: 0 } is_kafka: false
KafkaInfo { msg_type: Request, is_tls: false, correlation_id: 1, trace_id: "bb79fa34-d03f-4bc2-8fa9-979af7877426", span_id: "0d8ce61a-ef2a-4e23-b8f4-2c59e3f44d48-0", req_msg_size: Some(308), api_version: 7, api_key: 0, client_id: "kafka-python-producer-1", topic_name: "quickstart-events", partition: 0, offset: 0, group_id: "", resp_msg_size: None, status: Ok, status_code: None, captured_request_byte: 312, captured_response_byte: 0, rrt: 0 } is_kafka: true
KafkaInfo { msg_type: Other, is_tls: false, correlation_id: 0, trace_id: "", span_id: "", req_msg_size: None, api_version: 0, api_key: 0, client_id: "", topic_name: "", partition: 0, offset: 0, group_id: "", resp_msg_size: None, status: Ok, status_code: None, captured_request_byte: 0, captured_response_byte: 0, rrt: 0 } is_kafka: false
KafkaInfo { msg_type: Response, is_tls: false, correlation_id: 1, trace_id: "", span_id: "", req_msg_size: None, api_version: 1, api_key: 0, client_id: "", topic_name: "quickstart-events", partition: 0, offset: 7, group_id: "", resp_msg_size: Some(65), status: Ok, status_code: Some(0), captured_request_byte: 0, captured_response_byte: 69, rrt: 28415 } is_kafka: true
KafkaInfo { msg_type: Other, is_tls: false, correlation_id: 0, trace_id: "", span_id: "", req_msg_size: None, api_version: 0, api_key: 0, client_id: "", topic_name: "", partition: 0, offset: 0, group_id: "", resp_msg_size: None, status: Ok, status_code: None, captured_request_byte: 0, captured_response_byte: 0, rrt: 0 } is_kafka: false
4 changes: 2 additions & 2 deletions agent/resources/test/flow_generator/kafka/kafka.result
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
KafkaInfo { msg_type: Request, is_tls: false, correlation_id: 2, trace_id: "", span_id: "", req_msg_size: Some(49), api_version: 3, api_key: 18, client_id: "adminclient-1", topic_name: "", resp_msg_size: None, status: Ok, status_code: None, captured_request_byte: 53, captured_response_byte: 0, rrt: 0 } is_kafka: true
KafkaInfo { msg_type: Response, is_tls: false, correlation_id: 2, trace_id: "", span_id: "", req_msg_size: None, api_version: 0, api_key: 0, client_id: "", topic_name: "", resp_msg_size: Some(435), status: Ok, status_code: None, captured_request_byte: 0, captured_response_byte: 439, rrt: 4941 } is_kafka: false
KafkaInfo { msg_type: Request, is_tls: false, correlation_id: 2, trace_id: "", span_id: "", req_msg_size: Some(49), api_version: 3, api_key: 18, client_id: "adminclient-1", topic_name: "", partition: 0, offset: 0, group_id: "", resp_msg_size: None, status: Ok, status_code: None, captured_request_byte: 53, captured_response_byte: 0, rrt: 0 } is_kafka: true
KafkaInfo { msg_type: Response, is_tls: false, correlation_id: 2, trace_id: "", span_id: "", req_msg_size: None, api_version: 3, api_key: 18, client_id: "", topic_name: "", partition: 0, offset: 0, group_id: "", resp_msg_size: Some(435), status: Ok, status_code: None, captured_request_byte: 0, captured_response_byte: 439, rrt: 4941 } is_kafka: false
Original file line number Diff line number Diff line change
@@ -1 +1 @@
KafkaInfo { msg_type: Request, is_tls: false, correlation_id: 5, trace_id: "", span_id: "", req_msg_size: Some(164), api_version: 9, api_key: 0, client_id: "console-producer", topic_name: "quickstart-events", resp_msg_size: None, status: Ok, status_code: None, captured_request_byte: 168, captured_response_byte: 0, rrt: 0 } is_kafka: true
KafkaInfo { msg_type: Request, is_tls: false, correlation_id: 5, trace_id: "", span_id: "", req_msg_size: Some(164), api_version: 9, api_key: 0, client_id: "console-producer", topic_name: "quickstart-events", partition: 0, offset: 0, group_id: "", resp_msg_size: None, status: Ok, status_code: None, captured_request_byte: 168, captured_response_byte: 0, rrt: 0 } is_kafka: true
4 changes: 2 additions & 2 deletions agent/resources/test/flow_generator/kafka/produce.result
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
KafkaInfo { msg_type: Request, is_tls: false, correlation_id: 1, trace_id: "", span_id: "", req_msg_size: Some(117), api_version: 2, api_key: 0, client_id: "console-producer", topic_name: "topic2", resp_msg_size: None, status: Ok, status_code: None, captured_request_byte: 121, captured_response_byte: 0, rrt: 0 } is_kafka: true
KafkaInfo { msg_type: Response, is_tls: false, correlation_id: 1, trace_id: "", span_id: "", req_msg_size: None, api_version: 0, api_key: 0, client_id: "", topic_name: "", resp_msg_size: Some(46), status: Ok, status_code: None, captured_request_byte: 0, captured_response_byte: 50, rrt: 16107 } is_kafka: true
KafkaInfo { msg_type: Request, is_tls: false, correlation_id: 1, trace_id: "", span_id: "", req_msg_size: Some(117), api_version: 2, api_key: 0, client_id: "console-producer", topic_name: "topic2", partition: 0, offset: 0, group_id: "", resp_msg_size: None, status: Ok, status_code: None, captured_request_byte: 121, captured_response_byte: 0, rrt: 0 } is_kafka: true
KafkaInfo { msg_type: Response, is_tls: false, correlation_id: 1, trace_id: "", span_id: "", req_msg_size: None, api_version: 1, api_key: 0, client_id: "", topic_name: "topic2", partition: 0, offset: 0, group_id: "", resp_msg_size: Some(46), status: Ok, status_code: Some(0), captured_request_byte: 0, captured_response_byte: 50, rrt: 16107 } is_kafka: true
8 changes: 2 additions & 6 deletions agent/src/common/l7_protocol_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use std::sync::atomic::Ordering;

use super::{flow::PacketDirection, l7_protocol_log::KafkaInfoCache};
use super::flow::PacketDirection;
use enum_dispatch::enum_dispatch;
use log::{debug, error, warn};
use serde::Serialize;
Expand Down Expand Up @@ -163,7 +163,7 @@ pub trait L7ProtocolInfoInterface: Into<L7ProtocolSendLog> {
if have no previous log cache, cache the current log rrt
*/
fn cal_rrt(&self, param: &ParseParam, kafka_info: Option<KafkaInfoCache>) -> Option<u64> {
fn cal_rrt(&self, param: &ParseParam) -> Option<u64> {
let mut perf_cache = param.l7_perf_cache.borrow_mut();
let cache_key = self.cal_cache_key(param);
let previous_log_info = perf_cache.rrt_cache.pop(&cache_key);
Expand All @@ -186,7 +186,6 @@ pub trait L7ProtocolInfoInterface: Into<L7ProtocolSendLog> {
LogCache {
msg_type: param.direction.into(),
time: param.time,
kafka_info,
multi_merge_info: None,
},
);
Expand Down Expand Up @@ -220,7 +219,6 @@ pub trait L7ProtocolInfoInterface: Into<L7ProtocolSendLog> {
LogCache {
msg_type: param.direction.into(),
time: param.time,
kafka_info,
multi_merge_info: None,
},
);
Expand Down Expand Up @@ -273,7 +271,6 @@ pub trait L7ProtocolInfoInterface: Into<L7ProtocolSendLog> {
LogCache {
msg_type: param.direction.into(),
time: param.time,
kafka_info,
multi_merge_info: None,
},
);
Expand Down Expand Up @@ -332,7 +329,6 @@ pub trait L7ProtocolInfoInterface: Into<L7ProtocolSendLog> {
LogCache {
msg_type: param.direction.into(),
time: param.time,
kafka_info: None,
multi_merge_info: Some((req_end, resp_end, false)),
},
);
Expand Down
10 changes: 1 addition & 9 deletions agent/src/common/l7_protocol_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::flow_generator::protocol_logs::{
AmqpLog, BrpcLog, DnsLog, DubboLog, HttpLog, KafkaLog, MongoDBLog, MqttLog, MysqlLog, NatsLog,
OpenWireLog, OracleLog, PostgresqlLog, PulsarLog, RedisLog, SofaRpcLog, TlsLog, ZmtpLog,
};

use crate::flow_generator::{LogMessageType, Result};
#[cfg(any(target_os = "linux", target_os = "android"))]
use crate::plugin::c_ffi::SoPluginFunc;
Expand Down Expand Up @@ -277,18 +278,9 @@ pub struct EbpfParam<'a> {
pub process_kname: &'a str,
}

pub struct KafkaInfoCache {
// kafka req
pub api_key: u16,
pub api_version: u16,

// kafka resp code
pub code: i16,
}
pub struct LogCache {
pub msg_type: LogMessageType,
pub time: u64,
pub kafka_info: Option<KafkaInfoCache>,
// req_end, resp_end, merged
// set merged to true when req and resp merge once
pub multi_merge_info: Option<(bool, bool, bool)>,
Expand Down
15 changes: 12 additions & 3 deletions agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1159,7 +1159,7 @@ pub struct RuntimeConfig {
pub vtap_group_id: String,
#[serde(skip)]
pub enabled: bool,
pub max_cpus: u32,
pub max_millicpus: u32,
pub max_memory: u64,
pub sync_interval: u64, // unit(second)
pub platform_sync_interval: u64, // unit(second)
Expand Down Expand Up @@ -1307,7 +1307,7 @@ impl RuntimeConfig {
Self {
vtap_group_id: Default::default(),
enabled: true,
max_cpus: 1,
max_millicpus: 1000,
max_memory: 768,
sync_interval: 60,
platform_sync_interval: 10,
Expand Down Expand Up @@ -1487,7 +1487,16 @@ impl TryFrom<trident::Config> for RuntimeConfig {
let rc = Self {
vtap_group_id: Default::default(),
enabled: conf.enabled(),
max_cpus: conf.max_cpus(),
max_millicpus: {
// Compatible with max_cpus and max_millicpus, take the smaller value in milli-cores.
let max_cpus = conf.max_cpus() * 1000;
let max_millicpus = conf.max_millicpus.unwrap_or(max_cpus); // conf.max_millicpus may be None, handle the case where conf.max_millicpus is None
if max_cpus != 0 && max_millicpus != 0 {
max_cpus.min(max_millicpus)
} else {
max_cpus | max_millicpus
}
},
max_memory: (conf.max_memory() as u64) << 20,
sync_interval: conf.sync_interval() as u64,
platform_sync_interval: conf.platform_sync_interval() as u64,
Expand Down
53 changes: 40 additions & 13 deletions agent/src/config/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,17 @@ use crate::{
handler::PacketHandlerBuilder,
metric::document::TapSide,
trident::{AgentComponents, RunningMode},
utils::environment::{free_memory_check, get_container_mem_limit, running_in_container},
utils::environment::{free_memory_check, running_in_container},
};
#[cfg(any(target_os = "linux", target_os = "android"))]
use crate::{
dispatcher::recv_engine::af_packet::OptTpacketVersion,
ebpf::CAP_LEN_MAX,
platform::ProcRegRewrite,
utils::environment::{get_ctrl_ip_and_mac, is_tt_workload},
utils::environment::{
get_container_resource_limits, get_ctrl_ip_and_mac, is_tt_workload,
set_container_resource_limit,
},
};
#[cfg(target_os = "linux")]
use crate::{
Expand Down Expand Up @@ -194,7 +197,7 @@ impl fmt::Debug for CollectorConfig {
#[derive(Clone, Debug, PartialEq)]
pub struct EnvironmentConfig {
pub max_memory: u64,
pub max_cpus: u32,
pub max_millicpus: u32,
pub process_threshold: u32,
pub thread_threshold: u32,
pub sys_free_memory_limit: u32,
Expand Down Expand Up @@ -1202,10 +1205,7 @@ impl TryFrom<(Config, RuntimeConfig)> for ModuleConfig {
type Error = ConfigError;

fn try_from(conf: (Config, RuntimeConfig)) -> Result<Self, Self::Error> {
let (static_config, mut conf) = conf;
if running_in_container() {
conf.max_memory = get_container_mem_limit().unwrap_or(conf.max_memory);
}
let (static_config, conf) = conf;
let controller_ip = static_config.controller_ips[0].parse::<IpAddr>().unwrap();
let dest_ip = if conf.analyzer_ip.len() > 0 {
conf.analyzer_ip.clone()
Expand All @@ -1231,7 +1231,7 @@ impl TryFrom<(Config, RuntimeConfig)> for ModuleConfig {
},
environment: EnvironmentConfig {
max_memory: conf.max_memory,
max_cpus: conf.max_cpus,
max_millicpus: conf.max_millicpus,
process_threshold: conf.process_threshold,
thread_threshold: conf.thread_threshold,
sys_free_memory_limit: conf.sys_free_memory_limit,
Expand Down Expand Up @@ -1574,6 +1574,8 @@ impl TryFrom<(Config, RuntimeConfig)> for ModuleConfig {
pub struct ConfigHandler {
pub ctrl_ip: IpAddr,
pub ctrl_mac: MacAddr,
pub container_cpu_limit: u32, // unit: milli-core
pub container_mem_limit: u64, // unit: bytes
pub logger_handle: Option<LoggerHandle>,
// need update
pub static_config: Config,
Expand All @@ -1587,10 +1589,17 @@ impl ConfigHandler {
ModuleConfig::try_from((config.clone(), RuntimeConfig::default())).unwrap();
let current_config = Arc::new(ArcSwap::from_pointee(candidate_config.clone()));

#[cfg(any(target_os = "linux", target_os = "android"))]
let (container_cpu_limit, container_mem_limit) = get_container_resource_limits();
#[cfg(target_os = "windows")]
let (container_cpu_limit, container_mem_limit) = (0, 0);

Self {
static_config: config,
ctrl_ip,
ctrl_mac,
container_cpu_limit,
container_mem_limit,
candidate_config,
current_config,
logger_handle: None,
Expand Down Expand Up @@ -2194,25 +2203,43 @@ impl ConfigHandler {
candidate_config.environment.max_memory = new_config.environment.max_memory;
}

if candidate_config.environment.max_cpus != new_config.environment.max_cpus {
info!("cpu limit set to {}", new_config.environment.max_cpus);
candidate_config.environment.max_cpus = new_config.environment.max_cpus;
if candidate_config.environment.max_millicpus != new_config.environment.max_millicpus {
info!(
"cpu limit set to {}",
new_config.environment.max_millicpus as f64 / 1000.0
);
candidate_config.environment.max_millicpus = new_config.environment.max_millicpus;
}
#[cfg(target_os = "linux")]
if running_in_container() {
if self.container_cpu_limit != candidate_config.environment.max_millicpus
|| self.container_mem_limit != candidate_config.environment.max_memory
{
info!("current container cpu limit: {}, memory limit: {}bytes, set cpu limit {} and memory limit {}bytes", self.container_cpu_limit as f64 / 1000.0, self.container_mem_limit, candidate_config.environment.max_millicpus as f64 / 1000.0, candidate_config.environment.max_memory);
if let Err(e) = runtime.block_on(set_container_resource_limit(
candidate_config.environment.max_millicpus,
candidate_config.environment.max_memory,
)) {
warn!("set container resources limit failed: {:?}", e);
};
}
}
} else {
let mut system = sysinfo::System::new();
system.refresh_memory();
let max_memory = system.total_memory();
system.refresh_cpu();
let max_cpus = 1.max(system.cpus().len()) as u32;
let max_millicpus = max_cpus * 1000;

if candidate_config.environment.max_memory != max_memory {
info!("memory set ulimit when tap_mode=analyzer");
candidate_config.environment.max_memory = max_memory;
}

if candidate_config.environment.max_cpus != max_cpus {
if candidate_config.environment.max_millicpus != max_millicpus {
info!("cpu set ulimit when tap_mode=analyzer");
candidate_config.environment.max_cpus = max_cpus;
candidate_config.environment.max_millicpus = max_millicpus;
}
}

Expand Down
2 changes: 1 addition & 1 deletion agent/src/ebpf/kernel/include/socket_trace_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#ifndef DF_BPF_SOCKET_TRACE_COMMON_H
#define DF_BPF_SOCKET_TRACE_COMMON_H
#define CAP_DATA_SIZE 1024 // For no-brust send buffer
#define BURST_DATA_BUF_SIZE 8192 // For brust send buffer
#define BURST_DATA_BUF_SIZE 16384 // For brust send buffer

enum endpoint_role {
ROLE_UNKNOWN,
Expand Down
3 changes: 0 additions & 3 deletions agent/src/flow_generator/protocol_logs/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ pub const SPAN_ID_TYPE: usize = 1;
// Kafka constants
pub const KAFKA_REQ_HEADER_LEN: usize = 14;
pub const KAFKA_RESP_HEADER_LEN: usize = 8;
pub const KAFKA_STATUS_CODE_OFFSET: usize = 12;
pub const KAFKA_STATUS_CODE_LEN: usize = 2;
pub const KAFKA_STATUS_CODE_CHECKER: usize = KAFKA_STATUS_CODE_OFFSET + KAFKA_STATUS_CODE_LEN;

// dubbo constants
pub const DUBBO_MAGIC_HIGH: u8 = 0xda;
Expand Down
2 changes: 1 addition & 1 deletion agent/src/flow_generator/protocol_logs/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl L7ProtocolParserInterface for DnsLog {
fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result<L7ParseResult> {
let mut info = DnsInfo::default();
self.parse(payload, &mut info, param)?;
info.cal_rrt(param, None).map(|rrt| {
info.cal_rrt(param).map(|rrt| {
info.rrt = rrt;
self.perf_stats.as_mut().map(|p| p.update_rrt(rrt));
});
Expand Down
2 changes: 1 addition & 1 deletion agent/src/flow_generator/protocol_logs/fastcgi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ impl L7ProtocolParserInterface for FastCGILog {
self.perf_stats.as_mut().map(|p| p.inc_resp());
}
}
info.cal_rrt(param, None).map(|rrt| {
info.cal_rrt(param).map(|rrt| {
info.rrt = rrt;
self.perf_stats.as_mut().map(|p| p.update_rrt(rrt));
});
Expand Down
Loading

0 comments on commit e7e150a

Please sign in to comment.