Skip to content

Commit

Permalink
Add source method to UpdateRequestBuilder
Browse files Browse the repository at this point in the history
This is a more ergonomic way to include the updated doc in the response.

Also:
- Add integration test
- Add example
- Update docs

Close #381
  • Loading branch information
mwilliammyers committed Oct 11, 2019
1 parent b60808a commit 06a3e97
Show file tree
Hide file tree
Showing 5 changed files with 304 additions and 37 deletions.
79 changes: 79 additions & 0 deletions src/elastic/examples/update_with_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//! Update a document and return the updated document in a single request.
//!
//! NOTE: This sample expects you have a node running on `localhost:9200`.
//!
//! This sample demonstrates how to index & update a document and return
//! the newly updated document.
//! Also see the `typed` sample for a more complete implementation.

use std::error::Error;

use elastic::prelude::*;
use elastic_derive::ElasticType;
use env_logger;
use serde::{
Deserialize,
Serialize,
};

#[derive(Debug, Serialize, Deserialize, ElasticType)]
#[elastic(index = "update_with_source_sample_index")]
struct NewsArticle {
#[elastic(id)]
id: String,
title: String,
content: String,
likes: i64,
}

#[derive(Debug, Serialize, Deserialize, ElasticType)]
#[elastic(index = "update_with_source_sample_index")]
struct UpdatedNewsArticle {
#[elastic(id)]
id: String,
title: String,
content: String,
likes: i64,
}

fn run() -> Result<(), Box<dyn Error>> {
// A HTTP client and request parameters
let client = SyncClient::builder().build()?;

// Create a document to index
let doc = NewsArticle {
id: "1".to_string(),
title: "A title".to_string(),
content: "Some content.".to_string(),
likes: 0,
};

// Index the document
client.document().index(doc).send()?;

// Update the document using a script
let update = client
.document::<NewsArticle>()
.update("1")
.script("ctx._source.likes++")
// Request that the updated document be returned with the response
.source::<UpdatedNewsArticle>()
.send()?;

assert!(update.updated());

// Deserialize the updated document to the type we requested earlier;
// this will return `None` if `source()` was not called on the request
let updated_doc = update.into_document().unwrap();

assert!(updated_doc.likes > 0);

println!("{:#?}", &updated_doc);

Ok(())
}

fn main() {
env_logger::init();
run().unwrap()
}
100 changes: 86 additions & 14 deletions src/elastic/src/client/requests/document_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use std::marker::PhantomData;
use crate::{
client::{
requests::{
Pending as BasePending,
raw::RawRequestInner,
Pending as BasePending,
RequestBuilder,
},
responses::UpdateResponse,
Expand Down Expand Up @@ -49,6 +49,8 @@ pub use crate::client::requests::common::{
ScriptBuilder,
};

pub(crate) type DefaultUpdatedSource = serde_json::Value;

/**
An [update document request][docs-update] builder that can be configured before sending.
Expand All @@ -60,15 +62,17 @@ The `send` method will either send the request [synchronously][send-sync] or [as
[send-async]: #send-asynchronously
[Client.document.update]: ../../struct.DocumentClient.html#update-document-request
*/
pub type UpdateRequestBuilder<TSender, TBody> = RequestBuilder<TSender, UpdateRequestInner<TBody>>;
pub type UpdateRequestBuilder<TSender, TBody, TSource = DefaultUpdatedSource> =
RequestBuilder<TSender, UpdateRequestInner<TBody, TSource>>;

#[doc(hidden)]
pub struct UpdateRequestInner<TBody> {
pub struct UpdateRequestInner<TBody, TSource> {
index: Index<'static>,
ty: Type<'static>,
id: Id<'static>,
body: TBody,
_marker: PhantomData<TBody>,
_body_marker: PhantomData<TBody>,
_update_source_marker: PhantomData<TSource>,
}

/**
Expand Down Expand Up @@ -217,7 +221,8 @@ where
ty: ty,
id: id.into(),
body: Doc::empty(),
_marker: PhantomData,
_body_marker: PhantomData,
_update_source_marker: PhantomData,
},
)
}
Expand Down Expand Up @@ -272,13 +277,14 @@ where
ty: DEFAULT_DOC_TYPE.into(),
id: id.into(),
body: Doc::empty(),
_marker: PhantomData,
_body_marker: PhantomData,
_update_source_marker: PhantomData,
},
)
}
}

impl<TBody> UpdateRequestInner<TBody>
impl<TBody, TSource> UpdateRequestInner<TBody, TSource>
where
TBody: Serialize,
{
Expand Down Expand Up @@ -382,7 +388,8 @@ where
index: self.inner.index,
ty: self.inner.ty,
id: self.inner.id,
_marker: PhantomData,
_body_marker: PhantomData,
_update_source_marker: PhantomData,
},
)
}
Expand Down Expand Up @@ -461,7 +468,8 @@ where
index: self.inner.index,
ty: self.inner.ty,
id: self.inner.id,
_marker: PhantomData,
_body_marker: PhantomData,
_update_source_marker: PhantomData,
},
)
}
Expand Down Expand Up @@ -552,10 +560,73 @@ where
}
}

impl<TSender, TBody, TSource> UpdateRequestBuilder<TSender, TBody, TSource>
where
TSender: Sender,
{
/**
Request that the [`UpdateResponse`] include the `source` of the updated document.
Although not a requirement, be careful that both the document and
updated document types use the same index, e.g. by using the
[`#[elastic(index)]` attribute][index-attr].
# Examples
Request that the `source` is returned with the response and deserialize
the updated document by calling [`into_document`] on the [`UpdateResponse`]:
```no_run
# #[macro_use] extern crate serde_derive;
# #[macro_use] extern crate elastic_derive;
# use elastic::prelude::*;
# fn main() { run().unwrap() }
# fn run() -> Result<(), Box<dyn ::std::error::Error>> {
# #[derive(Serialize, Deserialize, ElasticType)]
# struct NewsArticle { likes: i64 }
# #[derive(Serialize, Deserialize, ElasticType)]
# struct UpdatedNewsArticle { likes: i64 }
# let client = SyncClientBuilder::new().build()?;
let response = client.document::<NewsArticle>()
.update(1)
.script("ctx._source.likes++")
.source::<UpdatedNewsArticle>()
.send()?;
assert!(response.into_document().unwrap().likes >= 1);
# Ok(())
# }
```
[index-attr]: ../../../types/document/index.html#specifying-a-default-index-name
[`UpdateResponse`]: ../../responses/struct.UpdateResponse.html
[`into_document`]: ../../responses/struct.UpdateResponse.html#method.into_document
*/
pub fn source<T>(self) -> UpdateRequestBuilder<TSender, TBody, T>
where
T: DeserializeOwned,
{
RequestBuilder::new(
self.client,
self.params_builder
.fluent(|params| params.url_param("_source", true))
.shared(),
UpdateRequestInner {
body: self.inner.body,
index: self.inner.index,
ty: self.inner.ty,
id: self.inner.id,
_body_marker: PhantomData,
_update_source_marker: PhantomData,
},
)
}
}

/**
# Send synchronously
*/
impl<TBody> UpdateRequestBuilder<SyncSender, TBody>
impl<TBody, TSource> UpdateRequestBuilder<SyncSender, TBody, TSource>
where
TBody: Serialize,
{
Expand Down Expand Up @@ -595,7 +666,7 @@ where
[SyncClient]: ../../type.SyncClient.html
[documents-mod]: ../../types/document/index.html
*/
pub fn send(self) -> Result<UpdateResponse, Error> {
pub fn send(self) -> Result<UpdateResponse<TSource>, Error> {
let req = self.inner.into_request()?;

RequestBuilder::new(self.client, self.params_builder, RawRequestInner::new(req))
Expand All @@ -607,9 +678,10 @@ where
/**
# Send asynchronously
*/
impl<TBody> UpdateRequestBuilder<AsyncSender, TBody>
impl<TBody, TSource> UpdateRequestBuilder<AsyncSender, TBody, TSource>
where
TBody: Serialize + Send + 'static,
TSource: Send + 'static,
{
/**
Send an `UpdateRequestBuilder` asynchronously using an [`AsyncClient`][AsyncClient].
Expand Down Expand Up @@ -654,7 +726,7 @@ where
[AsyncClient]: ../../type.AsyncClient.html
[documents-mod]: ../../types/document/index.html
*/
pub fn send(self) -> Pending {
pub fn send(self) -> Pending<TSource> {
let (client, params_builder, inner) = (self.client, self.params_builder, self.inner);

let req_future = client.sender.maybe_async(move || inner.into_request());
Expand Down Expand Up @@ -686,7 +758,7 @@ mod tests {

#[test]
fn is_send() {
assert_send::<super::Pending>();
assert_send::<super::Pending<serde_json::Value>>();
}

#[derive(Serialize, ElasticType)]
Expand Down
74 changes: 52 additions & 22 deletions src/elastic/src/client/responses/document_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
Response types for a [update document request](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html).
*/

use std::marker::PhantomData;

use serde::de::DeserializeOwned;
use serde_json::Value;

use super::common::DocumentResult;

use crate::{
client::requests::document_update::DefaultUpdatedSource,
http::receiver::IsOkOnSuccess,
types::document::{
Id,
Expand All @@ -16,17 +19,9 @@ use crate::{
},
};

/** Response for an [update document request](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html). */
#[derive(Deserialize, Debug)]
struct UpdatedSource<T> {
#[serde(rename = "_source")]
source: Option<T>,
}

impl<T> IsOkOnSuccess for UpdatedSource<T> {}

/** Response for a [update document request](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html). */
#[derive(Deserialize, Debug)]
pub struct UpdateResponse {
pub struct UpdateResponse<TSource = DefaultUpdatedSource> {
#[serde(rename = "_index")]
index: String,
#[serde(rename = "_type")]
Expand All @@ -42,20 +37,55 @@ pub struct UpdateResponse {
#[serde(rename = "_routing")]
routing: Option<String>,
result: DocumentResult,
get: Option<UpdatedSource<Value>>,
get: Option<Value>,
#[serde(skip)]
_marker: PhantomData<TSource>,
}

impl UpdateResponse {
/** Convert the source in the response into the updated document. The request must have been made with the [`_source` parameter](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html#request-body-search-source-filtering). */
pub fn into_document<T>(self) -> Option<T>
where
T: DeserializeOwned,
{
self.get.map_or_else(
impl<TSource> UpdateResponse<TSource>
where
TSource: DeserializeOwned,
{
/**
Convert the source in the response into the updated document.
The [`source`] method must have been called first on the
[`UpdateRequestBuilder`], otherwise this will return `None`.
# Examples
```no_run
# #[macro_use] extern crate serde_derive;
# #[macro_use] extern crate elastic_derive;
# use elastic::prelude::*;
# fn main() { run().unwrap() }
# fn run() -> Result<(), Box<dyn ::std::error::Error>> {
# #[derive(Serialize, Deserialize, ElasticType)]
# struct NewsArticle { likes: i64 }
# #[derive(Serialize, Deserialize, ElasticType)]
# struct UpdatedNewsArticle { likes: i64 }
# let client = SyncClientBuilder::new().build()?;
let response = client.document::<NewsArticle>()
.update(1)
.script("ctx._source.likes++")
.source::<UpdatedNewsArticle>()
.send()?;
assert!(response.into_document().unwrap().likes >= 1);
# Ok(())
# }
```
[`source`]: ../requests/document_update/type.UpdateRequestBuilder.html#method.source
[`UpdateRequestBuilder`]: ../requests/document_update/type.UpdateRequestBuilder.html
*/
pub fn into_document(&self) -> Option<TSource> {
self.get.as_ref().map_or_else(
|| None,
|get| {
get.source
.map_or_else(|| None, |doc| serde_json::from_value::<T>(doc).ok())
|obj| {
obj.get("_source")
.cloned()
.map_or_else(|| None, |doc| serde_json::from_value::<TSource>(doc).ok())
},
)
}
Expand Down Expand Up @@ -107,4 +137,4 @@ impl UpdateResponse {
}
}

impl IsOkOnSuccess for UpdateResponse {}
impl<TSource> IsOkOnSuccess for UpdateResponse<TSource> {}
3 changes: 2 additions & 1 deletion tests/integration/src/tests/document/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ test_cases![
update_no_index,
update_with_doc,
update_with_inline_script,
update_with_script
update_with_script,
update_with_source
];

mod compile_test;
Loading

0 comments on commit 06a3e97

Please sign in to comment.