Commit

Merge 56ea7f7 into ffd8755
rangadi committed Mar 30, 2017
2 parents ffd8755 + 56ea7f7 commit 2dcc984
Showing 2 changed files with 26 additions and 12 deletions.
@@ -254,7 +254,6 @@ public static <K, V> Read<K, V> read() {
public static <K, V> Write<K, V> write() {
return new AutoValue_KafkaIO_Write.Builder<K, V>()
.setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES)
.setValueOnly(false)
.build();
}

@@ -1159,7 +1158,6 @@ public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>
@Nullable abstract String getTopic();
@Nullable abstract Coder<K> getKeyCoder();
@Nullable abstract Coder<V> getValueCoder();
abstract boolean getValueOnly();
abstract Map<String, Object> getProducerConfig();
@Nullable
abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn();
@@ -1171,7 +1169,6 @@ abstract static class Builder<K, V> {
abstract Builder<K, V> setTopic(String topic);
abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
abstract Builder<K, V> setValueOnly(boolean valueOnly);
abstract Builder<K, V> setProducerConfig(Map<String, Object> producerConfig);
abstract Builder<K, V> setProducerFactoryFn(
SerializableFunction<Map<String, Object>, Producer<K, V>> fn);
@@ -1231,7 +1228,7 @@ public Write<K, V> withProducerFactoryFn(
* collections of values rather than {@link KV}s.
*/
public PTransform<PCollection<V>, PDone> values() {
return new KafkaValueWrite<>(toBuilder().setValueOnly(true).build());
return new KafkaValueWrite<>(withKeyCoder(new NullOnlyCoder<K>()).toBuilder().build());
}
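
For context, a short sketch of how the value-only write path reads after this change. It assumes the withBootstrapServers/withTopic/withValueCoder setters from this version of KafkaIO.Write; the broker address, topic name, and class name are illustrative only.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class ValueOnlyWriteExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    PCollection<String> events = p.apply(Create.of("click", "view", "purchase"));

    // The key type is Void; values() now fills in NullOnlyCoder as the key coder,
    // so only the value coder has to be set explicitly.
    events.apply(KafkaIO.<Void, String>write()
        .withBootstrapServers("broker-1:9092")   // illustrative address
        .withTopic("events")                     // illustrative topic
        .withValueCoder(StringUtf8Coder.of())
        .values());

    p.run();
  }
}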

@Override
@@ -1245,21 +1242,20 @@ public void validate(PCollection<KV<K, V>> input) {
checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
"Kafka bootstrap servers should be set");
checkNotNull(getTopic(), "Kafka topic should be set");
if (!getValueOnly()) {
checkNotNull(getKeyCoder(), "Key coder should be set");
}
checkNotNull(getKeyCoder(), "Key coder should be set");
checkNotNull(getValueCoder(), "Value coder should be set");
}

// set config defaults
private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES =
ImmutableMap.<String, Object>of(
ProducerConfig.RETRIES_CONFIG, 3,
// See comment about custom serializers in KafkaWriter constructor.
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class);

/**
* A set of properties that are not required or don't make sense for our consumer.
* A set of properties that are not required or don't make sense for our producer.
*/
private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Set keyCoder instead",
@@ -1373,11 +1369,13 @@ public void teardown() {
KafkaWriter(Write<K, V> spec) {
this.spec = spec;

// Set custom kafka serializers. We can not serialize user objects then pass the bytes to
// producer. The key and value objects are used in kafka Partitioner interface.
// Set custom kafka serializers. We do not want to serialize user objects and then pass the bytes
// to the producer, since the key and value objects are used in Kafka's Partitioner interface.
// This does not matter for default partitioner in Kafka as it uses just the serialized
// key bytes to pick a partition. But are making sure user's custom partitioner would work
// as expected.
// key bytes to pick a partition. But we don't want to limit the use of custom partitioners.
// We pass the key and value objects the user writes directly to Kafka, and the user-supplied
// coders that serialize them are invoked inside CoderBasedKafkaSerializer.
// Use case: write all the events for a single session to the same Kafka partition.

this.producerConfig = new HashMap<>(spec.getProducerConfig());
this.producerConfig.put(configForKeySerializer(), spec.getKeyCoder());
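To make the serializer comment in the KafkaWriter constructor above concrete, here is a minimal sketch, not the actual CoderBasedKafkaSerializer, of a Kafka Serializer that pulls a Beam Coder out of the producer config in configure() and only turns objects into bytes inside serialize(). The "beam.coder.key"/"beam.coder.value" config names are hypothetical stand-ins for whatever configForKeySerializer()/configForValueSerializer() return.

import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.kafka.common.serialization.Serializer;

class CoderBackedSerializer<T> implements Serializer<T> {
  private Coder<T> coder;

  @Override
  @SuppressWarnings("unchecked")
  public void configure(Map<String, ?> configs, boolean isKey) {
    // KafkaWriter puts the Coder object itself into the producer config;
    // the config key names used here are hypothetical.
    coder = (Coder<T>) configs.get(isKey ? "beam.coder.key" : "beam.coder.value");
  }

  @Override
  public byte[] serialize(String topic, T value) {
    try {
      // Serialization happens only here, so Kafka's Partitioner still sees the
      // original key/value objects rather than opaque byte arrays.
      return CoderUtils.encodeToByteArray(coder, value);
    } catch (CoderException e) {
      throw new RuntimeException("Could not encode value with " + coder, e);
    }
  }

  @Override
  public void close() {}
}
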
@@ -71,9 +71,12 @@
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.joda.time.Instant;
import org.junit.Rule;
@@ -728,8 +731,21 @@ public void flush() {
private static class ProducerFactoryFn
implements SerializableFunction<Map<String, Object>, Producer<Integer, Long>> {

@SuppressWarnings("unchecked")
@Override
public Producer<Integer, Long> apply(Map<String, Object> config) {

// Make sure the config is correctly set up for serializers.
Utils.newInstance(
((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
.asSubclass(Serializer.class)
).configure(config, true);

Utils.newInstance(
((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
.asSubclass(Serializer.class)
).configure(config, false);

return MOCK_PRODUCER;
}
}
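
A factory like this is normally attached to the transform under test through withProducerFactoryFn, which this diff exercises. A rough sketch follows; the server string, topic, and coder choices are illustrative rather than taken from the test:

// Sketch: building a write transform that swaps in the mock producer.
KafkaIO.Write<Integer, Long> write = KafkaIO.<Integer, Long>write()
    .withBootstrapServers("non-existent-server:9092")  // never contacted; the mock is returned
    .withTopic("test")
    .withKeyCoder(BigEndianIntegerCoder.of())
    .withValueCoder(BigEndianLongCoder.of())
    .withProducerFactoryFn(new ProducerFactoryFn());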
