diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index 54259979101d..22e1c981ce9a 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -170,6 +170,8 @@ src/test/resources/avro/multiple-types.avsc src/test/resources/avro/simple.avsc src/test/resources/avro/recursive.avsc + src/test/resources/avro/schemaless_simple_record.avsc + src/test/resources/avro/schemaless_simple_record_extra_field.avsc src/test/resources/cef/empty-row.txt src/test/resources/cef/misformatted-row.txt diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java index ab20aad81184..6f22123170e8 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java @@ -91,7 +91,7 @@ protected GenericRecord nextAvroRecord() throws IOException { try { genericRecord = datumReader.read(null, decoder); } catch (final EOFException eof) { - return null; + throw new IOException("Was expecting more data, but reached EOF.", eof); } return genericRecord; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java index 6a3a639161a8..9bbaa55bb718 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java @@ -16,14 +16,10 @@ */ package org.apache.nifi.avro; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.nifi.serialization.MalformedRecordException; -import org.apache.nifi.serialization.RecordReader; -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordSchema; -import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.io.File; import java.io.FileInputStream; @@ -32,11 +28,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.junit.jupiter.api.Test; public class TestAvroReaderWithExplicitSchema { @@ -105,6 +105,34 @@ public void testAvroExplicitReaderWithEmbeddedSchemaFileDifferentFromExplicitSch assertThrows(IOException.class, () -> new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema)); } + @Test + public void testAvroExplicitReaderWithSchemalessFileAndExplicitSchema() throws Exception { + AvroReaderWithExplicitSchema avroReader = createAvroReaderWithExplicitSchema( + "src/test/resources/avro/schemaless_simple_record.avro", + "src/test/resources/avro/schemaless_simple_record.avsc" + ); + + GenericData.Record expected = new GenericData.Record(new Schema.Parser().parse(new File("src/test/resources/avro/schemaless_simple_record.avsc"))); + expected.put("field_1", 123); + expected.put("field_2", "44"); + expected.put("field_3", 5); + + GenericRecord actual1 = avroReader.nextAvroRecord(); + assertEquals(expected, actual1); + + GenericRecord actual2 = avroReader.nextAvroRecord(); + assertNull(actual2); + } + + @Test + public void testAvroExplicitReaderWithSchemalessFileAndWrongExplicitSchema() throws Exception { + AvroReaderWithExplicitSchema avroReader = createAvroReaderWithExplicitSchema( + "src/test/resources/avro/schemaless_simple_record.avro", + "src/test/resources/avro/schemaless_simple_record_extra_field.avsc" + ); + assertThrows(IOException.class, avroReader::nextAvroRecord); + } + @Test public void testAvroExplicitReaderWithSchemalessFileDecimalValuesWithDifferentBufferSize() throws Exception { // GIVEN diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avro b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avro new file mode 100644 index 000000000000..858062de33e7 Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avro differ diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avsc new file mode 100644 index 000000000000..788b7b73c7e7 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avsc @@ -0,0 +1,28 @@ +{ + "type": "record", + "name": "nifiRecord", + "namespace": "org.apache.nifi", + "fields": [ + { + "name": "field_1", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_2", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_3", + "type": [ + "int", + "null" + ] + } + ] +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record_extra_field.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record_extra_field.avsc new file mode 100644 index 000000000000..276107a0e128 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record_extra_field.avsc @@ -0,0 +1,24 @@ +{ + "type":"record", + "name":"message_name", + "namespace":"message_namespace", + "fields":[ + { + "name":"field_1", + "type":["long"] + }, + { + "name":"field_2", + "type":["string"] + }, + { + "name":"field_3", + "type":["int"] + }, + { + "name":"extra", + "type":["long"] + } + ] +} +