From b45231b4fcad2339a778a01aaf36a3ba45b8ecd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavel=20=C5=A0imerda?= Date: Thu, 10 Feb 2022 15:04:48 +0100 Subject: [PATCH 1/5] refactor: reverse the protocol/channel relation --- examples/blockfetch.rs | 10 ++--- src/mux.rs | 77 ++++++++++++++++++------------------- src/protocols/blockfetch.rs | 56 +++++++++++++++------------ src/protocols/chainsync.rs | 2 +- src/protocols/handshake.rs | 2 +- 5 files changed, 74 insertions(+), 73 deletions(-) diff --git a/examples/blockfetch.rs b/examples/blockfetch.rs index ed78ff6..c106c5d 100644 --- a/examples/blockfetch.rs +++ b/examples/blockfetch.rs @@ -21,8 +21,6 @@ use pallas::ledger::alonzo::{ Fragment, }; -use blake2b_simd::Params; - use oura::{ mapper::ChainWellKnownInfo, mapper::Config, @@ -36,8 +34,6 @@ use oura::{ }, }; -use log::debug; - mod common; async fn blockfetch() -> Result<(), Box> { @@ -64,8 +60,8 @@ async fn blockfetch() -> Result<(), Box> { 26250057, hex::decode("5fec758c8aaff4a7683c27b075dc3984d8d982839cc56470a682d1411c9f8198")?, ) - .build()?; - let mut blocks = blockfetch.run(&mut connection).await?; + .build(&mut connection)?; + let mut blocks = blockfetch.run().await?; let (tx, rx) = new_inter_stage_channel(None); let config = Config { @@ -90,7 +86,7 @@ async fn blockfetch() -> Result<(), Box> { let block = BlockWrapper::decode_fragment(&block[..])?; let hash = hash_block_header(&block.1.header); //debug!("HASH: {}", hash); - block_db.insert(&hash, &*block_raw); + block_db.insert(&hash, &*block_raw)?; writer.crawl(&block.1).unwrap(); } diff --git a/src/mux.rs b/src/mux.rs index dbd93ef..cbaab10 100644 --- a/src/mux.rs +++ b/src/mux.rs @@ -7,10 +7,7 @@ // SPDX-License-Identifier: MPL-2.0 // -use crate::{ - Agency, - Protocol, -}; +use crate::{Protocol, Agency}; use byteorder::{ ByteOrder, NetworkEndian, @@ -123,21 +120,26 @@ impl Connection { self.start_time.elapsed() } - pub(crate) fn execute<'a, P>(&'a mut self, protocol: &'a mut P) -> Channel<'a, P> - where - P: Protocol, - { - Channel::new(protocol, self) + pub(crate) fn channel<'a>(&'a mut self, idx: u16) -> Channel<'a> { + let receiver = self.register(idx); + let demux = self.run_demux(); + Channel { + idx, + receiver, + connection: self, + _demux: demux, + bytes: Vec::new(), + } } fn register(&mut self, idx: u16) -> Receiver { - trace!("Registering protocol {}.", idx); + trace!("Registering channel {}.", idx); let (tx, rx) = mpsc::unbounded_channel(); self.channels.lock().unwrap().insert(idx, tx); rx } fn unregister(&mut self, idx: u16) { - trace!("Unregistering protocol {}.", idx); + trace!("Unregistering channel {}.", idx); self.channels.lock().unwrap().remove(&idx); } async fn send(&self, idx: u16, payload: &[u8]) { @@ -183,48 +185,33 @@ impl Connection { } } -pub(crate) struct Channel<'a, P: Protocol> { +pub(crate) struct Channel<'a> { idx: u16, receiver: Receiver, - pub(crate) protocol: &'a mut P, connection: &'a mut Connection, _demux: Arc, - bytes: Vec, + pub(crate) bytes: Vec, } -impl<'a, P: Protocol> Channel<'_, P> { - fn new(protocol: &'a mut P, connection: &'a mut Connection) -> Channel<'a, P> { - let idx = protocol.protocol_id(); - let receiver = connection.register(idx); - let demux = connection.run_demux(); - Channel { - idx, - receiver, - protocol, - connection, - _demux: demux, - bytes: Vec::new(), - } - } - - pub(crate) async fn execute(&mut self) -> Result<(), Error> { +impl<'a> Channel<'a> { + pub(crate) async fn execute

(&mut self, protocol: &mut P) -> Result<(), Error> + where + P: Protocol, + { trace!("Executing protocol {}.", self.idx); loop { - let agency = self.protocol.agency(); + let agency = protocol.agency(); if agency == Agency::None { break; } - let role = self.protocol.role(); + let role = protocol.role(); if agency == role { - self.connection - .send(self.idx, &self.protocol.send_bytes().unwrap()) - .await; + self.send(&protocol.send_bytes().unwrap()).await?; } else { let mut bytes = std::mem::replace(&mut self.bytes, Vec::new()); - let new_data = self.connection.recv(&mut self.receiver).await; + let new_data = self.recv().await?; bytes.extend(new_data); - self.bytes = self - .protocol + self.bytes = protocol .receive_bytes(bytes) .unwrap_or(Box::new([])) .into_vec(); @@ -235,9 +222,21 @@ impl<'a, P: Protocol> Channel<'_, P> { } Ok(()) } + + async fn send(&mut self, data: &[u8]) -> Result<(), Error> { + Ok(self.connection + .send(self.idx, &data) + .await) + } + + async fn recv(&mut self) -> Result, Error> { + Ok(self.connection + .recv(&mut self.receiver) + .await) + } } -impl<'a, P: Protocol> Drop for Channel<'_, P> { +impl Drop for Channel<'_> { fn drop(&mut self) { self.connection.unregister(self.idx); } diff --git a/src/protocols/blockfetch.rs b/src/protocols/blockfetch.rs index d955855..b00e348 100644 --- a/src/protocols/blockfetch.rs +++ b/src/protocols/blockfetch.rs @@ -7,10 +7,9 @@ // SPDX-License-Identifier: MPL-2.0 // -use crate::mux::{ - Channel, - Connection, -}; +use crate::mux::Channel; +#[cfg(not(test))] +use crate::mux::Connection; use crate::Message as MessageOps; use crate::{ Agency, @@ -105,8 +104,12 @@ impl Builder { self.last = Some((slot, hash)); self } - pub fn build(&mut self) -> Result { + pub fn build<'a>(&mut self, #[cfg(not(test))]connection: &'a mut Connection) -> Result, Error> { Ok(BlockFetch { + #[cfg(not(test))] + channel: Some(connection.channel(0x0003)), + #[cfg(test)] + channel: None, config: Config { first: self.first.as_ref().ok_or("First point required.")?.clone(), last: self.last.as_ref().ok_or("Last point required.")?.clone(), @@ -124,7 +127,8 @@ pub struct Config { last: Point, } -pub struct BlockFetch { +pub struct BlockFetch<'a> { + channel: Option>, config: Config, state: State, result: Vec>, @@ -132,48 +136,50 @@ pub struct BlockFetch { done: bool, } -impl BlockFetch { +impl<'a> BlockFetch<'a> { pub fn builder() -> Builder { Default::default() } - pub async fn run<'a>( - &'a mut self, - connection: &'a mut Connection, - ) -> Result, Error> { + pub async fn run<'b>(&'b mut self) -> Result, Error> + where + 'a: 'b, + { // Start the protocol and prefetch first block into `self.result`. - self.running = true; - let mut channel = connection.execute(self); - channel.execute().await?; - Ok(BlockStream { channel }) + //self.running = true; + // TODO: Do something with the Option trick. + let mut channel = self.channel.take().ok_or("Channel not available.".to_string())?; + channel.execute(self).await?; + self.channel = Some(channel); + Ok(BlockStream { blockfetch: self }) } } -pub struct BlockStream<'a> { - channel: Channel<'a, BlockFetch>, +pub struct BlockStream<'a, 'b> { + blockfetch: &'b mut BlockFetch<'a>, } -impl BlockStream<'_> { +impl BlockStream<'_, '_> { pub async fn next(&mut self) -> Result>, Error> { - if self.channel.protocol.result.is_empty() { - match self.channel.protocol.state() { + if self.blockfetch.result.is_empty() { + match self.blockfetch.state() { State::Streaming => { - self.channel.protocol.running = true; - self.channel.execute().await? + self.blockfetch.running = true; + //self.blockfetch.channel.execute(self.blockfetch).await? } State::Idle => return Ok(None), _ => panic!("Unexpected state."), } } - if self.channel.protocol.result.is_empty() { + if self.blockfetch.result.is_empty() { Ok(None) } else { - Ok(Some(self.channel.protocol.result.remove(0))) + Ok(Some(self.blockfetch.result.remove(0))) } } } -impl Protocol for BlockFetch { +impl Protocol for BlockFetch<'_> { type State = State; type Message = Message; diff --git a/src/protocols/chainsync.rs b/src/protocols/chainsync.rs index ace7ce2..ecd9fbe 100644 --- a/src/protocols/chainsync.rs +++ b/src/protocols/chainsync.rs @@ -129,7 +129,7 @@ impl ChainSync { const FIVE_SECS: Duration = Duration::from_secs(5); pub async fn run(&mut self, connection: &mut Connection) -> Result<(), Error> { - connection.execute(self).execute().await + connection.channel(self.protocol_id()).execute(self).await } fn save_block(&mut self, msg_roll_forward: &BlockHeader, is_tip: bool) -> io::Result<()> { diff --git a/src/protocols/handshake.rs b/src/protocols/handshake.rs index a9395ff..359aefb 100644 --- a/src/protocols/handshake.rs +++ b/src/protocols/handshake.rs @@ -294,7 +294,7 @@ impl Handshake { } pub async fn run(&mut self, connection: &mut Connection) -> Result<(Version, u32), Error> { - connection.execute(self).execute().await?; + connection.channel(self.protocol_id()).execute(self).await?; self.version .as_ref() .map(|v| (v.clone(), self.network_magic)) From 04a95f751f4927f054d75815db31bbbc708f6790 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavel=20=C5=A0imerda?= Date: Wed, 9 Feb 2022 19:48:27 +0100 Subject: [PATCH 2/5] chore(examples): move oura/sled stuff to common --- examples/blockfetch.rs | 47 +-------------------------- examples/common.rs | 74 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 69 insertions(+), 52 deletions(-) diff --git a/examples/blockfetch.rs b/examples/blockfetch.rs index c106c5d..10cf448 100644 --- a/examples/blockfetch.rs +++ b/examples/blockfetch.rs @@ -13,27 +13,6 @@ use cardano_ouroboros_network::{ protocols::handshake::Handshake, }; -use std::sync::Arc; - -use pallas::ledger::alonzo::{ - crypto::hash_block_header, - BlockWrapper, - Fragment, -}; - -use oura::{ - mapper::ChainWellKnownInfo, - mapper::Config, - mapper::EventWriter, - pipelining::new_inter_stage_channel, - pipelining::SinkProvider, - sources::MagicArg, - utils::{ - Utils, - WithUtils, - }, -}; - mod common; async fn blockfetch() -> Result<(), Box> { @@ -63,34 +42,10 @@ async fn blockfetch() -> Result<(), Box> { .build(&mut connection)?; let mut blocks = blockfetch.run().await?; - let (tx, rx) = new_inter_stage_channel(None); - let config = Config { - include_block_end_events: true, - ..Default::default() - }; - - let well_known = ChainWellKnownInfo::try_from_magic(*MagicArg::default()).unwrap(); - let utils = Arc::new(Utils::new(well_known, None)); - let writer = EventWriter::standalone(tx, None, config); - let sink_handle = WithUtils::new( - oura::sinks::terminal::Config { - throttle_min_span_millis: Some(0), - }, - utils, - ) - .bootstrap(rx)?; - let block_db = cfg.sdb.open_tree("blocks").unwrap(); - while let Some(block) = blocks.next().await? { - let block_raw = block.clone(); - let block = BlockWrapper::decode_fragment(&block[..])?; - let hash = hash_block_header(&block.1.header); - //debug!("HASH: {}", hash); - block_db.insert(&hash, &*block_raw)?; - writer.crawl(&block.1).unwrap(); + cfg.handle_block(&block)?; } - sink_handle.join().unwrap(); Ok(()) } diff --git a/examples/common.rs b/examples/common.rs index ea76acf..bf26a13 100644 --- a/examples/common.rs +++ b/examples/common.rs @@ -9,20 +9,82 @@ use std::path::PathBuf; +use std::sync::Arc; + +use pallas::ledger::alonzo::{ + crypto::hash_block_header, + BlockWrapper, + Fragment, +}; + +use oura::{ + mapper::ChainWellKnownInfo, + mapper::Config as MapperConfig, + mapper::EventWriter, + pipelining::new_inter_stage_channel, + pipelining::SinkProvider, + sources::MagicArg, + utils::{ + Utils, + WithUtils, + }, +}; + #[derive(Clone)] pub struct Config { pub db: PathBuf, pub sdb: sled::Db, pub host: String, pub magic: u32, + pub writer: EventWriter, } pub fn init() -> Config { - env_logger::init(); - Config { - db: PathBuf::from("sqlite.db"), - sdb: sled::open(".db").unwrap(), - host: "relays-new.cardano-mainnet.iohk.io:3001".to_string(), - magic: 764824073, + Config::new() +} + +impl Config { + fn new() -> Config { + env_logger::init(); + Config { + db: PathBuf::from("sqlite.db"), + sdb: sled::open(".db").unwrap(), + host: "relays-new.cardano-mainnet.iohk.io:3001".to_string(), + magic: 764824073, + writer: oura_init().unwrap(), + } } + + #[allow(dead_code)] + pub fn handle_block(&self, data: &[u8]) -> Result<(), Box> { + let block_db = self.sdb.open_tree("blocks").unwrap(); + + let block = BlockWrapper::decode_fragment(&data[..])?; + let hash = hash_block_header(&block.1.header); + //debug!("HASH: {}", hash); + block_db.insert(&hash, &*data)?; + self.writer.crawl(&block.1).unwrap(); + Ok(()) + } +} + +fn oura_init() -> Result> { + let (tx, rx) = new_inter_stage_channel(None); + let config = MapperConfig { + include_block_end_events: true, + ..Default::default() + }; + + let well_known = ChainWellKnownInfo::try_from_magic(*MagicArg::default()).unwrap(); + let utils = Arc::new(Utils::new(well_known, None)); + let writer = EventWriter::standalone(tx, None, config); + let _sink_handle = WithUtils::new( + oura::sinks::terminal::Config { + throttle_min_span_millis: Some(0), + }, + utils, + ) + .bootstrap(rx)?; + Ok(writer) + // sink_handle.join().unwrap(); } From 40e5efdeea1d96ca3ab093961f9edc099a113b30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavel=20=C5=A0imerda?= Date: Wed, 9 Feb 2022 22:10:53 +0100 Subject: [PATCH 3/5] refactor(chainsync): switch to Message::* --- src/protocols/chainsync.rs | 366 +++++++++++++++++-------------------- 1 file changed, 167 insertions(+), 199 deletions(-) diff --git a/src/protocols/chainsync.rs b/src/protocols/chainsync.rs index ecd9fbe..ffee64c 100644 --- a/src/protocols/chainsync.rs +++ b/src/protocols/chainsync.rs @@ -21,11 +21,8 @@ use std::{ }; use log::{ - debug, - error, info, trace, - warn, }; use serde_cbor::{ de, @@ -55,19 +52,78 @@ pub enum State { Done, } +#[derive(Debug)] +pub struct Point { + slot: i64, + hash: Vec, +} + #[derive(Debug)] pub enum Message { - Array(Vec), + RequestNext, + AwaitReply, + RollForward(BlockHeader, Tip), + RollBackward(i64, Tip), + FindIntersect(Vec), + IntersectFound(i64, Tip), + IntersectNotFound(Tip), + Done, } impl MessageOps for Message { - fn from_values(values: Vec) -> Result { - Ok(Message::Array(values)) + fn from_values(array: Vec) -> Result { + let mut values = array.iter(); + let message = match values + .next() + .ok_or("Unexpected end of message.".to_string())? + { + Value::Integer(0) => Message::RequestNext, + Value::Integer(1) => Message::AwaitReply, + Value::Integer(2) => Message::RollForward( + parse_wrapped_header(values.next().ok_or("Unexpected End of message.".to_string())?)?, + parse_tip(values.next().ok_or("Unexpected End of message.".to_string())?)?, + ), + Value::Integer(3) => Message::RollBackward( + parse_point(values.next().ok_or("Unexpected End of message.".to_string())?)?, + parse_tip(values.next().ok_or("Unexpected End of message.".to_string())?)?, + ), + Value::Integer(5) => Message::IntersectFound( + parse_point(values.next().ok_or("Unexpected End of message.".to_string())?)?, + parse_tip(values.next().ok_or("Unexpected End of message.".to_string())?)?, + ), + Value::Integer(6) => Message::IntersectNotFound( + parse_tip(values.next().ok_or("Unexpected End of message.".to_string())?)?, + ), + Value::Integer(7) => Message::Done, + _ => return Err("Unexpected message ID.".to_string()), + }; + match values.next() { + Some(value) => return Err(format!("Unexpected data: {:?}", value)), + None => (), + } + Ok(message) } fn to_values(&self) -> Vec { match self { - Message::Array(values) => values.clone(), + Message::RequestNext => vec![ + Value::Integer(0), + ], + Message::FindIntersect(points) => vec![ + Value::Integer(4), + Value::Array( + points + .iter() + .map(|Point {slot, hash}| { + Value::Array(vec![ + Value::Integer(*slot as i128), + Value::Bytes(hash.clone()), + ]) + }) + .collect(), + ), + ], + _ => panic!(), } } @@ -158,29 +214,6 @@ impl ChainSync { self.tip_to_intersect = Some(tip); self.is_intersect_found = false; } - - fn msg_find_intersect(&self, chain_blocks: Vec<(i64, Vec)>) -> Message { - Message::Array(vec![ - Value::Integer(4), // message_id - // Value::Array(points), - Value::Array( - chain_blocks - .iter() - .map(|(slot, hash)| { - Value::Array(vec![ - Value::Integer(*slot as i128), - Value::Bytes(hash.clone()), - ]) - }) - .collect(), - ), - ]) - } - - fn msg_request_next(&self) -> Message { - // we just send an array containing the message_id for this one. - Message::Array(vec![Value::Integer(0)]) - } } impl Protocol for ChainSync { @@ -214,7 +247,7 @@ impl Protocol for ChainSync { State::Idle => { trace!("ChainSync::State::Idle"); if !self.is_intersect_found { - let mut chain_blocks: Vec<(i64, Vec)> = vec![]; + let mut chain_blocks: Vec = Vec::new(); /* Classic sync: Use blocks from store if available. */ match self.store.as_mut() { @@ -223,161 +256,123 @@ impl Protocol for ChainSync { for (i, block) in blocks.iter().enumerate() { // all powers of 2 including 0th element 0, 2, 4, 8, 16, 32 if (i == 0) || ((i > 1) && (i & (i - 1) == 0)) { - chain_blocks.push(block.clone()); + let (slot, hash) = block.clone(); + chain_blocks.push(Point {slot, hash}); } } } - None => {} + None => (), } /* Tip discovery: Use discovered tip to retrieve header. */ if self.tip_to_intersect.is_some() { let tip = self.tip_to_intersect.as_ref().unwrap(); - chain_blocks.push((tip.slot_number, tip.hash.clone())); + chain_blocks.push(Point { + slot: tip.slot_number, + hash: tip.hash.clone(), + }); } //TODO: Make these protocol inputs instead of hardcoding here // Last byron block of mainnet - chain_blocks.push(( - 4492799, - hex::decode( + chain_blocks.push(Point { + slot: 4492799, + hash: hex::decode( "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457", ) .unwrap(), - )); + }); // Last byron block of testnet - chain_blocks.push(( - 1598399, - hex::decode( + chain_blocks.push(Point { + slot: 1598399, + hash: hex::decode( "7e16781b40ebf8b6da18f7b5e8ade855d6738095ef2f1c58c77e88b6e45997a4", ) .unwrap(), - )); + }); // Last byron block of guild - chain_blocks.push(( - 719, - hex::decode( + chain_blocks.push(Point { + slot: 719, + hash: hex::decode( "e5400faf19e712ebc5ff5b4b44cecb2b140d1cca25a011e36a91d89e97f53e2e", ) .unwrap(), - )); + }); - trace!("intersect"); - let payload = self.msg_find_intersect(chain_blocks); self.state = State::Intersect; - Ok(payload) + Ok(Message::FindIntersect(chain_blocks)) } else { - // request the next block from the server. - trace!("msg_request_next"); - let payload = self.msg_request_next(); self.state = State::CanAwait; - Ok(payload) + Ok(Message::RequestNext) } } state => Err(format!("Unsupported: {:?}", state)), } } - fn recv(&mut self, message: Self::Message) -> Result<(), Error> { - //msgRequestNext = [0] - //msgAwaitReply = [1] - //msgRollForward = [2, wrappedHeader, tip] - //msgRollBackward = [3, point, tip] - //msgFindIntersect = [4, points] - //msgIntersectFound = [5, point, tip] - //msgIntersectNotFound = [6, tip] - //chainSyncMsgDone = [7] - - let Message::Array(cbor_array) = message; - match cbor_array[0] { - Value::Integer(message_id) => { - match message_id { - 1 => { - // Server wants us to wait a bit until it gets a new block - self.state = State::MustReply; + fn recv(&mut self, message: Message) -> Result<(), Error> { + match message { + Message::RequestNext => { + // Server wants us to wait a bit until it gets a new block + self.state = State::MustReply; + } + Message::RollForward(header, tip) => { + let is_tip = header.slot_number == tip.slot_number + && header.hash == tip.hash; + trace!( + "block {} of {}, {:.2}% synced", + header.block_number, + tip.block_number, + (header.block_number as f64 + / tip.block_number as f64) + * 100.0 + ); + if is_tip || self.last_log_time.elapsed() > ChainSync::FIVE_SECS { + if self.mode == Mode::Sync { + info!( + "block {} of {}, {:.2}% synced", + header.block_number, + tip.block_number, + (header.block_number as f64 + / tip.block_number as f64) + * 100.0 + ); } - 2 => { - // MsgRollForward - match parse_msg_roll_forward(cbor_array) { - None => { - warn!("Probably a byron block. skipping...") - } - Some((msg_roll_forward, tip)) => { - let is_tip = msg_roll_forward.slot_number == tip.slot_number - && msg_roll_forward.hash == tip.hash; - trace!( - "block {} of {}, {:.2}% synced", - msg_roll_forward.block_number, - tip.block_number, - (msg_roll_forward.block_number as f64 - / tip.block_number as f64) - * 100.0 - ); - if is_tip || self.last_log_time.elapsed() > ChainSync::FIVE_SECS { - if self.mode == Mode::Sync { - info!( - "block {} of {}, {:.2}% synced", - msg_roll_forward.block_number, - tip.block_number, - (msg_roll_forward.block_number as f64 - / tip.block_number as f64) - * 100.0 - ); - } - self.last_log_time = Instant::now() - } - - /* Classic sync: Store header data. */ - /* TODO: error handling */ - let _ = self.save_block(&msg_roll_forward, is_tip); - - if is_tip { - /* Got complete tip header. */ - self.notify_tip(&msg_roll_forward); - } else { - match self.mode { - /* Next time get tip header. */ - Mode::SendTip => self.jump_to_tip(tip), - _ => {} - } - } - } - } + self.last_log_time = Instant::now() + } - self.state = State::Idle; + /* Classic sync: Store header data. */ + /* TODO: error handling */ + let _ = self.save_block(&header, is_tip); - // testing only so we sync only a single block - // self.state = State::Done; - } - 3 => { - // MsgRollBackward - let slot = parse_msg_roll_backward(cbor_array); - warn!("rollback to slot: {}", slot); - self.state = State::Idle; - } - 5 => { - debug!("MsgIntersectFound: {:?}", cbor_array); - self.is_intersect_found = true; - self.state = State::Idle; - } - 6 => { - warn!("MsgIntersectNotFound: {:?}", cbor_array); - self.is_intersect_found = true; // should start syncing at first byron block. We will just skip all byron - // blocks. - self.state = State::Idle; - } - 7 => { - warn!("MsgDone: {:?}", cbor_array); - self.state = State::Done; - } - _ => { - error!("Got unexpected message_id: {}", message_id); + if is_tip { + /* Got complete tip header. */ + self.notify_tip(&header); + } else { + match self.mode { + /* Next time get tip header. */ + Mode::SendTip => self.jump_to_tip(tip), + _ => (), } } + self.state = State::Idle; } - _ => { - error!("Unexpected cbor!") + Message::RollBackward(_point, _tip) => { + self.state = State::Idle; + } + Message::IntersectFound(_point, _tip) => { + self.is_intersect_found = true; + self.state = State::Idle; + } + Message::IntersectNotFound(_tip) => { + self.is_intersect_found = true; // should start syncing at first byron block. We will just skip all byron + // blocks. + self.state = State::Idle; } + Message::Done => { + self.state = State::Done; + } + other => return Err(format!("Got unexpected message: {:?}", other)), } Ok(()) } @@ -408,7 +403,7 @@ impl UnwrapValue for Value { } } -pub fn parse_msg_roll_forward(cbor_array: Vec) -> Option<(BlockHeader, Tip)> { +fn parse_wrapped_header(value: &Value) -> Result { let mut msg_roll_forward = BlockHeader { block_number: 0, slot_number: 0, @@ -429,13 +424,7 @@ pub fn parse_msg_roll_forward(cbor_array: Vec) -> Option<(BlockHeader, Ti protocol_major_version: 0, protocol_minor_version: 0, }; - let mut tip = Tip { - block_number: 0, - slot_number: 0, - hash: vec![], - }; - - match &cbor_array[1] { + match value { Value::Array(header_array) => { match &header_array[1] { Value::Bytes(wrapped_block_header_bytes) => { @@ -474,10 +463,7 @@ pub fn parse_msg_roll_forward(cbor_array: Vec) -> Option<(BlockHeader, Ti .eta_vrf_1 .append(&mut nonce_array[1].bytes()); } - _ => { - warn!("invalid cbor! code: 340"); - return None; - } + _ => return Err("invalid cbor! code: 340".to_string()), } match &block_header_array_inner[6] { Value::Array(leader_array) => { @@ -488,10 +474,7 @@ pub fn parse_msg_roll_forward(cbor_array: Vec) -> Option<(BlockHeader, Ti .leader_vrf_1 .append(&mut leader_array[1].bytes()); } - _ => { - warn!("invalid cbor! code: 341"); - return None; - } + _ => return Err("invalid cbor! code: 341".to_string()), } msg_roll_forward.block_size = block_header_array_inner[7].integer() as i64; @@ -513,69 +496,54 @@ pub fn parse_msg_roll_forward(cbor_array: Vec) -> Option<(BlockHeader, Ti msg_roll_forward.protocol_minor_version = block_header_array_inner[14].integer() as i64; } - _ => { - warn!("invalid cbor! code: 342"); - return None; - } + _ => return Err("invalid cbor! code: 342".to_string()), }, - _ => { - warn!("invalid cbor! code: 343"); - return None; - } + _ => return Err("invalid cbor! code: 343".to_string()), } } - _ => { - warn!("invalid cbor! code: 344"); - return None; - } + _ => return Err("invalid cbor! code: 344".to_string()), } } - _ => { - warn!("invalid cbor! code: 345"); - return None; - } + _ => return Err("invalid cbor! code: 345".to_string()), } + Ok(msg_roll_forward) +} - match &cbor_array[2] { +fn parse_tip(value: &Value) -> Result { + let mut tip = Tip { + block_number: 0, + slot_number: 0, + hash: vec![], + }; + match value { Value::Array(tip_array) => { match &tip_array[0] { Value::Array(tip_info_array) => { tip.slot_number = tip_info_array[0].integer() as i64; tip.hash.append(&mut tip_info_array[1].bytes()); } - _ => { - warn!("invalid cbor! code: 346"); - return None; - } + _ => return Err("invalid cbor! code: 346".to_string()), } tip.block_number = tip_array[1].integer() as i64; } - _ => { - warn!("invalid cbor! code: 347"); - return None; - } + _ => return Err("invalid cbor! code: 347".to_string()), } - - Some((msg_roll_forward, tip)) + Ok(tip) } -pub fn parse_msg_roll_backward(cbor_array: Vec) -> i64 { +pub fn parse_point(value: &Value) -> Result { let mut slot: i64 = 0; - match &cbor_array[1] { + match value { Value::Array(block) => { if block.len() > 0 { match block[0] { Value::Integer(parsed_slot) => slot = parsed_slot as i64, - _ => { - error!("invalid cbor"); - } + _ => return Err("invalid cbor".to_string()), } } } - _ => { - error!("invalid cbor"); - } + _ => return Err("invalid cbor".to_string()), } - - slot + Ok(slot) } + From 0ba9af9e55b3fc520a685a43aaeb33fd1d78a16b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavel=20=C5=A0imerda?= Date: Thu, 10 Feb 2022 02:13:22 +0100 Subject: [PATCH 4/5] feat(chainsync): create a clean chainsync client implementation --- Cargo.toml | 9 - examples/common.rs | 23 +- examples/pooltool.rs | 148 ---------- examples/sqlite.rs | 313 --------------------- examples/sync.rs | 35 ++- examples/tip.rs | 46 +-- src/lib.rs | 4 +- src/model.rs | 35 +++ src/mux.rs | 4 +- src/protocols.rs | 40 +++ src/protocols/chainsync.rs | 558 ++++++++++++++----------------------- 11 files changed, 358 insertions(+), 857 deletions(-) delete mode 100644 examples/pooltool.rs delete mode 100644 examples/sqlite.rs create mode 100644 src/model.rs diff --git a/Cargo.toml b/Cargo.toml index 4bbe4f5..4e7a324 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,6 @@ tokio = { version = "1.15.0", features = ["full"]} [dev-dependencies] env_logger = "0.9.0" futures = "0.3.8" -rusqlite = { version = "0.26.0", features = ["bundled"] } oura = "1.1.0" pallas = "0.4.0" sled = "0.34.7" @@ -42,11 +41,3 @@ sled = "0.34.7" [[example]] name = "common" crate-type = ["staticlib"] - -[[example]] -name = "sqlite" -crate-type = ["staticlib"] - -[[example]] -name = "pooltool" -crate-type = ["staticlib"] diff --git a/examples/common.rs b/examples/common.rs index bf26a13..f892837 100644 --- a/examples/common.rs +++ b/examples/common.rs @@ -7,8 +7,6 @@ // SPDX-License-Identifier: MPL-2.0 // -use std::path::PathBuf; - use std::sync::Arc; use pallas::ledger::alonzo::{ @@ -30,13 +28,19 @@ use oura::{ }, }; +use cardano_ouroboros_network::{ + model::Point, +}; + #[derive(Clone)] pub struct Config { - pub db: PathBuf, pub sdb: sled::Db, pub host: String, pub magic: u32, pub writer: EventWriter, + pub byron_mainnet: Point, + pub byron_testnet: Point, + pub byron_guild: Point, } pub fn init() -> Config { @@ -47,11 +51,22 @@ impl Config { fn new() -> Config { env_logger::init(); Config { - db: PathBuf::from("sqlite.db"), sdb: sled::open(".db").unwrap(), host: "relays-new.cardano-mainnet.iohk.io:3001".to_string(), magic: 764824073, writer: oura_init().unwrap(), + byron_mainnet: ( + 4492799, + "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457", + ).try_into().unwrap(), + byron_testnet: ( + 1598399, + "7e16781b40ebf8b6da18f7b5e8ade855d6738095ef2f1c58c77e88b6e45997a4", + ).try_into().unwrap(), + byron_guild: ( + 719, + "e5400faf19e712ebc5ff5b4b44cecb2b140d1cca25a011e36a91d89e97f53e2e", + ).try_into().unwrap(), } } diff --git a/examples/pooltool.rs b/examples/pooltool.rs deleted file mode 100644 index 775674d..0000000 --- a/examples/pooltool.rs +++ /dev/null @@ -1,148 +0,0 @@ -// -// © 2020 - 2022 PERLUR Group -// -// Re-licenses under MPLv2 -// © 2022 PERLUR Group -// -// SPDX-License-Identifier: MPL-2.0 -// - -use cardano_ouroboros_network::{ - protocols::chainsync::Listener, - BlockHeader, -}; -use chrono::{ - SecondsFormat, - Utc, -}; -use log::{ - error, - info, -}; -use regex::Regex; -use serde::Serialize; -use std::path::PathBuf; -use std::{ - ops::Sub, - process::{ - Command, - Stdio, - }, - time::{ - Duration, - Instant, - }, -}; - -struct PoolTool { - pub pooltool_api_key: String, - pub cardano_node_path: PathBuf, - pub node_version: String, - pub last_node_version_time: Instant, - pub pool_name: String, - pub pool_id: String, -} - -impl Default for PoolTool { - fn default() -> Self { - PoolTool { - pooltool_api_key: String::new(), - cardano_node_path: PathBuf::new(), - node_version: String::new(), - last_node_version_time: Instant::now().sub(Duration::from_secs(7200)), // 2 hours ago - pool_name: String::new(), - pool_id: String::new(), - } - } -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct PooltoolStats { - api_key: String, - pool_id: String, - data: PooltoolData, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct PooltoolData { - node_id: String, - version: String, - at: String, - block_no: i64, - slot_no: i64, - block_hash: String, - parent_hash: String, - leader_vrf: String, - platform: String, -} - -impl Listener for PoolTool { - fn handle_tip(&mut self, header: &BlockHeader) { - if self.last_node_version_time.elapsed() > Duration::from_secs(3600) { - // Our node version is outdated. Make a call to update it. - let output = Command::new(&self.cardano_node_path) - .arg("--version") - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .output() - .expect(&*format!("Failed to execute {:?}", &self.cardano_node_path)); - let version_string = String::from_utf8_lossy(&output.stdout); - let cap = Regex::new("cardano-node (\\d+\\.\\d+\\.\\d+) .*\ngit rev ([a-f0-9]{5}).*") - .unwrap() - .captures(&*version_string) - .unwrap(); - self.node_version = format!( - "{}:{}", - cap.get(1).map_or("", |m| m.as_str()), - cap.get(2).map_or("", |m| m.as_str()) - ); - info!("Checking cardano-node version: {}", &self.node_version); - self.last_node_version_time = Instant::now(); - } - let client = reqwest::blocking::Client::new(); - let pooltool_result = client - .post("https://api.pooltool.io/v0/sendstats") - .body( - serde_json::ser::to_string(&PooltoolStats { - api_key: self.pooltool_api_key.clone(), - pool_id: self.pool_id.clone(), - data: PooltoolData { - node_id: "".to_string(), - version: self.node_version.clone(), - at: Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true), - block_no: header.block_number, - slot_no: header.slot_number, - block_hash: hex::encode(&header.hash), - parent_hash: hex::encode(&header.prev_hash), - leader_vrf: hex::encode(&header.leader_vrf_0), - platform: "cncli".to_string(), - }, - }) - .unwrap(), - ) - .send(); - - match pooltool_result { - Ok(response) => match response.text() { - Ok(text) => { - info!( - "Pooltool ({}, {}): ({}, {}), json: {}", - &self.pool_name, - &self.pool_id[..8], - &header.block_number, - hex::encode(&header.hash[..8]), - text - ); - } - Err(error) => { - error!("PoolTool error: {}", error); - } - }, - Err(error) => { - error!("PoolTool error: {}", error); - } - } - } -} diff --git a/examples/sqlite.rs b/examples/sqlite.rs deleted file mode 100644 index 295c998..0000000 --- a/examples/sqlite.rs +++ /dev/null @@ -1,313 +0,0 @@ -// -// Forked-off from https://github.com/AndrewWestberg/cncli/ on 2020-11-30 -// © 2020 Andrew Westberg licensed under Apache-2.0 -// -// Re-licensed under GPLv3 or LGPLv3 -// © 2020 - 2021 PERLUR Group -// -// Re-licenses under MPLv2 -// © 2022 PERLUR Group -// -// SPDX-License-Identifier: MPL-2.0 -// - -use blake2b_simd::Params; -use cardano_ouroboros_network::{ - BlockHeader, - BlockStore, -}; -use log::debug; -use rusqlite::{ - named_params, - Connection, - Error, -}; -use std::{ - io, - path::PathBuf, -}; - -pub struct SQLiteBlockStore { - pub db: Connection, -} - -impl SQLiteBlockStore { - const DB_VERSION: i64 = 2; - - pub fn new(db_path: &PathBuf) -> Result { - debug!("Opening database"); - let db = Connection::open(db_path)?; - { - debug!("Intialize database."); - db.execute_batch("PRAGMA journal_mode=WAL")?; - db.execute( - "CREATE TABLE IF NOT EXISTS db_version (version INTEGER PRIMARY KEY)", - [], - )?; - let mut stmt = db.prepare("SELECT version FROM db_version")?; - let mut rows = stmt.query([])?; - let version: i64 = match rows.next()? { - None => -1, - Some(row) => row.get(0)?, - }; - - // Upgrade their database to version 1 - if version < 1 { - debug!("Upgrade database to version 1..."); - db.execute( - "CREATE TABLE IF NOT EXISTS chain (\ - id INTEGER PRIMARY KEY AUTOINCREMENT, \ - block_number INTEGER NOT NULL, \ - slot_number INTEGER NOT NULL, \ - hash TEXT NOT NULL, \ - prev_hash TEXT NOT NULL, \ - eta_v TEXT NOT NULL, \ - node_vkey TEXT NOT NULL, \ - node_vrf_vkey TEXT NOT NULL, \ - eta_vrf_0 TEXT NOT NULL, \ - eta_vrf_1 TEXT NOT NULL, \ - leader_vrf_0 TEXT NOT NULL, \ - leader_vrf_1 TEXT NOT NULL, \ - block_size INTEGER NOT NULL, \ - block_body_hash TEXT NOT NULL, \ - pool_opcert TEXT NOT NULL, \ - unknown_0 INTEGER NOT NULL, \ - unknown_1 INTEGER NOT NULL, \ - unknown_2 TEXT NOT NULL, \ - protocol_major_version INTEGER NOT NULL, \ - protocol_minor_version INTEGER NOT NULL, \ - orphaned INTEGER NOT NULL DEFAULT 0 \ - )", - [], - )?; - db.execute( - "CREATE INDEX IF NOT EXISTS idx_chain_slot_number ON chain(slot_number)", - [], - )?; - db.execute( - "CREATE INDEX IF NOT EXISTS idx_chain_orphaned ON chain(orphaned)", - [], - )?; - db.execute( - "CREATE INDEX IF NOT EXISTS idx_chain_hash ON chain(hash)", - [], - )?; - db.execute( - "CREATE INDEX IF NOT EXISTS idx_chain_block_number ON chain(block_number)", - [], - )?; - } - - // Upgrade their database to version 2 - if version < 2 { - debug!("Upgrade database to version 2..."); - db.execute( - "CREATE TABLE IF NOT EXISTS slots (\ - id INTEGER PRIMARY KEY AUTOINCREMENT, \ - epoch INTEGER NOT NULL, \ - pool_id TEXT NOT NULL, \ - slot_qty INTEGER NOT NULL, \ - slots TEXT NOT NULL, \ - hash TEXT NOT NULL, - UNIQUE(epoch,pool_id) - )", - [], - )?; - } - - // Update the db version now that we've upgraded the user's database fully - if version < 0 { - db.execute( - "INSERT INTO db_version (version) VALUES (?1)", - &[&SQLiteBlockStore::DB_VERSION], - )?; - } else { - db.execute( - "UPDATE db_version SET version=?1", - &[&SQLiteBlockStore::DB_VERSION], - )?; - } - } - - Ok(SQLiteBlockStore { db: db }) - } - - fn sql_save_block( - &mut self, - pending_blocks: &mut Vec, - network_magic: u32, - ) -> Result<(), rusqlite::Error> { - let db = &mut self.db; - - // get the last block eta_v (nonce) in the db - let mut prev_eta_v = { - hex::decode( - match db.query_row( - "SELECT eta_v, max(slot_number) FROM chain WHERE orphaned = 0", - [], - |row| row.get(0), - ) { - Ok(eta_v) => eta_v, - Err(_) => { - if network_magic == 764824073 { - // mainnet genesis hash - String::from( - "1a3be38bcbb7911969283716ad7aa550250226b76a61fc51cc9a9a35d9276d81", - ) - } else { - // assume testnet genesis hash - String::from( - "849a1764f152e1b09c89c0dfdbcbdd38d711d1fec2db5dfa0f87cf2737a0eaf4", - ) - } - } - }, - ) - .unwrap() - }; - - let tx = db.transaction()?; - { - // scope for db transaction - let mut orphan_stmt = - tx.prepare("UPDATE chain SET orphaned = 1 WHERE block_number >= ?1")?; - let mut insert_stmt = tx.prepare( - "INSERT INTO chain (\ - block_number, \ - slot_number, \ - hash, \ - prev_hash, \ - eta_v, \ - node_vkey, \ - node_vrf_vkey, \ - eta_vrf_0, \ - eta_vrf_1, \ - leader_vrf_0, \ - leader_vrf_1, \ - block_size, \ - block_body_hash, \ - pool_opcert, \ - unknown_0, \ - unknown_1, \ - unknown_2, \ - protocol_major_version, \ - protocol_minor_version) \ - VALUES (\ - :block_number, \ - :slot_number, \ - :hash, \ - :prev_hash, \ - :eta_v, \ - :node_vkey, \ - :node_vrf_vkey, \ - :eta_vrf_0, \ - :eta_vrf_1, \ - :leader_vrf_0, \ - :leader_vrf_1, \ - :block_size, \ - :block_body_hash, \ - :pool_opcert, \ - :unknown_0, \ - :unknown_1, \ - :unknown_2, \ - :protocol_major_version, \ - :protocol_minor_version)", - )?; - - for block in pending_blocks.drain(..) { - // Set any necessary blocks as orphans - let orphan_num = orphan_stmt.execute(&[&block.block_number])?; - - if orphan_num > 0 { - // get the last block eta_v (nonce) in the db - prev_eta_v = { - hex::decode( - match tx.query_row("SELECT eta_v, max(slot_number) FROM chain WHERE orphaned = 0", [], |row| row.get(0)) { - Ok(eta_v) => { eta_v } - Err(_) => { - if network_magic == 764824073 { - // mainnet genesis hash - String::from("1a3be38bcbb7911969283716ad7aa550250226b76a61fc51cc9a9a35d9276d81") - } else { - // assume testnet genesis hash - String::from("849a1764f152e1b09c89c0dfdbcbdd38d711d1fec2db5dfa0f87cf2737a0eaf4") - } - } - } - ).unwrap() - }; - } - // blake2b hash of eta_vrf_0 - let mut block_eta_v = Params::new() - .hash_length(32) - .to_state() - .update(&*block.eta_vrf_0) - .finalize() - .as_bytes() - .to_vec(); - prev_eta_v.append(&mut block_eta_v); - // blake2b hash of prev_eta_v + block_eta_v - prev_eta_v = Params::new() - .hash_length(32) - .to_state() - .update(&*prev_eta_v) - .finalize() - .as_bytes() - .to_vec(); - - insert_stmt.execute(named_params! { - ":block_number" : block.block_number, - ":slot_number": block.slot_number, - ":hash" : hex::encode(block.hash), - ":prev_hash" : hex::encode(block.prev_hash), - ":eta_v" : hex::encode(&prev_eta_v), - ":node_vkey" : hex::encode(block.node_vkey), - ":node_vrf_vkey" : hex::encode(block.node_vrf_vkey), - ":eta_vrf_0" : hex::encode(block.eta_vrf_0), - ":eta_vrf_1" : hex::encode(block.eta_vrf_1), - ":leader_vrf_0" : hex::encode(block.leader_vrf_0), - ":leader_vrf_1" : hex::encode(block.leader_vrf_1), - ":block_size" : block.block_size, - ":block_body_hash" : hex::encode(block.block_body_hash), - ":pool_opcert" : hex::encode(block.pool_opcert), - ":unknown_0" : block.unknown_0, - ":unknown_1" : block.unknown_1, - ":unknown_2" : hex::encode(block.unknown_2), - ":protocol_major_version" : block.protocol_major_version, - ":protocol_minor_version" : block.protocol_minor_version, - })?; - } - } - - tx.commit()?; - Ok(()) - } -} - -impl BlockStore for SQLiteBlockStore { - fn save_block( - &mut self, - mut pending_blocks: &mut Vec, - network_magic: u32, - ) -> io::Result<()> { - match self.sql_save_block(&mut pending_blocks, network_magic) { - Ok(_) => Ok(()), - Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Database error!")), - } - } - - fn load_blocks(&mut self) -> Option)>> { - let db = &self.db; - let mut stmt = db.prepare("SELECT slot_number, hash FROM chain where orphaned = 0 ORDER BY slot_number DESC LIMIT 33").unwrap(); - let blocks = stmt - .query_map([], |row| { - let slot_result: Result = row.get(0); - let hash_result: Result = row.get(1); - let slot = slot_result?; - let hash = hash_result?; - Ok((slot, hex::decode(hash).unwrap())) - }) - .ok()?; - Some(blocks.map(|item| item.unwrap()).collect()) - } -} diff --git a/examples/sync.rs b/examples/sync.rs index e256387..8b958e9 100644 --- a/examples/sync.rs +++ b/examples/sync.rs @@ -9,15 +9,13 @@ use cardano_ouroboros_network::{ mux::Connection, - protocols::chainsync::{ - ChainSync, - Mode, - }, protocols::handshake::Handshake, + protocols::chainsync::{ChainSync, Reply}, }; +use log::info; + mod common; -mod sqlite; async fn chainsync() -> Result<(), Box> { let cfg = common::init(); @@ -31,15 +29,24 @@ async fn chainsync() -> Result<(), Box> { .build()? .run(&mut connection) .await?; - let mut chainsync = ChainSync { - mode: Mode::Sync, - network_magic: cfg.magic, - store: Some(Box::new(sqlite::SQLiteBlockStore::new(&cfg.db)?)), - ..Default::default() - }; - - chainsync.run(&mut connection).await?; - Ok(()) + + let mut chainsync = ChainSync::builder() + .build(&mut connection); + chainsync.find_intersect(vec![ + cfg.byron_mainnet, + cfg.byron_testnet, + cfg.byron_guild, + ]).await?; + loop { + match chainsync.request_next().await? { + Reply::Forward(header, _tip) => { + info!("Block header: block={} slot={}", header.block_number, header.slot_number); + } + Reply::Backward(point, _tip) => { + info!("Roll backward: slot={}", point.slot); + } + } + } } #[tokio::main] diff --git a/examples/tip.rs b/examples/tip.rs index 6b4eedd..1ae0036 100644 --- a/examples/tip.rs +++ b/examples/tip.rs @@ -9,26 +9,13 @@ use cardano_ouroboros_network::{ mux::Connection, - protocols::chainsync::{ - ChainSync, - Listener, - Mode, - }, protocols::handshake::Handshake, - BlockHeader, + protocols::chainsync::{ChainSync, Intersect, Reply}, }; use log::info; mod common; -struct Handler {} - -impl Listener for Handler { - fn handle_tip(&mut self, msg_roll_forward: &BlockHeader) { - info!("Tip reached: {:?}!", msg_roll_forward); - } -} - async fn tip() -> Result<(), Box> { let cfg = common::init(); @@ -41,15 +28,30 @@ async fn tip() -> Result<(), Box> { .build()? .run(&mut connection) .await?; - let mut chainsync = ChainSync { - mode: Mode::SendTip, - network_magic: cfg.magic, - notify: Some(Box::new(Handler {})), - ..Default::default() - }; - chainsync.run(&mut connection).await?; - Ok(()) + let mut chainsync = ChainSync::builder() + .build(&mut connection); + let intersect = chainsync.find_intersect(vec![ + cfg.byron_mainnet, + cfg.byron_testnet, + cfg.byron_guild, + ]).await?; + match intersect { + Intersect::Found(point, tip) => info!("= {:?}, {:?}", point, tip), + _ => panic!(), + }; + loop { + match chainsync.request_next().await? { + Reply::Forward(header, tip) => { + info!("+ {:?}, {:?}", header, tip); + if header.hash == tip.hash { + info!("Reached tip!"); + } + chainsync.find_intersect(vec![tip.into()]).await?; + } + Reply::Backward(slot, tip) => info!("- {:?}, {:?}", slot, tip), + } + } } #[tokio::main] diff --git a/src/lib.rs b/src/lib.rs index 830b257..a34d0a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ // pub mod mux; +pub mod model; pub mod protocols; use log::debug; @@ -35,7 +36,6 @@ pub trait Message: std::fmt::Debug + Sized { fn to_bytes(&self) -> Vec { let values = self.to_values(); - debug!("Tx: message {:?}", values); to_vec(&values).unwrap() } } @@ -70,6 +70,8 @@ pub trait Protocol { debug_assert_eq!(self.agency(), self.role()); // TODO: Protocol should really return an error. let message = self.send().unwrap(); + let info = message.info(); + debug!("Tx: message {}", info); let bytes = message.to_bytes(); debug!("State: {:?}", self.state()); Some(bytes) diff --git a/src/model.rs b/src/model.rs new file mode 100644 index 0000000..3325e49 --- /dev/null +++ b/src/model.rs @@ -0,0 +1,35 @@ +use crate::Error; + +#[derive(Debug, Clone)] +pub struct Point { + pub slot: i64, + pub hash: Vec, +} + +#[derive(Debug, Clone)] +pub struct Tip { + pub block_number: i64, + pub slot_number: i64, + pub hash: Vec, +} + +impl Into for Tip { + fn into(self) -> Point { + Point { + slot: self.slot_number, + hash: self.hash, + } + } +} + +impl TryFrom<(i64, &str)> for Point { + type Error = Error; + + fn try_from(pair: (i64, &str)) -> Result { + let (slot, hash) = pair; + Ok(Point { + slot, + hash: hex::decode(hash).map_err(|_| "Bad hash hex.".to_string())?, + }) + } +} diff --git a/src/mux.rs b/src/mux.rs index cbaab10..ac0f634 100644 --- a/src/mux.rs +++ b/src/mux.rs @@ -120,7 +120,7 @@ impl Connection { self.start_time.elapsed() } - pub(crate) fn channel<'a>(&'a mut self, idx: u16) -> Channel<'a> { + pub fn channel<'a>(&'a mut self, idx: u16) -> Channel<'a> { let receiver = self.register(idx); let demux = self.run_demux(); Channel { @@ -185,7 +185,7 @@ impl Connection { } } -pub(crate) struct Channel<'a> { +pub struct Channel<'a> { idx: u16, receiver: Receiver, connection: &'a mut Connection, diff --git a/src/protocols.rs b/src/protocols.rs index 6558372..66b2a7b 100644 --- a/src/protocols.rs +++ b/src/protocols.rs @@ -15,3 +15,43 @@ pub mod blockfetch; pub mod chainsync; pub mod handshake; pub mod txsubmission; + +use crate::Error; +use serde_cbor::Value; + +#[derive(Debug)] +pub(crate) struct Values<'a>(std::slice::Iter<'a, Value>); + +impl<'a> Values<'a> { + pub(crate) fn from_values(values: &'a Vec) -> Self { + Values(values.iter()) + } + + pub(crate) fn array(&mut self) -> Result { + match self.0.next() { + Some(Value::Array(values)) => Ok(Values::from_values(values)), + other => Err(format!("Integer required: {:?}", other)), + } + } + + pub(crate) fn integer(&mut self) -> Result { + match self.0.next() { + Some(&Value::Integer(value)) => Ok(value), + other => Err(format!("Integer required, found {:?}", other)), + } + } + + pub(crate) fn bytes(&mut self) -> Result<&Vec, Error> { + match self.0.next() { + Some(Value::Bytes(vec)) => Ok(vec), + other => Err(format!("Bytes required, found {:?}", other)), + } + } + + pub(crate) fn end(mut self) -> Result<(), Error> { + match self.0.next() { + None => Ok(()), + other => Err(format!("End of array required, found {:?}", other)), + } + } +} diff --git a/src/protocols/chainsync.rs b/src/protocols/chainsync.rs index ffee64c..dc537ab 100644 --- a/src/protocols/chainsync.rs +++ b/src/protocols/chainsync.rs @@ -11,34 +11,24 @@ // SPDX-License-Identifier: MPL-2.0 // -use std::{ - io, - ops::Sub, - time::{ - Duration, - Instant, - }, -}; - -use log::{ - info, - trace, -}; use serde_cbor::{ de, Value, }; -use crate::mux::Connection; use crate::Message as MessageOps; use crate::{ Agency, Error, Protocol, + mux::Connection, + mux::Channel, + model::Point, + model::Tip, + protocols::Values, }; use crate::{ BlockHeader, - BlockStore, }; use blake2b_simd::Params; @@ -52,55 +42,43 @@ pub enum State { Done, } -#[derive(Debug)] -pub struct Point { - slot: i64, - hash: Vec, -} - #[derive(Debug)] pub enum Message { RequestNext, AwaitReply, RollForward(BlockHeader, Tip), - RollBackward(i64, Tip), + RollBackward(Point, Tip), FindIntersect(Vec), - IntersectFound(i64, Tip), + IntersectFound(Point, Tip), IntersectNotFound(Tip), Done, } impl MessageOps for Message { - fn from_values(array: Vec) -> Result { - let mut values = array.iter(); - let message = match values - .next() - .ok_or("Unexpected end of message.".to_string())? - { - Value::Integer(0) => Message::RequestNext, - Value::Integer(1) => Message::AwaitReply, - Value::Integer(2) => Message::RollForward( - parse_wrapped_header(values.next().ok_or("Unexpected End of message.".to_string())?)?, - parse_tip(values.next().ok_or("Unexpected End of message.".to_string())?)?, + fn from_values(values: Vec) -> Result { + let mut array = Values::from_values(&values); + let message = match array.integer()? { + 0 => Message::RequestNext, + 1 => Message::AwaitReply, + 2 => Message::RollForward( + parse_wrapped_header(array.array()?)?, + parse_tip(array.array()?)?, ), - Value::Integer(3) => Message::RollBackward( - parse_point(values.next().ok_or("Unexpected End of message.".to_string())?)?, - parse_tip(values.next().ok_or("Unexpected End of message.".to_string())?)?, + 3 => Message::RollBackward( + parse_point(array.array()?)?, + parse_tip(array.array()?)?, ), - Value::Integer(5) => Message::IntersectFound( - parse_point(values.next().ok_or("Unexpected End of message.".to_string())?)?, - parse_tip(values.next().ok_or("Unexpected End of message.".to_string())?)?, + 5 => Message::IntersectFound( + parse_point(array.array()?)?, + parse_tip(array.array()?)?, ), - Value::Integer(6) => Message::IntersectNotFound( - parse_tip(values.next().ok_or("Unexpected End of message.".to_string())?)?, + 6 => Message::IntersectNotFound( + parse_tip(array.array()?)?, ), - Value::Integer(7) => Message::Done, - _ => return Err("Unexpected message ID.".to_string()), + 7 => Message::Done, + other => return Err(format!("Unexpected message: {}.", other)), }; - match values.next() { - Some(value) => return Err(format!("Unexpected data: {:?}", value)), - None => (), - } + array.end()?; Ok(message) } @@ -128,95 +106,85 @@ impl MessageOps for Message { } fn info(&self) -> String { - format!("{:?}", self) + match self { + Message::RollForward(header, _tip) => format!( + "block={} slot={}", + header.block_number, + header.slot_number, + ), + other => format!("{:?}", other), + } } } -#[derive(PartialEq)] -pub enum Mode { - Sync, - SendTip, +pub struct ChainSyncBuilder { } -#[derive(Debug)] -pub struct Tip { - pub block_number: i64, - pub slot_number: i64, - pub hash: Vec, +impl ChainSyncBuilder { + pub fn build<'a>(self, connection: &'a mut Connection) -> ChainSync<'a> { + ChainSync { + channel: Some(connection.channel(0x0002)), + intersect: None, + reply: None, + state: State::Idle, + query: None, + } + } } -pub trait Listener: Send { - fn handle_tip(&mut self, msg_roll_forward: &BlockHeader); +#[derive(Debug)] +pub enum Intersect { + Found(Point, Tip), + NotFound(Tip), } -pub struct ChainSync { - pub mode: Mode, - pub last_log_time: Instant, - pub last_insert_time: Instant, - pub store: Option>, - pub network_magic: u32, - pub pending_blocks: Vec, - pub state: State, - pub result: Option>, - pub is_intersect_found: bool, - pub tip_to_intersect: Option, - pub notify: Option>, +#[derive(Debug)] +pub enum Reply { + Forward(BlockHeader, Tip), + Backward(Point, Tip), } -impl Default for ChainSync { - fn default() -> Self { - ChainSync { - mode: Mode::Sync, - last_log_time: Instant::now().sub(Duration::from_secs(6)), - last_insert_time: Instant::now(), - store: None, - network_magic: 764824073, - pending_blocks: Vec::new(), - state: State::Idle, - result: None, - is_intersect_found: false, - tip_to_intersect: None, - notify: None, - } - } +enum Query { + Intersect(Vec), + Reply, } -impl ChainSync { - const FIVE_SECS: Duration = Duration::from_secs(5); +pub struct ChainSync<'a> { + channel: Option>, + query: Option, + intersect: Option, + reply: Option, + state: State, +} - pub async fn run(&mut self, connection: &mut Connection) -> Result<(), Error> { - connection.channel(self.protocol_id()).execute(self).await +impl<'a> ChainSync<'a> { + pub fn builder() -> ChainSyncBuilder { + ChainSyncBuilder { + } } - fn save_block(&mut self, msg_roll_forward: &BlockHeader, is_tip: bool) -> io::Result<()> { - match self.store.as_mut() { - Some(store) => { - self.pending_blocks.push((*msg_roll_forward).clone()); - - if is_tip || self.last_insert_time.elapsed() > ChainSync::FIVE_SECS { - store.save_block(&mut self.pending_blocks, self.network_magic)?; - self.last_insert_time = Instant::now(); - } - } - None => {} - } - Ok(()) + pub async fn find_intersect(&mut self, points: Vec) -> Result { + self.query = Some(Query::Intersect(points)); + self.execute().await?; + Ok(self.intersect.take().unwrap()) } - fn notify_tip(&mut self, msg_roll_forward: &BlockHeader) { - match &mut self.notify { - Some(listener) => listener.handle_tip(msg_roll_forward), - None => {} - } + pub async fn request_next(&mut self) -> Result { + self.query = Some(Query::Reply); + self.execute().await?; + Ok(self.reply.take().unwrap()) } - fn jump_to_tip(&mut self, tip: Tip) { - self.tip_to_intersect = Some(tip); - self.is_intersect_found = false; + async fn execute(&mut self) -> Result<(), Error> { + // TODO: Do something with the Option trick. + let mut channel = self.channel.take().ok_or("Channel not available.".to_string())?; + channel.execute(self).await?; + self.channel = Some(channel); + Ok(()) } } -impl Protocol for ChainSync { +impl Protocol for ChainSync<'_> { type State = State; type Message = Message; @@ -229,6 +197,9 @@ impl Protocol for ChainSync { } fn agency(&self) -> Agency { + if self.query.is_none() { + return Agency::None; + } return match self.state { State::Idle => Agency::Client, State::Intersect => Agency::Server, @@ -245,128 +216,47 @@ impl Protocol for ChainSync { fn send(&mut self) -> Result { match self.state { State::Idle => { - trace!("ChainSync::State::Idle"); - if !self.is_intersect_found { - let mut chain_blocks: Vec = Vec::new(); - - /* Classic sync: Use blocks from store if available. */ - match self.store.as_mut() { - Some(store) => { - let blocks = (*store).load_blocks().ok_or("Can't load blocks.")?; - for (i, block) in blocks.iter().enumerate() { - // all powers of 2 including 0th element 0, 2, 4, 8, 16, 32 - if (i == 0) || ((i > 1) && (i & (i - 1) == 0)) { - let (slot, hash) = block.clone(); - chain_blocks.push(Point {slot, hash}); - } - } - } - None => (), + match self.query.as_ref().unwrap() { + Query::Intersect(points) => { + self.state = State::Intersect; + Ok(Message::FindIntersect(points.clone())) } - - /* Tip discovery: Use discovered tip to retrieve header. */ - if self.tip_to_intersect.is_some() { - let tip = self.tip_to_intersect.as_ref().unwrap(); - chain_blocks.push(Point { - slot: tip.slot_number, - hash: tip.hash.clone(), - }); + Query::Reply => { + self.state = State::CanAwait; + Ok(Message::RequestNext) } - - //TODO: Make these protocol inputs instead of hardcoding here - // Last byron block of mainnet - chain_blocks.push(Point { - slot: 4492799, - hash: hex::decode( - "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457", - ) - .unwrap(), - }); - // Last byron block of testnet - chain_blocks.push(Point { - slot: 1598399, - hash: hex::decode( - "7e16781b40ebf8b6da18f7b5e8ade855d6738095ef2f1c58c77e88b6e45997a4", - ) - .unwrap(), - }); - // Last byron block of guild - chain_blocks.push(Point { - slot: 719, - hash: hex::decode( - "e5400faf19e712ebc5ff5b4b44cecb2b140d1cca25a011e36a91d89e97f53e2e", - ) - .unwrap(), - }); - - self.state = State::Intersect; - Ok(Message::FindIntersect(chain_blocks)) - } else { - self.state = State::CanAwait; - Ok(Message::RequestNext) } } - state => Err(format!("Unsupported: {:?}", state)), + other => Err(format!("Unsupported: {:?}", other)), } } fn recv(&mut self, message: Message) -> Result<(), Error> { match message { Message::RequestNext => { - // Server wants us to wait a bit until it gets a new block + self.state = State::CanAwait; + } + Message::AwaitReply => { self.state = State::MustReply; } Message::RollForward(header, tip) => { - let is_tip = header.slot_number == tip.slot_number - && header.hash == tip.hash; - trace!( - "block {} of {}, {:.2}% synced", - header.block_number, - tip.block_number, - (header.block_number as f64 - / tip.block_number as f64) - * 100.0 - ); - if is_tip || self.last_log_time.elapsed() > ChainSync::FIVE_SECS { - if self.mode == Mode::Sync { - info!( - "block {} of {}, {:.2}% synced", - header.block_number, - tip.block_number, - (header.block_number as f64 - / tip.block_number as f64) - * 100.0 - ); - } - self.last_log_time = Instant::now() - } - - /* Classic sync: Store header data. */ - /* TODO: error handling */ - let _ = self.save_block(&header, is_tip); - - if is_tip { - /* Got complete tip header. */ - self.notify_tip(&header); - } else { - match self.mode { - /* Next time get tip header. */ - Mode::SendTip => self.jump_to_tip(tip), - _ => (), - } - } + self.reply = Some(Reply::Forward(header, tip)); + self.query = None; self.state = State::Idle; } - Message::RollBackward(_point, _tip) => { + Message::RollBackward(point, tip) => { + self.reply = Some(Reply::Backward(point, tip)); + self.query = None; self.state = State::Idle; } - Message::IntersectFound(_point, _tip) => { - self.is_intersect_found = true; + Message::IntersectFound(point, tip) => { + self.intersect = Some(Intersect::Found(point, tip)); + self.query = None; self.state = State::Idle; } - Message::IntersectNotFound(_tip) => { - self.is_intersect_found = true; // should start syncing at first byron block. We will just skip all byron - // blocks. + Message::IntersectNotFound(tip) => { + self.intersect = Some(Intersect::NotFound(tip)); + self.query = None; self.state = State::Idle; } Message::Done => { @@ -379,31 +269,34 @@ impl Protocol for ChainSync { } trait UnwrapValue { - fn integer(&self) -> i128; - fn bytes(&self) -> Vec; + fn array(&self) -> Result<&Vec, Error>; + fn integer(&self) -> Result; + fn bytes(&self) -> Result<&Vec, Error>; } impl UnwrapValue for Value { - fn integer(&self) -> i128 { + fn array(&self) -> Result<&Vec, Error> { match self { - Value::Integer(integer_value) => *integer_value, - _ => { - panic!("not an integer!") - } + Value::Array(array) => Ok(array), + _ => Err(format!("Integer required: {:?}", self)), } } - fn bytes(&self) -> Vec { + fn integer(&self) -> Result { match self { - Value::Bytes(bytes_vec) => bytes_vec.clone(), - _ => { - panic!("not a byte array!") - } + Value::Integer(value) => Ok(*value), + _ => Err(format!("Integer required: {:?}", self)), } } -} -fn parse_wrapped_header(value: &Value) -> Result { + fn bytes(&self) -> Result<&Vec, Error> { + match self { + Value::Bytes(vec) => Ok(vec), + _ => Err(format!("Bytes required: {:?}", self)), + } + } +} +fn parse_wrapped_header(mut array: Values) -> Result { let mut msg_roll_forward = BlockHeader { block_number: 0, slot_number: 0, @@ -424,126 +317,103 @@ fn parse_wrapped_header(value: &Value) -> Result { protocol_major_version: 0, protocol_minor_version: 0, }; - match value { - Value::Array(header_array) => { - match &header_array[1] { - Value::Bytes(wrapped_block_header_bytes) => { - // calculate the block hash - let hash = Params::new() - .hash_length(32) - .to_state() - .update(&*wrapped_block_header_bytes) - .finalize(); - msg_roll_forward.hash = hash.as_bytes().to_owned(); - - let block_header: Value = - de::from_slice(&wrapped_block_header_bytes[..]).unwrap(); - match block_header { - Value::Array(block_header_array) => match &block_header_array[0] { - Value::Array(block_header_array_inner) => { - msg_roll_forward.block_number = - block_header_array_inner[0].integer() as i64; - msg_roll_forward.slot_number = - block_header_array_inner[1].integer() as i64; - msg_roll_forward - .prev_hash - .append(&mut block_header_array_inner[2].bytes()); - msg_roll_forward - .node_vkey - .append(&mut block_header_array_inner[3].bytes()); - msg_roll_forward - .node_vrf_vkey - .append(&mut block_header_array_inner[4].bytes()); - match &block_header_array_inner[5] { - Value::Array(nonce_array) => { - msg_roll_forward - .eta_vrf_0 - .append(&mut nonce_array[0].bytes()); - msg_roll_forward - .eta_vrf_1 - .append(&mut nonce_array[1].bytes()); - } - _ => return Err("invalid cbor! code: 340".to_string()), - } - match &block_header_array_inner[6] { - Value::Array(leader_array) => { - msg_roll_forward - .leader_vrf_0 - .append(&mut leader_array[0].bytes()); - msg_roll_forward - .leader_vrf_1 - .append(&mut leader_array[1].bytes()); - } - _ => return Err("invalid cbor! code: 341".to_string()), - } - msg_roll_forward.block_size = - block_header_array_inner[7].integer() as i64; - msg_roll_forward - .block_body_hash - .append(&mut block_header_array_inner[8].bytes()); - msg_roll_forward - .pool_opcert - .append(&mut block_header_array_inner[9].bytes()); - msg_roll_forward.unknown_0 = - block_header_array_inner[10].integer() as i64; - msg_roll_forward.unknown_1 = - block_header_array_inner[11].integer() as i64; - msg_roll_forward - .unknown_2 - .append(&mut block_header_array_inner[12].bytes()); - msg_roll_forward.protocol_major_version = - block_header_array_inner[13].integer() as i64; - msg_roll_forward.protocol_minor_version = - block_header_array_inner[14].integer() as i64; - } - _ => return Err("invalid cbor! code: 342".to_string()), - }, - _ => return Err("invalid cbor! code: 343".to_string()), + + array.integer()?; + let wrapped_block_header_bytes = array.bytes()?.clone(); + array.end()?; + + // calculate the block hash + let hash = Params::new() + .hash_length(32) + .to_state() + .update(&*wrapped_block_header_bytes) + .finalize(); + msg_roll_forward.hash = hash.as_bytes().to_owned(); + + let block_header: Value = + de::from_slice(&wrapped_block_header_bytes[..]).unwrap(); + match block_header { + Value::Array(block_header_array) => match &block_header_array[0] { + Value::Array(block_header_array_inner) => { + msg_roll_forward.block_number = + block_header_array_inner[0].integer()? as i64; + msg_roll_forward.slot_number = + block_header_array_inner[1].integer()? as i64; + msg_roll_forward + .prev_hash + .append(&mut block_header_array_inner[2].bytes()?.clone()); + msg_roll_forward + .node_vkey + .append(&mut block_header_array_inner[3].bytes()?.clone()); + msg_roll_forward + .node_vrf_vkey + .append(&mut block_header_array_inner[4].bytes()?.clone()); + match &block_header_array_inner[5] { + Value::Array(nonce_array) => { + msg_roll_forward + .eta_vrf_0 + .append(&mut nonce_array[0].bytes()?.clone()); + msg_roll_forward + .eta_vrf_1 + .append(&mut nonce_array[1].bytes()?.clone()); } + _ => return Err("invalid cbor! code: 340".to_string()), } - _ => return Err("invalid cbor! code: 344".to_string()), + match &block_header_array_inner[6] { + Value::Array(leader_array) => { + msg_roll_forward + .leader_vrf_0 + .append(&mut leader_array[0].bytes()?.clone()); + msg_roll_forward + .leader_vrf_1 + .append(&mut leader_array[1].bytes()?.clone()); + } + _ => return Err("invalid cbor! code: 341".to_string()), + } + msg_roll_forward.block_size = + block_header_array_inner[7].integer()? as i64; + msg_roll_forward + .block_body_hash + .append(&mut block_header_array_inner[8].bytes()?.clone()); + msg_roll_forward + .pool_opcert + .append(&mut block_header_array_inner[9].bytes()?.clone()); + msg_roll_forward.unknown_0 = + block_header_array_inner[10].integer()? as i64; + msg_roll_forward.unknown_1 = + block_header_array_inner[11].integer()? as i64; + msg_roll_forward + .unknown_2 + .append(&mut block_header_array_inner[12].bytes()?.clone()); + msg_roll_forward.protocol_major_version = + block_header_array_inner[13].integer()? as i64; + msg_roll_forward.protocol_minor_version = + block_header_array_inner[14].integer()? as i64; } - } - _ => return Err("invalid cbor! code: 345".to_string()), + _ => return Err("invalid cbor! code: 342".to_string()), + }, + _ => return Err("invalid cbor! code: 343".to_string()), } Ok(msg_roll_forward) } -fn parse_tip(value: &Value) -> Result { - let mut tip = Tip { - block_number: 0, - slot_number: 0, - hash: vec![], - }; - match value { - Value::Array(tip_array) => { - match &tip_array[0] { - Value::Array(tip_info_array) => { - tip.slot_number = tip_info_array[0].integer() as i64; - tip.hash.append(&mut tip_info_array[1].bytes()); - } - _ => return Err("invalid cbor! code: 346".to_string()), - } - tip.block_number = tip_array[1].integer() as i64; - } - _ => return Err("invalid cbor! code: 347".to_string()), - } - Ok(tip) +fn parse_tip(mut array: Values) -> Result { + let mut tip_info_array = array.array()?; + let slot_number = tip_info_array.integer()? as i64; + let hash = tip_info_array.bytes()?.clone(); + tip_info_array.end()?; + let block_number = array.integer()? as i64; + array.end()?; + Ok(Tip { + block_number, + slot_number, + hash, + }) } -pub fn parse_point(value: &Value) -> Result { - let mut slot: i64 = 0; - match value { - Value::Array(block) => { - if block.len() > 0 { - match block[0] { - Value::Integer(parsed_slot) => slot = parsed_slot as i64, - _ => return Err("invalid cbor".to_string()), - } - } - } - _ => return Err("invalid cbor".to_string()), - } - Ok(slot) +fn parse_point(mut array: Values) -> Result { + let slot = array.integer()? as i64; + let hash = array.bytes()?.clone(); + array.end()?; + Ok(Point { slot, hash }) } - From b128ebdc67ff8f3b8be887b52f4d5afee2b72ddb Mon Sep 17 00:00:00 2001 From: Mark Stopka Date: Sat, 12 Feb 2022 21:51:32 +0100 Subject: [PATCH 5/5] style(fmt): Apply rustfmt changes --- examples/common.rs | 16 ++++--- examples/sync.rs | 21 +++++---- examples/tip.rs | 17 +++---- src/lib.rs | 2 +- src/mux.rs | 13 +++--- src/protocols/blockfetch.rs | 10 +++- src/protocols/chainsync.rs | 91 ++++++++++++++----------------------- 7 files changed, 81 insertions(+), 89 deletions(-) diff --git a/examples/common.rs b/examples/common.rs index f892837..404d8d9 100644 --- a/examples/common.rs +++ b/examples/common.rs @@ -28,9 +28,7 @@ use oura::{ }, }; -use cardano_ouroboros_network::{ - model::Point, -}; +use cardano_ouroboros_network::model::Point; #[derive(Clone)] pub struct Config { @@ -58,15 +56,21 @@ impl Config { byron_mainnet: ( 4492799, "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457", - ).try_into().unwrap(), + ) + .try_into() + .unwrap(), byron_testnet: ( 1598399, "7e16781b40ebf8b6da18f7b5e8ade855d6738095ef2f1c58c77e88b6e45997a4", - ).try_into().unwrap(), + ) + .try_into() + .unwrap(), byron_guild: ( 719, "e5400faf19e712ebc5ff5b4b44cecb2b140d1cca25a011e36a91d89e97f53e2e", - ).try_into().unwrap(), + ) + .try_into() + .unwrap(), } } diff --git a/examples/sync.rs b/examples/sync.rs index 8b958e9..78bee16 100644 --- a/examples/sync.rs +++ b/examples/sync.rs @@ -9,8 +9,11 @@ use cardano_ouroboros_network::{ mux::Connection, + protocols::chainsync::{ + ChainSync, + Reply, + }, protocols::handshake::Handshake, - protocols::chainsync::{ChainSync, Reply}, }; use log::info; @@ -30,17 +33,17 @@ async fn chainsync() -> Result<(), Box> { .run(&mut connection) .await?; - let mut chainsync = ChainSync::builder() - .build(&mut connection); - chainsync.find_intersect(vec![ - cfg.byron_mainnet, - cfg.byron_testnet, - cfg.byron_guild, - ]).await?; + let mut chainsync = ChainSync::builder().build(&mut connection); + chainsync + .find_intersect(vec![cfg.byron_mainnet, cfg.byron_testnet, cfg.byron_guild]) + .await?; loop { match chainsync.request_next().await? { Reply::Forward(header, _tip) => { - info!("Block header: block={} slot={}", header.block_number, header.slot_number); + info!( + "Block header: block={} slot={}", + header.block_number, header.slot_number + ); } Reply::Backward(point, _tip) => { info!("Roll backward: slot={}", point.slot); diff --git a/examples/tip.rs b/examples/tip.rs index 1ae0036..5d2a3b0 100644 --- a/examples/tip.rs +++ b/examples/tip.rs @@ -9,8 +9,12 @@ use cardano_ouroboros_network::{ mux::Connection, + protocols::chainsync::{ + ChainSync, + Intersect, + Reply, + }, protocols::handshake::Handshake, - protocols::chainsync::{ChainSync, Intersect, Reply}, }; use log::info; @@ -29,13 +33,10 @@ async fn tip() -> Result<(), Box> { .run(&mut connection) .await?; - let mut chainsync = ChainSync::builder() - .build(&mut connection); - let intersect = chainsync.find_intersect(vec![ - cfg.byron_mainnet, - cfg.byron_testnet, - cfg.byron_guild, - ]).await?; + let mut chainsync = ChainSync::builder().build(&mut connection); + let intersect = chainsync + .find_intersect(vec![cfg.byron_mainnet, cfg.byron_testnet, cfg.byron_guild]) + .await?; match intersect { Intersect::Found(point, tip) => info!("= {:?}, {:?}", point, tip), _ => panic!(), diff --git a/src/lib.rs b/src/lib.rs index a34d0a2..089cc18 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,8 +11,8 @@ // SPDX-License-Identifier: MPL-2.0 // -pub mod mux; pub mod model; +pub mod mux; pub mod protocols; use log::debug; diff --git a/src/mux.rs b/src/mux.rs index ac0f634..a2dea79 100644 --- a/src/mux.rs +++ b/src/mux.rs @@ -7,7 +7,10 @@ // SPDX-License-Identifier: MPL-2.0 // -use crate::{Protocol, Agency}; +use crate::{ + Agency, + Protocol, +}; use byteorder::{ ByteOrder, NetworkEndian, @@ -224,15 +227,11 @@ impl<'a> Channel<'a> { } async fn send(&mut self, data: &[u8]) -> Result<(), Error> { - Ok(self.connection - .send(self.idx, &data) - .await) + Ok(self.connection.send(self.idx, &data).await) } async fn recv(&mut self) -> Result, Error> { - Ok(self.connection - .recv(&mut self.receiver) - .await) + Ok(self.connection.recv(&mut self.receiver).await) } } diff --git a/src/protocols/blockfetch.rs b/src/protocols/blockfetch.rs index b00e348..9edbe13 100644 --- a/src/protocols/blockfetch.rs +++ b/src/protocols/blockfetch.rs @@ -104,7 +104,10 @@ impl Builder { self.last = Some((slot, hash)); self } - pub fn build<'a>(&mut self, #[cfg(not(test))]connection: &'a mut Connection) -> Result, Error> { + pub fn build<'a>( + &mut self, + #[cfg(not(test))] connection: &'a mut Connection, + ) -> Result, Error> { Ok(BlockFetch { #[cfg(not(test))] channel: Some(connection.channel(0x0003)), @@ -148,7 +151,10 @@ impl<'a> BlockFetch<'a> { // Start the protocol and prefetch first block into `self.result`. //self.running = true; // TODO: Do something with the Option trick. - let mut channel = self.channel.take().ok_or("Channel not available.".to_string())?; + let mut channel = self + .channel + .take() + .ok_or("Channel not available.".to_string())?; channel.execute(self).await?; self.channel = Some(channel); Ok(BlockStream { blockfetch: self }) diff --git a/src/protocols/chainsync.rs b/src/protocols/chainsync.rs index dc537ab..5433bdf 100644 --- a/src/protocols/chainsync.rs +++ b/src/protocols/chainsync.rs @@ -16,19 +16,17 @@ use serde_cbor::{ Value, }; +use crate::BlockHeader; use crate::Message as MessageOps; use crate::{ - Agency, - Error, - Protocol, - mux::Connection, - mux::Channel, model::Point, model::Tip, + mux::Channel, + mux::Connection, protocols::Values, -}; -use crate::{ - BlockHeader, + Agency, + Error, + Protocol, }; use blake2b_simd::Params; @@ -64,17 +62,9 @@ impl MessageOps for Message { parse_wrapped_header(array.array()?)?, parse_tip(array.array()?)?, ), - 3 => Message::RollBackward( - parse_point(array.array()?)?, - parse_tip(array.array()?)?, - ), - 5 => Message::IntersectFound( - parse_point(array.array()?)?, - parse_tip(array.array()?)?, - ), - 6 => Message::IntersectNotFound( - parse_tip(array.array()?)?, - ), + 3 => Message::RollBackward(parse_point(array.array()?)?, parse_tip(array.array()?)?), + 5 => Message::IntersectFound(parse_point(array.array()?)?, parse_tip(array.array()?)?), + 6 => Message::IntersectNotFound(parse_tip(array.array()?)?), 7 => Message::Done, other => return Err(format!("Unexpected message: {}.", other)), }; @@ -84,15 +74,13 @@ impl MessageOps for Message { fn to_values(&self) -> Vec { match self { - Message::RequestNext => vec![ - Value::Integer(0), - ], + Message::RequestNext => vec![Value::Integer(0)], Message::FindIntersect(points) => vec![ Value::Integer(4), Value::Array( points .iter() - .map(|Point {slot, hash}| { + .map(|Point { slot, hash }| { Value::Array(vec![ Value::Integer(*slot as i128), Value::Bytes(hash.clone()), @@ -107,18 +95,15 @@ impl MessageOps for Message { fn info(&self) -> String { match self { - Message::RollForward(header, _tip) => format!( - "block={} slot={}", - header.block_number, - header.slot_number, - ), + Message::RollForward(header, _tip) => { + format!("block={} slot={}", header.block_number, header.slot_number,) + } other => format!("{:?}", other), } } } -pub struct ChainSyncBuilder { -} +pub struct ChainSyncBuilder {} impl ChainSyncBuilder { pub fn build<'a>(self, connection: &'a mut Connection) -> ChainSync<'a> { @@ -159,8 +144,7 @@ pub struct ChainSync<'a> { impl<'a> ChainSync<'a> { pub fn builder() -> ChainSyncBuilder { - ChainSyncBuilder { - } + ChainSyncBuilder {} } pub async fn find_intersect(&mut self, points: Vec) -> Result { @@ -177,7 +161,10 @@ impl<'a> ChainSync<'a> { async fn execute(&mut self) -> Result<(), Error> { // TODO: Do something with the Option trick. - let mut channel = self.channel.take().ok_or("Channel not available.".to_string())?; + let mut channel = self + .channel + .take() + .ok_or("Channel not available.".to_string())?; channel.execute(self).await?; self.channel = Some(channel); Ok(()) @@ -215,18 +202,16 @@ impl Protocol for ChainSync<'_> { fn send(&mut self) -> Result { match self.state { - State::Idle => { - match self.query.as_ref().unwrap() { - Query::Intersect(points) => { - self.state = State::Intersect; - Ok(Message::FindIntersect(points.clone())) - } - Query::Reply => { - self.state = State::CanAwait; - Ok(Message::RequestNext) - } + State::Idle => match self.query.as_ref().unwrap() { + Query::Intersect(points) => { + self.state = State::Intersect; + Ok(Message::FindIntersect(points.clone())) } - } + Query::Reply => { + self.state = State::CanAwait; + Ok(Message::RequestNext) + } + }, other => Err(format!("Unsupported: {:?}", other)), } } @@ -330,15 +315,12 @@ fn parse_wrapped_header(mut array: Values) -> Result { .finalize(); msg_roll_forward.hash = hash.as_bytes().to_owned(); - let block_header: Value = - de::from_slice(&wrapped_block_header_bytes[..]).unwrap(); + let block_header: Value = de::from_slice(&wrapped_block_header_bytes[..]).unwrap(); match block_header { Value::Array(block_header_array) => match &block_header_array[0] { Value::Array(block_header_array_inner) => { - msg_roll_forward.block_number = - block_header_array_inner[0].integer()? as i64; - msg_roll_forward.slot_number = - block_header_array_inner[1].integer()? as i64; + msg_roll_forward.block_number = block_header_array_inner[0].integer()? as i64; + msg_roll_forward.slot_number = block_header_array_inner[1].integer()? as i64; msg_roll_forward .prev_hash .append(&mut block_header_array_inner[2].bytes()?.clone()); @@ -370,18 +352,15 @@ fn parse_wrapped_header(mut array: Values) -> Result { } _ => return Err("invalid cbor! code: 341".to_string()), } - msg_roll_forward.block_size = - block_header_array_inner[7].integer()? as i64; + msg_roll_forward.block_size = block_header_array_inner[7].integer()? as i64; msg_roll_forward .block_body_hash .append(&mut block_header_array_inner[8].bytes()?.clone()); msg_roll_forward .pool_opcert .append(&mut block_header_array_inner[9].bytes()?.clone()); - msg_roll_forward.unknown_0 = - block_header_array_inner[10].integer()? as i64; - msg_roll_forward.unknown_1 = - block_header_array_inner[11].integer()? as i64; + msg_roll_forward.unknown_0 = block_header_array_inner[10].integer()? as i64; + msg_roll_forward.unknown_1 = block_header_array_inner[11].integer()? as i64; msg_roll_forward .unknown_2 .append(&mut block_header_array_inner[12].bytes()?.clone());