Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-10067 enable use of script, dynamic_templates and _bulk headers for PutElasticsearchJson/Record processors #6628

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 24 additions & 0 deletions nifi-nar-bundles/nifi-elasticsearch-bundle/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,30 @@ done
- [Elasticsearch Client Service](nifi-elasticsearch-client-service)
- [Elasticsearch REST API Processors](nifi-elasticsearch-restapi-processors)

## Running on Mac

Testcontainers support for "Mac OS X - Docker for Mac" is currently [best-efforts](https://www.testcontainers.org/supported_docker_environment/).

It may be necessary to do the following to run the tests successfully:

- Link the Docker Unix Socket to the standard location

```sh
sudo ln -s $HOME/.docker/run/docker.sock /var/run/docker.sock
```

- Set the `TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE` environment variable

```sh
export TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE=/var/run/docker.sock
```

## Running in IDEs

- Edit "Run Configurations" to:
- enable Testcontainers via the system property, i.e. `-Delasticsearch.testcontainers.enabled=true`
- set the `TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE` environment variable if required

## Misc

Integration Tests with Testcontainers currently only uses the `amd64` Docker Images.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
PropertyDescriptor HTTP_HOSTS = new PropertyDescriptor.Builder()
.name("el-cs-http-hosts")
.displayName("HTTP Hosts")
.description("A comma-separated list of HTTP hosts that host Elasticsearch query nodes.")
.description("A comma-separated list of HTTP hosts that host Elasticsearch query nodes. " +
"Note that the Host is included in requests as a header (typically including domain and port, e.g. elasticsearch:9200).")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
Expand Down Expand Up @@ -340,11 +341,21 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
/**
* Check whether an index exists.
*
* @param index The index to target, if omitted then all indices will be updated.
* @param index The index to check.
* @param requestParameters A collection of URL request parameters. Optional.
*/
boolean exists(final String index, final Map<String, String> requestParameters);

/**
* Check whether a document exists.
*
* @param index The index that holds the document.
* @param type The document type. Optional. Will not be used in future versions of Elasticsearch.
* @param id The document ID
* @param requestParameters A collection of URL request parameters. Optional.
*/
boolean documentExists(final String index, final String type, final String id, final Map<String, String> requestParameters);

/**
* Get a document by ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,24 @@ public class IndexOperationRequest {
private final String id;
private final Map<String, Object> fields;
private final Operation operation;
private final Map<String, Object> script;

public IndexOperationRequest(final String index, final String type, final String id, final Map<String, Object> fields, final Operation operation) {
private final boolean scriptedUpsert;
private final Map<String, Object> dynamicTemplates;
private final Map<String, String> headerFields;

public IndexOperationRequest(final String index, final String type, final String id, final Map<String, Object> fields,
final Operation operation, final Map<String, Object> script, final boolean scriptedUpsert,
final Map<String, Object> dynamicTemplates, final Map<String, String> headerFields) {
this.index = index;
this.type = type;
this.id = id;
this.fields = fields;
this.operation = operation;
this.script = script;
this.scriptedUpsert = scriptedUpsert;
this.dynamicTemplates = dynamicTemplates;
this.headerFields = headerFields;
}

public String getIndex() {
Expand All @@ -60,6 +71,22 @@ public Operation getOperation() {
return operation;
}

public Map<String, Object> getScript() {
return script;
}

public boolean isScriptedUpsert() {
return scriptedUpsert;
}

public Map<String, Object> getDynamicTemplates() {
return dynamicTemplates;
}

public Map<String, String> getHeaderFields() {
return headerFields;
}

public enum Operation {
Create("create"),
Delete("delete"),
Expand All @@ -81,10 +108,6 @@ public static Operation forValue(final String value) {
.filter(o -> o.getValue().equalsIgnoreCase(value)).findFirst()
.orElseThrow(() -> new IllegalArgumentException(String.format("Unknown Index Operation %s", value)));
}

public static String[] allValues() {
return Arrays.stream(Operation.values()).map(Operation::getValue).sorted().toArray(String[]::new);
}
}

@Override
Expand All @@ -95,6 +118,10 @@ public String toString() {
", id='" + id + '\'' +
", fields=" + fields +
", operation=" + operation +
", script=" + script +
", scriptedUpsert=" + scriptedUpsert +
", dynamicTemplates=" + dynamicTemplates +
", headerFields=" + headerFields +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,31 +114,10 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
private ObjectWriter prettyPrintWriter;

static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(HTTP_HOSTS);
props.add(PATH_PREFIX);
props.add(AUTHORIZATION_SCHEME);
props.add(USERNAME);
props.add(PASSWORD);
props.add(API_KEY_ID);
props.add(API_KEY);
props.add(PROP_SSL_CONTEXT_SERVICE);
props.add(PROXY_CONFIGURATION_SERVICE);
props.add(CONNECT_TIMEOUT);
props.add(SOCKET_TIMEOUT);
props.add(CHARSET);
props.add(SUPPRESS_NULLS);
props.add(COMPRESSION);
props.add(SEND_META_HEADER);
props.add(STRICT_DEPRECATION);
props.add(NODE_SELECTOR);
props.add(SNIFF_CLUSTER_NODES);
props.add(SNIFFER_INTERVAL);
props.add(SNIFFER_REQUEST_TIMEOUT);
props.add(SNIFF_ON_FAILURE);
props.add(SNIFFER_FAILURE_DELAY);

properties = Collections.unmodifiableList(props);
properties = List.of(HTTP_HOSTS, PATH_PREFIX, AUTHORIZATION_SCHEME, USERNAME, PASSWORD, API_KEY_ID, API_KEY,
PROP_SSL_CONTEXT_SERVICE, PROXY_CONFIGURATION_SERVICE, CONNECT_TIMEOUT, SOCKET_TIMEOUT, CHARSET,
SUPPRESS_NULLS, COMPRESSION, SEND_META_HEADER, STRICT_DEPRECATION, NODE_SELECTOR, SNIFF_CLUSTER_NODES,
SNIFFER_INTERVAL, SNIFFER_REQUEST_TIMEOUT, SNIFF_ON_FAILURE, SNIFFER_FAILURE_DELAY);
}

@Override
Expand Down Expand Up @@ -611,23 +590,28 @@ private String buildBulkHeader(final IndexOperationRequest request) throws JsonP
final String operation = request.getOperation().equals(IndexOperationRequest.Operation.Upsert)
? "update"
: request.getOperation().getValue();
return buildBulkHeader(operation, request.getIndex(), request.getType(), request.getId());
return buildBulkHeader(operation, request.getIndex(), request.getType(), request.getId(), request.getDynamicTemplates(), request.getHeaderFields());
}

private String buildBulkHeader(final String operation, final String index, final String type, final String id) throws JsonProcessingException {
final Map<String, Object> header = new HashMap<String, Object>() {{
put(operation, new HashMap<String, Object>() {{
put("_index", index);
if (StringUtils.isNotBlank(id)) {
put("_id", id);
}
if (StringUtils.isNotBlank(type)) {
put("_type", type);
}
}});
}};
private String buildBulkHeader(final String operation, final String index, final String type, final String id,
final Map<String, Object> dynamicTemplates, final Map<String, String> headerFields) throws JsonProcessingException {
final Map<String, Object> operationBody = new HashMap<>();
operationBody.put("_index", index);
if (StringUtils.isNotBlank(id)) {
operationBody.put("_id", id);
}
if (StringUtils.isNotBlank(type)) {
operationBody.put("_type", type);
}
if (dynamicTemplates != null && !dynamicTemplates.isEmpty()) {
operationBody.put("dynamic_templates", dynamicTemplates);
}
if (headerFields != null && !headerFields.isEmpty()) {
headerFields.entrySet().stream().filter(e -> StringUtils.isNotBlank(e.getValue()))
.forEach(e -> operationBody.putIfAbsent(e.getKey(), e.getValue()));
}

return flatten(mapper.writeValueAsString(header));
return flatten(mapper.writeValueAsString(Collections.singletonMap(operation, operationBody)));
}

protected void buildRequest(final IndexOperationRequest request, final StringBuilder builder) throws JsonProcessingException {
Expand All @@ -641,13 +625,21 @@ protected void buildRequest(final IndexOperationRequest request, final StringBui
break;
case Update:
case Upsert:
final Map<String, Object> doc = new HashMap<String, Object>() {{
put("doc", request.getFields());
final Map<String, Object> updateBody = new HashMap<>(2, 1);
if (request.getScript() != null && !request.getScript().isEmpty()) {
updateBody.put("script", request.getScript());
if (request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
updateBody.put("scripted_upsert", request.isScriptedUpsert());
updateBody.put("upsert", request.getFields());
}
} else {
updateBody.put("doc", request.getFields());
if (request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
put("doc_as_upsert", true);
updateBody.put("doc_as_upsert", true);
}
}};
final String update = flatten(mapper.writeValueAsString(doc)).trim();
}

final String update = flatten(mapper.writeValueAsString(updateBody)).trim();
builder.append(update).append("\n");
break;
case Delete:
Expand Down Expand Up @@ -706,7 +698,7 @@ public DeleteOperationResponse deleteById(final String index, final String type,
try {
final StringBuilder sb = new StringBuilder();
for (final String id : ids) {
final String header = buildBulkHeader("delete", index, type, id);
final String header = buildBulkHeader("delete", index, type, id, null, null);
sb.append(header).append("\n");
}
final HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON);
Expand Down Expand Up @@ -788,6 +780,23 @@ public boolean exists(final String index, final Map<String, String> requestParam
}
}

@Override
public boolean documentExists(final String index, final String type, final String id, final Map<String, String> requestParameters) {
boolean exists = true;
try {
final Map<String, String> existsParameters = requestParameters != null ? new HashMap<>(requestParameters) : new HashMap<>();
existsParameters.putIfAbsent("_source", "false");
get(index, type, id, existsParameters);
} catch (final ElasticsearchException ee) {
if (ee.isNotFound()) {
exists = false;
} else {
throw ee;
}
}
return exists;
}

@SuppressWarnings("unchecked")
@Override
public Map<String, Object> get(final String index, final String type, final String id, final Map<String, String> requestParameters) {
Expand Down Expand Up @@ -849,7 +858,7 @@ public SearchResponse scroll(final String scroll) {
@Override
public String initialisePointInTime(final String index, final String keepAlive) {
try {
final Map<String, String> params = new HashMap<String, String>() {{
final Map<String, String> params = new HashMap<>() {{
if (StringUtils.isNotBlank(keepAlive)) {
put("keep_alive", keepAlive);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ public boolean exists(final String index, final Map<String, String> requestParam
return true;
}

@Override
public boolean documentExists(String index, String type, String id, Map<String, String> requestParameters) {
return true;
}

@Override
public Map<String, Object> get(String index, String type, String id, Map<String, String> requestParameters) {
return data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ abstract class AbstractElasticsearch_IT extends AbstractElasticsearchITBase {
static final List<String> TEST_INDICES =
Arrays.asList("user_details", "complex", "nested", "bulk_a", "bulk_b", "bulk_c", "error_handler", "messages");

ElasticSearchClientServiceImpl service;
ElasticSearchClientService service;

@BeforeEach
void before() throws Exception {
Expand Down