Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistent Builders #280

Merged
merged 5 commits into from
Nov 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/elastic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn main() {
let runs = measure::parse_runs_from_env();

let client = SyncClientBuilder::new()
.params(|p| p.header(http::header::Connection::keep_alive()))
.params_fluent(|p| p.header(http::header::Connection::keep_alive()))
.build()
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion benches/elastic_async/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ fn main() {
let runs = measure::parse_runs_from_env();

let client = AsyncClientBuilder::new()
.params(|p| p.header(http::header::Connection::keep_alive()))
.params_fluent(|p| p.header(http::header::Connection::keep_alive()))
.build(&core.handle())
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion benches/elastic_bulk/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fn main() {

let client = SyncClientBuilder::new()
.http_client(http_client())
.params(|p| p.header(http::header::Connection::keep_alive()))
.params_fluent(|p| p.header(http::header::Connection::keep_alive()))
.build()
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion benches/elastic_raw/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ fn main() {
let runs = measure::parse_runs_from_env();

let client = SyncClientBuilder::new()
.params(|p| p.header(http::header::Connection::keep_alive()))
.params_fluent(|p| p.header(http::header::Connection::keep_alive()))
.build()
.unwrap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl PutBulkAccounts for Client {

let res = self.io
.request(req)
.params(|params| params.url_param("refresh", true))
.params_fluent(|params| params.url_param("refresh", true))
.send()?
.into_response::<BulkErrorsResponse>()?;

Expand Down
1 change: 1 addition & 0 deletions src/elastic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ reqwest = { version = "~0.8.0", features = ["unstable"] }
futures = "~0.1.16"
tokio-core = "~0.1.9"
futures-cpupool = "~0.1.6"
fluent_builder = "~0.5"

elastic_requests = { version = "~0.20.2", path = "../requests" }
elastic_responses = { version = "~0.20.2", path = "../responses" }
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/examples/custom_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ fn run() -> Result<(), Box<Error>> {
// Send the request and process the response.
let res = client
.request(SearchRequest::new(query.to_string()))
.params(|q| q.url_param("filter_path", "hits.hits._source"))
.params_fluent(|q| q.url_param("filter_path", "hits.hits._source"))
.send()?
.into_response::<SearchResponse>()?;

Expand Down
3 changes: 0 additions & 3 deletions src/elastic/examples/load_balanced_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,11 @@
extern crate elastic;
extern crate env_logger;
extern crate futures;
#[macro_use]
extern crate serde_json;
extern crate tokio_core;

use std::error::Error;
use futures::Future;
use tokio_core::reactor::Core;
use serde_json::Value;
use elastic::prelude::*;

fn run() -> Result<(), Box<Error>> {
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/examples/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn run() -> Result<(), Box<Error>> {
// A reqwest HTTP client and default parameters.
// The `params` includes the base node url (http://localhost:9200).
let client = SyncClientBuilder::new()
.params(|p| p.url_param("pretty", true))
.params_fluent(|p| p.url_param("pretty", true))
.build()?;

// A search request from the body.
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/examples/typed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ fn put_index(client: &SyncClient) -> Result<(), Error> {
fn put_doc(client: &SyncClient, doc: MyType) -> Result<(), Error> {
client
.document_index(sample_index(), id(doc.id), doc)
.params(|p| p.url_param("refresh", true))
.params_fluent(|p| p.url_param("refresh", true))
.send()?;

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/examples/typed_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ fn put_index(client: AsyncClient) -> Box<Future<Item = (), Error = Error>> {
fn put_doc(client: AsyncClient, doc: MyType) -> Box<Future<Item = (), Error = Error>> {
let index_doc = client
.document_index(sample_index(), id(doc.id), doc)
.params(|p| p.url_param("refresh", true))
.params_fluent(|p| p.url_param("refresh", true))
.send()
.map(|_| ());

Expand Down
2 changes: 1 addition & 1 deletion src/elastic/examples/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn run() -> Result<(), Box<Error>> {
// Index the document
client
.document_index(sample_index(), id(doc_id), doc)
.params(|p| p.url_param("refresh", true))
.params_fluent(|p| p.url_param("refresh", true))
.send()?;

// Update the document using a script
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ If the request was sent asynchronously, the response is returned as a `Future`.
let request_builder = client.request(req);

// Set additional url parameters
let request_builder = request_builder.params(|p| p
let request_builder = request_builder.params_fluent(|p| p
.url_param("pretty", true)
.url_param("refresh", true)
);
Expand Down
3 changes: 1 addition & 2 deletions src/elastic/src/client/requests/document_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,8 @@ where
{
let ty = TDocument::name().into();

RequestBuilder::new(
RequestBuilder::initial(
self.clone(),
None,
DeleteRequestInner {
index: index,
ty: ty,
Expand Down
3 changes: 1 addition & 2 deletions src/elastic/src/client/requests/document_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,8 @@ where
{
let ty = TDocument::name().into();

RequestBuilder::new(
RequestBuilder::initial(
self.clone(),
None,
GetRequestInner {
index: index,
ty: ty,
Expand Down
3 changes: 1 addition & 2 deletions src/elastic/src/client/requests/document_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,8 @@ where
{
let ty = TDocument::name().into();

RequestBuilder::new(
RequestBuilder::initial(
self.clone(),
None,
IndexRequestInner {
index: index,
ty: ty,
Expand Down
3 changes: 1 addition & 2 deletions src/elastic/src/client/requests/document_put_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ where
{
let ty = TDocument::name().into();

RequestBuilder::new(
RequestBuilder::initial(
self.clone(),
None,
PutMappingRequestInner {
index: index,
ty: ty,
Expand Down
3 changes: 1 addition & 2 deletions src/elastic/src/client/requests/document_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,8 @@ where
{
let ty = TDocument::name().into();

RequestBuilder::new(
RequestBuilder::initial(
self.clone(),
None,
UpdateRequestInner {
index: index,
ty: ty,
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/src/client/requests/index_close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
[send-async]: requests/index_close/type.IndexCloseRequestBuilder.html#send-asynchronously
*/
pub fn index_close(&self, index: Index<'static>) -> IndexCloseRequestBuilder<TSender> {
RequestBuilder::new(self.clone(), None, IndexCloseRequestInner { index: index })
RequestBuilder::initial(self.clone(), IndexCloseRequestInner { index: index })
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/elastic/src/client/requests/index_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,8 @@ where
[documents-mod]: ../types/document/index.html
*/
pub fn index_create(&self, index: Index<'static>) -> IndexCreateRequestBuilder<TSender, DefaultBody> {
RequestBuilder::new(
RequestBuilder::initial(
self.clone(),
None,
IndexCreateRequestInner {
index: index,
body: empty_body(),
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/src/client/requests/index_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
[send-async]: requests/index_delete/type.IndexDeleteRequestBuilder.html#send-asynchronously
*/
pub fn index_delete(&self, index: Index<'static>) -> IndexDeleteRequestBuilder<TSender> {
RequestBuilder::new(self.clone(), None, IndexDeleteRequestInner { index: index })
RequestBuilder::initial(self.clone(), IndexDeleteRequestInner { index: index })
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/elastic/src/client/requests/index_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
[send-async]: requests/index_open/type.IndexOpenRequestBuilder.html#send-asynchronously
*/
pub fn index_open(&self, index: Index<'static>) -> IndexOpenRequestBuilder<TSender> {
RequestBuilder::new(self.clone(), None, IndexOpenRequestInner { index: index })
RequestBuilder::initial(self.clone(), IndexOpenRequestInner { index: index })
}
}

Expand Down
63 changes: 47 additions & 16 deletions src/elastic/src/client/requests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ Request types for the Elasticsearch REST API.
This module contains implementation details that are useful if you want to customise the request process, but aren't generally important for sending requests.
*/

use std::sync::Arc;
use futures_cpupool::CpuPool;
use fluent_builder::FluentBuilder;

use client::Client;
use client::sender::{AsyncSender, RequestParams, Sender};
Expand Down Expand Up @@ -71,7 +71,7 @@ where
TSender: Sender,
{
client: Client<TSender>,
params_builder: Option<Arc<Fn(RequestParams) -> RequestParams>>,
params_builder: FluentBuilder<RequestParams>,
inner: TRequest,
}

Expand All @@ -84,7 +84,15 @@ impl<TSender, TRequest> RequestBuilder<TSender, TRequest>
where
TSender: Sender,
{
fn new(client: Client<TSender>, builder: Option<Arc<Fn(RequestParams) -> RequestParams>>, req: TRequest) -> Self {
fn initial(client: Client<TSender>, req: TRequest) -> Self {
RequestBuilder {
client: client,
params_builder: FluentBuilder::new(),
inner: req,
}
}

fn new(client: Client<TSender>, builder: FluentBuilder<RequestParams>, req: TRequest) -> Self {
RequestBuilder {
client: client,
params_builder: builder,
Expand All @@ -111,7 +119,7 @@ where
# let client = SyncClientBuilder::new().build()?;
# fn get_req() -> PingRequest<'static> { PingRequest::new() }
let builder = client.request(get_req())
.params(|p| p.url_param("refresh", true));
.params_fluent(|p| p.url_param("refresh", true));
# Ok(())
# }
```
Expand All @@ -126,25 +134,48 @@ where
# let client = SyncClientBuilder::new().build()?;
# fn get_req() -> PingRequest<'static> { PingRequest::new() }
let builder = client.request(get_req())
.params(|p| p.base_url("http://different-host:9200"));
.params_fluent(|p| p.base_url("http://different-host:9200"));
# Ok(())
# }
```
*/
pub fn params<F>(mut self, builder: F) -> Self
pub fn params_fluent<F>(mut self, builder: F) -> Self
where
F: Fn(RequestParams) -> RequestParams + 'static,
{
if let Some(old_params_builder) = self.params_builder {
let params_builder = move |params: RequestParams| {
let params = old_params_builder(params);
builder(params)
};

self.params_builder = Some(Arc::new(params_builder));
} else {
self.params_builder = Some(Arc::new(builder));
}
self.params_builder = self.params_builder.fluent(builder).boxed();

self
}

/**
Specify default request parameters.

This method differs from `params_fluent` by not taking any default parameters into account.
The `RequestParams` passed in are exactly the `RequestParams` used to build the request.

# Examples

Add a url param to force an index refresh and send the request to `http://different-host:9200`:

```no_run
# extern crate elastic;
# use elastic::prelude::*;
# fn main() { run().unwrap() }
# fn run() -> Result<(), Box<::std::error::Error>> {
# let client = SyncClientBuilder::new().build()?;
# fn get_req() -> PingRequest<'static> { PingRequest::new() }
let builder = client.request(get_req())
.params(RequestParams::new("http://different-hos:9200").url_param("refresh", true));
# Ok(())
# }
```
*/
pub fn params<I>(mut self, params: I) -> Self
where
I: Into<RequestParams>
{
self.params_builder = self.params_builder.value(params.into());

self
}
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/src/client/requests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ where
[send-async]: requests/ping/type.PingRequestBuilder.html#send-asynchronously
*/
pub fn ping(&self) -> PingRequestBuilder<TSender> {
RequestBuilder::new(self.clone(), None, PingRequestInner)
RequestBuilder::initial(self.clone(), PingRequestInner)
}
}

Expand Down
22 changes: 17 additions & 5 deletions src/elastic/src/client/requests/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ Builders for raw requests.
*/

use std::marker::PhantomData;
use fluent_builder::TryIntoValue;

use client::Client;
use client::sender::{NextParams, NodeAddresses, SendableRequest, Sender};
use client::sender::{NextParams, NodeAddresses, SendableRequest, SendableRequestParams, Sender};
use client::requests::{HttpRequest, RequestBuilder};

/**
Expand Down Expand Up @@ -79,7 +80,7 @@ where
TRequest: Into<HttpRequest<'static, TBody>>,
TBody: Into<TSender::Body>,
{
RequestBuilder::new(self.clone(), None, RawRequestInner::new(req))
RequestBuilder::initial(self.clone(), RawRequestInner::new(req))
}
}

Expand Down Expand Up @@ -162,10 +163,21 @@ where
pub fn send(self) -> TSender::Response {
let client = self.client;
let req = self.inner.req.into();
let params_builder = self.params_builder;
let params = client.addresses.next();

let req = SendableRequest::new(req, params, params_builder);
// Only try fetch a next address if an explicit `RequestParams` hasn't been given
let params = match self.params_builder.try_into_value() {
TryIntoValue::Value(value) => {
SendableRequestParams::Value(value)
},
TryIntoValue::Builder(builder) => {
SendableRequestParams::Builder {
params: client.addresses.next(),
builder,
}
}
};

let req = SendableRequest::new(req, params);

client.sender.send(req)
}
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/src/client/requests/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ where
where
TDocument: DeserializeOwned,
{
RequestBuilder::new(self.clone(), None, SearchRequestInner::new(empty_body()))
RequestBuilder::initial(self.clone(), SearchRequestInner::new(empty_body()))
}
}

Expand Down
Loading