Skip to content

Commit

Permalink
tokio: switch the whole library to async tokio for now
Browse files Browse the repository at this point in the history
We need to dedicate the capacity to individual protocols. We still want
to support non-tokio usage in the future.
  • Loading branch information
pavlix committed Jan 26, 2022
1 parent 5077db2 commit aa80407
Show file tree
Hide file tree
Showing 12 changed files with 271 additions and 347 deletions.
8 changes: 1 addition & 7 deletions Cargo.toml
Expand Up @@ -16,7 +16,6 @@ categories = ["network-programming"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
experimental = ["tokio", "futures"]

[dependencies]
blake2b_simd = "1.0.0"
Expand All @@ -30,8 +29,7 @@ serde = { version = "1.0.117", features = ["derive"] }
serde_cbor = "0.11.1"
serde_json = "1.0.59"
log = "0.4.11"
tokio = { version = "1.15.0", features = ["full"], optional = true }
futures = { version = "0.3.19", optional = true }
tokio = { version = "1.15.0", features = ["full"]}

[dev-dependencies]
simple_logger = { version = "2.1.0", default-features = false, features = ["colors"] }
Expand All @@ -49,7 +47,3 @@ crate-type = ["staticlib"]
[[example]]
name = "pooltool"
crate-type = ["staticlib"]

[[example]]
name = "async"
required-features = ["experimental"]
56 changes: 0 additions & 56 deletions examples/async.rs

This file was deleted.

5 changes: 2 additions & 3 deletions examples/common.rs
Expand Up @@ -8,19 +8,18 @@ SPDX-License-Identifier: GPL-3.0-only OR LGPL-3.0-only
use simple_logger::SimpleLogger;
use std::path::PathBuf;

#[derive(Clone)]
pub struct Config {
pub db: PathBuf,
pub host: String,
pub port: u16,
pub magic: u32,
}

pub fn init() -> Config {
SimpleLogger::new().init().unwrap();
Config {
db: PathBuf::from("sqlite.db"),
host: "relays-new.cardano-mainnet.iohk.io".to_string(),
port: 3001,
host: "relays-new.cardano-mainnet.iohk.io:3001".to_string(),
magic: 764824073,
}
}
27 changes: 13 additions & 14 deletions examples/local.rs
Expand Up @@ -5,9 +5,8 @@ SPDX-License-Identifier: GPL-3.0-only OR LGPL-3.0-only
*/

use cardano_ouroboros_network::mux;
use futures::executor::block_on;
use log::{error, info};
use cardano_ouroboros_network::mux::Connection;
use log::info;
use std::env;

mod common;
Expand All @@ -16,21 +15,21 @@ mod common;
* Test a handshake with the local node's unix socket
*/
#[cfg(target_family = "unix")]
fn main() {
#[tokio::main]
async fn main() {
let cfg = common::init();
let args: Vec<String> = env::args().collect();
let magic = cfg.magic;

let socket_path = &args[1];
info!("UNIX socket path set to {} ", socket_path);
let mut args = env::args();
args.next();
let socket_path = &args.next().unwrap_or("test.sock".to_string());
info!("UNIX socket path set to {:?} ", socket_path);

block_on(async {
let channel = mux::connection::connect_unix(socket_path)
.await
.unwrap();
channel.handshake(magic).await.unwrap();
info!("Ping UNIX socket success");
});
let mut connection = Connection::unix_connect(socket_path)
.await
.unwrap();
connection.handshake(magic).await.unwrap();
info!("Ping UNIX socket success");
}

#[cfg(target_family = "windows")]
Expand Down
53 changes: 25 additions & 28 deletions examples/ping.rs
Expand Up @@ -5,55 +5,52 @@ SPDX-License-Identifier: GPL-3.0-only OR LGPL-3.0-only
*/

use cardano_ouroboros_network::mux;
use cardano_ouroboros_network::mux::Connection;
use std::{
env,
time::Duration,
};
use log::{info, error};
use futures::{
executor::block_on,
future::join_all,
};
use futures::future::join_all;

mod common;

async fn ping(host: &String, port: u16, magic: u32) -> Result<(Duration, Duration), String> {
info!("Pinging host {} port {} magic {}.", host, port, magic);
let channel = match mux::connection::connect(&host, port).await {
Ok(channel) => channel,
async fn ping(host: &String, magic: u32) -> Result<(Duration, Duration), String> {
info!("Pinging host {} magic {}.", host, magic);
let mut connection = match Connection::tcp_connect(&host).await {
Ok(connection) => connection,
Err(_) => { return Err("Could not connect.".to_string()) }
};
let connect_duration = channel.duration();
channel.handshake(magic).await?;
let total_duration = channel.duration();
let connect_duration = connection.duration();
connection.handshake(magic).await?;
let total_duration = connection.duration();
Ok((connect_duration, total_duration))
}

fn main() {
#[tokio::main]
async fn main() {
let cfg = common::init();
let port = cfg.port;
let magic = cfg.magic;

block_on(async {
let mut args: Vec<String> = env::args().collect();
let mut args: Vec<String> = env::args().collect();

args.remove(0);
args.remove(0);

/* Use configured host by default. */
if args.len() == 0 {
args = vec![cfg.host.clone()];
}
/* Use configured host by default. */
if args.len() == 0 {
args = vec![cfg.host.clone()];
}

join_all(args.iter().map(|host| async move {
match ping(&host.clone(), port, magic).await {
join_all(args.iter().map(|host| {
let cfg = cfg.clone();
async move {
match ping(&host.clone(), cfg.magic).await {
Ok((connect_duration, total_duration)) => {
info!("Ping {}:{} success! : connect_duration: {}, total_duration: {}", &host, port, connect_duration.as_millis(), total_duration.as_millis());
info!("Ping {} success! : connect_duration: {}, total_duration: {}", &host, connect_duration.as_millis(), total_duration.as_millis());
}
Err(error) => {
error!("Ping {}:{} failed! : {:?}", &host, port, error);
error!("Ping {} failed! : {:?}", &host, error);
}
}
})).await;
});
}
})).await;
}
8 changes: 4 additions & 4 deletions examples/pingpong.rs
Expand Up @@ -6,7 +6,7 @@ SPDX-License-Identifier: GPL-3.0-only OR LGPL-3.0-only
*/

use cardano_ouroboros_network::{
mux,
mux::Connection,
protocols::pingpong,
};
use futures::executor::block_on;
Expand All @@ -17,8 +17,8 @@ fn main() {
let cfg = common::init();

block_on(async {
let channel = mux::connection::connect("127.0.0.1", cfg.port).await.unwrap();
channel.handshake(cfg.magic).await.unwrap();
channel.execute(pingpong::PingPongProtocol::new(0x0100)).await.unwrap();
let mut connection = Connection::tcp_connect("127.0.0.1:3001").await.unwrap();
connection.handshake(cfg.magic).await.unwrap();
connection.execute(&mut pingpong::PingPongProtocol::new(0x0100)).await.unwrap();
});
}
48 changes: 23 additions & 25 deletions examples/server.rs
Expand Up @@ -6,44 +6,42 @@ SPDX-License-Identifier: GPL-3.0-only OR LGPL-3.0-only
*/

use cardano_ouroboros_network::{
mux::connection::Channel,
mux::Connection,
protocols::{
handshake,
pingpong,
},
};
use std::net::{TcpListener, TcpStream};
use log::{info, error};
use futures::executor::block_on;
use cardano_ouroboros_network::mux::connection::Stream::Tcp;
use tokio::net::{TcpListener, TcpStream};
use log::info;

mod common;

fn main() {
async fn serve() {
let cfg = common::init();
let listener = TcpListener::bind(format!("127.0.0.1:{}", cfg.port)).unwrap();

for stream in listener.incoming() {
match handle(stream.unwrap(), &cfg) {
Ok(_) => info!("connection closed"),
Err(e) => error!("connection failed: {}", e),
}
let listener = TcpListener::bind("127.0.0.1:3001").await.unwrap();
loop {
let (socket, _addr) = listener.accept().await.unwrap();
handle(socket, &cfg).await.unwrap();
}
}

type Error = Box<dyn std::error::Error>;

fn handle(stream: TcpStream, cfg: &common::Config) -> Result<(), Error> {
let channel = Channel::new(Tcp(stream));

async fn handle(stream: TcpStream, cfg: &common::Config) -> Result<(), Error> {
info!("new client!");
block_on(async {
channel.execute(handshake::HandshakeProtocol::builder()
.server()
.node_to_node()
.network_magic(cfg.magic)
.build()?).await?;
channel.execute(pingpong::PingPongProtocol::expect(0x0100)).await?;
Ok(())
})

let mut connection = Connection::from_tcp_stream(stream);
connection.execute(&mut handshake::HandshakeProtocol::builder()
.server()
.node_to_node()
.network_magic(cfg.magic)
.build()?).await?;
connection.execute(&mut pingpong::PingPongProtocol::expect(0x0100)).await?;
Ok(())
}

#[tokio::main]
async fn main() {
serve().await;
}
24 changes: 11 additions & 13 deletions examples/sync.rs
Expand Up @@ -6,25 +6,23 @@ SPDX-License-Identifier: GPL-3.0-only OR LGPL-3.0-only
*/

use cardano_ouroboros_network::{
mux,
mux::Connection,
protocols::chainsync::{ChainSyncProtocol, Mode},
};
use futures::executor::block_on;

mod common;
mod sqlite;

fn main() {
#[tokio::main]
async fn main() {
let cfg = common::init();

block_on(async {
let channel = mux::connection::connect(&cfg.host, cfg.port).await.unwrap();
channel.handshake(cfg.magic).await.unwrap();
channel.execute({ChainSyncProtocol {
mode: Mode::Sync,
network_magic: cfg.magic,
store: Some(Box::new(sqlite::SQLiteBlockStore::new(&cfg.db).unwrap())),
..Default::default()
}}).await.unwrap();
});
let mut connection = Connection::tcp_connect(&cfg.host).await.unwrap();
connection.handshake(cfg.magic).await.unwrap();
connection.execute(&mut ChainSyncProtocol {
mode: Mode::Sync,
network_magic: cfg.magic,
store: Some(Box::new(sqlite::SQLiteBlockStore::new(&cfg.db).unwrap())),
..Default::default()
}).await.unwrap();
}
26 changes: 11 additions & 15 deletions examples/tip.rs
Expand Up @@ -6,13 +6,10 @@ SPDX-License-Identifier: GPL-3.0-only OR LGPL-3.0-only
*/

use cardano_ouroboros_network::{
mux::Connection,
BlockHeader,
mux,
protocols::chainsync::{ChainSyncProtocol, Mode, Listener},
};
use futures::{
executor::block_on,
};
use log::info;

mod common;
Expand All @@ -25,17 +22,16 @@ impl Listener for Handler {
}
}

fn main() {
#[tokio::main]
async fn main() {
let cfg = common::init();

block_on(async {
let channel = mux::connection::connect(&cfg.host, cfg.port).await.unwrap();
channel.handshake(cfg.magic).await.unwrap();
channel.execute(ChainSyncProtocol {
mode: Mode::SendTip,
network_magic: cfg.magic,
notify: Some(Box::new(Handler {})),
..Default::default()
}).await.unwrap();
});
let mut connection = Connection::tcp_connect(&cfg.host).await.unwrap();
connection.handshake(cfg.magic).await.unwrap();
connection.execute(&mut ChainSyncProtocol {
mode: Mode::SendTip,
network_magic: cfg.magic,
notify: Some(Box::new(Handler {})),
..Default::default()
}).await.unwrap();
}

0 comments on commit aa80407

Please sign in to comment.