Skip to content

Commit

Permalink
Consistent Builders (#280)
Browse files Browse the repository at this point in the history
* rename .params to .params_fluent

* only fetch next address if builder has no value

* refactor client builders to use fluent methods
  • Loading branch information
KodrAus committed May 7, 2018
1 parent ef8b59e commit fdb01e2
Show file tree
Hide file tree
Showing 39 changed files with 306 additions and 169 deletions.
2 changes: 1 addition & 1 deletion benches/elastic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn main() {
let runs = measure::parse_runs_from_env();

let client = SyncClientBuilder::new()
.params(|p| p.header(http::header::Connection::keep_alive()))
.params_fluent(|p| p.header(http::header::Connection::keep_alive()))
.build()
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion benches/elastic_async/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ fn main() {
let runs = measure::parse_runs_from_env();

let client = AsyncClientBuilder::new()
.params(|p| p.header(http::header::Connection::keep_alive()))
.params_fluent(|p| p.header(http::header::Connection::keep_alive()))
.build(&core.handle())
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion benches/elastic_bulk/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fn main() {

let client = SyncClientBuilder::new()
.http_client(http_client())
.params(|p| p.header(http::header::Connection::keep_alive()))
.params_fluent(|p| p.header(http::header::Connection::keep_alive()))
.build()
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion benches/elastic_raw/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ fn main() {
let runs = measure::parse_runs_from_env();

let client = SyncClientBuilder::new()
.params(|p| p.header(http::header::Connection::keep_alive()))
.params_fluent(|p| p.header(http::header::Connection::keep_alive()))
.build()
.unwrap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl PutBulkAccounts for Client {

let res = self.io
.request(req)
.params(|params| params.url_param("refresh", true))
.params_fluent(|params| params.url_param("refresh", true))
.send()?
.into_response::<BulkErrorsResponse>()?;

Expand Down
1 change: 1 addition & 0 deletions src/elastic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ reqwest = { version = "~0.8.0", features = ["unstable"] }
futures = "~0.1.16"
tokio-core = "~0.1.9"
futures-cpupool = "~0.1.6"
fluent_builder = "~0.5"

elastic_requests = { version = "~0.20.2", path = "../requests" }
elastic_responses = { version = "~0.20.2", path = "../responses" }
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/examples/custom_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ fn run() -> Result<(), Box<Error>> {
// Send the request and process the response.
let res = client
.request(SearchRequest::new(query.to_string()))
.params(|q| q.url_param("filter_path", "hits.hits._source"))
.params_fluent(|q| q.url_param("filter_path", "hits.hits._source"))
.send()?
.into_response::<SearchResponse>()?;

Expand Down
3 changes: 0 additions & 3 deletions src/elastic/examples/load_balanced_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,11 @@
extern crate elastic;
extern crate env_logger;
extern crate futures;
#[macro_use]
extern crate serde_json;
extern crate tokio_core;

use std::error::Error;
use futures::Future;
use tokio_core::reactor::Core;
use serde_json::Value;
use elastic::prelude::*;

fn run() -> Result<(), Box<Error>> {
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/examples/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn run() -> Result<(), Box<Error>> {
// A reqwest HTTP client and default parameters.
// The `params` includes the base node url (http://localhost:9200).
let client = SyncClientBuilder::new()
.params(|p| p.url_param("pretty", true))
.params_fluent(|p| p.url_param("pretty", true))
.build()?;

// A search request from the body.
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/examples/typed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ fn put_index(client: &SyncClient) -> Result<(), Error> {
fn put_doc(client: &SyncClient, doc: MyType) -> Result<(), Error> {
client
.document_index(sample_index(), id(doc.id), doc)
.params(|p| p.url_param("refresh", true))
.params_fluent(|p| p.url_param("refresh", true))
.send()?;

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/examples/typed_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ fn put_index(client: AsyncClient) -> Box<Future<Item = (), Error = Error>> {
fn put_doc(client: AsyncClient, doc: MyType) -> Box<Future<Item = (), Error = Error>> {
let index_doc = client
.document_index(sample_index(), id(doc.id), doc)
.params(|p| p.url_param("refresh", true))
.params_fluent(|p| p.url_param("refresh", true))
.send()
.map(|_| ());

Expand Down
2 changes: 1 addition & 1 deletion src/elastic/examples/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn run() -> Result<(), Box<Error>> {
// Index the document
client
.document_index(sample_index(), id(doc_id), doc)
.params(|p| p.url_param("refresh", true))
.params_fluent(|p| p.url_param("refresh", true))
.send()?;

// Update the document using a script
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ If the request was sent asynchronously, the response is returned as a `Future`.
let request_builder = client.request(req);
// Set additional url parameters
let request_builder = request_builder.params(|p| p
let request_builder = request_builder.params_fluent(|p| p
.url_param("pretty", true)
.url_param("refresh", true)
);
Expand Down
3 changes: 1 addition & 2 deletions src/elastic/src/client/requests/document_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,8 @@ where
{
let ty = TDocument::name().into();

RequestBuilder::new(
RequestBuilder::initial(
self.clone(),
None,
DeleteRequestInner {
index: index,
ty: ty,
Expand Down
3 changes: 1 addition & 2 deletions src/elastic/src/client/requests/document_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,8 @@ where
{
let ty = TDocument::name().into();

RequestBuilder::new(
RequestBuilder::initial(
self.clone(),
None,
GetRequestInner {
index: index,
ty: ty,
Expand Down
3 changes: 1 addition & 2 deletions src/elastic/src/client/requests/document_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ where
{
let ty = TDocument::name().into();

RequestBuilder::new(
RequestBuilder::initial(
self.clone(),
None,
IndexRequestInner {
index: index,
ty: ty,
Expand Down
3 changes: 1 addition & 2 deletions src/elastic/src/client/requests/document_put_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,8 @@ where
{
let ty = TDocument::name().into();

RequestBuilder::new(
RequestBuilder::initial(
self.clone(),
None,
PutMappingRequestInner {
index: index,
ty: ty,
Expand Down
3 changes: 1 addition & 2 deletions src/elastic/src/client/requests/document_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,8 @@ where
{
let ty = TDocument::name().into();

RequestBuilder::new(
RequestBuilder::initial(
self.clone(),
None,
UpdateRequestInner {
index: index,
ty: ty,
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/src/client/requests/index_close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
[send-async]: requests/index_close/type.IndexCloseRequestBuilder.html#send-asynchronously
*/
pub fn index_close(&self, index: Index<'static>) -> IndexCloseRequestBuilder<TSender> {
RequestBuilder::new(self.clone(), None, IndexCloseRequestInner { index: index })
RequestBuilder::initial(self.clone(), IndexCloseRequestInner { index: index })
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/elastic/src/client/requests/index_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,8 @@ where
[documents-mod]: ../types/document/index.html
*/
pub fn index_create(&self, index: Index<'static>) -> IndexCreateRequestBuilder<TSender, DefaultBody> {
RequestBuilder::new(
RequestBuilder::initial(
self.clone(),
None,
IndexCreateRequestInner {
index: index,
body: empty_body(),
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/src/client/requests/index_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
[send-async]: requests/index_delete/type.IndexDeleteRequestBuilder.html#send-asynchronously
*/
pub fn index_delete(&self, index: Index<'static>) -> IndexDeleteRequestBuilder<TSender> {
RequestBuilder::new(self.clone(), None, IndexDeleteRequestInner { index: index })
RequestBuilder::initial(self.clone(), IndexDeleteRequestInner { index: index })
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/elastic/src/client/requests/index_exists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
[send-async]: requests/index_exists/type.IndexExistsRequestBuilder.html#send-asynchronously
*/
pub fn index_exists(&self, index: Index<'static>) -> IndexExistsRequestBuilder<TSender> {
RequestBuilder::new(self.clone(), None, IndexExistsRequestInner { index: index })
RequestBuilder::initial(self.clone(), IndexExistsRequestInner { index: index })
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/elastic/src/client/requests/index_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
[send-async]: requests/index_open/type.IndexOpenRequestBuilder.html#send-asynchronously
*/
pub fn index_open(&self, index: Index<'static>) -> IndexOpenRequestBuilder<TSender> {
RequestBuilder::new(self.clone(), None, IndexOpenRequestInner { index: index })
RequestBuilder::initial(self.clone(), IndexOpenRequestInner { index: index })
}
}

Expand Down
63 changes: 47 additions & 16 deletions src/elastic/src/client/requests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ Request types for the Elasticsearch REST API.
This module contains implementation details that are useful if you want to customise the request process, but aren't generally important for sending requests.
*/

use std::sync::Arc;
use futures_cpupool::CpuPool;
use fluent_builder::FluentBuilder;

use client::Client;
use client::sender::{AsyncSender, RequestParams, Sender};
Expand Down Expand Up @@ -73,7 +73,7 @@ where
TSender: Sender,
{
client: Client<TSender>,
params_builder: Option<Arc<Fn(RequestParams) -> RequestParams>>,
params_builder: FluentBuilder<RequestParams>,
inner: TRequest,
}

Expand All @@ -86,7 +86,15 @@ impl<TSender, TRequest> RequestBuilder<TSender, TRequest>
where
TSender: Sender,
{
fn new(client: Client<TSender>, builder: Option<Arc<Fn(RequestParams) -> RequestParams>>, req: TRequest) -> Self {
fn initial(client: Client<TSender>, req: TRequest) -> Self {
RequestBuilder {
client: client,
params_builder: FluentBuilder::new(),
inner: req,
}
}

fn new(client: Client<TSender>, builder: FluentBuilder<RequestParams>, req: TRequest) -> Self {
RequestBuilder {
client: client,
params_builder: builder,
Expand All @@ -113,7 +121,7 @@ where
# let client = SyncClientBuilder::new().build()?;
# fn get_req() -> PingRequest<'static> { PingRequest::new() }
let builder = client.request(get_req())
.params(|p| p.url_param("refresh", true));
.params_fluent(|p| p.url_param("refresh", true));
# Ok(())
# }
```
Expand All @@ -128,25 +136,48 @@ where
# let client = SyncClientBuilder::new().build()?;
# fn get_req() -> PingRequest<'static> { PingRequest::new() }
let builder = client.request(get_req())
.params(|p| p.base_url("http://different-host:9200"));
.params_fluent(|p| p.base_url("http://different-host:9200"));
# Ok(())
# }
```
*/
pub fn params<F>(mut self, builder: F) -> Self
pub fn params_fluent<F>(mut self, builder: F) -> Self
where
F: Fn(RequestParams) -> RequestParams + 'static,
{
if let Some(old_params_builder) = self.params_builder {
let params_builder = move |params: RequestParams| {
let params = old_params_builder(params);
builder(params)
};

self.params_builder = Some(Arc::new(params_builder));
} else {
self.params_builder = Some(Arc::new(builder));
}
self.params_builder = self.params_builder.fluent(builder).boxed();

self
}

/**
Specify default request parameters.
This method differs from `params_fluent` by not taking any default parameters into account.
The `RequestParams` passed in are exactly the `RequestParams` used to build the request.
# Examples
Add a url param to force an index refresh and send the request to `http://different-host:9200`:
```no_run
# extern crate elastic;
# use elastic::prelude::*;
# fn main() { run().unwrap() }
# fn run() -> Result<(), Box<::std::error::Error>> {
# let client = SyncClientBuilder::new().build()?;
# fn get_req() -> PingRequest<'static> { PingRequest::new() }
let builder = client.request(get_req())
.params(RequestParams::new("http://different-hos:9200").url_param("refresh", true));
# Ok(())
# }
```
*/
pub fn params<I>(mut self, params: I) -> Self
where
I: Into<RequestParams>
{
self.params_builder = self.params_builder.value(params.into());

self
}
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/src/client/requests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ where
[send-async]: requests/ping/type.PingRequestBuilder.html#send-asynchronously
*/
pub fn ping(&self) -> PingRequestBuilder<TSender> {
RequestBuilder::new(self.clone(), None, PingRequestInner)
RequestBuilder::initial(self.clone(), PingRequestInner)
}
}

Expand Down
22 changes: 17 additions & 5 deletions src/elastic/src/client/requests/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ Builders for raw requests.
*/

use std::marker::PhantomData;
use fluent_builder::TryIntoValue;

use client::Client;
use client::sender::{NextParams, NodeAddresses, SendableRequest, Sender};
use client::sender::{NextParams, NodeAddresses, SendableRequest, SendableRequestParams, Sender};
use client::requests::{HttpRequest, RequestBuilder};

/**
Expand Down Expand Up @@ -79,7 +80,7 @@ where
TRequest: Into<HttpRequest<'static, TBody>>,
TBody: Into<TSender::Body>,
{
RequestBuilder::new(self.clone(), None, RawRequestInner::new(req))
RequestBuilder::initial(self.clone(), RawRequestInner::new(req))
}
}

Expand Down Expand Up @@ -162,10 +163,21 @@ where
pub fn send(self) -> TSender::Response {
let client = self.client;
let req = self.inner.req.into();
let params_builder = self.params_builder;
let params = client.addresses.next();

let req = SendableRequest::new(req, params, params_builder);
// Only try fetch a next address if an explicit `RequestParams` hasn't been given
let params = match self.params_builder.try_into_value() {
TryIntoValue::Value(value) => {
SendableRequestParams::Value(value)
},
TryIntoValue::Builder(builder) => {
SendableRequestParams::Builder {
params: client.addresses.next(),
builder,
}
}
};

let req = SendableRequest::new(req, params);

client.sender.send(req)
}
Expand Down
2 changes: 1 addition & 1 deletion src/elastic/src/client/requests/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ where
where
TDocument: DeserializeOwned,
{
RequestBuilder::new(self.clone(), None, SearchRequestInner::new(empty_body()))
RequestBuilder::initial(self.clone(), SearchRequestInner::new(empty_body()))
}
}

Expand Down
Loading

0 comments on commit fdb01e2

Please sign in to comment.