Skip to content

Commit

Permalink
[AGENT] Improve l7 log merge
Browse files Browse the repository at this point in the history
- Avoid creating new `Box<MetaAppProto>` on merge failure
- Merge `on_request_log` and `on_response_log` logic
  • Loading branch information
rvql committed Nov 14, 2023
1 parent 04f3986 commit b615871
Show file tree
Hide file tree
Showing 17 changed files with 171 additions and 255 deletions.
2 changes: 1 addition & 1 deletion agent/src/common/l7_protocol_info.rs
Expand Up @@ -88,7 +88,7 @@ pub trait L7ProtocolInfoInterface: Into<L7ProtocolSendLog> {
// 返回的错误暂时无视
// =============================================================
// merge request and response. now return err will have no effect.
fn merge_log(&mut self, other: L7ProtocolInfo) -> Result<()>;
fn merge_log(&mut self, other: &mut L7ProtocolInfo) -> Result<()>;

fn app_proto_head(&self) -> Option<AppProtoHead>;

Expand Down
10 changes: 7 additions & 3 deletions agent/src/flow_generator/protocol_logs/dns.rs
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use serde::Serialize;

use super::pb_adapter::{ExtendedInfo, L7ProtocolSendLog, L7Request, L7Response};
Expand Down Expand Up @@ -66,7 +67,10 @@ impl L7ProtocolInfoInterface for DnsInfo {
Some(self.trans_id as u32)
}

fn merge_log(&mut self, other: crate::common::l7_protocol_info::L7ProtocolInfo) -> Result<()> {
fn merge_log(
&mut self,
other: &mut crate::common::l7_protocol_info::L7ProtocolInfo,
) -> Result<()> {
if let L7ProtocolInfo::DnsInfo(other) = other {
self.merge(other);
}
Expand All @@ -87,8 +91,8 @@ impl L7ProtocolInfoInterface for DnsInfo {
}

impl DnsInfo {
pub fn merge(&mut self, other: Self) {
self.answers = other.answers;
pub fn merge(&mut self, other: &mut Self) {
std::mem::swap(&mut self.answers, &mut other.answers);
if other.status != L7ResponseStatus::default() {
self.status = other.status;
}
Expand Down
14 changes: 5 additions & 9 deletions agent/src/flow_generator/protocol_logs/fastcgi.rs
Expand Up @@ -105,16 +105,12 @@ impl L7ProtocolInfoInterface for FastCGIInfo {
Some(self.request_id)
}

fn merge_log(&mut self, other: L7ProtocolInfo) -> Result<()> {
fn merge_log(&mut self, other: &mut L7ProtocolInfo) -> Result<()> {
if let L7ProtocolInfo::FastCGIInfo(info) = other {
self.status = info.status;
self.status_code = info.status_code;
if self.trace_id.is_empty() {
self.trace_id = info.trace_id;
}
if self.span_id.is_empty() {
self.span_id = info.span_id;
}
super::swap_if!(self, trace_id, is_empty, info);
super::swap_if!(self, span_id, is_empty, info);
}

Ok(())
Expand Down Expand Up @@ -653,12 +649,12 @@ mod test {
let resp_param = &ParseParam::new(&p[1], log_cache.clone(), true, true);
let resp_payload = p[1].get_l4_payload().unwrap();
assert_eq!((&mut parser).check_payload(resp_payload, resp_param), false);
let resp = (&mut parser)
let mut resp = (&mut parser)
.parse_payload(resp_payload, resp_param)
.unwrap()
.unwrap_single();

req.merge_log(resp).unwrap();
req.merge_log(&mut resp).unwrap();
if let L7ProtocolInfo::FastCGIInfo(info) = req {
return (info, parser.perf_stats.unwrap());
}
Expand Down
55 changes: 15 additions & 40 deletions agent/src/flow_generator/protocol_logs/http.rs
Expand Up @@ -166,7 +166,7 @@ impl L7ProtocolInfoInterface for HttpInfo {
self.stream_id
}

fn merge_log(&mut self, other: L7ProtocolInfo) -> Result<()> {
fn merge_log(&mut self, other: &mut L7ProtocolInfo) -> Result<()> {
if let L7ProtocolInfo::HttpInfo(other) = other {
return self.merge(other);
}
Expand Down Expand Up @@ -215,30 +215,18 @@ impl L7ProtocolInfoInterface for HttpInfo {
}

impl HttpInfo {
pub fn merge(&mut self, other: Self) -> Result<()> {
pub fn merge(&mut self, other: &mut Self) -> Result<()> {
let other_is_grpc = other.is_grpc();

match other.msg_type {
// merge with request
LogMessageType::Request => {
if self.path.is_empty() {
self.path = other.path;
}
if self.host.is_empty() {
self.host = other.host;
}
if self.method.is_empty() {
self.method = other.method;
}
if self.user_agent.is_none() {
self.user_agent = other.user_agent;
}
if self.referer.is_none() {
self.referer = other.referer;
}
if self.endpoint.is_none() {
self.endpoint = other.endpoint;
}
super::swap_if!(self, path, is_empty, other);
super::swap_if!(self, host, is_empty, other);
super::swap_if!(self, method, is_empty, other);
super::swap_if!(self, user_agent, is_none, other);
super::swap_if!(self, referer, is_none, other);
super::swap_if!(self, endpoint, is_none, other);
// 下面用于判断是否结束
// ================
// determine whether request is end
Expand All @@ -258,13 +246,8 @@ impl HttpInfo {
self.status_code = other.status_code;
}

if other.custom_exception.is_some() {
self.custom_exception = other.custom_exception;
}

if other.custom_result.is_some() {
self.custom_result = other.custom_result
}
super::swap_if!(self, custom_exception, is_none, other);
super::swap_if!(self, custom_result, is_none, other);

if self.resp_content_length.is_none() {
self.resp_content_length = other.resp_content_length;
Expand All @@ -280,19 +263,11 @@ impl HttpInfo {
if other_is_grpc {
self.proto = L7Protocol::Grpc;
}
if self.trace_id.is_empty() {
self.trace_id = other.trace_id;
}
if self.span_id.is_empty() {
self.span_id = other.span_id;
}
if self.x_request_id_0.is_empty() {
self.x_request_id_0 = other.x_request_id_0.clone();
}
if self.x_request_id_1.is_empty() {
self.x_request_id_1 = other.x_request_id_1.clone();
}
self.attributes.extend(other.attributes);
super::swap_if!(self, trace_id, is_empty, other);
super::swap_if!(self, span_id, is_empty, other);
super::swap_if!(self, x_request_id_0, is_empty, other);
super::swap_if!(self, x_request_id_1, is_empty, other);
self.attributes.append(&mut other.attributes);
Ok(())
}

Expand Down
21 changes: 18 additions & 3 deletions agent/src/flow_generator/protocol_logs/mod.rs
Expand Up @@ -322,18 +322,18 @@ impl From<AppProtoLogsBaseInfo> for flow_log::AppProtoLogsBaseInfo {

impl AppProtoLogsBaseInfo {
// 请求调用回应来合并
fn merge(&mut self, log: AppProtoLogsBaseInfo) {
fn merge(&mut self, log: &mut AppProtoLogsBaseInfo) {
// adjust protocol when change, now only use for http2 change to grpc.
if self.head.proto != log.head.proto {
self.head.proto = log.head.proto;
}
if log.process_id_0 > 0 {
self.process_id_0 = log.process_id_0;
self.process_kname_0 = log.process_kname_0;
std::mem::swap(&mut self.process_kname_0, &mut log.process_kname_0);
}
if log.process_id_1 > 0 {
self.process_id_1 = log.process_id_1;
self.process_kname_1 = log.process_kname_1;
std::mem::swap(&mut self.process_kname_1, &mut log.process_kname_1);
}
self.syscall_trace_id_thread_1 = log.syscall_trace_id_thread_1;
self.syscall_cap_seq_1 = log.syscall_cap_seq_1;
Expand Down Expand Up @@ -455,3 +455,18 @@ fn decode_base64_to_string(value: &str) -> String {
Err(_) => value.to_string(),
}
}

macro_rules! swap_if {
($this:expr, $field:ident, is_none, $other:expr) => {
if $this.$field.is_none() {
std::mem::swap(&mut $this.$field, &mut $other.$field);
}
};
($this:expr, $field:ident, is_empty, $other:expr) => {
if $this.$field.is_empty() {
std::mem::swap(&mut $this.$field, &mut $other.$field);
}
};
}

pub(crate) use swap_if;
11 changes: 6 additions & 5 deletions agent/src/flow_generator/protocol_logs/mq/kafka.rs
Expand Up @@ -79,7 +79,10 @@ impl L7ProtocolInfoInterface for KafkaInfo {
Some(self.correlation_id)
}

fn merge_log(&mut self, other: crate::common::l7_protocol_info::L7ProtocolInfo) -> Result<()> {
fn merge_log(
&mut self,
other: &mut crate::common::l7_protocol_info::L7ProtocolInfo,
) -> Result<()> {
if let L7ProtocolInfo::KafkaInfo(other) = other {
self.merge(other);
}
Expand All @@ -102,7 +105,7 @@ impl L7ProtocolInfoInterface for KafkaInfo {
impl KafkaInfo {
// https://kafka.apache.org/protocol.html
const API_KEY_MAX: u16 = 67;
pub fn merge(&mut self, other: Self) {
pub fn merge(&mut self, other: &mut Self) {
if self.resp_msg_size.is_none() {
self.resp_msg_size = other.resp_msg_size;
}
Expand All @@ -112,9 +115,7 @@ impl KafkaInfo {
if other.status_code.is_some() {
self.status_code = other.status_code;
}
if other.topic_name.len() > 0 {
self.topic_name = other.topic_name;
}
crate::flow_generator::protocol_logs::swap_if!(self, topic_name, is_empty, other);
}

pub fn check(&self) -> bool {
Expand Down
8 changes: 4 additions & 4 deletions agent/src/flow_generator/protocol_logs/mq/mqtt.rs
Expand Up @@ -80,7 +80,7 @@ impl L7ProtocolInfoInterface for MqttInfo {
None
}

fn merge_log(&mut self, other: L7ProtocolInfo) -> Result<()> {
fn merge_log(&mut self, other: &mut L7ProtocolInfo) -> Result<()> {
if let L7ProtocolInfo::MqttInfo(mqtt) = other {
self.merge(mqtt);
}
Expand Down Expand Up @@ -129,7 +129,7 @@ impl Default for MqttInfo {
}

impl MqttInfo {
pub fn merge(&mut self, other: Self) {
pub fn merge(&mut self, other: &mut Self) {
if self.res_msg_size.is_none() {
self.res_msg_size = other.res_msg_size;
}
Expand All @@ -141,10 +141,10 @@ impl MqttInfo {
}
match other.pkt_type {
PacketKind::Publish { .. } => {
self.publish_topic = other.publish_topic;
std::mem::swap(&mut self.publish_topic, &mut other.publish_topic);
}
PacketKind::Unsubscribe | PacketKind::Subscribe => {
self.subscribe_topics = other.subscribe_topics;
std::mem::swap(&mut self.subscribe_topics, &mut other.subscribe_topics);
}
_ => (),
}
Expand Down

0 comments on commit b615871

Please sign in to comment.