Skip to content

Commit

Permalink
[FLINK-1753] [streaming] Added test for Kafka connector with tuple type
Browse files Browse the repository at this point in the history
This closes #557
  • Loading branch information
Gábor Hermann authored and rmetzger committed Apr 2, 2015
1 parent 7cf9586 commit 359b39c
Show file tree
Hide file tree
Showing 4 changed files with 406 additions and 53 deletions.
Expand Up @@ -34,7 +34,6 @@
import kafka.producer.KeyedMessage; import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig; import kafka.producer.ProducerConfig;
import kafka.serializer.DefaultEncoder; import kafka.serializer.DefaultEncoder;
import kafka.serializer.StringEncoder;


/** /**
* Sink that emits its inputs to a Kafka topic. * Sink that emits its inputs to a Kafka topic.
Expand Down Expand Up @@ -123,7 +122,9 @@ public void open(Configuration configuration) {
props.put("request.required.acks", "1"); props.put("request.required.acks", "1");


props.put("serializer.class", DefaultEncoder.class.getCanonicalName()); props.put("serializer.class", DefaultEncoder.class.getCanonicalName());
props.put("key.serializer.class", StringEncoder.class.getCanonicalName());
// this will not be used as the key will not be serialized
props.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());


if (partitioner != null) { if (partitioner != null) {
props.put("partitioner.class", PartitionerWrapper.class.getCanonicalName()); props.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
Expand Down Expand Up @@ -152,7 +153,9 @@ public void open(Configuration configuration) {
@Override @Override
public void invoke(IN next) { public void invoke(IN next) {
byte[] serialized = schema.serialize(next); byte[] serialized = schema.serialize(next);
producer.send(new KeyedMessage<IN, byte[]>(topicId, next, serialized));
// Sending message without serializable key.
producer.send(new KeyedMessage<IN, byte[]>(topicId, null, next, serialized));
} }


@Override @Override
Expand Down
Expand Up @@ -64,6 +64,26 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {


private volatile boolean isRunning = false; private volatile boolean isRunning = false;


/**
* Creates a KafkaSource that consumes a topic.
*
* @param zookeeperAddress
* Address of the Zookeeper host (with port number).
* @param topicId
* ID of the Kafka topic.
* @param groupId
* ID of the consumer group.
* @param deserializationSchema
* User defined deserialization schema.
* @param zookeeperSyncTimeMillis
* Synchronization time with zookeeper.
*/
public KafkaSource(String zookeeperAddress,
String topicId, String groupId,
DeserializationSchema<OUT> deserializationSchema,
long zookeeperSyncTimeMillis) {
this(zookeeperAddress, topicId, groupId, deserializationSchema, zookeeperSyncTimeMillis, null);
}
/** /**
* Creates a KafkaSource that consumes a topic. * Creates a KafkaSource that consumes a topic.
* *
Expand Down
Expand Up @@ -32,7 +32,7 @@
* *
* This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance. * This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance.
* *
* The serialziable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there. * The serializable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there.
*/ */
public class PartitionerWrapper implements Partitioner { public class PartitionerWrapper implements Partitioner {
public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized"; public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
Expand Down

0 comments on commit 359b39c

Please sign in to comment.