Skip to content

Commit

Permalink
NIFI-10067: enable use of script when updating documents in Elasticse…
Browse files Browse the repository at this point in the history
…arch via PutElasticsearchJson or PutElasticsearchRecord

NIFI-3262: enable use of dynamic_templates in Elasticsearch _bulk operations

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes apache#6628.
  • Loading branch information
ChrisSamo632 committed May 2, 2023
1 parent 2e2604a commit 73fb0ad
Show file tree
Hide file tree
Showing 28 changed files with 1,250 additions and 317 deletions.
24 changes: 24 additions & 0 deletions nifi-nar-bundles/nifi-elasticsearch-bundle/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,30 @@ done
- [Elasticsearch Client Service](nifi-elasticsearch-client-service)
- [Elasticsearch REST API Processors](nifi-elasticsearch-restapi-processors)

## Running on Mac

Testcontainers support for "Mac OS X - Docker for Mac" is currently [best-efforts](https://www.testcontainers.org/supported_docker_environment/).

It may be necessary to do the following to run the tests successfully:

- Link the Docker Unix Socket to the standard location

```sh
sudo ln -s $HOME/.docker/run/docker.sock /var/run/docker.sock
```

- Set the `TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE` environment variable

```sh
export TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE=/var/run/docker.sock
```

## Running in IDEs

- Edit "Run Configurations" to:
- enable Testcontainers via the system property, i.e. `-Delasticsearch.testcontainers.enabled=true`
- set the `TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE` environment variable if required

## Misc

Integration Tests with Testcontainers currently only uses the `amd64` Docker Images.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
PropertyDescriptor HTTP_HOSTS = new PropertyDescriptor.Builder()
.name("el-cs-http-hosts")
.displayName("HTTP Hosts")
.description("A comma-separated list of HTTP hosts that host Elasticsearch query nodes.")
.description("A comma-separated list of HTTP hosts that host Elasticsearch query nodes. " +
"Note that the Host is included in requests as a header (typically including domain and port, e.g. elasticsearch:9200).")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
Expand Down Expand Up @@ -245,11 +246,21 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
/**
* Check whether an index exists.
*
* @param index The index to target, if omitted then all indices will be updated.
* @param index The index to check.
* @param requestParameters A collection of URL request parameters. Optional.
*/
boolean exists(final String index, final Map<String, String> requestParameters);

/**
* Check whether a document exists.
*
* @param index The index that holds the document.
* @param type The document type. Optional. Will not be used in future versions of Elasticsearch.
* @param id The document ID
* @param requestParameters A collection of URL request parameters. Optional.
*/
boolean documentExists(final String index, final String type, final String id, final Map<String, String> requestParameters);

/**
* Get a document by ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,24 @@ public class IndexOperationRequest {
private final String id;
private final Map<String, Object> fields;
private final Operation operation;
private final Map<String, Object> script;

public IndexOperationRequest(final String index, final String type, final String id, final Map<String, Object> fields, final Operation operation) {
private final boolean scriptedUpsert;
private final Map<String, Object> dynamicTemplates;
private final Map<String, String> headerFields;

public IndexOperationRequest(final String index, final String type, final String id, final Map<String, Object> fields,
final Operation operation, final Map<String, Object> script, final boolean scriptedUpsert,
final Map<String, Object> dynamicTemplates, final Map<String, String> headerFields) {
this.index = index;
this.type = type;
this.id = id;
this.fields = fields;
this.operation = operation;
this.script = script;
this.scriptedUpsert = scriptedUpsert;
this.dynamicTemplates = dynamicTemplates;
this.headerFields = headerFields;
}

public String getIndex() {
Expand All @@ -60,6 +71,22 @@ public Operation getOperation() {
return operation;
}

public Map<String, Object> getScript() {
return script;
}

public boolean isScriptedUpsert() {
return scriptedUpsert;
}

public Map<String, Object> getDynamicTemplates() {
return dynamicTemplates;
}

public Map<String, String> getHeaderFields() {
return headerFields;
}

public enum Operation {
Create("create"),
Delete("delete"),
Expand All @@ -81,10 +108,6 @@ public static Operation forValue(final String value) {
.filter(o -> o.getValue().equalsIgnoreCase(value)).findFirst()
.orElseThrow(() -> new IllegalArgumentException(String.format("Unknown Index Operation %s", value)));
}

public static String[] allValues() {
return Arrays.stream(Operation.values()).map(Operation::getValue).sorted().toArray(String[]::new);
}
}

@Override
Expand All @@ -95,6 +118,10 @@ public String toString() {
", id='" + id + '\'' +
", fields=" + fields +
", operation=" + operation +
", script=" + script +
", scriptedUpsert=" + scriptedUpsert +
", dynamicTemplates=" + dynamicTemplates +
", headerFields=" + headerFields +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -415,23 +415,28 @@ private String buildBulkHeader(final IndexOperationRequest request) throws JsonP
final String operation = request.getOperation().equals(IndexOperationRequest.Operation.Upsert)
? "update"
: request.getOperation().getValue();
return buildBulkHeader(operation, request.getIndex(), request.getType(), request.getId());
return buildBulkHeader(operation, request.getIndex(), request.getType(), request.getId(), request.getDynamicTemplates(), request.getHeaderFields());
}

private String buildBulkHeader(final String operation, final String index, final String type, final String id) throws JsonProcessingException {
final Map<String, Object> header = new HashMap<String, Object>() {{
put(operation, new HashMap<String, Object>() {{
put("_index", index);
if (StringUtils.isNotBlank(id)) {
put("_id", id);
}
if (StringUtils.isNotBlank(type)) {
put("_type", type);
}
}});
}};
private String buildBulkHeader(final String operation, final String index, final String type, final String id,
final Map<String, Object> dynamicTemplates, final Map<String, String> headerFields) throws JsonProcessingException {
final Map<String, Object> operationBody = new HashMap<>();
operationBody.put("_index", index);
if (StringUtils.isNotBlank(id)) {
operationBody.put("_id", id);
}
if (StringUtils.isNotBlank(type)) {
operationBody.put("_type", type);
}
if (dynamicTemplates != null && !dynamicTemplates.isEmpty()) {
operationBody.put("dynamic_templates", dynamicTemplates);
}
if (headerFields != null && !headerFields.isEmpty()) {
headerFields.entrySet().stream().filter(e -> StringUtils.isNotBlank(e.getValue()))
.forEach(e -> operationBody.putIfAbsent(e.getKey(), e.getValue()));
}

return flatten(mapper.writeValueAsString(header));
return flatten(mapper.writeValueAsString(Collections.singletonMap(operation, operationBody)));
}

protected void buildRequest(final IndexOperationRequest request, final StringBuilder builder) throws JsonProcessingException {
Expand All @@ -445,13 +450,21 @@ protected void buildRequest(final IndexOperationRequest request, final StringBui
break;
case Update:
case Upsert:
final Map<String, Object> doc = new HashMap<String, Object>() {{
put("doc", request.getFields());
final Map<String, Object> updateBody = new HashMap<>(2, 1);
if (request.getScript() != null && !request.getScript().isEmpty()) {
updateBody.put("script", request.getScript());
if (request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
updateBody.put("scripted_upsert", request.isScriptedUpsert());
updateBody.put("upsert", request.getFields());
}
} else {
updateBody.put("doc", request.getFields());
if (request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
put("doc_as_upsert", true);
updateBody.put("doc_as_upsert", true);
}
}};
final String update = flatten(mapper.writeValueAsString(doc)).trim();
}

final String update = flatten(mapper.writeValueAsString(updateBody)).trim();
builder.append(update).append("\n");
break;
case Delete:
Expand Down Expand Up @@ -510,7 +523,7 @@ public DeleteOperationResponse deleteById(final String index, final String type,
try {
final StringBuilder sb = new StringBuilder();
for (final String id : ids) {
final String header = buildBulkHeader("delete", index, type, id);
final String header = buildBulkHeader("delete", index, type, id, null, null);
sb.append(header).append("\n");
}
final HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON);
Expand Down Expand Up @@ -592,6 +605,23 @@ public boolean exists(final String index, final Map<String, String> requestParam
}
}

@Override
public boolean documentExists(final String index, final String type, final String id, final Map<String, String> requestParameters) {
boolean exists = true;
try {
final Map<String, String> existsParameters = requestParameters != null ? new HashMap<>(requestParameters) : new HashMap<>();
existsParameters.putIfAbsent("_source", "false");
get(index, type, id, existsParameters);
} catch (final ElasticsearchException ee) {
if (ee.isNotFound()) {
exists = false;
} else {
throw ee;
}
}
return exists;
}

@SuppressWarnings("unchecked")
@Override
public Map<String, Object> get(final String index, final String type, final String id, final Map<String, String> requestParameters) {
Expand Down Expand Up @@ -796,29 +826,28 @@ private Response performRequest(final String method, final String endpoint, fina
}
if (entity != null) {
request.setEntity(entity);
}

if (getLogger().isDebugEnabled()) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
entity.writeTo(out);
out.close();

StringBuilder builder = new StringBuilder(1000);
builder.append("Dumping Elasticsearch REST request...\n")
.append("HTTP Method: ")
.append(method)
.append("\n")
.append("Endpoint: ")
.append(endpoint)
.append("\n")
.append("Parameters: ")
.append(prettyPrintWriter.writeValueAsString(parameters))
.append("\n")
.append("Request body: ")
.append(new String(out.toByteArray()))
.append("\n");

getLogger().debug(builder.toString());
if (getLogger().isDebugEnabled()) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
entity.writeTo(out);
out.close();

String builder = "Dumping Elasticsearch REST request...\n" +
"HTTP Method: " +
method +
"\n" +
"Endpoint: " +
endpoint +
"\n" +
"Parameters: " +
prettyPrintWriter.writeValueAsString(parameters) +
"\n" +
"Request body: " +
out +
"\n";

getLogger().debug(builder);
}
}

return client.performRequest(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ public boolean exists(final String index, final Map<String, String> requestParam
return true;
}

@Override
public boolean documentExists(String index, String type, String id, Map<String, String> requestParameters) {
return true;
}

@Override
public Map<String, Object> get(String index, String type, String id, Map<String, String> requestParameters) {
return data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ abstract class AbstractElasticsearch_IT extends AbstractElasticsearchITBase {
static final List<String> TEST_INDICES =
Arrays.asList("user_details", "complex", "nested", "bulk_a", "bulk_b", "bulk_c", "error_handler", "messages");

ElasticSearchClientServiceImpl service;
ElasticSearchClientService service;

@BeforeEach
void before() throws Exception {
Expand Down

0 comments on commit 73fb0ad

Please sign in to comment.