/
request_routes.rs
161 lines (141 loc) · 5.39 KB
/
request_routes.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
use std::convert::TryFrom;
use futures::channel::mpsc;
use futures::executor::{block_on, ThreadPool};
use futures::task::Spawn;
use futures::{SinkExt, StreamExt};
use common::conn::ConnPair;
use proto::crypto::{PublicKey, Uid};
use proto::app_server::messages::{AppPermissions, AppRequest, AppServerToApp, AppToAppServer};
use proto::funder::messages::Currency;
use proto::index_client::messages::{
AppServerToIndexClient, ClientResponseRoutes, IndexClientRequest, IndexClientToAppServer,
RequestRoutes, ResponseRoutesResult,
};
use super::utils::spawn_dummy_app_server;
async fn task_app_server_loop_request_routes<S>(spawner: S)
where
S: Spawn + Clone + Send + 'static,
{
let (
_funder_sender,
_funder_receiver,
mut index_client_sender,
mut index_client_receiver,
mut connections_sender,
_initial_node_report,
) = spawn_dummy_app_server(spawner.clone());
// Connect two apps:
let (mut app_sender0, app_server_receiver) = mpsc::channel(1);
let (app_server_sender, mut app_receiver0) = mpsc::channel(1);
let app_server_conn_pair = ConnPair::from_raw(app_server_sender, app_server_receiver);
let app_permissions = AppPermissions {
routes: true,
buyer: true,
seller: true,
config: true,
};
connections_sender
.send((app_permissions, app_server_conn_pair))
.await
.unwrap();
let (_app_sender1, app_server_receiver) = mpsc::channel(1);
let (app_server_sender, mut app_receiver1) = mpsc::channel(1);
let app_server_conn_pair = ConnPair::from_raw(app_server_sender, app_server_receiver);
let app_permissions = AppPermissions {
routes: true,
buyer: true,
seller: true,
config: true,
};
connections_sender
.send((app_permissions, app_server_conn_pair))
.await
.unwrap();
// The apps should receive the current node report as the first message:
let _to_app_message = app_receiver0.next().await.unwrap();
let _to_app_message = app_receiver1.next().await.unwrap();
let currency1 = Currency::try_from("FST1".to_owned()).unwrap();
// Send a request routes message through app0:
let request_routes = RequestRoutes {
request_id: Uid::from(&[3; Uid::len()]),
currency: currency1.clone(),
capacity: 250,
source: PublicKey::from(&[0xee; PublicKey::len()]),
destination: PublicKey::from(&[0xff; PublicKey::len()]),
opt_exclude: None,
};
let to_app_server = AppToAppServer::new(
Uid::from(&[22; Uid::len()]),
AppRequest::RequestRoutes(request_routes.clone()),
);
app_sender0.send(to_app_server).await.unwrap();
// RequestRoutes command should be forwarded to IndexClient:
let to_index_client_message = index_client_receiver.next().await.unwrap();
match to_index_client_message {
AppServerToIndexClient::AppRequest((
app_request_id,
IndexClientRequest::RequestRoutes(received_request_routes),
)) => {
assert_eq!(app_request_id, Uid::from(&[22; Uid::len()]));
assert_eq!(received_request_routes, request_routes);
}
_ => unreachable!(),
};
// IndexClient returns a response that is not related to any open request.
// This response will be discarded.
let client_response_routes = ClientResponseRoutes {
request_id: Uid::from(&[2; Uid::len()]),
result: ResponseRoutesResult::Failure,
};
index_client_sender
.send(IndexClientToAppServer::ResponseRoutes(
client_response_routes,
))
.await
.unwrap();
// We shouldn't get an message at any of the apps:
assert!(app_receiver0.try_next().is_err());
assert!(app_receiver1.try_next().is_err());
// IndexClient returns a response corresponding to an open request:
let client_response_routes = ClientResponseRoutes {
request_id: Uid::from(&[3; Uid::len()]),
result: ResponseRoutesResult::Failure,
};
index_client_sender
.send(IndexClientToAppServer::ResponseRoutes(
client_response_routes,
))
.await
.unwrap();
let to_app_message = app_receiver0.next().await.unwrap();
match to_app_message {
AppServerToApp::ResponseRoutes(response_routes) => {
assert_eq!(response_routes.request_id, Uid::from(&[3; Uid::len()]));
assert_eq!(response_routes.result, ResponseRoutesResult::Failure);
}
_ => unreachable!(),
}
// We shouldn't get an incoming message at app1:
assert!(app_receiver1.try_next().is_err());
// IndexClient again returns the same response.
// This time the response should be discarded,
// because it does not correspond to any open request.
let client_response_routes = ClientResponseRoutes {
request_id: Uid::from(&[3; Uid::len()]),
result: ResponseRoutesResult::Failure,
};
index_client_sender
.send(IndexClientToAppServer::ResponseRoutes(
client_response_routes,
))
.await
.unwrap();
// We shouldn't get an message at any of the apps:
assert!(app_receiver0.try_next().is_err());
assert!(app_receiver1.try_next().is_err());
}
/// Runs the `RequestRoutes` app server scenario to completion on a fresh thread pool.
#[test]
fn test_app_server_loop_index_request_routes() {
    let pool = ThreadPool::new().unwrap();
    // The scenario receives a clone of the pool; the original handle stays alive here
    // until the scenario has finished.
    let scenario = task_app_server_loop_request_routes(pool.clone());
    block_on(scenario);
}