Skip to content

Commit 35f7b35

Browse files
committed
perf(phantun) spawn multiple threads for UDP send/receive
1 parent dff0c4c commit 35f7b35

File tree

3 files changed

+167
-69
lines changed

3 files changed

+167
-69
lines changed

phantun/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ clap = { version = "3.0", features = ["cargo"] }
1515
socket2 = { version = "0.4", features = ["all"] }
1616
fake-tcp = { path = "../fake-tcp", version = "0.2" }
1717
tokio = { version = "1.14", features = ["full"] }
18+
tokio-util = "0.7"
1819
log = "0.4"
1920
pretty_env_logger = "0.4"
2021
tokio-tun = "0.5"

phantun/src/bin/client.rs

+72-36
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ use std::net::{Ipv4Addr, SocketAddr};
88
use std::sync::Arc;
99
use std::time::Duration;
1010
use tokio::net::UdpSocket;
11-
use tokio::sync::RwLock;
11+
use tokio::sync::{Notify, RwLock};
1212
use tokio::time;
1313
use tokio_tun::TunBuilder;
14+
use tokio_util::sync::CancellationToken;
1415

1516
const UDP_TTL: Duration = Duration::from_secs(180);
1617

@@ -119,14 +120,16 @@ async fn main() {
119120
.parse()
120121
.expect("bad peer address for Tun interface");
121122

123+
let num_cpus = num_cpus::get();
124+
122125
let tun = TunBuilder::new()
123126
.name(matches.value_of("tun").unwrap()) // if name is empty, then it is set by kernel.
124127
.tap(false) // false (default): TUN, true: TAP.
125128
.packet_info(false) // false: IFF_NO_PI, default is true.
126129
.up() // or set it up manually using `sudo ip link set <tun-name> up`.
127130
.address(tun_local)
128131
.destination(tun_peer)
129-
.try_build_mq(num_cpus::get())
132+
.try_build_mq(num_cpus)
130133
.unwrap();
131134

132135
info!("Created TUN device {}", tun[0].name());
@@ -168,52 +171,85 @@ async fn main() {
168171
assert!(connections.write().await.insert(addr, sock.clone()).is_none());
169172
debug!("inserted fake TCP socket into connection table");
170173

171-
let connections = connections.clone();
172-
173174
// spawn "fastpath" UDP socket and task, this will offload main task
174175
// from forwarding UDP packets
175-
tokio::spawn(async move {
176-
let mut buf_udp = [0u8; MAX_PACKET_LEN];
177-
let mut buf_tcp = [0u8; MAX_PACKET_LEN];
178-
let udp_sock = new_udp_reuseport(local_addr);
179-
udp_sock.connect(addr).await.unwrap();
180176

177+
let packet_received = Arc::new(Notify::new());
178+
let quit = CancellationToken::new();
179+
180+
for i in 0..num_cpus {
181+
let sock = sock.clone();
182+
let quit = quit.child_token();
183+
let packet_received = packet_received.clone();
184+
185+
tokio::spawn(async move {
186+
let mut buf_udp = [0u8; MAX_PACKET_LEN];
187+
let mut buf_tcp = [0u8; MAX_PACKET_LEN];
188+
let udp_sock = new_udp_reuseport(local_addr);
189+
udp_sock.connect(addr).await.unwrap();
190+
191+
loop {
192+
tokio::select! {
193+
Ok(size) = udp_sock.recv(&mut buf_udp) => {
194+
if sock.send(&buf_udp[..size]).await.is_none() {
195+
debug!("removed fake TCP socket from connections table");
196+
quit.cancel();
197+
return;
198+
}
199+
200+
packet_received.notify_one();
201+
},
202+
res = sock.recv(&mut buf_tcp) => {
203+
match res {
204+
Some(size) => {
205+
if size > 0 {
206+
if let Err(e) = udp_sock.send(&buf_tcp[..size]).await {
207+
error!("Unable to send UDP packet to {}: {}, closing connection", e, addr);
208+
quit.cancel();
209+
return;
210+
}
211+
}
212+
},
213+
None => {
214+
debug!("removed fake TCP socket from connections table");
215+
quit.cancel();
216+
return;
217+
},
218+
}
219+
220+
packet_received.notify_one();
221+
},
222+
_ = quit.cancelled() => {
223+
debug!("worker {} terminated", i);
224+
return;
225+
},
226+
};
227+
}
228+
});
229+
}
230+
231+
let connections = connections.clone();
232+
tokio::spawn(async move {
181233
loop {
182234
let read_timeout = time::sleep(UDP_TTL);
235+
let packet_received_fut = packet_received.notified();
183236

184237
tokio::select! {
185-
Ok(size) = udp_sock.recv(&mut buf_udp) => {
186-
if sock.send(&buf_udp[..size]).await.is_none() {
187-
connections.write().await.remove(&addr);
188-
debug!("removed fake TCP socket from connections table");
189-
return;
190-
}
191-
},
192-
res = sock.recv(&mut buf_tcp) => {
193-
match res {
194-
Some(size) => {
195-
if size > 0 {
196-
if let Err(e) = udp_sock.send(&buf_tcp[..size]).await {
197-
connections.write().await.remove(&addr);
198-
error!("Unable to send UDP packet to {}: {}, closing connection", e, addr);
199-
return;
200-
}
201-
}
202-
},
203-
None => {
204-
connections.write().await.remove(&addr);
205-
debug!("removed fake TCP socket from connections table");
206-
return;
207-
},
208-
}
209-
},
210238
_ = read_timeout => {
211239
info!("No traffic seen in the last {:?}, closing connection", UDP_TTL);
212240
connections.write().await.remove(&addr);
213241
debug!("removed fake TCP socket from connections table");
242+
243+
quit.cancel();
214244
return;
215-
}
216-
};
245+
},
246+
_ = quit.cancelled() => {
247+
connections.write().await.remove(&addr);
248+
debug!("removed fake TCP socket from connections table");
249+
return;
250+
},
251+
_ = packet_received_fut => {},
252+
}
217253
}
218254
});
219255
},

phantun/src/bin/server.rs

+94-33
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,36 @@
11
use clap::{crate_version, Arg, Command};
22
use fake_tcp::packet::MAX_PACKET_LEN;
33
use fake_tcp::Stack;
4-
use log::{error, info};
5-
use std::net::Ipv4Addr;
4+
use log::{debug, error, info};
5+
use std::net::{Ipv4Addr, SocketAddr};
6+
use std::sync::Arc;
67
use tokio::net::UdpSocket;
8+
use tokio::sync::Notify;
79
use tokio::time::{self, Duration};
810
use tokio_tun::TunBuilder;
11+
use tokio_util::sync::CancellationToken;
912
const UDP_TTL: Duration = Duration::from_secs(180);
1013

14+
fn new_udp_reuseport(addr: SocketAddr) -> UdpSocket {
15+
let udp_sock = socket2::Socket::new(
16+
if addr.is_ipv4() {
17+
socket2::Domain::IPV4
18+
} else {
19+
socket2::Domain::IPV6
20+
},
21+
socket2::Type::DGRAM,
22+
None,
23+
)
24+
.unwrap();
25+
udp_sock.set_reuse_port(true).unwrap();
26+
// from tokio-rs/mio/blob/master/src/sys/unix/net.rs
27+
udp_sock.set_cloexec(true).unwrap();
28+
udp_sock.set_nonblocking(true).unwrap();
29+
udp_sock.bind(&socket2::SockAddr::from(addr)).unwrap();
30+
let udp_sock: std::net::UdpSocket = udp_sock.into();
31+
udp_sock.try_into().unwrap()
32+
}
33+
1134
#[tokio::main]
1235
async fn main() {
1336
pretty_env_logger::init();
@@ -88,14 +111,16 @@ async fn main() {
88111
.parse()
89112
.expect("bad peer address for Tun interface");
90113

114+
let num_cpus = num_cpus::get();
115+
91116
let tun = TunBuilder::new()
92117
.name(matches.value_of("tun").unwrap()) // if name is empty, then it is set by kernel.
93118
.tap(false) // false (default): TUN, true: TAP.
94119
.packet_info(false) // false: IFF_NO_PI, default is true.
95120
.up() // or set it up manually using `sudo ip link set <tun-name> up`.
96121
.address(tun_local)
97122
.destination(tun_peer)
98-
.try_build_mq(num_cpus::get())
123+
.try_build_mq(num_cpus)
99124
.unwrap();
100125

101126
info!("Created TUN device {}", tun[0].name());
@@ -110,46 +135,82 @@ async fn main() {
110135
let mut buf_tcp = [0u8; MAX_PACKET_LEN];
111136

112137
loop {
113-
let sock = stack.accept().await;
138+
let sock = Arc::new(stack.accept().await);
114139
info!("New connection: {}", sock);
115140

116-
tokio::spawn(async move {
117-
let udp_sock = UdpSocket::bind(if remote_addr.is_ipv4() {
118-
"0.0.0.0:0"
119-
} else {
120-
"[::]:0"
121-
})
122-
.await
123-
.unwrap();
124-
udp_sock.connect(remote_addr).await.unwrap();
141+
let packet_received = Arc::new(Notify::new());
142+
let quit = CancellationToken::new();
143+
let udp_sock = UdpSocket::bind(if remote_addr.is_ipv4() {
144+
"0.0.0.0:0"
145+
} else {
146+
"[::]:0"
147+
})
148+
.await
149+
.unwrap();
150+
let local_addr = udp_sock.local_addr().unwrap();
151+
drop(udp_sock);
152+
153+
for i in 0..num_cpus {
154+
let sock = sock.clone();
155+
let quit = quit.child_token();
156+
let packet_received = packet_received.clone();
157+
let udp_sock = new_udp_reuseport(local_addr);
158+
159+
tokio::spawn(async move {
160+
udp_sock.connect(remote_addr).await.unwrap();
161+
162+
loop {
163+
tokio::select! {
164+
Ok(size) = udp_sock.recv(&mut buf_udp) => {
165+
if sock.send(&buf_udp[..size]).await.is_none() {
166+
quit.cancel();
167+
return;
168+
}
169+
170+
packet_received.notify_one();
171+
},
172+
res = sock.recv(&mut buf_tcp) => {
173+
match res {
174+
Some(size) => {
175+
if size > 0 {
176+
if let Err(e) = udp_sock.send(&buf_tcp[..size]).await {
177+
error!("Unable to send UDP packet to {}: {}, closing connection", e, remote_addr);
178+
quit.cancel();
179+
return;
180+
}
181+
}
182+
},
183+
None => {
184+
quit.cancel();
185+
return;
186+
},
187+
}
125188

189+
packet_received.notify_one();
190+
},
191+
_ = quit.cancelled() => {
192+
debug!("worker {} terminated", i);
193+
return;
194+
},
195+
};
196+
}
197+
});
198+
}
199+
200+
tokio::spawn(async move {
126201
loop {
127202
let read_timeout = time::sleep(UDP_TTL);
203+
let packet_received_fut = packet_received.notified();
128204

129205
tokio::select! {
130-
Ok(size) = udp_sock.recv(&mut buf_udp) => {
131-
if sock.send(&buf_udp[..size]).await.is_none() {
132-
return;
133-
}
134-
},
135-
res = sock.recv(&mut buf_tcp) => {
136-
match res {
137-
Some(size) => {
138-
if size > 0 {
139-
if let Err(e) = udp_sock.send(&buf_tcp[..size]).await {
140-
error!("Unable to send UDP packet to {}: {}, closing connection", e, remote_addr);
141-
return;
142-
}
143-
}
144-
},
145-
None => { return; },
146-
}
147-
},
148206
_ = read_timeout => {
149207
info!("No traffic seen in the last {:?}, closing connection", UDP_TTL);
208+
209+
quit.cancel();
150210
return;
151-
}
152-
};
211+
},
212+
_ = packet_received_fut => {},
213+
}
153214
}
154215
});
155216
}

0 commit comments

Comments
 (0)