Skip to content

Commit

Permalink
NIFI-10067 add scripted_upsert option to PutElasticsearchJson and Put…
Browse files Browse the repository at this point in the history
…ElasticsearchRecord processors
  • Loading branch information
ChrisSamo632 committed Apr 2, 2023
1 parent bc0404f commit 35760bf
Show file tree
Hide file tree
Showing 15 changed files with 240 additions and 98 deletions.
24 changes: 24 additions & 0 deletions nifi-nar-bundles/nifi-elasticsearch-bundle/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,30 @@ done
- [Elasticsearch Client Service](nifi-elasticsearch-client-service)
- [Elasticsearch REST API Processors](nifi-elasticsearch-restapi-processors)

## Running on Mac

Testcontainers support for "Mac OS X - Docker for Mac" is currently [best-efforts](https://www.testcontainers.org/supported_docker_environment/).

It may be necessary to do the following to run the tests successfully:

- Link the Docker Unix Socket to the standard location

```sh
sudo ln -s $HOME/.docker/run/docker.sock /var/run/docker.sock
```

- Set the `TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE` environment variable

```sh
export TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE=/var/run/docker.sock
```

## Running in IDEs

- Edit "Run Configurations" to:
- enable Testcontainers via the system property, i.e. `-Delasticsearch.testcontainers.enabled=true`
- set the `TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE` environment variable if required

## Misc

Integration Tests with Testcontainers currently only uses the `amd64` Docker Images.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,11 +341,21 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
/**
* Check whether an index exists.
*
* @param index The index to target, if omitted then all indices will be updated.
* @param index The index to check.
* @param requestParameters A collection of URL request parameters. Optional.
*/
boolean exists(final String index, final Map<String, String> requestParameters);

/**
* Check whether a document exists.
*
* @param index The index that holds the document.
* @param type The document type. Optional. Will not be used in future versions of Elasticsearch.
* @param id The document ID
* @param requestParameters A collection of URL request parameters. Optional.
*/
boolean documentExists(final String index, final String type, final String id, final Map<String, String> requestParameters);

/**
* Get a document by ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,21 @@ public class IndexOperationRequest {
private final Map<String, Object> fields;
private final Operation operation;
private final Map<String, Object> script;

private final boolean scriptedUpsert;
private final Map<String, Object> dynamicTemplates;
private final Map<String, String> headerFields;

public IndexOperationRequest(final String index, final String type, final String id, final Map<String, Object> fields,
final Operation operation, final Map<String, Object> script, final Map<String, Object> dynamicTemplates,
final Map<String, String> headerFields) {
final Operation operation, final Map<String, Object> script, final boolean scriptedUpsert,
final Map<String, Object> dynamicTemplates, final Map<String, String> headerFields) {
this.index = index;
this.type = type;
this.id = id;
this.fields = fields;
this.operation = operation;
this.script = script;
this.scriptedUpsert = scriptedUpsert;
this.dynamicTemplates = dynamicTemplates;
this.headerFields = headerFields;
}
Expand Down Expand Up @@ -72,6 +75,10 @@ public Map<String, Object> getScript() {
return script;
}

public boolean isScriptedUpsert() {
return scriptedUpsert;
}

public Map<String, Object> getDynamicTemplates() {
return dynamicTemplates;
}
Expand Down Expand Up @@ -112,6 +119,7 @@ public String toString() {
", fields=" + fields +
", operation=" + operation +
", script=" + script +
", scriptedUpsert=" + scriptedUpsert +
", dynamicTemplates=" + dynamicTemplates +
", headerFields=" + headerFields +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,31 +114,10 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
private ObjectWriter prettyPrintWriter;

static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(HTTP_HOSTS);
props.add(PATH_PREFIX);
props.add(AUTHORIZATION_SCHEME);
props.add(USERNAME);
props.add(PASSWORD);
props.add(API_KEY_ID);
props.add(API_KEY);
props.add(PROP_SSL_CONTEXT_SERVICE);
props.add(PROXY_CONFIGURATION_SERVICE);
props.add(CONNECT_TIMEOUT);
props.add(SOCKET_TIMEOUT);
props.add(CHARSET);
props.add(SUPPRESS_NULLS);
props.add(COMPRESSION);
props.add(SEND_META_HEADER);
props.add(STRICT_DEPRECATION);
props.add(NODE_SELECTOR);
props.add(SNIFF_CLUSTER_NODES);
props.add(SNIFFER_INTERVAL);
props.add(SNIFFER_REQUEST_TIMEOUT);
props.add(SNIFF_ON_FAILURE);
props.add(SNIFFER_FAILURE_DELAY);

properties = Collections.unmodifiableList(props);
properties = List.of(HTTP_HOSTS, PATH_PREFIX, AUTHORIZATION_SCHEME, USERNAME, PASSWORD, API_KEY_ID, API_KEY,
PROP_SSL_CONTEXT_SERVICE, PROXY_CONFIGURATION_SERVICE, CONNECT_TIMEOUT, SOCKET_TIMEOUT, CHARSET,
SUPPRESS_NULLS, COMPRESSION, SEND_META_HEADER, STRICT_DEPRECATION, NODE_SELECTOR, SNIFF_CLUSTER_NODES,
SNIFFER_INTERVAL, SNIFFER_REQUEST_TIMEOUT, SNIFF_ON_FAILURE, SNIFFER_FAILURE_DELAY);
}

@Override
Expand Down Expand Up @@ -650,6 +629,7 @@ protected void buildRequest(final IndexOperationRequest request, final StringBui
if (request.getScript() != null && !request.getScript().isEmpty()) {
updateBody.put("script", request.getScript());
if (request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
updateBody.put("scripted_upsert", request.isScriptedUpsert());
updateBody.put("upsert", request.getFields());
}
} else {
Expand Down Expand Up @@ -800,6 +780,23 @@ public boolean exists(final String index, final Map<String, String> requestParam
}
}

@Override
public boolean documentExists(final String index, final String type, final String id, final Map<String, String> requestParameters) {
boolean exists = true;
try {
final Map<String, String> existsParameters = new HashMap<>(requestParameters);
existsParameters.putIfAbsent("_source", "false");
get(index, type, id, existsParameters);
} catch (final ElasticsearchException ee) {
if (ee.isNotFound()) {
exists = false;
} else {
throw ee;
}
}
return exists;
}

@SuppressWarnings("unchecked")
@Override
public Map<String, Object> get(final String index, final String type, final String id, final Map<String, String> requestParameters) {
Expand Down Expand Up @@ -861,7 +858,7 @@ public SearchResponse scroll(final String scroll) {
@Override
public String initialisePointInTime(final String index, final String keepAlive) {
try {
final Map<String, String> params = new HashMap<String, String>() {{
final Map<String, String> params = new HashMap<>() {{
if (StringUtils.isNotBlank(keepAlive)) {
put("keep_alive", keepAlive);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ public boolean exists(final String index, final Map<String, String> requestParam
return true;
}

@Override
public boolean documentExists(String index, String type, String id, Map<String, String> requestParameters) {
return true;
}

@Override
public Map<String, Object> get(String index, String type, String id, Map<String, String> requestParameters) {
return data;
Expand Down

0 comments on commit 35760bf

Please sign in to comment.