
[BEAM-10865] Support for Kafka deserialization API with headers (since Kafka API 2.1.0) #12794

Merged

merged 4 commits into apache:master on Sep 21, 2020

Conversation

@methodmissing (Contributor) commented Sep 9, 2020

TL;DR: Let KafkaIO support the deserializer API with headers.

References mailing list posts:

Design decisions

SpEL is used because, with kafka-clients < 2.1.0 as the dependency, compilation fails with:

  required: String,byte[]
  found: String,Headers,byte[]
  reason: actual and formal argument lists differ in length
  where T is a type-variable:

This is because the default headers API only landed in 2.1.0, via apache/kafka@f1f7192#diff-a4f4aee88ce5091db576139f6c610ced.

I opted for ConsumerSpEL#deserializeKey and ConsumerSpEL#deserializeValue as the API surface to keep things consistent going forward for both KafkaUnboundedReader and ReadFromKafkaDoFn, since both already depend on a ConsumerSpEL instance.
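
For context, the shape of that indirection is roughly the following. This is a minimal sketch, not the actual ConsumerSpEL code: it uses plain java.lang.reflect instead of SpEL, the class and method names (HeadersAwareDeserialization, lookupHeadersOverload) are illustrative, and it assumes compilation against a kafka-clients version that already has ConsumerRecord#headers() (0.11+), while the headers-aware Deserializer overload is only resolved at runtime.

import java.lang.reflect.Method;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

// Illustrative sketch only: plain reflection stands in for the SpEL expression.
final class HeadersAwareDeserialization {

  // Resolved once; null when a pre-2.1.0 kafka-clients jar is on the classpath.
  private static final Method DESERIALIZE_WITH_HEADERS = lookupHeadersOverload();

  private static Method lookupHeadersOverload() {
    try {
      return Deserializer.class.getMethod("deserialize", String.class, Headers.class, byte[].class);
    } catch (NoSuchMethodException e) {
      return null;
    }
  }

  static <T> T deserializeKey(Deserializer<T> deserializer, ConsumerRecord<byte[], byte[]> rawRecord) {
    return deserialize(deserializer, rawRecord, rawRecord.key());
  }

  static <T> T deserializeValue(Deserializer<T> deserializer, ConsumerRecord<byte[], byte[]> rawRecord) {
    return deserialize(deserializer, rawRecord, rawRecord.value());
  }

  @SuppressWarnings("unchecked")
  private static <T> T deserialize(
      Deserializer<T> deserializer, ConsumerRecord<byte[], byte[]> rawRecord, byte[] data) {
    if (DESERIALIZE_WITH_HEADERS == null) {
      // Old client on the classpath: only the two-argument API exists.
      return deserializer.deserialize(rawRecord.topic(), data);
    }
    try {
      // 2.1.0+ client: invoke deserialize(String, Headers, byte[]) reflectively so
      // that this class still compiles against older kafka-clients versions.
      return (T) DESERIALIZE_WITH_HEADERS.invoke(deserializer, rawRecord.topic(), rawRecord.headers(), data);
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException("Failed to invoke the headers-aware deserialize", e);
    }
  }
}

The actual implementation evaluates an equivalent SpEL expression instead, which is what produces the reflective frames in the trace below.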

Not so great things

Using SpEL for kafka-clients 2.1.0 onwards effectively turns the deserialization path into a more expensive indirection, because the deserializer methods are invoked via reflection (twice per record: once for the key, once for the value):

 	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58) <<<<<<<<<<<
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) <<<<<<<<<<<
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) <<<<<<<<<<<
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) <<<<<<<<<<<
	at java.base/java.lang.reflect.Method.invoke(Method.java:566) <<<<<<<<<<<
	at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:117)
	at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134)
	at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
	at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
	at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
	at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
	at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
	at org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateDeserializeWithHeaders(ConsumerSpEL.java:134)
	at org.apache.beam.sdk.io.kafka.ConsumerSpEL.deserializeValue(ConsumerSpEL.java:174)
	at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:195)
	at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:137)

This effectively penalizes the more recent Kafka client API versions in favor of the older ones. I have not measured the overhead yet.

Other avenues explored

For runtime deserialization:

  • I naively tried conditional compile options, but the compiler cannot know which kafka-clients version will be used at runtime.

For regression tests (that we don't stop passing headers in the future):

  • I tried Mockito and PowerMock implementations against both LocalDeserializerProvider and the Integer and Long serializers used in the tests, but found the stack to be too deep and backed out of that.

  • Ditto for attempting to spy on ConsumerRecord#headers() (expecting it to be called twice as often with the newer API): again the stack was deep and hard to assert on. The call itself is interesting because the ConsumerRecord constructor used in the tests is not the one that sets headers, presumably also for client API compatibility.

  • I evaluated ExtendedSerializer's wrapper, but ExtendedSerializer is a deprecated API and there is no point in bringing it in as a dependency.

How the current regression test works

Given that this feature concerns deserialization, and the whole test suite already depends on the Integer (for keys) and Long (for values) deserializers, I figured it made sense to implement a key and a value deserializer that can assert the behaviour. Herein also lies somewhat of a problem: the test case is a bit weak, because I relied on stack frames to infer the caller of the deserialize method (the wide array of supported client versions makes anything else very complex). Unfortunately a stack frame only provides class and method name context; there is no argument count of 3, or argument types, to assert on.

Kafka client API 1.0.0:

Frame 0: java.lang.Thread.getStackTrace
Frame 1: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
Frame 2: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
Frame 3: org.apache.beam.sdk.io.kafka.ConsumerSpEL#deserializeKey

For clients before 2.1.0, frame 3 is ConsumerSpEL#deserializeKey, meaning the deserializer was called directly and not via a default or overridden implementation on Deserializer. Frames 1 and 2 are equal because of the super.deserialize call.

Kafka client API 2.1.0+:

Frame 0: java.lang.Thread.getStackTrace
Frame 1: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
Frame 2: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
Frame 3: org.apache.kafka.common.serialization.Deserializer#deserialize

For clients 2.1.0 and beyond, frame 3 is org.apache.kafka.common.serialization.Deserializer#deserialize. This is true for the bundled deserializers used in the tests because they delegate the call to the default implementation on Deserializer. In practice this may instead be an actual override implementation.
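
For illustration, a simplified sketch of such an asserting deserializer follows. This is not the actual KafkaIOTest code: the class name, the hard-coded frame index and the plain AssertionError are specific to this sketch (the real assertor's frame offsets differ because of how its overloads delegate), but the idea is the same: the two-argument overload inspects its caller, which is either ConsumerSpEL#deserializeKey / #deserializeValue (pre-2.1.0 clients) or the default Deserializer#deserialize (2.1.0+ clients).

import org.apache.kafka.common.serialization.IntegerDeserializer;

// Illustrative only; the real assertor lives in KafkaIOTest. The 3-argument
// overload is deliberately not overridden, so on 2.1.0+ clients the call goes
// through the default Deserializer#deserialize(String, Headers, byte[]), which
// then delegates back to this two-argument override.
public class HeadersAssertingIntegerDeserializer extends IntegerDeserializer {

  @Override
  public Integer deserialize(String topic, byte[] data) {
    StackTraceElement[] frames = Thread.currentThread().getStackTrace();
    // Frame 0: Thread#getStackTrace, frame 1: this method, frame 2: the caller
    // we want to identify (direct ConsumerSpEL call or the Deserializer default).
    String caller = frames[2].getClassName() + "#" + frames[2].getMethodName();
    boolean viaExpectedPath =
        caller.equals("org.apache.beam.sdk.io.kafka.ConsumerSpEL#deserializeKey")
            || caller.equals("org.apache.beam.sdk.io.kafka.ConsumerSpEL#deserializeValue")
            || caller.equals("org.apache.kafka.common.serialization.Deserializer#deserialize");
    if (!viaExpectedPath) {
      throw new AssertionError("deserialize reached through an unexpected caller: " + caller);
    }
    return super.deserialize(topic, data);
  }
}

Not overriding the three-argument overload keeps the class compilable against every supported kafka-clients version, and is also what makes the 2.1.0+ call show up as Deserializer#deserialize in the captured frames.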

Feedback items for me

  • Any alternatives to the SpEL evaluation for this hot-path API? consumer.seekToEnd and consumer.assign are one-off / periodic APIs and are not called as often as twice per record.
  • Ideas for a better way to test for regressions?

Questions

  • Would it make sense to consider raising the minimum supported client API in order to avoid this kind of reflective indirection?
  • If this implementation (and very likely iterations thereof :-)) lands, would support for the same API on serialization be appreciated as well?

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Build status badges for the Go, Java, Python and XLang SDKs across the Dataflow, Flink, Samza, Spark and Twister2 runners.

Pre-Commit Tests Status (on master branch)

Build status badges for Java, Python, Go, Website, Whitespace and Typescript (non-portable and portable).

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels, Python tests, Java tests.

See CI.md for more information about GitHub Actions CI.


@Override
public Integer deserialize(String topic, byte[] data) {
StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
@lukecwik (Member) commented on the snippet above:

In the build.gradle you'll want to set up a JUnit test run that uses Kafka 2.1.0 and executes these tests.

This is a pretty good example of how to get this going:
https://docs.gradle.org/current/userguide/java_testing.html#sec:configuring_java_integration_tests

You shouldn't need any additional source sets, but you'll want to add a configuration for kafka210 that adds the dependency, and also the test task that uses it.


@lukecwik (Member) commented Sep 10, 2020

Reflection might not be bad enough to warrant an improvement, but there is a way to compile code against multiple versions of Kafka using multiple source sets and then, at runtime, choose which version of the class to use. This would remove a lot of the overhead. (The Flink runner code base does this.)

Do you see the reflective calls dominating performance in some way?

Also, this can always be improved later, so unless it's bad I wouldn't worry about it. You can file a JIRA detailing this and leave a TODO comment in the code.
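
A hypothetical sketch of that multiple-source-sets idea, purely for illustration (the helper class names below do not exist in Beam, and a shared interface would normally front the per-version implementations):

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical: each named class would live in its own source set, compiled
// against a specific kafka-clients line, and only one of them is loaded at runtime.
final class VersionedDeserializerHelperLoader {

  static Object newHelper() {
    String impl =
        headersApiOnClasspath()
            ? "org.apache.beam.sdk.io.kafka.Kafka210DeserializerHelper"  // built against 2.1.0+
            : "org.apache.beam.sdk.io.kafka.Kafka100DeserializerHelper"; // built against older clients
    try {
      return Class.forName(impl).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("No deserializer helper for this kafka-clients version", e);
    }
  }

  private static boolean headersApiOnClasspath() {
    try {
      Deserializer.class.getMethod("deserialize", String.class, Headers.class, byte[].class);
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }
}

The reflection is then confined to a one-time class lookup instead of two invocations per record.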

@lukecwik (Member) commented:

R: @lukecwik

@lukecwik (Member) commented:

Waiting on tests that use Kafka 2.1.0 explicitly as described in my first review.

@methodmissing (Contributor, Author) commented:

Hi Luke,

Apologies for the lag on this. TL;DR: I have the integration test against KafkaIOTest going, but I don't seem to be able to force the kafka-clients module override.

Executed with ./gradlew clean sdks:java:io:kafka:buildDependent

Main test task :sdks:java:io:kafka:test

<===========--> 86% EXECUTING [2m 7s]
> IDLE
> :sdks:java:io:expansion-service:shadowDistZip
> :sdks:java:extensions:sql:expansion-service:shadowJar
> :runners:spark:spotbugsMain
> :sdks:java:io:kafka:test > Executing test org.apache.beam.sdk.io.kafka.KafkaIOTest <<<<<<<<<<
> :sdks:java:testing:nexmark:spotbugsMain
> :runners:spark:job-server:shadowJar
> :sdks:java:io:kafka:test > 8 tests completed

Kafka client API 2.1.0 specific task :sdks:java:io:kafka:kafkaVersion210Test

<===========--> 88% EXECUTING [4m 2s]
> :runners:spark:test > Executing test org.apache.beam...metrics.sink.SparkMetricsSinkTest
> :sdks:java:extensions:sql:expansion-service:shadowJar
> :sdks:java:testing:nexmark:test > 9 tests completed, 1 skipped
> :sdks:java:testing:nexmark:test > Executing test org.apache.beam.sdk.nexmark.queries.QueryTest
> :sdks:java:io:kafka:kafkaVersion210Test > Executing test org.apache.beam.sdk.io.kafka.KafkaIOTest <<<<<<<<<<
> :sdks:java:io:kafka:kafkaVersion210Test > 1 test completed
> :runners:spark:job-server:shadowJar
> :runners:spark:test > 10 tests completed
> :runners:spark:test > Executing test org.apache.beam.runners.spark.metrics.SparkMetricsPusherTest
> :runners:spark:test > Executing test org.apache...streaming.TrackStreamingSourcesTest
methodmissing:beam lourens$ ls -la /Users/lourens/src/github.com/Shopify/cdc/beam/sdks/java/io/kafka/build/reports/tests
total 0
drwxr-xr-x  4 lourens  staff  128 Sep 15 11:00 .
drwxr-xr-x  5 lourens  staff  160 Sep 15 11:06 ..
drwxr-xr-x  7 lourens  staff  224 Sep 15 11:06 kafkaVersion210Test <<<<<<<<<<
drwxr-xr-x  7 lourens  staff  224 Sep 15 11:04 test

Any failure in the :sdks:java:io:kafka:test task means the 2.1.0-specific test doesn't run, though. Given this is an integration test, failing units should not trigger the dependent integration test either; that is default Gradle behaviour and should be fine for this case as well.

I opted to run the whole KafkaIOTest suite with the new task, as it casts a wider net over the deserialization flows and doesn't add significantly to the runtime.

!!! FAIL: However, I'm failing to inject the 2.1.0 kafka-clients module dependency for the integration test, and I'm wondering if you have any obvious thoughts on what I've done wrong in specifying the test module dependency?


I have some ideas (based on the Python-specific tasks and multi-version support) on how to support test tasks for all Kafka client API versions, but that is beyond the scope of this pull request.

@methodmissing (Contributor, Author) commented:

Hi Luke,

I got it working with a resolution strategy to force kafka-clients:2.1.0 for the kafkaVersion210 configuration. It's not super clean and I'd prefer not to have to use it, but I think the dependency provided by library.java.kafka_clients is hard to substitute otherwise.

Original Kafka test, default API:

Screenshot 2020-09-20 at 01 45 31

API 2.1.0 specific test:

Screenshot 2020-09-20 at 01 46 00

I reverse-tested with the following corruption of the 2.1.0 path in ConsumerSpEL to confirm that API 2.1.0 was loaded at runtime:

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
index 9ca5bfc990..37a4a2bc8e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
@@ -132,7 +132,7 @@ class ConsumerSpEL {
     mapContext.setVariable("deserializer", deserializer);
     mapContext.setVariable("topic", rawRecord.topic());
     mapContext.setVariable("headers", rawRecord.headers());
-    mapContext.setVariable("data", isKey ? rawRecord.key() : rawRecord.value());
+    mapContext.setVariable("data", isKey ? rawRecord.value() : rawRecord.key());
     return deserializeWithHeadersExpression.getValue(mapContext);
   }

@lukecwik (Member) left a comment:

Using the forceStrategy makes sense since we have to override the version from BeamModulePlugin.groovy as you noticed with library.java.kafka_clients.

Please look into the additional post-review comments below and address them in a follow-up PR.

outputs.upToDateWhen { false }
testClassesDirs = sourceSets.test.output.classesDirs
classpath = configurations.kafkaVersion210 + sourceSets.test.runtimeClasspath
include '**/KafkaIOTest.class'
@lukecwik (Member) commented:

Can we run all the unit tests?


task kafkaVersion210Test(type: Test) {
group = "Verification"
description = 'Runs KafkaIO tests with Kafka clients API 2.1.0'
outputs.upToDateWhen { false }
@lukecwik (Member) commented:

Instead of doing outputs.upToDateWhen { false } to have this run every time, please include the set of inputs/outputs that makes sense in a follow-up PR so that an incremental build can be supported.

@lukecwik lukecwik merged commit 7f474b5 into apache:master Sep 21, 2020
@methodmissing (Contributor, Author) commented:

Thanks for the feedback and the merge Luke. I'll address the updates in a PR.

I agree that running all tests makes more sense, as one can never have enough coverage. That said, I've been thinking about ConsumerSpEL and the fact that, until now, the test suite only exercised it against Kafka clients API 1.0.0 (library.java.kafka_clients) on each run. It's possible to dynamically create the tasks and configurations for a range of Kafka API versions too, now that a pattern has emerged, but that would also add quite a lot of time to test runs.

Thoughts?

@lukecwik (Member) commented:


I think it's a good idea to enumerate the most popular Kafka client versions. I don't think the unit tests are that slow, and the community can always break up the set that runs across more Jenkins executors in parallel if it becomes a big enough issue.
