Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for _source on update requests #381

Merged
merged 3 commits into from
Oct 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions src/elastic/examples/update_with_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//! Update a document and return the updated document in a single request.
//!
//! NOTE: This sample expects you have a node running on `localhost:9200`.
//!
//! This sample demonstrates how to index & update a document and return
//! the newly updated document.
//! Also see the `typed` sample for a more complete implementation.

use std::error::Error;

use elastic::prelude::*;
use elastic_derive::ElasticType;
use env_logger;
use serde::{
Deserialize,
Serialize,
};

#[derive(Debug, Serialize, Deserialize, ElasticType)]
#[elastic(index = "update_with_source_sample_index")]
struct NewsArticle {
#[elastic(id)]
id: String,
title: String,
content: String,
likes: i64,
}

#[derive(Debug, Serialize, Deserialize, ElasticType)]
#[elastic(index = "update_with_source_sample_index")]
struct UpdatedNewsArticle {
#[elastic(id)]
id: String,
title: String,
content: String,
likes: i64,
}

fn run() -> Result<(), Box<dyn Error>> {
// A HTTP client and request parameters
let client = SyncClient::builder().build()?;

// Create a document to index
let doc = NewsArticle {
id: "1".to_string(),
title: "A title".to_string(),
content: "Some content.".to_string(),
likes: 0,
};

// Index the document
client.document().index(doc).send()?;

// Update the document using a script
let update = client
.document::<NewsArticle>()
.update("1")
.script("ctx._source.likes++")
// Request that the updated document be returned with the response
.source()
.send()?;

assert!(update.updated());

// Deserialize the updated document,
// will return `None` if `source()` was not called on the request
let updated_doc = update.into_document::<UpdatedNewsArticle>().unwrap();

assert!(updated_doc.likes > 0);

println!("{:#?}", &updated_doc);

Ok(())
}

fn main() {
env_logger::init();
run().unwrap()
}
63 changes: 62 additions & 1 deletion src/elastic/src/client/requests/document_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use std::marker::PhantomData;
use crate::{
client::{
requests::{
Pending as BasePending,
raw::RawRequestInner,
Pending as BasePending,
RequestBuilder,
},
responses::UpdateResponse,
Expand Down Expand Up @@ -552,6 +552,67 @@ where
}
}

impl<TSender, TBody> UpdateRequestBuilder<TSender, TBody>
where
TSender: Sender,
{
/**
Request that the [`UpdateResponse`] include the `source` of the updated document.

Although not a requirement, be careful that both the document and
updated document types use the same index, e.g. by using the
[`#[elastic(index)]` attribute][index-attr].

# Examples

Request that the `source` is returned with the response and deserialize
the updated document by calling [`into_document`] on the [`UpdateResponse`]:

```no_run
# #[macro_use] extern crate serde_derive;
# #[macro_use] extern crate elastic_derive;
# use elastic::prelude::*;
# fn main() { run().unwrap() }
# fn run() -> Result<(), Box<dyn ::std::error::Error>> {
# #[derive(Serialize, Deserialize, ElasticType)]
# struct NewsArticle { id: i64, likes: i64 }
# #[derive(Serialize, Deserialize, ElasticType)]
# struct UpdatedNewsArticle { id: i64, likes: i64 }
# let client = SyncClientBuilder::new().build()?;
let response = client.document::<NewsArticle>()
.update(1)
.script("ctx._source.likes++")
.source()
.send()?;

assert!(response.into_document::<UpdatedNewsArticle>().unwrap().likes >= 1);
# Ok(())
# }
```

[index-attr]: ../../../types/document/index.html#specifying-a-default-index-name
[`UpdateResponse`]: ../../responses/struct.UpdateResponse.html
[`into_document`]: ../../responses/struct.UpdateResponse.html#method.into_document
*/
pub fn source(self) -> UpdateRequestBuilder<TSender, TBody> {
RequestBuilder::new(
self.client,
// TODO: allow passing in `source` parameter add `_source` to body
// instead because it supports more options
self.params_builder
.fluent(|params| params.url_param("_source", true))
.shared(),
UpdateRequestInner {
body: self.inner.body,
index: self.inner.index,
ty: self.inner.ty,
id: self.inner.id,
_marker: PhantomData,
},
)
}
}

/**
# Send synchronously
*/
Expand Down
53 changes: 52 additions & 1 deletion src/elastic/src/client/responses/document_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
Response types for a [update document request](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html).
*/

use serde::de::DeserializeOwned;
use serde_json::Value;

use super::common::DocumentResult;

use crate::{
Expand All @@ -13,7 +16,7 @@ use crate::{
},
};

/** Response for a [update document request](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html). */
/** Response for an [update document request](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html). */
#[derive(Deserialize, Debug)]
pub struct UpdateResponse {
#[serde(rename = "_index")]
Expand All @@ -31,9 +34,57 @@ pub struct UpdateResponse {
#[serde(rename = "_routing")]
routing: Option<String>,
result: DocumentResult,
get: Option<Value>,
}

impl UpdateResponse {
/**
Convert the source in the response into the updated document.

The [`source`] method must have been called first on the
[`UpdateRequestBuilder`], otherwise this will return `None`.

# Examples

```no_run
# #[macro_use] extern crate serde_derive;
# #[macro_use] extern crate elastic_derive;
# use elastic::prelude::*;
# fn main() { run().unwrap() }
# fn run() -> Result<(), Box<dyn ::std::error::Error>> {
# #[derive(Serialize, Deserialize, ElasticType)]
# struct NewsArticle { id: i64, likes: i64 }
# #[derive(Serialize, Deserialize, ElasticType)]
# struct UpdatedNewsArticle { id: i64, likes: i64 }
# let client = SyncClientBuilder::new().build()?;
let response = client.document::<NewsArticle>()
.update(1)
.script("ctx._source.likes++")
.source()
.send()?;

assert!(response.into_document::<UpdatedNewsArticle>().unwrap().likes >= 1);
# Ok(())
# }
```

[`source`]: ../requests/document_update/type.UpdateRequestBuilder.html#method.source
[`UpdateRequestBuilder`]: ../requests/document_update/type.UpdateRequestBuilder.html
*/
pub fn into_document<T>(&self) -> Option<T>
where
T: DeserializeOwned,
{
self.get.as_ref().map_or_else(
|| None,
|obj| {
obj.get("_source")
.cloned()
.map_or_else(|| None, |doc| serde_json::from_value::<T>(doc).ok())
},
)
}

/** Whether or not the document was updated. */
pub fn updated(&self) -> bool {
match self.result {
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/src/tests/document/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ test_cases![
update_no_index,
update_with_doc,
update_with_inline_script,
update_with_script
update_with_script,
update_with_source
];

mod compile_test;
85 changes: 85 additions & 0 deletions tests/integration/src/tests/document/update_with_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use elastic::{
error::Error,
prelude::*,
};
use futures::Future;

#[derive(Debug, PartialEq, Serialize, Deserialize, ElasticType)]
#[elastic(index = "update_doc_source_idx")]
pub struct Doc {
#[elastic(id)]
id: String,
title: String,
}

#[derive(Debug, PartialEq, Serialize, Deserialize, ElasticType)]
#[elastic(index = "update_doc_source_idx")]
pub struct UpdatedDoc {
#[elastic(id)]
id: String,
title: String,
}

const EXPECTED_TITLE: &'static str = "Edited title";
const ID: &'static str = "1";

fn doc() -> Doc {
Doc {
id: ID.to_owned(),
title: "Not edited title".to_owned(),
}
}

test! {
const description: &'static str = "update and return source";

type Response = UpdateResponse;

// Ensure the index doesn't exist
fn prepare(&self, client: AsyncClient) -> Box<dyn Future<Item = (), Error = Error>> {
let delete_res = client
.index(Doc::static_index())
.delete()
.send()
.map(|_| ());

Box::new(delete_res)
}

// Execute an update request against that index using a new document & request
// that the updated document's `source` be returned with the response.
fn request(
&self,
client: AsyncClient,
) -> Box<dyn Future<Item = Self::Response, Error = Error>> {
let index_res = client
.document()
.index(doc())
.params_fluent(|p| p.url_param("refresh", true))
.send();

let update_res = client
.document::<Doc>()
.update(ID)
.doc(json!({
"title": EXPECTED_TITLE.to_owned(),
}))
.source()
.send();

Box::new(
index_res
.and_then(|_| update_res)
.map(|update| update)
)
}

// Ensure the response contains the expected document
fn assert_ok(&self, res: &Self::Response) -> bool {
let updated = res.updated();
let correct_version = res.version() == Some(2);
let correct_title = res.into_document::<UpdatedDoc>().unwrap().title == EXPECTED_TITLE;

updated && correct_version && correct_title
}
}