Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Replace tokio-core with tokio #317

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/elastic_async/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = ["Ashley Mannix <ashleymannix@live.com.au>"]
[dependencies]
elastic = { version = "*", path = "../../src/elastic", features = ["nightly"] }
elastic_derive = { version = "*", path = "../../src/elastic_derive" }
tokio_core = "*"
tokio = "*"
futures = "~0.1.16"
serde = "*"
serde_derive = "*"
Expand Down
6 changes: 3 additions & 3 deletions benches/elastic_async/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ extern crate elastic_derive;
extern crate futures;
#[macro_use]
extern crate serde_derive;
extern crate tokio_core;
extern crate tokio;

extern crate elastic;
extern crate serde;
Expand Down Expand Up @@ -48,7 +48,7 @@ fn main() {

let client = AsyncClientBuilder::new()
.params(|p| p.header(http::header::Connection::keep_alive()))
.build(&core.handle())
.build()
.unwrap();

let results_future = measure::run_future(runs, || {
Expand All @@ -59,7 +59,7 @@ fn main() {
.send()
});

results = core.run(results_future).unwrap();
results = block_on_all;(results_future).unwrap();

println!("{}", results);
}
2 changes: 1 addition & 1 deletion benches/elastic_bulk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ gzip = []

[dependencies]
elastic = { version = "*", path = "../../src/elastic" }
reqwest = { version = "~0.8.0", features = ["unstable"] }
reqwest = { version = "~0.9.0" }
string_cache = { version = "*", optional = true }
inlinable_string = { version = "*", features = ["serde"], optional = true }
lazy_static = { version = "*" }
Expand Down
6 changes: 3 additions & 3 deletions src/elastic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ travis-ci = { repository = "elastic-rs/elastic" }
appveyor = { repository = "elastic-rs/elastic" }

[dependencies]
error-chain = "0.11.0"
error-chain = "0.12.0"
log = "~0.3.8"
uuid = { version = "0.6", features = [ "v4" ] }
serde = "~1"
serde_json = "~1"
serde_derive = "~1"
reqwest = { version = "~0.8.0", features = ["unstable"] }
reqwest = { version = "~0.9" }
futures = "~0.1.16"
tokio-core = "~0.1.9"
tokio = "~0.1"
futures-cpupool = "~0.1.6"

elastic_reqwest = { version = "~0.20.7", path = "../reqwest" }
Expand Down
9 changes: 4 additions & 5 deletions src/elastic/examples/basic_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,24 @@ extern crate futures;
extern crate futures_cpupool;
#[macro_use]
extern crate serde_json;
extern crate tokio_core;
extern crate tokio;

use std::error::Error;
use futures::Future;
use futures_cpupool::CpuPool;
use tokio_core::reactor::Core;
use serde_json::Value;
use elastic::prelude::*;
use tokio::runtime::current_thread::block_on_all;

fn run() -> Result<(), Box<Error>> {
let mut core = Core::new()?;
let pool = CpuPool::new(4);

// A reqwest HTTP client and default parameters.
// The `params` includes the base node url (http://localhost:9200).
// We also specify a cpu pool for serialising and deserialising data on.
let client = AsyncClientBuilder::new()
.serde_pool(pool)
.build(&core.handle())?;
.build()?;

// Send the request and process the response.
let res_future = client
Expand All @@ -53,7 +52,7 @@ fn run() -> Result<(), Box<Error>> {
Ok(())
});

core.run(search_future)?;
block_on_all(search_future)?;

Ok(())
}
Expand Down
10 changes: 4 additions & 6 deletions src/elastic/examples/bulk_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,16 @@
extern crate elastic;
extern crate env_logger;
extern crate futures;
extern crate tokio_core;
extern crate tokio;

use std::error::Error;
use futures::Future;
use tokio_core::reactor::Core;
use elastic::prelude::*;
use tokio::runtime::current_thread::block_on_all;

fn run() -> Result<(), Box<Error>> {
let mut core = Core::new()?;

// A HTTP client and request parameters
let client = AsyncClientBuilder::new().build(&core.handle())?;
let client = AsyncClientBuilder::new().build()?;

// Execute a bulk request
let res_future = client
Expand All @@ -40,7 +38,7 @@ fn run() -> Result<(), Box<Error>> {
Ok(())
});

core.run(bulk_future)?;
block_on_all(bulk_future)?;

Ok(())
}
Expand Down
10 changes: 4 additions & 6 deletions src/elastic/examples/typed_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ extern crate serde;
extern crate serde_derive;
#[macro_use]
extern crate serde_json;
extern crate tokio_core;

extern crate tokio;
extern crate elastic;

use std::error::Error as StdError;
use futures::{Future, IntoFuture};
use elastic::prelude::*;
use elastic::error::{ApiError, Error};
use tokio::runtime::current_thread::block_on_all;

#[derive(Debug, Serialize, Deserialize, ElasticType)]
struct MyType {
Expand All @@ -36,10 +36,8 @@ struct MyType {
}

fn run() -> Result<(), Box<StdError>> {
let mut core = tokio_core::reactor::Core::new()?;

// A HTTP client and request parameters
let client = AsyncClientBuilder::new().build(&core.handle())?;
let client = AsyncClientBuilder::new().build()?;

// Create a document to index
let doc = MyType {
Expand All @@ -60,7 +58,7 @@ fn run() -> Result<(), Box<StdError>> {
Ok(())
});

core.run(res_future)?;
block_on_all(res_future)?;

Ok(())
}
Expand Down
55 changes: 11 additions & 44 deletions src/elastic/src/client/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ use uuid::Uuid;
use futures::{Future, Poll};
use futures::future::{FutureResult, IntoFuture, Either};
use futures_cpupool::{CpuPool, CpuFuture};
use tokio_core::reactor::Handle;
use elastic_reqwest::{AsyncBody, AsyncElasticClient};
use reqwest::Error as ReqwestError;
use reqwest::unstable::async::{Client as AsyncHttpClient, ClientBuilder as AsyncHttpClientBuilder};
use reqwest::async::{Client as AsyncHttpClient, ClientBuilder as AsyncHttpClientBuilder};

use error::{self, Error};
use client::requests::HttpRequest;
Expand All @@ -24,19 +23,18 @@ Create an asynchronous `Client` and send a ping request:

```no_run
# extern crate futures;
# extern crate tokio_core;
# extern crate tokio;
# extern crate elastic;
# use futures::Future;
# use tokio_core::reactor::Core;
# use tokio::runtime::current_thread::block_on_all;
# use elastic::prelude::*;
# fn main() { run().unwrap() }
# fn run() -> Result<(), Box<::std::error::Error>> {
let mut core = Core::new()?;
let client = AsyncClientBuilder::new().build(&core.handle())?;
let client = AsyncClientBuilder::new().build()?;

let response_future = client.ping().send();

core.run(response_future)?;
block_on_all(response_future)?;
# Ok(())
# }
```
Expand Down Expand Up @@ -167,14 +165,6 @@ impl IntoAsyncHttpClient for AsyncHttpClient {
}
}

impl<'a> IntoAsyncHttpClient for &'a Handle {
type Error = ReqwestError;

fn into_async_http_client(self) -> Result<AsyncHttpClient, Self::Error> {
AsyncHttpClientBuilder::new().build(self)
}
}

impl Default for AsyncClientBuilder {
fn default() -> Self {
AsyncClientBuilder::new()
Expand Down Expand Up @@ -326,45 +316,22 @@ impl AsyncClientBuilder {
This will build an `AsyncClient` with a default underlying `AsyncHttpClient` using the handle.

```no_run
# extern crate tokio_core;
# extern crate elastic;
# use elastic::prelude::*;
# use tokio_core::reactor::Core;
# fn main() { run().unwrap() }
# fn run() -> Result<(), Box<::std::error::Error>> {
let mut core = Core::new()?;

let builder = AsyncClientBuilder::new().build(&core.handle());
# Ok(())
# }
```

Build with a given `AsyncHttpClient`.

```no_run
# extern crate tokio_core;
# extern crate reqwest;
# extern crate tokio;
# extern crate elastic;
# use tokio_core::reactor::Core;
# use reqwest::unstable::async::Client;
# use elastic::prelude::*;
# use tokio::runtime::current_thread::block_on_all;
# fn main() { run().unwrap() }
# fn run() -> Result<(), Box<::std::error::Error>> {
let mut core = Core::new()?;
let client = Client::new(&core.handle());

let builder = AsyncClientBuilder::new().build(client);
let builder = AsyncClientBuilder::new().build();
# Ok(())
# }
```

[AsyncClient]: type.AsyncClient.html
*/
pub fn build<TIntoHttp>(self, client: TIntoHttp) -> Result<AsyncClient, Error>
where
TIntoHttp: IntoAsyncHttpClient,
{
let http = client.into_async_http_client().map_err(error::build)?;

pub fn build(self) -> Result<AsyncClient, Error> {
let http = AsyncHttpClient::new();

Ok(AsyncClient {
sender: AsyncSender {
Expand Down
24 changes: 10 additions & 14 deletions src/elastic/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,12 @@ Use an [`AsyncClientBuilder`][AsyncClientBuilder] to configure an asynchronous c
The asynchronous client requires a handle to a `tokio::reactor::Core`:

```
# extern crate tokio_core;
# extern crate tokio;
# extern crate elastic;
# use elastic::prelude::*;
# fn main() { run().unwrap() }
# fn run() -> Result<(), Box<::std::error::Error>> {
# let core = tokio_core::reactor::Core::new()?;
let client = AsyncClientBuilder::new().build(&core.handle())?;
let client = AsyncClientBuilder::new().build()?;
# Ok(())
# }
```
Expand Down Expand Up @@ -363,7 +362,7 @@ Call [`AsyncResponseBuilder.into_response`][AsyncResponseBuilder.into_response]

```no_run
# extern crate futures;
# extern crate tokio_core;
# extern crate tokio;
# extern crate serde;
# extern crate serde_json;
# #[macro_use] extern crate serde_derive;
Expand All @@ -380,8 +379,7 @@ Call [`AsyncResponseBuilder.into_response`][AsyncResponseBuilder.into_response]
# pub title: String,
# pub timestamp: Date<DefaultDateMapping>
# }
# let core = tokio_core::reactor::Core::new()?;
# let client = AsyncClientBuilder::new().build(&core.handle())?;
# let client = AsyncClientBuilder::new().build()?;
# let req = PingRequest::new();
let future = client.request(req)
.send()
Expand All @@ -403,7 +401,7 @@ Alternatively, call [`AsyncResponseBuilder.into_raw`][AsyncResponseBuilder.into_

```no_run
# extern crate futures;
# extern crate tokio_core;
# extern crate tokio;
# extern crate serde;
# #[macro_use] extern crate serde_derive;
# #[macro_use] extern crate elastic_derive;
Expand All @@ -414,8 +412,7 @@ Alternatively, call [`AsyncResponseBuilder.into_raw`][AsyncResponseBuilder.into_
# use elastic::prelude::*;
# fn main() { run().unwrap() }
# fn run() -> Result<(), Box<::std::error::Error>> {
# let core = tokio_core::reactor::Core::new()?;
# let client = AsyncClientBuilder::new().build(&core.handle())?;
# let client = AsyncClientBuilder::new().build()?;
# let req = PingRequest::new();
let future = client.request(req)
.send()
Expand Down Expand Up @@ -575,21 +572,20 @@ Create an asynchronous `Client` and send a ping request:

```no_run
# extern crate futures;
# extern crate tokio_core;
# extern crate tokio;
# extern crate elastic;
# use futures::Future;
# use tokio_core::reactor::Core;
# use tokio::runtime::current_thread::block_on_all;
# use elastic::prelude::*;
# fn main() { run().unwrap() }
# fn run() -> Result<(), Box<::std::error::Error>> {
let mut core = Core::new()?;
let client = AsyncClientBuilder::new().build(&core.handle())?;
let client = AsyncClientBuilder::new().build()?;

let response_future = client.request(PingRequest::new())
.send()
.and_then(|res| res.into_response::<PingResponse>());

core.run(response_future)?;
block_on_all(response_future)?;
# Ok(())
# }
```
Expand Down
5 changes: 2 additions & 3 deletions src/elastic/src/client/requests/document_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl<TDocument> DeleteRequestBuilder<AsyncSender, TDocument> {

```no_run
# extern crate futures;
# extern crate tokio_core;
# extern crate tokio;
# extern crate serde;
# extern crate serde_json;
# #[macro_use] extern crate serde_derive;
Expand All @@ -207,8 +207,7 @@ impl<TDocument> DeleteRequestBuilder<AsyncSender, TDocument> {
# use elastic::prelude::*;
# fn main() { run().unwrap() }
# fn run() -> Result<(), Box<::std::error::Error>> {
# let core = tokio_core::reactor::Core::new()?;
# let client = AsyncClientBuilder::new().build(&core.handle())?;
# let client = AsyncClientBuilder::new().build()?;
let future = client.document_delete::<Value>(index("myindex"), id(1))
.ty("mytype")
.send();
Expand Down
5 changes: 2 additions & 3 deletions src/elastic/src/client/requests/document_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ where

```no_run
# extern crate futures;
# extern crate tokio_core;
# extern crate tokio;
# extern crate serde;
# extern crate serde_json;
# #[macro_use] extern crate serde_derive;
Expand All @@ -236,8 +236,7 @@ where
# use elastic::prelude::*;
# fn main() { run().unwrap() }
# fn run() -> Result<(), Box<::std::error::Error>> {
# let core = tokio_core::reactor::Core::new()?;
# let client = AsyncClientBuilder::new().build(&core.handle())?;
# let client = AsyncClientBuilder::new().build()?;
let future = client.document_get::<Value>(index("myindex"), id(1))
.ty("mytype")
.send();
Expand Down
Loading