Skip to content

Commit

Permalink
Use separate futures modules
Browse files Browse the repository at this point in the history
  • Loading branch information
afdw committed Aug 31, 2020
1 parent 30b0c51 commit 54445c9
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 15 deletions.
8 changes: 6 additions & 2 deletions Cargo.toml
Expand Up @@ -18,13 +18,17 @@ coveralls = { repository = "ossystems/compress-tools-rs" }
[dependencies]
derive_more = { version = "0.99", default-features = false, features = ["display", "from", "error"] }
async-trait = { version = "0.1.39", optional = true }
futures = { version = "0.3.5", optional = true }
futures-channel = { version = "0.3.5", features = ["sink"], optional = true }
futures-core = { version = "0.3.5", optional = true }
futures-io = { version = "0.3.5", optional = true }
futures-util = { version = "0.3.5", features = ["sink", "io"], optional = true }
futures-executor = { version = "0.3.5", optional = true }
blocking = { version = "0.6.1", optional = true }
tokio = { version = "0.2.22", features = ["blocking", "macros"], optional = true }
tokio-util = { version = "0.3.1", features = ["compat"], optional = true }

[features]
async_support = ["async-trait", "futures"]
async_support = ["async-trait", "futures-channel", "futures-core", "futures-io", "futures-util", "futures-executor"]
futures_support = ["async_support", "blocking"]
tokio_support = ["async_support", "tokio", "tokio-util"]

Expand Down
26 changes: 14 additions & 12 deletions src/async_support.rs
Expand Up @@ -7,16 +7,19 @@

use crate::{Ownership, Result, READER_BUFFER_SIZE};
use async_trait::async_trait;
use futures::{
channel::mpsc::{Receiver, Sender},
io::ErrorKind,
use futures_channel::mpsc::{channel, Receiver, Sender};
use futures_core::FusedStream;
use futures_executor::block_on;
use futures_io::{AsyncRead, AsyncWrite};
use futures_util::{
io::{AsyncReadExt, AsyncWriteExt},
join,
stream::FusedStream,
AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, SinkExt, StreamExt,
sink::SinkExt,
stream::StreamExt,
};
use std::{
future::Future,
io::{Read, Write},
io::{ErrorKind, Read, Write},
path::Path,
};

Expand All @@ -40,7 +43,7 @@ impl Read for AsyncReadWrapper {
return Ok(0);
}
assert_eq!(buf.len(), READER_BUFFER_SIZE);
Ok(match futures::executor::block_on(self.rx.next()) {
Ok(match block_on(self.rx.next()) {
Some(data) => {
buf.write_all(&data)?;
data.len()
Expand All @@ -56,7 +59,7 @@ fn make_async_read_wrapper_and_worker<R>(
where
R: AsyncRead + Unpin,
{
let (mut tx, rx) = futures::channel::mpsc::channel(0);
let (mut tx, rx) = channel(0);
(AsyncReadWrapper { rx }, async move {
loop {
let mut data = vec![0; READER_BUFFER_SIZE];
Expand All @@ -76,15 +79,14 @@ struct AsyncWriteWrapper {

impl Write for AsyncWriteWrapper {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
match futures::executor::block_on(self.tx.send(buf.to_owned())) {
match block_on(self.tx.send(buf.to_owned())) {
Ok(()) => Ok(buf.len()),
Err(err) => Err(std::io::Error::new(ErrorKind::Other, err)),
}
}

fn flush(&mut self) -> std::io::Result<()> {
futures::executor::block_on(self.tx.send(vec![]))
.map_err(|err| std::io::Error::new(ErrorKind::Other, err))
block_on(self.tx.send(vec![])).map_err(|err| std::io::Error::new(ErrorKind::Other, err))
}
}

Expand All @@ -94,7 +96,7 @@ fn make_async_write_wrapper_and_worker<W>(
where
W: AsyncWrite + Unpin,
{
let (tx, mut rx) = futures::channel::mpsc::channel(0);
let (tx, mut rx) = channel(0);
(AsyncWriteWrapper { tx }, async move {
while let Some(v) = rx.next().await {
if v.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion src/futures_support.rs
Expand Up @@ -2,7 +2,7 @@

use crate::{async_support, async_support::BlockingExecutor, Ownership, Result};
use async_trait::async_trait;
use futures::{AsyncRead, AsyncWrite};
use futures_io::{AsyncRead, AsyncWrite};
use std::path::Path;

struct FuturesBlockingExecutor {}
Expand Down

0 comments on commit 54445c9

Please sign in to comment.