Skip to content

Comments

[BEAM-1830] KafkaIO : Add withTopic() api that takes single topic.#2364

Closed
rangadi wants to merge 6 commits intoapache:masterfrom
rangadi:with_topic
Closed

[BEAM-1830] KafkaIO : Add withTopic() api that takes single topic.#2364
rangadi wants to merge 6 commits intoapache:masterfrom
rangadi:with_topic

Conversation

@rangadi
Copy link
Contributor

@rangadi rangadi commented Mar 29, 2017

Overwhelming majority of KafkaIO readers consume just one topic. It would be nice to have withTopic(topic) rather than always requiring a list.

In addition, I am fixing a small bug I noticed while using KafkaIO.write().values(). Remove need for setting key coder for Writer while writing values only. If we didn't specifiy the key coder, validation succeeded but it failed a check while instantiating Kafka producer.

Remove need for setting key coder for Writer while writing
values only. If we didn't specifiy the key coder, validation
succeeded but it failed a check while instantiating Kafka producer.
@rangadi
Copy link
Contributor Author

rangadi commented Mar 29, 2017

+R: @amitsela +R: @jkff

@coveralls
Copy link

Coverage Status

Coverage increased (+0.003%) to 70.167% when pulling d2da706 on rangadi:with_topic into 8fa1159 on apache:master.

@Nullable abstract String getTopic();
@Nullable abstract Coder<K> getKeyCoder();
@Nullable abstract Coder<V> getValueCoder();
abstract boolean getValueOnly();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and the changes below don't seem to be related to the current commit or PR. Also, all of these changes are missing tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, mentioned about it in the description. This is a bug I noticed when I tried to use the writer.

withTopic() is just a wrapper/builder method. All the existing tests use multiple topics, should I another test with single topic?

Testing sink changes are partially done: The validation code is tested in KafkaIOTest.testValuesSink(). This test didn't fail earlier since we don't use a real KafkaProducer that instantiates the serializers. Let me see if I can manually try to instantiate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either way please split into multiple commits (ideally multiple PRs, since these commits are not logically related)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted writer fix. Added a test with single topic that uses new API.

@asfbot
Copy link

asfbot commented Mar 29, 2017

@coveralls
Copy link

Coverage Status

Coverage increased (+0.003%) to 70.167% when pulling d2da706 on rangadi:with_topic into 8fa1159 on apache:master.

@asfbot
Copy link

asfbot commented Mar 29, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8929/
--none--

@rangadi
Copy link
Contributor Author

rangadi commented Mar 30, 2017

Thanks Eugene. PTAL.
Reverted writer fix. Added a test with single topic that uses new API.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.004%) to 70.159% when pulling a9b2f0a on rangadi:with_topic into 8fa1159 on apache:master.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.004%) to 70.159% when pulling a9b2f0a on rangadi:with_topic into 8fa1159 on apache:master.

@asfbot
Copy link

asfbot commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8940/
--none--

@rangadi
Copy link
Contributor Author

rangadi commented Mar 30, 2017

Kafka writer is fixed in #2369.

Copy link
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, please also squash all commits into one.

@@ -291,6 +291,31 @@ public void testUnboundedSource() {
p.run();
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per #2359 we no longer need NeedsRunner.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. Removed it for the new test.

@amitsela
Copy link
Member

amitsela commented Mar 30, 2017

-R: @amitsela as Eugene seems to have this.

@rangadi
Copy link
Contributor Author

rangadi commented Mar 30, 2017

@jkff Thanks for the review. It might be better for you to squash while merging. That way PR retains the commit history.

@asfbot
Copy link

asfbot commented Mar 30, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8960/

Build result: FAILURE

[...truncated 2.11 MB...] at java.lang.Thread.run(Thread.java:745)Caused by: org.apache.maven.plugin.MojoExecutionException: Command execution failed. at org.codehaus.mojo.exec.ExecMojo.execute(ExecMojo.java:302) at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208) ... 31 moreCaused by: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) at org.apache.commons.exec.DefaultExecutor.executeInternal(DefaultExecutor.java:404) at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:166) at org.codehaus.mojo.exec.ExecMojo.executeCommandLine(ExecMojo.java:764) at org.codehaus.mojo.exec.ExecMojo.executeCommandLine(ExecMojo.java:711) at org.codehaus.mojo.exec.ExecMojo.execute(ExecMojo.java:289) ... 33 more2017-03-30T19:54:48.312 [ERROR] 2017-03-30T19:54:48.312 [ERROR] Re-run Maven using the -X switch to enable full debug logging.2017-03-30T19:54:48.312 [ERROR] 2017-03-30T19:54:48.312 [ERROR] For more information about the errors and possible solutions, please read the following articles:2017-03-30T19:54:48.312 [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException2017-03-30T19:54:48.312 [ERROR] 2017-03-30T19:54:48.312 [ERROR] After correcting the problems, you can resume the build with the command2017-03-30T19:54:48.312 [ERROR] mvn -rf :beam-sdks-pythonchannel stoppedSetting status of 330c48d to FAILURE with url https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/8960/ and message: 'Build finished. 'Using context: Jenkins: Maven clean install
--none--

@rangadi
Copy link
Contributor Author

rangadi commented Mar 31, 2017

retest this please.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.2%) to 70.33% when pulling 330c48d on rangadi:with_topic into 8fa1159 on apache:master.

@rangadi
Copy link
Contributor Author

rangadi commented Mar 31, 2017

retest this please.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.2%) to 70.33% when pulling 330c48d on rangadi:with_topic into 8fa1159 on apache:master.

Copy link
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I'll squash and merge.

@asfbot
Copy link

asfbot commented Mar 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/9030/
--none--

@asfgit asfgit closed this in 5f72b83 Apr 1, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants