-
Notifications
You must be signed in to change notification settings - Fork 73
/
connect_async.rs
118 lines (111 loc) · 4.16 KB
/
connect_async.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use fast_socks5::client::{Config, Socks5Stream};
use futures_util::{SinkExt, StreamExt};
use governor::{Quota, RateLimiter};
use log::*;
use nonzero_ext::*;
use reqwest::Url;
use std::{env, num::NonZeroU32};
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::mpsc::{Receiver, Sender},
};
use tokio_tungstenite::{
tungstenite::{Error, Message},
MaybeTlsStream, WebSocketStream,
};
/// Wraps a websocket client inside an event loop, returns a message_rx to
/// receive messages and a command_tx to send commands to the websocket server.
///
/// To close the websocket connection, send a `Message::Close` message to the
/// command_tx.
///
/// `limit`, max number of uplink messsages, for example, 100 per 10 seconds
pub async fn connect_async(
url: &str,
uplink_limit: Option<(NonZeroU32, std::time::Duration)>,
) -> Result<(Receiver<Message>, Sender<Message>), Error> {
if let Ok(proxy_env) = env::var("https_proxy").or_else(|_| env::var("http_proxy")) {
let proxy_url = Url::parse(&proxy_env).unwrap();
let proxy_scheme = proxy_url.scheme().to_lowercase();
if proxy_scheme.as_str() != "socks5" {
panic!("Unsupported proxy scheme {proxy_scheme}");
}
let proxy_addr = format!(
"{}:{}",
proxy_url.host_str().unwrap(),
proxy_url.port_or_known_default().unwrap()
);
let connect_url = Url::parse(url).unwrap();
let proxy_stream = Socks5Stream::connect(
proxy_addr.to_string(),
connect_url.host_str().unwrap().to_string(),
connect_url.port_or_known_default().unwrap(),
Config::default(),
)
.await
.unwrap();
let (ws_stream, _) = tokio_tungstenite::client_async_tls(connect_url, proxy_stream).await?;
// replaced
// let ret = tokio_tungstenite::connect_async(url).await;
connect_async_internal(ws_stream, uplink_limit).await
} else {
let (ws_stream, _) = tokio_tungstenite::connect_async(url).await?;
connect_async_internal(ws_stream, uplink_limit).await
}
}
async fn connect_async_internal<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
ws_stream: WebSocketStream<MaybeTlsStream<S>>,
uplink_limit: Option<(NonZeroU32, std::time::Duration)>,
) -> Result<(Receiver<Message>, Sender<Message>), Error> {
let (command_tx, mut command_rx) = tokio::sync::mpsc::channel::<Message>(1);
let (message_tx, message_rx) = tokio::sync::mpsc::channel::<Message>(32);
let (mut write, mut read) = ws_stream.split();
let limiter = if let Some((max_burst, duration)) = uplink_limit {
let quota = Quota::with_period(duration).unwrap().allow_burst(max_burst);
RateLimiter::direct(quota)
} else {
RateLimiter::direct(Quota::per_second(nonzero!(u32::max_value())))
};
tokio::task::spawn(async move {
loop {
tokio::select! {
command = command_rx.recv() => {
match command {
Some(command) => {
match command {
Message::Close(_) => {
break; // close the connection and break the loop
}
_ => {
limiter.until_ready().await;
if let Err(err) =write.send(command).await {
error!("Failed to send, error: {}", err);
}
}
}
}
None => {
debug!("command_rx closed");
break;
}
}
}
msg = read.next() => match msg {
Some(Ok(msg)) => {
let _= message_tx.send(msg).await;
}
Some(Err(err)) => {
error!("Failed to read, error: {}", err);
break;
}
None => {
debug!("message_tx closed");
break;
}
}
};
}
_ = write.send(Message::Close(None)).await;
});
Ok((message_rx, command_tx))
}