
NIFI-14123: support kafka tombstone with USE_WRAPPER strategy#9604

Merged
greyp9 merged 5 commits into apache:main from babubabu:NIFI-14123
Jan 7, 2025

Conversation

@babubabu
Contributor

@babubabu babubabu commented Dec 30, 2024

Add the possibility to leave out the "value" attribute in the FlowFile wrapper to support publishing Kafka tombstones. With this change, the wrapper is supported either completely without the "value" attribute or with "value": null. The JSON Reader and Avro RecordSetWriter support null values out of the box.
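For illustration, a wrapper record that would produce a tombstone under this change might look like the following sketch (the key and headers fields shown here are assumed for illustration, not taken from the PR's test resources):

```json
{
  "key": "user-1",
  "value": null,
  "headers": {}
}
```

Omitting "value" entirely from the wrapper JSON would be treated the same way.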

Summary

NIFI-14123
NIFI-14122

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using mvn clean install -P contrib-check
    • JDK 21

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

@greyp9
Contributor

greyp9 commented Jan 1, 2025

Hi @babubabu; thanks for the contribution! Looks like the "integration-tests" CI failed on this PR for the new test that was added. You can easily run this single test (after a full build) with a command line something like this:

nifi/nifi-extension-bundles/nifi-kafka-bundle % mvn verify -P integration-tests -pl nifi-kafka-3-integration -Dit.test=PublishKafkaWrapperX6IT

@babubabu
Contributor Author

babubabu commented Jan 1, 2025

Hi @greyp9 ,
Thanks for your response. Yes, I saw it, but couldn't fix it easily yesterday. I'll be on it January 2nd ;-)
Happy new year :-)

@babubabu
Contributor Author

babubabu commented Jan 1, 2025

Hello @greyp9,
I fixed the test setup; now it should work ;-)
Can you please review it again?

Add the possibility to leave out the "value" attribute in the FlowFile wrapper to support publishing Kafka tombstones.
With this change, the wrapper is supported either completely without the "value" attribute or with "value": null.
The JSON Reader and Avro RecordSetWriter support null values out of the box.
@babubabu
Contributor Author

babubabu commented Jan 2, 2025

Fixed checkstyle violations and rebased to current main

@greyp9
Contributor

greyp9 commented Jan 2, 2025

Hello @greyp9, I fixed the test setup, now it should work ;-) Can you please review it again?

I just started the automation.

It looks like there is a bit of additional nuance related to the kafka.tombstone FlowFile attribute. As you noted in the JIRA, this field is currently unused.

It makes sense that RecordWrapperStreamKafkaRecordConverter implements the tombstone logic. But if we implement this here, then FlowFileStreamKafkaRecordConverter should get a corresponding touch. The other two implementations (record and delimited) probably don't need an implementation, as those were unimplemented in the old PublishKafka_2_6. The relevant code seems to be here:

https://github.com/apache/nifi/blob/support/nifi-1.x/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java#L164-L170

Additionally, ConsumeKafka should also get a touch, to set the FlowFile attribute if a null value is received, as implemented in the NIFI-12371 changeset.

Thanks for your continued attention to this!

Consume: Kafka tombstones are consumed by writing empty content and setting the FlowFile attribute kafka.tombstone=true
Publish: An empty FlowFile content with attribute kafka.tombstone=true results in a published Kafka tombstone
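The consume-side half of that contract can be sketched standalone; the class and method names below are illustrative, not the actual NiFi ConsumeKafka code:

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of the consume-side contract described above; class and
// method names are illustrative, not the actual NiFi ConsumeKafka code.
class ConsumeTombstoneSketch {

    // A null Kafka record value yields the kafka.tombstone=true attribute.
    static Map<String, String> attributesFor(byte[] kafkaValue) {
        final Map<String, String> attributes = new HashMap<>();
        if (kafkaValue == null) {
            attributes.put("kafka.tombstone", Boolean.TRUE.toString());
        }
        return attributes;
    }

    // A null Kafka record value is written as empty FlowFile content.
    static byte[] contentFor(byte[] kafkaValue) {
        return kafkaValue == null ? new byte[0] : kafkaValue;
    }
}
```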
@babubabu
Contributor Author

babubabu commented Jan 3, 2025

To keep the changes in one place, I committed the changes for ConsumeKafka and PublishKafka to this PR. I hope it's still fine :-)
Please have a detailed look at it.
There are 2 additional ITs to test tombstone publish and consume. All other tests were not changed and are running fine.

@babubabu
Contributor Author

babubabu commented Jan 3, 2025

Hi @greyp9: the checkstyle violations have been fixed. Can you please start the actions again?

Contributor

@greyp9 greyp9 left a comment


Please consider my suggestion about coverage for the Publish empty string test case. A couple of code comments as well.

Thanks for your continued attention!

@@ -0,0 +1,14 @@
{
Contributor

I think you were on to something with the previous version of this test resource. Three test cases should provide good coverage:

  • value is empty string
  • value is null
  • omit value from the JSON representation of the record

(The test code would then need to be adjusted to expect three records to be emitted from the publish part.)
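Under that suggestion, the test resource could look something like this assumed sketch (not the actual file contents from the PR):

```json
[
  { "key": "a", "value": "" },
  { "key": "b", "value": null },
  { "key": "c" }
]
```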

Contributor Author

Thinking more about the empty string case: an empty string can be a valid "filled" value for String fields, without being a tombstone at all.

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getKafkaConsumerProperties())) {
    consumer.subscribe(Arrays.asList(TEST_TOPIC, OVERRIDE_TOPIC));
    final ConsumerRecords<String, String> records = consumer.poll(DURATION_POLL);
    assertEquals(1, records.count());
Contributor

Here is where we would expect three, after the adjustment of the test resource.

final byte[] value = converter.toBytes(WrapperRecord.VALUE, writerFactory);
ProducerUtils.checkMessageSize(maxMessageSize, value.length);

if (value != null) {
Contributor

For the RecordWrapper strategy, there needs to be some logic at this point to do the evaluation of value and the FlowFile attribute "kafka.tombstone". One option is to implement a utility method to do things inline at line 109, something like:

private byte[] toValue(final byte[] bytes, final FlowFile flowFile) {
    final String tombstone = flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_TOMBSTONE);
    final boolean isTombstone = (Boolean.TRUE.toString().equals(tombstone) && (flowFile.getSize() == 0));
    return isTombstone ? null : bytes;
}

This is surfaced by adding the zero byte value test case described in this review.
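The suggested check can be exercised standalone; StubFlowFile below is a minimal stand-in for NiFi's FlowFile, used only to illustrate the truth table (note that the thread below ultimately settles on a different approach for the wrapper strategy):

```java
import java.util.Map;

// Minimal stand-in for NiFi's FlowFile, for illustration only.
class StubFlowFile {
    private final long size;
    private final Map<String, String> attributes;

    StubFlowFile(long size, Map<String, String> attributes) {
        this.size = size;
        this.attributes = attributes;
    }

    String getAttribute(String name) {
        return attributes.get(name);
    }

    long getSize() {
        return size;
    }
}

class ToValueSketch {
    static final String KAFKA_TOMBSTONE = "kafka.tombstone";

    // Mirrors the suggestion above: emit null (a Kafka tombstone) only when
    // the attribute is "true" AND the FlowFile content is empty.
    static byte[] toValue(byte[] bytes, StubFlowFile flowFile) {
        final String tombstone = flowFile.getAttribute(KAFKA_TOMBSTONE);
        final boolean isTombstone = Boolean.TRUE.toString().equals(tombstone) && flowFile.getSize() == 0;
        return isTombstone ? null : bytes;
    }
}
```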

Contributor Author

Are you sure we should make tombstone production depend on both value=null and the FlowFile attribute? I know that, on the one hand, the attribute is part of the "contract" of the nifi-kafka-interface, but on the other hand, as a user, I would not expect to also have to set the FlowFile attribute after adjusting the content to the wrapper structure (e.g. via jolt-transform or others). Additionally, we can keep the code cleaner ;-)
I'll follow your preference on this point. If you want, I'll add the dependency!

Contributor

Yes, there is some ambiguity here.

Given the other code touch, it seems like it makes sense to let the "wrapper record" value pass through, which would be implicitly interpreted by Kafka as a tombstone. The confusion is that for the FlowFile strategy, it is not possible to set the value to null, so we need to flex based on the FlowFile attribute. But not for this strategy.

I think this change is good...

+ "may be record-oriented data that can be read by the configured Record Reader. "
+ "The complementary NiFi processor for fetching messages is ConsumeKafka.")
+ "The complementary NiFi processor for fetching messages is ConsumeKafka. "
+ "To produce a kafka tombstone message while using PublishStrategy.USE_WRAPPER, simply set the value of a record to 'null'.")
Contributor

The previous implementation seems to suggest the expectation that value=null should only be sent when both conditions are true (zero byte, tombstone attribute). But then it would be nice to be able to send n wrapper records, where some are tombstone records and some are not. So this should be ok.

Contributor Author

Sorry, I don't understand this point! Is this related to the Wrapper change above (adding the tombstone attribute)?

Contributor

I was talking myself through the tradeoffs. For USE_WRAPPER, this documentation seems fine. No request here.

recordHeaders.add(recordHeader);
});

// NIFI-14122: Support Kafka tombstones
Contributor

Code comments like this (with JIRA numbers) in the production code are generally disfavored (though they do exist). Maybe just:

Support Kafka tombstones

@babubabu babubabu requested a review from greyp9 January 3, 2025 20:25
Contributor

@greyp9 greyp9 left a comment

Almost there. Thanks for the patience while I reasoned through the intersection of RecordWrapperStrategy and the tombstone behavior.

There is a problem with the behavior of maven-failsafe-plugin regarding PublishKafkaWrapperRecordTombstoneIT2. I think it just needs a rename to PublishKafkaWrapperRecordTombstoneIT so it will be picked up and run in the context of -P integration-tests.
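For context, maven-failsafe-plugin only runs classes matching its default include patterns, which is why the IT2 suffix is skipped:

```xml
<!-- maven-failsafe-plugin default includes; *IT2 matches none of these -->
<includes>
  <include>**/IT*.java</include>
  <include>**/*IT.java</include>
  <include>**/*ITCase.java</include>
</includes>
```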

@babubabu babubabu requested a review from greyp9 January 7, 2025 07:38
@babubabu
Contributor Author

babubabu commented Jan 7, 2025

Thanks for your patience, too!
The IT class is renamed, my bad... it's squashed into the last commit.
I had to set up another development environment for this PR, and I'm not quite used to it yet...

Contributor

@greyp9 greyp9 left a comment

Thanks for this. I verified that the renamed IT now runs during the integration tests. I'm approving the PR, and plan to merge once the automation completes.

@babubabu
Contributor Author

babubabu commented Jan 7, 2025

Thank you for reviewing and gate-keeping :-)

@greyp9 greyp9 merged commit 34dad34 into apache:main Jan 7, 2025
6 checks passed
