Skip to content

Commit

Permalink
feat: Refactor dnstap to use 'OwnedValuePath's (vectordotdev#18212)
Browse files Browse the repository at this point in the history
* feat: Refactor dnstap to use 'OwnedValuePath's

* clippy fixes

* strengthen tests
  • Loading branch information
pront committed Aug 11, 2023
1 parent eaed0a8 commit ca7fa05
Show file tree
Hide file tree
Showing 4 changed files with 868 additions and 1,195 deletions.
14 changes: 4 additions & 10 deletions benches/dnstap/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput};
use vector::{
event::LogEvent,
sources::dnstap::{schema::DnstapEventSchema, DnstapParser},
};
use vector::event::LogEvent;
use vector::sources::dnstap::parser::DnstapParser;

fn benchmark_query_parsing(c: &mut Criterion) {
let mut event = LogEvent::default();
let schema = DnstapEventSchema::new();
let mut parser = DnstapParser::new(&schema, &mut event);
let raw_dnstap_data = "ChVqYW1lcy1WaXJ0dWFsLU1hY2hpbmUSC0JJTkQgOS4xNi4zcnoIAxACGAEiEAAAAAAAAA\
AAAAAAAAAAAAAqECABBQJwlAAAAAAAAAAAADAw8+0CODVA7+zq9wVNMU3WNlI2kwIAAAABAAAAAAABCWZhY2Vib29rMQNjb\
20AAAEAAQAAKQIAAACAAAAMAAoACOxjCAG9zVgzWgUDY29tAHgB";
Expand All @@ -19,7 +15,7 @@ fn benchmark_query_parsing(c: &mut Criterion) {
group.bench_function("dns_query_parsing", |b| {
b.iter_batched(
|| dnstap_data.clone(),
|dnstap_data| parser.parse_dnstap_data(Bytes::from(dnstap_data)).unwrap(),
|dnstap_data| DnstapParser::parse(&mut event, Bytes::from(dnstap_data)).unwrap(),
BatchSize::SmallInput,
)
});
Expand All @@ -29,8 +25,6 @@ fn benchmark_query_parsing(c: &mut Criterion) {

fn benchmark_update_parsing(c: &mut Criterion) {
let mut event = LogEvent::default();
let schema = DnstapEventSchema::new();
let mut parser = DnstapParser::new(&schema, &mut event);
let raw_dnstap_data = "ChVqYW1lcy1WaXJ0dWFsLU1hY2hpbmUSC0JJTkQgOS4xNi4zcmsIDhABGAEiBH8AAA\
EqBH8AAAEwrG44AEC+iu73BU14gfofUh1wi6gAAAEAAAAAAAAHZXhhbXBsZQNjb20AAAYAAWC+iu73BW0agDwvch1wi6gAA\
AEAAAAAAAAHZXhhbXBsZQNjb20AAAYAAXgB";
Expand All @@ -41,7 +35,7 @@ fn benchmark_update_parsing(c: &mut Criterion) {
group.bench_function("dns_update_parsing", |b| {
b.iter_batched(
|| dnstap_data.clone(),
|dnstap_data| parser.parse_dnstap_data(Bytes::from(dnstap_data)).unwrap(),
|dnstap_data| DnstapParser::parse(&mut event, Bytes::from(dnstap_data)).unwrap(),
BatchSize::SmallInput,
)
});
Expand Down
26 changes: 7 additions & 19 deletions src/sources/dnstap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use vector_common::internal_event::{
ByteSize, BytesReceived, InternalEventHandle as _, Protocol, Registered,
};
use vector_config::configurable_component;
use vrl::event_path;
use vrl::path::PathPrefix;
use vrl::value::{kind::Collection, Kind};

use super::util::framestream::{build_framestream_unix_source, FrameHandler};
Expand All @@ -19,9 +19,9 @@ use crate::{
};

pub mod parser;
pub use parser::{parse_dnstap_data, DnstapParser};

pub mod schema;
use crate::sources::dnstap::parser::DnstapParser;
use crate::sources::dnstap::schema::DNSTAP_VALUE_PATHS;
use dnsmsg_parser::{dns_message, dns_message_parser};
use lookup::lookup_v2::OptionalValuePath;
pub use schema::DnstapEventSchema;
Expand Down Expand Up @@ -109,19 +109,11 @@ impl DnstapConfig {
"protobuf:dnstap.Dnstap".to_string() //content-type for framestream
}

fn event_schema(timestamp_key: Option<&OwnedValuePath>) -> DnstapEventSchema {
let mut schema = DnstapEventSchema::new();
schema
.dnstap_root_data_schema_mut()
.set_timestamp(timestamp_key.cloned());
schema
}

pub fn schema_definition(
&self,
log_namespace: LogNamespace,
) -> vector_core::schema::Definition {
let event_schema = Self::event_schema(log_schema().timestamp_key());
let event_schema = DnstapEventSchema;

match log_namespace {
LogNamespace::Legacy => {
Expand Down Expand Up @@ -205,7 +197,6 @@ pub struct DnstapFrameHandler {
max_frame_length: usize,
socket_path: PathBuf,
content_type: String,
schema: DnstapEventSchema,
raw_data_only: bool,
multithreaded: bool,
max_frame_handling_tasks: u32,
Expand All @@ -224,8 +215,6 @@ impl DnstapFrameHandler {
let source_type_key = log_schema().source_type_key();
let timestamp_key = log_schema().timestamp_key();

let schema = DnstapConfig::event_schema(timestamp_key);

let host_key = config
.host_key
.clone()
Expand All @@ -235,7 +224,6 @@ impl DnstapFrameHandler {
max_frame_length: config.max_frame_length,
socket_path: config.socket_path.clone(),
content_type: config.content_type(),
schema,
raw_data_only: config.raw_data_only.unwrap_or(false),
multithreaded: config.multithreaded.unwrap_or(false),
max_frame_handling_tasks: config.max_frame_handling_tasks.unwrap_or(1000),
Expand Down Expand Up @@ -281,10 +269,10 @@ impl FrameHandler for DnstapFrameHandler {

if self.raw_data_only {
log_event.insert(
event_path!(self.schema.dnstap_root_data_schema().raw_data()),
(PathPrefix::Event, &DNSTAP_VALUE_PATHS.raw_data),
BASE64_STANDARD.encode(&frame),
);
} else if let Err(err) = parse_dnstap_data(&self.schema, &mut log_event, frame) {
} else if let Err(err) = DnstapParser::parse(&mut log_event, frame) {
emit!(DnstapParseError {
error: format!("Dnstap protobuf decode error {:?}.", err)
});
Expand Down Expand Up @@ -408,7 +396,7 @@ mod tests {
let mut event = Event::from(LogEvent::from(vrl::value::Value::from(json)));
event.as_mut_log().insert("timestamp", chrono::Utc::now());

let definition = DnstapConfig::event_schema(Some(&owned_value_path!("timestamp")));
let definition = DnstapEventSchema;
let schema = vector_core::schema::Definition::empty_legacy_namespace()
.with_standard_vector_source_metadata();

Expand Down
Loading

0 comments on commit ca7fa05

Please sign in to comment.