Skip to content

Commit

Permalink
NIFI-4857: Support String<->byte[] conversion for records
Browse files Browse the repository at this point in the history
  • Loading branch information
mattyb149 committed Mar 21, 2018
1 parent 478e340 commit a1f9d46
Show file tree
Hide file tree
Showing 10 changed files with 478 additions and 28 deletions.
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;

import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.record.path.util.RecordPathUtils;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;

import java.nio.charset.Charset;
import java.util.stream.Stream;

public class ToBytes extends RecordPathSegment {

private final RecordPathSegment recordPath;
private final RecordPathSegment charsetSegment;

public ToBytes(final RecordPathSegment recordPath, final RecordPathSegment charsetSegment, final boolean absolute) {
super("toBytes", null, absolute);
this.recordPath = recordPath;
this.charsetSegment = charsetSegment;
}

@Override
public Stream<FieldValue> evaluate(RecordPathEvaluationContext context) {
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
return fieldValues.filter(fv -> fv.getValue() != null)
.map(fv -> {

if (!(fv.getValue() instanceof String)) {
return fv;
}

final Charset charset = getCharset(this.charsetSegment, context);

final byte[] bytesValue;
try {
Byte[] src = (Byte[]) DataTypeUtils.toArray(fv.getValue(), fv.getField().getFieldName(), RecordFieldType.BYTE.getDataType(), charset);
bytesValue = new byte[src.length];
for(int i=0;i<src.length;i++) {
bytesValue[i] = src[i];
}
} catch (final Exception e) {
return fv;
}

return new StandardFieldValue(bytesValue, fv.getField(), fv.getParent().orElse(null));
});
}

private Charset getCharset(final RecordPathSegment charsetSegment, final RecordPathEvaluationContext context) {
if (charsetSegment == null) {
return null;
}

final String charsetString = RecordPathUtils.getFirstStringValue(charsetSegment, context);
if (charsetString == null || charsetString.isEmpty()) {
return null;
}

try {
return DataTypeUtils.getCharset(charsetString);
} catch (final Exception e) {
return null;
}
}

}
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;

import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.record.path.util.RecordPathUtils;
import org.apache.nifi.serialization.record.util.DataTypeUtils;

import java.nio.charset.Charset;
import java.util.stream.Stream;

public class ToString extends RecordPathSegment {

private final RecordPathSegment recordPath;
private final RecordPathSegment charsetSegment;

public ToString(final RecordPathSegment recordPath, final RecordPathSegment charsetSegment, final boolean absolute) {
super("toString", null, absolute);
this.recordPath = recordPath;
this.charsetSegment = charsetSegment;
}

@Override
public Stream<FieldValue> evaluate(RecordPathEvaluationContext context) {
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
return fieldValues.filter(fv -> fv.getValue() != null)
.map(fv -> {
final Charset charset = getCharset(this.charsetSegment, context);
Object value = fv.getValue();
final String stringValue;

if (value instanceof Object[]) {
Object[] o = (Object[]) value;
if (o.length > 0) {

byte[] dest = new byte[o.length];
for (int i = 0; i < o.length; i++) {
dest[i] = (byte) o[i];
}
stringValue = new String(dest, charset);
} else {
stringValue = ""; // Empty array = empty string
}
} else if (!(fv.getValue() instanceof byte[])) {
return fv;
} else {
try {
stringValue = DataTypeUtils.toString(fv.getValue(), (String) null, charset);
} catch (final Exception e) {
return fv;
}

if (stringValue == null) {
return fv;
}
}
return new StandardFieldValue(stringValue, fv.getField(), fv.getParent().orElse(null));
});
}

private Charset getCharset(final RecordPathSegment charsetSegment, final RecordPathEvaluationContext context) {
if (charsetSegment == null) {
return null;
}

final String charsetString = RecordPathUtils.getFirstStringValue(charsetSegment, context);
if (charsetString == null || charsetString.isEmpty()) {
return null;
}

try {
return DataTypeUtils.getCharset(charsetString);
} catch (final Exception e) {
return null;
}
}

}
Expand Up @@ -75,7 +75,9 @@
import org.apache.nifi.record.path.functions.SubstringAfterLast;
import org.apache.nifi.record.path.functions.SubstringBefore;
import org.apache.nifi.record.path.functions.SubstringBeforeLast;
import org.apache.nifi.record.path.functions.ToBytes;
import org.apache.nifi.record.path.functions.ToDate;
import org.apache.nifi.record.path.functions.ToString;

public class RecordPathCompiler {

Expand Down Expand Up @@ -250,6 +252,14 @@ public static RecordPathSegment buildPath(final Tree tree, final RecordPathSegme
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new ToDate(args[0], args[1], absolute);
}
case "toString": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new ToString(args[0], args[1], absolute);
}
case "toBytes": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new ToBytes(args[0], args[1], absolute);
}
case "format": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new Format(args[0], args[1], absolute);
Expand Down
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.text.DateFormat;
import java.text.ParseException;
Expand Down Expand Up @@ -1210,6 +1211,39 @@ public void testFormatDateWhenNotDate() {
assertEquals("John Doe", RecordPath.compile("format(/name, 'yyyy-MM')").evaluate(record).getSelectedFields().findFirst().get().getValue());
}

@Test
public void testToString() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("bytes", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))));

final RecordSchema schema = new SimpleRecordSchema(fields);

final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("bytes", "Hello World!".getBytes(StandardCharsets.UTF_16));
final Record record = new MapRecord(schema, values);

assertEquals("Hello World!", RecordPath.compile("toString(/bytes, \"UTF-16\")").evaluate(record).getSelectedFields().findFirst().get().getValue());
}

@Test
public void testToBytes() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("s", RecordFieldType.STRING.getDataType()));

final RecordSchema schema = new SimpleRecordSchema(fields);

final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("s", "Hello World!");
final Record record = new MapRecord(schema, values);

assertArrayEquals("Hello World!".getBytes(StandardCharsets.UTF_16LE),
(byte[]) RecordPath.compile("toBytes(/s, \"UTF-16LE\")").evaluate(record).getSelectedFields().findFirst().get().getValue());
}

private List<RecordField> getDefaultFields() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
Expand Down
Expand Up @@ -17,6 +17,7 @@

package org.apache.nifi.serialization.record;

import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -268,7 +269,7 @@ public Date getAsDate(final String fieldName, final String format) {

@Override
public Object[] getAsArray(final String fieldName) {
return DataTypeUtils.toArray(getValue(fieldName), fieldName);
return DataTypeUtils.toArray(getValue(fieldName), fieldName, null, StandardCharsets.UTF_8);
}


Expand Down

0 comments on commit a1f9d46

Please sign in to comment.