From 57a5a09fe91dce191fc459745dfa343f1578ee85 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 21 Dec 2017 12:31:32 -0500 Subject: [PATCH] NIFI-4717: Several minor bug fixes and performance improvements around record-oriented processors --- .../serialization/SimpleRecordSchema.java | 31 +++------ .../nifi/serialization/record/MapRecord.java | 11 ++- .../serialization/record/RecordField.java | 10 ++- .../record/ResultSetRecordSet.java | 10 ++- .../record/util/DataTypeUtils.java | 3 + .../apache/nifi/util/MockPropertyValue.java | 2 +- .../org/apache/nifi/avro/AvroTypeUtil.java | 26 ++++++- .../nifi/processors/standard/QueryRecord.java | 2 + .../processors/standard/UpdateRecord.java | 24 ++++--- .../nifi/queryrecord/FlowFileTable.java | 36 +++++++++- .../processors/standard/TestUpdateRecord.java | 3 + .../org/apache/nifi/csv/CSVRecordReader.java | 57 ++++++++++------ .../nifi/csv/JacksonCSVRecordReader.java | 39 ++++++----- .../nifi/json/JsonTreeRowRecordReader.java | 68 ++++++++++++++----- .../org/apache/nifi/json/WriteJsonResult.java | 10 ++- 15 files changed, 236 insertions(+), 96 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java index 871c7bf74fb2..5b85f030a2e3 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.OptionalInt; import java.util.stream.Collectors; import org.apache.nifi.serialization.record.DataType; @@ -33,7 +32,7 @@ public class SimpleRecordSchema implements RecordSchema { private List fields = null; - private Map fieldIndices = null; + private Map fieldMap = null; private final boolean textAvailable; private final String text; private final String schemaFormat; @@ -88,29 +87,25 @@ public List getFields() { } public void setFields(final List fields) { - if (this.fields != null) { throw new IllegalArgumentException("Fields have already been set."); } this.fields = Collections.unmodifiableList(new ArrayList<>(fields)); - this.fieldIndices = new HashMap<>(fields.size()); + this.fieldMap = new HashMap<>(fields.size() * 2); - int index = 0; for (final RecordField field : fields) { - Integer previousValue = fieldIndices.put(field.getFieldName(), index); + RecordField previousValue = fieldMap.put(field.getFieldName(), field); if (previousValue != null) { throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'"); } for (final String alias : field.getAliases()) { - previousValue = fieldIndices.put(alias, index); + previousValue = fieldMap.put(alias, field); if (previousValue != null) { throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'"); } } - - index++; } } @@ -138,24 +133,18 @@ public List getFieldNames() { @Override public Optional getDataType(final String fieldName) { - final OptionalInt idx = getFieldIndex(fieldName); - return idx.isPresent() ? Optional.of(fields.get(idx.getAsInt()).getDataType()) : Optional.empty(); + final RecordField field = fieldMap.get(fieldName); + if (field == null) { + return Optional.empty(); + } + return Optional.of(field.getDataType()); } @Override public Optional getField(final String fieldName) { - final OptionalInt indexOption = getFieldIndex(fieldName); - if (indexOption.isPresent()) { - return Optional.of(fields.get(indexOption.getAsInt())); - } - - return Optional.empty(); + return Optional.ofNullable(fieldMap.get(fieldName)); } - private OptionalInt getFieldIndex(final String fieldName) { - final Integer index = fieldIndices.get(fieldName); - return index == null ? OptionalInt.empty() : OptionalInt.of(index); - } @Override public boolean equals(final Object obj) { diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java index c3444ed2cd1a..0335bd271fe5 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java @@ -70,10 +70,10 @@ public MapRecord(final RecordSchema schema, final Map values, fi private Map checkTypes(final Map values, final RecordSchema schema) { for (final RecordField field : schema.getFields()) { - final Object value = getExplicitValue(field, values); + Object value = getExplicitValue(field, values); if (value == null) { - if (field.isNullable()) { + if (field.isNullable() || field.getDefaultValue() != null) { continue; } @@ -109,7 +109,12 @@ public Object[] getValues() { final Object[] values = new Object[schema.getFieldCount()]; int i = 0; for (final RecordField recordField : schema.getFields()) { - values[i++] = getValue(recordField); + Object value = getExplicitValue(recordField); + if (value == null) { + value = recordField.getDefaultValue(); + } + + values[i++] = value; } return values; } diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java index 41da6bebe6ea..b4ff8483040d 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java @@ -68,7 +68,15 @@ public RecordField(final String fieldName, final DataType dataType, final Object this.fieldName = Objects.requireNonNull(fieldName); this.dataType = Objects.requireNonNull(dataType); - this.aliases = Collections.unmodifiableSet(Objects.requireNonNull(aliases)); + + // If aliases is the empty set, don't bother with the expense of wrapping in an unmodifiableSet. + Objects.requireNonNull(aliases); + if ((Set) aliases == Collections.EMPTY_SET) { + this.aliases = aliases; + } else { + this.aliases = Collections.unmodifiableSet(aliases); + } + this.defaultValue = defaultValue; this.nullable = nullable; } diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java index ad26d792f938..571bf77bf705 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.nifi.serialization.SimpleRecordSchema; import org.slf4j.Logger; @@ -174,7 +176,13 @@ private static DataType getDataType(final int sqlType, final ResultSet rs, final final Object obj = rs.getObject(columnIndex); if (obj == null || !(obj instanceof Record)) { - return RecordFieldType.RECORD.getDataType(); + final List dataTypes = Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE, + RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING, RecordFieldType.TIME, + RecordFieldType.TIMESTAMP) + .map(recordFieldType -> recordFieldType.getDataType()) + .collect(Collectors.toList()); + + return RecordFieldType.CHOICE.getChoiceDataType(dataTypes); } final Record record = (Record) obj; diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index ccd9270d3a91..6063d3b21c51 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -891,6 +891,9 @@ public static RecordSchema merge(final RecordSchema thisSchema, final RecordSche if (otherSchema == null) { return thisSchema; } + if (thisSchema == otherSchema) { + return thisSchema; + } final List otherFields = otherSchema.getFields(); if (otherFields.isEmpty()) { diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java index c55ad2326515..08c8e978be33 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java @@ -225,7 +225,7 @@ public String toString() { @Override public boolean isExpressionLanguagePresent() { - if (!expectExpressions) { + if (!Boolean.TRUE.equals(expectExpressions)) { return false; } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index c5256c4679e4..cc1fd38d1923 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -27,9 +27,11 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -150,9 +152,20 @@ private static Schema buildAvroSchema(final DataType dataType, final String fiel final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; final List options = choiceDataType.getPossibleSubTypes(); + // We need to keep track of which types have been added to the union, because if we have + // two elements in the UNION with the same type, it will fail - even if the logical type is + // different. So if we have an int and a logical type date (which also has a 'concrete type' of int) + // then an Exception will be thrown when we try to create the union. To avoid this, we just keep track + // of the Types and avoid adding it in such a case. final List unionTypes = new ArrayList<>(options.size()); + final Set typesAdded = new HashSet<>(); + for (final DataType option : options) { - unionTypes.add(buildAvroSchema(option, fieldName, false)); + final Schema optionSchema = buildAvroSchema(option, fieldName, false); + if (!typesAdded.contains(optionSchema.getType())) { + unionTypes.add(optionSchema); + typesAdded.add(optionSchema.getType()); + } } schema = Schema.createUnion(unionTypes); @@ -213,6 +226,17 @@ private static Schema buildAvroSchema(final DataType dataType, final String fiel } private static Schema nullable(final Schema schema) { + if (schema.getType() == Type.UNION) { + final List unionTypes = new ArrayList<>(schema.getTypes()); + final Schema nullSchema = Schema.create(Type.NULL); + if (unionTypes.contains(nullSchema)) { + return schema; + } + + unionTypes.add(nullSchema); + return Schema.createUnion(unionTypes); + } + return Schema.createUnion(Schema.create(Type.NULL), schema); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java index 3084b72b5bc2..5798323bba69 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java @@ -453,6 +453,8 @@ protected QueryResult queryWithCache(final ProcessSession session, final FlowFil return new QueryResult() { @Override public void close() throws IOException { + table.close(); + final BlockingQueue statementQueue = statementQueues.get(sql); if (statementQueue == null || !statementQueue.offer(cachedStatement)) { try { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java index b2c8002604ee..63e05abfd2c9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java @@ -165,17 +165,23 @@ record = processRelativePath(replacementRecordPath, result.getSelectedFields(), } } else { final PropertyValue replacementValue = context.getProperty(recordPathText); - final Map fieldVariables = new HashMap<>(4); - result.getSelectedFields().forEach(fieldVal -> { - fieldVariables.clear(); - fieldVariables.put(FIELD_NAME, fieldVal.getField().getFieldName()); - fieldVariables.put(FIELD_VALUE, DataTypeUtils.toString(fieldVal.getValue(), (String) null)); - fieldVariables.put(FIELD_TYPE, fieldVal.getField().getDataType().getFieldType().name()); + if (replacementValue.isExpressionLanguagePresent()) { + final Map fieldVariables = new HashMap<>(); - final String evaluatedReplacementVal = replacementValue.evaluateAttributeExpressions(flowFile, fieldVariables).getValue(); - fieldVal.updateValue(evaluatedReplacementVal); - }); + result.getSelectedFields().forEach(fieldVal -> { + fieldVariables.clear(); + fieldVariables.put(FIELD_NAME, fieldVal.getField().getFieldName()); + fieldVariables.put(FIELD_VALUE, DataTypeUtils.toString(fieldVal.getValue(), (String) null)); + fieldVariables.put(FIELD_TYPE, fieldVal.getField().getDataType().getFieldType().name()); + + final String evaluatedReplacementVal = replacementValue.evaluateAttributeExpressions(flowFile, fieldVariables).getValue(); + fieldVal.updateValue(evaluatedReplacementVal); + }); + } else { + final String evaluatedReplacementVal = replacementValue.getValue(); + result.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(evaluatedReplacementVal)); + } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java index bd15dc2cdbb5..c40e36482be8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java @@ -18,9 +18,12 @@ import java.io.InputStream; import java.lang.reflect.Type; +import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.linq4j.AbstractEnumerable; @@ -47,6 +50,7 @@ import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordSchema; @@ -63,6 +67,8 @@ public class FlowFileTable extends AbstractTable implements QueryableTable private volatile FlowFile flowFile; private volatile int maxRecordsRead; + private final Set> enumerators = new HashSet<>(); + /** * Creates a FlowFile table. */ @@ -85,6 +91,14 @@ public String toString() { return "FlowFileTable"; } + public void close() { + synchronized (enumerators) { + for (final FlowFileEnumerator enumerator : enumerators) { + enumerator.close(); + } + } + } + /** * Returns an enumerable over a given projection of the fields. * @@ -96,7 +110,7 @@ public Enumerable project(final int[] fields) { @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Enumerator enumerator() { - return new FlowFileEnumerator(session, flowFile, logger, recordParserFactory, fields) { + final FlowFileEnumerator flowFileEnumerator = new FlowFileEnumerator(session, flowFile, logger, recordParserFactory, fields) { @Override protected void onFinish() { final int recordCount = getRecordsRead(); @@ -104,7 +118,21 @@ protected void onFinish() { maxRecordsRead = recordCount; } } + + @Override + public void close() { + synchronized (enumerators) { + enumerators.remove(this); + } + super.close(); + } }; + + synchronized (enumerators) { + enumerators.add(flowFileEnumerator); + } + + return flowFileEnumerator; } }; } @@ -203,9 +231,13 @@ private RelDataType getRelDataType(final DataType fieldType, final JavaTypeFacto case ARRAY: return typeFactory.createJavaType(Object[].class); case RECORD: - return typeFactory.createJavaType(Object.class); + return typeFactory.createJavaType(Record.class); case MAP: return typeFactory.createJavaType(HashMap.class); + case BIGINT: + return typeFactory.createJavaType(BigInteger.class); + case CHOICE: + return typeFactory.createJavaType(Object.class); } throw new IllegalArgumentException("Unknown Record Field Type: " + fieldType); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java index 6669f4bdc2e6..33bec74b0b44 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java @@ -64,6 +64,7 @@ public void setup() throws InitializationException { public void testLiteralReplacementValue() { runner.setProperty("/name", "Jane Doe"); runner.enqueue(""); + runner.setValidateExpressionUsage(false); readerService.addRecord("John Doe", 35); runner.run(); @@ -188,6 +189,7 @@ public void testChangingSchema() throws InitializationException, IOException { public void testUpdateInArray() throws InitializationException, IOException { final JsonTreeReader jsonReader = new JsonTreeReader(); runner.addControllerService("reader", jsonReader); + runner.setValidateExpressionUsage(false); final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-address.avsc"))); final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-address.avsc"))); @@ -218,6 +220,7 @@ public void testUpdateInArray() throws InitializationException, IOException { public void testUpdateInNullArray() throws InitializationException, IOException { final JsonTreeReader jsonReader = new JsonTreeReader(); runner.addControllerService("reader", jsonReader); + runner.setValidateExpressionUsage(false); final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-address.avsc"))); final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-address.avsc"))); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java index 70aaba9e5182..f01fc3ee83ad 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java @@ -23,7 +23,7 @@ import java.io.Reader; import java.text.DateFormat; import java.util.ArrayList; -import java.util.LinkedHashMap; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -41,6 +41,8 @@ import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.util.DataTypeUtils; @@ -53,7 +55,7 @@ public class CSVRecordReader implements RecordReader { private final Supplier LAZY_TIME_FORMAT; private final Supplier LAZY_TIMESTAMP_FORMAT; - private List rawFieldNames; + private List recordFields; public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader, final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding) throws IOException { @@ -87,31 +89,37 @@ public CSVRecordReader(final InputStream in, final ComponentLog logger, final Re public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException { final RecordSchema schema = getSchema(); - final List rawFieldNames = getRawFieldNames(); - final int numFieldNames = rawFieldNames.size(); + final List recordFields = getRecordFields(); + final int numFieldNames = recordFields.size(); for (final CSVRecord csvRecord : csvParser) { - final Map values = new LinkedHashMap<>(); + final Map values = new HashMap<>(recordFields.size() * 2); for (int i = 0; i < csvRecord.size(); i++) { - final String rawFieldName = numFieldNames <= i ? "unknown_field_index_" + i : rawFieldNames.get(i); final String rawValue = csvRecord.get(i); - final Optional dataTypeOption = schema.getDataType(rawFieldName); + final String rawFieldName; + final DataType dataType; + if (i >= numFieldNames) { + if (!dropUnknownFields) { + values.put("unknown_field_index_" + i, rawValue); + } - if (!dataTypeOption.isPresent() && dropUnknownFields) { continue; + } else { + final RecordField recordField = recordFields.get(i); + rawFieldName = recordField.getFieldName(); + dataType = recordField.getDataType(); } + final Object value; - if (coerceTypes && dataTypeOption.isPresent()) { - value = convert(rawValue, dataTypeOption.get(), rawFieldName); - } else if (dataTypeOption.isPresent()) { + if (coerceTypes) { + value = convert(rawValue, dataType, rawFieldName); + } else { // The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to // dictate a field type. As a result, we will use the schema that we have to attempt to convert // the value into the desired type if it's a simple type. - value = convertSimpleIfPossible(rawValue, dataTypeOption.get(), rawFieldName); - } else { - value = rawValue; + value = convertSimpleIfPossible(rawValue, dataType, rawFieldName); } values.put(rawFieldName, value); @@ -124,9 +132,9 @@ public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFie } - private List getRawFieldNames() { - if (this.rawFieldNames != null) { - return this.rawFieldNames; + private List getRecordFields() { + if (this.recordFields != null) { + return this.recordFields; } // Use a SortedMap keyed by index of the field so that we can get a List of field names in the correct order @@ -135,8 +143,19 @@ private List getRawFieldNames() { sortedMap.put(entry.getValue(), entry.getKey()); } - this.rawFieldNames = new ArrayList<>(sortedMap.values()); - return this.rawFieldNames; + final List fields = new ArrayList<>(); + final List rawFieldNames = new ArrayList<>(sortedMap.values()); + for (final String rawFieldName : rawFieldNames) { + final Optional option = schema.getField(rawFieldName); + if (option.isPresent()) { + fields.add(option.get()); + } else { + fields.add(new RecordField(rawFieldName, RecordFieldType.STRING.getDataType())); + } + } + + this.recordFields = fields; + return fields; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java index a273d0cbff27..91cca810977f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java @@ -17,11 +17,19 @@ package org.apache.nifi.csv; -import com.fasterxml.jackson.databind.MappingIterator; -import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.dataformat.csv.CsvMapper; -import com.fasterxml.jackson.dataformat.csv.CsvParser; -import com.fasterxml.jackson.dataformat.csv.CsvSchema; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.text.DateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; + import org.apache.commons.csv.CSVFormat; import org.apache.commons.io.input.BOMInputStream; import org.apache.commons.lang3.CharUtils; @@ -35,18 +43,11 @@ import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.util.DataTypeUtils; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; -import java.text.DateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Supplier; +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvParser; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; public class JacksonCSVRecordReader implements RecordReader { @@ -140,6 +141,7 @@ public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFie } } } + // Check for empty lines and ignore them boolean foundRecord = true; if (csvRecord == null || (csvRecord.length == 1 && StringUtils.isEmpty(csvRecord[0]))) { @@ -154,12 +156,13 @@ public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFie } } } + // If we didn't find a record, then the end of the file was comprised of empty lines, so we have no record to return if (!foundRecord) { return null; } - final Map values = new LinkedHashMap<>(); + final Map values = new HashMap<>(rawFieldNames.size() * 2); final int numFieldNames = rawFieldNames.size(); for (int i = 0; i < csvRecord.length; i++) { final String rawFieldName = numFieldNames <= i ? "unknown_field_index_" + i : rawFieldNames.get(i); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java index 489d1142775a..9e2c965a9315 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.function.Supplier; @@ -84,31 +83,64 @@ private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSche return convertJsonNodeToRecord(jsonNode, schema, fieldNamePrefix, coerceTypes, dropUnknown); } + private JsonNode getChildNode(final JsonNode jsonNode, final RecordField field) { + if (jsonNode.has(field.getFieldName())) { + return jsonNode.get(field.getFieldName()); + } + + for (final String alias : field.getAliases()) { + if (jsonNode.has(alias)) { + return jsonNode.get(alias); + } + } + + return null; + } private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix, final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException { - final Map values = new LinkedHashMap<>(); - final Iterator fieldNames = jsonNode.getFieldNames(); - while (fieldNames.hasNext()) { - final String fieldName = fieldNames.next(); - final JsonNode childNode = jsonNode.get(fieldName); + final Map values = new HashMap<>(schema.getFieldCount() * 2); - final RecordField recordField = schema.getField(fieldName).orElse(null); - if (recordField == null && dropUnknown) { - continue; - } + if (dropUnknown) { + for (final RecordField recordField : schema.getFields()) { + final JsonNode childNode = getChildNode(jsonNode, recordField); + if (childNode == null) { + continue; + } + + final String fieldName = recordField.getFieldName(); + + final Object value; + if (coerceTypes) { + final DataType desiredType = recordField.getDataType(); + final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName; + value = convertField(childNode, fullFieldName, desiredType, dropUnknown); + } else { + value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType()); + } - final Object value; - if (coerceTypes && recordField != null) { - final DataType desiredType = recordField.getDataType(); - final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName; - value = convertField(childNode, fullFieldName, desiredType, dropUnknown); - } else { - value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType()); + values.put(fieldName, value); } + } else { + final Iterator fieldNames = jsonNode.getFieldNames(); + while (fieldNames.hasNext()) { + final String fieldName = fieldNames.next(); + final JsonNode childNode = jsonNode.get(fieldName); + + final RecordField recordField = schema.getField(fieldName).orElse(null); + + final Object value; + if (coerceTypes && recordField != null) { + final DataType desiredType = recordField.getDataType(); + final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName; + value = convertField(childNode, fullFieldName, desiredType, dropUnknown); + } else { + value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType()); + } - values.put(fieldName, value); + values.put(fieldName, value); + } } final Supplier supplier = () -> jsonNode.toString(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java index 5cfd3ac63d58..fc8418149ff7 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java @@ -21,6 +21,7 @@ import java.io.OutputStream; import java.math.BigInteger; import java.text.DateFormat; +import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -117,26 +118,31 @@ public void flush() throws IOException { public Map writeRecord(final Record record) throws IOException { // If we are not writing an active record set, then we need to ensure that we write the // schema information. + boolean firstRecord = false; if (!isActiveRecordSet()) { generator.flush(); schemaAccess.writeHeader(recordSchema, getOutputStream()); + firstRecord = true; } writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), true); - return schemaAccess.getAttributes(recordSchema); + return firstRecord ? schemaAccess.getAttributes(recordSchema) : Collections.emptyMap(); } @Override public WriteResult writeRawRecord(final Record record) throws IOException { // If we are not writing an active record set, then we need to ensure that we write the // schema information. + boolean firstRecord = false; if (!isActiveRecordSet()) { generator.flush(); schemaAccess.writeHeader(recordSchema, getOutputStream()); + firstRecord = true; } writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), false); - return WriteResult.of(incrementRecordCount(), schemaAccess.getAttributes(recordSchema)); + final Map attributes = firstRecord ? schemaAccess.getAttributes(recordSchema) : Collections.emptyMap(); + return WriteResult.of(incrementRecordCount(), attributes); } private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator,