From 71a9f3aa752e3fb2b444235e31fb8e2ed6164782 Mon Sep 17 00:00:00 2001
From: Paul Grey
Date: Wed, 1 Oct 2025 17:50:52 -0400
Subject: [PATCH 1/3] CDPDFX-15027 - adjust AvroWriter handling of invalid
 payloads; ConsumeKafka impact

---
 .../ConsumeKafkaRecordWithNullIT.java         | 145 ++++++++++++++++++
 .../kafka/reader/schemaNullable.avsc.json     |  15 ++
 .../kafka/reader/schemaRequired.avsc.json     |  14 ++
 .../nifi/avro/WriteAvroResultWithSchema.java  |   6 +-
 4 files changed, 179 insertions(+), 1 deletion(-)
 create mode 100644 nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordWithNullIT.java
 create mode 100644 nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/reader/schemaNullable.avsc.json
 create mode 100644 nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/reader/schemaRequired.avsc.json

diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordWithNullIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordWithNullIT.java
new file mode 100644
index 000000000000..ad29073cc8c2
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordWithNullIT.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.nifi.avro.AvroReader;
+import org.apache.nifi.avro.AvroRecordSetWriter;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.avro.WriteAvroResultWithExternalSchema;
+import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.ListRecordSet;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Demonstrates handling of incoming Avro payloads with null fields where the associated Avro writer schema specifies
+ * that the field is required; such invalid payloads should be routed to PARSE_FAILURE, as the documentation states.
+ */
+class ConsumeKafkaRecordWithNullIT extends AbstractConsumeKafkaIT {
+    private static final int FIRST_PARTITION = 0;
+
+    private static final String RESOURCE_AVRO_SCHEMA_NULLABLE = "src/test/resources/org/apache/nifi/kafka/reader/schemaNullable.avsc.json";
+    private static final String RESOURCE_AVRO_SCHEMA_REQUIRED = "src/test/resources/org/apache/nifi/kafka/reader/schemaRequired.avsc.json";
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setRunner() throws InitializationException, IOException {
+        runner = TestRunners.newTestRunner(ConsumeKafka.class);
+        addKafkaConnectionService(runner);
+        runner.setProperty(ConsumeKafka.CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+        addRecordReaderServiceAvro(runner);
+        addRecordWriterServiceAvro(runner);
+    }
+
+    private void addRecordReaderServiceAvro(final TestRunner runner) throws InitializationException, IOException {
+        final String readerId = ConsumeKafka.RECORD_READER.getName();
+        final String schemaText = Files.readString(Path.of(RESOURCE_AVRO_SCHEMA_NULLABLE));
+        final RecordReaderFactory readerService = new AvroReader();
+        runner.addControllerService(readerId, readerService);
+        runner.setProperty(readerService, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
+        runner.setProperty(readerService, SchemaAccessUtils.SCHEMA_TEXT, schemaText);
+        runner.enableControllerService(readerService);
+        runner.setProperty(readerId, readerId);
+    }
+
+    private void addRecordWriterServiceAvro(final TestRunner runner) throws InitializationException, IOException {
+        final String writerId = ConsumeKafka.RECORD_WRITER.getName();
+        final String schemaText = Files.readString(Path.of(RESOURCE_AVRO_SCHEMA_REQUIRED));
+        final RecordSetWriterFactory writerService = new AvroRecordSetWriter();
+        runner.addControllerService(writerId, writerService);
+        runner.setProperty(writerService, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
+        runner.setProperty(writerService, SchemaAccessUtils.SCHEMA_TEXT, schemaText);
+        runner.enableControllerService(writerService);
+        runner.setProperty(writerId, writerId);
+    }
+
+    private byte[] generateAvroPayloadWithNullField() throws IOException {
+        final String schemaText = Files.readString(Path.of(RESOURCE_AVRO_SCHEMA_NULLABLE));
+        final Schema avroSchema = new Schema.Parser().parse(schemaText);
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
+
+        final Map<String, Object> fields1 = Map.of("text", "string1", "ordinal", 1);
+        final MapRecord record1 = new MapRecord(recordSchema, fields1);
+        final Map<String, Object> fields2 = Map.of("text", "string2");  // this record omits the field that the writer schema requires
+        final MapRecord record2 = new MapRecord(recordSchema, fields2);
+
+        final BlockingQueue<BinaryEncoder> encoderPool = new LinkedBlockingQueue<>(1);
+        final ByteArrayOutputStream os = new ByteArrayOutputStream();
+        final WriteAvroSchemaAttributeStrategy schemaWriter = new WriteAvroSchemaAttributeStrategy();
+        try (RecordSetWriter writer = new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, schemaWriter, os, encoderPool, runner.getLogger())) {
+            writer.write(new ListRecordSet(recordSchema, List.of(record1, record2)));
+            writer.flush();
+        }
+        return os.toByteArray();
+    }
+
+    @Test
+    void testProcessingStrategyRecord() throws InterruptedException, ExecutionException, IOException {
+        final String topic = UUID.randomUUID().toString();
+        final String groupId = topic.substring(0, topic.indexOf("-"));
+        runner.setProperty(ConsumeKafka.GROUP_ID, groupId);
+        runner.setProperty(ConsumeKafka.TOPICS, topic);
+        runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.RECORD.getValue());
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, AutoOffsetReset.EARLIEST.getValue());
+        runner.run(1, false, true);
+
+        final byte[] payload = generateAvroPayloadWithNullField();
+        produceOne(topic, FIRST_PARTITION, null, new String(payload, StandardCharsets.UTF_8), null);
+        int pollAttempts = 5;  // before the fix, this loop never terminated; bounding the number of polls keeps the test from hanging
+        while ((--pollAttempts >= 0) && runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS).isEmpty()
+                && runner.getFlowFilesForRelationship(ConsumeKafka.PARSE_FAILURE).isEmpty()) {
+            runner.run(1, false, false);
+        }
+        runner.run(1, true, false);
+
+        assertTrue(pollAttempts >= 0);
+        final List<MockFlowFile> flowFilesForRelationshipFail = runner.getFlowFilesForRelationship(ConsumeKafka.PARSE_FAILURE);
+        assertEquals(1, flowFilesForRelationshipFail.size());  // the invalid record goes here
+        final List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS);
+        assertEquals(1, flowFilesForRelationship.size());  // the valid record goes here
+    }
+}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/reader/schemaNullable.avsc.json b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/reader/schemaNullable.avsc.json
new file mode 100644
index 000000000000..aea2a779553d
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/reader/schemaNullable.avsc.json
@@ -0,0 +1,15 @@
+{
+    "name": "test",
+    "type": "record",
+    "fields": [
+        {
+            "name": "text",
+            "type": "string"
+        },
+        {
+            "name": "ordinal",
+            "type": ["null", "long"],
+            "default": null
+        }
+    ]
+}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/reader/schemaRequired.avsc.json b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/reader/schemaRequired.avsc.json
new file mode 100644
index 000000000000..75183bb651a1
--- /dev/null
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/reader/schemaRequired.avsc.json
@@ -0,0 +1,14 @@
+{
+    "name": "test",
+    "type": "record",
+    "fields": [
+        {
+            "name": "text",
+            "type": "string"
+        },
+        {
+            "name": "ordinal",
+            "type": "long"
+        }
+    ]
+}
diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
index ea327a4b181a..b0abe8d20e3e 100644
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
@@ -58,7 +58,11 @@ public void flush() throws IOException {
     @Override
     public Map<String, String> writeRecord(final Record record) throws IOException {
         final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
-        dataFileWriter.append(rec);
+        try {
+            dataFileWriter.append(rec);
+        } catch (final DataFileWriter.AppendWriteException e) {
+            throw new IOException(e);
+        }
         return Collections.emptyMap();
     }
 

From 1de2aed9ae59a252c0b7e2e04d1843bb352edbef Mon Sep 17 00:00:00 2001
From: Paul Grey
Date: Wed, 8 Oct 2025 13:45:07 -0400
Subject: [PATCH 2/3] embed AVRO schemas in IT; remove external resources

---
 .../ConsumeKafkaRecordWithNullIT.java         | 50 ++++++++++++++-----
 .../kafka/reader/schemaNullable.avsc.json     | 15 ------
 .../kafka/reader/schemaRequired.avsc.json     | 14 ------
 3 files changed, 38 insertions(+), 41 deletions(-)
 delete mode 100644 nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/reader/schemaNullable.avsc.json
 delete mode 100644 nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/reader/schemaRequired.avsc.json

diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordWithNullIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordWithNullIT.java
index ad29073cc8c2..a43050f09f82 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordWithNullIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordWithNullIT.java
@@ -45,8 +45,6 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -61,8 +59,39 @@
 class ConsumeKafkaRecordWithNullIT extends AbstractConsumeKafkaIT {
     private static final int FIRST_PARTITION = 0;
 
-    private static final String RESOURCE_AVRO_SCHEMA_NULLABLE = "src/test/resources/org/apache/nifi/kafka/reader/schemaNullable.avsc.json";
-    private static final String RESOURCE_AVRO_SCHEMA_REQUIRED = "src/test/resources/org/apache/nifi/kafka/reader/schemaRequired.avsc.json";
+    private static final String AVRO_SCHEMA_NULLABLE = """
+            {
+                "name": "test",
+                "type": "record",
+                "fields": [
+                    {
+                        "name": "text",
+                        "type": "string"
+                    },
+                    {
+                        "name": "ordinal",
+                        "type": ["null", "long"],
+                        "default": null
+                    }
+                ]
+            }
+            """;
+    private static final String AVRO_SCHEMA_REQUIRED = """
+            {
+                "name": "test",
+                "type": "record",
+                "fields": [
+                    {
+                        "name": "text",
+                        "type": "string"
+                    },
+                    {
+                        "name": "ordinal",
+                        "type": "long"
+                    }
+                ]
+            }
+            """;
 
     private TestRunner runner;
 
@@ -75,31 +104,28 @@ void setRunner() throws InitializationException, IOException {
         addRecordWriterServiceAvro(runner);
     }
 
-    private void addRecordReaderServiceAvro(final TestRunner runner) throws InitializationException, IOException {
+    private void addRecordReaderServiceAvro(final TestRunner runner) throws InitializationException {
         final String readerId = ConsumeKafka.RECORD_READER.getName();
-        final String schemaText = Files.readString(Path.of(RESOURCE_AVRO_SCHEMA_NULLABLE));
         final RecordReaderFactory readerService = new AvroReader();
         runner.addControllerService(readerId, readerService);
         runner.setProperty(readerService, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
-        runner.setProperty(readerService, SchemaAccessUtils.SCHEMA_TEXT, schemaText);
+        runner.setProperty(readerService, SchemaAccessUtils.SCHEMA_TEXT, AVRO_SCHEMA_NULLABLE);
         runner.enableControllerService(readerService);
         runner.setProperty(readerId, readerId);
     }
 
-    private void addRecordWriterServiceAvro(final TestRunner runner) throws InitializationException, IOException {
+    private void addRecordWriterServiceAvro(final TestRunner runner) throws InitializationException {
         final String writerId = ConsumeKafka.RECORD_WRITER.getName();
-        final String schemaText = Files.readString(Path.of(RESOURCE_AVRO_SCHEMA_REQUIRED));
         final RecordSetWriterFactory writerService = new AvroRecordSetWriter();
         runner.addControllerService(writerId, writerService);
         runner.setProperty(writerService, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
-        runner.setProperty(writerService, SchemaAccessUtils.SCHEMA_TEXT, schemaText);
+        runner.setProperty(writerService, SchemaAccessUtils.SCHEMA_TEXT, AVRO_SCHEMA_REQUIRED);
         runner.enableControllerService(writerService);
         runner.setProperty(writerId, writerId);
     }
 
     private byte[] generateAvroPayloadWithNullField() throws IOException {
-        final String schemaText = Files.readString(Path.of(RESOURCE_AVRO_SCHEMA_NULLABLE));
-        final Schema avroSchema = new Schema.Parser().parse(schemaText);
+        final Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_NULLABLE);
         final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
 
         final Map<String, Object> fields1 = Map.of("text", "string1", "ordinal", 1);
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/reader/schemaNullable.avsc.json b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/reader/schemaNullable.avsc.json
deleted file mode 100644
index aea2a779553d..000000000000
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/reader/schemaNullable.avsc.json
+++ /dev/null
@@ -1,15 +0,0 @@
-{
-    "name": "test",
-    "type": "record",
-    "fields": [
-        {
-            "name": "text",
-            "type": "string"
-        },
-        {
-            "name": "ordinal",
-            "type": ["null", "long"],
-            "default": null
-        }
-    ]
-}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/reader/schemaRequired.avsc.json b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/reader/schemaRequired.avsc.json
deleted file mode 100644
index 75183bb651a1..000000000000
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/reader/schemaRequired.avsc.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
-    "name": "test",
-    "type": "record",
-    "fields": [
-        {
-            "name": "text",
-            "type": "string"
-        },
-        {
-            "name": "ordinal",
-            "type": "long"
-        }
-    ]
-}

From e93e5bb96f776219561faa9ca6cc98582d6f0dc2 Mon Sep 17 00:00:00 2001
From: Paul Grey
Date: Thu, 9 Oct 2025 10:35:36 -0400
Subject: [PATCH 3/3] exception message on Avro record write failure

---
 .../java/org/apache/nifi/avro/WriteAvroResultWithSchema.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
index b0abe8d20e3e..62b6c9cc0d99 100644
--- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
+++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
@@ -61,7 +61,7 @@ public Map<String, String> writeRecord(final Record record) throws IOException {
         try {
             dataFileWriter.append(rec);
         } catch (final DataFileWriter.AppendWriteException e) {
-            throw new IOException(e);
+            throw new IOException("AppendWriteException while writing a datum to the Avro record buffer", e);
         }
         return Collections.emptyMap();
     }
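
Illustrative sketch (not part of the patch series): a minimal standalone program showing the Avro behavior this change guards against, assuming the failure of interest is a null value for a required field. Avro's DataFileWriter.append wraps datum-level write failures in DataFileWriter.AppendWriteException, an unchecked exception; converting it to a checked IOException, as the patch does, lets the caller's record-handling path route the payload (e.g. to PARSE_FAILURE) instead of retrying the same record indefinitely. The class name AppendWriteExceptionDemo and the inline schema are hypothetical.

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Hypothetical demo class; not part of the NiFi codebase.
public class AppendWriteExceptionDemo {
    private static final String SCHEMA_REQUIRED = """
            {
                "name": "test",
                "type": "record",
                "fields": [
                    {"name": "text", "type": "string"},
                    {"name": "ordinal", "type": "long"}
                ]
            }
            """;

    public static void main(final String[] args) throws IOException {
        final Schema schema = new Schema.Parser().parse(SCHEMA_REQUIRED);
        final GenericRecord invalid = new GenericData.Record(schema);
        invalid.put("text", "string2");  // "ordinal" is left null, although the schema requires a long

        try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
            writer.create(schema, new ByteArrayOutputStream());
            try {
                writer.append(invalid);  // Avro wraps the null-field failure in AppendWriteException (unchecked)
            } catch (final DataFileWriter.AppendWriteException e) {
                // Mirror the patch: surface the failure as a checked IOException so the caller
                // can route the invalid payload rather than letting the runtime exception escape.
                throw new IOException("AppendWriteException while writing a datum to the Avro record buffer", e);
            }
        }
    }
}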