Skip to content

Commit

Permalink
feat: Move from async-tungstenite to tokio-tungstenite
Browse files Browse the repository at this point in the history
Moves us to rustls, removes openssl dependency.
  • Loading branch information
qdot committed Nov 21, 2023
1 parent 84f0fd0 commit 03bff1f
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 76 deletions.
8 changes: 4 additions & 4 deletions buttplug/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ client=[]
server=[]
serialize-json=[]
# Connectors
websockets=["serialize-json", "async-tungstenite", "tokio-native-tls"]
websockets=["serialize-json", "tokio-tungstenite", "rustls"]
# Device Communication Managers
xinput-manager=["server"]
btleplug-manager=["server", "btleplug"]
Expand All @@ -42,7 +42,7 @@ lovense-dongle-manager=["server", "serialport", "hidapi"]
lovense-connect-service-manager=["server","reqwest"]
websocket-server-manager=["server", "websockets"]
# Runtime managers
tokio-runtime=["async-tungstenite/tokio-runtime", "async-tungstenite/tokio-native-tls"]
tokio-runtime=[]
wasm-bindgen-runtime=["wasm-bindgen", "wasm-bindgen-futures"]
wasm = ["server", "wasm-bindgen-runtime", "serialize-json", "wasm-bindgen", "uuid/wasm-bindgen", "wasmtimer"]
dummy-runtime=[]
Expand Down Expand Up @@ -70,8 +70,6 @@ paste = "1.0.14"
lazy_static = "1.4.0"
byteorder = "1.5.0"
thiserror = "1.0.50"
async-tungstenite = { version = "0.23.0", optional = true }
tokio-native-tls = { version = "0.3.1", optional = true }
wasm-bindgen-futures = { version = "0.4.38", optional = true }
cfg-if = "1.0.0"
tracing = "0.1.40"
Expand All @@ -94,6 +92,8 @@ tokio-stream = "0.1.14"
wasmtimer = { version = "0.2.0", optional = true }
instant = "0.1.12"
regex = "1.10.2"
tokio-tungstenite = { version = "0.20.1", features = ["rustls-tls-webpki-roots"], optional = true }
rustls = { version = "0.21.9", optional = true }

[dev-dependencies]
serde_yaml = "0.9.27"
Expand Down
2 changes: 1 addition & 1 deletion buttplug/src/core/connector/transport/websocket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
pub mod websocket_client;
pub mod websocket_server;

pub use async_tungstenite::tungstenite::Error as TungsteniteError;
pub use tokio_tungstenite::tungstenite::Error as TungsteniteError;
pub use websocket_client::ButtplugWebsocketClientTransport;

pub use websocket_server::{
Expand Down
82 changes: 52 additions & 30 deletions buttplug/src/core/connector/transport/websocket/websocket_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,48 @@ use crate::{
},
util::async_manager,
};
use async_tungstenite::{tokio::connect_async_with_tls_connector, tungstenite::protocol::Message};
use tokio_tungstenite::{Connector, connect_async_tls_with_config, tungstenite::protocol::Message, connect_async};
use rustls::ClientConfig;
use futures::{future::BoxFuture, FutureExt, SinkExt, StreamExt};
use url::Url;
use std::sync::Arc;
use tokio::sync::{
mpsc::{Receiver, Sender},
Notify,
};
use tokio_native_tls::native_tls::TlsConnector as NativeTlsConnector;
use tokio_native_tls::TlsConnector;
use tracing::Instrument;

/// Websocket connector for ButtplugClients, using [async_tungstenite]
// Taken from https://stackoverflow.com/questions/72846337/does-hyper-client-not-accept-self-signed-certificates
pub fn get_rustls_config_dangerous() -> ClientConfig {
let store = rustls::RootCertStore::empty();

let mut config = ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(store)
.with_no_client_auth();

// if you want to completely disable cert-verification, use this
let mut dangerous_config = ClientConfig::dangerous(&mut config);
dangerous_config.set_certificate_verifier(Arc::new(NoCertificateVerification {}));

config
}
pub struct NoCertificateVerification {}
impl rustls::client::ServerCertVerifier for NoCertificateVerification {
fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp: &[u8],
_now: std::time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
}
}

/// Websocket connector for ButtplugClients, using [tokio_tungstenite]
pub struct ButtplugWebsocketClientTransport {
/// Address of the server we'll connect to.
address: String,
Expand Down Expand Up @@ -85,34 +115,26 @@ impl ButtplugConnectorTransport for ButtplugWebsocketClientTransport {
) -> BoxFuture<'static, Result<(), ButtplugConnectorError>> {
let disconnect_notifier = self.disconnect_notifier.clone();

// If we're supposed to be a secure connection, generate a TLS connector
// based on our certificate verfication needs. Otherwise, just pass None in
// which case we won't wrap.
let tls_connector: Option<TlsConnector> = if self.should_use_tls {
if self.bypass_cert_verify {
Some(
NativeTlsConnector::builder()
.danger_accept_invalid_certs(true)
.build()
.expect("Should always succeed, we're not setting any fallible options.")
.into(),
)
} else {
Some(
NativeTlsConnector::new()
.expect("Should always succeed, not setting options.")
.into(),
)
}
} else {
// If we're not using a secure connection, just return None, at which
// point async_tungstenite won't use a wrapper.
None
};
let address = self.address.clone();

let should_use_tls = self.should_use_tls;
let bypass_cert_verify = self.bypass_cert_verify;
async move {
match connect_async_with_tls_connector(&address, tls_connector).await {
let url = Url::parse(&address).expect("Should be checked before here");
let stream_result = if should_use_tls {
// If we're supposed to be a secure connection, generate a TLS connector
// based on our certificate verfication needs. Otherwise, just pass None in
// which case we won't wrap.
let connector = if bypass_cert_verify {
Some(Connector::Rustls(Arc::new(get_rustls_config_dangerous())))
} else {
None
};
connect_async_tls_with_config(&url, None, false, connector).await
} else {
connect_async(&url).await
};

match stream_result {
Ok((stream, _)) => {
let (mut writer, mut reader) = stream.split();

Expand Down
36 changes: 17 additions & 19 deletions buttplug/src/core/connector/transport/websocket/websocket_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use crate::{
},
util::async_manager,
};
use futures::{future::BoxFuture, AsyncRead, AsyncWrite, FutureExt, SinkExt, StreamExt};
use futures::{future::BoxFuture, FutureExt, StreamExt, SinkExt};
use std::{sync::Arc, time::Duration};
use tokio::{
net::TcpListener,
net::{TcpListener, TcpStream},
sync::{
mpsc::{Receiver, Sender},
Notify,
Expand Down Expand Up @@ -68,13 +68,12 @@ impl ButtplugWebsocketServerTransportBuilder {
}
}

async fn run_connection_loop<S>(
ws_stream: async_tungstenite::WebSocketStream<S>,
async fn run_connection_loop(
ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>,
mut request_receiver: Receiver<ButtplugSerializedMessage>,
response_sender: Sender<ButtplugTransportIncomingMessage>,
disconnect_notifier: Arc<Notify>,
) where
S: AsyncRead + AsyncWrite + Unpin,
)
{
info!("Starting websocket server connection event loop.");

Expand All @@ -98,7 +97,7 @@ async fn run_connection_loop<S>(
}
pong_count = 0;
if websocket_server_sender
.send(async_tungstenite::tungstenite::Message::Ping(vec!(0)))
.send(tokio_tungstenite::tungstenite::Message::Ping(vec!(0)))
.await
.is_err() {
warn!("Cannot send ping to client, considering connection closed.");
Expand All @@ -111,7 +110,7 @@ async fn run_connection_loop<S>(
ButtplugSerializedMessage::Text(text_msg) => {
trace!("Sending text message: {}", text_msg);
if websocket_server_sender
.send(async_tungstenite::tungstenite::Message::Text(text_msg))
.send(tokio_tungstenite::tungstenite::Message::Text(text_msg))
.await
.is_err() {
warn!("Cannot send text value to server, considering connection closed.");
Expand All @@ -120,7 +119,7 @@ async fn run_connection_loop<S>(
}
ButtplugSerializedMessage::Binary(binary_msg) => {
if websocket_server_sender
.send(async_tungstenite::tungstenite::Message::Binary(binary_msg))
.send(tokio_tungstenite::tungstenite::Message::Binary(binary_msg))
.await
.is_err() {
warn!("Cannot send binary value to server, considering connection closed.");
Expand All @@ -141,40 +140,40 @@ async fn run_connection_loop<S>(
match ws_data {
Ok(msg) => {
match msg {
async_tungstenite::tungstenite::Message::Text(text_msg) => {
tokio_tungstenite::tungstenite::Message::Text(text_msg) => {
trace!("Got text: {}", text_msg);
if response_sender.send(ButtplugTransportIncomingMessage::Message(ButtplugSerializedMessage::Text(text_msg))).await.is_err() {
warn!("Connector that owns transport no longer available, exiting.");
break;
}
}
async_tungstenite::tungstenite::Message::Close(_) => {
tokio_tungstenite::tungstenite::Message::Close(_) => {
let _ = response_sender.send(ButtplugTransportIncomingMessage::Close("Websocket server closed".to_owned())).await;
// If closing errors out, log it but there's not a lot we can do.
if let Err(e) = websocket_server_sender.close().await {
error!("{:?}", e);
}
break;
}
async_tungstenite::tungstenite::Message::Ping(val) => {
tokio_tungstenite::tungstenite::Message::Ping(val) => {
if websocket_server_sender
.send(async_tungstenite::tungstenite::Message::Pong(val))
.send(tokio_tungstenite::tungstenite::Message::Pong(val))
.await
.is_err() {
warn!("Cannot send pong to client, considering connection closed.");
return;
}
continue;
}
async_tungstenite::tungstenite::Message::Frame(_) => {
tokio_tungstenite::tungstenite::Message::Frame(_) => {
// noop
continue;
}
async_tungstenite::tungstenite::Message::Pong(_) => {
tokio_tungstenite::tungstenite::Message::Pong(_) => {
pong_count += 1;
continue;
}
async_tungstenite::tungstenite::Message::Binary(_) => {
tokio_tungstenite::tungstenite::Message::Binary(_) => {
error!("Don't know how to handle binary message types!");
}
}
Expand All @@ -195,7 +194,7 @@ async fn run_connection_loop<S>(
}
}

/// Websocket connector for ButtplugClients, using [async_tungstenite]
/// Websocket connector for ButtplugClients, using [tokio_tungstenite]
pub struct ButtplugWebsocketServerTransport {
port: u16,
listen_on_all_interfaces: bool,
Expand Down Expand Up @@ -232,8 +231,7 @@ impl ButtplugConnectorTransport for ButtplugWebsocketServerTransport {
debug!("Websocket: Listening on: {}", addr);
if let Ok((stream, _)) = listener.accept().await {
info!("Websocket: Got connection");
let ws_fut = async_tungstenite::tokio::accept_async(stream);
let ws_stream = ws_fut.await.map_err(|err| {
let ws_stream = tokio_tungstenite::accept_async(stream).await.map_err(|err| {
error!("Websocket server accept error: {:?}", err);
ButtplugConnectorError::TransportSpecificError(
ButtplugConnectorTransportSpecificError::TungsteniteError(err),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl WebsocketServerDeviceCommunicationManager {
return;
};
info!("Got connection");
let ws_fut = async_tungstenite::tokio::accept_async(stream);
let ws_fut = tokio_tungstenite::accept_async(stream);
let mut ws_stream = match ws_fut.await {
Ok(ws_stream) => ws_stream,
Err(err) => {
Expand All @@ -130,7 +130,7 @@ impl WebsocketServerDeviceCommunicationManager {
let sender_clone = sender.clone();
tokio::spawn(async move {
// TODO Implement a receive timeout here so we don't wait forever
if let Some(Ok(async_tungstenite::tungstenite::Message::Text(info_message))) =
if let Some(Ok(tokio_tungstenite::tungstenite::Message::Text(info_message))) =
ws_stream.next().await
{
let info_packet: WebsocketServerDeviceCommManagerInitInfo =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ use crate::{
use async_trait::async_trait;
use futures::{
future::{self, BoxFuture},
AsyncRead,
AsyncWrite,
FutureExt,
SinkExt,
StreamExt,
Expand All @@ -49,19 +47,17 @@ use tokio::{
mpsc::{channel, Receiver, Sender},
Mutex,
},
time::sleep,
time::sleep, net::TcpStream,
};
use tokio_util::sync::CancellationToken;

async fn run_connection_loop<S>(
async fn run_connection_loop(
address: &str,
event_sender: broadcast::Sender<HardwareEvent>,
ws_stream: async_tungstenite::WebSocketStream<S>,
ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>,
mut request_receiver: Receiver<Vec<u8>>,
response_sender: broadcast::Sender<Vec<u8>>,
) where
S: AsyncRead + AsyncWrite + Unpin,
{
) {
info!("Starting websocket server connection event loop.");

let (mut websocket_server_sender, mut websocket_server_receiver) = ws_stream.split();
Expand All @@ -78,7 +74,7 @@ async fn run_connection_loop<S>(
}
pong_count = 0;
if websocket_server_sender
.send(async_tungstenite::tungstenite::Message::Ping(vec!(0)))
.send(tokio_tungstenite::tungstenite::Message::Ping(vec!(0)))
.await
.is_err() {
error!("Cannot send ping to client, considering connection closed.");
Expand All @@ -88,7 +84,7 @@ async fn run_connection_loop<S>(
ws_msg = request_receiver.recv().fuse() => {
if let Some(binary_msg) = ws_msg {
if websocket_server_sender
.send(async_tungstenite::tungstenite::Message::Binary(binary_msg))
.send(tokio_tungstenite::tungstenite::Message::Binary(binary_msg))
.await
.is_err() {
error!("Cannot send binary value to client, considering connection closed.");
Expand All @@ -104,31 +100,31 @@ async fn run_connection_loop<S>(
match ws_data {
Ok(msg) => {
match msg {
async_tungstenite::tungstenite::Message::Text(text_msg) => {
tokio_tungstenite::tungstenite::Message::Text(text_msg) => {
// If someone accidentally packs text, politely turn it into binary for them.
let _ = response_sender.send(text_msg.as_bytes().to_vec());
}
async_tungstenite::tungstenite::Message::Binary(binary_msg) => {
tokio_tungstenite::tungstenite::Message::Binary(binary_msg) => {
// If no one is listening, ignore output.
let _ = response_sender.send(binary_msg);
}
async_tungstenite::tungstenite::Message::Close(_) => {
tokio_tungstenite::tungstenite::Message::Close(_) => {
// Drop the error if no one receives the message, we're breaking anyways.
let _ = event_sender
.send(HardwareEvent::Disconnected(
address.to_owned()
));
break;
}
async_tungstenite::tungstenite::Message::Ping(_) => {
tokio_tungstenite::tungstenite::Message::Ping(_) => {
// noop
continue;
}
async_tungstenite::tungstenite::Message::Frame(_) => {
tokio_tungstenite::tungstenite::Message::Frame(_) => {
// noop
continue;
}
async_tungstenite::tungstenite::Message::Pong(_) => {
tokio_tungstenite::tungstenite::Message::Pong(_) => {
pong_count += 1;
continue;
}
Expand Down Expand Up @@ -170,12 +166,10 @@ pub struct WebsocketServerHardwareConnector {
}

impl WebsocketServerHardwareConnector {
pub fn new<S>(
pub fn new(
info: WebsocketServerDeviceCommManagerInitInfo,
ws_stream: async_tungstenite::WebSocketStream<S>,
ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>,
) -> Self
where
S: 'static + AsyncRead + AsyncWrite + Unpin + Send,
{
let (outgoing_sender, outgoing_receiver) = channel(256);
let (incoming_broadcaster, _) = broadcast::channel(256);
Expand Down

0 comments on commit 03bff1f

Please sign in to comment.