Skip to content

Commit

Permalink
[7.16] [Transform] Report _preview warning in the face of pipeline fa…
Browse files Browse the repository at this point in the history
…ilure. (#81972) (#82202)
  • Loading branch information
przemekwitek committed Jan 5, 2022
1 parent 3e73eaf commit 5b3b52a
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.WarningsHandler;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xcontent.XContentBuilder;
Expand All @@ -29,7 +31,9 @@

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -949,6 +953,59 @@ public void testPreviewTransformWithPipeline() throws Exception {
});
}

@SuppressWarnings("unchecked")
public void testPreviewTransformWithPipelineScript() throws Exception {
String pipelineId = "my-preview-pivot-pipeline-script";
Request pipelineRequest = new Request("PUT", "/_ingest/pipeline/" + pipelineId);
pipelineRequest.setJsonEntity(
"{\n"
+ " \"description\" : \"my pivot preview pipeline\",\n"
+ " \"processors\" : [\n"
+ " {\n"
+ " \"script\" : {\n"
+ " \"lang\": \"painless\",\n"
+ " \"source\": \"ctx._id = ctx['non']['existing'];\"\n"
+ " }\n"
+ " }\n"
+ " ]\n"
+ "}"
);
client().performRequest(pipelineRequest);

setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
final Request createPreviewRequest = createRequestWithAuth("POST", getTransformEndpoint() + "_preview", null);
createPreviewRequest.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(WarningsHandler.PERMISSIVE));

String config = "{ \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"} ,"
+ "\"dest\": {\"pipeline\": \""
+ pipelineId
+ "\"},"
+ " \"pivot\": {"
+ " \"group_by\": {"
+ " \"user.id\": {\"terms\": { \"field\": \"user_id\" }},"
+ " \"by_day\": {\"date_histogram\": {\"fixed_interval\": \"1d\",\"field\":\"timestamp\"}}},"
+ " \"aggregations\": {"
+ " \"user.avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } }"
+ "}";
createPreviewRequest.setJsonEntity(config);

Response createPreviewResponse = client().performRequest(createPreviewRequest);
Map<String, Object> previewTransformResponse = entityAsMap(createPreviewResponse);
List<Map<String, Object>> preview = (List<Map<String, Object>>) previewTransformResponse.get("preview");
// Pipeline failed for all the docs so the preview is empty
assertThat(preview, is(empty()));
assertThat(createPreviewResponse.getWarnings(), is(not(empty())));
assertThat(
createPreviewResponse.getWarnings().get(createPreviewResponse.getWarnings().size() - 1),
allOf(containsString("Pipeline returned 100 errors, first error:"), containsString("type=script_exception"))
);
}

public void testPivotWithMaxOnDateField() throws Exception {
String transformId = "simple_date_histogram_pivot_with_max_time";
String transformIndex = "pivot_reviews_via_date_histogram_with_max_time";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,24 @@ private void getPreview(

ActionListener<SimulatePipelineResponse> pipelineResponseActionListener = ActionListener.wrap(simulatePipelineResponse -> {
List<Map<String, Object>> docs = new ArrayList<>(simulatePipelineResponse.getResults().size());
List<Map<String, Object>> errors = new ArrayList<>();
for (SimulateDocumentResult simulateDocumentResult : simulatePipelineResponse.getResults()) {
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
XContentBuilder content = simulateDocumentResult.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
Map<String, Object> tempMap = XContentHelper.convertToMap(BytesReference.bytes(content), true, XContentType.JSON).v2();
docs.add((Map<String, Object>) XContentMapValues.extractValue("doc._source", tempMap));
Map<String, Object> doc = (Map<String, Object>) XContentMapValues.extractValue("doc._source", tempMap);
if (doc != null) {
docs.add(doc);
}
Map<String, Object> error = (Map<String, Object>) XContentMapValues.extractValue("error", tempMap);
if (error != null) {
errors.add(error);
}
}
}
if (errors.isEmpty() == false) {
HeaderWarning.addWarning("Pipeline returned " + errors.size() + " errors, first error: " + errors.get(0));
}
TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings(
mappings.get(),
transformId,
Expand Down

0 comments on commit 5b3b52a

Please sign in to comment.