From 0ba9af9e55b3fc520a685a43aaeb33fd1d78a16b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavel=20=C5=A0imerda?= Date: Thu, 10 Feb 2022 02:13:22 +0100 Subject: [PATCH] feat(chainsync): create a clean chainsync client implementation --- Cargo.toml | 9 - examples/common.rs | 23 +- examples/pooltool.rs | 148 ---------- examples/sqlite.rs | 313 --------------------- examples/sync.rs | 35 ++- examples/tip.rs | 46 +-- src/lib.rs | 4 +- src/model.rs | 35 +++ src/mux.rs | 4 +- src/protocols.rs | 40 +++ src/protocols/chainsync.rs | 558 ++++++++++++++----------------------- 11 files changed, 358 insertions(+), 857 deletions(-) delete mode 100644 examples/pooltool.rs delete mode 100644 examples/sqlite.rs create mode 100644 src/model.rs diff --git a/Cargo.toml b/Cargo.toml index 4bbe4f5..4e7a324 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,6 @@ tokio = { version = "1.15.0", features = ["full"]} [dev-dependencies] env_logger = "0.9.0" futures = "0.3.8" -rusqlite = { version = "0.26.0", features = ["bundled"] } oura = "1.1.0" pallas = "0.4.0" sled = "0.34.7" @@ -42,11 +41,3 @@ sled = "0.34.7" [[example]] name = "common" crate-type = ["staticlib"] - -[[example]] -name = "sqlite" -crate-type = ["staticlib"] - -[[example]] -name = "pooltool" -crate-type = ["staticlib"] diff --git a/examples/common.rs b/examples/common.rs index bf26a13..f892837 100644 --- a/examples/common.rs +++ b/examples/common.rs @@ -7,8 +7,6 @@ // SPDX-License-Identifier: MPL-2.0 // -use std::path::PathBuf; - use std::sync::Arc; use pallas::ledger::alonzo::{ @@ -30,13 +28,19 @@ use oura::{ }, }; +use cardano_ouroboros_network::{ + model::Point, +}; + #[derive(Clone)] pub struct Config { - pub db: PathBuf, pub sdb: sled::Db, pub host: String, pub magic: u32, pub writer: EventWriter, + pub byron_mainnet: Point, + pub byron_testnet: Point, + pub byron_guild: Point, } pub fn init() -> Config { @@ -47,11 +51,22 @@ impl Config { fn new() -> Config { env_logger::init(); Config { - db: PathBuf::from("sqlite.db"), sdb: sled::open(".db").unwrap(), host: "relays-new.cardano-mainnet.iohk.io:3001".to_string(), magic: 764824073, writer: oura_init().unwrap(), + byron_mainnet: ( + 4492799, + "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457", + ).try_into().unwrap(), + byron_testnet: ( + 1598399, + "7e16781b40ebf8b6da18f7b5e8ade855d6738095ef2f1c58c77e88b6e45997a4", + ).try_into().unwrap(), + byron_guild: ( + 719, + "e5400faf19e712ebc5ff5b4b44cecb2b140d1cca25a011e36a91d89e97f53e2e", + ).try_into().unwrap(), } } diff --git a/examples/pooltool.rs b/examples/pooltool.rs deleted file mode 100644 index 775674d..0000000 --- a/examples/pooltool.rs +++ /dev/null @@ -1,148 +0,0 @@ -// -// © 2020 - 2022 PERLUR Group -// -// Re-licenses under MPLv2 -// © 2022 PERLUR Group -// -// SPDX-License-Identifier: MPL-2.0 -// - -use cardano_ouroboros_network::{ - protocols::chainsync::Listener, - BlockHeader, -}; -use chrono::{ - SecondsFormat, - Utc, -}; -use log::{ - error, - info, -}; -use regex::Regex; -use serde::Serialize; -use std::path::PathBuf; -use std::{ - ops::Sub, - process::{ - Command, - Stdio, - }, - time::{ - Duration, - Instant, - }, -}; - -struct PoolTool { - pub pooltool_api_key: String, - pub cardano_node_path: PathBuf, - pub node_version: String, - pub last_node_version_time: Instant, - pub pool_name: String, - pub pool_id: String, -} - -impl Default for PoolTool { - fn default() -> Self { - PoolTool { - pooltool_api_key: String::new(), - cardano_node_path: PathBuf::new(), - node_version: String::new(), - last_node_version_time: Instant::now().sub(Duration::from_secs(7200)), // 2 hours ago - pool_name: String::new(), - pool_id: String::new(), - } - } -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct PooltoolStats { - api_key: String, - pool_id: String, - data: PooltoolData, -} - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct PooltoolData { - node_id: String, - version: String, - at: String, - block_no: i64, - slot_no: i64, - block_hash: String, - parent_hash: String, - leader_vrf: String, - platform: String, -} - -impl Listener for PoolTool { - fn handle_tip(&mut self, header: &BlockHeader) { - if self.last_node_version_time.elapsed() > Duration::from_secs(3600) { - // Our node version is outdated. Make a call to update it. - let output = Command::new(&self.cardano_node_path) - .arg("--version") - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .output() - .expect(&*format!("Failed to execute {:?}", &self.cardano_node_path)); - let version_string = String::from_utf8_lossy(&output.stdout); - let cap = Regex::new("cardano-node (\\d+\\.\\d+\\.\\d+) .*\ngit rev ([a-f0-9]{5}).*") - .unwrap() - .captures(&*version_string) - .unwrap(); - self.node_version = format!( - "{}:{}", - cap.get(1).map_or("", |m| m.as_str()), - cap.get(2).map_or("", |m| m.as_str()) - ); - info!("Checking cardano-node version: {}", &self.node_version); - self.last_node_version_time = Instant::now(); - } - let client = reqwest::blocking::Client::new(); - let pooltool_result = client - .post("https://api.pooltool.io/v0/sendstats") - .body( - serde_json::ser::to_string(&PooltoolStats { - api_key: self.pooltool_api_key.clone(), - pool_id: self.pool_id.clone(), - data: PooltoolData { - node_id: "".to_string(), - version: self.node_version.clone(), - at: Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true), - block_no: header.block_number, - slot_no: header.slot_number, - block_hash: hex::encode(&header.hash), - parent_hash: hex::encode(&header.prev_hash), - leader_vrf: hex::encode(&header.leader_vrf_0), - platform: "cncli".to_string(), - }, - }) - .unwrap(), - ) - .send(); - - match pooltool_result { - Ok(response) => match response.text() { - Ok(text) => { - info!( - "Pooltool ({}, {}): ({}, {}), json: {}", - &self.pool_name, - &self.pool_id[..8], - &header.block_number, - hex::encode(&header.hash[..8]), - text - ); - } - Err(error) => { - error!("PoolTool error: {}", error); - } - }, - Err(error) => { - error!("PoolTool error: {}", error); - } - } - } -} diff --git a/examples/sqlite.rs b/examples/sqlite.rs deleted file mode 100644 index 295c998..0000000 --- a/examples/sqlite.rs +++ /dev/null @@ -1,313 +0,0 @@ -// -// Forked-off from https://github.com/AndrewWestberg/cncli/ on 2020-11-30 -// © 2020 Andrew Westberg licensed under Apache-2.0 -// -// Re-licensed under GPLv3 or LGPLv3 -// © 2020 - 2021 PERLUR Group -// -// Re-licenses under MPLv2 -// © 2022 PERLUR Group -// -// SPDX-License-Identifier: MPL-2.0 -// - -use blake2b_simd::Params; -use cardano_ouroboros_network::{ - BlockHeader, - BlockStore, -}; -use log::debug; -use rusqlite::{ - named_params, - Connection, - Error, -}; -use std::{ - io, - path::PathBuf, -}; - -pub struct SQLiteBlockStore { - pub db: Connection, -} - -impl SQLiteBlockStore { - const DB_VERSION: i64 = 2; - - pub fn new(db_path: &PathBuf) -> Result { - debug!("Opening database"); - let db = Connection::open(db_path)?; - { - debug!("Intialize database."); - db.execute_batch("PRAGMA journal_mode=WAL")?; - db.execute( - "CREATE TABLE IF NOT EXISTS db_version (version INTEGER PRIMARY KEY)", - [], - )?; - let mut stmt = db.prepare("SELECT version FROM db_version")?; - let mut rows = stmt.query([])?; - let version: i64 = match rows.next()? { - None => -1, - Some(row) => row.get(0)?, - }; - - // Upgrade their database to version 1 - if version < 1 { - debug!("Upgrade database to version 1..."); - db.execute( - "CREATE TABLE IF NOT EXISTS chain (\ - id INTEGER PRIMARY KEY AUTOINCREMENT, \ - block_number INTEGER NOT NULL, \ - slot_number INTEGER NOT NULL, \ - hash TEXT NOT NULL, \ - prev_hash TEXT NOT NULL, \ - eta_v TEXT NOT NULL, \ - node_vkey TEXT NOT NULL, \ - node_vrf_vkey TEXT NOT NULL, \ - eta_vrf_0 TEXT NOT NULL, \ - eta_vrf_1 TEXT NOT NULL, \ - leader_vrf_0 TEXT NOT NULL, \ - leader_vrf_1 TEXT NOT NULL, \ - block_size INTEGER NOT NULL, \ - block_body_hash TEXT NOT NULL, \ - pool_opcert TEXT NOT NULL, \ - unknown_0 INTEGER NOT NULL, \ - unknown_1 INTEGER NOT NULL, \ - unknown_2 TEXT NOT NULL, \ - protocol_major_version INTEGER NOT NULL, \ - protocol_minor_version INTEGER NOT NULL, \ - orphaned INTEGER NOT NULL DEFAULT 0 \ - )", - [], - )?; - db.execute( - "CREATE INDEX IF NOT EXISTS idx_chain_slot_number ON chain(slot_number)", - [], - )?; - db.execute( - "CREATE INDEX IF NOT EXISTS idx_chain_orphaned ON chain(orphaned)", - [], - )?; - db.execute( - "CREATE INDEX IF NOT EXISTS idx_chain_hash ON chain(hash)", - [], - )?; - db.execute( - "CREATE INDEX IF NOT EXISTS idx_chain_block_number ON chain(block_number)", - [], - )?; - } - - // Upgrade their database to version 2 - if version < 2 { - debug!("Upgrade database to version 2..."); - db.execute( - "CREATE TABLE IF NOT EXISTS slots (\ - id INTEGER PRIMARY KEY AUTOINCREMENT, \ - epoch INTEGER NOT NULL, \ - pool_id TEXT NOT NULL, \ - slot_qty INTEGER NOT NULL, \ - slots TEXT NOT NULL, \ - hash TEXT NOT NULL, - UNIQUE(epoch,pool_id) - )", - [], - )?; - } - - // Update the db version now that we've upgraded the user's database fully - if version < 0 { - db.execute( - "INSERT INTO db_version (version) VALUES (?1)", - &[&SQLiteBlockStore::DB_VERSION], - )?; - } else { - db.execute( - "UPDATE db_version SET version=?1", - &[&SQLiteBlockStore::DB_VERSION], - )?; - } - } - - Ok(SQLiteBlockStore { db: db }) - } - - fn sql_save_block( - &mut self, - pending_blocks: &mut Vec, - network_magic: u32, - ) -> Result<(), rusqlite::Error> { - let db = &mut self.db; - - // get the last block eta_v (nonce) in the db - let mut prev_eta_v = { - hex::decode( - match db.query_row( - "SELECT eta_v, max(slot_number) FROM chain WHERE orphaned = 0", - [], - |row| row.get(0), - ) { - Ok(eta_v) => eta_v, - Err(_) => { - if network_magic == 764824073 { - // mainnet genesis hash - String::from( - "1a3be38bcbb7911969283716ad7aa550250226b76a61fc51cc9a9a35d9276d81", - ) - } else { - // assume testnet genesis hash - String::from( - "849a1764f152e1b09c89c0dfdbcbdd38d711d1fec2db5dfa0f87cf2737a0eaf4", - ) - } - } - }, - ) - .unwrap() - }; - - let tx = db.transaction()?; - { - // scope for db transaction - let mut orphan_stmt = - tx.prepare("UPDATE chain SET orphaned = 1 WHERE block_number >= ?1")?; - let mut insert_stmt = tx.prepare( - "INSERT INTO chain (\ - block_number, \ - slot_number, \ - hash, \ - prev_hash, \ - eta_v, \ - node_vkey, \ - node_vrf_vkey, \ - eta_vrf_0, \ - eta_vrf_1, \ - leader_vrf_0, \ - leader_vrf_1, \ - block_size, \ - block_body_hash, \ - pool_opcert, \ - unknown_0, \ - unknown_1, \ - unknown_2, \ - protocol_major_version, \ - protocol_minor_version) \ - VALUES (\ - :block_number, \ - :slot_number, \ - :hash, \ - :prev_hash, \ - :eta_v, \ - :node_vkey, \ - :node_vrf_vkey, \ - :eta_vrf_0, \ - :eta_vrf_1, \ - :leader_vrf_0, \ - :leader_vrf_1, \ - :block_size, \ - :block_body_hash, \ - :pool_opcert, \ - :unknown_0, \ - :unknown_1, \ - :unknown_2, \ - :protocol_major_version, \ - :protocol_minor_version)", - )?; - - for block in pending_blocks.drain(..) { - // Set any necessary blocks as orphans - let orphan_num = orphan_stmt.execute(&[&block.block_number])?; - - if orphan_num > 0 { - // get the last block eta_v (nonce) in the db - prev_eta_v = { - hex::decode( - match tx.query_row("SELECT eta_v, max(slot_number) FROM chain WHERE orphaned = 0", [], |row| row.get(0)) { - Ok(eta_v) => { eta_v } - Err(_) => { - if network_magic == 764824073 { - // mainnet genesis hash - String::from("1a3be38bcbb7911969283716ad7aa550250226b76a61fc51cc9a9a35d9276d81") - } else { - // assume testnet genesis hash - String::from("849a1764f152e1b09c89c0dfdbcbdd38d711d1fec2db5dfa0f87cf2737a0eaf4") - } - } - } - ).unwrap() - }; - } - // blake2b hash of eta_vrf_0 - let mut block_eta_v = Params::new() - .hash_length(32) - .to_state() - .update(&*block.eta_vrf_0) - .finalize() - .as_bytes() - .to_vec(); - prev_eta_v.append(&mut block_eta_v); - // blake2b hash of prev_eta_v + block_eta_v - prev_eta_v = Params::new() - .hash_length(32) - .to_state() - .update(&*prev_eta_v) - .finalize() - .as_bytes() - .to_vec(); - - insert_stmt.execute(named_params! { - ":block_number" : block.block_number, - ":slot_number": block.slot_number, - ":hash" : hex::encode(block.hash), - ":prev_hash" : hex::encode(block.prev_hash), - ":eta_v" : hex::encode(&prev_eta_v), - ":node_vkey" : hex::encode(block.node_vkey), - ":node_vrf_vkey" : hex::encode(block.node_vrf_vkey), - ":eta_vrf_0" : hex::encode(block.eta_vrf_0), - ":eta_vrf_1" : hex::encode(block.eta_vrf_1), - ":leader_vrf_0" : hex::encode(block.leader_vrf_0), - ":leader_vrf_1" : hex::encode(block.leader_vrf_1), - ":block_size" : block.block_size, - ":block_body_hash" : hex::encode(block.block_body_hash), - ":pool_opcert" : hex::encode(block.pool_opcert), - ":unknown_0" : block.unknown_0, - ":unknown_1" : block.unknown_1, - ":unknown_2" : hex::encode(block.unknown_2), - ":protocol_major_version" : block.protocol_major_version, - ":protocol_minor_version" : block.protocol_minor_version, - })?; - } - } - - tx.commit()?; - Ok(()) - } -} - -impl BlockStore for SQLiteBlockStore { - fn save_block( - &mut self, - mut pending_blocks: &mut Vec, - network_magic: u32, - ) -> io::Result<()> { - match self.sql_save_block(&mut pending_blocks, network_magic) { - Ok(_) => Ok(()), - Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Database error!")), - } - } - - fn load_blocks(&mut self) -> Option)>> { - let db = &self.db; - let mut stmt = db.prepare("SELECT slot_number, hash FROM chain where orphaned = 0 ORDER BY slot_number DESC LIMIT 33").unwrap(); - let blocks = stmt - .query_map([], |row| { - let slot_result: Result = row.get(0); - let hash_result: Result = row.get(1); - let slot = slot_result?; - let hash = hash_result?; - Ok((slot, hex::decode(hash).unwrap())) - }) - .ok()?; - Some(blocks.map(|item| item.unwrap()).collect()) - } -} diff --git a/examples/sync.rs b/examples/sync.rs index e256387..8b958e9 100644 --- a/examples/sync.rs +++ b/examples/sync.rs @@ -9,15 +9,13 @@ use cardano_ouroboros_network::{ mux::Connection, - protocols::chainsync::{ - ChainSync, - Mode, - }, protocols::handshake::Handshake, + protocols::chainsync::{ChainSync, Reply}, }; +use log::info; + mod common; -mod sqlite; async fn chainsync() -> Result<(), Box> { let cfg = common::init(); @@ -31,15 +29,24 @@ async fn chainsync() -> Result<(), Box> { .build()? .run(&mut connection) .await?; - let mut chainsync = ChainSync { - mode: Mode::Sync, - network_magic: cfg.magic, - store: Some(Box::new(sqlite::SQLiteBlockStore::new(&cfg.db)?)), - ..Default::default() - }; - - chainsync.run(&mut connection).await?; - Ok(()) + + let mut chainsync = ChainSync::builder() + .build(&mut connection); + chainsync.find_intersect(vec![ + cfg.byron_mainnet, + cfg.byron_testnet, + cfg.byron_guild, + ]).await?; + loop { + match chainsync.request_next().await? { + Reply::Forward(header, _tip) => { + info!("Block header: block={} slot={}", header.block_number, header.slot_number); + } + Reply::Backward(point, _tip) => { + info!("Roll backward: slot={}", point.slot); + } + } + } } #[tokio::main] diff --git a/examples/tip.rs b/examples/tip.rs index 6b4eedd..1ae0036 100644 --- a/examples/tip.rs +++ b/examples/tip.rs @@ -9,26 +9,13 @@ use cardano_ouroboros_network::{ mux::Connection, - protocols::chainsync::{ - ChainSync, - Listener, - Mode, - }, protocols::handshake::Handshake, - BlockHeader, + protocols::chainsync::{ChainSync, Intersect, Reply}, }; use log::info; mod common; -struct Handler {} - -impl Listener for Handler { - fn handle_tip(&mut self, msg_roll_forward: &BlockHeader) { - info!("Tip reached: {:?}!", msg_roll_forward); - } -} - async fn tip() -> Result<(), Box> { let cfg = common::init(); @@ -41,15 +28,30 @@ async fn tip() -> Result<(), Box> { .build()? .run(&mut connection) .await?; - let mut chainsync = ChainSync { - mode: Mode::SendTip, - network_magic: cfg.magic, - notify: Some(Box::new(Handler {})), - ..Default::default() - }; - chainsync.run(&mut connection).await?; - Ok(()) + let mut chainsync = ChainSync::builder() + .build(&mut connection); + let intersect = chainsync.find_intersect(vec![ + cfg.byron_mainnet, + cfg.byron_testnet, + cfg.byron_guild, + ]).await?; + match intersect { + Intersect::Found(point, tip) => info!("= {:?}, {:?}", point, tip), + _ => panic!(), + }; + loop { + match chainsync.request_next().await? { + Reply::Forward(header, tip) => { + info!("+ {:?}, {:?}", header, tip); + if header.hash == tip.hash { + info!("Reached tip!"); + } + chainsync.find_intersect(vec![tip.into()]).await?; + } + Reply::Backward(slot, tip) => info!("- {:?}, {:?}", slot, tip), + } + } } #[tokio::main] diff --git a/src/lib.rs b/src/lib.rs index 830b257..a34d0a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ // pub mod mux; +pub mod model; pub mod protocols; use log::debug; @@ -35,7 +36,6 @@ pub trait Message: std::fmt::Debug + Sized { fn to_bytes(&self) -> Vec { let values = self.to_values(); - debug!("Tx: message {:?}", values); to_vec(&values).unwrap() } } @@ -70,6 +70,8 @@ pub trait Protocol { debug_assert_eq!(self.agency(), self.role()); // TODO: Protocol should really return an error. let message = self.send().unwrap(); + let info = message.info(); + debug!("Tx: message {}", info); let bytes = message.to_bytes(); debug!("State: {:?}", self.state()); Some(bytes) diff --git a/src/model.rs b/src/model.rs new file mode 100644 index 0000000..3325e49 --- /dev/null +++ b/src/model.rs @@ -0,0 +1,35 @@ +use crate::Error; + +#[derive(Debug, Clone)] +pub struct Point { + pub slot: i64, + pub hash: Vec, +} + +#[derive(Debug, Clone)] +pub struct Tip { + pub block_number: i64, + pub slot_number: i64, + pub hash: Vec, +} + +impl Into for Tip { + fn into(self) -> Point { + Point { + slot: self.slot_number, + hash: self.hash, + } + } +} + +impl TryFrom<(i64, &str)> for Point { + type Error = Error; + + fn try_from(pair: (i64, &str)) -> Result { + let (slot, hash) = pair; + Ok(Point { + slot, + hash: hex::decode(hash).map_err(|_| "Bad hash hex.".to_string())?, + }) + } +} diff --git a/src/mux.rs b/src/mux.rs index cbaab10..ac0f634 100644 --- a/src/mux.rs +++ b/src/mux.rs @@ -120,7 +120,7 @@ impl Connection { self.start_time.elapsed() } - pub(crate) fn channel<'a>(&'a mut self, idx: u16) -> Channel<'a> { + pub fn channel<'a>(&'a mut self, idx: u16) -> Channel<'a> { let receiver = self.register(idx); let demux = self.run_demux(); Channel { @@ -185,7 +185,7 @@ impl Connection { } } -pub(crate) struct Channel<'a> { +pub struct Channel<'a> { idx: u16, receiver: Receiver, connection: &'a mut Connection, diff --git a/src/protocols.rs b/src/protocols.rs index 6558372..66b2a7b 100644 --- a/src/protocols.rs +++ b/src/protocols.rs @@ -15,3 +15,43 @@ pub mod blockfetch; pub mod chainsync; pub mod handshake; pub mod txsubmission; + +use crate::Error; +use serde_cbor::Value; + +#[derive(Debug)] +pub(crate) struct Values<'a>(std::slice::Iter<'a, Value>); + +impl<'a> Values<'a> { + pub(crate) fn from_values(values: &'a Vec) -> Self { + Values(values.iter()) + } + + pub(crate) fn array(&mut self) -> Result { + match self.0.next() { + Some(Value::Array(values)) => Ok(Values::from_values(values)), + other => Err(format!("Integer required: {:?}", other)), + } + } + + pub(crate) fn integer(&mut self) -> Result { + match self.0.next() { + Some(&Value::Integer(value)) => Ok(value), + other => Err(format!("Integer required, found {:?}", other)), + } + } + + pub(crate) fn bytes(&mut self) -> Result<&Vec, Error> { + match self.0.next() { + Some(Value::Bytes(vec)) => Ok(vec), + other => Err(format!("Bytes required, found {:?}", other)), + } + } + + pub(crate) fn end(mut self) -> Result<(), Error> { + match self.0.next() { + None => Ok(()), + other => Err(format!("End of array required, found {:?}", other)), + } + } +} diff --git a/src/protocols/chainsync.rs b/src/protocols/chainsync.rs index ffee64c..dc537ab 100644 --- a/src/protocols/chainsync.rs +++ b/src/protocols/chainsync.rs @@ -11,34 +11,24 @@ // SPDX-License-Identifier: MPL-2.0 // -use std::{ - io, - ops::Sub, - time::{ - Duration, - Instant, - }, -}; - -use log::{ - info, - trace, -}; use serde_cbor::{ de, Value, }; -use crate::mux::Connection; use crate::Message as MessageOps; use crate::{ Agency, Error, Protocol, + mux::Connection, + mux::Channel, + model::Point, + model::Tip, + protocols::Values, }; use crate::{ BlockHeader, - BlockStore, }; use blake2b_simd::Params; @@ -52,55 +42,43 @@ pub enum State { Done, } -#[derive(Debug)] -pub struct Point { - slot: i64, - hash: Vec, -} - #[derive(Debug)] pub enum Message { RequestNext, AwaitReply, RollForward(BlockHeader, Tip), - RollBackward(i64, Tip), + RollBackward(Point, Tip), FindIntersect(Vec), - IntersectFound(i64, Tip), + IntersectFound(Point, Tip), IntersectNotFound(Tip), Done, } impl MessageOps for Message { - fn from_values(array: Vec) -> Result { - let mut values = array.iter(); - let message = match values - .next() - .ok_or("Unexpected end of message.".to_string())? - { - Value::Integer(0) => Message::RequestNext, - Value::Integer(1) => Message::AwaitReply, - Value::Integer(2) => Message::RollForward( - parse_wrapped_header(values.next().ok_or("Unexpected End of message.".to_string())?)?, - parse_tip(values.next().ok_or("Unexpected End of message.".to_string())?)?, + fn from_values(values: Vec) -> Result { + let mut array = Values::from_values(&values); + let message = match array.integer()? { + 0 => Message::RequestNext, + 1 => Message::AwaitReply, + 2 => Message::RollForward( + parse_wrapped_header(array.array()?)?, + parse_tip(array.array()?)?, ), - Value::Integer(3) => Message::RollBackward( - parse_point(values.next().ok_or("Unexpected End of message.".to_string())?)?, - parse_tip(values.next().ok_or("Unexpected End of message.".to_string())?)?, + 3 => Message::RollBackward( + parse_point(array.array()?)?, + parse_tip(array.array()?)?, ), - Value::Integer(5) => Message::IntersectFound( - parse_point(values.next().ok_or("Unexpected End of message.".to_string())?)?, - parse_tip(values.next().ok_or("Unexpected End of message.".to_string())?)?, + 5 => Message::IntersectFound( + parse_point(array.array()?)?, + parse_tip(array.array()?)?, ), - Value::Integer(6) => Message::IntersectNotFound( - parse_tip(values.next().ok_or("Unexpected End of message.".to_string())?)?, + 6 => Message::IntersectNotFound( + parse_tip(array.array()?)?, ), - Value::Integer(7) => Message::Done, - _ => return Err("Unexpected message ID.".to_string()), + 7 => Message::Done, + other => return Err(format!("Unexpected message: {}.", other)), }; - match values.next() { - Some(value) => return Err(format!("Unexpected data: {:?}", value)), - None => (), - } + array.end()?; Ok(message) } @@ -128,95 +106,85 @@ impl MessageOps for Message { } fn info(&self) -> String { - format!("{:?}", self) + match self { + Message::RollForward(header, _tip) => format!( + "block={} slot={}", + header.block_number, + header.slot_number, + ), + other => format!("{:?}", other), + } } } -#[derive(PartialEq)] -pub enum Mode { - Sync, - SendTip, +pub struct ChainSyncBuilder { } -#[derive(Debug)] -pub struct Tip { - pub block_number: i64, - pub slot_number: i64, - pub hash: Vec, +impl ChainSyncBuilder { + pub fn build<'a>(self, connection: &'a mut Connection) -> ChainSync<'a> { + ChainSync { + channel: Some(connection.channel(0x0002)), + intersect: None, + reply: None, + state: State::Idle, + query: None, + } + } } -pub trait Listener: Send { - fn handle_tip(&mut self, msg_roll_forward: &BlockHeader); +#[derive(Debug)] +pub enum Intersect { + Found(Point, Tip), + NotFound(Tip), } -pub struct ChainSync { - pub mode: Mode, - pub last_log_time: Instant, - pub last_insert_time: Instant, - pub store: Option>, - pub network_magic: u32, - pub pending_blocks: Vec, - pub state: State, - pub result: Option>, - pub is_intersect_found: bool, - pub tip_to_intersect: Option, - pub notify: Option>, +#[derive(Debug)] +pub enum Reply { + Forward(BlockHeader, Tip), + Backward(Point, Tip), } -impl Default for ChainSync { - fn default() -> Self { - ChainSync { - mode: Mode::Sync, - last_log_time: Instant::now().sub(Duration::from_secs(6)), - last_insert_time: Instant::now(), - store: None, - network_magic: 764824073, - pending_blocks: Vec::new(), - state: State::Idle, - result: None, - is_intersect_found: false, - tip_to_intersect: None, - notify: None, - } - } +enum Query { + Intersect(Vec), + Reply, } -impl ChainSync { - const FIVE_SECS: Duration = Duration::from_secs(5); +pub struct ChainSync<'a> { + channel: Option>, + query: Option, + intersect: Option, + reply: Option, + state: State, +} - pub async fn run(&mut self, connection: &mut Connection) -> Result<(), Error> { - connection.channel(self.protocol_id()).execute(self).await +impl<'a> ChainSync<'a> { + pub fn builder() -> ChainSyncBuilder { + ChainSyncBuilder { + } } - fn save_block(&mut self, msg_roll_forward: &BlockHeader, is_tip: bool) -> io::Result<()> { - match self.store.as_mut() { - Some(store) => { - self.pending_blocks.push((*msg_roll_forward).clone()); - - if is_tip || self.last_insert_time.elapsed() > ChainSync::FIVE_SECS { - store.save_block(&mut self.pending_blocks, self.network_magic)?; - self.last_insert_time = Instant::now(); - } - } - None => {} - } - Ok(()) + pub async fn find_intersect(&mut self, points: Vec) -> Result { + self.query = Some(Query::Intersect(points)); + self.execute().await?; + Ok(self.intersect.take().unwrap()) } - fn notify_tip(&mut self, msg_roll_forward: &BlockHeader) { - match &mut self.notify { - Some(listener) => listener.handle_tip(msg_roll_forward), - None => {} - } + pub async fn request_next(&mut self) -> Result { + self.query = Some(Query::Reply); + self.execute().await?; + Ok(self.reply.take().unwrap()) } - fn jump_to_tip(&mut self, tip: Tip) { - self.tip_to_intersect = Some(tip); - self.is_intersect_found = false; + async fn execute(&mut self) -> Result<(), Error> { + // TODO: Do something with the Option trick. + let mut channel = self.channel.take().ok_or("Channel not available.".to_string())?; + channel.execute(self).await?; + self.channel = Some(channel); + Ok(()) } } -impl Protocol for ChainSync { +impl Protocol for ChainSync<'_> { type State = State; type Message = Message; @@ -229,6 +197,9 @@ impl Protocol for ChainSync { } fn agency(&self) -> Agency { + if self.query.is_none() { + return Agency::None; + } return match self.state { State::Idle => Agency::Client, State::Intersect => Agency::Server, @@ -245,128 +216,47 @@ impl Protocol for ChainSync { fn send(&mut self) -> Result { match self.state { State::Idle => { - trace!("ChainSync::State::Idle"); - if !self.is_intersect_found { - let mut chain_blocks: Vec = Vec::new(); - - /* Classic sync: Use blocks from store if available. */ - match self.store.as_mut() { - Some(store) => { - let blocks = (*store).load_blocks().ok_or("Can't load blocks.")?; - for (i, block) in blocks.iter().enumerate() { - // all powers of 2 including 0th element 0, 2, 4, 8, 16, 32 - if (i == 0) || ((i > 1) && (i & (i - 1) == 0)) { - let (slot, hash) = block.clone(); - chain_blocks.push(Point {slot, hash}); - } - } - } - None => (), + match self.query.as_ref().unwrap() { + Query::Intersect(points) => { + self.state = State::Intersect; + Ok(Message::FindIntersect(points.clone())) } - - /* Tip discovery: Use discovered tip to retrieve header. */ - if self.tip_to_intersect.is_some() { - let tip = self.tip_to_intersect.as_ref().unwrap(); - chain_blocks.push(Point { - slot: tip.slot_number, - hash: tip.hash.clone(), - }); + Query::Reply => { + self.state = State::CanAwait; + Ok(Message::RequestNext) } - - //TODO: Make these protocol inputs instead of hardcoding here - // Last byron block of mainnet - chain_blocks.push(Point { - slot: 4492799, - hash: hex::decode( - "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457", - ) - .unwrap(), - }); - // Last byron block of testnet - chain_blocks.push(Point { - slot: 1598399, - hash: hex::decode( - "7e16781b40ebf8b6da18f7b5e8ade855d6738095ef2f1c58c77e88b6e45997a4", - ) - .unwrap(), - }); - // Last byron block of guild - chain_blocks.push(Point { - slot: 719, - hash: hex::decode( - "e5400faf19e712ebc5ff5b4b44cecb2b140d1cca25a011e36a91d89e97f53e2e", - ) - .unwrap(), - }); - - self.state = State::Intersect; - Ok(Message::FindIntersect(chain_blocks)) - } else { - self.state = State::CanAwait; - Ok(Message::RequestNext) } } - state => Err(format!("Unsupported: {:?}", state)), + other => Err(format!("Unsupported: {:?}", other)), } } fn recv(&mut self, message: Message) -> Result<(), Error> { match message { Message::RequestNext => { - // Server wants us to wait a bit until it gets a new block + self.state = State::CanAwait; + } + Message::AwaitReply => { self.state = State::MustReply; } Message::RollForward(header, tip) => { - let is_tip = header.slot_number == tip.slot_number - && header.hash == tip.hash; - trace!( - "block {} of {}, {:.2}% synced", - header.block_number, - tip.block_number, - (header.block_number as f64 - / tip.block_number as f64) - * 100.0 - ); - if is_tip || self.last_log_time.elapsed() > ChainSync::FIVE_SECS { - if self.mode == Mode::Sync { - info!( - "block {} of {}, {:.2}% synced", - header.block_number, - tip.block_number, - (header.block_number as f64 - / tip.block_number as f64) - * 100.0 - ); - } - self.last_log_time = Instant::now() - } - - /* Classic sync: Store header data. */ - /* TODO: error handling */ - let _ = self.save_block(&header, is_tip); - - if is_tip { - /* Got complete tip header. */ - self.notify_tip(&header); - } else { - match self.mode { - /* Next time get tip header. */ - Mode::SendTip => self.jump_to_tip(tip), - _ => (), - } - } + self.reply = Some(Reply::Forward(header, tip)); + self.query = None; self.state = State::Idle; } - Message::RollBackward(_point, _tip) => { + Message::RollBackward(point, tip) => { + self.reply = Some(Reply::Backward(point, tip)); + self.query = None; self.state = State::Idle; } - Message::IntersectFound(_point, _tip) => { - self.is_intersect_found = true; + Message::IntersectFound(point, tip) => { + self.intersect = Some(Intersect::Found(point, tip)); + self.query = None; self.state = State::Idle; } - Message::IntersectNotFound(_tip) => { - self.is_intersect_found = true; // should start syncing at first byron block. We will just skip all byron - // blocks. + Message::IntersectNotFound(tip) => { + self.intersect = Some(Intersect::NotFound(tip)); + self.query = None; self.state = State::Idle; } Message::Done => { @@ -379,31 +269,34 @@ impl Protocol for ChainSync { } trait UnwrapValue { - fn integer(&self) -> i128; - fn bytes(&self) -> Vec; + fn array(&self) -> Result<&Vec, Error>; + fn integer(&self) -> Result; + fn bytes(&self) -> Result<&Vec, Error>; } impl UnwrapValue for Value { - fn integer(&self) -> i128 { + fn array(&self) -> Result<&Vec, Error> { match self { - Value::Integer(integer_value) => *integer_value, - _ => { - panic!("not an integer!") - } + Value::Array(array) => Ok(array), + _ => Err(format!("Integer required: {:?}", self)), } } - fn bytes(&self) -> Vec { + fn integer(&self) -> Result { match self { - Value::Bytes(bytes_vec) => bytes_vec.clone(), - _ => { - panic!("not a byte array!") - } + Value::Integer(value) => Ok(*value), + _ => Err(format!("Integer required: {:?}", self)), } } -} -fn parse_wrapped_header(value: &Value) -> Result { + fn bytes(&self) -> Result<&Vec, Error> { + match self { + Value::Bytes(vec) => Ok(vec), + _ => Err(format!("Bytes required: {:?}", self)), + } + } +} +fn parse_wrapped_header(mut array: Values) -> Result { let mut msg_roll_forward = BlockHeader { block_number: 0, slot_number: 0, @@ -424,126 +317,103 @@ fn parse_wrapped_header(value: &Value) -> Result { protocol_major_version: 0, protocol_minor_version: 0, }; - match value { - Value::Array(header_array) => { - match &header_array[1] { - Value::Bytes(wrapped_block_header_bytes) => { - // calculate the block hash - let hash = Params::new() - .hash_length(32) - .to_state() - .update(&*wrapped_block_header_bytes) - .finalize(); - msg_roll_forward.hash = hash.as_bytes().to_owned(); - - let block_header: Value = - de::from_slice(&wrapped_block_header_bytes[..]).unwrap(); - match block_header { - Value::Array(block_header_array) => match &block_header_array[0] { - Value::Array(block_header_array_inner) => { - msg_roll_forward.block_number = - block_header_array_inner[0].integer() as i64; - msg_roll_forward.slot_number = - block_header_array_inner[1].integer() as i64; - msg_roll_forward - .prev_hash - .append(&mut block_header_array_inner[2].bytes()); - msg_roll_forward - .node_vkey - .append(&mut block_header_array_inner[3].bytes()); - msg_roll_forward - .node_vrf_vkey - .append(&mut block_header_array_inner[4].bytes()); - match &block_header_array_inner[5] { - Value::Array(nonce_array) => { - msg_roll_forward - .eta_vrf_0 - .append(&mut nonce_array[0].bytes()); - msg_roll_forward - .eta_vrf_1 - .append(&mut nonce_array[1].bytes()); - } - _ => return Err("invalid cbor! code: 340".to_string()), - } - match &block_header_array_inner[6] { - Value::Array(leader_array) => { - msg_roll_forward - .leader_vrf_0 - .append(&mut leader_array[0].bytes()); - msg_roll_forward - .leader_vrf_1 - .append(&mut leader_array[1].bytes()); - } - _ => return Err("invalid cbor! code: 341".to_string()), - } - msg_roll_forward.block_size = - block_header_array_inner[7].integer() as i64; - msg_roll_forward - .block_body_hash - .append(&mut block_header_array_inner[8].bytes()); - msg_roll_forward - .pool_opcert - .append(&mut block_header_array_inner[9].bytes()); - msg_roll_forward.unknown_0 = - block_header_array_inner[10].integer() as i64; - msg_roll_forward.unknown_1 = - block_header_array_inner[11].integer() as i64; - msg_roll_forward - .unknown_2 - .append(&mut block_header_array_inner[12].bytes()); - msg_roll_forward.protocol_major_version = - block_header_array_inner[13].integer() as i64; - msg_roll_forward.protocol_minor_version = - block_header_array_inner[14].integer() as i64; - } - _ => return Err("invalid cbor! code: 342".to_string()), - }, - _ => return Err("invalid cbor! code: 343".to_string()), + + array.integer()?; + let wrapped_block_header_bytes = array.bytes()?.clone(); + array.end()?; + + // calculate the block hash + let hash = Params::new() + .hash_length(32) + .to_state() + .update(&*wrapped_block_header_bytes) + .finalize(); + msg_roll_forward.hash = hash.as_bytes().to_owned(); + + let block_header: Value = + de::from_slice(&wrapped_block_header_bytes[..]).unwrap(); + match block_header { + Value::Array(block_header_array) => match &block_header_array[0] { + Value::Array(block_header_array_inner) => { + msg_roll_forward.block_number = + block_header_array_inner[0].integer()? as i64; + msg_roll_forward.slot_number = + block_header_array_inner[1].integer()? as i64; + msg_roll_forward + .prev_hash + .append(&mut block_header_array_inner[2].bytes()?.clone()); + msg_roll_forward + .node_vkey + .append(&mut block_header_array_inner[3].bytes()?.clone()); + msg_roll_forward + .node_vrf_vkey + .append(&mut block_header_array_inner[4].bytes()?.clone()); + match &block_header_array_inner[5] { + Value::Array(nonce_array) => { + msg_roll_forward + .eta_vrf_0 + .append(&mut nonce_array[0].bytes()?.clone()); + msg_roll_forward + .eta_vrf_1 + .append(&mut nonce_array[1].bytes()?.clone()); } + _ => return Err("invalid cbor! code: 340".to_string()), } - _ => return Err("invalid cbor! code: 344".to_string()), + match &block_header_array_inner[6] { + Value::Array(leader_array) => { + msg_roll_forward + .leader_vrf_0 + .append(&mut leader_array[0].bytes()?.clone()); + msg_roll_forward + .leader_vrf_1 + .append(&mut leader_array[1].bytes()?.clone()); + } + _ => return Err("invalid cbor! code: 341".to_string()), + } + msg_roll_forward.block_size = + block_header_array_inner[7].integer()? as i64; + msg_roll_forward + .block_body_hash + .append(&mut block_header_array_inner[8].bytes()?.clone()); + msg_roll_forward + .pool_opcert + .append(&mut block_header_array_inner[9].bytes()?.clone()); + msg_roll_forward.unknown_0 = + block_header_array_inner[10].integer()? as i64; + msg_roll_forward.unknown_1 = + block_header_array_inner[11].integer()? as i64; + msg_roll_forward + .unknown_2 + .append(&mut block_header_array_inner[12].bytes()?.clone()); + msg_roll_forward.protocol_major_version = + block_header_array_inner[13].integer()? as i64; + msg_roll_forward.protocol_minor_version = + block_header_array_inner[14].integer()? as i64; } - } - _ => return Err("invalid cbor! code: 345".to_string()), + _ => return Err("invalid cbor! code: 342".to_string()), + }, + _ => return Err("invalid cbor! code: 343".to_string()), } Ok(msg_roll_forward) } -fn parse_tip(value: &Value) -> Result { - let mut tip = Tip { - block_number: 0, - slot_number: 0, - hash: vec![], - }; - match value { - Value::Array(tip_array) => { - match &tip_array[0] { - Value::Array(tip_info_array) => { - tip.slot_number = tip_info_array[0].integer() as i64; - tip.hash.append(&mut tip_info_array[1].bytes()); - } - _ => return Err("invalid cbor! code: 346".to_string()), - } - tip.block_number = tip_array[1].integer() as i64; - } - _ => return Err("invalid cbor! code: 347".to_string()), - } - Ok(tip) +fn parse_tip(mut array: Values) -> Result { + let mut tip_info_array = array.array()?; + let slot_number = tip_info_array.integer()? as i64; + let hash = tip_info_array.bytes()?.clone(); + tip_info_array.end()?; + let block_number = array.integer()? as i64; + array.end()?; + Ok(Tip { + block_number, + slot_number, + hash, + }) } -pub fn parse_point(value: &Value) -> Result { - let mut slot: i64 = 0; - match value { - Value::Array(block) => { - if block.len() > 0 { - match block[0] { - Value::Integer(parsed_slot) => slot = parsed_slot as i64, - _ => return Err("invalid cbor".to_string()), - } - } - } - _ => return Err("invalid cbor".to_string()), - } - Ok(slot) +fn parse_point(mut array: Values) -> Result { + let slot = array.integer()? as i64; + let hash = array.bytes()?.clone(); + array.end()?; + Ok(Point { slot, hash }) } -