NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key #6045

greyp9 wants to merge 3 commits into apache:main from
Conversation
Wanted to vet an initial implementation of this ticket, including a first pass at documentation. It seems to work in the happy-path case. I would like to hold off on error handling, unit tests, and (properly formatted) documentation until you are happy with the approach.
markap14 left a comment
Hey @greyp9, thanks for the updates! I think this is a good direction. I left several inline comments on things that I noticed. Some of these you may well have realized already, given that you said you still need to do more work on tests & docs. Some are minor recommendations that you can accept or ignore.
I do think we need to improve the Publisher a bit more, though. I think the publisher needs a new property that tells it whether the incoming data should be sent as-is, as a single record, or if the incoming data is in the "Wrapper" format. And only expose the Key Record Writer if using the Wrapper format. That way, we can easily receive a message from ConsumeKafkaRecord, do some processing, etc. and then push to another kafka topic, for instance, and easily retain the headers, the key, and the value.
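To make the suggestion concrete, here is a minimal sketch of how a publisher-side strategy switch could gate the Key Record Writer property. The names (`PublishStrategy`, `USE_VALUE`, `USE_WRAPPER`, `keyRecordWriterApplies`) are illustrative assumptions, not code from this PR:

```java
// Hypothetical sketch: an incoming-data strategy for the publisher.
// USE_VALUE sends the incoming record content as-is (current behavior);
// USE_WRAPPER treats incoming records as the "Wrapper" format carrying
// key, value, and headers.
enum PublishStrategy {
    USE_VALUE,
    USE_WRAPPER
}

public class PublisherStrategyDemo {

    // Only expose the Key Record Writer when the wrapper format is selected.
    public static boolean keyRecordWriterApplies(final PublishStrategy strategy) {
        return strategy == PublishStrategy.USE_WRAPPER;
    }

    public static void main(String[] args) {
        System.out.println(keyRecordWriterApplies(PublishStrategy.USE_VALUE));   // false
        System.out.println(keyRecordWriterApplies(PublishStrategy.USE_WRAPPER)); // true
    }
}
```

With a gate like this, a flow can consume with ConsumeKafkaRecord, process, and republish while retaining key, value, and headers end to end.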
@@ -426,11 +465,13 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co
        }

        return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol,
Doesn't necessarily need to be done in this ticket. But probably makes sense to introduce a Builder pattern here instead of so many constructor args. It made sense before this, too, though :)
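A Builder here might look like the following sketch. The field names are assumptions drawn from the constructor arguments quoted above, the real `ConsumerPool` has many more parameters, and `build()` returns a placeholder `String` just so the sketch is self-contained:

```java
// Illustrative Builder pattern for ConsumerPool construction, replacing a
// long positional-argument constructor with named, chainable setters.
public class ConsumerPoolBuilder {
    private int maxLeases;
    private String securityProtocol;
    private long maxUncommittedTimeMillis;

    public ConsumerPoolBuilder maxLeases(final int maxLeases) {
        this.maxLeases = maxLeases;
        return this;
    }

    public ConsumerPoolBuilder securityProtocol(final String securityProtocol) {
        this.securityProtocol = securityProtocol;
        return this;
    }

    public ConsumerPoolBuilder maxUncommittedTime(final long millis) {
        this.maxUncommittedTimeMillis = millis;
        return this;
    }

    // In the real code this would construct and return a ConsumerPool;
    // a descriptive String stands in here to keep the sketch runnable.
    public String build() {
        return "ConsumerPool[maxLeases=" + maxLeases
            + ", securityProtocol=" + securityProtocol
            + ", maxUncommittedTimeMillis=" + maxUncommittedTimeMillis + "]";
    }
}
```

The call site then reads as named arguments, which is much harder to get wrong than a dozen positional constructor parameters.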
final InputStream is = new ByteArrayInputStream(key);
final RecordReader reader = keyReaderFactory.createRecordReader(attributes, is, key.length, logger);
Should use try-with-resources here to ensure that we close the InputStream and the Record Reader.
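The pattern being suggested looks like this. To keep the sketch self-contained it uses a plain `InputStreamReader` where the PR uses a NiFi `RecordReader`; the shape is the same since both are `Closeable`:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class TryWithResourcesDemo {

    // Both the stream and the reader built on top of it are declared in the
    // try header, so both are closed automatically (in reverse order), even
    // if reading throws.
    public static int readFirstByte(final byte[] key) throws IOException {
        try (final InputStream is = new ByteArrayInputStream(key);
             final InputStreamReader reader = new InputStreamReader(is, StandardCharsets.UTF_8)) {
            return reader.read();
        }
    }
}
```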
throws IOException, SchemaNotFoundException, MalformedRecordException {
    final Tuple<RecordField, Object> tuple;
    final byte[] key = consumerRecord.key() == null ? new byte[0] : consumerRecord.key();
    if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {
Took me a minute to figure out what this meant - KafkaProcessorUtils.RECORD wasn't immediately obvious to me. Perhaps it makes sense to rename RECORD, STRING, etc. to something that makes more sense outside the context, such as KEY_AS_RECORD, KEY_AS_STRING, etc.? Is a bit of a nitpick and you can feel free to ignore if you want.
        tuple = new Tuple<>(recordField, record);
    } else if (KafkaProcessorUtils.STRING.getValue().equals(keyFormat)) {
        final RecordField recordField = new RecordField("key", RecordFieldType.STRING.getDataType());
        tuple = new Tuple<>(recordField, new String(key, StandardCharsets.UTF_8));
We'll definitely want to make sure that we document that this strategy requires that the key be a UTF-8 compatible String. And we should probably ensure that we test with a non-UTF-8 compatible String. In that case, the record should probably go to the parse.failure relationship.
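One way to implement that check: `new String(key, UTF_8)` silently substitutes replacement characters for invalid bytes, whereas a strict `CharsetDecoder` reports them, which gives the caller a signal to route the record to `parse.failure`. A sketch (the routing itself is left to the caller):

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

public class Utf8KeyCheck {

    // Decode the key with a strict decoder: malformed or unmappable bytes
    // raise CharacterCodingException instead of being replaced with U+FFFD.
    public static String decodeKeyStrict(final byte[] key) throws CharacterCodingException {
        return StandardCharsets.UTF_8.newDecoder()
            .onMalformedInput(CodingErrorAction.REPORT)
            .onUnmappableCharacter(CodingErrorAction.REPORT)
            .decode(ByteBuffer.wrap(key))
            .toString();
    }

    public static void main(String[] args) throws CharacterCodingException {
        System.out.println(decodeKeyStrict("key-1".getBytes(StandardCharsets.UTF_8)));
        try {
            decodeKeyStrict(new byte[] { (byte) 0xC0, (byte) 0x80 }); // overlong encoding: not valid UTF-8
        } catch (CharacterCodingException e) {
            System.out.println("key is not valid UTF-8; candidate for parse.failure");
        }
    }
}
```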
    } else {
        final RecordField recordField = new RecordField("key",
            RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()));
        tuple = new Tuple<>(recordField, key);
Eventually we need to introduce a BYTES data type for Records. Right now, when we have an Array of type Byte, the record API expects this to be an array of Byte objects, not primitive bytes. So, as inefficient as it is, in this case I think we need to create a Byte[] for the key instead of providing the byte[].
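The boxing being described is a straightforward element-by-element copy; each primitive `byte` is autoboxed into a `Byte` object:

```java
public class KeyBoxing {

    // Copy a primitive byte[] into a Byte[] because the record API currently
    // expects object arrays for ARRAY-of-BYTE fields. Inefficient (one object
    // per byte), but required until a BYTES data type exists.
    public static Byte[] box(final byte[] key) {
        final Byte[] boxed = new Byte[key.length];
        for (int i = 0; i < key.length; i++) {
            boxed[i] = key[i]; // autoboxing byte -> Byte
        }
        return boxed;
    }
}
```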
        .map(s -> s.getBytes(StandardCharsets.UTF_8)).orElse(null);
} else {
    final ByteArrayOutputStream os = new ByteArrayOutputStream(1024);
    final MapRecord keyRecord = (MapRecord) record.getValue(messageKeyField);
Should probably be using Record here - not MapRecord
- Output Strategy "Write Value Only" (the default) emits flowfile records containing only the Kafka record value.
- Output Strategy "Use Wrapper" (new) emits flowfile records containing the Kafka record key, value, and headers, as
well as additional metadata from the Kafka record.</p>
Should probably use <ul> with <li> rather than - for denoting lists.
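Applied to the quoted documentation, the suggestion would turn the hyphen list into proper HTML markup, roughly like this (wording copied from the snippet above):

```html
<p>This processor (NiFi 1.17+) offers multiple output strategies (configured via processor property
"Output Strategy") for converting Kafka records into flow files.</p>
<ul>
  <li>Output Strategy "Write Value Only" (the default) emits flowfile records containing only the
    Kafka record value.</li>
  <li>Output Strategy "Use Wrapper" (new) emits flowfile records containing the Kafka record key,
    value, and headers, as well as additional metadata from the Kafka record.</li>
</ul>
```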
<p>This processor (NiFi 1.17+) offers multiple output strategies (configured via processor property "Output Strategy")
for converting Kafka records into flow files.
- Output Strategy "Write Value Only" (the default) emits flowfile records containing only the Kafka record value.
- Output Strategy "Use Wrapper" (new) emits flowfile records containing the Kafka record key, value, and headers, as
We need to be sure that we call out the Record Schema that will be used here.
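For illustration only, one plausible shape for that wrapper schema, sketched in Avro-style JSON. The field names and types here are assumptions, not the schema this PR (or the follow-up) actually ships; in practice the types of `key` and `value` would vary with the configured Key Format and Record Reader:

```json
{
  "type": "record",
  "name": "KafkaRecordWrapper",
  "fields": [
    { "name": "key", "type": ["null", "bytes"] },
    { "name": "value", "type": ["null", "bytes"] },
    { "name": "headers", "type": { "type": "map", "values": "string" } },
    { "name": "metadata", "type": { "type": "record", "name": "KafkaMetadata", "fields": [
      { "name": "topic", "type": "string" },
      { "name": "partition", "type": "int" },
      { "name": "offset", "type": "long" },
      { "name": "timestamp", "type": "long" }
    ] } }
  ]
}
```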
<p>Additionally, the choice of the "Output Strategy" processor property affects the related properties "Headers to Add
as Attributes (Regex)" and "Key Attribute Encoding". These properties are available only when "Output Strategy" is set
to "Write Value Only".</p>
Might make sense to mention the reason they are only available when Output Strategy = Write Value Only. I.e., because it doesn't make sense when writing Records, as the Headers and keys are not attributes, they are part of the Record/FlowFile content.
descriptors.add(KafkaProcessorUtils.TOKEN_AUTH);
descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
descriptors.add(SEPARATE_BY_KEY);
descriptors.add(OUTPUT_STRATEGY);
Output Strategy (and related properties) is going to be a very important thing for the user to think through when configuring this. Because of that, I'd recommend moving this property up in the list to just after Group ID
|
Closing in favor of #6131.
Summary
NIFI-9822
Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

- Apache NiFi Jira issue created

Pull Request Tracking

- Pull request title starts with Apache NiFi Jira issue number, such as NIFI-00000
- Pull request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000

Pull Request Formatting

- Pull request based on current revision of the main branch

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

- Build completed using mvn clean install -P contrib-check

Licensing

- New dependencies are compatible with the Apache License 2.0 according to the License Policy
- New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

- Documentation formatting appears as expected in rendered files