Skip to content

Commit

Permalink
upgrade tokio libraries
Browse files Browse the repository at this point in the history
  • Loading branch information
bluejekyll committed Mar 26, 2017
1 parent eecc034 commit b17e49c
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 68 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -3,8 +3,11 @@ All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).

## 0.10.1 (unreleased)
### Added
- Added `From<IpAddr>` for Name (reverse DNS) #105
### Changed
- Fixed TLS documentation, and add more elsewhere; fixes #102
- Upgraded tokio-core and moved to tokio-io

## 0.10.0
### Changed
Expand Down
150 changes: 95 additions & 55 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions client/Cargo.toml
Expand Up @@ -66,6 +66,7 @@ ring = { version = "^0.6", optional = true }
rustc-serialize = "^0.3.18"
time = "^0.1"
tokio-core = "^0.1"
tokio-io = "^0.1"
tokio-tls = { version = "^0.1", optional = true }
untrusted = "^0.3"

Expand Down
16 changes: 8 additions & 8 deletions client/src/client/client_future.rs
Expand Up @@ -133,8 +133,8 @@ impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> ClientFuture<S> {
let mut canceled = HashSet::new();
for (&id, &mut (ref mut req, ref mut timeout)) in self.active_requests.iter_mut() {
if let Ok(Async::Ready(())) = req.poll_cancel() {
canceled.insert(id);
}
canceled.insert(id);
}

// check for timeouts...
match timeout.poll() {
Expand All @@ -158,7 +158,7 @@ impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> ClientFuture<S> {
// then the otherside isn't really paying attention anyway)

// complete the request, it's failed...
req.complete(Err(ClientErrorKind::Timeout.into()));
req.send(Err(ClientErrorKind::Timeout.into())).expect("error notifying wait, possible future leak");
}
}
}
Expand Down Expand Up @@ -224,7 +224,7 @@ impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> Future for ClientFu
// TODO: it's too bad this happens here...
if let Err(e) = message.sign(signer, UTC::now().timestamp() as u32) {
warn!("could not sign message: {}", e);
complete.complete(Err(e.into()));
complete.send(Err(e.into())).expect("error notifying wait, possible future leak");
continue; // to the next message...
}
}
Expand All @@ -235,7 +235,7 @@ impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> Future for ClientFu
Ok(timeout) => timeout,
Err(e) => {
warn!("could not create timer: {}", e);
complete.complete(Err(e.into()));
complete.send(Err(e.into())).expect("error notifying wait, possible future leak");
continue; // to the next message...
}
};
Expand All @@ -252,7 +252,7 @@ impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> Future for ClientFu
Err(e) => {
debug!("error message id: {} error: {}", query_id, e);
// complete with the error, don't add to the map of active requests
complete.complete(Err(e.into()));
complete.send(Err(e.into())).expect("error notifying wait, possible future leak");
}
}
}
Expand All @@ -277,7 +277,7 @@ impl<S: Stream<Item = Vec<u8>, Error = io::Error> + 'static> Future for ClientFu
match Message::from_vec(&buffer) {
Ok(message) => {
match self.active_requests.remove(&message.id()) {
Some((complete, _)) => complete.complete(Ok(message)),
Some((complete, _)) => complete.send(Ok(message)).expect("error notifying wait, possible future leak"),
None => debug!("unexpected request_id: {}", message.id()),
}
}
Expand Down Expand Up @@ -334,7 +334,7 @@ impl ClientHandle for BasicClientHandle {
Ok(()) => receiver,
Err(e) => {
let (complete, receiver) = oneshot::channel();
complete.complete(Err(e.into()));
complete.send(Err(e.into())).expect("error notifying wait, possible future leak");
receiver
}
};
Expand Down
1 change: 1 addition & 0 deletions client/src/lib.rs
Expand Up @@ -48,6 +48,7 @@ extern crate rustc_serialize;
#[cfg(all(target_os = "macos", feature = "security-framework"))]
extern crate security_framework;
extern crate time;
extern crate tokio_io;
#[macro_use]
extern crate tokio_core;
#[cfg(feature = "tokio-tls")]
Expand Down
4 changes: 2 additions & 2 deletions client/src/tcp/tcp_client_stream.rs
Expand Up @@ -9,7 +9,7 @@ use std::net::SocketAddr;
use std::io;

use futures::{Async, Future, Poll, Stream};
use tokio_core::io::Io;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::net::TcpStream as TokioTcpStream;
use tokio_core::reactor::Handle;

Expand Down Expand Up @@ -55,7 +55,7 @@ impl<S> TcpClientStream<S> {
}
}

impl<S: Io> Stream for TcpClientStream<S> {
impl<S: AsyncRead + AsyncWrite> Stream for TcpClientStream<S> {
type Item = Vec<u8>;
type Error = io::Error;

Expand Down
6 changes: 3 additions & 3 deletions client/src/tcp/tcp_stream.rs
Expand Up @@ -12,7 +12,7 @@ use std::io;
use futures::{Async, Future, Poll};
use futures::stream::{Fuse, Peekable, Stream};
use futures::sync::mpsc::{unbounded, UnboundedReceiver};
use tokio_core::io::Io;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::net::TcpStream as TokioTcpStream;
use tokio_core::reactor::Handle;

Expand Down Expand Up @@ -82,7 +82,7 @@ impl TcpStream<TokioTcpStream> {
}
}

impl<S: Io> TcpStream<S> {
impl<S: AsyncRead + AsyncWrite> TcpStream<S> {
/// Initializes a TcpStream with an existing tokio_core::net::TcpStream.
///
/// This is intended for use with a TcpListener and Incoming.
Expand Down Expand Up @@ -117,7 +117,7 @@ impl<S: Io> TcpStream<S> {
}
}

impl<S: Io> Stream for TcpStream<S> {
impl<S: AsyncRead + AsyncWrite> Stream for TcpStream<S> {
type Item = (Vec<u8>, SocketAddr);
type Error = io::Error;

Expand Down

0 comments on commit b17e49c

Please sign in to comment.