Skip to content

Commit

Permalink
Chore: [MR-554] Make members of StreamHeader private.
Browse files Browse the repository at this point in the history
  • Loading branch information
stiegerc committed Feb 21, 2024
1 parent b592af6 commit c3de2db
Show file tree
Hide file tree
Showing 21 changed files with 197 additions and 152 deletions.
18 changes: 9 additions & 9 deletions rs/canonical_state/src/encoding/old_types.rs
Expand Up @@ -301,21 +301,21 @@ pub struct StreamHeaderV6 {
impl From<(&StreamHeader, CertificationVersion)> for StreamHeaderV6 {
fn from((header, _certification_version): (&StreamHeader, CertificationVersion)) -> Self {
Self {
begin: header.begin.get(),
end: header.end.get(),
signals_end: header.signals_end.get(),
begin: header.begin().get(),
end: header.end().get(),
signals_end: header.signals_end().get(),
}
}
}

impl From<StreamHeaderV6> for StreamHeader {
fn from(header: StreamHeaderV6) -> Self {
Self {
begin: header.begin.into(),
end: header.end.into(),
signals_end: header.signals_end.into(),
reject_signals: Default::default(),
}
Self::new(
header.begin.into(),
header.end.into(),
header.signals_end.into(),
Default::default(),
)
}
}

Expand Down
27 changes: 13 additions & 14 deletions rs/canonical_state/src/encoding/tests/compatibility.rs
Expand Up @@ -21,18 +21,18 @@ use ic_replicated_state::{
use ic_test_utilities::types::{
ids::{canister_test_id, subnet_test_id},
messages::{RequestBuilder, ResponseBuilder},
xnet::StreamHeaderBuilder,
};
use ic_types::{
crypto::CryptoHash,
messages::{
CallbackId, Payload, RejectContext, Request, RequestMetadata, RequestOrResponse, Response,
},
nominal_cycles::NominalCycles,
xnet::StreamHeader,
CryptoHashOfPartialState, Cycles, Funds, NumBytes, Time,
};
use serde_cbor::value::Value;
use std::collections::{BTreeMap, VecDeque};
use std::collections::BTreeMap;

//
// Tests for exact binary encoding
Expand Down Expand Up @@ -64,12 +64,11 @@ use std::collections::{BTreeMap, VecDeque};
#[test]
fn canonical_encoding_stream_header() {
for certification_version in all_supported_versions() {
let header = StreamHeader {
begin: 23.into(),
end: 25.into(),
signals_end: 256.into(),
reject_signals: VecDeque::new(),
};
let header = StreamHeaderBuilder::new()
.begin(23.into())
.end(25.into())
.signals_end(256.into())
.build();

assert_eq!(
"A3 00 17 01 18 19 02 19 01 00",
Expand Down Expand Up @@ -110,12 +109,12 @@ fn canonical_encoding_stream_header() {
fn canonical_encoding_stream_header_v8_plus() {
for certification_version in all_supported_versions().filter(|v| v >= &CertificationVersion::V8)
{
let header = StreamHeader {
begin: 23.into(),
end: 25.into(),
signals_end: 256.into(),
reject_signals: vec![249.into(), 250.into(), 252.into()].into(),
};
let header = StreamHeaderBuilder::new()
.begin(23.into())
.end(25.into())
.signals_end(256.into())
.reject_signals(vec![249.into(), 250.into(), 252.into()].into())
.build();

assert_eq!(
"A4 00 17 01 18 19 02 19 01 00 03 83 01 02 04",
Expand Down
15 changes: 8 additions & 7 deletions rs/canonical_state/src/encoding/tests/test_fixtures.rs
Expand Up @@ -3,6 +3,7 @@ use ic_error_types::RejectCode;
use ic_test_utilities::types::{
ids::canister_test_id,
messages::{RequestBuilder, ResponseBuilder},
xnet::StreamHeaderBuilder,
};
use ic_types::{
messages::{CallbackId, Payload, RejectContext, RequestMetadata, RequestOrResponse},
Expand All @@ -12,16 +13,16 @@ use ic_types::{
use std::collections::VecDeque;

pub fn stream_header(certification_version: CertificationVersion) -> StreamHeader {
StreamHeader {
begin: 23.into(),
end: 25.into(),
signals_end: 256.into(),
reject_signals: if certification_version < CertificationVersion::V8 {
StreamHeaderBuilder::new()
.begin(23.into())
.end(25.into())
.signals_end(256.into())
.reject_signals(if certification_version < CertificationVersion::V8 {
VecDeque::new()
} else {
vec![10.into(), 200.into(), 250.into()].into()
},
}
})
.build()
}

pub fn request(certification_version: CertificationVersion) -> RequestOrResponse {
Expand Down
24 changes: 12 additions & 12 deletions rs/canonical_state/src/encoding/types.rs
Expand Up @@ -167,22 +167,22 @@ impl From<(&ic_types::xnet::StreamHeader, CertificationVersion)> for StreamHeade
// includes replicas with certification version 8, but they may "inherit" reject
// signals from a replica with certification version 9 after a downgrade.
assert!(
header.reject_signals.is_empty() || certification_version >= CertificationVersion::V8,
header.reject_signals().is_empty() || certification_version >= CertificationVersion::V8,
"Replicas with certification version < 9 should not be producing reject signals"
);

let mut next_index = header.signals_end;
let mut reject_signal_deltas = vec![0; header.reject_signals.len()];
for (i, stream_index) in header.reject_signals.iter().enumerate().rev() {
let mut next_index = header.signals_end();
let mut reject_signal_deltas = vec![0; header.reject_signals().len()];
for (i, stream_index) in header.reject_signals().iter().enumerate().rev() {
assert!(next_index > *stream_index);
reject_signal_deltas[i] = next_index.get() - stream_index.get();
next_index = *stream_index;
}

Self {
begin: header.begin.get(),
end: header.end.get(),
signals_end: header.signals_end.get(),
begin: header.begin().get(),
end: header.end().get(),
signals_end: header.signals_end().get(),
reject_signal_deltas,
}
}
Expand All @@ -206,12 +206,12 @@ impl TryFrom<StreamHeader> for ic_types::xnet::StreamHeader {
reject_signals.push_front(stream_index);
}

Ok(Self {
begin: header.begin.into(),
end: header.end.into(),
signals_end: header.signals_end.into(),
Ok(Self::new(
header.begin.into(),
header.end.into(),
header.signals_end.into(),
reject_signals,
})
))
}
}

Expand Down
16 changes: 8 additions & 8 deletions rs/canonical_state/src/traversal.rs
Expand Up @@ -66,11 +66,12 @@ mod tests {
use ic_test_utilities::{
state::new_canister_state,
types::ids::{canister_test_id, node_test_id, subnet_test_id, user_test_id},
types::xnet::StreamHeaderBuilder,
};
use ic_types::{xnet::StreamHeader, CanisterId, Cycles, ExecutionRound};
use ic_types::{CanisterId, Cycles, ExecutionRound};
use ic_wasm_types::CanisterModule;
use maplit::{btreemap, btreeset};
use std::collections::{BTreeSet, VecDeque};
use std::collections::BTreeSet;
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -345,12 +346,11 @@ mod tests {
use ic_replicated_state::metadata_state::Stream;
use ic_types::xnet::{StreamIndex, StreamIndexedQueue};

let header = StreamHeader {
begin: StreamIndex::from(4),
end: StreamIndex::from(4),
signals_end: StreamIndex::new(11),
reject_signals: VecDeque::new(),
};
let header = StreamHeaderBuilder::new()
.begin(4.into())
.end(4.into())
.signals_end(11.into())
.build();

let stream = Stream::new(
StreamIndexedQueue::with_begin(StreamIndex::from(4)),
Expand Down
2 changes: 1 addition & 1 deletion rs/messaging/src/message_routing.rs
Expand Up @@ -241,7 +241,7 @@ impl LatencyMetrics {
/// Records a `MessageTime` entry for messages to/from `subnet_id` before
/// `header.end` (if not already recorded).
pub(crate) fn record_header(&mut self, subnet_id: SubnetId, header: &StreamHeader) {
self.with_timeline(subnet_id, |t| t.add_entry(header.end));
self.with_timeline(subnet_id, |t| t.add_entry(header.end()));
}

/// Observes message durations for all messages to/from `subnet_id` with
Expand Down
18 changes: 9 additions & 9 deletions rs/messaging/src/routing/stream_handler.rs
Expand Up @@ -360,8 +360,8 @@ impl StreamHandlerImpl {
let rejected_messages = self.garbage_collect_messages(
&mut stream,
*remote_subnet,
stream_slice.header().signals_end,
&stream_slice.header().reject_signals,
stream_slice.header().signals_end(),
stream_slice.header().reject_signals(),
);
self.garbage_collect_signals(&mut stream, *remote_subnet, stream_slice);

Expand All @@ -376,22 +376,22 @@ impl StreamHandlerImpl {
None => {
// New stream.
assert_eq!(
stream_slice.header().signals_end,
stream_slice.header().signals_end(),
StreamIndex::from(0),
"Cannot garbage collect a stream for subnet {} that does not exist",
remote_subnet
);
assert_eq!(
stream_slice.header().begin, StreamIndex::from(0),
stream_slice.header().begin(), StreamIndex::from(0),
"Signals from subnet {} do not start from 0 in the first communication attempt",
remote_subnet
);
}
}
let backlog = if let Some(messages) = stream_slice.messages() {
stream_slice.header().end - messages.end()
stream_slice.header().end() - messages.end()
} else {
stream_slice.header().end - stream_slice.header().begin
stream_slice.header().end() - stream_slice.header().begin()
};
self.metrics
.xnet_message_backlog
Expand Down Expand Up @@ -462,11 +462,11 @@ impl StreamHandlerImpl {
);
assert_valid_signals_for_messages(
stream.signals_end(),
stream_slice.header().begin,
stream_slice.header().begin(),
stream_slice
.messages()
.map(|q| q.end())
.unwrap_or_else(|| stream_slice.header().end),
.unwrap_or_else(|| stream_slice.header().end()),
StreamComponent::MessagesFrom(remote_subnet),
);
assert_valid_slice_messages_for_stream(
Expand All @@ -475,7 +475,7 @@ impl StreamHandlerImpl {
remote_subnet,
);

self.discard_signals_before(stream, stream_slice.header().begin);
self.discard_signals_before(stream, stream_slice.header().begin());
}

/// Wrapper around `Stream::discard_signals_before()` plus telemetry.
Expand Down
24 changes: 14 additions & 10 deletions rs/messaging/src/routing/stream_handler/tests.rs
Expand Up @@ -31,7 +31,10 @@ use ic_test_utilities_metrics::{
use ic_types::{
messages::{CallbackId, Payload, Request, MAX_RESPONSE_COUNT_BYTES},
time::UNIX_EPOCH,
xnet::{testing::StreamSliceTesting, StreamIndex, StreamIndexedQueue},
xnet::{
testing::{StreamHeaderTesting, StreamSliceTesting},
StreamIndex, StreamIndexedQueue,
},
CanisterId, CountBytes, Cycles,
};
use lazy_static::lazy_static;
Expand Down Expand Up @@ -3088,15 +3091,16 @@ struct MessageConfig {
}

fn generate_stream(msg_config: MessageConfig, signal_config: SignalConfig) -> Stream {
let stream_header_builder = StreamHeaderBuilder::new()
.begin(StreamIndex::from(msg_config.begin))
.end(StreamIndex::from(msg_config.begin + msg_config.count))
.signals_end(StreamIndex::from(signal_config.end));
let stream_header = StreamHeaderBuilder::new()
.begin(msg_config.begin.into())
.end((msg_config.begin + msg_config.count).into())
.signals_end(signal_config.end.into())
.build();

let msg_begin = StreamIndex::from(msg_config.begin);

let slice = StreamSliceBuilder::new()
.header(stream_header_builder.build())
.header(stream_header)
.generate_messages(
msg_begin,
msg_config.count,
Expand All @@ -3115,9 +3119,9 @@ fn generate_stream(msg_config: MessageConfig, signal_config: SignalConfig) -> St
.iter()
.map(|x| StreamIndex::from(*x))
.collect();
Stream::with_signals(messages, slice.header().signals_end, reject_signals)
Stream::with_signals(messages, slice.header().signals_end(), reject_signals)
} else {
Stream::new(messages, slice.header().signals_end)
Stream::new(messages, slice.header().signals_end())
}
}

Expand Down Expand Up @@ -3183,9 +3187,9 @@ fn generate_stream_slice(config: StreamSliceConfig) -> StreamSlice {
},
);
let mut slice: StreamSlice = stream.into();
slice.header_mut().begin = StreamIndex::from(config.header_begin);
slice.header_mut().set_begin(config.header_begin.into());
if let Some(end) = config.header_end {
slice.header_mut().end = StreamIndex::from(end);
slice.header_mut().set_end(end.into());
}
slice
}
Expand Down
4 changes: 2 additions & 2 deletions rs/messaging/tests/queue_tests.rs
Expand Up @@ -751,9 +751,9 @@ fn induct_messages_and_track_callback_ids(
let (reverse_header, _) = stream_snapshot(into_subnet, from_subnet).unwrap();
for (stream_index, msg) in messages
.iter()
.take_while(|(stream_index, _)| *stream_index < reverse_header.signals_end)
.take_while(|(stream_index, _)| *stream_index < reverse_header.signals_end())
{
if !reverse_header.reject_signals.contains(&stream_index) {
if !reverse_header.reject_signals().contains(&stream_index) {
update_callback_id_trackers(
msg,
add_callback_id_tracker,
Expand Down
22 changes: 7 additions & 15 deletions rs/replicated_state/src/metadata_state.rs
Expand Up @@ -1296,12 +1296,12 @@ impl Stream {

/// Creates a header for this stream.
pub fn header(&self) -> StreamHeader {
StreamHeader {
begin: self.messages.begin(),
end: self.messages.end(),
signals_end: self.signals_end,
reject_signals: self.reject_signals.clone(),
}
StreamHeader::new(
self.messages.begin(),
self.messages.end(),
self.signals_end,
self.reject_signals.clone(),
)
}

/// Returns a reference to the message queue.
Expand Down Expand Up @@ -1430,15 +1430,7 @@ impl CountBytes for Stream {

impl From<Stream> for StreamSlice {
fn from(val: Stream) -> Self {
StreamSlice::new(
StreamHeader {
begin: val.messages.begin(),
end: val.messages.end(),
signals_end: val.signals_end,
reject_signals: val.reject_signals,
},
val.messages,
)
StreamSlice::new(val.header(), val.messages)
}
}

Expand Down

0 comments on commit c3de2db

Please sign in to comment.