From 761140494040679e8ca2347b763e8bedfc66fa7b Mon Sep 17 00:00:00 2001 From: JohannesDaniel Date: Tue, 22 May 2018 21:54:48 +0200 Subject: [PATCH] NIFI-5223 Allow the usage of expression language for properties of RecordSetWriters --- .../azure/eventhub/ConsumeAzureEventHub.java | 2 +- .../eventhub/TestConsumeAzureEventHub.java | 4 +-- .../nifi/processors/druid/PutDruidRecord.java | 6 ++-- .../hadoop/AbstractFetchHDFSRecord.java | 2 +- .../record/MockRecordWriter.java | 2 +- .../kafka/pubsub/ConsumerLease.java | 2 +- .../kafka/pubsub/PublisherLease.java | 2 +- .../kafka/pubsub/TestPublisherLease.java | 4 +-- .../kafka/pubsub/util/MockRecordWriter.java | 2 +- .../kafka/pubsub/ConsumerLease.java | 2 +- .../kafka/pubsub/PublisherLease.java | 2 +- .../kafka/pubsub/TestPublisherLease.java | 4 +-- .../kafka/pubsub/util/MockRecordWriter.java | 2 +- .../kafka/pubsub/ConsumerLease.java | 2 +- .../kafka/pubsub/PublisherLease.java | 2 +- .../kafka/pubsub/TestPublisherLease.java | 4 +-- .../kafka/pubsub/util/MockRecordWriter.java | 2 +- .../processors/parquet/FetchParquetTest.java | 2 +- .../script/ScriptedRecordSetWriter.java | 4 +-- .../script/ScriptedRecordSetWriterTest.groovy | 2 +- .../groovy/test_record_writer_inline.groovy | 2 +- .../AbstractSiteToSiteReportingTask.java | 2 +- .../apache/nifi/processors/solr/GetSolr.java | 2 +- .../nifi/processors/solr/QuerySolr.java | 7 ++-- .../standard/AbstractRecordProcessor.java | 2 +- .../standard/AbstractRouteRecord.java | 2 +- .../processors/standard/ListenTCPRecord.java | 2 +- .../processors/standard/ListenUDPRecord.java | 2 +- .../processors/standard/PartitionRecord.java | 2 +- .../nifi/processors/standard/QueryRecord.java | 2 +- .../nifi/processors/standard/SplitRecord.java | 2 +- .../processors/standard/ValidateRecord.java | 2 +- .../processors/standard/merge/RecordBin.java | 2 +- .../processors/standard/TestQueryRecord.java | 2 +- .../serialization/RecordSetWriterFactory.java | 22 ++++++++++++- .../apache/nifi/avro/AvroRecordSetWriter.java | 2 +- .../apache/nifi/csv/CSVRecordSetWriter.java | 3 +- .../apache/nifi/json/JsonRecordSetWriter.java | 3 +- .../text/FreeFormTextRecordSetWriter.java | 3 +- .../org/apache/nifi/xml/WriteXMLResult.java | 4 +-- .../apache/nifi/xml/XMLRecordSetWriter.java | 19 ++++++----- .../nifi/xml/TestXMLRecordSetWriter.java | 33 +++++++++++++++++++ .../xml/TestXMLRecordSetWriterProcessor.java | 3 +- 43 files changed, 120 insertions(+), 59 deletions(-) mode change 100644 => 100755 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java mode change 100644 => 100755 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java mode change 100644 => 100755 nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/druid/PutDruidRecord.java mode change 100644 => 100755 nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java mode change 100644 => 100755 nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java mode change 100644 => 100755 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java mode change 100644 => 100755 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java mode change 100644 => 100755 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java mode change 100644 => 100755 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java mode change 100644 => 100755 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java mode change 100644 => 100755 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java mode change 100644 => 100755 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java mode change 100644 => 100755 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java mode change 100644 => 100755 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java mode change 100644 => 100755 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java mode change 100644 => 100755 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java mode change 100644 => 100755 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java mode change 100644 => 100755 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java mode change 100644 => 100755 nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java mode change 100644 => 100755 nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy mode change 100644 => 100755 nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy mode change 100644 => 100755 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java mode change 100644 => 100755 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java mode change 100644 => 100755 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java mode change 100644 => 100755 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java mode change 100644 => 100755 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java mode change 100644 => 100755 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java mode change 100644 => 100755 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java mode change 100644 => 100755 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java mode change 100644 => 100755 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java mode change 100644 => 100755 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java mode change 100644 => 100755 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java mode change 100644 => 100755 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java mode change 100644 => 100755 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java mode change 100644 => 100755 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java mode change 100644 => 100755 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java old mode 100644 new mode 100755 index 4eb0da56f773..b612ee486963 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java @@ -465,7 +465,7 @@ private void writeRecords(PartitionContext context, Iterable messages // Initialize the writer when the first record is read. final RecordSchema readerSchema = record.getSchema(); final RecordSchema writeSchema = writerFactory.getSchema(schemaRetrievalVariables, readerSchema); - writer = writerFactory.createWriter(logger, writeSchema, out); + writer = writerFactory.createWriter(Collections.emptyMap(), logger, writeSchema, out); writer.beginRecordSet(); } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java old mode 100644 new mode 100755 index 1f54f392b9ad..139dc17f76b2 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java @@ -180,8 +180,8 @@ private void setupRecordWriter(String throwErrorWith) throws SchemaNotFoundExcep processor.setWriterFactory(writerFactory); final RecordSetWriter writer = mock(RecordSetWriter.class); final AtomicReference outRef = new AtomicReference<>(); - when(writerFactory.createWriter(any(), any(), any())).thenAnswer(invocation -> { - outRef.set(invocation.getArgumentAt(2, OutputStream.class)); + when(writerFactory.createWriter(anyMap(), any(), any(), any())).thenAnswer(invocation -> { + outRef.set(invocation.getArgumentAt(3, OutputStream.class)); return writer; }); when(writer.write(any(Record.class))).thenAnswer(invocation -> { diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/druid/PutDruidRecord.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/druid/PutDruidRecord.java old mode 100644 new mode 100755 index fa6cfac4cc7f..6a7a9898a316 --- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/druid/PutDruidRecord.java +++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/main/java/org/apache/nifi/processors/druid/PutDruidRecord.java @@ -187,11 +187,11 @@ private void processFlowFile(ProcessContext context, final ProcessSession sessio final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger()); final RecordSchema outSchema = writerFactory.getSchema(attributes, reader.getSchema()); - droppedRecordWriter = writerFactory.createWriter(log, outSchema, droppedOutputStream); + droppedRecordWriter = writerFactory.createWriter(attributes, log, outSchema, droppedOutputStream); droppedRecordWriter.beginRecordSet(); - failedRecordWriter = writerFactory.createWriter(log, outSchema, failedOutputStream); + failedRecordWriter = writerFactory.createWriter(attributes, log, outSchema, failedOutputStream); failedRecordWriter.beginRecordSet(); - successfulRecordWriter = writerFactory.createWriter(log, outSchema, successfulOutputStream); + successfulRecordWriter = writerFactory.createWriter(attributes, log, outSchema, successfulOutputStream); successfulRecordWriter.beginRecordSet(); Record r; diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java old mode 100644 new mode 100755 index 41b0365cae67..59faeae92fb7 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java @@ -199,7 +199,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile.getAttributes(), record == null ? null : record.getSchema()); - try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, out)) { + try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(originalFlowFile.getAttributes(), getLogger(), schema, out)) { recordSetWriter.beginRecordSet(); if (record != null) { recordSetWriter.write(record); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java old mode 100644 new mode 100755 index d7579e82a521..c1495c4aee31 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java @@ -59,7 +59,7 @@ public RecordSchema getSchema(Map variables, RecordSchema readSc } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) { + public RecordSetWriter createWriter(final Map variables, final ComponentLog logger, final RecordSchema schema, final OutputStream out) { return new RecordSetWriter() { private int recordCount = 0; private boolean headerWritten = false; diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java old mode 100644 new mode 100755 index 26562b9e8110..587a67a81665 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -507,7 +507,7 @@ private void writeRecordData(final ProcessSession session, final List variables, RecordSchema readSc } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) { + public RecordSetWriter createWriter(final Map variables, final ComponentLog logger, final RecordSchema schema, final OutputStream out) { return new RecordSetWriter() { @Override diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java old mode 100644 new mode 100755 index 4d9a5b6d536a..1b8ce3c1d1a8 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -555,7 +555,7 @@ private void writeRecordData(final ProcessSession session, final List additionalAttributes = Collections.emptyMap(); - try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) { + try (final RecordSetWriter writer = writerFactory.createWriter(flowFile.getAttributes(), logger, schema, baos)) { final WriteResult writeResult = writer.write(record); additionalAttributes = writeResult.getAttributes(); writer.flush(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java old mode 100644 new mode 100755 index 3ab7abb099a5..00c51c84ebe4 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java @@ -277,11 +277,11 @@ public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class); Mockito.when(writer.write(Mockito.any(Record.class))).thenReturn(WriteResult.of(1, Collections.emptyMap())); - Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer); + Mockito.when(writerFactory.createWriter(eq(flowFile.getAttributes()), eq(logger), eq(schema), any())).thenReturn(writer); lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic); - verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any()); + verify(writerFactory, times(2)).createWriter(eq(flowFile.getAttributes()), eq(logger), eq(schema), any()); verify(writer, times(2)).write(any(Record.class)); verify(producer, times(2)).send(any(), any()); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java old mode 100644 new mode 100755 index 0eb860688b2f..2e619b6ecafe --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java @@ -57,7 +57,7 @@ public RecordSchema getSchema(Map variables, RecordSchema readSc } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) { + public RecordSetWriter createWriter(final Map variables, final ComponentLog logger, final RecordSchema schema, final OutputStream out) { return new RecordSetWriter() { @Override diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java old mode 100644 new mode 100755 index 2e7e2d465eba..3329b37b0161 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -555,7 +555,7 @@ private void writeRecordData(final ProcessSession session, final List additionalAttributes = Collections.emptyMap(); - try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) { + try (final RecordSetWriter writer = writerFactory.createWriter(flowFile.getAttributes(), logger, schema, baos)) { final WriteResult writeResult = writer.write(record); additionalAttributes = writeResult.getAttributes(); writer.flush(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java old mode 100644 new mode 100755 index 2fbf53937715..906580bc451c --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java @@ -277,11 +277,11 @@ public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class); Mockito.when(writer.write(Mockito.any(Record.class))).thenReturn(WriteResult.of(1, Collections.emptyMap())); - Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer); + Mockito.when(writerFactory.createWriter(eq(flowFile.getAttributes()), eq(logger), eq(schema), any())).thenReturn(writer); lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic); - verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any()); + verify(writerFactory, times(2)).createWriter(eq(flowFile.getAttributes()), eq(logger), eq(schema), any()); verify(writer, times(2)).write(any(Record.class)); verify(producer, times(2)).send(any(), any()); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java old mode 100644 new mode 100755 index 0eb860688b2f..2e619b6ecafe --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java @@ -57,7 +57,7 @@ public RecordSchema getSchema(Map variables, RecordSchema readSc } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) { + public RecordSetWriter createWriter(final Map variables, final ComponentLog logger, final RecordSchema schema, final OutputStream out) { return new RecordSetWriter() { @Override diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java old mode 100644 new mode 100755 index 76d44aab5c8f..9d4b71b069e4 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java @@ -220,7 +220,7 @@ public void testIOExceptionWhileWritingShouldRouteToRetry() throws Initializatio final RecordSetWriterFactory recordSetWriterFactory = Mockito.mock(RecordSetWriterFactory.class); when(recordSetWriterFactory.getIdentifier()).thenReturn("mock-writer-factory"); - when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(RecordSchema.class), any(OutputStream.class))).thenReturn(recordSetWriter); + when(recordSetWriterFactory.createWriter(any(Map.class), any(ComponentLog.class), any(RecordSchema.class), any(OutputStream.class))).thenReturn(recordSetWriter); testRunner.addControllerService("mock-writer-factory", recordSetWriterFactory); testRunner.enableControllerService(recordSetWriterFactory); diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java old mode 100644 new mode 100755 index 97544016ee8d..c72742e06df5 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java @@ -62,10 +62,10 @@ public void onEnabled(final ConfigurationContext context) { @Override - public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException { + public RecordSetWriter createWriter(Map variables, ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException { if (recordFactory.get() != null) { try { - return recordFactory.get().createWriter(logger, schema, out); + return recordFactory.get().createWriter(variables, logger, schema, out); } catch (UndeclaredThrowableException ute) { throw new IOException(ute.getCause()); } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy old mode 100644 new mode 100755 index c3a7990e247b..7c03cea09a69 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy @@ -102,7 +102,7 @@ class ScriptedRecordSetWriterTest { def schema = recordSetWriterFactory.getSchema(Collections.emptyMap(), null) ByteArrayOutputStream outputStream = new ByteArrayOutputStream() - RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema, outputStream) + RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(Collections.emptyMap(), logger, schema, outputStream) assertNotNull(recordSetWriter) def recordSchema = new SimpleRecordSchema( diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy old mode 100644 new mode 100755 index ccdb9ae7f7a3..9861c50aeb39 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy @@ -104,7 +104,7 @@ class GroovyRecordSetWriterFactory extends AbstractControllerService implements } @Override - RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException { + RecordSetWriter createWriter(Map variables, ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException { return new GroovyRecordSetWriter(out) } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java old mode 100644 new mode 100755 index e7553547cc02..431157816647 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java @@ -271,7 +271,7 @@ protected byte[] getData(final ReportingContext context, InputStream in, Map originalAttributes; FlowFile flowFileResponse; if (flowFileOriginal == null) { @@ -317,8 +318,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session return; } flowFileResponse = session.create(); + originalAttributes = Collections.emptyMap(); } else { flowFileResponse = session.create(flowFileOriginal); + originalAttributes = flowFileOriginal.getAttributes(); } final SolrQuery solrQuery = new SolrQuery(); @@ -429,11 +432,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } else { final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).evaluateAttributeExpressions(flowFileResponse) .asControllerService(RecordSetWriterFactory.class); - final RecordSchema schema = writerFactory.getSchema(flowFileResponse.getAttributes(), null); + final RecordSchema schema = writerFactory.getSchema(originalAttributes, null); final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema); final StringBuffer mimeType = new StringBuffer(); flowFileResponse = session.write(flowFileResponse, out -> { - try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out)) { + try (final RecordSetWriter writer = writerFactory.createWriter(originalAttributes, getLogger(), schema, out)) { writer.write(recordSet); writer.flush(); mimeType.append(writer.getMimeType()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java old mode 100644 new mode 100755 index 6f777ea5ebdc..e061d80a0b47 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java @@ -113,7 +113,7 @@ public void process(final InputStream in, final OutputStream out) throws IOExcep try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) { final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema()); - try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) { + try (final RecordSetWriter writer = writerFactory.createWriter(originalAttributes, getLogger(), writeSchema, out)) { writer.beginRecordSet(); Record record; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java old mode 100644 new mode 100755 index 374ed483af5b..05a22181a74f --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java @@ -136,7 +136,7 @@ public void process(final InputStream in) throws IOException { if (tuple == null) { FlowFile outFlowFile = session.create(original); final OutputStream out = session.write(outFlowFile); - recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, out); + recordSetWriter = writerFactory.createWriter(originalAttributes, getLogger(), writeSchema, out); recordSetWriter.beginRecordSet(); tuple = new Tuple<>(outFlowFile, recordSetWriter); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java old mode 100644 new mode 100755 index 738c3e2ede91..c641d0d72481 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java @@ -379,7 +379,7 @@ record = recordReader.nextRecord(); final RecordSchema recordSchema = recordSetWriterFactory.getSchema(Collections.EMPTY_MAP, record.getSchema()); try (final OutputStream out = session.write(flowFile); - final RecordSetWriter recordWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, out)) { + final RecordSetWriter recordWriter = recordSetWriterFactory.createWriter(Collections.emptyMap(), getLogger(), recordSchema, out)) { // start the record set and write the first record from above recordWriter.beginRecordSet(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java old mode 100644 new mode 100755 index 2e3a04a7fbe0..6cee2e1a3a1d --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java @@ -274,7 +274,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final RecordSchema recordSchema = firstRecord.getSchema(); final RecordSchema writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema); - writer = writerFactory.createWriter(getLogger(), writeSchema, rawOut); + writer = writerFactory.createWriter(Collections.emptyMap(), getLogger(), writeSchema, rawOut); writer.beginRecordSet(); flowFileRecordWriter = new FlowFileRecordWriter(flowFile, writer); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java old mode 100644 new mode 100755 index 43e1e4b89a46..bb31802800ad --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java @@ -223,7 +223,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final OutputStream out = session.write(childFlowFile); - writer = writerFactory.createWriter(getLogger(), writeSchema, out); + writer = writerFactory.createWriter(originalAttributes, getLogger(), writeSchema, out); writer.beginRecordSet(); writerMap.put(recordValueMap, writer); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java old mode 100644 new mode 100755 index 2412736b75ac..4ee2ffcdc54c --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java @@ -320,7 +320,7 @@ public void process(final OutputStream out) throws IOException { throw new ProcessException(e); } - try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), writeSchema, out)) { + try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(originalAttributes, getLogger(), writeSchema, out)) { writeResultRef.set(resultSetWriter.write(recordSet)); mimeTypeRef.set(resultSetWriter.getMimeType()); } catch (final Exception e) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java old mode 100644 new mode 100755 index 2a5679debb73..30ef4b73ee98 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java @@ -156,7 +156,7 @@ public void process(final InputStream in) throws IOException { final WriteResult writeResult; try (final OutputStream out = session.write(split); - final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out)) { + final RecordSetWriter writer = writerFactory.createWriter(originalAttributes, getLogger(), schema, out)) { if (maxRecords == 1) { final Record record = pushbackSet.next(); writeResult = writer.write(record); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java old mode 100644 new mode 100755 index 52f462ace15b..d98b10e7eb94 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java @@ -441,7 +441,7 @@ private RecordSetWriter createIfNecessary(final RecordSetWriter writer, final Re } final OutputStream out = session.write(flowFile); - final RecordSetWriter created = factory.createWriter(getLogger(), inputSchema, out); + final RecordSetWriter created = factory.createWriter(flowFile.getAttributes(), getLogger(), inputSchema, out); created.beginRecordSet(); return created; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java old mode 100644 new mode 100755 index ad60f6ae6487..a62bc86aa6a3 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java @@ -135,7 +135,7 @@ public boolean offer(final FlowFile flowFile, final RecordReader recordReader, f this.out = new ByteCountingOutputStream(rawOut); - recordWriter = writerFactory.createWriter(logger, record.getSchema(), out); + recordWriter = writerFactory.createWriter(flowFile.getAttributes(), logger, record.getSchema(), out); recordWriter.beginRecordSet(); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java old mode 100644 new mode 100755 index 345f8e4ce54c..02a0243753ae --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java @@ -281,7 +281,7 @@ public RecordSchema getSchema(Map variables, RecordSchema readSc } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) { + public RecordSetWriter createWriter(final Map variables, final ComponentLog logger, final RecordSchema schema, final OutputStream out) { return new RecordSetWriter() { @Override diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java old mode 100644 new mode 100755 index a9032e4ae658..e0b9c18af667 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.record.RecordSchema; @@ -68,6 +69,7 @@ public interface RecordSetWriterFactory extends ControllerService { * Creates a new RecordSetWriter that is capable of writing record contents to an OutputStream. *

* + * @param flowFile Expression language is evaluated against the attributes of the FlowFile * @param logger the logger to use when logging information. This is passed in, rather than using the logger of the Controller Service * because it allows messages to be logged for the component that is calling this Controller Service. * @param schema the schema that will be used for writing records @@ -76,5 +78,23 @@ public interface RecordSetWriterFactory extends ControllerService { * @return a RecordSetWriter that can write record sets to an OutputStream * @throws IOException if unable to read from the given InputStream */ - RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException; + default RecordSetWriter createWriter(FlowFile flowFile, ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException { + return createWriter(flowFile.getAttributes(), logger, schema, out); + } + + /** + *

+ * Creates a new RecordSetWriter that is capable of writing record contents to an OutputStream. + *

+ * + * @param variables Expression language is evaluated against these variables + * @param logger the logger to use when logging information. This is passed in, rather than using the logger of the Controller Service + * because it allows messages to be logged for the component that is calling this Controller Service. + * @param schema the schema that will be used for writing records + * @param out the OutputStream to write to + * + * @return a RecordSetWriter that can write record sets to an OutputStream + * @throws IOException if unable to read from the given InputStream + */ + RecordSetWriter createWriter(Map variables, ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java old mode 100644 new mode 100755 index 7e498414d44f..5d8e8a7dcd9d --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java @@ -79,7 +79,7 @@ protected boolean removeEldestEntry(final Map.Entry eldest) { "The FlowFile will have the Avro schema embedded into the content, as is typical with Avro"); @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema, final OutputStream out) throws IOException { + public RecordSetWriter createWriter(final Map variables, final ComponentLog logger, final RecordSchema recordSchema, final OutputStream out) throws IOException { final String strategyValue = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue(); final String compressionFormat = getConfigurationContext().getProperty(COMPRESSION_FORMAT).getValue(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java old mode 100644 new mode 100755 index 7aab5a36e656..619e9d5d7743 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java @@ -21,6 +21,7 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.commons.csv.CSVFormat; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -71,7 +72,7 @@ public void storeCsvFormat(final ConfigurationContext context) { } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException { + public RecordSetWriter createWriter(final Map variables, final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException { return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), out, getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java index e2b417d45bcf..1c06995559d2 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import org.apache.nifi.record.NullSuppression; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -133,7 +134,7 @@ public void onEnabled(final ConfigurationContext context) { } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException { + public RecordSetWriter createWriter(final Map variables, final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException { return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), out, prettyPrint, nullSuppression, outputGrouping, getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null)); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java old mode 100644 new mode 100755 index e5f851482e10..b6259df19744 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java @@ -21,6 +21,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -78,7 +79,7 @@ public void onEnabled(final ConfigurationContext context) { } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) { + public RecordSetWriter createWriter(final Map variables, final ComponentLog logger, final RecordSchema schema, final OutputStream out) { return new FreeFormTextWriter(textValue, characterSet, out); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java index baa3a135548a..9004b775ed93 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java @@ -359,7 +359,7 @@ private boolean writeFieldForType(Deque tagsToOpen, Object coercedValue, } else { if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) { - writeAllTags(tagsToOpen, fieldName); + writeAllTags(tagsToOpen, elementName); writer.writeEndElement(); loopHasWritten = true; } @@ -556,7 +556,7 @@ private boolean writeUnknownField(Deque tagsToOpen, Object value, String } else { if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) { - writeAllTags(tagsToOpen, fieldName); + writeAllTags(tagsToOpen, elementName); writer.writeEndElement(); loopHasWritten = true; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java index 6867af060476..f77b864c2c7c 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java @@ -39,6 +39,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; @Tags({"xml", "resultset", "writer", "serialize", "record", "recordset", "row"}) @CapabilityDescription("Writes a RecordSet to XML. The records are wrapped by a root tag.") @@ -85,7 +86,7 @@ public class XMLRecordSetWriter extends DateTimeTextRecordSetWriter implements R .description("Specifies the name of the XML root tag wrapping the record set. This property has to be defined if " + "the writer is supposed to write multiple records in a single FlowFile.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(false) .build(); @@ -95,7 +96,7 @@ public class XMLRecordSetWriter extends DateTimeTextRecordSetWriter implements R .description("Specifies the name of the XML record tag wrapping the record fields. If this is not set, the writer " + "will use the record name in the schema.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(false) .build(); @@ -113,7 +114,7 @@ public class XMLRecordSetWriter extends DateTimeTextRecordSetWriter implements R .displayName("Array Tag Name") .description("Name of the tag used by property \"Wrap Elements of Arrays\" to write arrays") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(false) .build(); @@ -122,7 +123,7 @@ public class XMLRecordSetWriter extends DateTimeTextRecordSetWriter implements R .description("The Character set to use when writing the data to the FlowFile") .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .defaultValue("UTF-8") - .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(true) .build(); @@ -165,7 +166,7 @@ protected Collection customValidate(final ValidationContext va } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException { + public RecordSetWriter createWriter(final Map variables, final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException { final String nullSuppression = getConfigurationContext().getProperty(SUPPRESS_NULLS).getValue(); final NullSuppression nullSuppressionEnum; @@ -180,9 +181,9 @@ public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchem final boolean prettyPrint = getConfigurationContext().getProperty(PRETTY_PRINT_XML).getValue().equals("true"); final String rootTagName = getConfigurationContext().getProperty(ROOT_TAG_NAME).isSet() - ? getConfigurationContext().getProperty(ROOT_TAG_NAME).getValue() : null; + ? getConfigurationContext().getProperty(ROOT_TAG_NAME).evaluateAttributeExpressions(variables).getValue() : null; final String recordTagName = getConfigurationContext().getProperty(RECORD_TAG_NAME).isSet() - ? getConfigurationContext().getProperty(RECORD_TAG_NAME).getValue() : null; + ? getConfigurationContext().getProperty(RECORD_TAG_NAME).evaluateAttributeExpressions(variables).getValue() : null; final String arrayWrapping = getConfigurationContext().getProperty(ARRAY_WRAPPING).getValue(); final ArrayWrapping arrayWrappingEnum; @@ -196,12 +197,12 @@ public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchem final String arrayTagName; if (getConfigurationContext().getProperty(ARRAY_TAG_NAME).isSet()) { - arrayTagName = getConfigurationContext().getProperty(ARRAY_TAG_NAME).getValue(); + arrayTagName = getConfigurationContext().getProperty(ARRAY_TAG_NAME).evaluateAttributeExpressions(variables).getValue(); } else { arrayTagName = null; } - final String charSet = getConfigurationContext().getProperty(CHARACTER_SET).getValue(); + final String charSet = getConfigurationContext().getProperty(CHARACTER_SET).evaluateAttributeExpressions(variables).getValue(); return new WriteXMLResult(logger, schema, getSchemaAccessWriter(schema), out, prettyPrint, nullSuppressionEnum, arrayWrappingEnum, arrayTagName, rootTagName, recordTagName, charSet, diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java index 372e60db3ceb..9937fdd10055 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java @@ -31,6 +31,8 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.assertThat; @@ -74,6 +76,37 @@ public void testDefault() throws IOException, InitializationException { assertThat(expected, CompareMatcher.isSimilarTo(actual).ignoreWhitespace().withNodeMatcher(new DefaultNodeMatcher(ElementSelectors.byNameAndText))); } + @Test + public void testExpressionLanguage() throws IOException, InitializationException { + XMLRecordSetWriter writer = new XMLRecordSetWriter(); + TestRunner runner = setup(writer); + + runner.setProperty(writer, XMLRecordSetWriter.ROOT_TAG_NAME, "${attribute.root}"); + runner.setProperty(writer, XMLRecordSetWriter.RECORD_TAG_NAME, "${attribute.record}"); + runner.setProperty(writer, XMLRecordSetWriter.ARRAY_WRAPPING, XMLRecordSetWriter.USE_PROPERTY_FOR_ELEMENTS); + runner.setProperty(writer, XMLRecordSetWriter.ARRAY_TAG_NAME, "${attribute.array}"); + + runner.enableControllerService(writer); + + Map attributes = new HashMap<>(); + attributes.put("attribute.root", "root_node"); + attributes.put("attribute.record", "record_node"); + attributes.put("attribute.array", "array_node"); + + runner.enqueue("", attributes); + runner.run(); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(TestXMLRecordSetWriterProcessor.SUCCESS, 1); + + String expected = "13" + + "val1" + + "13" + + "val1"; + String actual = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(TestXMLRecordSetWriterProcessor.SUCCESS).get(0))); + + assertThat(expected, CompareMatcher.isSimilarTo(actual).ignoreWhitespace().withNodeMatcher(new DefaultNodeMatcher(ElementSelectors.byNameAndText))); + } + @Test public void testDefaultSingleRecord() throws IOException, InitializationException { XMLRecordSetWriter writer = new XMLRecordSetWriter(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriterProcessor.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriterProcessor.java index 57ee6248a5fc..6562c5b63e58 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriterProcessor.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriterProcessor.java @@ -62,6 +62,7 @@ public class TestXMLRecordSetWriterProcessor extends AbstractProcessor { @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); + final Map attributes = flowFile.getAttributes(); final RecordSetWriterFactory writerFactory = context.getProperty(XML_WRITER).asControllerService(RecordSetWriterFactory.class); flowFile = session.write(flowFile, out -> { @@ -72,7 +73,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro boolean multipleRecords = Boolean.parseBoolean(context.getProperty(MULTIPLE_RECORDS).getValue()); RecordSet recordSet = getRecordSet(multipleRecords); - final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out); + final RecordSetWriter writer = writerFactory.createWriter(attributes, getLogger(), schema, out); writer.write(recordSet);