From 0d4549bf5ab0226f5282d9c8b4e58e86a30982f8 Mon Sep 17 00:00:00 2001 From: Ashley Mannix Date: Sat, 25 Nov 2017 20:59:16 +1000 Subject: [PATCH 1/5] rename .params to .params_fluent --- .../src/ops/commands/put_bulk_accounts.rs | 2 +- src/elastic/Cargo.toml | 1 + .../src/client/requests/document_delete.rs | 3 +-- .../src/client/requests/document_get.rs | 3 +-- .../src/client/requests/document_index.rs | 3 +-- .../client/requests/document_put_mapping.rs | 3 +-- .../src/client/requests/document_update.rs | 4 +-- .../src/client/requests/index_close.rs | 2 +- .../src/client/requests/index_create.rs | 3 +-- .../src/client/requests/index_delete.rs | 2 +- src/elastic/src/client/requests/index_open.rs | 2 +- src/elastic/src/client/requests/mod.rs | 26 +++++++++---------- src/elastic/src/client/requests/ping.rs | 2 +- src/elastic/src/client/requests/raw.rs | 2 +- src/elastic/src/client/requests/search.rs | 2 +- src/elastic/src/client/sender/async.rs | 12 +++------ src/elastic/src/client/sender/mod.rs | 6 +++-- .../src/client/sender/sniffed_nodes/mod.rs | 4 ++- src/elastic/src/client/sender/sync.rs | 11 +++----- src/elastic/src/lib.rs | 1 + tests/run/src/document/delete.rs | 4 +-- tests/run/src/document/simple_index_get.rs | 2 +- tests/run/src/document/update_with_doc.rs | 4 +-- .../src/document/update_with_inline_script.rs | 4 +-- tests/run/src/document/update_with_script.rs | 4 +-- tests/run/src/search/empty_query.rs | 2 +- tests/run/src/search/raw_query_string.rs | 2 +- 27 files changed, 55 insertions(+), 61 deletions(-) diff --git a/examples/account_sample/src/ops/commands/put_bulk_accounts.rs b/examples/account_sample/src/ops/commands/put_bulk_accounts.rs index 5e37781eae..3b06003d90 100644 --- a/examples/account_sample/src/ops/commands/put_bulk_accounts.rs +++ b/examples/account_sample/src/ops/commands/put_bulk_accounts.rs @@ -24,7 +24,7 @@ impl PutBulkAccounts for Client { let res = self.io .request(req) - .params(|params| params.url_param("refresh", true)) + .params_fluent(|params| params.url_param("refresh", true)) .send()? .into_response::()?; diff --git a/src/elastic/Cargo.toml b/src/elastic/Cargo.toml index 4ef653bfa9..989a5d1dd8 100644 --- a/src/elastic/Cargo.toml +++ b/src/elastic/Cargo.toml @@ -27,6 +27,7 @@ reqwest = { version = "~0.8.0", features = ["unstable"] } futures = "~0.1.16" tokio-core = "~0.1.9" futures-cpupool = "~0.1.6" +fluent_builder = { version = "*", git = "https://github.com/KodrAus/fluent_builder.git", branch = "feat/fnonce-builders" } elastic_requests = { version = "~0.20.2", path = "../requests" } elastic_responses = { version = "~0.20.2", path = "../responses" } diff --git a/src/elastic/src/client/requests/document_delete.rs b/src/elastic/src/client/requests/document_delete.rs index 0ca8487c57..652b8392dc 100644 --- a/src/elastic/src/client/requests/document_delete.rs +++ b/src/elastic/src/client/requests/document_delete.rs @@ -95,9 +95,8 @@ where { let ty = TDocument::name().into(); - RequestBuilder::new( + RequestBuilder::initial( self.clone(), - None, DeleteRequestInner { index: index, ty: ty, diff --git a/src/elastic/src/client/requests/document_get.rs b/src/elastic/src/client/requests/document_get.rs index dddcfc1643..fa6db0083e 100644 --- a/src/elastic/src/client/requests/document_get.rs +++ b/src/elastic/src/client/requests/document_get.rs @@ -121,9 +121,8 @@ where { let ty = TDocument::name().into(); - RequestBuilder::new( + RequestBuilder::initial( self.clone(), - None, GetRequestInner { index: index, ty: ty, diff --git a/src/elastic/src/client/requests/document_index.rs b/src/elastic/src/client/requests/document_index.rs index 6f6d1d2669..ec461ce502 100644 --- a/src/elastic/src/client/requests/document_index.rs +++ b/src/elastic/src/client/requests/document_index.rs @@ -104,9 +104,8 @@ where { let ty = TDocument::name().into(); - RequestBuilder::new( + RequestBuilder::initial( self.clone(), - None, IndexRequestInner { index: index, ty: ty, diff --git a/src/elastic/src/client/requests/document_put_mapping.rs b/src/elastic/src/client/requests/document_put_mapping.rs index 950805507d..86d0162a88 100644 --- a/src/elastic/src/client/requests/document_put_mapping.rs +++ b/src/elastic/src/client/requests/document_put_mapping.rs @@ -94,9 +94,8 @@ where { let ty = TDocument::name().into(); - RequestBuilder::new( + RequestBuilder::initial( self.clone(), - None, PutMappingRequestInner { index: index, ty: ty, diff --git a/src/elastic/src/client/requests/document_update.rs b/src/elastic/src/client/requests/document_update.rs index 67531e7ec0..439c4c7faf 100644 --- a/src/elastic/src/client/requests/document_update.rs +++ b/src/elastic/src/client/requests/document_update.rs @@ -9,6 +9,7 @@ use futures::{Future, IntoFuture, Poll}; use futures_cpupool::CpuPool; use serde_json::{self, Map, Value}; use serde::ser::{Serialize, Serializer}; +use fluent_builder::{FluentBuilder, Override}; use error::{self, Error}; use client::Client; @@ -193,9 +194,8 @@ where { let ty = TDocument::name().into(); - RequestBuilder::new( + RequestBuilder::initial( self.clone(), - None, UpdateRequestInner { index: index, ty: ty, diff --git a/src/elastic/src/client/requests/index_close.rs b/src/elastic/src/client/requests/index_close.rs index 25325eb43f..fc930ec475 100644 --- a/src/elastic/src/client/requests/index_close.rs +++ b/src/elastic/src/client/requests/index_close.rs @@ -71,7 +71,7 @@ where [send-async]: requests/index_close/type.IndexCloseRequestBuilder.html#send-asynchronously */ pub fn index_close(&self, index: Index<'static>) -> IndexCloseRequestBuilder { - RequestBuilder::new(self.clone(), None, IndexCloseRequestInner { index: index }) + RequestBuilder::initial(self.clone(), IndexCloseRequestInner { index: index }) } } diff --git a/src/elastic/src/client/requests/index_create.rs b/src/elastic/src/client/requests/index_create.rs index 12b480010e..e793c12efa 100644 --- a/src/elastic/src/client/requests/index_create.rs +++ b/src/elastic/src/client/requests/index_create.rs @@ -114,9 +114,8 @@ where [documents-mod]: ../types/document/index.html */ pub fn index_create(&self, index: Index<'static>) -> IndexCreateRequestBuilder { - RequestBuilder::new( + RequestBuilder::initial( self.clone(), - None, IndexCreateRequestInner { index: index, body: empty_body(), diff --git a/src/elastic/src/client/requests/index_delete.rs b/src/elastic/src/client/requests/index_delete.rs index 12acf00a44..79c3eb3ab2 100644 --- a/src/elastic/src/client/requests/index_delete.rs +++ b/src/elastic/src/client/requests/index_delete.rs @@ -71,7 +71,7 @@ where [send-async]: requests/index_delete/type.IndexDeleteRequestBuilder.html#send-asynchronously */ pub fn index_delete(&self, index: Index<'static>) -> IndexDeleteRequestBuilder { - RequestBuilder::new(self.clone(), None, IndexDeleteRequestInner { index: index }) + RequestBuilder::initial(self.clone(), IndexDeleteRequestInner { index: index }) } } diff --git a/src/elastic/src/client/requests/index_open.rs b/src/elastic/src/client/requests/index_open.rs index c3e8edf596..bf603f6725 100644 --- a/src/elastic/src/client/requests/index_open.rs +++ b/src/elastic/src/client/requests/index_open.rs @@ -71,7 +71,7 @@ where [send-async]: requests/index_open/type.IndexOpenRequestBuilder.html#send-asynchronously */ pub fn index_open(&self, index: Index<'static>) -> IndexOpenRequestBuilder { - RequestBuilder::new(self.clone(), None, IndexOpenRequestInner { index: index }) + RequestBuilder::initial(self.clone(), IndexOpenRequestInner { index: index }) } } diff --git a/src/elastic/src/client/requests/mod.rs b/src/elastic/src/client/requests/mod.rs index bd4885f8be..7aacd2a470 100644 --- a/src/elastic/src/client/requests/mod.rs +++ b/src/elastic/src/client/requests/mod.rs @@ -6,6 +6,7 @@ This module contains implementation details that are useful if you want to custo use std::sync::Arc; use futures_cpupool::CpuPool; +use fluent_builder::FluentBuilder; use client::Client; use client::sender::{AsyncSender, RequestParams, Sender}; @@ -71,7 +72,7 @@ where TSender: Sender, { client: Client, - params_builder: Option RequestParams>>, + params_builder: FluentBuilder, inner: TRequest, } @@ -84,7 +85,15 @@ impl RequestBuilder where TSender: Sender, { - fn new(client: Client, builder: Option RequestParams>>, req: TRequest) -> Self { + fn initial(client: Client, req: TRequest) -> Self { + RequestBuilder { + client: client, + params_builder: FluentBuilder::new(), + inner: req, + } + } + + fn new(client: Client, builder: FluentBuilder, req: TRequest) -> Self { RequestBuilder { client: client, params_builder: builder, @@ -131,20 +140,11 @@ where # } ``` */ - pub fn params(mut self, builder: F) -> Self + pub fn params_fluent(mut self, builder: F) -> Self where F: Fn(RequestParams) -> RequestParams + 'static, { - if let Some(old_params_builder) = self.params_builder { - let params_builder = move |params: RequestParams| { - let params = old_params_builder(params); - builder(params) - }; - - self.params_builder = Some(Arc::new(params_builder)); - } else { - self.params_builder = Some(Arc::new(builder)); - } + self.params_builder = self.params_builder.fluent(builder).boxed(); self } diff --git a/src/elastic/src/client/requests/ping.rs b/src/elastic/src/client/requests/ping.rs index 1f53c82b5e..a60dc0abb1 100644 --- a/src/elastic/src/client/requests/ping.rs +++ b/src/elastic/src/client/requests/ping.rs @@ -70,7 +70,7 @@ where [send-async]: requests/ping/type.PingRequestBuilder.html#send-asynchronously */ pub fn ping(&self) -> PingRequestBuilder { - RequestBuilder::new(self.clone(), None, PingRequestInner) + RequestBuilder::initial(self.clone(), PingRequestInner) } } diff --git a/src/elastic/src/client/requests/raw.rs b/src/elastic/src/client/requests/raw.rs index 5e60ccfb96..30665c5311 100644 --- a/src/elastic/src/client/requests/raw.rs +++ b/src/elastic/src/client/requests/raw.rs @@ -79,7 +79,7 @@ where TRequest: Into>, TBody: Into, { - RequestBuilder::new(self.clone(), None, RawRequestInner::new(req)) + RequestBuilder::initial(self.clone(), RawRequestInner::new(req)) } } diff --git a/src/elastic/src/client/requests/search.rs b/src/elastic/src/client/requests/search.rs index 6c37717ee4..6d10fb8ee1 100644 --- a/src/elastic/src/client/requests/search.rs +++ b/src/elastic/src/client/requests/search.rs @@ -123,7 +123,7 @@ where where TDocument: DeserializeOwned, { - RequestBuilder::new(self.clone(), None, SearchRequestInner::new(empty_body())) + RequestBuilder::initial(self.clone(), SearchRequestInner::new(empty_body())) } } diff --git a/src/elastic/src/client/sender/async.rs b/src/elastic/src/client/sender/async.rs index f4c98a12be..6ea448c6ef 100644 --- a/src/elastic/src/client/sender/async.rs +++ b/src/elastic/src/client/sender/async.rs @@ -88,7 +88,9 @@ impl Sender for AsyncSender { }); let req_future = params_future.and_then(move |params| { - build_req(&http, params, params_builder, req) + let params = params_builder.into_value(|| params); + + build_req(&http, params, req) .send() .map_err(move |e| { error!( @@ -155,18 +157,12 @@ impl From for PendingParams { } /** Build an asynchronous `reqwest::RequestBuilder` from an Elasticsearch request. */ -fn build_req(client: &AsyncHttpClient, params: RequestParams, params_builder: Option RequestParams>>, req: I) -> AsyncHttpRequestBuilder +fn build_req(client: &AsyncHttpClient, params: RequestParams, req: I) -> AsyncHttpRequestBuilder where I: Into>, B: Into, { let req = req.into(); - let params = if let Some(params_builder) = params_builder { - params_builder(params) - } else { - params - }; - let url = build_url(&req.url, ¶ms); let method = build_method(req.method); let body = req.body; diff --git a/src/elastic/src/client/sender/mod.rs b/src/elastic/src/client/sender/mod.rs index f01b97900c..1b009f5472 100644 --- a/src/elastic/src/client/sender/mod.rs +++ b/src/elastic/src/client/sender/mod.rs @@ -13,6 +13,8 @@ Some notable types include: [Client]: ../struct.Client.html */ +use fluent_builder::FluentBuilder; + pub mod static_nodes; pub mod sniffed_nodes; @@ -44,12 +46,12 @@ pub struct SendableRequest { correlation_id: Uuid, inner: TRequest, params: TParams, - params_builder: Option RequestParams>>, + params_builder: FluentBuilder, _marker: PhantomData, } impl SendableRequest { - pub(crate) fn new(inner: TRequest, params: TParams, params_builder: Option RequestParams>>) -> Self { + pub(crate) fn new(inner: TRequest, params: TParams, params_builder: FluentBuilder) -> Self { SendableRequest { correlation_id: Uuid::new_v4(), inner: inner, diff --git a/src/elastic/src/client/sender/sniffed_nodes/mod.rs b/src/elastic/src/client/sender/sniffed_nodes/mod.rs index 338837d934..b327b1053d 100644 --- a/src/elastic/src/client/sender/sniffed_nodes/mod.rs +++ b/src/elastic/src/client/sender/sniffed_nodes/mod.rs @@ -30,6 +30,8 @@ use std::time::{Duration, Instant}; use std::sync::{Arc, RwLock}; use url::Url; use futures::{Future, IntoFuture}; +use fluent_builder::FluentBuilder; + use client::sender::static_nodes::StaticNodes; use client::sender::{AsyncSender, NextParams, NodeAddress, PreRequestParams, RequestParams, SendableRequest, Sender, SyncSender}; use client::requests::{DefaultBody, NodesInfoRequest}; @@ -227,7 +229,7 @@ impl SniffedNodes { } fn sendable_request(&self) -> SendableRequest, RequestParams, DefaultBody> { - SendableRequest::new(NodesInfoRequest::new(), self.refresh_params.clone(), None) + SendableRequest::new(NodesInfoRequest::new(), self.refresh_params.clone(), FluentBuilder::new()) } fn finish_refresh(inner: &RwLock, refresh_params: &RequestParams, fresh_nodes: Result) -> Result { diff --git a/src/elastic/src/client/sender/sync.rs b/src/elastic/src/client/sender/sync.rs index 7610c7c7fe..f5f6c3dbe3 100644 --- a/src/elastic/src/client/sender/sync.rs +++ b/src/elastic/src/client/sender/sync.rs @@ -72,7 +72,9 @@ impl Sender for SyncSender { e })?; - let mut req = build_req(&self.http, params, params_builder, req); + let params = params_builder.into_value(move || params); + + let mut req = build_req(&self.http, params, req); let res = match req.send().map_err(error::request) { Ok(res) => { @@ -126,17 +128,12 @@ impl From for Params { } /** Build a synchronous `reqwest::RequestBuilder` from an Elasticsearch request. */ -fn build_req(client: &SyncHttpClient, params: RequestParams, params_builder: Option RequestParams>>, req: I) -> SyncHttpRequestBuilder +fn build_req(client: &SyncHttpClient, params: RequestParams, req: I) -> SyncHttpRequestBuilder where I: Into>, B: Into, { let req = req.into(); - let params = if let Some(params_builder) = params_builder { - params_builder(params) - } else { - params - }; let url = build_url(&req.url, ¶ms); let method = build_method(req.method); diff --git a/src/elastic/src/lib.rs b/src/elastic/src/lib.rs index b36b77b9c0..deeb02bc37 100644 --- a/src/elastic/src/lib.rs +++ b/src/elastic/src/lib.rs @@ -299,6 +299,7 @@ extern crate serde_json; extern crate tokio_core; extern crate url; extern crate uuid; +extern crate fluent_builder; pub mod error; pub use error::Error; diff --git a/tests/run/src/document/delete.rs b/tests/run/src/document/delete.rs index b3b0c5dbd9..09208610b9 100644 --- a/tests/run/src/document/delete.rs +++ b/tests/run/src/document/delete.rs @@ -35,14 +35,14 @@ impl IntegrationTest for Delete { fn request(&self, client: AsyncClient) -> Box> { let index_res = client .document_index(index(INDEX), id(ID), Doc { id: ID }) - .params(|p| p.url_param("refresh", true)) + .params_fluent(|p| p.url_param("refresh", true)) .send(); let pre_delete_res = client.document_get(index(INDEX), id(ID)).send(); let delete_res = client .document_delete::(index(INDEX), id(ID)) - .params(|p| p.url_param("refresh", true)) + .params_fluent(|p| p.url_param("refresh", true)) .send(); let post_delete_res = client.document_get(index(INDEX), id(ID)).send(); diff --git a/tests/run/src/document/simple_index_get.rs b/tests/run/src/document/simple_index_get.rs index bb3d5f9dd3..fbef853a84 100644 --- a/tests/run/src/document/simple_index_get.rs +++ b/tests/run/src/document/simple_index_get.rs @@ -45,7 +45,7 @@ impl IntegrationTest for SimpleIndexGet { fn request(&self, client: AsyncClient) -> Box> { let index_res = client .document_index(index(INDEX), id(ID), doc()) - .params(|p| p.url_param("refresh", true)) + .params_fluent(|p| p.url_param("refresh", true)) .send(); let get_res = client.document_get(index(INDEX), id(ID)).send(); diff --git a/tests/run/src/document/update_with_doc.rs b/tests/run/src/document/update_with_doc.rs index d432d685d3..11520faedf 100644 --- a/tests/run/src/document/update_with_doc.rs +++ b/tests/run/src/document/update_with_doc.rs @@ -44,7 +44,7 @@ impl IntegrationTest for UpdateWithDoc { fn request(&self, client: AsyncClient) -> Box> { let index_res = client .document_index(index(INDEX), id(ID), doc()) - .params(|p| p.url_param("refresh", true)) + .params_fluent(|p| p.url_param("refresh", true)) .send(); let update_res = client @@ -53,7 +53,7 @@ impl IntegrationTest for UpdateWithDoc { id: ID, title: EXPECTED_TITLE.to_owned(), }) - .params(|p| p.url_param("refresh", true)) + .params_fluent(|p| p.url_param("refresh", true)) .send(); let get_res = client.document_get(index(INDEX), id(ID)).send(); diff --git a/tests/run/src/document/update_with_inline_script.rs b/tests/run/src/document/update_with_inline_script.rs index 32400e6472..64fcd457cb 100644 --- a/tests/run/src/document/update_with_inline_script.rs +++ b/tests/run/src/document/update_with_inline_script.rs @@ -39,7 +39,7 @@ impl IntegrationTest for UpdateWithInlineScript { let index_res = client .document_index(index(INDEX), id(ID), doc()) - .params(|p| p.url_param("refresh", true)) + .params_fluent(|p| p.url_param("refresh", true)) .send(); Box::new(delete_res.then(|_| index_res).map(|_| ())) @@ -50,7 +50,7 @@ impl IntegrationTest for UpdateWithInlineScript { let update_res = client .document_update::(index(INDEX), id(ID)) .script(format!("ctx._source.title = \"{}\"", EXPECTED_TITLE)) - .params(|p| p.url_param("refresh", true)) + .params_fluent(|p| p.url_param("refresh", true)) .send(); let get_res = client.document_get(index(INDEX), id(ID)).send(); diff --git a/tests/run/src/document/update_with_script.rs b/tests/run/src/document/update_with_script.rs index 6c5ecc88ff..ba8449eef9 100644 --- a/tests/run/src/document/update_with_script.rs +++ b/tests/run/src/document/update_with_script.rs @@ -39,7 +39,7 @@ impl IntegrationTest for UpdateWithScript { let index_res = client .document_index(index(INDEX), id(ID), doc()) - .params(|p| p.url_param("refresh", true)) + .params_fluent(|p| p.url_param("refresh", true)) .send(); Box::new(delete_res.then(|_| index_res).map(|_| ())) @@ -53,7 +53,7 @@ impl IntegrationTest for UpdateWithScript { "ctx._source.title = params.newTitle", |s| s.param("newTitle", EXPECTED_TITLE), ) - .params(|p| p.url_param("refresh", true)) + .params_fluent(|p| p.url_param("refresh", true)) .send(); let get_res = client.document_get(index(INDEX), id(ID)).send(); diff --git a/tests/run/src/search/empty_query.rs b/tests/run/src/search/empty_query.rs index 6a1c721498..be7ce72178 100644 --- a/tests/run/src/search/empty_query.rs +++ b/tests/run/src/search/empty_query.rs @@ -30,7 +30,7 @@ impl IntegrationTest for EmptyQuery { let index_reqs = future::join_all((0..10).into_iter().map(move |i| { client .document_index(index(INDEX), id(i), Doc { id: i }) - .params(|p| p.url_param("refresh", true)) + .params_fluent(|p| p.url_param("refresh", true)) .send() })); diff --git a/tests/run/src/search/raw_query_string.rs b/tests/run/src/search/raw_query_string.rs index 801eb3fdc6..eb4f9856dd 100644 --- a/tests/run/src/search/raw_query_string.rs +++ b/tests/run/src/search/raw_query_string.rs @@ -30,7 +30,7 @@ impl IntegrationTest for RawQueryString { let index_reqs = future::join_all((0..10).into_iter().map(move |i| { client .document_index(index(INDEX), id(i), Doc { id: i }) - .params(|p| p.url_param("refresh", true)) + .params_fluent(|p| p.url_param("refresh", true)) .send() })); From 12241a7db267ddbafd4785c431286e6445006f14 Mon Sep 17 00:00:00 2001 From: Ashley Mannix Date: Sat, 25 Nov 2017 22:12:53 +1000 Subject: [PATCH 2/5] only fetch next address if builder has no value --- src/elastic/src/client/requests/mod.rs | 9 ++++++ src/elastic/src/client/requests/raw.rs | 19 +++++++++--- src/elastic/src/client/sender/async.rs | 30 +++++++++++-------- src/elastic/src/client/sender/mod.rs | 14 ++++++--- .../src/client/sender/sniffed_nodes/mod.rs | 4 +-- src/elastic/src/client/sender/sync.rs | 27 ++++++++++------- 6 files changed, 70 insertions(+), 33 deletions(-) diff --git a/src/elastic/src/client/requests/mod.rs b/src/elastic/src/client/requests/mod.rs index 7aacd2a470..28365e46e2 100644 --- a/src/elastic/src/client/requests/mod.rs +++ b/src/elastic/src/client/requests/mod.rs @@ -148,6 +148,15 @@ where self } + + pub fn params(mut self, params: I) -> Self + where + I: Into + { + self.params_builder = self.params_builder.value(params.into()); + + self + } } /** diff --git a/src/elastic/src/client/requests/raw.rs b/src/elastic/src/client/requests/raw.rs index 30665c5311..e2b7d54ffb 100644 --- a/src/elastic/src/client/requests/raw.rs +++ b/src/elastic/src/client/requests/raw.rs @@ -5,7 +5,7 @@ Builders for raw requests. use std::marker::PhantomData; use client::Client; -use client::sender::{NextParams, NodeAddresses, SendableRequest, Sender}; +use client::sender::{NextParams, NodeAddresses, SendableRequest, SendableRequestParams, Sender}; use client::requests::{HttpRequest, RequestBuilder}; /** @@ -162,10 +162,21 @@ where pub fn send(self) -> TSender::Response { let client = self.client; let req = self.inner.req.into(); - let params_builder = self.params_builder; - let params = client.addresses.next(); - let req = SendableRequest::new(req, params, params_builder); + // Only try fetch a next address if an explicit `RequestParams` hasn't been given + let params = match self.params_builder.try_into_value() { + Ok(value) => { + SendableRequestParams::Value(value) + }, + Err(builder) => { + SendableRequestParams::Builder { + params: client.addresses.next(), + builder, + } + } + }; + + let req = SendableRequest::new(req, params); client.sender.send(req) } diff --git a/src/elastic/src/client/sender/async.rs b/src/elastic/src/client/sender/async.rs index 6ea448c6ef..dffb8488af 100644 --- a/src/elastic/src/client/sender/async.rs +++ b/src/elastic/src/client/sender/async.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::error::Error as StdError; use futures::{Future, IntoFuture, Poll}; +use futures::future::Either; use futures_cpupool::CpuPool; use tokio_core::reactor::Handle; use reqwest::Error as ReqwestError; @@ -9,7 +10,7 @@ use reqwest::unstable::async::{Client as AsyncHttpClient, ClientBuilder as Async use error::{self, Error}; use private; use client::requests::{AsyncBody, HttpRequest}; -use client::sender::{build_method, build_url, NextParams, NodeAddress, NodeAddresses, NodeAddressesBuilder, NodeAddressesInner, PreRequestParams, RequestParams, SendableRequest, Sender}; +use client::sender::{build_method, build_url, NextParams, NodeAddress, NodeAddresses, NodeAddressesBuilder, NodeAddressesInner, PreRequestParams, RequestParams, SendableRequest, SendableRequestParams, Sender}; use client::sender::sniffed_nodes::SniffedNodesBuilder; use client::responses::{async_response, AsyncResponseBuilder}; use client::Client; @@ -69,7 +70,7 @@ impl Sender for AsyncSender { let correlation_id = request.correlation_id; let serde_pool = self.serde_pool.clone(); let http = self.http.clone(); - let params_builder = request.params_builder; + let params = request.params; let req = request.inner.into(); info!( @@ -78,18 +79,23 @@ impl Sender for AsyncSender { req.url.as_ref() ); - let params_future = request.params.into().map_err(move |e| { - error!( - "Elasticsearch Node Selection: correlation_id: '{}', error: '{}'", - correlation_id, - e - ); - e - }); + let params_future = match params { + SendableRequestParams::Value(params) => Either::A(Ok(params).into_future()), + SendableRequestParams::Builder { params, builder } => { + let params = params.into().map_err(move |e| { + error!( + "Elasticsearch Node Selection: correlation_id: '{}', error: '{}'", + correlation_id, + e + ); + e + }); - let req_future = params_future.and_then(move |params| { - let params = params_builder.into_value(|| params); + Either::B(params.and_then(|params| Ok(builder.into_value(move || params)))) + } + }; + let req_future = params_future.and_then(move |params| { build_req(&http, params, req) .send() .map_err(move |e| { diff --git a/src/elastic/src/client/sender/mod.rs b/src/elastic/src/client/sender/mod.rs index 1b009f5472..87b1949898 100644 --- a/src/elastic/src/client/sender/mod.rs +++ b/src/elastic/src/client/sender/mod.rs @@ -45,23 +45,29 @@ This type encapsulates the state needed between a [`Client`][Client] and a [`Sen pub struct SendableRequest { correlation_id: Uuid, inner: TRequest, - params: TParams, - params_builder: FluentBuilder, + params: SendableRequestParams, _marker: PhantomData, } impl SendableRequest { - pub(crate) fn new(inner: TRequest, params: TParams, params_builder: FluentBuilder) -> Self { + pub(crate) fn new(inner: TRequest, params: SendableRequestParams) -> Self { SendableRequest { correlation_id: Uuid::new_v4(), inner: inner, params: params, - params_builder: params_builder, _marker: PhantomData, } } } +pub(crate) enum SendableRequestParams { + Value(RequestParams), + Builder { + params: TParams, + builder: FluentBuilder, + } +} + /** Represents a type that can send a request. diff --git a/src/elastic/src/client/sender/sniffed_nodes/mod.rs b/src/elastic/src/client/sender/sniffed_nodes/mod.rs index b327b1053d..55491a5d01 100644 --- a/src/elastic/src/client/sender/sniffed_nodes/mod.rs +++ b/src/elastic/src/client/sender/sniffed_nodes/mod.rs @@ -33,7 +33,7 @@ use futures::{Future, IntoFuture}; use fluent_builder::FluentBuilder; use client::sender::static_nodes::StaticNodes; -use client::sender::{AsyncSender, NextParams, NodeAddress, PreRequestParams, RequestParams, SendableRequest, Sender, SyncSender}; +use client::sender::{AsyncSender, NextParams, NodeAddress, PreRequestParams, RequestParams, SendableRequest, SendableRequestParams, Sender, SyncSender}; use client::requests::{DefaultBody, NodesInfoRequest}; use error::{self, Error}; use private; @@ -229,7 +229,7 @@ impl SniffedNodes { } fn sendable_request(&self) -> SendableRequest, RequestParams, DefaultBody> { - SendableRequest::new(NodesInfoRequest::new(), self.refresh_params.clone(), FluentBuilder::new()) + SendableRequest::new(NodesInfoRequest::new(), SendableRequestParams::Value(self.refresh_params.clone())) } fn finish_refresh(inner: &RwLock, refresh_params: &RequestParams, fresh_nodes: Result) -> Result { diff --git a/src/elastic/src/client/sender/sync.rs b/src/elastic/src/client/sender/sync.rs index f5f6c3dbe3..ef45a0cc26 100644 --- a/src/elastic/src/client/sender/sync.rs +++ b/src/elastic/src/client/sender/sync.rs @@ -3,7 +3,7 @@ use reqwest::{Client as SyncHttpClient, ClientBuilder as SyncHttpClientBuilder, use error::{self, Result}; use private; -use client::sender::{build_method, build_url, NextParams, NodeAddress, NodeAddresses, NodeAddressesBuilder, NodeAddressesInner, PreRequestParams, RequestParams, SendableRequest, Sender}; +use client::sender::{build_method, build_url, NextParams, NodeAddress, NodeAddresses, NodeAddressesBuilder, NodeAddressesInner, PreRequestParams, RequestParams, SendableRequest, SendableRequestParams, Sender}; use client::sender::sniffed_nodes::SniffedNodesBuilder; use client::requests::{HttpRequest, SyncBody}; use client::responses::{sync_response, SyncResponseBuilder}; @@ -54,7 +54,7 @@ impl Sender for SyncSender { TParams: Into + 'static, { let correlation_id = request.correlation_id; - let params_builder = request.params_builder; + let params = request.params; let req = request.inner.into(); info!( @@ -63,16 +63,21 @@ impl Sender for SyncSender { req.url.as_ref() ); - let params = request.params.into().inner.map_err(|e| { - error!( - "Elasticsearch Node Selection: correlation_id: '{}', error: '{}'", - correlation_id, - e - ); - e - })?; + let params = match params { + SendableRequestParams::Value(params) => params, + SendableRequestParams::Builder { params, builder } => { + let params = params.into().inner.map_err(|e| { + error!( + "Elasticsearch Node Selection: correlation_id: '{}', error: '{}'", + correlation_id, + e + ); + e + })?; - let params = params_builder.into_value(move || params); + builder.into_value(move || params) + } + }; let mut req = build_req(&self.http, params, req); From 47110e225aa731f38ca2e959221f1fda411601a9 Mon Sep 17 00:00:00 2001 From: Ashley Mannix Date: Sun, 26 Nov 2017 12:05:51 +1000 Subject: [PATCH 3/5] refactor client builders to use fluent methods --- src/elastic/src/client/sender/async.rs | 32 +++++++++++------- src/elastic/src/client/sender/mod.rs | 33 +++++++++++++++++-- .../src/client/sender/sniffed_nodes/mod.rs | 11 +++++++ src/elastic/src/client/sender/sync.rs | 32 +++++++++++------- 4 files changed, 83 insertions(+), 25 deletions(-) diff --git a/src/elastic/src/client/sender/async.rs b/src/elastic/src/client/sender/async.rs index dffb8488af..28c82c3296 100644 --- a/src/elastic/src/client/sender/async.rs +++ b/src/elastic/src/client/sender/async.rs @@ -6,6 +6,7 @@ use futures_cpupool::CpuPool; use tokio_core::reactor::Handle; use reqwest::Error as ReqwestError; use reqwest::unstable::async::{Client as AsyncHttpClient, ClientBuilder as AsyncHttpClientBuilder, RequestBuilder as AsyncHttpRequestBuilder}; +use fluent_builder::FluentBuilder; use error::{self, Error}; use private; @@ -214,7 +215,7 @@ impl Future for PendingResponse { pub struct AsyncClientBuilder { serde_pool: Option, nodes: NodeAddressesBuilder, - params: PreRequestParams, + params: FluentBuilder, } /** @@ -269,7 +270,7 @@ impl AsyncClientBuilder { pub fn new() -> Self { AsyncClientBuilder { serde_pool: None, - params: PreRequestParams::default(), + params: FluentBuilder::new(), nodes: NodeAddressesBuilder::default(), } } @@ -280,7 +281,7 @@ impl AsyncClientBuilder { pub fn from_params(params: PreRequestParams) -> Self { AsyncClientBuilder { serde_pool: None, - params: params, + params: FluentBuilder::new().value(params), nodes: NodeAddressesBuilder::default(), } } @@ -326,7 +327,7 @@ impl AsyncClientBuilder { where I: Into, { - self.nodes = NodeAddressesBuilder::Sniffed(builder.into()); + self.nodes = self.nodes.sniff_nodes(builder.into()); self } @@ -349,10 +350,18 @@ impl AsyncClientBuilder { pub fn sniff_nodes_fluent(mut self, address: I, builder: F) -> Self where I: Into, - F: Fn(SniffedNodesBuilder) -> SniffedNodesBuilder, + F: Fn(SniffedNodesBuilder) -> SniffedNodesBuilder + 'static, { - let address = address.into(); - self.nodes = NodeAddressesBuilder::Sniffed(builder(address.into())); + self.nodes = self.nodes.sniff_nodes_fluent(address.into(), builder); + + self + } + + pub fn params(mut self, params: I) -> Self + where + I: Into, + { + self.params = self.params.value(params.into()); self } @@ -385,11 +394,11 @@ impl AsyncClientBuilder { ``` [AsyncClientBuilder.base_url]: #method.base_url */ - pub fn params(mut self, builder: F) -> Self + pub fn params_fluent(mut self, builder: F) -> Self where - F: Fn(PreRequestParams) -> PreRequestParams, + F: Fn(PreRequestParams) -> PreRequestParams + 'static, { - self.params = builder(self.params); + self.params = self.params.fluent(builder).boxed(); self } @@ -476,13 +485,14 @@ impl AsyncClientBuilder { TIntoHttp: IntoAsyncHttpClient, { let http = client.into_async_http_client().map_err(error::build)?; + let params = self.params.into_value(|| PreRequestParams::default()); let sender = AsyncSender { http: http, serde_pool: self.serde_pool, }; - let addresses = self.nodes.build(self.params, sender.clone()); + let addresses = self.nodes.build(params, sender.clone()); Ok(AsyncClient { sender: sender, diff --git a/src/elastic/src/client/sender/mod.rs b/src/elastic/src/client/sender/mod.rs index 87b1949898..eb77fb759d 100644 --- a/src/elastic/src/client/sender/mod.rs +++ b/src/elastic/src/client/sender/mod.rs @@ -13,7 +13,7 @@ Some notable types include: [Client]: ../struct.Client.html */ -use fluent_builder::FluentBuilder; +use fluent_builder::{FluentBuilder, StatefulFluentBuilder}; pub mod static_nodes; pub mod sniffed_nodes; @@ -170,7 +170,34 @@ enum NodeAddressesInner { enum NodeAddressesBuilder { Static(Vec), - Sniffed(SniffedNodesBuilder), + Sniffed(StatefulFluentBuilder), +} + +impl NodeAddressesBuilder { + fn sniff_nodes(mut self, builder: SniffedNodesBuilder) -> Self { + match self { + NodeAddressesBuilder::Sniffed(fluent_builder) => { + NodeAddressesBuilder::Sniffed(fluent_builder.value(builder)) + }, + _ => { + NodeAddressesBuilder::Sniffed(StatefulFluentBuilder::from_value(builder.into())) + } + } + } + + fn sniff_nodes_fluent(self, address: NodeAddress, fleunt_method: F) -> Self + where + F: FnOnce(SniffedNodesBuilder) -> SniffedNodesBuilder + 'static, + { + match self { + NodeAddressesBuilder::Sniffed(fluent_builder) => { + NodeAddressesBuilder::Sniffed(fluent_builder.fluent(address.into(), fleunt_method).boxed()) + }, + _ => { + NodeAddressesBuilder::Sniffed(StatefulFluentBuilder::from_fluent(address.into(), fleunt_method).boxed()) + } + } + } } impl Default for NodeAddressesBuilder { @@ -188,7 +215,7 @@ impl NodeAddressesBuilder { NodeAddresses::static_nodes(nodes) } NodeAddressesBuilder::Sniffed(builder) => { - let nodes = builder.build(params, sender); + let nodes = builder.into_value(|node| SniffedNodesBuilder::new(node)).build(params, sender); NodeAddresses::sniffed_nodes(nodes) } diff --git a/src/elastic/src/client/sender/sniffed_nodes/mod.rs b/src/elastic/src/client/sender/sniffed_nodes/mod.rs index 55491a5d01..c423acab1c 100644 --- a/src/elastic/src/client/sender/sniffed_nodes/mod.rs +++ b/src/elastic/src/client/sender/sniffed_nodes/mod.rs @@ -133,6 +133,17 @@ impl SniffedNodesBuilder { } } + /** + Specify a given base address. + */ + pub fn base_url(mut self, address: I) -> Self + where + I: Into, + { + self.base_url = address.into(); + self + } + /** Specify a minimum duration to wait before refreshing the set of node addresses. */ diff --git a/src/elastic/src/client/sender/sync.rs b/src/elastic/src/client/sender/sync.rs index ef45a0cc26..7ecb1888b1 100644 --- a/src/elastic/src/client/sender/sync.rs +++ b/src/elastic/src/client/sender/sync.rs @@ -1,5 +1,6 @@ use std::sync::Arc; use reqwest::{Client as SyncHttpClient, ClientBuilder as SyncHttpClientBuilder, RequestBuilder as SyncHttpRequestBuilder}; +use fluent_builder::FluentBuilder; use error::{self, Result}; use private; @@ -160,7 +161,7 @@ where pub struct SyncClientBuilder { http: Option, nodes: NodeAddressesBuilder, - params: PreRequestParams, + params: FluentBuilder, } impl Default for SyncClientBuilder { @@ -183,7 +184,7 @@ impl SyncClientBuilder { SyncClientBuilder { http: None, nodes: NodeAddressesBuilder::default(), - params: PreRequestParams::default(), + params: FluentBuilder::new(), } } @@ -194,7 +195,7 @@ impl SyncClientBuilder { SyncClientBuilder { http: None, nodes: NodeAddressesBuilder::default(), - params: params, + params: FluentBuilder::new().value(params), } } @@ -239,7 +240,7 @@ impl SyncClientBuilder { where I: Into, { - self.nodes = NodeAddressesBuilder::Sniffed(builder.into()); + self.nodes = self.nodes.sniff_nodes(builder.into()); self } @@ -262,10 +263,18 @@ impl SyncClientBuilder { pub fn sniff_nodes_fluent(mut self, address: I, builder: F) -> Self where I: Into, - F: Fn(SniffedNodesBuilder) -> SniffedNodesBuilder, + F: Fn(SniffedNodesBuilder) -> SniffedNodesBuilder + 'static, { - let address = address.into(); - self.nodes = NodeAddressesBuilder::Sniffed(builder(address.into())); + self.nodes = self.nodes.sniff_nodes_fluent(address.into(), builder); + + self + } + + pub fn params(mut self, params: I) -> Self + where + I: Into, + { + self.params = self.params.value(params.into()); self } @@ -298,11 +307,11 @@ impl SyncClientBuilder { ``` [SyncClientBuilder.base_url]: #method.base_url */ - pub fn params(mut self, builder: F) -> Self + pub fn params_fluent(mut self, builder: F) -> Self where - F: Fn(PreRequestParams) -> PreRequestParams, + F: Fn(PreRequestParams) -> PreRequestParams + 'static, { - self.params = builder(self.params); + self.params = self.params.fluent(builder).boxed(); self } @@ -325,9 +334,10 @@ impl SyncClientBuilder { .unwrap_or_else(|| SyncHttpClientBuilder::new().build()) .map_err(error::build)?; + let params = self.params.into_value(|| PreRequestParams::default()); let sender = SyncSender { http: http }; - let addresses = self.nodes.build(self.params, sender.clone()); + let addresses = self.nodes.build(params, sender.clone()); Ok(SyncClient { sender: sender, From 49d57a1aaf1f2df60c93dcb1a85950de95b3511a Mon Sep 17 00:00:00 2001 From: Ashley Mannix Date: Mon, 27 Nov 2017 07:22:35 +1000 Subject: [PATCH 4/5] update fluent_builder --- src/elastic/src/client/requests/raw.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/elastic/src/client/requests/raw.rs b/src/elastic/src/client/requests/raw.rs index e2b7d54ffb..92b2180c52 100644 --- a/src/elastic/src/client/requests/raw.rs +++ b/src/elastic/src/client/requests/raw.rs @@ -3,6 +3,7 @@ Builders for raw requests. */ use std::marker::PhantomData; +use fluent_builder::TryIntoValue; use client::Client; use client::sender::{NextParams, NodeAddresses, SendableRequest, SendableRequestParams, Sender}; @@ -165,10 +166,10 @@ where // Only try fetch a next address if an explicit `RequestParams` hasn't been given let params = match self.params_builder.try_into_value() { - Ok(value) => { + TryIntoValue::Value(value) => { SendableRequestParams::Value(value) }, - Err(builder) => { + TryIntoValue::Builder(builder) => { SendableRequestParams::Builder { params: client.addresses.next(), builder, From 55d5ea4e4848929dd68a85d3be04bea19071630f Mon Sep 17 00:00:00 2001 From: Ashley Mannix Date: Thu, 30 Nov 2017 16:33:02 +1000 Subject: [PATCH 5/5] fix up tests for new builder methods --- benches/elastic/src/main.rs | 2 +- benches/elastic_async/src/main.rs | 2 +- benches/elastic_bulk/src/main.rs | 2 +- benches/elastic_raw/src/main.rs | 2 +- src/elastic/Cargo.toml | 2 +- src/elastic/examples/custom_response.rs | 2 +- src/elastic/examples/load_balanced_async.rs | 3 - src/elastic/examples/raw.rs | 2 +- src/elastic/examples/typed.rs | 2 +- src/elastic/examples/typed_async.rs | 2 +- src/elastic/examples/update.rs | 2 +- src/elastic/src/client/mod.rs | 2 +- .../src/client/requests/document_update.rs | 1 - src/elastic/src/client/requests/mod.rs | 28 +++++++- src/elastic/src/client/sender/async.rs | 66 +++++++++++------- src/elastic/src/client/sender/mod.rs | 2 +- .../src/client/sender/sniffed_nodes/mod.rs | 1 - src/elastic/src/client/sender/sync.rs | 68 ++++++++++++------- src/elastic/src/lib.rs | 5 +- 19 files changed, 122 insertions(+), 74 deletions(-) diff --git a/benches/elastic/src/main.rs b/benches/elastic/src/main.rs index 8a910d2db9..a54b722f94 100644 --- a/benches/elastic/src/main.rs +++ b/benches/elastic/src/main.rs @@ -45,7 +45,7 @@ fn main() { let runs = measure::parse_runs_from_env(); let client = SyncClientBuilder::new() - .params(|p| p.header(http::header::Connection::keep_alive())) + .params_fluent(|p| p.header(http::header::Connection::keep_alive())) .build() .unwrap(); diff --git a/benches/elastic_async/src/main.rs b/benches/elastic_async/src/main.rs index 2b168053a1..453141a3b0 100644 --- a/benches/elastic_async/src/main.rs +++ b/benches/elastic_async/src/main.rs @@ -47,7 +47,7 @@ fn main() { let runs = measure::parse_runs_from_env(); let client = AsyncClientBuilder::new() - .params(|p| p.header(http::header::Connection::keep_alive())) + .params_fluent(|p| p.header(http::header::Connection::keep_alive())) .build(&core.handle()) .unwrap(); diff --git a/benches/elastic_bulk/src/main.rs b/benches/elastic_bulk/src/main.rs index 2e8dfe639e..4796e7259d 100644 --- a/benches/elastic_bulk/src/main.rs +++ b/benches/elastic_bulk/src/main.rs @@ -85,7 +85,7 @@ fn main() { let client = SyncClientBuilder::new() .http_client(http_client()) - .params(|p| p.header(http::header::Connection::keep_alive())) + .params_fluent(|p| p.header(http::header::Connection::keep_alive())) .build() .unwrap(); diff --git a/benches/elastic_raw/src/main.rs b/benches/elastic_raw/src/main.rs index a66928346a..8557e12081 100644 --- a/benches/elastic_raw/src/main.rs +++ b/benches/elastic_raw/src/main.rs @@ -25,7 +25,7 @@ fn main() { let runs = measure::parse_runs_from_env(); let client = SyncClientBuilder::new() - .params(|p| p.header(http::header::Connection::keep_alive())) + .params_fluent(|p| p.header(http::header::Connection::keep_alive())) .build() .unwrap(); diff --git a/src/elastic/Cargo.toml b/src/elastic/Cargo.toml index 989a5d1dd8..dcbbb51068 100644 --- a/src/elastic/Cargo.toml +++ b/src/elastic/Cargo.toml @@ -27,7 +27,7 @@ reqwest = { version = "~0.8.0", features = ["unstable"] } futures = "~0.1.16" tokio-core = "~0.1.9" futures-cpupool = "~0.1.6" -fluent_builder = { version = "*", git = "https://github.com/KodrAus/fluent_builder.git", branch = "feat/fnonce-builders" } +fluent_builder = "~0.5" elastic_requests = { version = "~0.20.2", path = "../requests" } elastic_responses = { version = "~0.20.2", path = "../responses" } diff --git a/src/elastic/examples/custom_response.rs b/src/elastic/examples/custom_response.rs index b1261eb181..3c7a4069ce 100644 --- a/src/elastic/examples/custom_response.rs +++ b/src/elastic/examples/custom_response.rs @@ -59,7 +59,7 @@ fn run() -> Result<(), Box> { // Send the request and process the response. let res = client .request(SearchRequest::new(query.to_string())) - .params(|q| q.url_param("filter_path", "hits.hits._source")) + .params_fluent(|q| q.url_param("filter_path", "hits.hits._source")) .send()? .into_response::()?; diff --git a/src/elastic/examples/load_balanced_async.rs b/src/elastic/examples/load_balanced_async.rs index 0c040c7ece..b37435ae5e 100644 --- a/src/elastic/examples/load_balanced_async.rs +++ b/src/elastic/examples/load_balanced_async.rs @@ -7,14 +7,11 @@ extern crate elastic; extern crate env_logger; extern crate futures; -#[macro_use] -extern crate serde_json; extern crate tokio_core; use std::error::Error; use futures::Future; use tokio_core::reactor::Core; -use serde_json::Value; use elastic::prelude::*; fn run() -> Result<(), Box> { diff --git a/src/elastic/examples/raw.rs b/src/elastic/examples/raw.rs index da4435c7af..22d41249d3 100644 --- a/src/elastic/examples/raw.rs +++ b/src/elastic/examples/raw.rs @@ -16,7 +16,7 @@ fn run() -> Result<(), Box> { // A reqwest HTTP client and default parameters. // The `params` includes the base node url (http://localhost:9200). let client = SyncClientBuilder::new() - .params(|p| p.url_param("pretty", true)) + .params_fluent(|p| p.url_param("pretty", true)) .build()?; // A search request from the body. diff --git a/src/elastic/examples/typed.rs b/src/elastic/examples/typed.rs index a4fbb1da67..0eb62cbd2d 100644 --- a/src/elastic/examples/typed.rs +++ b/src/elastic/examples/typed.rs @@ -100,7 +100,7 @@ fn put_index(client: &SyncClient) -> Result<(), Error> { fn put_doc(client: &SyncClient, doc: MyType) -> Result<(), Error> { client .document_index(sample_index(), id(doc.id), doc) - .params(|p| p.url_param("refresh", true)) + .params_fluent(|p| p.url_param("refresh", true)) .send()?; Ok(()) diff --git a/src/elastic/examples/typed_async.rs b/src/elastic/examples/typed_async.rs index 9e0814f795..e2de69f62a 100644 --- a/src/elastic/examples/typed_async.rs +++ b/src/elastic/examples/typed_async.rs @@ -119,7 +119,7 @@ fn put_index(client: AsyncClient) -> Box> { fn put_doc(client: AsyncClient, doc: MyType) -> Box> { let index_doc = client .document_index(sample_index(), id(doc.id), doc) - .params(|p| p.url_param("refresh", true)) + .params_fluent(|p| p.url_param("refresh", true)) .send() .map(|_| ()); diff --git a/src/elastic/examples/update.rs b/src/elastic/examples/update.rs index 3d219a91c3..a7e689c164 100644 --- a/src/elastic/examples/update.rs +++ b/src/elastic/examples/update.rs @@ -50,7 +50,7 @@ fn run() -> Result<(), Box> { // Index the document client .document_index(sample_index(), id(doc_id), doc) - .params(|p| p.url_param("refresh", true)) + .params_fluent(|p| p.url_param("refresh", true)) .send()?; // Update the document using a script diff --git a/src/elastic/src/client/mod.rs b/src/elastic/src/client/mod.rs index 2d12970d79..26f928511e 100644 --- a/src/elastic/src/client/mod.rs +++ b/src/elastic/src/client/mod.rs @@ -272,7 +272,7 @@ If the request was sent asynchronously, the response is returned as a `Future`. let request_builder = client.request(req); // Set additional url parameters -let request_builder = request_builder.params(|p| p +let request_builder = request_builder.params_fluent(|p| p .url_param("pretty", true) .url_param("refresh", true) ); diff --git a/src/elastic/src/client/requests/document_update.rs b/src/elastic/src/client/requests/document_update.rs index 439c4c7faf..c053e9ce2c 100644 --- a/src/elastic/src/client/requests/document_update.rs +++ b/src/elastic/src/client/requests/document_update.rs @@ -9,7 +9,6 @@ use futures::{Future, IntoFuture, Poll}; use futures_cpupool::CpuPool; use serde_json::{self, Map, Value}; use serde::ser::{Serialize, Serializer}; -use fluent_builder::{FluentBuilder, Override}; use error::{self, Error}; use client::Client; diff --git a/src/elastic/src/client/requests/mod.rs b/src/elastic/src/client/requests/mod.rs index 28365e46e2..d3127a2b60 100644 --- a/src/elastic/src/client/requests/mod.rs +++ b/src/elastic/src/client/requests/mod.rs @@ -4,7 +4,6 @@ Request types for the Elasticsearch REST API. This module contains implementation details that are useful if you want to customise the request process, but aren't generally important for sending requests. */ -use std::sync::Arc; use futures_cpupool::CpuPool; use fluent_builder::FluentBuilder; @@ -120,7 +119,7 @@ where # let client = SyncClientBuilder::new().build()?; # fn get_req() -> PingRequest<'static> { PingRequest::new() } let builder = client.request(get_req()) - .params(|p| p.url_param("refresh", true)); + .params_fluent(|p| p.url_param("refresh", true)); # Ok(()) # } ``` @@ -135,7 +134,7 @@ where # let client = SyncClientBuilder::new().build()?; # fn get_req() -> PingRequest<'static> { PingRequest::new() } let builder = client.request(get_req()) - .params(|p| p.base_url("http://different-host:9200")); + .params_fluent(|p| p.base_url("http://different-host:9200")); # Ok(()) # } ``` @@ -149,6 +148,29 @@ where self } + /** + Specify default request parameters. + + This method differs from `params_fluent` by not taking any default parameters into account. + The `RequestParams` passed in are exactly the `RequestParams` used to build the request. + + # Examples + + Add a url param to force an index refresh and send the request to `http://different-host:9200`: + + ```no_run + # extern crate elastic; + # use elastic::prelude::*; + # fn main() { run().unwrap() } + # fn run() -> Result<(), Box<::std::error::Error>> { + # let client = SyncClientBuilder::new().build()?; + # fn get_req() -> PingRequest<'static> { PingRequest::new() } + let builder = client.request(get_req()) + .params(RequestParams::new("http://different-hos:9200").url_param("refresh", true)); + # Ok(()) + # } + ``` + */ pub fn params(mut self, params: I) -> Self where I: Into diff --git a/src/elastic/src/client/sender/async.rs b/src/elastic/src/client/sender/async.rs index 28c82c3296..2443205444 100644 --- a/src/elastic/src/client/sender/async.rs +++ b/src/elastic/src/client/sender/async.rs @@ -1,4 +1,3 @@ -use std::sync::Arc; use std::error::Error as StdError; use futures::{Future, IntoFuture, Poll}; use futures::future::Either; @@ -20,6 +19,7 @@ use client::Client; An asynchronous Elasticsearch client. Use an [`AsyncClientBuilder`][AsyncClientBuilder] to configure and build an `AsyncClient`. +For more details about the methods available to an `AsyncClient`, see the base [`Client`][Client] type. # Examples @@ -44,6 +44,7 @@ core.run(response_future)?; # } ``` +[Client]: struct.Client.html [AsyncClientBuilder]: struct.AsyncClientBuilder.html */ pub type AsyncClient = Client; @@ -357,18 +358,43 @@ impl AsyncClientBuilder { self } - pub fn params(mut self, params: I) -> Self + /** + Specify default request parameters. + + # Examples + + Require all responses use pretty-printing: + + ``` + # use elastic::prelude::*; + let builder = AsyncClientBuilder::new() + .params_fluent(|p| p + .url_param("pretty", true)); + ``` + + Add an authorization header: + + ``` + # use elastic::prelude::*; + use elastic::http::header::Authorization; + + let builder = AsyncClientBuilder::new() + .params_fluent(|p| p + .header(Authorization("let me in".to_owned()))); + ``` + */ + pub fn params_fluent(mut self, builder: F) -> Self where - I: Into, + F: Fn(PreRequestParams) -> PreRequestParams + 'static, { - self.params = self.params.value(params.into()); + self.params = self.params.fluent(builder).boxed(); self } /** Specify default request parameters. - + # Examples Require all responses use pretty-printing: @@ -376,9 +402,8 @@ impl AsyncClientBuilder { ``` # use elastic::prelude::*; let builder = AsyncClientBuilder::new() - .params(|p| { - p.url_param("pretty", true) - }); + .params(PreRequestParams::new() + .url_param("pretty", true)); ``` Add an authorization header: @@ -388,17 +413,15 @@ impl AsyncClientBuilder { use elastic::http::header::Authorization; let builder = AsyncClientBuilder::new() - .params(|p| { - p.header(Authorization("let me in".to_owned())) - }); + .params(PreRequestParams::new() + .header(Authorization("let me in".to_owned()))); ``` - [AsyncClientBuilder.base_url]: #method.base_url */ - pub fn params_fluent(mut self, builder: F) -> Self + pub fn params(mut self, params: I) -> Self where - F: Fn(PreRequestParams) -> PreRequestParams + 'static, + I: Into, { - self.params = self.params.fluent(builder).boxed(); + self.params = self.params.value(params.into()); self } @@ -512,11 +535,7 @@ mod tests { use client::requests::*; fn params() -> RequestParams { - RequestParams::new("eshost:9200/path").url_param("pretty", false) - } - - fn builder() -> Option RequestParams>> { - Some(Arc::new(|params| params.url_param("pretty", true))) + RequestParams::new("eshost:9200/path").url_param("pretty", true) } fn expected_req(cli: &Client, method: Method, url: &str, body: Option>) -> RequestBuilder { @@ -543,7 +562,7 @@ mod tests { #[test] fn head_req() { let cli = Client::new(&core().handle()); - let req = build_req(&cli, params(), builder(), PingHeadRequest::new()); + let req = build_req(&cli, params(), PingHeadRequest::new()); let url = "eshost:9200/path/?pretty=true"; @@ -555,7 +574,7 @@ mod tests { #[test] fn get_req() { let cli = Client::new(&core().handle()); - let req = build_req(&cli, params(), builder(), SimpleSearchRequest::new()); + let req = build_req(&cli, params(), SimpleSearchRequest::new()); let url = "eshost:9200/path/_search?pretty=true"; @@ -570,7 +589,6 @@ mod tests { let req = build_req( &cli, params(), - builder(), PercolateRequest::for_index_ty("idx", "ty", vec![]), ); @@ -587,7 +605,6 @@ mod tests { let req = build_req( &cli, params(), - builder(), IndicesCreateRequest::for_index("idx", vec![]), ); @@ -604,7 +621,6 @@ mod tests { let req = build_req( &cli, params(), - builder(), IndicesDeleteRequest::for_index("idx"), ); diff --git a/src/elastic/src/client/sender/mod.rs b/src/elastic/src/client/sender/mod.rs index eb77fb759d..ee84fcaca8 100644 --- a/src/elastic/src/client/sender/mod.rs +++ b/src/elastic/src/client/sender/mod.rs @@ -174,7 +174,7 @@ enum NodeAddressesBuilder { } impl NodeAddressesBuilder { - fn sniff_nodes(mut self, builder: SniffedNodesBuilder) -> Self { + fn sniff_nodes(self, builder: SniffedNodesBuilder) -> Self { match self { NodeAddressesBuilder::Sniffed(fluent_builder) => { NodeAddressesBuilder::Sniffed(fluent_builder.value(builder)) diff --git a/src/elastic/src/client/sender/sniffed_nodes/mod.rs b/src/elastic/src/client/sender/sniffed_nodes/mod.rs index c423acab1c..62a2988999 100644 --- a/src/elastic/src/client/sender/sniffed_nodes/mod.rs +++ b/src/elastic/src/client/sender/sniffed_nodes/mod.rs @@ -30,7 +30,6 @@ use std::time::{Duration, Instant}; use std::sync::{Arc, RwLock}; use url::Url; use futures::{Future, IntoFuture}; -use fluent_builder::FluentBuilder; use client::sender::static_nodes::StaticNodes; use client::sender::{AsyncSender, NextParams, NodeAddress, PreRequestParams, RequestParams, SendableRequest, SendableRequestParams, Sender, SyncSender}; diff --git a/src/elastic/src/client/sender/sync.rs b/src/elastic/src/client/sender/sync.rs index 7ecb1888b1..64a42a6f80 100644 --- a/src/elastic/src/client/sender/sync.rs +++ b/src/elastic/src/client/sender/sync.rs @@ -1,4 +1,3 @@ -use std::sync::Arc; use reqwest::{Client as SyncHttpClient, ClientBuilder as SyncHttpClientBuilder, RequestBuilder as SyncHttpRequestBuilder}; use fluent_builder::FluentBuilder; @@ -14,6 +13,7 @@ use client::Client; A synchronous Elasticsearch client. Use a [`SyncClientBuilder`][SyncClientBuilder] to configure and build a `SyncClient`. +For more details about the methods available to a `SyncClient`, see the base [`Client`][Client] type. # Examples @@ -31,6 +31,7 @@ let response = client.ping().send()?; # } ``` +[Client]: struct.Client.html [SyncClientBuilder]: struct.SyncClientBuilder.html */ pub type SyncClient = Client; @@ -270,18 +271,43 @@ impl SyncClientBuilder { self } - pub fn params(mut self, params: I) -> Self + /** + Specify default request parameters. + + # Examples + + Require all responses use pretty-printing: + + ``` + # use elastic::prelude::*; + let builder = SyncClientBuilder::new() + .params_fluent(|p| p + .url_param("pretty", true)); + ``` + + Add an authorization header: + + ``` + # use elastic::prelude::*; + use elastic::http::header::Authorization; + + let builder = SyncClientBuilder::new() + .params_fluent(|p| p + .header(Authorization("let me in".to_owned()))); + ``` + */ + pub fn params_fluent(mut self, builder: F) -> Self where - I: Into, + F: Fn(PreRequestParams) -> PreRequestParams + 'static, { - self.params = self.params.value(params.into()); + self.params = self.params.fluent(builder).boxed(); self } /** Specify default request parameters. - + # Examples Require all responses use pretty-printing: @@ -289,9 +315,8 @@ impl SyncClientBuilder { ``` # use elastic::prelude::*; let builder = SyncClientBuilder::new() - .params(|p| { - p.url_param("pretty", true) - }); + .params(PreRequestParams::default() + .url_param("pretty", true)); ``` Add an authorization header: @@ -301,17 +326,15 @@ impl SyncClientBuilder { use elastic::http::header::Authorization; let builder = SyncClientBuilder::new() - .params(|p| { - p.header(Authorization("let me in".to_owned())) - }); + .params(PreRequestParams::default() + .header(Authorization("let me in".to_owned()))); ``` - [SyncClientBuilder.base_url]: #method.base_url */ - pub fn params_fluent(mut self, builder: F) -> Self + pub fn params(mut self, params: I) -> Self where - F: Fn(PreRequestParams) -> PreRequestParams + 'static, + I: Into, { - self.params = self.params.fluent(builder).boxed(); + self.params = self.params.value(params.into()); self } @@ -326,7 +349,7 @@ impl SyncClientBuilder { /** Construct a [`SyncClient`][SyncClient] from this builder. - [Client]: struct.Client.html + [SyncClient]: type.SyncClient.html */ pub fn build(self) -> Result { let http = self.http @@ -356,11 +379,7 @@ mod tests { use client::requests::*; fn params() -> RequestParams { - RequestParams::new("eshost:9200/path").url_param("pretty", false) - } - - fn builder() -> Option RequestParams>> { - Some(Arc::new(|params| params.url_param("pretty", true))) + RequestParams::new("eshost:9200/path").url_param("pretty", true) } fn expected_req(cli: &Client, method: Method, url: &str, body: Option>) -> RequestBuilder { @@ -383,7 +402,7 @@ mod tests { #[test] fn head_req() { let cli = Client::new(); - let req = build_req(&cli, params(), builder(), PingHeadRequest::new()); + let req = build_req(&cli, params(), PingHeadRequest::new()); let url = "eshost:9200/path/?pretty=true"; @@ -395,7 +414,7 @@ mod tests { #[test] fn get_req() { let cli = Client::new(); - let req = build_req(&cli, params(), builder(), SimpleSearchRequest::new()); + let req = build_req(&cli, params(), SimpleSearchRequest::new()); let url = "eshost:9200/path/_search?pretty=true"; @@ -410,7 +429,6 @@ mod tests { let req = build_req( &cli, params(), - builder(), PercolateRequest::for_index_ty("idx", "ty", vec![]), ); @@ -427,7 +445,6 @@ mod tests { let req = build_req( &cli, params(), - builder(), IndicesCreateRequest::for_index("idx", vec![]), ); @@ -444,7 +461,6 @@ mod tests { let req = build_req( &cli, params(), - builder(), IndicesDeleteRequest::for_index("idx"), ); diff --git a/src/elastic/src/lib.rs b/src/elastic/src/lib.rs index deeb02bc37..3c7e2be689 100644 --- a/src/elastic/src/lib.rs +++ b/src/elastic/src/lib.rs @@ -63,7 +63,7 @@ use elastic::http::header::Authorization; let builder = SyncClientBuilder::new() .static_node("http://es_host:9200") - .params(|p| p + .params_fluent(|p| p .url_param("pretty", true) .header(Authorization("let me in".to_owned()))); @@ -84,7 +84,7 @@ Individual requests can override these parameter values: let client = SyncClientBuilder::new().build()?; let response = client.search::() - .params(|p| p.url_param("pretty", false)) + .params_fluent(|p| p.url_param("pretty", false)) .send()?; # Ok(()) # } @@ -239,7 +239,6 @@ for hit in response.hits() { This crate is mostly a meta-package composed of a number of smaller pieces including: -- `elastic_reqwest` HTTP transport - `elastic_requests` API request builders - `elastic_responses` API response parsers - `elastic_types` tools for document and mapping APIs