From 9197f80bb512450a67b9a9d7a1bab4609abd0e9a Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 6 Dec 2017 11:40:05 -0500 Subject: [PATCH] NIFI-4671: Ensure that Avro Schemas that are created properly denote fields as being nullable iff the schemas says they are, for non-top-level fields --- .../nifi/serialization/record/ResultSetRecordSet.java | 11 ++++++++++- .../nifi/serialization/record/util/DataTypeUtils.java | 2 +- .../main/java/org/apache/nifi/avro/AvroTypeUtil.java | 7 ++++--- .../org/apache/nifi/csv/CSVHeaderSchemaStrategy.java | 2 +- .../main/java/org/apache/nifi/grok/GrokReader.java | 4 ++-- 5 files changed, 18 insertions(+), 8 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java index b6daab76c628..ad26d792f938 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java @@ -127,7 +127,16 @@ private static RecordSchema createSchema(final ResultSet rs) throws SQLException final DataType dataType = getDataType(sqlType, rs, column); final String fieldName = metadata.getColumnLabel(column); - final RecordField field = new RecordField(fieldName, dataType); + + final int nullableFlag = metadata.isNullable(column); + final boolean nullable; + if (nullableFlag == ResultSetMetaData.columnNoNulls) { + nullable = false; + } else { + nullable = true; + } + + final RecordField field = new RecordField(fieldName, dataType, nullable); fields.add(field); } diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index 55a4d6909cfc..ccd9270d3a91 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -985,7 +985,7 @@ public static RecordField merge(final RecordField thisField, final RecordField o dataType = RecordFieldType.CHOICE.getChoiceDataType(thisField.getDataType(), otherField.getDataType()); } - return new RecordField(fieldName, dataType, defaultValue, aliases); + return new RecordField(fieldName, dataType, defaultValue, aliases, thisField.isNullable() || otherField.isNullable()); } public static boolean isScalarValue(final DataType dataType, final Object value) { diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index abc381fcf524..c5256c4679e4 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -289,11 +289,12 @@ public static DataType determineDataType(final Schema avroSchema, Map mapFields = new ArrayList<>(); for (final String key : map.keySet()) { - mapFields.add(new RecordField(key, elementType)); + mapFields.add(new RecordField(key, elementType, true)); } final RecordSchema mapSchema = new SimpleRecordSchema(mapFields); return new MapRecord(mapSchema, map); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java index 642f360817b2..9c31cca347f0 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java @@ -65,7 +65,7 @@ public RecordSchema getSchema(Map variables, final InputStream c final List fields = new ArrayList<>(); for (final String columnName : csvParser.getHeaderMap().keySet()) { - fields.add(new RecordField(columnName, RecordFieldType.STRING.getDataType())); + fields.add(new RecordField(columnName, RecordFieldType.STRING.getDataType(), true)); } return new SimpleRecordSchema(fields); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index 4a26975b623f..6eea8e38129b 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -150,8 +150,8 @@ static RecordSchema createRecordSchema(final Grok grok) { String grokExpression = grok.getOriginalGrokPattern(); populateSchemaFieldNames(grok, grokExpression, fields); - fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType())); - fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType())); + fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType(), true)); + fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType(), true)); final RecordSchema schema = new SimpleRecordSchema(fields); return schema;