//! `test_tokio_stream.rs` — tests that pass raw file descriptors and Tokio
//! Unix streams between two tasks over a Unix domain socket.

use std::os::unix::{io::AsRawFd, net::UnixStream as OsUnixStream, prelude::FromRawFd};
use tempdir::TempDir;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{UnixListener, UnixStream},
};
use async_send_fd::{AsyncRecvFd, AsyncRecvTokioStream, AsyncSendFd, AsyncSendTokioStream};
/// File name of the Unix socket that each test binds inside its own temporary directory.
const SOCKET_NAME: &str = "tokio_send_fd_test.sock";
/// Sends one end of an OS-level socket pair as a raw file descriptor over a
/// Unix socket, then checks that a 4-byte message written to the received end
/// arrives intact on the end kept by the sender.
///
/// Two tasks are spawned: the "sender" accepts a connection and transmits the
/// descriptor, the "receiver" connects, receives the descriptor and writes the
/// message through it.
#[tokio::test]
async fn send_raw_fd_test() {
    let tmp_dir = TempDir::new("tokio-send-fd").unwrap();
    let sock_path = tmp_dir.path().join(SOCKET_NAME);
    // The receiver task needs its own owned copy of the path.
    let connect_path = sock_path.clone();

    println!("Start listening at: {:?}", sock_path);
    let listener = UnixListener::bind(&sock_path).unwrap();

    let j1 = tokio::spawn(async move {
        let (stream, _) = listener.accept().await.unwrap();
        println!("Incoming peer connection");

        let (left, right) = OsUnixStream::pair().unwrap();

        println!("Sending peer fd");
        // `as_raw_fd` only borrows the descriptor; `left` stays in scope until the
        // end of this task, so the descriptor is still open while it is being sent.
        stream.send_fd(left.as_raw_fd()).await.unwrap();
        println!("Succesfullt sent peer fd");

        // `UnixStream::from_std` expects the socket to already be non-blocking.
        right.set_nonblocking(true).unwrap();
        let mut peer_stream = UnixStream::from_std(right).unwrap();

        let mut buffer = [0u8; 4];
        println!("Reading data from the peer");
        // `read` may legally return fewer than 4 bytes; `read_exact` keeps reading
        // until the whole buffer is filled (or fails on early EOF).
        peer_stream.read_exact(&mut buffer).await.unwrap();
        println!("Message sent through a socket: {:?}", buffer);
        assert_eq!(buffer, [0, 0, 0, 42]);
    });

    let j2 = tokio::spawn(async move {
        println!("Connection to the sender");
        let stream = UnixStream::connect(connect_path).await.unwrap();
        println!("Succesfully connected to the sender. Reading file descriptor");

        let fd = stream.recv_fd().await.unwrap();
        println!("Succesfully read file descriptor");

        // SAFETY: `fd` was just returned by `recv_fd` and nothing else in this task
        // wraps it, so `os_stream` becomes its only owner. Presumably `recv_fd`
        // yields a freshly received, open descriptor — confirm against the
        // async_send_fd documentation.
        let os_stream = unsafe { OsUnixStream::from_raw_fd(fd) };
        // The socket must be switched to non-blocking mode before handing it to
        // Tokio; forgetting this leads to hangs that are very hard to debug.
        os_stream.set_nonblocking(true).unwrap();
        let mut peer_stream = UnixStream::from_std(os_stream).unwrap();

        println!("Sending data to the peer");
        let buffer: [u8; 4] = [0, 0, 0, 42];
        // `write` may perform a short write; `write_all` guarantees all 4 bytes go out.
        peer_stream.write_all(&buffer).await.unwrap();
        println!("Succesfully sent data to the peer");
    });

    tokio::try_join!(j1, j2).unwrap();

    // Best-effort cleanup. The path is a socket *file*, so `remove_file` is the
    // right call (`remove_dir` would always fail here).
    let _ = std::fs::remove_file(&sock_path);
}
/// Sends one end of a Tokio `UnixStream` pair over a Unix socket via
/// `send_stream` / `recv_stream`, then checks that a 4-byte message written to
/// the received stream arrives intact on the end kept by the sender.
#[tokio::test]
async fn send_tokio_stream_test() {
    let tmp_dir = TempDir::new("tokio-send-fd").unwrap();
    let sock_path = tmp_dir.path().join(SOCKET_NAME);
    // The receiver task needs its own owned copy of the path.
    let connect_path = sock_path.clone();

    println!("Start listening at: {:?}", sock_path);
    let listener = UnixListener::bind(&sock_path).unwrap();

    let j1 = tokio::spawn(async move {
        let (stream, _) = listener.accept().await.unwrap();
        println!("Incoming peer connection");

        let (left, mut right) = UnixStream::pair().unwrap();

        println!("Sending peer fd");
        stream.send_stream(left).await.unwrap();
        println!("Succesfullt sent peer fd");

        let mut buffer = [0u8; 4];
        println!("Reading data from the peer");
        // `read` may legally return fewer than 4 bytes; `read_exact` keeps reading
        // until the whole buffer is filled (or fails on early EOF).
        right.read_exact(&mut buffer).await.unwrap();
        println!("Message sent through a socket: {:?}", buffer);
        assert_eq!(buffer, [0, 0, 0, 42]);
    });

    let j2 = tokio::spawn(async move {
        println!("Connection to the sender");
        let stream = UnixStream::connect(connect_path).await.unwrap();
        println!("Succesfully connected to the sender. Reading file descriptor");

        let mut peer_stream = stream.recv_stream().await.unwrap();

        println!("Sending data to the peer");
        let buffer: [u8; 4] = [0, 0, 0, 42];
        // `write` may perform a short write; `write_all` guarantees all 4 bytes go out.
        peer_stream.write_all(&buffer).await.unwrap();
        println!("Succesfully sent data to the peer");
    });

    tokio::try_join!(j1, j2).unwrap();

    // Best-effort cleanup. The path is a socket *file*, so `remove_file` is the
    // right call (`remove_dir` would always fail here).
    let _ = std::fs::remove_file(&sock_path);
}
/// Same scenario as the raw-descriptor test, but the descriptor is sent and
/// received through the split halves of the connection: the sender uses the
/// write half returned by `into_split`, the receiver uses the read half.
#[tokio::test]
async fn tokio_halves_test() {
    let tmp_dir = TempDir::new("tokio-send-fd").unwrap();
    let sock_path = tmp_dir.path().join(SOCKET_NAME);
    // The receiver task needs its own owned copy of the path.
    let connect_path = sock_path.clone();

    println!("Start listening at: {:?}", sock_path);
    let listener = UnixListener::bind(&sock_path).unwrap();

    let j1 = tokio::spawn(async move {
        let (stream, _) = listener.accept().await.unwrap();
        println!("Incoming peer connection");

        let (left, right) = OsUnixStream::pair().unwrap();
        // Only the write half is needed on the sending side.
        let (_, sender) = stream.into_split();

        println!("Sending peer fd");
        // `as_raw_fd` only borrows the descriptor; `left` stays in scope until the
        // end of this task, so the descriptor is still open while it is being sent.
        sender.send_fd(left.as_raw_fd()).await.unwrap();
        println!("Succesfullt sent peer fd");

        // `UnixStream::from_std` expects the socket to already be non-blocking.
        right.set_nonblocking(true).unwrap();
        let mut peer_stream = UnixStream::from_std(right).unwrap();

        let mut buffer = [0u8; 4];
        println!("Reading data from the peer");
        // `read` may legally return fewer than 4 bytes; `read_exact` keeps reading
        // until the whole buffer is filled (or fails on early EOF).
        peer_stream.read_exact(&mut buffer).await.unwrap();
        println!("Message sent through a socket: {:?}", buffer);
        assert_eq!(buffer, [0, 0, 0, 42]);
    });

    let j2 = tokio::spawn(async move {
        println!("Connection to the sender");
        let stream = UnixStream::connect(connect_path).await.unwrap();
        // Only the read half is needed on the receiving side.
        let (receiver, _) = stream.into_split();
        println!("Succesfully connected to the sender. Reading file descriptor");

        let fd = receiver.recv_fd().await.unwrap();
        println!("Succesfully read file descriptor");

        // SAFETY: `fd` was just returned by `recv_fd` and nothing else in this task
        // wraps it, so `os_stream` becomes its only owner. Presumably `recv_fd`
        // yields a freshly received, open descriptor — confirm against the
        // async_send_fd documentation.
        let os_stream = unsafe { OsUnixStream::from_raw_fd(fd) };
        // The socket must be switched to non-blocking mode before handing it to
        // Tokio; forgetting this leads to hangs that are very hard to debug.
        os_stream.set_nonblocking(true).unwrap();
        let mut peer_stream = UnixStream::from_std(os_stream).unwrap();

        println!("Sending data to the peer");
        let buffer: [u8; 4] = [0, 0, 0, 42];
        // `write` may perform a short write; `write_all` guarantees all 4 bytes go out.
        peer_stream.write_all(&buffer).await.unwrap();
        println!("Succesfully sent data to the peer");
    });

    tokio::try_join!(j1, j2).unwrap();

    // Best-effort cleanup. The path is a socket *file*, so `remove_file` is the
    // right call (`remove_dir` would always fail here).
    let _ = std::fs::remove_file(&sock_path);
}