Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
/target
/Cargo.lock
/.vscode
/.cargo
.idea

/.direnv
.envrc
/.envrc

# Editor directories
/.vscode
/.zed
/.idea
8 changes: 4 additions & 4 deletions compio-io/src/compat/sync_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,20 @@ pub struct SyncStream<S> {
}

impl<S> SyncStream<S> {
// 64MB max
// 64MiB max
const DEFAULT_MAX_BUFFER: usize = 64 * 1024 * 1024;

/// Creates a new `SyncStream` with default buffer sizes.
///
/// - Base capacity: 8KB
/// - Max buffer size: 64MB
/// - Base capacity: 8KiB
/// - Max buffer size: 64MiB
pub fn new(stream: S) -> Self {
Self::with_capacity(DEFAULT_BUF_SIZE, stream)
}

/// Creates a new `SyncStream` with a custom base capacity.
///
/// The maximum buffer size defaults to 64MB.
/// The maximum buffer size defaults to 64MiB.
pub fn with_capacity(base_capacity: usize, stream: S) -> Self {
Self::with_limits(base_capacity, Self::DEFAULT_MAX_BUFFER, stream)
}
Expand Down
2 changes: 1 addition & 1 deletion compio-io/src/read/buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct BufReader<R> {

impl<R> BufReader<R> {
/// Creates a new `BufReader` with a default buffer capacity. The default is
/// currently 8 KB, but may change in the future.
/// currently 8 KiB, but may change in the future.
pub fn new(reader: R) -> Self {
Self::with_capacity(DEFAULT_BUF_SIZE, reader)
}
Expand Down
2 changes: 1 addition & 1 deletion compio-io/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use split::Splittable;
///
/// This is an asynchronous version of [`std::io::copy`][std].
///
/// A heap-allocated copy buffer with 8 KB is created to take data from the
/// A heap-allocated copy buffer with 8 KiB is created to take data from the
/// reader to the writer.
pub async fn copy<R: AsyncRead, W: AsyncWrite>(reader: &mut R, writer: &mut W) -> IoResult<u64> {
let mut buf = Vec::with_capacity(DEFAULT_BUF_SIZE);
Expand Down
2 changes: 1 addition & 1 deletion compio-io/src/write/buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct BufWriter<W> {

impl<W> BufWriter<W> {
/// Creates a new `BufWriter` with a default buffer capacity. The default is
/// currently 8 KB, but may change in the future.
/// currently 8 KiB, but may change in the future.
pub fn new(writer: W) -> Self {
Self::with_capacity(DEFAULT_BUF_SIZE, writer)
}
Expand Down
12 changes: 3 additions & 9 deletions compio-tls/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<S: AsyncRead + AsyncWrite + 'static> AsyncRead for TlsStream<S> {
slice.fill(MaybeUninit::new(0));
// SAFETY: The memory has been initialized
let slice =
unsafe { std::slice::from_raw_parts_mut(slice.as_mut_ptr().cast(), slice.len()) };
unsafe { std::slice::from_raw_parts_mut::<u8>(slice.as_mut_ptr().cast(), slice.len()) };
match &mut self.0 {
#[cfg(feature = "native-tls")]
TlsStreamInner::NativeTls(s) => loop {
Expand Down Expand Up @@ -115,10 +115,7 @@ impl<S: AsyncRead + AsyncWrite + 'static> AsyncRead for TlsStream<S> {
BufResult(res, buf)
}
#[cfg(not(any(feature = "native-tls", feature = "rustls")))]
TlsStreamInner::None(f, ..) => {
let _slice: &mut [u8] = slice;
match *f {}
}
TlsStreamInner::None(f, ..) => match *f {},
}
}
}
Expand Down Expand Up @@ -159,10 +156,7 @@ impl<S: AsyncRead + AsyncWrite + 'static> AsyncWrite for TlsStream<S> {
BufResult(res, buf)
}
#[cfg(not(any(feature = "native-tls", feature = "rustls")))]
TlsStreamInner::None(f, ..) => {
let _slice: &[u8] = slice;
match *f {}
}
TlsStreamInner::None(f, ..) => match *f {},
}
}

Expand Down
2 changes: 1 addition & 1 deletion compio-ws/examples/client_tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let connector = None;

let (mut websocket, _response) =
connect_async_tls_with_config("wss://127.0.0.1:9002", None, false, connector).await?;
connect_async_tls_with_config("wss://127.0.0.1:9002", None, connector).await?;

println!("Successfully connected to WebSocket TLS server!");
println!();
Expand Down
156 changes: 136 additions & 20 deletions compio-ws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,121 @@ mod tls;
pub use tls::*;
pub use tungstenite;

/// Configuration for compio-ws.
///
/// ## API Interface
///
/// `_with_config` functions in this crate accept `impl Into<Config>`, so
/// following are all valid:
/// - [`Config`]
/// - [`WebSocketConfig`] (use custom WebSocket config with default remaining
/// settings)
/// - [`None`] (use default value)
pub struct Config {
/// WebSocket configuration from tungstenite.
websocket: Option<WebSocketConfig>,

/// Base buffer size
buffer_size_base: usize,

/// Maximum buffer size
buffer_size_limit: usize,

/// Disable Nagle's algorithm. This only affects
/// [`connect_async_with_config()`] and [`connect_async_tls_with_config()`].
disable_nagle: bool,
}

impl Config {
// 128 KiB, see <https://github.com/compio-rs/compio/pull/532>.
const DEFAULT_BUF_SIZE: usize = 128 * 1024;
// 64 MiB, the same as [`SyncStream`].
const DEFAULT_MAX_BUFFER: usize = 64 * 1024 * 1024;

/// Creates a new `Config` with default settings.
pub fn new() -> Self {
Self {
websocket: None,
buffer_size_base: Self::DEFAULT_BUF_SIZE,
buffer_size_limit: Self::DEFAULT_MAX_BUFFER,
disable_nagle: false,
}
}

/// Get the WebSocket configuration.
pub fn websocket_config(&self) -> Option<&WebSocketConfig> {
self.websocket.as_ref()
}

/// Get the base buffer size.
pub fn buffer_size_base(&self) -> usize {
self.buffer_size_base
}

/// Get the maximum buffer size.
pub fn buffer_size_limit(&self) -> usize {
self.buffer_size_limit
}

/// Set custom base buffer size.
///
/// Default to 128 KiB.
pub fn with_buffer_size_base(mut self, size: usize) -> Self {
self.buffer_size_base = size;
self
}

/// Set custom maximum buffer size.
///
/// Default to 64 MiB.
pub fn with_buffer_size_limit(mut self, size: usize) -> Self {
self.buffer_size_limit = size;
self
}

/// Set custom buffer sizes.
///
/// Default to 128 KiB for base and 64 MiB for limit.
pub fn with_buffer_sizes(mut self, base: usize, limit: usize) -> Self {
self.buffer_size_base = base;
self.buffer_size_limit = limit;
self
}

/// Disable Nagle's algorithm, i.e. `set_nodelay(true)`.
///
/// Default to `false`. If you don't know what the Nagle's algorithm is,
/// better leave it to `false`.
pub fn disable_nagle(mut self, disable: bool) -> Self {
self.disable_nagle = disable;
self
}
}

impl Default for Config {
fn default() -> Self {
Self::new()
}
}

impl From<WebSocketConfig> for Config {
fn from(config: WebSocketConfig) -> Self {
Self {
websocket: Some(config),
..Default::default()
}
}
}

impl From<Option<WebSocketConfig>> for Config {
fn from(config: Option<WebSocketConfig>) -> Self {
Self {
websocket: config,
..Default::default()
}
}
}

/// A WebSocket stream that works with compio.
#[derive(Debug)]
pub struct WebSocketStream<S> {
Expand Down Expand Up @@ -135,11 +250,10 @@ impl<S> IntoInner for WebSocketStream<S> {

/// Accepts a new WebSocket connection with the provided stream.
///
/// This function will internally call `server::accept` to create a
/// handshake representation and returns a future representing the
/// resolution of the WebSocket handshake. The returned future will resolve
/// to either `WebSocketStream<S>` or `Error` depending if it's successful
/// or not.
/// This function will internally create a handshake representation and returns
/// a future representing the resolution of the WebSocket handshake. The
/// returned future will resolve to either [`WebSocketStream<S>`] or [`WsError`]
/// depending on if it's successful or not.
///
/// This is typically used after a socket has been accepted from a
/// `TcpListener`. That socket is then passed to this function to perform
Expand All @@ -151,11 +265,10 @@ where
accept_hdr_async(stream, NoCallback).await
}

/// The same as `accept_async()` but the one can specify a websocket
/// configuration. Please refer to `accept_async()` for more details.
/// Similar to [`accept_async()`] but user can specify a [`Config`].
pub async fn accept_async_with_config<S>(
stream: S,
config: Option<WebSocketConfig>,
config: impl Into<Config>,
) -> Result<WebSocketStream<S>, WsError>
where
S: AsyncRead + AsyncWrite,
Expand All @@ -164,7 +277,7 @@ where
}
/// Accepts a new WebSocket connection with the provided stream.
///
/// This function does the same as `accept_async()` but accepts an extra
/// This function does the same as [`accept_async()`] but accepts an extra
/// callback for header processing. The callback receives headers of the
/// incoming requests and is able to add extra headers to the reply.
pub async fn accept_hdr_async<S, C>(stream: S, callback: C) -> Result<WebSocketStream<S>, WsError>
Expand All @@ -175,19 +288,21 @@ where
accept_hdr_with_config_async(stream, callback, None).await
}

/// The same as `accept_hdr_async()` but the one can specify a websocket
/// configuration. Please refer to `accept_hdr_async()` for more details.
/// Similar to [`accept_hdr_async()`] but user can specify a [`Config`].
pub async fn accept_hdr_with_config_async<S, C>(
stream: S,
callback: C,
config: Option<WebSocketConfig>,
config: impl Into<Config>,
) -> Result<WebSocketStream<S>, WsError>
where
S: AsyncRead + AsyncWrite,
C: Callback,
{
let sync_stream = SyncStream::with_capacity(128 * 1024, stream);
let mut handshake_result = tungstenite::accept_hdr_with_config(sync_stream, callback, config);
let config = config.into();
let sync_stream =
SyncStream::with_limits(config.buffer_size_base, config.buffer_size_limit, stream);
let mut handshake_result =
tungstenite::accept_hdr_with_config(sync_stream, callback, config.websocket);

loop {
match handshake_result {
Expand Down Expand Up @@ -223,7 +338,7 @@ where
///
/// Internally, this creates a handshake representation and returns
/// a future representing the resolution of the WebSocket handshake. The
/// returned future will resolve to either `WebSocketStream<S>` or `Error`
/// returned future will resolve to either [`WebSocketStream<S>`] or [`WsError`]
/// depending on whether the handshake is successful.
///
/// This is typically used for clients who have already established, for
Expand All @@ -239,20 +354,21 @@ where
client_async_with_config(request, stream, None).await
}

/// The same as `client_async()` but the one can specify a websocket
/// configuration. Please refer to `client_async()` for more details.
/// Similar to [`client_async()`] but user can specify a [`Config`].
pub async fn client_async_with_config<R, S>(
request: R,
stream: S,
config: Option<WebSocketConfig>,
config: impl Into<Config>,
) -> Result<(WebSocketStream<S>, tungstenite::handshake::client::Response), WsError>
where
R: IntoClientRequest,
S: AsyncRead + AsyncWrite,
{
let sync_stream = SyncStream::with_capacity(128 * 1024, stream);
let config = config.into();
let sync_stream =
SyncStream::with_limits(config.buffer_size_base, config.buffer_size_limit, stream);
let mut handshake_result =
tungstenite::client::client_with_config(request, sync_stream, config);
tungstenite::client::client_with_config(request, sync_stream, config.websocket);

loop {
match handshake_result {
Expand Down
Loading