Skip to content

Commit

Permalink
Dummy parsing of almost all messages with basic tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Klapeyron committed Dec 5, 2020
1 parent bfc822e commit a168bd8
Showing 1 changed file with 260 additions and 56 deletions.
316 changes: 260 additions & 56 deletions src/messages/receiver.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use crate::common::validity_trait::Validity;
use crate::messages::ack_nack::AckNack;
use crate::messages::gap::Gap;
use crate::messages::header::Header;
use crate::messages::info_source::InfoSource;
use crate::messages::heartbeat::Heartbeat;
use crate::messages::heartbeat_frag::HeartbeatFrag;
use crate::messages::info_destination::InfoDestination;
use crate::messages::nack_frag::NackFrag;
use crate::messages::protocol_version::ProtocolVersion_t;
use crate::messages::submessage::EntitySubmessage;
use crate::messages::submessage_header::SubmessageHeader;
use crate::messages::submessage_kind::SubmessageKind;
use crate::messages::vendor_id::VendorId_t;
use crate::messages::{ack_nack::AckNack, gap::Gap, header::Header, info_source::InfoSource};
use crate::structure::guid_prefix::GuidPrefix_t;
use crate::structure::locator::{LocatorKind_t, LocatorList_t, Locator_t};

use crate::structure::time::Time_t;

use log::info;
use speedy::{Endianness, Readable};
use std::io::{Error, ErrorKind};

Expand Down Expand Up @@ -110,63 +112,146 @@ impl Decoder for MessageReceiver {
)
.and_then(|submessage_header| {
match submessage_header.submessage_id {
SubmessageKind::ACKNACK => {
let ack_nack = AckNack::read_from_buffer_owned_with_ctx(
submessage_header.flags.endianness_flag(),
&bytes.split_to(submessage_header.submessage_length.into()),
)?;
Ok(Some(EntitySubmessage::AckNack(
ack_nack,
submessage_header.flags,
)))
SubmessageKind::INFO_TS | SubmessageKind::PAD
if submessage_header.submessage_length == 0 =>
{
// It is the last submessage
self.state = DeserializationState::ReadingHeader;
}
SubmessageKind::GAP => {
let gap = Gap::read_from_buffer_owned_with_ctx(
submessage_header.flags.endianness_flag(),
&bytes.split_to(submessage_header.submessage_length.into()),
)?;
Ok(Some(EntitySubmessage::Gap(gap)))
_ => {}
}
Ok(submessage_header)
})
.and_then(|submessage_header| match submessage_header.submessage_id {
SubmessageKind::ACKNACK => {
let ack_nack = AckNack::read_from_buffer_owned_with_ctx(
submessage_header.flags.endianness_flag(),
&bytes.split_to(submessage_header.submessage_length.into()),
)?;
Ok(Some(EntitySubmessage::AckNack(
ack_nack,
submessage_header.flags,
)))
}
SubmessageKind::DATA => {
unimplemented!();
}
SubmessageKind::DATA_FRAG => {
unimplemented!();
}
SubmessageKind::GAP => {
let gap = Gap::read_from_buffer_owned_with_ctx(
submessage_header.flags.endianness_flag(),
&bytes.split_to(submessage_header.submessage_length.into()),
)?;
Ok(Some(EntitySubmessage::Gap(gap)))
}
SubmessageKind::NACK_FRAG => {
let nack_frag = NackFrag::read_from_buffer_owned_with_ctx(
submessage_header.flags.endianness_flag(),
&bytes.split_to(submessage_header.submessage_length.into()),
)?;

Ok(Some(EntitySubmessage::NackFrag(nack_frag)))
}
SubmessageKind::HEARTBEAT => {
let heartbeat = Heartbeat::read_from_buffer_owned_with_ctx(
submessage_header.flags.endianness_flag(),
&bytes.split_to(submessage_header.submessage_length.into()),
)?;

Ok(Some(EntitySubmessage::Heartbeat(
heartbeat,
submessage_header.flags,
)))
}
SubmessageKind::HEARTBEAT_FRAG => {
let heartbeat_frag = HeartbeatFrag::read_from_buffer_owned_with_ctx(
submessage_header.flags.endianness_flag(),
&bytes.split_to(submessage_header.submessage_length.into()),
)?;

Ok(Some(EntitySubmessage::HeartbeatFrag(heartbeat_frag)))
}
SubmessageKind::INFO_SRC => {
let info_src = InfoSource::read_from_buffer_owned_with_ctx(
submessage_header.flags.endianness_flag(),
&bytes.split_to(submessage_header.submessage_length.into()),
)?;
self.receiver.source_guid_prefix = info_src.guid_prefix;
self.receiver.source_version = info_src.protocol_version;
self.receiver.source_vendor_id = info_src.vendor_id;
self.receiver.unicast_reply_locator_list = vec![Locator_t::LOCATOR_INVALID];
self.receiver.multicast_reply_locator_list =
vec![Locator_t::LOCATOR_INVALID];
self.receiver.have_timestamp = false;

Ok(None)
}
SubmessageKind::INFO_DST => {
let info_dst = InfoDestination::read_from_buffer_owned_with_ctx(
submessage_header.flags.endianness_flag(),
&bytes.split_to(submessage_header.submessage_length.into()),
)?;

if info_dst.guid_prefix != GuidPrefix_t::GUIDPREFIX_UNKNOWN {
self.receiver.dest_guid_prefix = info_dst.guid_prefix;
}
SubmessageKind::INFO_SRC => {
let info_src = InfoSource::read_from_buffer_owned_with_ctx(

Ok(None)
}
SubmessageKind::INFO_REPLAY => {
let mut bytes = bytes.split_to(submessage_header.submessage_length.into());
let (unicast_locator_list, read_bytes) =
LocatorList_t::read_with_length_from_buffer_with_ctx(
submessage_header.flags.endianness_flag(),
&bytes,
);
self.receiver.unicast_reply_locator_list = unicast_locator_list?;

use crate::bytes::Buf;
let mut bytes = bytes.split_to(read_bytes);

self.receiver.multicast_reply_locator_list =
if submessage_header.flags.is_flag_set(0x02) {
let (multicast_locator_list, read_bytes) =
LocatorList_t::read_with_length_from_buffer_with_ctx(
submessage_header.flags.endianness_flag(),
&bytes,
);
bytes.advance(read_bytes);
multicast_locator_list?
} else {
vec![]
};

Ok(None)
}
SubmessageKind::INFO_TS => {
if !submessage_header.flags.is_flag_set(0x02) {
let timestamp = Time_t::read_from_buffer_owned_with_ctx(
submessage_header.flags.endianness_flag(),
&bytes.split_to(submessage_header.submessage_length.into()),
)?;
self.receiver.source_guid_prefix = info_src.guid_prefix;
self.receiver.source_version = info_src.protocol_version;
self.receiver.source_vendor_id = info_src.vendor_id;
self.receiver.unicast_reply_locator_list =
vec![Locator_t::LOCATOR_INVALID];
self.receiver.multicast_reply_locator_list =
vec![Locator_t::LOCATOR_INVALID];
self.receiver.have_timestamp = true;
self.receiver.timestamp = timestamp;
} else {
self.receiver.have_timestamp = false;

Ok(None)
}
SubmessageKind::INFO_TS => {
if !submessage_header.flags.is_flag_set(0x02) {
let timestamp = Time_t::read_from_buffer_owned_with_ctx(
submessage_header.flags.endianness_flag(),
&bytes.split_to(submessage_header.submessage_length.into()),
)?;
self.receiver.have_timestamp = true;
self.receiver.timestamp = timestamp;
} else {
self.receiver.have_timestamp = false;
}

Ok(None)
}
/*
SubmessageKind::DATA => Ok(None),
SubmessageKind::DATA_FRAG => Ok(None),
SubmessageKind::GAP => Ok(None),
SubmessageKind::HEARTBEAT => Ok(None),
SubmessageKind::HEARTBEAT_FRAG => Ok(None),
SubmessageKind::NACK_FRAG => Ok(None),
*/
// TODO: skip this submessage and go to another one
_ => Err(speedy::Error::custom("Invalid submessage id".to_owned())),
Ok(None)
}
SubmessageKind::PAD => {
use crate::bytes::Buf;
bytes.advance(submessage_header.submessage_length.into());
Ok(None)
}
_ => {
info!(
"Received unknown submessage with id {:?}, skipping",
submessage_header.submessage_id
);
Ok(None)
}
})
.or_else(|err| Err(err.into()))
Expand All @@ -178,6 +263,7 @@ impl Decoder for MessageReceiver {
#[cfg(test)]
mod tests {
use super::*;
use crate::messages::fragment_number::FragmentNumber_t;
use crate::messages::header::Header;
use crate::messages::submessage_flag::SubmessageFlag;
use crate::structure::count::Count_t;
Expand Down Expand Up @@ -216,7 +302,14 @@ mod tests {
$entity.write_to_vec_with_ctx(submessage_header.flags.endianness_flag()).unwrap();
submessage_content.extend(serialized_submessage.into_iter());
)*
submessage_header.submessage_length = submessage_content.len() as u16;
let provided_submessage_length = submessage_header.submessage_length;
let calculated_submessage_length = submessage_content.len() as u16;
assert_eq!(
provided_submessage_length, calculated_submessage_length,
"Try to replace provided submessage_length {} with {}.",
provided_submessage_length, calculated_submessage_length
);
submessage_header.submessage_length = calculated_submessage_length;
let submessage_header = submessage_header.write_to_vec_with_ctx(submessage_header.flags.endianness_flag()).unwrap();
serialized_input.extend(submessage_header.into_iter());
serialized_input.extend(submessage_content.into_iter());
Expand Down Expand Up @@ -271,7 +364,7 @@ mod tests {
submessage_header = SubmessageHeader {
submessage_id: SubmessageKind::INFO_TS,
flags: SubmessageFlag { flags: 0b0000_0000 },
submessage_length: 24,
submessage_length: 8,
},
submessage_entities = [Time_t::TIME_INFINITE],
submessage_header = SubmessageHeader {
Expand All @@ -296,4 +389,115 @@ mod tests {
SubmessageFlag { flags: 0b0000_0000 }
))]
);

message_decoding_test!(
test_name = single_gap_with_info_src,
header = Header::new(GuidPrefix_t::GUIDPREFIX_UNKNOWN),
[
submessage_header = SubmessageHeader {
submessage_id: SubmessageKind::INFO_SRC,
flags: SubmessageFlag { flags: 0b0000_0001 },
submessage_length: 16,
},
submessage_entities = [
ProtocolVersion_t::PROTOCOLVERSION,
VendorId_t::VENDOR_UNKNOWN,
GuidPrefix_t::GUIDPREFIX_UNKNOWN
],
submessage_header = SubmessageHeader {
submessage_id: SubmessageKind::GAP,
flags: SubmessageFlag { flags: 0b0000_0000 },
submessage_length: 28,
},
submessage_entities = [
EntityId_t::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER,
EntityId_t::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER,
SequenceNumber_t::from(42),
SequenceNumberSet_t::new(SequenceNumber_t::from(0b10110100))
],
],
expected_notifications = [Ok(EntitySubmessage::Gap(Gap {
reader_id: EntityId_t::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER,
writer_id: EntityId_t::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER,
gap_start: SequenceNumber_t::from(42),
gap_list: SequenceNumberSet_t::new(SequenceNumber_t::from(0b10110100))
}))]
);

message_decoding_test!(
test_name = single_heartbeat_with_info_dst,
header = Header::new(GuidPrefix_t::GUIDPREFIX_UNKNOWN),
[
submessage_header = SubmessageHeader {
submessage_id: SubmessageKind::INFO_DST,
flags: SubmessageFlag { flags: 0b0000_0001 },
submessage_length: 12,
},
submessage_entities = [GuidPrefix_t::GUIDPREFIX_UNKNOWN],
submessage_header = SubmessageHeader {
submessage_id: SubmessageKind::HEARTBEAT,
flags: SubmessageFlag { flags: 0b0000_0001 },
submessage_length: 28,
},
submessage_entities = [
EntityId_t::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER,
EntityId_t::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER,
SequenceNumber_t::from(7),
SequenceNumber_t::from(11),
Count_t::from(99)
],
],
expected_notifications = [Ok(EntitySubmessage::Heartbeat(
Heartbeat {
reader_id: EntityId_t::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER,
writer_id: EntityId_t::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER,
first_sn: SequenceNumber_t::from(7),
last_sn: SequenceNumber_t::from(11),
count: Count_t::from(99)
},
SubmessageFlag { flags: 0b0000_0001 }
))]
);

message_decoding_test!(
test_name = single_heartbeat_frag_with_info_reply_and_multicast_locator_list,
header = Header::new(GuidPrefix_t::GUIDPREFIX_UNKNOWN),
[
submessage_header = SubmessageHeader {
submessage_id: SubmessageKind::INFO_REPLAY,
flags: SubmessageFlag { flags: 0b0000_0000 },
submessage_length: 80,
},
submessage_entities = [
vec![Locator_t::LOCATOR_INVALID],
vec![
Locator_t::from("127.0.0.1:8080".parse::<std::net::SocketAddr>().unwrap()),
Locator_t::from(
"[2001:db8::1]:8080"
.parse::<std::net::SocketAddr>()
.unwrap()
)
]
],
submessage_header = SubmessageHeader {
submessage_id: SubmessageKind::HEARTBEAT_FRAG,
flags: SubmessageFlag { flags: 0b0000_0000 },
submessage_length: 24,
},
submessage_entities = [
EntityId_t::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER,
EntityId_t::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER,
SequenceNumber_t::from(36),
FragmentNumber_t::from(33),
Count_t::from(12345)
],
],
expected_notifications = [Ok(EntitySubmessage::HeartbeatFrag(HeartbeatFrag {
reader_id: EntityId_t::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER,
writer_id: EntityId_t::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER,
writer_sn: SequenceNumber_t::from(36),
last_fragment_num: FragmentNumber_t::from(33),
count: Count_t::from(12345)
}))]
);
}

0 comments on commit a168bd8

Please sign in to comment.