Skip to content

Commit

Permalink
Migrate to warp
Browse files Browse the repository at this point in the history
Signed-off-by: i1i1 <vanyarybin1@live.ru>
  • Loading branch information
i1i1 committed Aug 20, 2021
1 parent 7a0b936 commit ec8c30c
Show file tree
Hide file tree
Showing 22 changed files with 697 additions and 2,112 deletions.
502 changes: 264 additions & 238 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
Expand Up @@ -10,7 +10,6 @@ members = [
"iroha_dsl",
"iroha_error",
"iroha_error/iroha_error_macro",
"iroha_http_server",
"iroha_macro",
"iroha_macro/iroha_derive",
"iroha_network",
Expand Down
6 changes: 3 additions & 3 deletions docker-compose.yml
Expand Up @@ -21,7 +21,7 @@ services:
image: iroha:debug
environment:
TORII_P2P_URL: iroha2:1338
TORII_API_URL: iroha:8081
TORII_API_URL: iroha2:8081
IROHA_PUBLIC_KEY: "ed0120cc25624d62896d3a0bfd8940f928dc2abf27cc57cefeb442aa96d9081aae58a1"
IROHA_PRIVATE_KEY: '{"digest_function": "ed25519", "payload": "3bac34cda9e3763fa069c1198312d1ec73b53023b8180c822ac355435edc4a24cc25624d62896d3a0bfd8940f928dc2abf27cc57cefeb442aa96d9081aae58a1"}'
SUMERAGI_TRUSTED_PEERS: '[{"address":"iroha:1337", "public_key": "ed01207233bfc89dcbd68c19fde6ce6158225298ec1131b6a130d1aeb454c1ab5183c0"}, {"address":"iroha2:1338", "public_key": "ed0120cc25624d62896d3a0bfd8940f928dc2abf27cc57cefeb442aa96d9081aae58a1"}, {"address": "iroha3:1339", "public_key": "ed0120faca9e8aa83225cb4d16d67f27dd4f93fc30ffa11adc1f5c88fd5495ecc91020"}, {"address": "iroha4:1340", "public_key": "ed01208e351a70b6a603ed285d666b8d689b680865913ba03ce29fb7d13a166c4e7f1f"}]'
Expand All @@ -35,7 +35,7 @@ services:
image: iroha:debug
environment:
TORII_P2P_URL: iroha3:1339
TORII_API_URL: iroha:8082
TORII_API_URL: iroha3:8082
IROHA_PUBLIC_KEY: "ed0120faca9e8aa83225cb4d16d67f27dd4f93fc30ffa11adc1f5c88fd5495ecc91020"
IROHA_PRIVATE_KEY: '{"digest_function": "ed25519", "payload": "1261a436d36779223d7d6cf20e8b644510e488e6a50bafd77a7485264d27197dfaca9e8aa83225cb4d16d67f27dd4f93fc30ffa11adc1f5c88fd5495ecc91020"}'
SUMERAGI_TRUSTED_PEERS: '[{"address":"iroha:1337", "public_key": "ed01207233bfc89dcbd68c19fde6ce6158225298ec1131b6a130d1aeb454c1ab5183c0"}, {"address":"iroha2:1338", "public_key": "ed0120cc25624d62896d3a0bfd8940f928dc2abf27cc57cefeb442aa96d9081aae58a1"}, {"address": "iroha3:1339", "public_key": "ed0120faca9e8aa83225cb4d16d67f27dd4f93fc30ffa11adc1f5c88fd5495ecc91020"}, {"address": "iroha4:1340", "public_key": "ed01208e351a70b6a603ed285d666b8d689b680865913ba03ce29fb7d13a166c4e7f1f"}]'
Expand All @@ -49,7 +49,7 @@ services:
image: iroha:debug
environment:
TORII_P2P_URL: iroha4:1340
TORII_API_URL: iroha:8083
TORII_API_URL: iroha4:8083
IROHA_PUBLIC_KEY: "ed01208e351a70b6a603ed285d666b8d689b680865913ba03ce29fb7d13a166c4e7f1f"
IROHA_PRIVATE_KEY: '{"digest_function": "ed25519", "payload": "a70dab95c7482eb9f159111b65947e482108cfe67df877bd8d3b9441a781c7c98e351a70b6a603ed285d666b8d689b680865913ba03ce29fb7d13a166c4e7f1f"}'
SUMERAGI_TRUSTED_PEERS: '[{"address":"iroha:1337", "public_key": "ed01207233bfc89dcbd68c19fde6ce6158225298ec1131b6a130d1aeb454c1ab5183c0"}, {"address":"iroha2:1338", "public_key": "ed0120cc25624d62896d3a0bfd8940f928dc2abf27cc57cefeb442aa96d9081aae58a1"}, {"address": "iroha3:1339", "public_key": "ed0120faca9e8aa83225cb4d16d67f27dd4f93fc30ffa11adc1f5c88fd5495ecc91020"}, {"address": "iroha4:1340", "public_key": "ed01208e351a70b6a603ed285d666b8d689b680865913ba03ce29fb7d13a166c4e7f1f"}]'
Expand Down
21 changes: 11 additions & 10 deletions iroha/Cargo.toml
Expand Up @@ -26,33 +26,34 @@ is-it-maintained-open-issues = { repository = "https://github.com/hyperledger/ir
maintenance = { status = "actively-developed" }

[dependencies]
iroha_data_model = { path = "../iroha_data_model", features = ["http_error"] }
iroha_data_model = { path = "../iroha_data_model", features = ["warp"] }
iroha_derive = { version = "=0.1.0", path = "../iroha_macro/iroha_derive" }
iroha_macro = { version = "=0.1.0", path = "../iroha_macro" }
iroha_error = { version = "=0.1.0", path = "../iroha_error" }
iroha_network = { version = "=0.1.0", path = "../iroha_network" }
iroha_logger = { version = "=0.1.0", path = "../iroha_logger"}
iroha_crypto = { version = "=0.1.0", path = "../iroha_crypto" }
iroha_http_server = { version = "=0.1.0", path = "../iroha_http_server" }
iroha_version = { version = "=0.1.0", path = "../iroha_version", features = ["http_error"] }
iroha_version = { version = "=0.1.0", path = "../iroha_version", features = ["warp"] }
iroha_actor = { version = "=0.1.0", path = "../iroha_actor" }
iroha_config = { version = "=0.1.0", path = "../iroha_config" }
iroha_futures = { path = "../iroha_futures" }
iroha_p2p = { version = "=0.1.0", path = "../iroha_p2p" }
iroha_telemetry = { path = "../iroha_telemetry", optional = true }

async-trait = "0.1"
chrono = "0.4"
clap = "2.33.0"
dashmap = { version = "4.0" }
futures = { version = "0.3.4" }
parity-scale-codec = { version = "2", features = ["derive"] }
rand = "0.7.3"
ursa = "=0.3.6"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tracing = "0.1"
dashmap = { version = "4.0" }
tokio = { version = "1.6.0", features = ["sync", "time", "rt", "io-util", "rt-multi-thread", "macros", "fs"]}
tokio-stream = { version = "0.1.6", features = ["fs"]}
iroha_futures = { path = "../iroha_futures" }
iroha_telemetry = { path = "../iroha_telemetry", optional = true }
iroha_p2p = { version = "=0.1.0", path = "../iroha_p2p" }
async-trait = "0.1"
tracing = "0.1"
warp = "0.3"
ursa = "=0.3.6"

[dev-dependencies]
hex-literal = "0.2.1"
Expand Down
2 changes: 1 addition & 1 deletion iroha/src/block_sync.rs
Expand Up @@ -370,7 +370,7 @@ pub mod message {
let message: VersionedMessage = self.into();
match Network::send_request_to(
&peer.address,
Request::new(uri::BLOCK_SYNC_URI, message.encode_versioned()?),
Request::new(uri::BLOCK_SYNC, message.encode_versioned()?),
)
.await?
{
Expand Down
113 changes: 58 additions & 55 deletions iroha/src/event.rs
Expand Up @@ -7,17 +7,14 @@ use std::{convert::TryInto, fmt::Debug, time::Duration};
use futures::{SinkExt, StreamExt};
use iroha_data_model::events::{prelude::*, SubscriptionRequest};
use iroha_error::{error, Result, WrapErr};
use iroha_http_server::web_socket::{WebSocketMessage, WebSocketStream};
use iroha_version::prelude::*;
use tokio::{
sync::mpsc::{Receiver, Sender},
time,
};
use tokio::{sync::broadcast, time};
use warp::ws::{self, WebSocket};

/// Type of `Sender<Event>` which should be used for channels of `Event` messages.
pub type EventsSender = Sender<Event>;
pub type EventsSender = broadcast::Sender<Event>;
/// Type of `Receiver<Event>` which should be used for channels of `Event` messages.
pub type EventsReceiver = Receiver<Event>;
pub type EventsReceiver = broadcast::Receiver<Event>;

#[cfg(test)]
const TIMEOUT: Duration = Duration::from_millis(10_000);
Expand All @@ -28,7 +25,7 @@ const TIMEOUT: Duration = Duration::from_millis(1000);
/// Passes the events over the corresponding connection `stream` if they match the `filter`.
#[derive(Debug)]
pub struct Consumer {
stream: WebSocketStream,
stream: WebSocket,
filter: EventFilter,
}

Expand All @@ -38,64 +35,70 @@ impl Consumer {
/// # Errors
/// Can fail due to timeout or without message at websocket or during decoding request
#[iroha_futures::telemetry_future]
pub async fn new(mut stream: WebSocketStream) -> Result<Self> {
if let WebSocketMessage::Binary(message) = time::timeout(TIMEOUT, stream.next())
pub async fn new(mut stream: WebSocket) -> Result<Self> {
let message = time::timeout(TIMEOUT, stream.next())
.await
.wrap_err("Read message timeout")?
.ok_or_else(|| error!("Failed to read message: no message"))?
.wrap_err("Web Socket failure")?
{
let request: SubscriptionRequest =
VersionedEventSocketMessage::decode_versioned(&message)?
.into_inner_v1()
.try_into()?;
time::timeout(
TIMEOUT,
stream.send(WebSocketMessage::Binary(
VersionedEventSocketMessage::from(EventSocketMessage::SubscriptionAccepted)
.encode_versioned()?,
)),
)
.await
.wrap_err("Send message timeout")?
.wrap_err("Failed to send message")?;
let SubscriptionRequest(filter) = request;
Ok(Consumer { stream, filter })
} else {
Err(error!("Unexepcted message type"))
.wrap_err("Web Socket failure")?;

if !message.is_binary() {
return Err(error!("Unexpected message type"));
}
let SubscriptionRequest(filter): SubscriptionRequest =
VersionedEventSocketMessage::decode_versioned(message.as_bytes())?
.into_inner_v1()
.try_into()?;

time::timeout(
TIMEOUT,
stream.send(ws::Message::binary(
VersionedEventSocketMessage::from(EventSocketMessage::SubscriptionAccepted)
.encode_versioned()?,
)),
)
.await
.wrap_err("Send message timeout")?
.wrap_err("Failed to send message")?;

Ok(Consumer { stream, filter })
}

/// Forwards the `event` over the `stream` if it matches the `filter`.
///
/// # Errors
/// Can fail due to timeout or sending event. Also receiving might fail
#[iroha_futures::telemetry_future]
pub async fn consume(mut self, event: &Event) -> Result<Self> {
if self.filter.apply(event) {
let event = VersionedEventSocketMessage::from(EventSocketMessage::from(event.clone()))
.encode_versioned()
.wrap_err("Failed to serialize event")?;
time::timeout(TIMEOUT, self.stream.send(WebSocketMessage::Binary(event)))
.await
.wrap_err("Send message timeout")?
.wrap_err("Failed to send message")?;
if let WebSocketMessage::Binary(receipt) = time::timeout(TIMEOUT, self.stream.next())
.await
.wrap_err("Failed to read receipt")?
.ok_or_else(|| error!("Failed to read receipt: no receipt"))?
.wrap_err("Web Socket failure")?
{
if let EventSocketMessage::EventReceived =
VersionedEventSocketMessage::decode_versioned(&receipt)?.into_inner_v1()
{
} else {
return Err(error!("Expected `EventReceived`."));
}
} else {
return Err(error!("Unexpected message type"));
}
pub async fn consume(&mut self, event: &Event) -> Result<()> {
if !self.filter.apply(event) {
return Ok(());
}

let event = VersionedEventSocketMessage::from(EventSocketMessage::from(event.clone()))
.encode_versioned()
.wrap_err("Failed to serialize event")?;
time::timeout(TIMEOUT, self.stream.send(ws::Message::binary(event)))
.await
.wrap_err("Send message timeout")?
.wrap_err("Failed to send message")?;

let message = time::timeout(TIMEOUT, self.stream.next())
.await
.wrap_err("Failed to read receipt")?
.ok_or_else(|| error!("Failed to read receipt: no receipt"))?
.wrap_err("Web Socket failure")?;

if !message.is_binary() {
return Err(error!("Unexpected message type"));
}

if let EventSocketMessage::EventReceived =
VersionedEventSocketMessage::decode_versioned(message.as_bytes())?.into_inner_v1()
{
self.stream.flush().await?;
Ok(())
} else {
Err(error!("Expected `EventReceived`."))
}
Ok(self)
}
}
3 changes: 1 addition & 2 deletions iroha/src/genesis.rs
Expand Up @@ -136,8 +136,7 @@ async fn check_peers_status(
let reached = if &peer == this_peer_id {
true
} else {
match Network::send_request_to(&peer.address, Request::empty(uri::HEALTH_URI)).await
{
match Network::send_request_to(&peer.address, Request::empty(uri::HEALTH)).await {
Ok(Response::Ok(_)) => true,
Ok(Response::InternalError) => {
iroha_logger::info!(
Expand Down
11 changes: 4 additions & 7 deletions iroha/src/lib.rs
Expand Up @@ -7,7 +7,6 @@ pub mod event;
pub mod genesis;
mod init;
pub mod kura;
pub mod maintenance;
mod merkle;
pub mod modules;
pub mod queue;
Expand All @@ -26,7 +25,7 @@ use iroha_actor::{broker::*, prelude::*};
use iroha_data_model::prelude::*;
use iroha_error::{error, Result, WrapErr};
use smartcontracts::permissions::{IsInstructionAllowedBoxed, IsQueryAllowedBoxed};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio::{sync::broadcast, task::JoinHandle};
use wsv::{World, WorldTrait};

use crate::{
Expand All @@ -35,7 +34,6 @@ use crate::{
config::Configuration,
genesis::GenesisNetwork,
kura::{Kura, KuraTrait},
maintenance::System,
prelude::*,
queue::{Queue, QueueTrait},
sumeragi::{Sumeragi, SumeragiTrait},
Expand Down Expand Up @@ -120,7 +118,7 @@ where

//iroha_logger::info!(?config, "Loaded configuration");

let (events_sender, events_receiver) = mpsc::channel(100);
let (events_sender, _) = broadcast::channel(100);
let wsv = Arc::new(WorldStateView::from_config(
config.wsv_configuration,
W::with(
Expand Down Expand Up @@ -152,7 +150,7 @@ where
let query_validator = Arc::new(query_validator);
let sumeragi: AlwaysAddr<_> = S::from_configuration(
&config.sumeragi_configuration,
events_sender,
events_sender.clone(),
Arc::clone(&wsv),
instruction_validator,
Arc::clone(&query_validator),
Expand Down Expand Up @@ -190,11 +188,10 @@ where
let torii = Torii::from_configuration(
config.torii_configuration.clone(),
Arc::clone(&wsv),
System::new(config),
queue.clone(),
sumeragi.clone(),
query_validator,
events_receiver,
events_sender,
broker.clone(),
);
let torii = Some(torii);
Expand Down

0 comments on commit ec8c30c

Please sign in to comment.