Skip to content

Commit

Permalink
Merge pull request #381 from mwilliammyers/feat/update-source
Browse files Browse the repository at this point in the history
Add support for _source on update requests
  • Loading branch information
mwilliammyers committed Oct 27, 2019
2 parents e01d8b5 + 54b7f9b commit f0f2952
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 2 deletions.
79 changes: 79 additions & 0 deletions src/elastic/examples/update_with_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//! Update a document and return the updated document in a single request.
//!
//! NOTE: This sample expects you have a node running on `localhost:9200`.
//!
//! This sample demonstrates how to index & update a document and return
//! the newly updated document.
//! Also see the `typed` sample for a more complete implementation.

use std::error::Error;

use elastic::prelude::*;
use elastic_derive::ElasticType;
use env_logger;
use serde::{
Deserialize,
Serialize,
};

#[derive(Debug, Serialize, Deserialize, ElasticType)]
#[elastic(index = "update_with_source_sample_index")]
struct NewsArticle {
#[elastic(id)]
id: String,
title: String,
content: String,
likes: i64,
}

#[derive(Debug, Serialize, Deserialize, ElasticType)]
#[elastic(index = "update_with_source_sample_index")]
struct UpdatedNewsArticle {
#[elastic(id)]
id: String,
title: String,
content: String,
likes: i64,
}

fn run() -> Result<(), Box<dyn Error>> {
// A HTTP client and request parameters
let client = SyncClient::builder().build()?;

// Create a document to index
let doc = NewsArticle {
id: "1".to_string(),
title: "A title".to_string(),
content: "Some content.".to_string(),
likes: 0,
};

// Index the document
client.document().index(doc).send()?;

// Update the document using a script
let update = client
.document::<NewsArticle>()
.update("1")
.script("ctx._source.likes++")
// Request that the updated document be returned with the response
.source()
.send()?;

assert!(update.updated());

// Deserialize the updated document,
// will return `None` if `source()` was not called on the request
let updated_doc = update.into_document::<UpdatedNewsArticle>().unwrap();

assert!(updated_doc.likes > 0);

println!("{:#?}", &updated_doc);

Ok(())
}

fn main() {
env_logger::init();
run().unwrap()
}
61 changes: 61 additions & 0 deletions src/elastic/src/client/requests/document_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,67 @@ where
}
}

impl<TSender, TBody> UpdateRequestBuilder<TSender, TBody>
where
TSender: Sender,
{
/**
Request that the [`UpdateResponse`] include the `source` of the updated document.
Although not a requirement, be careful that both the document and
updated document types use the same index, e.g. by using the
[`#[elastic(index)]` attribute][index-attr].
# Examples
Request that the `source` is returned with the response and deserialize
the updated document by calling [`into_document`] on the [`UpdateResponse`]:
```no_run
# #[macro_use] extern crate serde_derive;
# #[macro_use] extern crate elastic_derive;
# use elastic::prelude::*;
# fn main() { run().unwrap() }
# fn run() -> Result<(), Box<dyn ::std::error::Error>> {
# #[derive(Serialize, Deserialize, ElasticType)]
# struct NewsArticle { id: i64, likes: i64 }
# #[derive(Serialize, Deserialize, ElasticType)]
# struct UpdatedNewsArticle { id: i64, likes: i64 }
# let client = SyncClientBuilder::new().build()?;
let response = client.document::<NewsArticle>()
.update(1)
.script("ctx._source.likes++")
.source()
.send()?;
assert!(response.into_document::<UpdatedNewsArticle>().unwrap().likes >= 1);
# Ok(())
# }
```
[index-attr]: ../../../types/document/index.html#specifying-a-default-index-name
[`UpdateResponse`]: ../../responses/struct.UpdateResponse.html
[`into_document`]: ../../responses/struct.UpdateResponse.html#method.into_document
*/
pub fn source(self) -> UpdateRequestBuilder<TSender, TBody> {
RequestBuilder::new(
self.client,
// TODO: allow passing in `source` parameter add `_source` to body
// instead because it supports more options
self.params_builder
.fluent(|params| params.url_param("_source", true))
.shared(),
UpdateRequestInner {
body: self.inner.body,
index: self.inner.index,
ty: self.inner.ty,
id: self.inner.id,
_marker: PhantomData,
},
)
}
}

/**
# Send synchronously
*/
Expand Down
53 changes: 52 additions & 1 deletion src/elastic/src/client/responses/document_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
Response types for a [update document request](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html).
*/

use serde::de::DeserializeOwned;
use serde_json::Value;

use super::common::DocumentResult;

use crate::{
Expand All @@ -13,7 +16,7 @@ use crate::{
},
};

/** Response for a [update document request](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html). */
/** Response for an [update document request](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html). */
#[derive(Deserialize, Debug)]
pub struct UpdateResponse {
#[serde(rename = "_index")]
Expand All @@ -31,9 +34,57 @@ pub struct UpdateResponse {
#[serde(rename = "_routing")]
routing: Option<String>,
result: DocumentResult,
get: Option<Value>,
}

impl UpdateResponse {
/**
Convert the source in the response into the updated document.
The [`source`] method must have been called first on the
[`UpdateRequestBuilder`], otherwise this will return `None`.
# Examples
```no_run
# #[macro_use] extern crate serde_derive;
# #[macro_use] extern crate elastic_derive;
# use elastic::prelude::*;
# fn main() { run().unwrap() }
# fn run() -> Result<(), Box<dyn ::std::error::Error>> {
# #[derive(Serialize, Deserialize, ElasticType)]
# struct NewsArticle { id: i64, likes: i64 }
# #[derive(Serialize, Deserialize, ElasticType)]
# struct UpdatedNewsArticle { id: i64, likes: i64 }
# let client = SyncClientBuilder::new().build()?;
let response = client.document::<NewsArticle>()
.update(1)
.script("ctx._source.likes++")
.source()
.send()?;
assert!(response.into_document::<UpdatedNewsArticle>().unwrap().likes >= 1);
# Ok(())
# }
```
[`source`]: ../requests/document_update/type.UpdateRequestBuilder.html#method.source
[`UpdateRequestBuilder`]: ../requests/document_update/type.UpdateRequestBuilder.html
*/
pub fn into_document<T>(&self) -> Option<T>
where
T: DeserializeOwned,
{
self.get.as_ref().map_or_else(
|| None,
|obj| {
obj.get("_source")
.cloned()
.map_or_else(|| None, |doc| serde_json::from_value::<T>(doc).ok())
},
)
}

/** Whether or not the document was updated. */
pub fn updated(&self) -> bool {
match self.result {
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/src/tests/document/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ test_cases![
update_no_index,
update_with_doc,
update_with_inline_script,
update_with_script
update_with_script,
update_with_source
];

mod compile_test;
85 changes: 85 additions & 0 deletions tests/integration/src/tests/document/update_with_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use elastic::{
error::Error,
prelude::*,
};
use futures::Future;

#[derive(Debug, PartialEq, Serialize, Deserialize, ElasticType)]
#[elastic(index = "update_doc_source_idx")]
pub struct Doc {
#[elastic(id)]
id: String,
title: String,
}

#[derive(Debug, PartialEq, Serialize, Deserialize, ElasticType)]
#[elastic(index = "update_doc_source_idx")]
pub struct UpdatedDoc {
#[elastic(id)]
id: String,
title: String,
}

const EXPECTED_TITLE: &'static str = "Edited title";
const ID: &'static str = "1";

fn doc() -> Doc {
Doc {
id: ID.to_owned(),
title: "Not edited title".to_owned(),
}
}

test! {
const description: &'static str = "update and return source";

type Response = UpdateResponse;

// Ensure the index doesn't exist
fn prepare(&self, client: AsyncClient) -> Box<dyn Future<Item = (), Error = Error>> {
let delete_res = client
.index(Doc::static_index())
.delete()
.send()
.map(|_| ());

Box::new(delete_res)
}

// Execute an update request against that index using a new document & request
// that the updated document's `source` be returned with the response.
fn request(
&self,
client: AsyncClient,
) -> Box<dyn Future<Item = Self::Response, Error = Error>> {
let index_res = client
.document()
.index(doc())
.params_fluent(|p| p.url_param("refresh", true))
.send();

let update_res = client
.document::<Doc>()
.update(ID)
.doc(json!({
"title": EXPECTED_TITLE.to_owned(),
}))
.source()
.send();

Box::new(
index_res
.and_then(|_| update_res)
.map(|update| update)
)
}

// Ensure the response contains the expected document
fn assert_ok(&self, res: &Self::Response) -> bool {
let updated = res.updated();
let correct_version = res.version() == Some(2);
let correct_title = res.into_document::<UpdatedDoc>().unwrap().title == EXPECTED_TITLE;

updated && correct_version && correct_title
}
}

0 comments on commit f0f2952

Please sign in to comment.