[FLINK-3341] Make 'auto.offset.reset' compatible with Kafka 0.8 and 0.9
This closes #1597
rmetzger authored and StephanEwen committed Feb 10, 2016
1 parent 8ccd754 commit 9173825
Showing 9 changed files with 25 additions and 55 deletions.
@@ -70,7 +70,7 @@
  * <li>socket.timeout.ms</li>
  * <li>socket.receive.buffer.bytes</li>
  * <li>fetch.message.max.bytes</li>
- * <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li>
+ * <li>auto.offset.reset with the values "largest", "smallest"</li>
  * <li>fetch.wait.max.ms</li>
  * </ul>
  * </li>
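For orientation, a minimal usage sketch of what this change establishes: the 0.8 connector is configured with the 0.8 value names, the 0.9 connector with the new names (and, per the fallback in the next hunk, the 0.8 connector also tolerates the 0.9 spelling). Topic name, group id, and addresses below are placeholders, not taken from this commit.

    import java.util.Properties;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    Properties props = new Properties();
    props.setProperty("zookeeper.connect", "localhost:2181");  // placeholder address
    props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder address
    props.setProperty("group.id", "demo-group");                // placeholder group

    // Kafka 0.8 connector: 0.8 value names ("smallest" / "largest")
    props.setProperty("auto.offset.reset", "smallest");
    FlinkKafkaConsumer08<String> c08 =
        new FlinkKafkaConsumer08<>("demo-topic", new SimpleStringSchema(), props);

    // Kafka 0.9 connector: 0.9 value names ("earliest" / "latest")
    props.setProperty("auto.offset.reset", "earliest");
    FlinkKafkaConsumer09<String> c09 =
        new FlinkKafkaConsumer09<>("demo-topic", new SimpleStringSchema(), props);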
@@ -576,7 +576,8 @@ private static void getLastOffset(SimpleConsumer consumer, List<FetchPartition>

     private static long getInvalidOffsetBehavior(Properties config) {
         long timeType;
-        if (config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").equals("latest")) {
+        String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
+        if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9
             timeType = OffsetRequest.LatestTime();
         } else {
             timeType = OffsetRequest.EarliestTime();
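Since kafka.api.OffsetRequest.LatestTime() and EarliestTime() return the sentinel values -1 and -2, the new fallback can be exercised stand-alone. A self-contained sketch of the same decision logic (class and constant names are illustrative, not from the commit):

    import java.util.Properties;

    public class OffsetResetSketch {
        // Sentinels matching kafka.api.OffsetRequest.LatestTime() / EarliestTime().
        static final long LATEST_TIME = -1L;
        static final long EARLIEST_TIME = -2L;

        // Mirrors the patched getInvalidOffsetBehavior(): default to the 0.8
        // value "largest" and accept the 0.9 spelling "latest" as an alias.
        static long invalidOffsetBehavior(Properties config) {
            String val = config.getProperty("auto.offset.reset", "largest");
            return val.equals("largest") || val.equals("latest") ? LATEST_TIME : EARLIEST_TIME;
        }

        public static void main(String[] args) {
            Properties p = new Properties();
            System.out.println(invalidOffsetBehavior(p) == LATEST_TIME);   // true: default
            p.setProperty("auto.offset.reset", "smallest");
            System.out.println(invalidOffsetBehavior(p) == EARLIEST_TIME); // true: 0.8 value
            p.setProperty("auto.offset.reset", "earliest");
            System.out.println(invalidOffsetBehavior(p) == EARLIEST_TIME); // true: 0.9 value
        }
    }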
@@ -93,13 +93,13 @@ public void testInvalidOffset() throws Exception {

         // set invalid offset:
         CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
-        ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topic, 0, 1234);
+        ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topic, 0, 1234);
         curatorClient.close();

         // read from topic
         final int valuesCount = 20;
         final int startFrom = 0;
-        readSequence(env, standardCC.props().props(), parallelism, topic, valuesCount, startFrom);
+        readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom);

         deleteTestTopic(topic);
     }
@@ -188,9 +188,9 @@ public void testOffsetInZookeeper() throws Exception {

         CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();

-        long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(), topicName, 0);
-        long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(), topicName, 1);
-        long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(), topicName, 2);
+        long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 0);
+        long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 1);
+        long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 2);

         LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
@@ -201,9 +201,9 @@ public void testOffsetInZookeeper() throws Exception {
         LOG.info("Manipulating offsets");

         // set the offset to 50 for the three partitions
-        ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName, 0, 49);
-        ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName, 1, 49);
-        ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName, 2, 49);
+        ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 0, 49);
+        ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 1, 49);
+        ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 2, 49);

         curatorClient.close();
@@ -250,9 +250,9 @@ public void testOffsetAutocommitTest() throws Exception {
         // get the offset
         CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();

-        long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(), topicName, 0);
-        long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(), topicName, 1);
-        long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(), topicName, 2);
+        long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
+        long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
+        long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);

         LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
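All three tests above follow the same round-trip against ZooKeeper, now keyed by the plain group.id property instead of the removed ConsumerConfig. A condensed sketch of that pattern, using only the signatures visible in this diff (kafkaServer, standardProps, and the topic name are assumed to be set up as in the tests):

    // Condensed from the test pattern above; assumes the surrounding test harness.
    CuratorFramework curator = ((KafkaTestEnvironmentImpl) kafkaServer).createCuratorClient();
    try {
        String groupId = standardProps.getProperty("group.id");

        // Write an offset for partition 0, then read it back.
        ZookeeperOffsetHandler.setOffsetInZooKeeper(curator, groupId, "testTopic", 0, 1234);
        long offset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curator, groupId, "testTopic", 0);
        assert offset == 1234;
    } finally {
        curator.close();
    }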
@@ -21,7 +21,6 @@
 import kafka.admin.AdminUtils;
 import kafka.api.PartitionMetadata;
 import kafka.common.KafkaException;
-import kafka.consumer.ConsumerConfig;
 import kafka.network.SocketServer;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
@@ -68,19 +67,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
     private String zookeeperConnectionString;
     private String brokerConnectionString = "";
     private Properties standardProps;
-    private ConsumerConfig standardCC;

     public String getBrokerConnectionString() {
         return brokerConnectionString;
     }

-    @Override
-    public ConsumerConfig getStandardConsumerConfig() {
-        return standardCC;
-    }
-
     @Override
     public Properties getStandardProperties() {
         return standardProps;
@@ -187,13 +178,8 @@ public void prepare(int numKafkaServers) {
         standardProps.setProperty("auto.commit.enable", "false");
         standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis.
         standardProps.setProperty("zookeeper.connection.timeout.ms", "20000");
-        standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
+        standardProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning. (smallest is kafka 0.8)
         standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
-
-        Properties consumerConfigProps = new Properties();
-        consumerConfigProps.putAll(standardProps);
-        consumerConfigProps.setProperty("auto.offset.reset", "smallest");
-        standardCC = new ConsumerConfig(consumerConfigProps);
     }

     @Override
@@ -274,8 +260,8 @@ public void deleteTestTopic(String topic) {
     }

     private ZkClient createZkClient() {
-        return new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-            standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+        return new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+            Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
     }

     /**
@@ -65,18 +65,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
     private String zookeeperConnectionString;
     private String brokerConnectionString = "";
     private Properties standardProps;
-    private ConsumerConfig standardCC;

     public String getBrokerConnectionString() {
         return brokerConnectionString;
     }

-    @Override
-    public ConsumerConfig getStandardConsumerConfig() {
-        return standardCC;
-    }
-
     @Override
     public Properties getStandardProperties() {
         return standardProps;
@@ -184,13 +177,8 @@ public void prepare(int numKafkaServers) {
         standardProps.setProperty("auto.commit.enable", "false");
         standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis.
         standardProps.setProperty("zookeeper.connection.timeout.ms", "20000");
-        standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
+        standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.9 value)
         standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
-
-        Properties consumerConfigProps = new Properties();
-        consumerConfigProps.putAll(standardProps);
-        consumerConfigProps.setProperty("auto.offset.reset", "smallest");
-        standardCC = new ConsumerConfig(consumerConfigProps);
     }

     @Override
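The two test environments now differ only in which value name they write: "smallest" in the 0.8 environment above, "earliest" here in the 0.9 one. A test that had to serve both versions could centralize that choice; this is a hypothetical sketch, not part of the commit:

    // Hypothetical helper (not in this commit): map a logical reset mode to the
    // value name the given Kafka major version expects.
    static String autoOffsetResetValue(boolean kafka08, boolean fromBeginning) {
        if (kafka08) {
            return fromBeginning ? "smallest" : "largest"; // Kafka 0.8 names
        }
        return fromBeginning ? "earliest" : "latest";      // Kafka 0.9 names
    }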
@@ -233,8 +221,8 @@ public void shutdown() {
     }

     public ZkUtils getZkUtils() {
-        ZkClient creator = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-            standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+        ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+            Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
         return ZkUtils.apply(creator, false);
     }
@@ -280,8 +268,8 @@ public void deleteTestTopic(String topic) {
         try {
             LOG.info("Deleting topic {}", topic);

-            ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-                standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+            ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+                Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());

             AdminUtils.deleteTopic(zkUtils, topic);
@@ -1284,11 +1284,10 @@ private static void printTopic(String topicName, int elements, DeserializationSch
             throws IOException
     {
         // write the sequence to log for debugging purposes
-        Properties stdProps = standardCC.props().props();
-        Properties newProps = new Properties(stdProps);
+        Properties newProps = new Properties(standardProps);
         newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
         newProps.setProperty("auto.offset.reset", "smallest");
-        newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
+        newProps.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect"));

         ConsumerConfig printerConfig = new ConsumerConfig(newProps);
         printTopic(topicName, printerConfig, deserializer, elements);
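One Java subtlety in the replacement line: new Properties(standardProps) does not copy the entries, it installs standardProps as the defaults table. getProperty() falls through to the defaults, so the printer config above works as intended, but direct Hashtable views such as keySet() do not see inherited keys. A small demonstration:

    import java.util.Properties;

    public class PropertiesDefaultsDemo {
        public static void main(String[] args) {
            Properties base = new Properties();
            base.setProperty("zookeeper.connect", "localhost:2181"); // placeholder value

            Properties linked = new Properties(base);   // base becomes the defaults table
            System.out.println(linked.getProperty("zookeeper.connect"));        // localhost:2181
            System.out.println(linked.keySet().contains("zookeeper.connect"));  // false

            Properties copied = new Properties();       // an explicit copy materializes entries
            copied.putAll(base);
            System.out.println(copied.keySet().contains("zookeeper.connect"));  // true
        }
    }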
@@ -66,7 +66,6 @@ public abstract class KafkaTestBase extends TestLogger {

     protected static String brokerConnectionStrings;

-    protected static ConsumerConfig standardCC;
     protected static Properties standardProps;

     protected static ForkableFlinkMiniCluster flink;
@@ -98,7 +97,6 @@ public static void prepare() throws IOException, ClassNotFoundException {
         kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS);

         standardProps = kafkaServer.getStandardProperties();
-        standardCC = kafkaServer.getStandardConsumerConfig();
         brokerConnectionStrings = kafkaServer.getBrokerConnectionString();

         // start also a re-usable Flink mini cluster
@@ -17,15 +17,13 @@

 package org.apache.flink.streaming.connectors.kafka;

-import kafka.consumer.ConsumerConfig;
 import kafka.server.KafkaServer;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

-import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -45,7 +43,6 @@ public abstract class KafkaTestEnvironment {

     public abstract void createTestTopic(String topic, int numberOfPartitions, int replicationFactor);

-    public abstract ConsumerConfig getStandardConsumerConfig();

     public abstract Properties getStandardProperties();
@@ -155,6 +155,7 @@ public void testDetachedMode() {
         List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
         Assert.assertEquals(1, apps.size()); // Only one running
         ApplicationReport app = apps.get(0);
+
         Assert.assertEquals("MyCustomName", app.getName());
         ApplicationId id = app.getApplicationId();
         yc.killApplication(id);
