
[BEAM-7029] Add KafkaIO.Read as external transform #8251

Merged: 1 commit merged into apache:master on Apr 12, 2019

Conversation

@mxm (Contributor) commented Apr 8, 2019

This adds KafkaIO.Read as an external transform and includes a Python wrapper (ReadFromKafka) for convenience.

CC @tweise @robertwb @ihji @lukecwik

Post-Commit Tests Status (on master branch)

Pre-Commit Tests Status (on master branch)

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@ihji (Contributor) left a comment:

LGTM. Found some typos.

In sdks/python/apache_beam/io/external/kafka.py (outdated diff):

args = {
    'consumer_config':
        KafkaRead._encode_map(self.consumer_config),

Contributor:

KafkaRead -> ReadFromKafka

mxm (Contributor Author):

Ah, thanks. I renamed this before opening the PR and missed those.

Three more resolved review threads in sdks/python/apache_beam/io/external/kafka.py (outdated).

@lukecwik (Member) left a comment:

I haven't caught up on all the cross-language pipeline work, but overall this CL looks fine based upon the current expansion service design. Maybe you could point me to some docs/discussions about some of the ideas/concerns I have below:

I would have expected that the loss of typing information in generics typically prevents you from "automatically" calling setters on builders in any practical way. Also, using automatic conversion from name to method means that an API change will break compatibility with the "proto" version, requiring people to provide a remapping feature anyway.

I would use explicit protos for each transform like:

message KafkaRead {
  map<string, string> consumer_config = 1;
  repeated string topics = 2;
  string key_deserializer = 3;
  string value_deserializer = 4;
}

that are defined inside a proto package like "org.apache.beam.external.kafka.v1".

Using an explicit proto would allow you to change the builder method to be annotated with the URN and then you would be able to automatically decode the payload and pass it into the method since you could look at the method parameter. This would also allow the Java SDK to create a transform with a payload that is this proto instead of serialized Java objects.

Also, how do you "choose" what the output type of the PCollection produced by Kafka.Read is?
If someone passes in the org.apache.kafka.common.serialization.LongDeserializer like in your example, the returned PCollection may contain some Java SDK specific coder. Would it make sense to have a mapping from beam coder URNs to kafka deserializers instead?

Note that these questions may be better suited for the mailing list than this review if this is still up for discussion.

@mxm (Contributor Author) commented Apr 9, 2019

Thanks for taking a look @lukecwik. The design document for this is https://s.apache.org/beam-cross-language-io. Approach (2) in the design doc was the original proposal for configuring cross-language transforms. It is very close to what you are suggesting. The PR #7875 originally had this design, but we changed it to match approach (1) after a discussion in the PR, on the mailing list, and in a meeting which @chamikaramj summarized on the mailing list.

It is not necessarily a contradiction to support both an explicit proto for each transform and a generic approach. However, the main objective for cross-language transforms was that users should not need to write proto definitions, so the generic approach was deemed more important.

Also, how do you "choose" what the output type of the PCollection produced by Kafka.Read is?
If someone passes in the org.apache.kafka.common.serialization.LongDeserializer like in your example, the returned PCollection may contain some Java SDK specific coder. Would it make sense to have a mapping from beam coder URNs to kafka deserializers instead?

Good point. I think it makes sense to define a mapping of supported coders for Kafka deserializers, since we can only support standard SDK coders.
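
To make that concrete, here is a minimal sketch of what such a mapping could look like on the Python side; the deserializer class names and coder choices below are illustrative only, not the SDK's actual table:

from apache_beam import coders

# Illustrative mapping from Kafka deserializer class names to standard Beam
# coders; the real supported set would be defined and maintained by the SDK.
_DESERIALIZER_TO_CODER = {
    'org.apache.kafka.common.serialization.ByteArrayDeserializer':
        coders.BytesCoder(),
    'org.apache.kafka.common.serialization.StringDeserializer':
        coders.StrUtf8Coder(),
    'org.apache.kafka.common.serialization.LongDeserializer':
        coders.VarIntCoder(),
}


def coder_for(deserializer_class_name):
    # Fall back to raw bytes when no standard coder is known.
    return _DESERIALIZER_TO_CODER.get(
        deserializer_class_name, coders.BytesCoder())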

@mxm (Contributor Author) commented Apr 9, 2019

Run Java PreCommit


@chamikaramj (Contributor) left a comment:

Thanks @mxm.

@@ -172,6 +173,20 @@ def test_external_transform(self):

    assert_that(res, equal_to([i for i in range(1, 10)]))

    # We do not run KafkaIO but check that the transform
    # is expanded by the ExpansionService without an error.
    p = self.create_pipeline()

Contributor:

Can we also add a validator to validate the output?

@ihji KafkaIO will be a great addition to the validates-runner test suite you are adding.

mxm (Contributor Author):

There are tests which check the correct configuration of KafkaIO. This just checks that the expansion succeeds. We definitely want a full end-to-end test. I have to figure out how to start an embedded Kafka cluster here.

Contributor:

Sounds good. I think a full end-to-end test will definitely be helpful.

mxm (Contributor Author):

Considering that KafkaIO itself is only mock-tested, I'm not sure I want to bring up a Kafka cluster here. I think we have sufficient test coverage for the configuration through KafkaIOExternalTest. We could execute here and check for an exception, e.g. "Unknown broker".

| ReadFromKafka(consumer_config={'bootstrap.servers':
                                     'localhost:94583, localhost:3531'},
                topics=['topic1', 'topic2'],
                key_deserializer='org.apache.kafka.common.'

Contributor:

Probably document this as a significant limitation. We have to specify Java functions from Python till we support UDFs.

mxm (Contributor Author):

Pushed an update with better documentation.

    'localhost:94583, localhost:3531'},
topics=['topic1', 'topic2'],
key_deserializer='org.apache.kafka.common.'
                 'serialization.BytesDeserializer',

Contributor:

How can a user make a new Java function available for Python pipeline deployment? Please document this.

mxm (Contributor Author):

It's currently not easily possible. We have to rely on the built-in ones for now. Have documented this.

from apache_beam.transforms import ptransform


class ReadFromKafka(ptransform.PTransform):

Contributor:

Please add a doc-comment describing the interface of the Python Kafka transform and its current limitations.

mxm (Contributor Author):

The doc comment is below. I expanded it to contain more information about the status quo.

Contributor:

Thanks. I think we should also provide end-to-end instructions on how a user can use this transform in a pipeline, including, for example, steps for starting up the expansion service (here or elsewhere), given that this is the first real-world source that some of our users might try to use out of the box in the next Beam release.

mxm (Contributor Author):

Does it make sense to include the instructions for getting started with cross-language portability here? We can add an outline here of what the user has to do. For detailed instructions, there should be a page on the Beam homepage. The only page I could think of at the moment would be https://beam.apache.org/contribute/portability/. We should have a more user-friendly page for the next release.

Contributor:

I think yes. Otherwise users will have no way of knowing how to use this in a pipeline. I think the instructions can be as simple as steps to get the expansion service running (or a pointer to a script for doing this) and providing the address to the ReadFromKafka transform (no need to mention runner setup).

I agree that when things are more stable, we should develop a Webpage for cross-language transforms with complete instructions, examples (for transform authors and users), etc. (not in this PR).

mxm (Contributor Author):

I've included a pointer to the Flink Job Server which starts the Java expansion service. Additionally, I've linked documentation pages which explain how to start the Flink Job Server. Should be sufficient for now.
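
For orientation, here is a minimal sketch (not code from this PR) of how the Python wrapper might be invoked against a running expansion service; the runner options, broker address, and topic name are hypothetical:

import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

# Hypothetical portable-runner options; a Flink Job Server (which also starts
# the Java expansion service) is assumed to be running.
options = PipelineOptions(
    ['--runner=PortableRunner', '--job_endpoint=localhost:8099'])

with beam.Pipeline(options=options) as p:
    records = (
        p
        | 'ReadFromKafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:9092'},
            topics=['my_topic'],
            expansion_service='localhost:8097'))
    # 'records' is a PCollection of key/value pairs read from the topic.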

raise Exception("ReadFromKafka must be a root transform")

args = {
    'consumer_config':

Contributor:

Can we also support write?

mxm (Contributor Author):

Sure. I wanted to start off with Read but I can add Write here or in a separate PR.


class ReadFromKafka(ptransform.PTransform):
    """
    A PTransform which reads from Kafka topics.

Contributor:

Also, please mention the currently supported runners (Flink) to prevent user confusion.

mxm (Contributor Author):

Good point.

@chamikaramj (Contributor):

@lukecwik I summarized the current design (adapted from Max's original design doc) in an email to the dev list: https://lists.apache.org/thread.html/b99ba8527422e31ec7bb7ad9dc3a6583551ea392ebdc5527b5fb4a67@%3Cdev.beam.apache.org%3E

The idea is to minimize the amount of work a transform author has to do to make a given transform available to remote SDKs. With the current solution, transform authors simply have to provide a builder that can build the transform from a given config object (POJO). The framework will generate a proto from this that defines the wire format. Remote SDKs can invoke arbitrary transforms that are exposed through the expansion API (and have a builder available) by using the generic ExternalTransform. Also, transform authors may define nicer wrappers (of ExternalTransform) if they choose to do so.

Of course this means that a cross-SDK interaction can only be done through standard coders that are defined for both languages. These will be used for (1) encoding parameters used to build the remote transform and (2) encoding data transferred between SDKs at runtime. A pipeline author has to be aware of this and has to make sure that transforms at SDK boundaries use standard coders. We hope the solution will become more generic once Beam SQL becomes available (where we should be able to do a Row to Row conversion from SDK A to SDK B for generic types).

@mxm (Contributor Author) commented Apr 9, 2019

Run Python PreCommit

@chamikaramj (Contributor) left a comment:

Thanks. LGTM other than a few comments.

}
}

/** Exposes GenerateSequence as an external transform for cross-language usage. */

@chamikaramj (Contributor) commented Apr 9, 2019:

s/GenerateSequence/KafkaIO.TypedWithoutMetadata

mxm (Contributor Author):

Fixed.


@lukecwik (Member) commented Apr 9, 2019

Thanks @mxm and @chamikaramj for the links. I read through them and have a better understanding of the details.

Using the decoded PCollection<keyType, valueType> for KafkaIO.Read will cover many user use cases, but from experience with the GCP pubsub native source for Dataflow and the Python SDK, users always wanted to get additional attributes from the pubsub message, and only passing through the "data" didn't satisfy what many users wanted (and exposing a few additional attributes at a time wasn't great). This was easy for pubsub since it uses proto as the canonical wire format.

As a future follow-up it may be useful to also expose a Java KafkaIO.Read that produces org.apache.kafka.common.record.Record encoded as a byte[]. The output type of KafkaIO.Read for the cross-language transform would then be PCollection<byte[]>, pushing all the decoding logic into the Python SDK. The user could then use any "deserializer" they want, but this puts a greater burden on the language that wants to consume the records, since it needs to be able to decode such messages. Often this additional logic in the downstream SDK isn't difficult to implement, since you can rely on a language-specific source library to do the parsing. Alternatively, if there aren't many language-specific libraries for the source format, it may be wise to produce an intermediate format such as proto/json/... (which can generate language-specific bindings or is well supported in almost all languages) that sends all the data across.

@chamikaramj (Contributor):

Makes sense in general. If an IO connector offers a transform that produces records as bytes, it makes sense to support pushing deserializers to remote SDKs.

Looks like there's no such read transform for Java KafkaIO today though. KafkaIO.TypedWithoutMetadata produces a PCollection<KV<K, V>> while KafkaIO.Read produces a PCollection<KafkaRecord<K, V>>, where a KafkaRecord wraps the K, V values (and other metadata) extracted from a native Kafka record. Also, KafkaIO.Read will not work for cross-language transforms until we convert it to use SDF (instead of UnboundedSource).

@mxm (Contributor Author) commented Apr 10, 2019

@lukecwik That's correct. During implementation I discovered that the data is always wrapped into KafkaRecord and went with Read.TypedWithoutMetadata to get around this. Like you pointed out, we need either 1) a KafkaRecord standard coder (perhaps something applicable to other messaging systems as well) or 2) byte[] where we implement all coding in the SDK. For both we need to provide coders in the SDK, but the benefit of having a standard coder would be a language-independent encoding scheme, which is important when dealing with cross-language pipelines.

Also, KafkaIO.Read will not work for cross-language transforms until we convert it to use SDF (instead of UnboundedSource).

I think we will have to support Read in portable Runners for now until SDF is finalized and an adapter for UnboundedSource is available. For the Flink Runner this is already the case and other Runners can add it easily. It's not so bad because, after all, it is transparent to the user.

@mxm (Contributor Author) left a comment:

Thanks for your comments @chamikaramj.


@lukecwik (Member) commented Apr 10, 2019

@mxm Supporting the whole KafkaRecord could be a TODO for the future. For now you could just produce KV<byte[], byte[]> with the intent that the receiving end performs any Kafka key/value -> language-specific type decoding (instead of passing the key/value deserializers to the Java SDK). This will produce a version that has a lot of value for many people, since all "data" types can be supported in the Python SDK by exposing a UDF that users can implement to perform the decoding in the Python SDK. You could provide some defaults for things like bytes/utf8/dates/...

Also, to support the whole KafkaRecord in the future, Kafka must already have a well-defined "encoding" format for compatibility across all its client SDKs, so it would be great if we could use that and then use the parsing in the Kafka client libraries to do that work. Our coder spec would be beam:coder:kafka_record:v1 and would just point to a certain version of the KafkaRecord spec.
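
As an illustration of the UDF-based decoding idea above, here is a small sketch (element values and transform labels are hypothetical) of user-side decoding applied in the Python SDK on top of the raw KV<byte[], byte[]> output:

import apache_beam as beam


def decode_utf8_kv(kv):
    # Interpret raw key/value bytes as UTF-8 strings; any other decoding
    # (varints, dates, Avro, ...) could be plugged in here instead.
    key, value = kv
    return (key.decode('utf-8') if key is not None else None,
            value.decode('utf-8') if value is not None else None)


with beam.Pipeline() as p:
    # Stand-in for the KV[bytes, bytes] output of the cross-language Kafka read.
    raw_records = p | beam.Create([(b'key', b'value')])
    decoded = raw_records | 'DecodeKV' >> beam.Map(decode_utf8_kv)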

@mxm (Contributor Author) commented Apr 10, 2019

@lukecwik I think your experience with Pubsub on Dataflow is a good reminder of the requirements users have when reading from a message queue. For simple use cases, only having the data might be fine but ultimately users will ask for the metadata (partition id, partition offset, timestamp). Defining a standard coder for this seems inevitable.

For now, going with the current approach here to just ship the data seems to be fine. The KV<byte[], byte[]>, coder agnostic way makes sense to me because it gives users freedom to implement their own encoding. Note that this is already possible if users configure the ByteArrayDeserializer.

I'm wondering whether we should keep the mapping from Kafka Deserializers to standard coders. Most Kafka users are familiar with Java and are used to Kafka Deserializers. The obvious drawback of this approach is that we need to maintain this mapping and we can't support all Deserializers. We could make KV<byte[], byte[]> the default and infer the types if Deserializers are provided by the user. This might be a more flexible approach.

@lukecwik @chamikaramj WDYT?

@chamikaramj (Contributor):

I think it makes sense to keep a mapping from deserializers to standard coders in the Python SDK. This way users can configure the next step of the Python pipeline to consume data using the corresponding coder. The issue (as you mentioned) is keeping such a list up to date with what happens in the Java SDK. But changes to Kafka deserializers are probably rare enough to justify this.

@lukecwik (Member):

I agree with @chamikaramj about making the decoding happen within the Python SDK for the same reason.

This adds KafkaIO.Read as an external transform and includes a Python
wrapper (ReadFromKafka) for convenience. The transform only returns the
key/value data for a Kafka topic. It does not include the metadata such as
partition id, offset, or timestamp.

By default, the data is returned as `KV<byte[], byte[]>`. Users can supply a
Kafka Deserializer in ReadFromKafka, such as LongDeserializer, which will infer
a different coder. Only a limited number of Deserializers are supported.
Alternatively, users can implement their own decoding in the target SDK.

@mxm (Contributor Author) commented Apr 11, 2019

I've made KV<byte[], byte[]> the default. Optionally, users can specify a different Deserializer. I've updated the documentation and modified the end-to-end test in flink_runner_test.py to fully execute (and check for failure due to the missing Kafka cluster). I think we can merge this and then add WriteToKafka next.

@chamikaramj (Contributor):

LGTM. Thanks.

@mxm (Contributor Author) commented Apr 12, 2019

Thanks for the reviews!

@mxm (Contributor Author) commented Apr 12, 2019

Unrelated WatchTest failures: https://builds.apache.org/job/beam_PreCommit_Java_Commit/5332/

@mxm mxm merged commit 3aaf39a into apache:master Apr 12, 2019

@manuelaguilar commented Jul 31, 2019

@mxm I've been trying to use the ReadFromKafka transform (kafka.py) via Flink with the portable runner, but it seems the value coder resolves to ByteArrayCoder instead of KafkaRecordCoder. Am I missing a registration step? My transform is similar to the one in the unit test (flink_runner_test.py).

Here are some logs with some extra INFO I added:

[Source: Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map -> [1]Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/Remove Kafka Metadata (1/4)] INFO org.apache.beam.sdk.coders.LengthPrefixCoder - Creating length prefix coder with value coder class org.apache.beam.sdk.coders.ByteArrayCoder
[grpc-default-executor-1] INFO org.apache.beam.fn.harness.FnHarness - Fn Harness started
[grpc-default-executor-1] INFO org.apache.beam.fn.harness.FnHarness - Entering instruction processing loop
[Source: Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map -> [1]Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/Remove Kafka Metadata (2/4)] INFO org.apache.beam.runners.core.construction.CoderTranslators - Getting length prefix coder for type class org.apache.beam.sdk.coders.ByteArrayCoder
[Source: Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map -> [1]Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/Remove Kafka Metadata (2/4)] INFO org.apache.beam.sdk.coders.LengthPrefixCoder - Creating length prefix coder with value coder class org.apache.beam.sdk.coders.ByteArrayCoder
[Source: Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map -> [1]Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/Remove Kafka Metadata (3/4)] INFO org.apache.beam.runners.core.construction.CoderTranslators - Getting length prefix coder for type class org.apache.beam.sdk.coders.ByteArrayCoder
[Source: Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map -> [1]Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/Remove Kafka Metadata (3/4)] INFO org.apache.beam.sdk.coders.LengthPrefixCoder - Creating length prefix coder with value coder class org.apache.beam.sdk.coders.ByteArrayCoder
[Source: Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map -> [1]Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/Remove Kafka Metadata (4/4)] INFO org.apache.beam.sdk.coders.LengthPrefixCoder - Creating length prefix coder with value coder class org.apache.beam.sdk.coders.ByteArrayCoder
...
[Source: Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map -> [1]Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/Remove Kafka Metadata (4/4)] INFO org.apache.beam.sdk.coders.LengthPrefixCoder - Encoding value of type class org.apache.beam.sdk.io.kafka.KafkaRecord with coder of type class org.apache.beam.sdk.coders.ByteArrayCoder
[pool-13-thread-1] INFO org.apache.beam.sdk.io.kafka.KafkaUnboundedSource - Reader-3: Returning from consumer pool loop
[Source: Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map -> [1]Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/Remove Kafka Metadata (4/4)] INFO org.apache.flink.runtime.taskmanager.Task - Source: Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map -> [1]Read From Kafka/ExternalTransform(beam:external:java:kafka:read:v1)/Remove Kafka Metadata (4/4) (6eabba22b9485eeaf271d72fffdc08b1) switched from RUNNING to FAILED.
java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
        at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:44)
        at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:66)
        at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:112)
        at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:87)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:537)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:486)
        at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
        at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.copy(CoderTypeSerializer.java:89)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.emitElement(UnboundedSourceWrapper.java:334)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:239)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
        at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)

@mxm (Contributor Author) commented Aug 1, 2019

Thanks for trying out the Kafka consumer. There are limitations for the consumer to work correctly. The biggest issue is the structure of KafkaIO itself, which uses a combination of the source interface and DoFns to generate the desired output. The problem is that the source interface is natively translated by Flink to support unbounded sources in portability, while the DoFn runs in a Java environment. To transfer data between the two, a coder needs to be involved. It happens that the initial read does not immediately drop the KafkaRecord structure, which does not work well with our current assumption of only supporting "standard coders" present in all SDKs.

There are several possible solutions:

  1. Make the DoFn which drops the KafkaRecordCoder a native Java transform in the Flink Runner
  2. Modify KafkaIO to immediately drop the KafkaRecord structure
  3. Add the KafkaRecordCoder to all SDKs
  4. Add a generic coder, e.g. AvroCoder to all SDKs

For a workaround which uses (3), please see this patch, which is not a proper fix but adds KafkaRecordCoder to the SDK such that it can be used to encode/decode records: mxm@b31cf99

I'm planning to fix the issue in the coming weeks.

@mxm (Contributor Author) commented Aug 1, 2019

Created a JIRA issue: https://jira.apache.org/jira/browse/BEAM-7870

@manuelaguilar commented Aug 1, 2019

@mxm I tried the patch and I can see that the right coder is applied to the transform. Thank you.

My next step is to have the transform output (KafkaRecords) passed to my Python pipeline.

How can I hand over KafkaRecord elements from Java to Python? Will it work by calling an external transform in the Python pipeline, or is there a simpler way to translate the ReadFromKafka output into a Python implementation of KafkaRecord (e.g. confluent_kafka)?

For my pipeline test, I added a Map function 'passthrough' (it just logs the message and yields the element) in my Python pipeline after reading from Kafka:

messages = ( p | 'Read From Kafka' >> ReadFromKafka(
                                          consumer_config={ ......
                                          },
                                          ......
                                          expansion_service='localhost:8097'))


messages | 'Passthrough' >> beam.Map(passthrough)

As of now, this is the error I'm getting while the portable runner is reading from Kafka topics:

[grpc-default-executor-1] ERROR org.apache.beam.fn.harness.control.BeamFnControlClient - Exception while trying to handle InstructionRequest 5 java.lang.IllegalArgumentException: Expected DoFn to be FunctionSpec with URN urn:beam:dofn:javasdk:0.1, but URN was beam:dofn:pickled_python_info:v1
        at org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:416)
        at org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:572)
        at org.apache.beam.runners.core.construction.ParDoTranslation.getDoFn(ParDoTranslation.java:282)
        at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory$Context.<init>(DoFnPTransformRunnerFactory.java:197)
        at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.createRunnerForPTransform(DoFnPTransformRunnerFactory.java:96)
        at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.createRunnerForPTransform(DoFnPTransformRunnerFactory.java:64)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:194)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.createRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:163)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:290)
        at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:160)
        at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:144)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

@mxm (Contributor Author) commented Aug 1, 2019

This error occurs when the output coder cannot be inferred correctly from the Python function. The following should work:

messages | 'Passthrough' >> beam.Map(passthrough).with_output_types(KV[bytes, bytes])

Apart from this, no additional work should be necessary to make this work. Let me know if that fixes the problem you are seeing. Thanks for the feedback.
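
For completeness, a minimal sketch of the suggested change with the imports spelled out; it assumes the KV type hint from apache_beam.typehints and the 'messages' collection from the snippet above:

import logging

import apache_beam as beam
from apache_beam.typehints import KV


def passthrough(element):
    # Log and return the element unchanged. With a generator (yield) version
    # of this function, beam.FlatMap would be used instead of beam.Map.
    logging.info('Received element %s', element)
    return element


# 'messages' is the output of the ReadFromKafka step shown earlier.
messages | 'Passthrough' >> beam.Map(passthrough).with_output_types(
    KV[bytes, bytes])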

@manuelaguilar:

@mxm I tried the hinted type but I'm still getting the same error. 'passthrough' is a local Python function:

def passthrough(element):
    logging.info("Received element {}".format(element))
    yield element

And the error:

[grpc-default-executor-0] ERROR org.apache.beam.fn.harness.control.BeamFnControlClient - Exception while trying to handle InstructionRequest 5 java.lang.IllegalArgumentException: Expected DoFn to be FunctionSpec with URN urn:beam:dofn:javasdk:0.1, but URN was beam:dofn:pickled_python_info:v1

Does portability allow having both external and local transforms in a pipeline in different languages? From what I read, the pipeline is submitted to the runner for execution; how does the runner handle cross-language pipelines?

In this case, the execution of the pipeline occurs in the Flink server (Java), and it's not expecting a Python urn.

@manuelaguilar commented Aug 1, 2019

@mxm By looking at the payload, I think I introduced an issue when hardcoding SDK docker image names in the fn executor docker factory (my organization uses the very unusual id@domain as Unix user name and docker rejects such image names).
So I think based on the payload urn, the portable runner will choose which image to run the pipeline in.

@mxm (Contributor Author) commented Aug 1, 2019

Does portability allow having both external and local transforms in a pipeline in different languages?

Yes it does. You can mix Python, Java, and Go code in the same pipeline. Each has its own environment via docker containers. That's also why we are limited to standard coders to transfer data between the different environments.

It is a bit tricky though for sources, where we actually mix three different environments: a) Java via docker, b) Python via docker, and c) native Java via the Flink runner to run the old-style unbounded sources, i.e. KafkaIO. The latter code path will be removed once we have an adapter to run old-style sources via SplittableDoFn. If that does not make sense to you, that's alright :)

Not sure what the issue is here. Earlier I ran a semantically identical pipeline with a ReadFromKafka and a pass-through Python transform, followed by a WriteToKafka. Will take a closer look when I get a chance (currently traveling).

@manuelaguilar:

@mxm The issue was related to invalid docker image names.

My first issue (invalid docker image name) was solved by creating a plain Unix account and building all the images there.

A small issue I noticed after was when deploying the portable runner in Linux for a pipeline that runs multiple languages (Java + Python). The flink server does not have the USER environment variable defined, so it can't find the right payload image to use on the second transform and probably onwards. I hardcoded the USER environment variable in the flink server Dockerfile and it now finds the right image for the right payload.

The patch works and I'm able to consume Kafka payload using the portable runner.

Another thing I noticed is that although the deserializer I specified in the ReadFromKafka transform is for ByteArray (both for key and value), the Python pipeline is consuming it as a string without further conversion.

Would it make sense to have the KafkaRecord metadata as an option in the portable pipeline? (to be able to access partitions, offsets, timestamps, and other KafkaRecord properties).

I'll continue running some performance tests to see how Flink scales up with a few million records in the portability framework.

@mxm (Contributor Author) commented Aug 12, 2019

Great to hear that you resolved the issue. The user variable could probably be inferred from the user's home directory.

Another thing I noticed is that although the deserializer I specified in the ReadFromKafka transform is for ByteArray (both for key and value), the Python pipeline is consuming it as a string without further conversion.

The string consumption will only work for Python 2, though, where bytes and strings are treated equally.

Would it make sense to have the KafkaRecord metadata as an option in the portable pipeline? (to be able to access partitions, offsets, timestamps, and other KafkaRecord properties).

That would make perfect sense. However, we wanted to avoid having to include KafkaRecord as a type across SDKs. We figured it might be sufficient to have only the raw data available. As you saw, it is not yet possible to completely use the KafkaIO consumer without the KafkaRecord cross-language coder. Hopefully we will fix that soon and introduce a structure for preserving KafkaRecord metadata across SDKs.

Curious to learn about your performance tests.

@manuelaguilar:

@mxm It seems performance depends mostly on the sink. I've been able to get 3000 msg/sec with a file sink (which doesn't complete the final write when I cancel the job via Flink), and 2300 msg/sec with a recently patched version of the mongodb sink. The pipeline had a map transform to get the value from every KV element. This was done on a quad core Intel(R) Xeon(R) CPU E5-2695 v4 @ 2.10GHz virtual machine with 8GB memory using a standalone Flink docker image as runner endpoint.
I have observed our Google Dataflow n1-standard-1 instance (1vCPU, 3.75GB memory) can process 2000 msg/sec per worker (using the respective Dataflow runner). This was using a Java dataflow template.
