Skip to content

Commit

Permalink
NIFI-10067 enable use of script when updating documents in Elasticsea…
Browse files Browse the repository at this point in the history
…rch via PutElasticsearchJson or PutElasticsearchRecord

NIFI-3262 enable use of dynamic_templates in Elasticsearch _bulk operations

NIFI-3262 enable use of bulk header fields in Elasticsearch _bulk operations via BULK: dynamic headers in PutElasticsearchJson and PutElasticsearchRecord
  • Loading branch information
ChrisSamo632 committed Feb 13, 2023
1 parent 6542505 commit a757ca2
Show file tree
Hide file tree
Showing 23 changed files with 1,011 additions and 245 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
PropertyDescriptor HTTP_HOSTS = new PropertyDescriptor.Builder()
.name("el-cs-http-hosts")
.displayName("HTTP Hosts")
.description("A comma-separated list of HTTP hosts that host Elasticsearch query nodes.")
.description("A comma-separated list of HTTP hosts that host Elasticsearch query nodes. " +
"Note that the Host is included in requests as a header (typically including domain and port, e.g. elasticsearch:9200).")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,21 @@ public class IndexOperationRequest {
private final String id;
private final Map<String, Object> fields;
private final Operation operation;
private final Map<String, Object> script;
private final Map<String, Object> dynamicTemplates;
private final Map<String, String> headerFields;

public IndexOperationRequest(final String index, final String type, final String id, final Map<String, Object> fields, final Operation operation) {
public IndexOperationRequest(final String index, final String type, final String id, final Map<String, Object> fields,
final Operation operation, final Map<String, Object> script, final Map<String, Object> dynamicTemplates,
final Map<String, String> headerFields) {
this.index = index;
this.type = type;
this.id = id;
this.fields = fields;
this.operation = operation;
this.script = script;
this.dynamicTemplates = dynamicTemplates;
this.headerFields = headerFields;
}

public String getIndex() {
Expand All @@ -60,6 +68,18 @@ public Operation getOperation() {
return operation;
}

public Map<String, Object> getScript() {
return script;
}

public Map<String, Object> getDynamicTemplates() {
return dynamicTemplates;
}

public Map<String, String> getHeaderFields() {
return headerFields;
}

public enum Operation {
Create("create"),
Delete("delete"),
Expand All @@ -81,10 +101,6 @@ public static Operation forValue(final String value) {
.filter(o -> o.getValue().equalsIgnoreCase(value)).findFirst()
.orElseThrow(() -> new IllegalArgumentException(String.format("Unknown Index Operation %s", value)));
}

public static String[] allValues() {
return Arrays.stream(Operation.values()).map(Operation::getValue).sorted().toArray(String[]::new);
}
}

@Override
Expand All @@ -95,6 +111,9 @@ public String toString() {
", id='" + id + '\'' +
", fields=" + fields +
", operation=" + operation +
", script=" + script +
", dynamicTemplates=" + dynamicTemplates +
", headerFields=" + headerFields +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -611,23 +611,28 @@ private String buildBulkHeader(final IndexOperationRequest request) throws JsonP
final String operation = request.getOperation().equals(IndexOperationRequest.Operation.Upsert)
? "update"
: request.getOperation().getValue();
return buildBulkHeader(operation, request.getIndex(), request.getType(), request.getId());
return buildBulkHeader(operation, request.getIndex(), request.getType(), request.getId(), request.getDynamicTemplates(), request.getHeaderFields());
}

private String buildBulkHeader(final String operation, final String index, final String type, final String id) throws JsonProcessingException {
final Map<String, Object> header = new HashMap<String, Object>() {{
put(operation, new HashMap<String, Object>() {{
put("_index", index);
if (StringUtils.isNotBlank(id)) {
put("_id", id);
}
if (StringUtils.isNotBlank(type)) {
put("_type", type);
}
}});
}};
private String buildBulkHeader(final String operation, final String index, final String type, final String id,
final Map<String, Object> dynamicTemplates, final Map<String, String> headerFields) throws JsonProcessingException {
final Map<String, Object> operationBody = new HashMap<>();
operationBody.put("_index", index);
if (StringUtils.isNotBlank(id)) {
operationBody.put("_id", id);
}
if (StringUtils.isNotBlank(type)) {
operationBody.put("_type", type);
}
if (dynamicTemplates != null && !dynamicTemplates.isEmpty()) {
operationBody.put("dynamic_templates", dynamicTemplates);
}
if (headerFields != null && !headerFields.isEmpty()) {
headerFields.entrySet().stream().filter(e -> StringUtils.isNotBlank(e.getValue()))
.forEach(e -> operationBody.putIfAbsent(e.getKey(), e.getValue()));
}

return flatten(mapper.writeValueAsString(header));
return flatten(mapper.writeValueAsString(Collections.singletonMap(operation, operationBody)));
}

protected void buildRequest(final IndexOperationRequest request, final StringBuilder builder) throws JsonProcessingException {
Expand All @@ -641,13 +646,20 @@ protected void buildRequest(final IndexOperationRequest request, final StringBui
break;
case Update:
case Upsert:
final Map<String, Object> doc = new HashMap<String, Object>() {{
put("doc", request.getFields());
final Map<String, Object> updateBody = new HashMap<>(2, 1);
if (request.getScript() != null && !request.getScript().isEmpty()) {
updateBody.put("script", request.getScript());
if (request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
updateBody.put("upsert", request.getFields());
}
} else {
updateBody.put("doc", request.getFields());
if (request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
put("doc_as_upsert", true);
updateBody.put("doc_as_upsert", true);
}
}};
final String update = flatten(mapper.writeValueAsString(doc)).trim();
}

final String update = flatten(mapper.writeValueAsString(updateBody)).trim();
builder.append(update).append("\n");
break;
case Delete:
Expand Down Expand Up @@ -706,7 +718,7 @@ public DeleteOperationResponse deleteById(final String index, final String type,
try {
final StringBuilder sb = new StringBuilder();
for (final String id : ids) {
final String header = buildBulkHeader("delete", index, type, id);
final String header = buildBulkHeader("delete", index, type, id, null, null);
sb.append(header).append("\n");
}
final HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ abstract class AbstractElasticsearch_IT extends AbstractElasticsearchITBase {
static final List<String> TEST_INDICES =
Arrays.asList("user_details", "complex", "nested", "bulk_a", "bulk_b", "bulk_c", "error_handler", "messages");

ElasticSearchClientServiceImpl service;
ElasticSearchClientService service;

@BeforeEach
void before() throws Exception {
Expand Down

0 comments on commit a757ca2

Please sign in to comment.