From 268ae74deeaf0228fb07cf50c3dd343ff9fb7e9f Mon Sep 17 00:00:00 2001 From: Ed B Date: Sat, 20 Oct 2018 21:42:21 -0400 Subject: [PATCH 1/2] NIFI-5728 XML Writer to populate record tag name properly --- .../serialization/SimpleRecordSchema.java | 28 +++++++++++ .../serialization/record/RecordSchema.java | 11 ++++ .../org/apache/nifi/avro/AvroTypeUtil.java | 2 + .../org/apache/nifi/xml/WriteXMLResult.java | 2 +- .../nifi/xml/TestXMLRecordSetWriter.java | 50 +++++++++++++++++++ 5 files changed, 92 insertions(+), 1 deletion(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java index 6926c939c021..946784dedacf 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java @@ -38,6 +38,8 @@ public class SimpleRecordSchema implements RecordSchema { private final AtomicReference text = new AtomicReference<>(); private final String schemaFormat; private final SchemaIdentifier schemaIdentifier; + private String schemaName; + private String schemaNamespace; public SimpleRecordSchema(final List fields) { this(fields, createText(fields), null, false, SchemaIdentifier.EMPTY); @@ -213,4 +215,30 @@ public String toString() { public SchemaIdentifier getIdentifier() { return schemaIdentifier; } + + /** + * Set schema name. + * @param schemaName schema name as defined in a root record. + */ + public void setSchemaName(String schemaName) { + this.schemaName = schemaName; + } + + @Override + public Optional getSchemaName() { + return Optional.ofNullable(schemaName); + } + + /** + * Set schema namespace. + * @param schemaNamespace schema name as defined in a root record. + */ + public void setSchemaNamespace(String schemaNamespace) { + this.schemaNamespace = schemaNamespace; + } + + @Override + public Optional getSchemaNamespace() { + return Optional.ofNullable(schemaNamespace); + } } diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java index 367f2b0b53a1..cdc9a32fea75 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java @@ -76,4 +76,15 @@ public interface RecordSchema { * @return the SchemaIdentifier, which provides various attributes for identifying a schema */ SchemaIdentifier getIdentifier(); + + /** + * @return the name of the schema's root record. + */ + Optional getSchemaName(); + + /** + * @return the namespace of the schema. + */ + Optional getSchemaNamespace(); + } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index 2e8898a49502..9e023ccfc5f9 100755 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -396,6 +396,8 @@ public static RecordSchema createSchema(final Schema avroSchema, final String sc final String schemaFullName = avroSchema.getNamespace() + "." + avroSchema.getName(); final SimpleRecordSchema recordSchema = schemaText == null ? new SimpleRecordSchema(schemaId) : new SimpleRecordSchema(schemaText, AVRO_SCHEMA_FORMAT, schemaId); + recordSchema.setSchemaName(avroSchema.getName()); + recordSchema.setSchemaNamespace(avroSchema.getNamespace()); final DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema); final Map knownRecords = new HashMap<>(); knownRecords.put(schemaFullName, recordSchemaType); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java index baa3a135548a..382067c829e0 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java @@ -91,7 +91,7 @@ public WriteXMLResult(final ComponentLog logger, final RecordSchema recordSchema if (recordTagName != null) { this.recordTagName = recordTagName; } else { - Optional recordTagNameOptional = recordSchema.getIdentifier().getName(); + Optional recordTagNameOptional = recordSchema.getSchemaName().isPresent()? recordSchema.getSchemaName() : recordSchema.getIdentifier().getName(); if (recordTagNameOptional.isPresent()) { this.recordTagName = recordTagNameOptional.get(); } else { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java index 8008f65be0da..c835ede5d646 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java @@ -17,9 +17,16 @@ package org.apache.nifi.xml; +import org.apache.avro.Schema; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; @@ -29,6 +36,7 @@ import org.xmlunit.matchers.CompareMatcher; import java.io.IOException; +import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Paths; @@ -116,6 +124,34 @@ public void testRootAndRecordNaming() throws IOException, InitializationExceptio assertThat(expected, CompareMatcher.isSimilarTo(actual).ignoreWhitespace().withNodeMatcher(new DefaultNodeMatcher(ElementSelectors.byNameAndText))); } + @Test + public void testSchemaRootRecordNaming() throws IOException, InitializationException { + String avroSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/xml/testschema3")));; + Schema avroSchema = new Schema.Parser().parse(avroSchemaText); + + SchemaIdentifier schemaId = SchemaIdentifier.builder().name("schemaName").build(); + RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, avroSchemaText, schemaId); + + XMLRecordSetWriter writer = new _XMLRecordSetWriter(recordSchema); + TestRunner runner = setup(writer); + + runner.setProperty(writer, XMLRecordSetWriter.ROOT_TAG_NAME, "ROOT_NODE"); + //runner.setProperty(writer, XMLRecordSetWriter.RECORD_TAG_NAME, "RECORD_NODE"); + + runner.enableControllerService(writer); + runner.enqueue(""); + runner.run(); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(TestXMLRecordSetWriterProcessor.SUCCESS, 1); + + String expected = "13" + + "val1" + + "13" + + "val1"; + String actual = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(TestXMLRecordSetWriterProcessor.SUCCESS).get(0))); + assertThat(expected, CompareMatcher.isSimilarTo(actual).ignoreWhitespace().withNodeMatcher(new DefaultNodeMatcher(ElementSelectors.byNameAndText))); + } + @Test public void testNullSuppression() throws IOException, InitializationException { XMLRecordSetWriter writer = new XMLRecordSetWriter(); @@ -194,5 +230,19 @@ public void testValidation() throws IOException, InitializationException { } } + static class _XMLRecordSetWriter extends XMLRecordSetWriter{ + + RecordSchema recordSchema; + + _XMLRecordSetWriter(RecordSchema recordSchema){ + this.recordSchema = recordSchema; + } + + @Override + public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) + throws SchemaNotFoundException, IOException { + return super.createWriter(logger, this.recordSchema, out); + } + } } From fccbdd49e387b099e90c4980eb308b3522084221 Mon Sep 17 00:00:00 2001 From: Ed B Date: Thu, 1 Nov 2018 09:04:53 -0400 Subject: [PATCH 2/2] polishing javadoc and redundant comments --- .../java/org/apache/nifi/serialization/SimpleRecordSchema.java | 2 +- .../test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java index 946784dedacf..ba507e960205 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java @@ -231,7 +231,7 @@ public Optional getSchemaName() { /** * Set schema namespace. - * @param schemaNamespace schema name as defined in a root record. + * @param schemaNamespace schema namespace as defined in a root record. */ public void setSchemaNamespace(String schemaNamespace) { this.schemaNamespace = schemaNamespace; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java index c835ede5d646..becb3c5a911e 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java @@ -136,7 +136,6 @@ public void testSchemaRootRecordNaming() throws IOException, InitializationExcep TestRunner runner = setup(writer); runner.setProperty(writer, XMLRecordSetWriter.ROOT_TAG_NAME, "ROOT_NODE"); - //runner.setProperty(writer, XMLRecordSetWriter.RECORD_TAG_NAME, "RECORD_NODE"); runner.enableControllerService(writer); runner.enqueue("");