From d2c7abde27764bf74cfe17235e0070562d057859 Mon Sep 17 00:00:00 2001 From: Rajmund Takacs Date: Tue, 6 Feb 2024 16:03:56 +0100 Subject: [PATCH 1/4] NIFI-12745: Fix AvroReader silently dropping malformed records --- .../pom.xml | 2 ++ .../avro/AvroReaderWithExplicitSchema.java | 2 +- .../TestAvroReaderWithExplicitSchema.java | 30 ++++++++++++++++++ .../avro/schemaless_simple_record.avro | Bin 0 -> 9 bytes .../avro/schemaless_simple_record.avsc | 28 ++++++++++++++++ .../schemaless_simple_record_extra_field.avsc | 24 ++++++++++++++ 6 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avro create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avsc create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record_extra_field.avsc diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index 54259979101d..22e1c981ce9a 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -170,6 +170,8 @@ src/test/resources/avro/multiple-types.avsc src/test/resources/avro/simple.avsc src/test/resources/avro/recursive.avsc + src/test/resources/avro/schemaless_simple_record.avsc + src/test/resources/avro/schemaless_simple_record_extra_field.avsc src/test/resources/cef/empty-row.txt src/test/resources/cef/misformatted-row.txt diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java index ab20aad81184..6f22123170e8 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java @@ -91,7 +91,7 @@ protected GenericRecord nextAvroRecord() throws IOException { try { genericRecord = datumReader.read(null, decoder); } catch (final EOFException eof) { - return null; + throw new IOException("Was expecting more data, but reached EOF.", eof); } return genericRecord; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java index 6a3a639161a8..48d102514e26 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java @@ -105,6 +105,36 @@ public void testAvroExplicitReaderWithEmbeddedSchemaFileDifferentFromExplicitSch assertThrows(IOException.class, () -> new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema)); } + @Test + public void testAvroExplicitReaderWithSchemalessFileAndExplicitSchema() throws Exception { + File avroFileWithoutSchema = new File("src/test/resources/avro/schemaless_simple_record.avro"); + FileInputStream fileInputStream = new FileInputStream(avroFileWithoutSchema); + Schema dataSchema = new Schema.Parser().parse(new File("src/test/resources/avro/schemaless_simple_record.avsc")); + RecordSchema recordSchema = new SimpleRecordSchema(dataSchema.toString(), AvroTypeUtil.AVRO_SCHEMA_FORMAT, null); + + AvroReaderWithExplicitSchema avroReader = new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema); + GenericRecord record = avroReader.nextAvroRecord(); + assertNotNull(record); + assertEquals(123, record.get("field_1")); + assertNotNull(record.get("field_2")); + assertEquals("44", record.get("field_2").toString()); + assertEquals(5, record.get("field_3")); + + record = avroReader.nextAvroRecord(); + assertNull(record); + } + + @Test + public void testAvroExplicitReaderWithSchemalessFileAndWrongExplicitSchema() throws Exception { + File avroFileWithoutSchema = new File("src/test/resources/avro/schemaless_simple_record.avro"); + FileInputStream fileInputStream = new FileInputStream(avroFileWithoutSchema); + Schema dataSchema = new Schema.Parser().parse(new File("src/test/resources/avro/schemaless_simple_record_extra_field.avsc")); + RecordSchema recordSchema = new SimpleRecordSchema(dataSchema.toString(), AvroTypeUtil.AVRO_SCHEMA_FORMAT, null); + + AvroReaderWithExplicitSchema avroReader = new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema); + assertThrows(IOException.class, avroReader::nextAvroRecord); + } + @Test public void testAvroExplicitReaderWithSchemalessFileDecimalValuesWithDifferentBufferSize() throws Exception { // GIVEN diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avro b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avro new file mode 100644 index 0000000000000000000000000000000000000000..858062de33e769b9e6242ac3d5a524d8808ba6fe GIT binary patch literal 9 QcmZSh#>l{8V#2@$00~L~ZU6uP literal 0 HcmV?d00001 diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avsc new file mode 100644 index 000000000000..788b7b73c7e7 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record.avsc @@ -0,0 +1,28 @@ +{ + "type": "record", + "name": "nifiRecord", + "namespace": "org.apache.nifi", + "fields": [ + { + "name": "field_1", + "type": [ + "int", + "null" + ] + }, + { + "name": "field_2", + "type": [ + "string", + "null" + ] + }, + { + "name": "field_3", + "type": [ + "int", + "null" + ] + } + ] +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record_extra_field.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record_extra_field.avsc new file mode 100644 index 000000000000..276107a0e128 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/schemaless_simple_record_extra_field.avsc @@ -0,0 +1,24 @@ +{ + "type":"record", + "name":"message_name", + "namespace":"message_namespace", + "fields":[ + { + "name":"field_1", + "type":["long"] + }, + { + "name":"field_2", + "type":["string"] + }, + { + "name":"field_3", + "type":["int"] + }, + { + "name":"extra", + "type":["long"] + } + ] +} + From c334b9858663bb3ccc7a97cb2e92100e885025d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rajmund=20Tak=C3=A1cs?= Date: Thu, 8 Feb 2024 15:48:53 +0100 Subject: [PATCH 2/4] Apply suggestions from code review Co-authored-by: tpalfy <53442425+tpalfy@users.noreply.github.com> --- .../TestAvroReaderWithExplicitSchema.java | 36 +++++++++---------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java index 48d102514e26..df823fcd8c8d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java @@ -107,31 +107,29 @@ public void testAvroExplicitReaderWithEmbeddedSchemaFileDifferentFromExplicitSch @Test public void testAvroExplicitReaderWithSchemalessFileAndExplicitSchema() throws Exception { - File avroFileWithoutSchema = new File("src/test/resources/avro/schemaless_simple_record.avro"); - FileInputStream fileInputStream = new FileInputStream(avroFileWithoutSchema); - Schema dataSchema = new Schema.Parser().parse(new File("src/test/resources/avro/schemaless_simple_record.avsc")); - RecordSchema recordSchema = new SimpleRecordSchema(dataSchema.toString(), AvroTypeUtil.AVRO_SCHEMA_FORMAT, null); + AvroReaderWithExplicitSchema avroReader = createAvroReaderWithExplicitSchema( + "src/test/resources/avro/schemaless_simple_record.avro", + "src/test/resources/avro/schemaless_simple_record.avsc" + ); - AvroReaderWithExplicitSchema avroReader = new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema); - GenericRecord record = avroReader.nextAvroRecord(); - assertNotNull(record); - assertEquals(123, record.get("field_1")); - assertNotNull(record.get("field_2")); - assertEquals("44", record.get("field_2").toString()); - assertEquals(5, record.get("field_3")); + GenericData.Record expected = new GenericData.Record(new Schema.Parser().parse(new File("src/test/resources/avro/schemaless_simple_record.avsc"))); + expected.put("field_1", 123); + expected.put("field_2", "44"); + expected.put("field_3", 5); - record = avroReader.nextAvroRecord(); - assertNull(record); + GenericRecord actual1 = avroReader.nextAvroRecord(); + assertNotNull(actual1); + + GenericRecord actual2 = avroReader.nextAvroRecord(); + assertNull(actual2); } @Test public void testAvroExplicitReaderWithSchemalessFileAndWrongExplicitSchema() throws Exception { - File avroFileWithoutSchema = new File("src/test/resources/avro/schemaless_simple_record.avro"); - FileInputStream fileInputStream = new FileInputStream(avroFileWithoutSchema); - Schema dataSchema = new Schema.Parser().parse(new File("src/test/resources/avro/schemaless_simple_record_extra_field.avsc")); - RecordSchema recordSchema = new SimpleRecordSchema(dataSchema.toString(), AvroTypeUtil.AVRO_SCHEMA_FORMAT, null); - - AvroReaderWithExplicitSchema avroReader = new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema); + AvroReaderWithExplicitSchema avroReader = createAvroReaderWithExplicitSchema( + "src/test/resources/avro/schemaless_simple_record.avro", + "src/test/resources/avro/schemaless_simple_record_extra_field.avsc" + ); assertThrows(IOException.class, avroReader::nextAvroRecord); } From dd74edd3e59b7b738a13835a5e98846d628cbd75 Mon Sep 17 00:00:00 2001 From: Rajmund Takacs Date: Thu, 8 Feb 2024 15:52:36 +0100 Subject: [PATCH 3/4] Fix imports --- .../TestAvroReaderWithExplicitSchema.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java index df823fcd8c8d..1308c1a3abb9 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java @@ -16,14 +16,10 @@ */ package org.apache.nifi.avro; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.nifi.serialization.MalformedRecordException; -import org.apache.nifi.serialization.RecordReader; -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordSchema; -import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.io.File; import java.io.FileInputStream; @@ -32,11 +28,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.junit.jupiter.api.Test; public class TestAvroReaderWithExplicitSchema { From 6c7751c2989e0ce61616a8f7d41323aa76e1c04d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rajmund=20Tak=C3=A1cs?= Date: Thu, 8 Feb 2024 16:57:22 +0100 Subject: [PATCH 4/4] Update nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java Co-authored-by: tpalfy <53442425+tpalfy@users.noreply.github.com> --- .../org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java index 1308c1a3abb9..9bbaa55bb718 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithExplicitSchema.java @@ -118,7 +118,7 @@ public void testAvroExplicitReaderWithSchemalessFileAndExplicitSchema() throws E expected.put("field_3", 5); GenericRecord actual1 = avroReader.nextAvroRecord(); - assertNotNull(actual1); + assertEquals(expected, actual1); GenericRecord actual2 = avroReader.nextAvroRecord(); assertNull(actual2);