Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Agent] Optimizes HTTP2 parsing headers #5035

Merged
merged 2 commits into from Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 6 additions & 2 deletions agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion agent/Cargo.toml
Expand Up @@ -34,8 +34,9 @@ futures = "~0.3"
grpc = { path = "plugins/grpc" }
hex = "0.4.3"
hostname = "0.3.1"
hpack = "0.3.0"
hpack = { git = "https://github.com/deepflowio/hpack-rs/" }
http = "0.2.5"
http2 = { path = "plugins/http2" }
humantime-serde = "1.0"
hyper = { version = "0.14", features = ["full"] }
ipnet = "2.4.0"
Expand Down
8 changes: 8 additions & 0 deletions agent/plugins/http2/Cargo.toml
@@ -0,0 +1,8 @@
[package]
name = "http2"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
15 changes: 15 additions & 0 deletions agent/plugins/http2/src/lib.rs
@@ -0,0 +1,15 @@
use std::collections::HashSet;

pub fn get_expected_headers() -> HashSet<Vec<u8>> {
let mut hash_set = HashSet::new();
hash_set.insert(b":method".to_vec());
hash_set.insert(b":status".to_vec());
hash_set.insert(b"host".to_vec());
hash_set.insert(b":authority".to_vec());
hash_set.insert(b":path".to_vec());
hash_set.insert(b"content-type".to_vec());
hash_set.insert(b"content-length".to_vec());
hash_set.insert(b"user-agent".to_vec());
hash_set.insert(b"referer".to_vec());
hash_set
}
17 changes: 14 additions & 3 deletions agent/src/config/handler.rs
Expand Up @@ -29,6 +29,7 @@ use flexi_logger::{
writers::FileLogWriter, Age, Cleanup, Criterion, FileSpec, FlexiLoggerError, LoggerHandle,
Naming,
};
use http2::get_expected_headers;
use log::{info, warn, Level};
#[cfg(any(target_os = "linux", target_os = "android"))]
use nix::{
Expand Down Expand Up @@ -969,6 +970,7 @@ pub struct L7LogDynamicConfig {

trace_set: HashSet<String>,
span_set: HashSet<String>,
pub expected_headers_set: Arc<HashSet<Vec<u8>>>,
}

impl PartialEq for L7LogDynamicConfig {
Expand All @@ -991,19 +993,27 @@ impl L7LogDynamicConfig {
) -> Self {
proxy_client.make_ascii_lowercase();

let mut expected_headers_set = get_expected_headers();
expected_headers_set.insert(proxy_client.as_bytes().to_vec());
let mut x_request_id_set = HashSet::new();
for t in x_request_id.iter() {
x_request_id_set.insert(t.trim().to_string());
let t = t.trim();
expected_headers_set.insert(t.as_bytes().to_vec());
x_request_id_set.insert(t.to_string());
}

let mut trace_set = HashSet::new();
for t in trace_types.iter() {
trace_set.insert(t.to_checker_string());
let t = t.to_checker_string();
expected_headers_set.insert(t.as_bytes().to_vec());
trace_set.insert(t);
}

let mut span_set = HashSet::new();
for t in span_types.iter() {
span_set.insert(t.to_checker_string());
let t = t.to_checker_string();
expected_headers_set.insert(t.as_bytes().to_vec());
span_set.insert(t);
}

Self {
Expand All @@ -1013,6 +1023,7 @@ impl L7LogDynamicConfig {
span_types,
trace_set,
span_set,
expected_headers_set: Arc::new(expected_headers_set),
}
}

Expand Down
58 changes: 41 additions & 17 deletions agent/src/flow_generator/protocol_logs/http.rs
Expand Up @@ -14,7 +14,9 @@
* limitations under the License.
*/

use std::collections::HashSet;
use std::str;
use std::sync::Arc;

use hpack::Decoder;
use nom::AsBytes;
Expand Down Expand Up @@ -524,6 +526,9 @@ impl L7ProtocolParserInterface for HttpLog {
let Some(config) = param.parse_config else {
return false;
};
if self.http2_req_decoder.is_none() {
self.set_header_decoder(config.l7_log_dynamic.expected_headers_set.clone());
}
match param.ebpf_type {
EbpfType::GoHttp2Uprobe | EbpfType::GoHttp2UprobeData => {
if param.direction == PacketDirection::ServerToClient {
Expand Down Expand Up @@ -565,23 +570,33 @@ impl L7ProtocolParserInterface for HttpLog {
self.wasm_hook(param, payload, &mut info);
}
}
L7Protocol::Http2 | L7Protocol::Grpc => match param.ebpf_type {
EbpfType::GoHttp2Uprobe => {
self.parse_http2_go_uprobe(&config.l7_log_dynamic, payload, param, &mut info)?;
if param.parse_log {
if self.proto == L7Protocol::Http2
&& !config.http_endpoint_disabled
&& info.path.len() > 0
{
info.endpoint = Some(handle_endpoint(config, &info.path));
L7Protocol::Http2 | L7Protocol::Grpc => {
if self.http2_req_decoder.is_none() {
self.set_header_decoder(config.l7_log_dynamic.expected_headers_set.clone());
}
match param.ebpf_type {
EbpfType::GoHttp2Uprobe => {
self.parse_http2_go_uprobe(
&config.l7_log_dynamic,
payload,
param,
&mut info,
)?;
if param.parse_log {
if self.proto == L7Protocol::Http2
&& !config.http_endpoint_disabled
&& info.path.len() > 0
{
info.endpoint = Some(handle_endpoint(config, &info.path));
}
return Ok(L7ParseResult::Single(L7ProtocolInfo::HttpInfo(info)));
} else {
return Ok(L7ParseResult::None);
}
return Ok(L7ParseResult::Single(L7ProtocolInfo::HttpInfo(info)));
} else {
return Ok(L7ParseResult::None);
}
_ => self.parse_http_v2(payload, param, &mut info)?,
}
_ => self.parse_http_v2(payload, param, &mut info)?,
},
}
_ => unreachable!(),
}
match self.proto {
Expand Down Expand Up @@ -650,12 +665,17 @@ impl HttpLog {
};
Self {
proto: l7_protcol,
http2_req_decoder: Some(Decoder::new()),
http2_resp_decoder: Some(Decoder::new()),
..Default::default()
}
}

fn set_header_decoder(&mut self, expected_headers_set: Arc<HashSet<Vec<u8>>>) {
self.http2_req_decoder = Some(Decoder::new_with_expected_headers(
expected_headers_set.clone(),
));
self.http2_resp_decoder = Some(Decoder::new_with_expected_headers(expected_headers_set));
}

fn http1_check_protocol(&mut self, payload: &[u8]) -> bool {
let mut headers = parse_v1_headers(payload);
let Some(first_line) = headers.next() else {
Expand Down Expand Up @@ -1503,7 +1523,7 @@ mod tests {
let parse_config = &LogParserConfig {
l7_log_collect_nps_threshold: 10,
l7_log_session_aggr_timeout: Duration::from_secs(10),
l7_log_dynamic: config,
l7_log_dynamic: config.clone(),
..Default::default()
};
for packet in packets.iter_mut() {
Expand All @@ -1523,6 +1543,7 @@ mod tests {
span_set.insert(TraceType::Sw8.to_checker_string());
let mut http1 = HttpLog::new_v1();
let mut http2 = HttpLog::new_v2(false);
http2.set_header_decoder(config.expected_headers_set.clone());
let param = &mut ParseParam::new(packet as &MetaPacket, log_cache.clone(), true, true);
param.set_log_parse_config(parse_config);

Expand Down Expand Up @@ -1802,6 +1823,9 @@ mod tests {
let first_dst_port = packets[0].lookup_key.dst_port;

let config = LogParserConfig::default();
if http.protocol() == L7Protocol::Http2 || http.protocol() == L7Protocol::Grpc {
http.set_header_decoder(config.l7_log_dynamic.expected_headers_set.clone());
}

for packet in packets.iter_mut() {
if packet.lookup_key.dst_port == first_dst_port {
Expand Down