Skip to content

Commit

Permalink
Merge pull request #248 from freedomlayer/real/feat/port-to-stable
Browse files Browse the repository at this point in the history
Port to stable rust
  • Loading branch information
realcr committed Nov 19, 2019
2 parents 389c9e6 + 8a46fe6 commit 68fec7c
Show file tree
Hide file tree
Showing 122 changed files with 1,224 additions and 1,613 deletions.
493 changes: 209 additions & 284 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion components/app/Cargo.toml
Expand Up @@ -21,5 +21,10 @@ version = { path = "../version", version="0.1.0", package = "offst-version" }

log = "0.4"
simple_logger = "1.0.1"
futures-preview = "0.3.0-alpha.16"

futures = "0.3.1"
derive_more = "0.14.0"

[dev-dependencies]

futures = {version = "0.3.1", features = ["thread-pool"]}
9 changes: 6 additions & 3 deletions components/app/src/app_conn/app_conn.rs
Expand Up @@ -6,7 +6,7 @@ use proto::app_server::messages::{AppPermissions, AppServerToApp, AppToAppServer

use crypto::rand::{CryptoRandom, OffstSystemRandom};

use common::conn::ConnPair;
use common::conn::{sink_to_sender, ConnPair};
use common::multi_consumer::{multi_consumer_service, MultiConsumerClient};
use common::mutable_state::BatchMutable;
use common::state_service::{state_service, StateClient};
Expand Down Expand Up @@ -44,11 +44,14 @@ impl<R> AppConn<R>
where
R: CryptoRandom + Clone,
{
pub fn new<S>(conn_tuple: AppConnTuple, rng: R, spawner: &mut S) -> Result<Self, AppConnError>
pub fn new<S>(conn_tuple: AppConnTuple, rng: R, spawner: &S) -> Result<Self, AppConnError>
where
S: Spawn,
{
let (app_permissions, node_report, (sender, mut receiver)) = conn_tuple;
let (app_permissions, node_report, conn_pair) = conn_tuple;
let (sender, mut receiver) = conn_pair.split();

let sender = sink_to_sender(sender, spawner);

let (mut incoming_mutations_sender, incoming_mutations) = mpsc::channel(0);
let (requests_sender, incoming_requests) = mpsc::channel(0);
Expand Down
34 changes: 17 additions & 17 deletions components/app/src/connect.rs
@@ -1,11 +1,10 @@
use std::time::Duration;

use futures::channel::mpsc;
use futures::executor::ThreadPool;
use futures::task::{Spawn, SpawnExt};
use futures::{SinkExt, StreamExt};

use common::conn::{ConnPairVec, FutTransform};
use common::conn::{ConnPair, ConnPairVec, FutTransform};
use common::int_convert::usize_to_u64;

use proto::app_server::messages::{AppPermissions, AppServerToApp, AppToAppServer};
Expand All @@ -24,7 +23,7 @@ use proto::crypto::PublicKey;
use crypto::rand::{system_random, CryptoRandom};

use identity::IdentityClient;
use net::NetConnector;
use net::TcpConnector;
use timer::create_timer;

use timer::TimerClient;
Expand Down Expand Up @@ -53,7 +52,7 @@ pub async fn setup_connection<R, S>(
rng: R,
node_public_key: PublicKey,
app_identity_client: IdentityClient,
mut spawner: S,
spawner: S,
) -> Result<AppConnTuple, SetupConnectionError>
where
R: Clone + CryptoRandom + 'static,
Expand Down Expand Up @@ -83,7 +82,7 @@ where
assert_eq!(public_key, node_public_key);

// Keepalive wrapper:
let (mut sender, mut receiver) = keepalive_transform.transform(enc_conn).await;
let (mut sender, mut receiver) = keepalive_transform.transform(enc_conn).await.split();

// Get AppPermissions:
let app_permissions_data = receiver
Expand Down Expand Up @@ -135,36 +134,40 @@ where
}
});

Ok((app_permissions, node_report, (user_sender, user_receiver)))
Ok((
app_permissions,
node_report,
ConnPair::from_raw(user_sender, user_receiver),
))
}

#[derive(Debug)]
pub enum NodeConnectError {
/// Could not open network connection
NetConnectorError,
TcpConnectorError,
SetupConnectionError(SetupConnectionError),
CreateNodeConnectionError,
}

/// Connect to an offst node
pub async fn node_connect<C, R, S>(
mut net_connector: C,
mut tcp_connector: C,
node_public_key: PublicKey,
node_net_address: NetAddress,
timer_client: TimerClient,
app_identity_client: IdentityClient,
rng: R,
mut spawner: S,
spawner: S,
) -> Result<AppConn<R>, NodeConnectError>
where
C: FutTransform<Input = NetAddress, Output = Option<ConnPairVec>>,
R: CryptoRandom + Clone + 'static,
S: Spawn + Send + Sync + Clone + 'static,
{
let conn_pair = net_connector
let conn_pair = tcp_connector
.transform(node_net_address)
.await
.ok_or(NodeConnectError::NetConnectorError)?;
.ok_or(NodeConnectError::TcpConnectorError)?;

let conn_tuple = setup_connection(
conn_pair,
Expand All @@ -177,8 +180,7 @@ where
.await
.map_err(NodeConnectError::SetupConnectionError)?;

AppConn::new(conn_tuple, rng, &mut spawner)
.map_err(|_| NodeConnectError::CreateNodeConnectionError)
AppConn::new(conn_tuple, rng, &spawner).map_err(|_| NodeConnectError::CreateNodeConnectionError)
}

#[derive(Debug)]
Expand All @@ -194,10 +196,8 @@ pub async fn connect<S>(
where
S: Spawn + Clone + Send + Sync + 'static,
{
let resolve_thread_pool = ThreadPool::new().map_err(|_| ConnectError)?;

// A tcp connector, Used to connect to remote servers:
let net_connector = NetConnector::new(MAX_FRAME_LENGTH, resolve_thread_pool, spawner.clone());
let tcp_connector = TcpConnector::new(MAX_FRAME_LENGTH, spawner.clone());

// Get a timer client:
let dur = Duration::from_millis(usize_to_u64(TICK_MS).unwrap());
Expand All @@ -207,7 +207,7 @@ where
let rng = system_random();

node_connect(
net_connector,
tcp_connector,
node_public_key,
node_net_address,
timer_client,
Expand Down
2 changes: 1 addition & 1 deletion components/app/src/identity.rs
Expand Up @@ -23,7 +23,7 @@ pub enum IdentityFromFileError {

pub fn identity_from_file<S>(
idfile_path: &Path,
mut spawner: S,
spawner: S,
) -> Result<IdentityClient, IdentityFromFileError>
where
S: Spawn,
Expand Down
4 changes: 0 additions & 4 deletions components/app/src/lib.rs
@@ -1,7 +1,3 @@
#![feature(arbitrary_self_types)]
#![feature(nll)]
#![feature(generators)]
#![feature(never_type)]
#![deny(trivial_numeric_casts, warnings)]
#![allow(intra_doc_link_resolution_failure)]
#![allow(
Expand Down
6 changes: 5 additions & 1 deletion components/app_server/Cargo.toml
Expand Up @@ -14,5 +14,9 @@ timer = { path = "../timer", version = "0.1.0" , package = "offst-timer" }
proto = { path = "../proto", version = "0.1.0" , package = "offst-proto" }

log = "0.4"
futures-preview = "0.3.0-alpha.16"
futures = "0.3.1"
im = "12.0.0"

[dev-dependencies]

futures = {version = "0.3.1", features = ["thread-pool"]}
4 changes: 0 additions & 4 deletions components/app_server/src/lib.rs
@@ -1,8 +1,4 @@
#![crate_type = "lib"]
#![feature(arbitrary_self_types)]
#![feature(nll)]
#![feature(generators)]
#![feature(never_type)]
#![deny(trivial_numeric_casts, warnings)]
#![allow(intra_doc_link_resolution_failure)]
#![allow(
Expand Down
12 changes: 7 additions & 5 deletions components/app_server/src/server.rs
Expand Up @@ -6,8 +6,8 @@ use futures::channel::mpsc;
use futures::task::{Spawn, SpawnExt};
use futures::{future, stream, Sink, SinkExt, Stream, StreamExt};

use common::conn::ConnPair;
use common::select_streams::{select_streams, BoxStream};
use common::conn::{sink_to_sender, BoxStream, ConnPair};
use common::select_streams::select_streams;
// use common::mutable_state::MutableState;
use proto::crypto::{PaymentId, Uid};

Expand Down Expand Up @@ -165,16 +165,17 @@ where
&mut self,
incoming_app_connection: IncomingAppConnection<B>,
) -> Result<(), AppServerError> {
let (permissions, (sender, receiver)) = incoming_app_connection;
let (permissions, conn_pair) = incoming_app_connection;
let (sender, receiver) = conn_pair.split();

let app_counter = self.app_counter;
let mut receiver =
let receiver =
receiver.map(move |app_to_app_server| (app_counter, Some(app_to_app_server)));

let mut from_app_sender = self.from_app_sender.clone();
let send_all_fut = async move {
// Forward all messages:
let _ = from_app_sender.send_all(&mut receiver).await;
let _ = from_app_sender.send_all(&mut receiver.map(Ok)).await;
// Notify that the connection to the app was closed:
let _ = from_app_sender.send((app_counter, None)).await;
};
Expand All @@ -183,6 +184,7 @@ where
.spawn(send_all_fut)
.map_err(|_| AppServerError::SpawnError)?;

let sender = sink_to_sender(sender, &self.spawner);
let mut app = App::new(permissions, sender);
// Send the initial node report:
app.send(AppServerToApp::Report(self.node_report.clone()))
Expand Down
10 changes: 6 additions & 4 deletions components/app_server/src/tests/all_apps_closed.rs
@@ -1,8 +1,10 @@
use futures::channel::mpsc;
use futures::executor::ThreadPool;
use futures::executor::{block_on, ThreadPool};
use futures::task::Spawn;
use futures::{SinkExt, StreamExt};

use common::conn::ConnPair;

use proto::crypto::PublicKey;

use proto::app_server::messages::{AppPermissions, AppServerToApp};
Expand All @@ -28,7 +30,7 @@ where

let (app_sender, app_server_receiver) = mpsc::channel(0);
let (app_server_sender, mut app_receiver) = mpsc::channel(0);
let app_server_conn_pair = (app_server_sender, app_server_receiver);
let app_server_conn_pair = ConnPair::from_raw(app_server_sender, app_server_receiver);

let app_permissions = AppPermissions {
routes: true,
Expand Down Expand Up @@ -84,6 +86,6 @@ where

#[test]
fn test_app_server_loop_index_all_apps_closed() {
let mut thread_pool = ThreadPool::new().unwrap();
thread_pool.run(task_app_server_loop_all_apps_closed(thread_pool.clone()));
let thread_pool = ThreadPool::new().unwrap();
block_on(task_app_server_loop_all_apps_closed(thread_pool.clone()));
}
10 changes: 6 additions & 4 deletions components/app_server/src/tests/funder_command.rs
@@ -1,8 +1,10 @@
use futures::channel::mpsc;
use futures::executor::ThreadPool;
use futures::executor::{block_on, ThreadPool};
use futures::task::Spawn;
use futures::{SinkExt, StreamExt};

use common::conn::ConnPair;

use proto::crypto::Uid;

use proto::app_server::messages::{
Expand All @@ -28,7 +30,7 @@ where

let (mut app_sender, app_server_receiver) = mpsc::channel(0);
let (app_server_sender, mut app_receiver) = mpsc::channel(0);
let app_server_conn_pair = (app_server_sender, app_server_receiver);
let app_server_conn_pair = ConnPair::from_raw(app_server_sender, app_server_receiver);

let app_permissions = AppPermissions {
routes: true,
Expand Down Expand Up @@ -102,6 +104,6 @@ where

#[test]
fn test_app_server_loop_funder_command() {
let mut thread_pool = ThreadPool::new().unwrap();
thread_pool.run(task_app_server_loop_funder_command(thread_pool.clone()));
let thread_pool = ThreadPool::new().unwrap();
block_on(task_app_server_loop_funder_command(thread_pool.clone()));
}
10 changes: 6 additions & 4 deletions components/app_server/src/tests/index_client_command.rs
@@ -1,8 +1,10 @@
use futures::channel::mpsc;
use futures::executor::ThreadPool;
use futures::executor::{block_on, ThreadPool};
use futures::task::Spawn;
use futures::{SinkExt, StreamExt};

use common::conn::ConnPair;

use proto::app_server::messages::{
AppPermissions, AppRequest, AppServerToApp, AppToAppServer, NodeReportMutation,
};
Expand Down Expand Up @@ -30,7 +32,7 @@ where

let (mut app_sender, app_server_receiver) = mpsc::channel(0);
let (app_server_sender, mut app_receiver) = mpsc::channel(0);
let app_server_conn_pair = (app_server_sender, app_server_receiver);
let app_server_conn_pair = ConnPair::from_raw(app_server_sender, app_server_receiver);

let app_permissions = AppPermissions {
routes: true,
Expand Down Expand Up @@ -113,8 +115,8 @@ where

#[test]
fn test_app_server_loop_index_client_command() {
let mut thread_pool = ThreadPool::new().unwrap();
thread_pool.run(task_app_server_loop_index_client_command(
let thread_pool = ThreadPool::new().unwrap();
block_on(task_app_server_loop_index_client_command(
thread_pool.clone(),
));
}
12 changes: 7 additions & 5 deletions components/app_server/src/tests/request_routes.rs
@@ -1,10 +1,12 @@
use std::convert::TryFrom;

use futures::channel::mpsc;
use futures::executor::ThreadPool;
use futures::executor::{block_on, ThreadPool};
use futures::task::Spawn;
use futures::{SinkExt, StreamExt};

use common::conn::ConnPair;

use proto::crypto::{PublicKey, Uid};

use proto::app_server::messages::{AppPermissions, AppRequest, AppServerToApp, AppToAppServer};
Expand Down Expand Up @@ -32,7 +34,7 @@ where
// Connect two apps:
let (mut app_sender0, app_server_receiver) = mpsc::channel(1);
let (app_server_sender, mut app_receiver0) = mpsc::channel(1);
let app_server_conn_pair = (app_server_sender, app_server_receiver);
let app_server_conn_pair = ConnPair::from_raw(app_server_sender, app_server_receiver);
let app_permissions = AppPermissions {
routes: true,
buyer: true,
Expand All @@ -46,7 +48,7 @@ where

let (_app_sender1, app_server_receiver) = mpsc::channel(1);
let (app_server_sender, mut app_receiver1) = mpsc::channel(1);
let app_server_conn_pair = (app_server_sender, app_server_receiver);
let app_server_conn_pair = ConnPair::from_raw(app_server_sender, app_server_receiver);
let app_permissions = AppPermissions {
routes: true,
buyer: true,
Expand Down Expand Up @@ -154,6 +156,6 @@ where

#[test]
fn test_app_server_loop_index_request_routes() {
let mut thread_pool = ThreadPool::new().unwrap();
thread_pool.run(task_app_server_loop_request_routes(thread_pool.clone()));
let thread_pool = ThreadPool::new().unwrap();
block_on(task_app_server_loop_request_routes(thread_pool.clone()));
}

0 comments on commit 68fec7c

Please sign in to comment.