Skip to content

Commit

Permalink
[hotfix][Kafka] Refactor properties for KafkaTestEnvironment setup
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski authored and StefanRRichter committed Aug 8, 2017
1 parent b7f96f7 commit e11a591
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 52 deletions.
Expand Up @@ -79,8 +79,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
private String zookeeperConnectionString; private String zookeeperConnectionString;
private String brokerConnectionString = ""; private String brokerConnectionString = "";
private Properties standardProps; private Properties standardProps;
private Properties additionalServerProperties; private Config config;
private boolean secureMode = false;
// 6 seconds is default. Seems to be too small for travis. 30 seconds // 6 seconds is default. Seems to be too small for travis. 30 seconds
private int zkTimeout = 30000; private int zkTimeout = 30000;


Expand All @@ -96,7 +95,7 @@ public Properties getStandardProperties() {
@Override @Override
public Properties getSecureProperties() { public Properties getSecureProperties() {
Properties prop = new Properties(); Properties prop = new Properties();
if (secureMode) { if (config.isSecureMode()) {
prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT"); prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
prop.put("security.protocol", "SASL_PLAINTEXT"); prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.kerberos.service.name", "kafka"); prop.put("sasl.kerberos.service.name", "kafka");
Expand Down Expand Up @@ -215,26 +214,24 @@ public boolean isSecureRunSupported() {
} }


@Override @Override
public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) { public void prepare(Config config) {
//increase the timeout since in Travis ZK connection takes long time for secure connection. //increase the timeout since in Travis ZK connection takes long time for secure connection.
if (secureMode) { if (config.isSecureMode()) {
//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
numKafkaServers = 1; config.setKafkaServersNumber(1);
zkTimeout = zkTimeout * 15; zkTimeout = zkTimeout * 15;
} }
this.config = config;


this.additionalServerProperties = additionalServerProperties;
this.secureMode = secureMode;
File tempDir = new File(System.getProperty("java.io.tmpdir")); File tempDir = new File(System.getProperty("java.io.tmpdir"));

tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs()); assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());


tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString())); tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs()); assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());


tmpKafkaDirs = new ArrayList<>(numKafkaServers); tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
for (int i = 0; i < numKafkaServers; i++) { for (int i = 0; i < config.getKafkaServersNumber(); i++) {
File tmpDir = new File(tmpKafkaParent, "server-" + i); File tmpDir = new File(tmpKafkaParent, "server-" + i);
assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
tmpKafkaDirs.add(tmpDir); tmpKafkaDirs.add(tmpDir);
Expand All @@ -249,12 +246,12 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString); LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);


LOG.info("Starting KafkaServer"); LOG.info("Starting KafkaServer");
brokers = new ArrayList<>(numKafkaServers); brokers = new ArrayList<>(config.getKafkaServersNumber());


for (int i = 0; i < numKafkaServers; i++) { for (int i = 0; i < config.getKafkaServersNumber(); i++) {
brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));


if (secureMode) { if (config.isSecureMode()) {
brokerConnectionString += hostAndPortToUrlString( brokerConnectionString += hostAndPortToUrlString(
KafkaTestEnvironment.KAFKA_HOST, KafkaTestEnvironment.KAFKA_HOST,
brokers.get(i).socketServer().boundPort( brokers.get(i).socketServer().boundPort(
Expand Down Expand Up @@ -347,7 +344,7 @@ public void createTestTopic(String topic, int numberOfPartitions, int replicatio
final long deadline = System.nanoTime() + 30_000_000_000L; final long deadline = System.nanoTime() + 30_000_000_000L;
do { do {
try { try {
if (secureMode) { if (config.isSecureMode()) {
//increase wait time since in Travis ZK timeout occurs frequently //increase wait time since in Travis ZK timeout occurs frequently
int wait = zkTimeout / 100; int wait = zkTimeout / 100;
LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic); LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
Expand Down Expand Up @@ -407,8 +404,8 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except
// for CI stability, increase zookeeper session timeout // for CI stability, increase zookeeper session timeout
kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout); kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout); kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
if (additionalServerProperties != null) { if (config.getKafkaServerProperties() != null) {
kafkaProperties.putAll(additionalServerProperties); kafkaProperties.putAll(config.getKafkaServerProperties());
} }


final int numTries = 5; final int numTries = 5;
Expand All @@ -418,7 +415,7 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except
kafkaProperties.put("port", Integer.toString(kafkaPort)); kafkaProperties.put("port", Integer.toString(kafkaPort));


//to support secure kafka cluster //to support secure kafka cluster
if (secureMode) { if (config.isSecureMode()) {
LOG.info("Adding Kafka secure configurations"); LOG.info("Adding Kafka secure configurations");
kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
Expand Down
Expand Up @@ -84,7 +84,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
private String zookeeperConnectionString; private String zookeeperConnectionString;
private String brokerConnectionString = ""; private String brokerConnectionString = "";
private Properties standardProps; private Properties standardProps;
private Properties additionalServerProperties;
private Config config;


public String getBrokerConnectionString() { public String getBrokerConnectionString() {
return brokerConnectionString; return brokerConnectionString;
Expand Down Expand Up @@ -206,8 +207,8 @@ public boolean isSecureRunSupported() {
} }


@Override @Override
public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) { public void prepare(Config config) {
this.additionalServerProperties = additionalServerProperties; this.config = config;
File tempDir = new File(System.getProperty("java.io.tmpdir")); File tempDir = new File(System.getProperty("java.io.tmpdir"));


tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
Expand All @@ -224,8 +225,8 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
fail("cannot create kafka temp dir: " + e.getMessage()); fail("cannot create kafka temp dir: " + e.getMessage());
} }


tmpKafkaDirs = new ArrayList<>(numKafkaServers); tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
for (int i = 0; i < numKafkaServers; i++) { for (int i = 0; i < config.getKafkaServersNumber(); i++) {
File tmpDir = new File(tmpKafkaParent, "server-" + i); File tmpDir = new File(tmpKafkaParent, "server-" + i);
assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
tmpKafkaDirs.add(tmpDir); tmpKafkaDirs.add(tmpDir);
Expand All @@ -240,9 +241,9 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
zookeeperConnectionString = zookeeper.getConnectString(); zookeeperConnectionString = zookeeper.getConnectString();


LOG.info("Starting KafkaServer"); LOG.info("Starting KafkaServer");
brokers = new ArrayList<>(numKafkaServers); brokers = new ArrayList<>(config.getKafkaServersNumber());


for (int i = 0; i < numKafkaServers; i++) { for (int i = 0; i < config.getKafkaServersNumber(); i++) {
brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
SocketServer socketServer = brokers.get(i).socketServer(); SocketServer socketServer = brokers.get(i).socketServer();


Expand Down Expand Up @@ -391,8 +392,8 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except
// for CI stability, increase zookeeper session timeout // for CI stability, increase zookeeper session timeout
kafkaProperties.put("zookeeper.session.timeout.ms", "30000"); kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
kafkaProperties.put("zookeeper.connection.timeout.ms", "30000"); kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
if (additionalServerProperties != null) { if (config.getKafkaServerProperties() != null) {
kafkaProperties.putAll(additionalServerProperties); kafkaProperties.putAll(config.getKafkaServerProperties());
} }


final int numTries = 5; final int numTries = 5;
Expand Down
Expand Up @@ -78,11 +78,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
private String zookeeperConnectionString; private String zookeeperConnectionString;
private String brokerConnectionString = ""; private String brokerConnectionString = "";
private Properties standardProps; private Properties standardProps;
private Properties additionalServerProperties;
private boolean secureMode = false;
// 6 seconds is default. Seems to be too small for travis. 30 seconds // 6 seconds is default. Seems to be too small for travis. 30 seconds
private String zkTimeout = "30000"; private String zkTimeout = "30000";


private Config config;

public String getBrokerConnectionString() { public String getBrokerConnectionString() {
return brokerConnectionString; return brokerConnectionString;
} }
Expand Down Expand Up @@ -200,27 +200,24 @@ public boolean isSecureRunSupported() {
} }


@Override @Override
public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) { public void prepare(Config config) {

//increase the timeout since in Travis ZK connection takes long time for secure connection. //increase the timeout since in Travis ZK connection takes long time for secure connection.
if (secureMode) { if (config.isSecureMode()) {
//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
numKafkaServers = 1; config.setKafkaServersNumber(1);
zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15); zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15);
} }
this.config = config;


this.additionalServerProperties = additionalServerProperties;
this.secureMode = secureMode;
File tempDir = new File(System.getProperty("java.io.tmpdir")); File tempDir = new File(System.getProperty("java.io.tmpdir"));

tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs()); assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());


tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString())); tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs()); assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());


tmpKafkaDirs = new ArrayList<>(numKafkaServers); tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
for (int i = 0; i < numKafkaServers; i++) { for (int i = 0; i < config.getKafkaServersNumber(); i++) {
File tmpDir = new File(tmpKafkaParent, "server-" + i); File tmpDir = new File(tmpKafkaParent, "server-" + i);
assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
tmpKafkaDirs.add(tmpDir); tmpKafkaDirs.add(tmpDir);
Expand All @@ -236,13 +233,13 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString); LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);


LOG.info("Starting KafkaServer"); LOG.info("Starting KafkaServer");
brokers = new ArrayList<>(numKafkaServers); brokers = new ArrayList<>(config.getKafkaServersNumber());


for (int i = 0; i < numKafkaServers; i++) { for (int i = 0; i < config.getKafkaServersNumber(); i++) {
brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));


SocketServer socketServer = brokers.get(i).socketServer(); SocketServer socketServer = brokers.get(i).socketServer();
if (secureMode) { if (this.config.isSecureMode()) {
brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ","; brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
} else { } else {
brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ","; brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
Expand Down Expand Up @@ -335,7 +332,7 @@ public void createTestTopic(String topic, int numberOfPartitions, int replicatio
final long deadline = System.nanoTime() + Integer.parseInt(zkTimeout) * 1_000_000L; final long deadline = System.nanoTime() + Integer.parseInt(zkTimeout) * 1_000_000L;
do { do {
try { try {
if (secureMode) { if (config.isSecureMode()) {
//increase wait time since in Travis ZK timeout occurs frequently //increase wait time since in Travis ZK timeout occurs frequently
int wait = Integer.parseInt(zkTimeout) / 100; int wait = Integer.parseInt(zkTimeout) / 100;
LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic); LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
Expand Down Expand Up @@ -400,8 +397,8 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except
// for CI stability, increase zookeeper session timeout // for CI stability, increase zookeeper session timeout
kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout); kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout); kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
if (additionalServerProperties != null) { if (config.getKafkaServerProperties() != null) {
kafkaProperties.putAll(additionalServerProperties); kafkaProperties.putAll(config.getKafkaServerProperties());
} }


final int numTries = 5; final int numTries = 5;
Expand All @@ -411,7 +408,7 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except
kafkaProperties.put("port", Integer.toString(kafkaPort)); kafkaProperties.put("port", Integer.toString(kafkaPort));


//to support secure kafka cluster //to support secure kafka cluster
if (secureMode) { if (config.isSecureMode()) {
LOG.info("Adding Kafka secure configurations"); LOG.info("Adding Kafka secure configurations");
kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
Expand Down Expand Up @@ -442,7 +439,7 @@ protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Except


public Properties getSecureProperties() { public Properties getSecureProperties() {
Properties prop = new Properties(); Properties prop = new Properties();
if (secureMode) { if (config.isSecureMode()) {
prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT"); prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
prop.put("security.protocol", "SASL_PLAINTEXT"); prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.kerberos.service.name", "kafka"); prop.put("sasl.kerberos.service.name", "kafka");
Expand Down
Expand Up @@ -98,7 +98,7 @@ public static void prepare() throws IOException, ClassNotFoundException {
specificProperties.setProperty("log.retention.minutes", "0"); specificProperties.setProperty("log.retention.minutes", "0");
specificProperties.setProperty("log.retention.ms", "250"); specificProperties.setProperty("log.retention.ms", "250");
specificProperties.setProperty("log.retention.check.interval.ms", "100"); specificProperties.setProperty("log.retention.check.interval.ms", "100");
kafkaServer.prepare(1, specificProperties, false); kafkaServer.prepare(kafkaServer.createConfig().setKafkaServerProperties(specificProperties));


standardProps = kafkaServer.getStandardProperties(); standardProps = kafkaServer.getStandardProperties();


Expand Down
Expand Up @@ -135,7 +135,7 @@ protected static void startClusters(boolean secureMode) throws ClassNotFoundExce


LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion()); LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());


kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode); kafkaServer.prepare(kafkaServer.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS).setSecureMode(secureMode));


standardProps = kafkaServer.getStandardProperties(); standardProps = kafkaServer.getStandardProperties();


Expand Down
Expand Up @@ -38,15 +38,56 @@
* Abstract class providing a Kafka test environment. * Abstract class providing a Kafka test environment.
*/ */
public abstract class KafkaTestEnvironment { public abstract class KafkaTestEnvironment {
/**
* Configuration class for {@link KafkaTestEnvironment}.
*/
public static class Config {
private int kafkaServersNumber = 1;
private Properties kafkaServerProperties = null;
private boolean secureMode = false;

/**
* Please use {@link KafkaTestEnvironment#createConfig()} method.
*/
private Config() {
}

public int getKafkaServersNumber() {
return kafkaServersNumber;
}

public Config setKafkaServersNumber(int kafkaServersNumber) {
this.kafkaServersNumber = kafkaServersNumber;
return this;
}

public Properties getKafkaServerProperties() {
return kafkaServerProperties;
}

public Config setKafkaServerProperties(Properties kafkaServerProperties) {
this.kafkaServerProperties = kafkaServerProperties;
return this;
}

public boolean isSecureMode() {
return secureMode;
}

public Config setSecureMode(boolean secureMode) {
this.secureMode = secureMode;
return this;
}
}


protected static final String KAFKA_HOST = "localhost"; protected static final String KAFKA_HOST = "localhost";


public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode); public static Config createConfig() {

return new Config();
public void prepare(int numberOfKafkaServers, boolean secureMode) {
this.prepare(numberOfKafkaServers, null, secureMode);
} }


public abstract void prepare(Config config);

public abstract void shutdown(); public abstract void shutdown();


public abstract void deleteTestTopic(String topic); public abstract void deleteTestTopic(String topic);
Expand Down

0 comments on commit e11a591

Please sign in to comment.