Skip to content

Commit

Permalink
[Agent] Optimizes HTTP2 parsing headers
Browse files Browse the repository at this point in the history
  • Loading branch information
TomatoMr authored and rvql committed Dec 28, 2023
1 parent db7aaa2 commit 6a3c0a7
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 23 deletions.
8 changes: 6 additions & 2 deletions agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion agent/Cargo.toml
Expand Up @@ -34,8 +34,9 @@ futures = "~0.3"
grpc = { path = "plugins/grpc" }
hex = "0.4.3"
hostname = "0.3.1"
hpack = "0.3.0"
hpack = { git = "https://github.com/deepflowio/hpack-rs/" }
http = "0.2.5"
http2 = { path = "plugins/http2" }
humantime-serde = "1.0"
hyper = { version = "0.14", features = ["full"] }
ipnet = "2.4.0"
Expand Down
8 changes: 8 additions & 0 deletions agent/plugins/http2/Cargo.toml
@@ -0,0 +1,8 @@
[package]
name = "http2"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
15 changes: 15 additions & 0 deletions agent/plugins/http2/src/lib.rs
@@ -0,0 +1,15 @@
use std::collections::HashSet;

pub fn get_expected_headers() -> HashSet<Vec<u8>> {
let mut hash_set = HashSet::new();
hash_set.insert(b":method".to_vec());
hash_set.insert(b":status".to_vec());
hash_set.insert(b"host".to_vec());
hash_set.insert(b":authority".to_vec());
hash_set.insert(b":path".to_vec());
hash_set.insert(b"content-type".to_vec());
hash_set.insert(b"content-length".to_vec());
hash_set.insert(b"user-agent".to_vec());
hash_set.insert(b"referer".to_vec());
hash_set
}
17 changes: 14 additions & 3 deletions agent/src/config/handler.rs
Expand Up @@ -29,6 +29,7 @@ use flexi_logger::{
writers::FileLogWriter, Age, Cleanup, Criterion, FileSpec, FlexiLoggerError, LoggerHandle,
Naming,
};
use http2::get_expected_headers;
use log::{info, warn, Level};
#[cfg(any(target_os = "linux", target_os = "android"))]
use nix::{
Expand Down Expand Up @@ -969,6 +970,7 @@ pub struct L7LogDynamicConfig {

trace_set: HashSet<String>,
span_set: HashSet<String>,
pub expected_headers_set: Arc<HashSet<Vec<u8>>>,
}

impl PartialEq for L7LogDynamicConfig {
Expand All @@ -991,19 +993,27 @@ impl L7LogDynamicConfig {
) -> Self {
proxy_client.make_ascii_lowercase();

let mut expected_headers_set = get_expected_headers();
expected_headers_set.insert(proxy_client.as_bytes().to_vec());
let mut x_request_id_set = HashSet::new();
for t in x_request_id.iter() {
x_request_id_set.insert(t.trim().to_string());
let t = t.trim();
expected_headers_set.insert(t.as_bytes().to_vec());
x_request_id_set.insert(t.to_string());
}

let mut trace_set = HashSet::new();
for t in trace_types.iter() {
trace_set.insert(t.to_checker_string());
let t = t.to_checker_string();
expected_headers_set.insert(t.as_bytes().to_vec());
trace_set.insert(t);
}

let mut span_set = HashSet::new();
for t in span_types.iter() {
span_set.insert(t.to_checker_string());
let t = t.to_checker_string();
expected_headers_set.insert(t.as_bytes().to_vec());
span_set.insert(t);
}

Self {
Expand All @@ -1013,6 +1023,7 @@ impl L7LogDynamicConfig {
span_types,
trace_set,
span_set,
expected_headers_set: Arc::new(expected_headers_set),
}
}

Expand Down
58 changes: 41 additions & 17 deletions agent/src/flow_generator/protocol_logs/http.rs
Expand Up @@ -14,7 +14,9 @@
* limitations under the License.
*/

use std::collections::HashSet;
use std::str;
use std::sync::Arc;

use hpack::Decoder;
use nom::AsBytes;
Expand Down Expand Up @@ -524,6 +526,9 @@ impl L7ProtocolParserInterface for HttpLog {
let Some(config) = param.parse_config else {
return false;
};
if self.http2_req_decoder.is_none() {
self.set_header_decoder(config.l7_log_dynamic.expected_headers_set.clone());
}
match param.ebpf_type {
EbpfType::GoHttp2Uprobe | EbpfType::GoHttp2UprobeData => {
if param.direction == PacketDirection::ServerToClient {
Expand Down Expand Up @@ -565,23 +570,33 @@ impl L7ProtocolParserInterface for HttpLog {
self.wasm_hook(param, payload, &mut info);
}
}
L7Protocol::Http2 | L7Protocol::Grpc => match param.ebpf_type {
EbpfType::GoHttp2Uprobe => {
self.parse_http2_go_uprobe(&config.l7_log_dynamic, payload, param, &mut info)?;
if param.parse_log {
if self.proto == L7Protocol::Http2
&& !config.http_endpoint_disabled
&& info.path.len() > 0
{
info.endpoint = Some(handle_endpoint(config, &info.path));
L7Protocol::Http2 | L7Protocol::Grpc => {
if self.http2_req_decoder.is_none() {
self.set_header_decoder(config.l7_log_dynamic.expected_headers_set.clone());
}
match param.ebpf_type {
EbpfType::GoHttp2Uprobe => {
self.parse_http2_go_uprobe(
&config.l7_log_dynamic,
payload,
param,
&mut info,
)?;
if param.parse_log {
if self.proto == L7Protocol::Http2
&& !config.http_endpoint_disabled
&& info.path.len() > 0
{
info.endpoint = Some(handle_endpoint(config, &info.path));
}
return Ok(L7ParseResult::Single(L7ProtocolInfo::HttpInfo(info)));
} else {
return Ok(L7ParseResult::None);
}
return Ok(L7ParseResult::Single(L7ProtocolInfo::HttpInfo(info)));
} else {
return Ok(L7ParseResult::None);
}
_ => self.parse_http_v2(payload, param, &mut info)?,
}
_ => self.parse_http_v2(payload, param, &mut info)?,
},
}
_ => unreachable!(),
}
match self.proto {
Expand Down Expand Up @@ -650,12 +665,17 @@ impl HttpLog {
};
Self {
proto: l7_protcol,
http2_req_decoder: Some(Decoder::new()),
http2_resp_decoder: Some(Decoder::new()),
..Default::default()
}
}

fn set_header_decoder(&mut self, expected_headers_set: Arc<HashSet<Vec<u8>>>) {
self.http2_req_decoder = Some(Decoder::new_with_expected_headers(
expected_headers_set.clone(),
));
self.http2_resp_decoder = Some(Decoder::new_with_expected_headers(expected_headers_set));
}

fn http1_check_protocol(&mut self, payload: &[u8]) -> bool {
let mut headers = parse_v1_headers(payload);
let Some(first_line) = headers.next() else {
Expand Down Expand Up @@ -1503,7 +1523,7 @@ mod tests {
let parse_config = &LogParserConfig {
l7_log_collect_nps_threshold: 10,
l7_log_session_aggr_timeout: Duration::from_secs(10),
l7_log_dynamic: config,
l7_log_dynamic: config.clone(),
..Default::default()
};
for packet in packets.iter_mut() {
Expand All @@ -1523,6 +1543,7 @@ mod tests {
span_set.insert(TraceType::Sw8.to_checker_string());
let mut http1 = HttpLog::new_v1();
let mut http2 = HttpLog::new_v2(false);
http2.set_header_decoder(config.expected_headers_set.clone());
let param = &mut ParseParam::new(packet as &MetaPacket, log_cache.clone(), true, true);
param.set_log_parse_config(parse_config);

Expand Down Expand Up @@ -1802,6 +1823,9 @@ mod tests {
let first_dst_port = packets[0].lookup_key.dst_port;

let config = LogParserConfig::default();
if http.protocol() == L7Protocol::Http2 || http.protocol() == L7Protocol::Grpc {
http.set_header_decoder(config.l7_log_dynamic.expected_headers_set.clone());
}

for packet in packets.iter_mut() {
if packet.lookup_key.dst_port == first_dst_port {
Expand Down

0 comments on commit 6a3c0a7

Please sign in to comment.