Skip to content

Commit

Permalink
NIFI-12773: Added join and anchored RecordPath function
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Sampson <chris.sampson82@gmail.com>

This closes #8391
  • Loading branch information
markap14 authored and ChrisSamo632 committed Feb 28, 2024
1 parent 01ca19e commit 74bd798
Show file tree
Hide file tree
Showing 6 changed files with 320 additions and 0 deletions.
4 changes: 4 additions & 0 deletions nifi-commons/nifi-record-path/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@
<artifactId>nifi-uuid5</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-property-utils</artifactId>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr-runtime</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.record.path.functions;

import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardRecordPathEvaluationContext;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.record.Record;

import java.util.Arrays;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class Anchored extends RecordPathSegment {

private final RecordPathSegment anchorPath;
private final RecordPathSegment evaluationPath;

public Anchored(final RecordPathSegment anchorPath, final RecordPathSegment evaluationPath, final boolean absolute) {
super("anchored", null, absolute);

this.anchorPath = anchorPath;
this.evaluationPath = evaluationPath;
}


@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
final Stream<FieldValue> anchoredStream = anchorPath.evaluate(context);

return anchoredStream.flatMap(fv -> {
final Object value = fv.getValue();
return evaluateFieldValue(value);
});
}

private Stream<FieldValue> evaluateFieldValue(final Object value) {
if (value == null) {
return Stream.of();
}

if (value instanceof Record) {
return evaluateAtRoot((Record) value);
}

if (value instanceof final Record[] array) {
return Arrays.stream(array).flatMap(this::evaluateAtRoot);
}

if (value instanceof final Iterable<?> iterable) {
return StreamSupport.stream(iterable.spliterator(), false).flatMap(element -> {
if (!(element instanceof Record)) {
return Stream.of();
}

return evaluateAtRoot((Record) element);
});
}

return Stream.of();
}

private Stream<FieldValue> evaluateAtRoot(final Record root) {
final RecordPathEvaluationContext recordPathEvaluateContext = new StandardRecordPathEvaluationContext(root);
return evaluationPath.evaluate(recordPathEvaluateContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.record.path.functions;

import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.record.path.util.RecordPathUtils;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class Join extends RecordPathSegment {
private final RecordPathSegment delimiterPath;
private final RecordPathSegment[] valuePaths;

public Join(final RecordPathSegment delimiterPath, final RecordPathSegment[] valuePaths, final boolean absolute) {
super("join", null, absolute);
this.delimiterPath = delimiterPath;
this.valuePaths = valuePaths;
}

@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
String delimiter = RecordPathUtils.getFirstStringValue(delimiterPath, context);
if (delimiter == null) {
delimiter = "";
}

final List<String> values = new ArrayList<>();
for (final RecordPathSegment valuePath : valuePaths) {
final Stream<FieldValue> stream = valuePath.evaluate(context);

stream.forEach(fv -> {
final Object value = fv.getValue();
addStringValue(value, values);
});
}

final String joined = String.join(delimiter, values);
final RecordField field = new RecordField("join", RecordFieldType.STRING.getDataType());
final FieldValue responseValue = new StandardFieldValue(joined, field, null);
return Stream.of(responseValue);
}

private void addStringValue(final Object value, final List<String> values) {
if (value == null) {
values.add("null");
return;
}

if (value instanceof final Object[] array) {
for (final Object element : array) {
addStringValue(element, values);
}
} else if (value instanceof final Iterable<?> iterable) {
for (final Object element : iterable) {
addStringValue(element, values);
}
} else {
values.add(DataTypeUtils.toString(value, null));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.nifi.record.path.filter.NotFilter;
import org.apache.nifi.record.path.filter.RecordPathFilter;
import org.apache.nifi.record.path.filter.StartsWith;
import org.apache.nifi.record.path.functions.Anchored;
import org.apache.nifi.record.path.functions.Base64Decode;
import org.apache.nifi.record.path.functions.Base64Encode;
import org.apache.nifi.record.path.functions.Coalesce;
Expand All @@ -45,6 +46,7 @@
import org.apache.nifi.record.path.functions.FilterFunction;
import org.apache.nifi.record.path.functions.Format;
import org.apache.nifi.record.path.functions.Hash;
import org.apache.nifi.record.path.functions.Join;
import org.apache.nifi.record.path.functions.MapOf;
import org.apache.nifi.record.path.functions.PadLeft;
import org.apache.nifi.record.path.functions.PadRight;
Expand Down Expand Up @@ -129,6 +131,10 @@ public static RecordPathSegment buildPath(final Tree tree, final RecordPathSegme
}
case CHILD_REFERENCE: {
final Tree childTree = tree.getChild(0);
if (childTree == null) {
return new RootPath();
}

final int childTreeType = childTree.getType();
if (childTreeType == FIELD_NAME) {
final String childName = childTree.getChild(0).getText();
Expand Down Expand Up @@ -404,6 +410,24 @@ public static RecordPathSegment buildPath(final Tree tree, final RecordPathSegme
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
return new Count(args[0], absolute);
}
case "join": {
final int numArgs = argumentListTree.getChildCount();
if (numArgs < 2) {
throw new RecordPathException("Invalid number of arguments: " + functionName + " function takes 2 or more arguments but got " + numArgs);
}

final RecordPathSegment[] joinPaths = new RecordPathSegment[numArgs - 1];
for (int i = 0; i < numArgs - 1; i++) {
joinPaths[i] = buildPath(argumentListTree.getChild(i + 1), null, absolute);
}

final RecordPathSegment delimiterPath = buildPath(argumentListTree.getChild(0), null, absolute);
return new Join(delimiterPath, joinPaths, absolute);
}
case "anchored": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new Anchored(args[0], args[1], absolute);
}
case "not":
case "contains":
case "containsRegex":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,89 @@ public void testConcat() {
assertEquals("John Doe: 48", RecordPath.compile("concat(/firstName, ' ', /lastName, ': ', 48)").evaluate(record).getSelectedFields().findFirst().get().getValue());
}

@Test
public void testJoinWithTwoFields() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("fullName", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("firstName", RecordFieldType.LONG.getDataType()));

final RecordSchema schema = new SimpleRecordSchema(fields);

final Map<String, Object> values = new HashMap<>();
values.put("lastName", "Doe");
values.put("firstName", "John");
final Record record = new MapRecord(schema, values);

assertEquals("Doe, John", RecordPath.compile("join(', ', /lastName, /firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
}

@Test
public void testJoinWithArray() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("names", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
final RecordSchema schema = new SimpleRecordSchema(fields);

final Map<String, Object> values = new HashMap<>();
values.put("names", new String[] {"John", "Jane", "Jacob", "Judy"});
final Record record = new MapRecord(schema, values);

assertEquals("John,Jane,Jacob,Judy", RecordPath.compile("join(',', /names)").evaluate(record).getSelectedFields().findFirst().get().getValue());
}

@Test
public void testJoinWithArrayAndMultipleFields() {
final List<RecordField> personFields = new ArrayList<>();
personFields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
personFields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
personFields.add(new RecordField("friends", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
final RecordSchema personSchema = new SimpleRecordSchema(personFields);

final Map<String, Object> values = new HashMap<>();
values.put("friends", new String[] {"John", "Jane", "Jacob", "Judy"});
values.put("firstName", "John");
values.put("lastName", "Doe");
final Record record = new MapRecord(personSchema, values);

assertEquals("Doe\nJohn\nJane\nJacob", RecordPath.compile("join('\\n', /lastName, /firstName, /friends[1..2])").evaluate(record).getSelectedFields().findFirst().get().getValue());
}

@Test
public void testAnchored() {
final List<RecordField> personFields = new ArrayList<>();
personFields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
personFields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
final RecordSchema personSchema = new SimpleRecordSchema(personFields);

final List<RecordField> employeeFields = new ArrayList<>();
employeeFields.add(new RecordField("self", RecordFieldType.RECORD.getRecordDataType(personSchema)));
employeeFields.add(new RecordField("manager", RecordFieldType.RECORD.getRecordDataType(personSchema)));
employeeFields.add(new RecordField("directReports", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(personSchema))));
final RecordSchema employeeSchema = new SimpleRecordSchema(employeeFields);

final Record directReport1 = createPerson("John", "Doe", personSchema);
final Record directReport2 = createPerson("John", "Jingleheimer", personSchema);
final Record directReport3 = createPerson("John", "Jacob", personSchema);
final Record manager = createPerson("Jane", "Smith", personSchema);
final Record employee = new MapRecord(employeeSchema, Map.of(
"self", createPerson("John", "Schmidt", personSchema),
"manager", manager,
"directReports", new Record[] {directReport1, directReport2, directReport3}
));

assertEquals("John", RecordPath.compile("anchored(/directReports[0], /firstName)").evaluate(employee).getSelectedFields().findFirst().get().getValue());
assertEquals(List.of("John", "John", "John"), RecordPath.compile("anchored(/directReports, /firstName)").evaluate(employee).getSelectedFields().map(FieldValue::getValue).toList());
assertEquals(List.of(), RecordPath.compile("anchored(/self/lastName, / )").evaluate(employee).getSelectedFields().map(FieldValue::getValue).toList());
}

private Record createPerson(final String firstName, final String lastName, final RecordSchema schema) {
final Map<String, Object> values = Map.of(
"firstName", firstName,
"lastName", lastName);
return new MapRecord(schema, values);
}


@Test
public void testMapOf() {
final List<RecordField> fields = new ArrayList<>();
Expand Down
42 changes: 42 additions & 0 deletions nifi-docs/src/main/asciidoc/record-path-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,48 @@ Concatenates all the arguments together.
|==========================================================


=== join

Joins together multiple values with a separator.

|==========================================================
| RecordPath | Return value
| `join(', ', /workAddress/* )` | 123, 5th Avenue, New York, NY, 10020
|==========================================================


=== anchored

Allows evaluating a RecordPath while anchoring the root context to a child record.

|==========================================================
| RecordPath | Return value
| `anchored(/homeAddress, /city)` | Jersey City
|==========================================================

Additionally, this can be used in conjunction with arrays. For example, if we have the following record:
----
{
"id": "1234",
"elements": [{
"name": "book",
"color": "red"
}, {
"name": "computer",
"color": "black"
}]
}
----

We can evaluate hte following Record paths:

|==========================================================
| RecordPath | Return value
| `anchored(/elements, /name)` | The array containing `book` and `computer`
| `anchored(/elements, concat(/name, ': ', /color))` | The array containing `book: red` and `computer: black`
|==========================================================


=== fieldName

Normally, when a path is given to a particular field in a Record, what is returned is the value of that field. It
Expand Down

0 comments on commit 74bd798

Please sign in to comment.