From d958c7bdb14ea42f80b955a94fd9d4f6b1b870fc Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Wed, 29 Mar 2017 08:17:25 -0700 Subject: [PATCH 1/6] KafkaIO : Add withTopic() api that takes single topic. Remove need for setting key coder for Writer while writing values only. If we didn't specifiy the key coder, validation succeeded but it failed a check while instantiating Kafka producer. --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 7880cbc9e27f..da1360cc8dd9 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -124,7 +124,7 @@ * pipeline * .apply(KafkaIO.read() * .withBootstrapServers("broker_1:9092,broker_2:9092") - * .withTopics(ImmutableList.of("topic_a", "topic_b")) + * .withTopic("my_topic") // use withTopics(List) to consume from multiple topics. * // set a Coder for Key and Value * .withKeyCoder(BigEndianLongCoder.of()) * .withValueCoder(StringUtf8Coder.of()) @@ -254,7 +254,6 @@ public static Read read() { public static Write write() { return new AutoValue_KafkaIO_Write.Builder() .setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES) - .setValueOnly(false) .build(); } @@ -308,6 +307,15 @@ public Read withBootstrapServers(String bootstrapServers) { ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)); } + /** + * Returns a new {@link Read} that reads from the topic. + * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description + * of how the partitions are distributed among the splits. + */ + public Read withTopic(String topic) { + return withTopics(ImmutableList.of(topic)); + } + /** * Returns a new {@link Read} that reads from the topics. All the partitions from each * of the topics are read. @@ -1159,7 +1167,6 @@ public abstract static class Write extends PTransform @Nullable abstract String getTopic(); @Nullable abstract Coder getKeyCoder(); @Nullable abstract Coder getValueCoder(); - abstract boolean getValueOnly(); abstract Map getProducerConfig(); @Nullable abstract SerializableFunction, Producer> getProducerFactoryFn(); @@ -1171,7 +1178,6 @@ abstract static class Builder { abstract Builder setTopic(String topic); abstract Builder setKeyCoder(Coder keyCoder); abstract Builder setValueCoder(Coder valueCoder); - abstract Builder setValueOnly(boolean valueOnly); abstract Builder setProducerConfig(Map producerConfig); abstract Builder setProducerFactoryFn( SerializableFunction, Producer> fn); @@ -1231,7 +1237,7 @@ public Write withProducerFactoryFn( * collections of values rather thank {@link KV}s. */ public PTransform, PDone> values() { - return new KafkaValueWrite<>(toBuilder().setValueOnly(true).build()); + return new KafkaValueWrite<>(withKeyCoder(new NullOnlyCoder()).toBuilder().build()); } @Override @@ -1245,9 +1251,7 @@ public void validate(PCollection> input) { checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), "Kafka bootstrap servers should be set"); checkNotNull(getTopic(), "Kafka topic should be set"); - if (!getValueOnly()) { - checkNotNull(getKeyCoder(), "Key coder should be set"); - } + checkNotNull(getKeyCoder(), "Key coder should be set"); checkNotNull(getValueCoder(), "Value coder should be set"); } From d2da7061cc8fc9c1dee682f57da55043e9fe65a4 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Wed, 29 Mar 2017 15:17:23 -0700 Subject: [PATCH 2/6] minor --- .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index da1360cc8dd9..f71736df00b3 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -124,7 +124,7 @@ * pipeline * .apply(KafkaIO.read() * .withBootstrapServers("broker_1:9092,broker_2:9092") - * .withTopic("my_topic") // use withTopics(List) to consume from multiple topics. + * .withTopic("my_topic") // use withTopics(List) to read from multiple topics. * // set a Coder for Key and Value * .withKeyCoder(BigEndianLongCoder.of()) * .withValueCoder(StringUtf8Coder.of()) From 5a2d828e83dd793287c444e10af45240fc2739f8 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Wed, 29 Mar 2017 21:22:03 -0700 Subject: [PATCH 3/6] remove writer key coder fix. --- .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index f71736df00b3..b807b53166ef 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1167,6 +1167,7 @@ public abstract static class Write extends PTransform @Nullable abstract String getTopic(); @Nullable abstract Coder getKeyCoder(); @Nullable abstract Coder getValueCoder(); + abstract boolean getValueOnly(); abstract Map getProducerConfig(); @Nullable abstract SerializableFunction, Producer> getProducerFactoryFn(); @@ -1178,6 +1179,7 @@ abstract static class Builder { abstract Builder setTopic(String topic); abstract Builder setKeyCoder(Coder keyCoder); abstract Builder setValueCoder(Coder valueCoder); + abstract Builder setValueOnly(boolean valueOnly); abstract Builder setProducerConfig(Map producerConfig); abstract Builder setProducerFactoryFn( SerializableFunction, Producer> fn); @@ -1237,7 +1239,7 @@ public Write withProducerFactoryFn( * collections of values rather thank {@link KV}s. */ public PTransform, PDone> values() { - return new KafkaValueWrite<>(withKeyCoder(new NullOnlyCoder()).toBuilder().build()); + return new KafkaValueWrite<>(toBuilder().setValueOnly(true).build()); } @Override @@ -1251,7 +1253,9 @@ public void validate(PCollection> input) { checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), "Kafka bootstrap servers should be set"); checkNotNull(getTopic(), "Kafka topic should be set"); - checkNotNull(getKeyCoder(), "Key coder should be set"); + if (!getValueOnly()) { + checkNotNull(getKeyCoder(), "Key coder should be set"); + } checkNotNull(getValueCoder(), "Value coder should be set"); } From b8be6d734afc05abf98faac3f620304a8ccd8575 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Wed, 29 Mar 2017 21:40:19 -0700 Subject: [PATCH 4/6] Add a test with single topic. --- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 1897127ac296..fa747946e048 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -291,6 +291,31 @@ public void testUnboundedSource() { p.run(); } + @Test + @Category(NeedsRunner.class) + public void testUnboundedSourceWithSingleTopic() { + // same as testUnboundedSource, but with single topic + + int numElements = 1000; + String topic = "my_topic"; + + KafkaIO.Read reader = KafkaIO.read() + .withBootstrapServers("none") + .withTopic("my_topic") + .withConsumerFactoryFn(new ConsumerFactoryFn( + ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST)) + .withKeyCoder(BigEndianIntegerCoder.of()) + .withValueCoder(BigEndianLongCoder.of()) + .withMaxNumRecords(numElements); + + PCollection input = p + .apply(reader.withoutMetadata()) + .apply(Values.create()); + + addCountingAsserts(input, numElements); + p.run(); + } + @Test @Category(NeedsRunner.class) public void testUnboundedSourceWithExplicitPartitions() { From a9b2f0a54cfebf357ecd1be46ba310f444618dfe Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Wed, 29 Mar 2017 21:43:07 -0700 Subject: [PATCH 5/6] revert setValueOnly() that was missing. --- .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index b807b53166ef..d4da2e65e67a 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -254,6 +254,7 @@ public static Read read() { public static Write write() { return new AutoValue_KafkaIO_Write.Builder() .setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES) + .setValueOnly(false) .build(); } From 330c48dc18bf939cd809b0428540c595baf66af8 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Thu, 30 Mar 2017 12:15:45 -0700 Subject: [PATCH 6/6] remove NeedsRunner annotation. --- .../src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index fa747946e048..1667373fca9c 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -292,7 +292,6 @@ public void testUnboundedSource() { } @Test - @Category(NeedsRunner.class) public void testUnboundedSourceWithSingleTopic() { // same as testUnboundedSource, but with single topic