Skip to content

Commit

Permalink
Merge pull request #146 from bgpkit/feature/rib-v2-encoder
Browse files Browse the repository at this point in the history
TableDumpV2 encoder
  • Loading branch information
digizeph committed Dec 21, 2023
2 parents c7a18ab + 2a3ae32 commit 80d1fc7
Show file tree
Hide file tree
Showing 35 changed files with 1,064 additions and 336 deletions.
31 changes: 31 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
name: build

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]

env:
CARGO_TERM_COLOR: always

jobs:
build:
runs-on: self-hosted
steps:
- uses: actions/checkout@v2

- name: Build
run: cargo build

- name: Build cli
run: cargo build --features cli

- name: Build no-default-features
run: cargo build --no-default-features

- name: Run tests
run: cargo test --all-features

- name: Run clippy
run: cargo clippy --all-features -- -D warnings
13 changes: 2 additions & 11 deletions .github/workflows/rust.yml → .github/workflows/format.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Rust
name: formatting

on:
push:
Expand All @@ -15,17 +15,8 @@ jobs:
steps:
- uses: actions/checkout@v2

- name: Build
run: cargo build

- name: Run tests
run: cargo test --all-features

- name: Run format check
run: cargo fmt --check

- name: Run cargo-readme check
run: cargo readme > TMP_README.md && diff -b TMP_README.md README.md

- name: Run clippy
run: cargo clippy --all-features -- -D warnings
run: cargo readme > TMP_README.md && diff -b TMP_README.md README.md
67 changes: 19 additions & 48 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,11 @@ for item in broker.into_iter().take(2) {

### Filtering BGP Messages

BGPKIT Parser also has built-in [Filter][filter] mechanism. When creating a new [`BgpkitParser`] instance,
BGPKIT Parser also has built-in [Filter] mechanism. When creating a new [`BgpkitParser`] instance,
once can also call `add_filter` function to customize the parser to only show matching messages
when iterating through [BgpElem]s.

For all types of filters, check out the [Filter][filter] enum documentation.

[filter]: crate::parser::filter::Filter
For all types of filters, check out the [Filter] enum documentation.

```rust
use bgpkit_parser::BgpkitParser;
Expand Down Expand Up @@ -159,10 +157,10 @@ fn main() {

// subscribe to messages from one collector
let msg = json!({"type": "ris_subscribe", "data": {"host": "rrc21"}}).to_string();
socket.write_message(Message::Text(msg)).unwrap();
socket.send(Message::Text(msg)).unwrap();

loop {
let msg = socket.read_message().expect("Error reading message").to_string();
let msg = socket.read().expect("Error reading message").to_string();
if let Ok(elems) = parse_ris_live_message(msg.as_str()) {
for elem in elems {
println!("{}", elem);
Expand All @@ -175,7 +173,7 @@ fn main() {
#### Parsing OpenBMP Messages From RouteViews Kafka Stream

[RouteViews](http://www.routeviews.org/routeviews/) provides a real-time Kafka stream of the OpenBMP
data received from their collectors. Below is an partial example of how we handle the raw bytes
data received from their collectors. Below is a partial example of how we handle the raw bytes
received from the Kafka stream. For full examples, check out the [examples folder on GitHub](https://github.com/bgpkit/bgpkit-parser/tree/main/examples).

```rust
Expand Down Expand Up @@ -225,45 +223,20 @@ use bgpkit_parser::Elementor;
use itertools::Itertools;
use std::io::Write;

fn main() {
const OUTPUT_FILE: &str = "as3356_mrt.gz";

println!("Start downloading and filtering BGP messages from AS3356");
let mut mrt_writer = oneio::get_writer(OUTPUT_FILE).unwrap();

let mut records_count = 0;
let mut elems_count = 0;
bgpkit_parser::BgpkitParser::new(
"http://archive.routeviews.org/bgpdata/2023.10/UPDATES/updates.20231029.2015.bz2",
)
.unwrap()
.add_filter("origin_asn", "3356")
.unwrap()
.into_record_iter()
.for_each(|record| {
let bytes = record.encode();
mrt_writer.write_all(&bytes).unwrap();
records_count += 1;
let mut elementor = Elementor::new();
elems_count += elementor.record_to_elems(record).len();
});
// make sure to properly flush bytes from writer
drop(mrt_writer);
let mut updates_encoder = bgpkit_parser::encoder::MrtUpdatesEncoder::new();

println!(
"Found and archived {} MRT records, {} BGP messages",
records_count, elems_count
);
bgpkit_parser::BgpkitParser::new(
"http://archive.routeviews.org/bgpdata/2023.10/UPDATES/updates.20231029.2015.bz2",
).unwrap()
.add_filter("origin_asn", "3356").unwrap()
.into_iter()
.for_each(|elem| {
updates_encoder.process_elem(&elem);
});

let elems = bgpkit_parser::BgpkitParser::new(OUTPUT_FILE)
.unwrap()
.into_elem_iter()
.collect_vec();
println!(
"Read {} BGP messages from the newly archived MRT file.",
elems.len()
);
}
let mut mrt_writer = oneio::get_writer("as3356_mrt.gz").unwrap();
mrt_writer.write_all(updates_encoder.export_bytes().as_ref()).unwrap();
drop(mrt_writer);
```

## Command Line Tool
Expand Down Expand Up @@ -310,13 +283,11 @@ OPTIONS:

## Data Representation

There are two key data structure to understand for the parsing results: [MrtRecord][mrtrecord] and [BgpElem].

[mrtrecord]: crate::models::MrtRecord
There are two key data structure to understand for the parsing results: [MrtRecord] and [BgpElem].

### `MrtRecord`: unmodified MRT information representation

The [`MrtRecord`][mrtrecord] is the data structure that holds the unmodified, complete information parsed from the MRT data file.
The [MrtRecord] is the data structure that holds the unmodified, complete information parsed from the MRT data file.

```rust
pub struct MrtRecord {
Expand Down
4 changes: 3 additions & 1 deletion examples/extended_communities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ fn main() {
for c in cs {
match c {
MetaCommunity::Plain(_) => {}
MetaCommunity::Extended(_) | MetaCommunity::Large(_) => {
MetaCommunity::Extended(_)
| MetaCommunity::Large(_)
| MetaCommunity::Ipv6Extended(_) => {
log::info!("{}", &elem);
}
}
Expand Down
30 changes: 30 additions & 0 deletions examples/filter_export_rib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//! This example shows how to filter the content of a RIB file and re-encode into a new RIB file
//! with the filtered content.

use std::io::Write;
use tracing::info;

fn main() {
tracing_subscriber::fmt::init();

// const RIB_URL: &str = "https://data.ris.ripe.net/rrc26/2023.12/bview.20231221.1600.gz";
const RIB_URL: &str = "unfiltered.rib.gz";
let mut encoder = bgpkit_parser::encoder::MrtRibEncoder::new();
let parser = bgpkit_parser::BgpkitParser::new(RIB_URL)
.unwrap()
.add_filter("origin_asn", "13335")
.unwrap()
.disable_warnings();

info!("processing rib {}", RIB_URL);
for elem in parser {
encoder.process_elem(&elem);
}

info!("exporting filtered RIB...");
let mut writer = oneio::get_writer("filtered-13335.rib.gz").unwrap();
writer.write_all(encoder.export_bytes().as_ref()).unwrap();
drop(writer);

info!("exporting filtered RIB...done");
}
2 changes: 1 addition & 1 deletion examples/real-time-routeviews-kafka-to-mrt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn consume_and_archive(
let bmp_msg = parse_bmp_msg(&mut bytes);
match bmp_msg {
Ok(msg) => {
let mrt_record = match bgpkit_parser::models::MrtRecord::try_from(msg) {
let mrt_record = match bgpkit_parser::models::MrtRecord::try_from(&msg) {
Ok(r) => r,
Err(msg) => {
dbg!(msg);
Expand Down
5 changes: 5 additions & 0 deletions src/encoder/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod rib_encoder;
mod updates_encoder;

pub use rib_encoder::MrtRibEncoder;
pub use updates_encoder::MrtUpdatesEncoder;
170 changes: 170 additions & 0 deletions src/encoder/rib_encoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
//! MRT Encoder module
//!
//! `mrt_encoder` module handles serializing BGP/MRT messages back to MRT binary files. The main
//! difficulty part of this process is the handling of TableDumpV2 RIB dumps, which requires
//! reconstructing the peer index table before encoding all other contents.

use crate::models::{
Attributes, BgpElem, CommonHeader, EntryType, MrtMessage, NetworkPrefix, Peer, PeerIndexTable,
RibAfiEntries, RibEntry, TableDumpV2Message, TableDumpV2Type,
};
use bytes::{Bytes, BytesMut};
use ipnet::IpNet;
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr};

#[derive(Default)]
pub struct MrtRibEncoder {
index_table: PeerIndexTable,

per_prefix_entries_map: HashMap<IpNet, HashMap<u16, RibEntry>>,

timestamp: f64,
}

fn convert_timestamp(timestamp: f64) -> (u32, u32) {
let seconds = timestamp as u32;
let microseconds = ((timestamp - seconds as f64) * 1_000_000.0) as u32;
(seconds, microseconds)
}

impl MrtRibEncoder {
pub fn new() -> Self {
Self::default()
}

pub fn reset(&mut self) {
self.index_table = PeerIndexTable::default();
self.per_prefix_entries_map = HashMap::default();
self.timestamp = 0.0;
}

/// Processes a BgpElem and updates the internal data structures.
///
/// # Arguments
///
/// * `elem` - A reference to a BgpElem that contains the information to be processed.
pub fn process_elem(&mut self, elem: &BgpElem) {
if self.timestamp == 0.0 {
self.timestamp = elem.timestamp;
}
let bgp_identifier = match elem.peer_ip {
IpAddr::V4(ip) => ip,
IpAddr::V6(_ip) => Ipv4Addr::from(0),
};
let peer = Peer::new(bgp_identifier, elem.peer_ip, elem.peer_asn);
let peer_id = self.index_table.add_peer(peer);
let prefix = elem.prefix.prefix;

let entries_map = self.per_prefix_entries_map.entry(prefix).or_default();
let entry = RibEntry {
peer_index: peer_id,
originated_time: elem.timestamp as u32,
attributes: Attributes::from(elem),
};
entries_map.insert(peer_id, entry);
}

/// Export the data stored in the struct to a byte array.
///
/// The function first encodes the peer-index-table data into a `MrtMessage` and appends it to the `BytesMut` object.
/// Then, for each prefix in the `per_prefix_entries_map`, it creates a `RibAfiEntries` object and encodes it as a `MrtMessage`.
/// The resulting `BytesMut` object is then converted to an immutable `Bytes` object using `freeze()` and returned.
///
/// # Return
/// Returns a `Bytes` object containing the exported data as a byte array.
pub fn export_bytes(&mut self) -> Bytes {
let mut bytes = BytesMut::new();

// encode peer-index-table
let mrt_message = MrtMessage::TableDumpV2Message(TableDumpV2Message::PeerIndexTable(
self.index_table.clone(),
));
let (seconds, _microseconds) = convert_timestamp(self.timestamp);
let subtype = TableDumpV2Type::PeerIndexTable as u16;
let data_bytes = mrt_message.encode(subtype);
let header = CommonHeader {
timestamp: seconds,
microsecond_timestamp: None,
entry_type: EntryType::TABLE_DUMP_V2,
entry_subtype: subtype,
length: data_bytes.len() as u32,
};
let header_bytes = header.encode();
bytes.extend(header_bytes);
bytes.extend(data_bytes);

// encode each RibAfiEntries
for (entry_count, (prefix, entries_map)) in self.per_prefix_entries_map.iter().enumerate() {
let rib_type = match prefix.addr().is_ipv6() {
true => TableDumpV2Type::RibIpv6Unicast,
false => TableDumpV2Type::RibIpv4Unicast,
};

let mut prefix_rib_entry = RibAfiEntries {
rib_type,
sequence_number: entry_count as u32,
prefix: NetworkPrefix::new(*prefix, 0),
rib_entries: vec![],
};
for entry in entries_map.values() {
prefix_rib_entry.rib_entries.push(entry.clone());
}

let mrt_message =
MrtMessage::TableDumpV2Message(TableDumpV2Message::RibAfi(prefix_rib_entry));

let (seconds, _microseconds) = convert_timestamp(self.timestamp);
let subtype = rib_type as u16;
let data_bytes = mrt_message.encode(subtype);
let header_bytes = CommonHeader {
timestamp: seconds,
microsecond_timestamp: None,
entry_type: EntryType::TABLE_DUMP_V2,
entry_subtype: subtype,
length: data_bytes.len() as u32,
}
.encode();
bytes.extend(header_bytes);
bytes.extend(data_bytes);
}

self.reset();

bytes.freeze()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::models::Asn;
use crate::parse_mrt_record;
use bytes::Buf;
use std::io::Cursor;

#[test]
fn test_encoding_rib() {
let mut encoder = MrtRibEncoder::new();
let mut elem = BgpElem::default();
elem.peer_ip = IpAddr::V4("10.0.0.1".parse().unwrap());
elem.peer_asn = Asn::from(65000);
elem.prefix.prefix = "10.250.0.0/24".parse().unwrap();
encoder.process_elem(&elem);
elem.prefix.prefix = "10.251.0.0/24".parse().unwrap();
encoder.process_elem(&elem);
let bytes = encoder.export_bytes();

let mut cursor = Cursor::new(bytes.clone());
while cursor.has_remaining() {
let parsed = parse_mrt_record(&mut cursor).unwrap();
dbg!(&parsed);
}

let mut cursor = Cursor::new(bytes);
let parser = crate::BgpkitParser::from_reader(&mut cursor);
for elem in parser {
println!("{}", elem);
}
}
}

0 comments on commit 80d1fc7

Please sign in to comment.