Skip to content

Commit

Permalink
Update for latest timer version (#307)
Browse files Browse the repository at this point in the history
* update for latest timer version
* exclude benches from ci
  • Loading branch information
KodrAus committed Apr 29, 2018
1 parent 845302d commit 3a261cc
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 15 deletions.
2 changes: 0 additions & 2 deletions ci/nightly.sh
Expand Up @@ -26,8 +26,6 @@ if [ "$KIND" == "build" ]; then
git push -q upstream HEAD:refs/heads/gh-pages --force
fi
elif [ "$KIND" == "bench" ]; then
cargo bench --verbose --all

cd benches
cargo build --all
elif [ "$KIND" == "integration" ]; then
Expand Down
4 changes: 2 additions & 2 deletions tests/run/Cargo.toml
Expand Up @@ -13,6 +13,6 @@ serde_derive = "~1"
serde_json = "~1"
futures = "~0.1.16"
tokio-core = "~0.1.9"
tokio-timer = "*"
tokio-timer = "~0.2.1"
futures-cpupool = "~0.1.6"
term-painter = "*"
term-painter = "~0.2.4"
41 changes: 30 additions & 11 deletions tests/run/src/wait_until_ready.rs
@@ -1,6 +1,7 @@
use std::error::Error as StdError;
use std::time::Duration;
use tokio_timer::Timer;
use std::time::{Duration, Instant};
use std::fmt;
use tokio_timer::{Deadline, Interval};
use futures::{stream, Future, Stream};
use elastic::prelude::*;
use elastic::Error;
Expand All @@ -11,12 +12,12 @@ struct Ping {
}

impl Ping {
fn is_not_ready(&self) -> Box<Future<Item = bool, Error = Box<StdError>>> {
fn is_ready(&self) -> Box<Future<Item = bool, Error = Box<StdError>>> {
let request = self.client.ping().send().map_err(|e| e.into());

let check = request.then(|res: Result<PingResponse, Error>| match res {
Ok(_) => Ok(false),
_ => Ok(true),
Ok(_) => Ok(true),
_ => Ok(false),
});

Box::new(check)
Expand All @@ -29,19 +30,37 @@ pub fn call(client: AsyncClient, timeout_secs: u64) -> Box<Future<Item = (), Err
timeout_secs
);

let timer = Timer::default();
let stream = stream::repeat(Ping { client: client });

let wait = timer.interval(Duration::from_secs(10)).from_err();
let wait = Interval::new(Instant::now(), Duration::from_secs(10)).from_err();

let poll = stream
.take_while(|ping| ping.is_not_ready())
.take_while(|ping| ping.is_ready().map(|ready| !ready))
.zip(wait)
.collect();

let poll_or_timeout = timer
.timeout(poll, Duration::from_secs(timeout_secs))
.map(|_| ());
let poll_or_timeout = Deadline::new(poll, Instant::now() + Duration::from_secs(timeout_secs))
.map(|_| ())
.map_err(|e| if let Some(e) = e.into_inner() { e } else { Box::new(TimeoutError) });

Box::new(poll_or_timeout)
}

#[derive(Debug)]
struct TimeoutError;

impl StdError for TimeoutError {
fn description(&self) -> &str {
"timeout"
}

fn cause(&self) -> Option<&StdError> {
None
}
}

impl fmt::Display for TimeoutError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "timeout")
}
}

0 comments on commit 3a261cc

Please sign in to comment.