Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AGENT] Reduce size of AppProtoLogsBaseInfo #4735

Merged
merged 3 commits into from Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 11 additions & 13 deletions agent/src/flow_generator/protocol_logs/mod.rs
Expand Up @@ -50,7 +50,6 @@ use std::{
fmt,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
str,
time::Duration,
};

use prost::Message;
Expand All @@ -62,6 +61,7 @@ use crate::{
enums::{IpProtocol, TapType},
flow::{L7Protocol, PacketDirection, SignalSource},
tap_port::TapPort,
Timestamp,
},
metric::document::TapSide,
};
Expand Down Expand Up @@ -151,18 +151,16 @@ impl From<AppProtoHead> for flow_log::AppProtoHead {

#[derive(Serialize, Debug, Clone)]
pub struct AppProtoLogsBaseInfo {
#[serde(serialize_with = "duration_to_micros")]
pub start_time: Duration,
#[serde(serialize_with = "duration_to_micros")]
pub end_time: Duration,
#[serde(serialize_with = "timestamp_to_micros")]
pub start_time: Timestamp,
#[serde(serialize_with = "timestamp_to_micros")]
pub end_time: Timestamp,
pub flow_id: u64,
#[serde(serialize_with = "to_string_format")]
pub tap_port: TapPort,
pub signal_source: SignalSource,
pub vtap_id: u16,
pub tap_type: TapType,
#[serde(skip)]
pub is_ipv6: bool,
pub tap_side: TapSide,
#[serde(flatten)]
pub head: AppProtoHead,
Expand Down Expand Up @@ -218,9 +216,9 @@ pub struct AppProtoLogsBaseInfo {
#[serde(skip_serializing_if = "value_is_default")]
pub syscall_coroutine_1: u64,
#[serde(skip_serializing_if = "value_is_default")]
pub syscall_cap_seq_0: u64,
pub syscall_cap_seq_0: u32,
#[serde(skip_serializing_if = "value_is_default")]
pub syscall_cap_seq_1: u64,
pub syscall_cap_seq_1: u32,

pub protocol: IpProtocol,
#[serde(skip)]
Expand All @@ -233,7 +231,7 @@ pub struct AppProtoLogsBaseInfo {
pub pod_id_1: u32,
}

pub fn duration_to_micros<S>(d: &Duration, serializer: S) -> Result<S::Ok, S::Error>
pub fn timestamp_to_micros<S>(d: &Timestamp, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
Expand Down Expand Up @@ -280,7 +278,7 @@ impl From<AppProtoLogsBaseInfo> for flow_log::AppProtoLogsBaseInfo {
tap_port: f.tap_port.0,
vtap_id: f.vtap_id as u32,
tap_type: u16::from(f.tap_type) as u32,
is_ipv6: f.is_ipv6 as u32,
is_ipv6: f.ip_src.is_ipv6() as u32,
tap_side: f.tap_side as u32,
head: Some(f.head.into()),
mac_src: f.mac_src.into(),
Expand All @@ -306,8 +304,8 @@ impl From<AppProtoLogsBaseInfo> for flow_log::AppProtoLogsBaseInfo {
syscall_trace_id_response: f.syscall_trace_id_response,
syscall_trace_id_thread_0: f.syscall_trace_id_thread_0,
syscall_trace_id_thread_1: f.syscall_trace_id_thread_1,
syscall_cap_seq_0: f.syscall_cap_seq_0 as u32,
syscall_cap_seq_1: f.syscall_cap_seq_1 as u32,
syscall_cap_seq_0: f.syscall_cap_seq_0,
syscall_cap_seq_1: f.syscall_cap_seq_1,
syscall_coroutine_0: f.syscall_coroutine_0,
syscall_coroutine_1: f.syscall_coroutine_1,
gpid_0: f.gpid_0,
Expand Down
52 changes: 21 additions & 31 deletions agent/src/flow_generator/protocol_logs/parser.rs
Expand Up @@ -14,8 +14,6 @@
* limitations under the License.
*/

#[cfg(any(target_os = "linux", target_os = "android"))]
use std::mem::swap;
use std::{
cmp::min,
collections::{hash_map::Entry, HashMap},
Expand All @@ -38,7 +36,6 @@ use super::{AppProtoHead, AppProtoLogsBaseInfo, BoxAppProtoLogsData, LogMessageT

use crate::{
common::{
enums::EthernetType,
flow::{get_uniq_flow_id_in_one_minute, L7Protocol, PacketDirection, SignalSource},
l7_protocol_info::{L7ProtocolInfo, L7ProtocolInfoInterface},
meta_packet::ProtocolData,
Expand Down Expand Up @@ -96,8 +93,8 @@ impl MetaAppProto {
head: AppProtoHead,
) -> Option<Self> {
let mut base_info = AppProtoLogsBaseInfo {
start_time: meta_packet.lookup_key.timestamp.into(),
end_time: meta_packet.lookup_key.timestamp.into(),
start_time: meta_packet.lookup_key.timestamp,
end_time: meta_packet.lookup_key.timestamp,
flow_id: flow.flow.flow_id,
vtap_id: flow.flow.flow_key.vtap_id,
tap_type: flow.flow.flow_key.tap_type,
Expand All @@ -116,7 +113,6 @@ impl MetaAppProto {
mac_dst: MacAddr::ZERO,
ip_src: flow.flow.flow_metrics_peers[FLOW_METRICS_PEER_SRC].nat_real_ip,
ip_dst: flow.flow.flow_metrics_peers[FLOW_METRICS_PEER_DST].nat_real_ip,
is_ipv6: meta_packet.lookup_key.eth_type == EthernetType::IPV6,
port_src: flow.flow.flow_metrics_peers[FLOW_METRICS_PEER_SRC].nat_real_port,
port_dst: flow.flow.flow_metrics_peers[FLOW_METRICS_PEER_DST].nat_real_port,
l3_epc_id_src: flow.flow.flow_metrics_peers[FLOW_METRICS_PEER_SRC].l3_epc_id,
Expand Down Expand Up @@ -145,28 +141,31 @@ impl MetaAppProto {
let is_src = meta_packet.lookup_key.l2_end_0;
let process_name = get_string_from_chars(&meta_packet.process_kname);
if is_src {
base_info.process_id_0 = meta_packet.process_id;
base_info.process_kname_0 = process_name;
base_info.syscall_coroutine_0 = meta_packet.coroutine_id;
base_info.pod_id_0 = meta_packet.pod_id;
} else {
base_info.process_id_1 = meta_packet.process_id;
base_info.process_kname_1 = process_name;
base_info.syscall_coroutine_1 = meta_packet.coroutine_id;
base_info.pod_id_1 = meta_packet.pod_id;
}
match (is_src, meta_packet.lookup_key.direction) {
(true, PacketDirection::ClientToServer)
| (false, PacketDirection::ServerToClient) => {
base_info.process_id_0 = meta_packet.process_id;
base_info.process_kname_0 = process_name;
}
(false, PacketDirection::ClientToServer)
| (true, PacketDirection::ServerToClient) => {
base_info.process_id_1 = meta_packet.process_id;
base_info.process_kname_1 = process_name;
}
}
}

if flow.flow.tap_side == TapSide::Local {
if flow.flow.tap_side == TapSide::Local || base_info.is_vip_interface_src {
base_info.mac_src = flow.flow.flow_key.mac_src;
}
if flow.flow.tap_side == TapSide::Local || base_info.is_vip_interface_dst {
base_info.mac_dst = flow.flow.flow_key.mac_dst;
} else {
if base_info.is_vip_interface_src {
base_info.mac_src = flow.flow.flow_key.mac_src;
}
if base_info.is_vip_interface_dst {
base_info.mac_dst = flow.flow.flow_key.mac_dst;
}
}

let seq = if let ProtocolData::TcpHeader(tcp_data) = &meta_packet.protocol_data {
Expand All @@ -180,23 +179,14 @@ impl MetaAppProto {
// ebpf info
base_info.syscall_trace_id_request = meta_packet.syscall_trace_id;
base_info.syscall_trace_id_thread_0 = meta_packet.thread_id;
base_info.syscall_cap_seq_0 = meta_packet.cap_seq;
base_info.syscall_cap_seq_0 = meta_packet.cap_seq as u32;
} else {
#[cfg(any(target_os = "linux", target_os = "android"))]
if meta_packet.signal_source == SignalSource::EBPF {
swap(&mut base_info.process_id_0, &mut base_info.process_id_1);
swap(
&mut base_info.process_kname_0,
&mut base_info.process_kname_1,
);
}

base_info.resp_tcp_seq = seq + l7_info.tcp_seq_offset();

// ebpf info
base_info.syscall_trace_id_response = meta_packet.syscall_trace_id;
base_info.syscall_trace_id_thread_1 = meta_packet.thread_id;
base_info.syscall_cap_seq_1 = meta_packet.cap_seq;
base_info.syscall_cap_seq_1 = meta_packet.cap_seq as u32;
}

Some(Self {
Expand Down Expand Up @@ -242,7 +232,7 @@ impl MetaAppProto {
if self.base_info.head.msg_type == LogMessageType::Request {
cap_seq += 1;
};
flow_id_part | ((proto as u64) << 24) | (cap_seq & 0xffffff)
flow_id_part | ((proto as u64) << 24) | (cap_seq as u64 & 0xffffff)
}
}

Expand Down Expand Up @@ -632,7 +622,7 @@ impl SessionQueue {
return;
}

if !self.throttle.acquire(item.base_info.start_time) {
if !self.throttle.acquire(item.base_info.start_time.into()) {
self.counter.throttle_drop.fetch_add(1, Ordering::Relaxed);
return;
}
Expand Down