From 16336835266dfe238e8bc51d8c3fbb1369c9c44d Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sun, 24 Apr 2016 13:06:59 -0700 Subject: [PATCH 1/3] Introduce metrics for ZK session expire listener --- .../scala/kafka/server/KafkaHealthcheck.scala | 57 ++++++++++------ .../main/scala/kafka/server/KafkaServer.scala | 5 +- .../server/SessionExpireListenerTest.scala | 68 +++++++++++++++++++ 3 files changed, 107 insertions(+), 23 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 4e3fc29f9dc41..117899b9c8d23 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -18,15 +18,17 @@ package kafka.server import java.net.InetAddress +import java.util.Locale +import java.util.concurrent.TimeUnit import kafka.api.ApiVersion import kafka.cluster.EndPoint +import kafka.metrics.KafkaMetricsGroup import kafka.utils._ import org.I0Itec.zkclient.IZkStateListener import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.zookeeper.Watcher.Event.KeeperState - /** * This class registers the broker in zookeeper to allow * other brokers and consumers to detect failures. It uses an ephemeral znode with the path: @@ -35,14 +37,14 @@ import org.apache.zookeeper.Watcher.Event.KeeperState * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise * we are dead. */ -class KafkaHealthcheck(private val brokerId: Int, - private val advertisedEndpoints: Map[SecurityProtocol, EndPoint], - private val zkUtils: ZkUtils, - private val rack: Option[String], - private val interBrokerProtocolVersion: ApiVersion) extends Logging { +class KafkaHealthcheck(brokerId: Int, + advertisedEndpoints: Map[SecurityProtocol, EndPoint], + zkUtils: ZkUtils, + rack: Option[String], + interBrokerProtocolVersion: ApiVersion) extends Logging { - val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId - val sessionExpireListener = new SessionExpireListener + private val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId + private[server] val sessionExpireListener = new SessionExpireListener def startup() { zkUtils.zkClient.subscribeStateChanges(sessionExpireListener) @@ -70,31 +72,44 @@ class KafkaHealthcheck(private val brokerId: Int, } /** - * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a - * connection for us. We need to re-register this broker in the broker registry. + * When we get a SessionExpired event, it means that we have lost all ephemeral nodes and ZKClient has re-established + * a connection for us. We need to re-register this broker in the broker registry. We rely on `handleStateChanged` + * to record ZooKeeper connection state metrics. */ - class SessionExpireListener() extends IZkStateListener { + class SessionExpireListener extends IZkStateListener with KafkaMetricsGroup { + + private[server] val stateToMeterMap = { + import KeeperState._ + val stateToEventTypeMap = Map( + Disconnected -> "Disconnects", + SyncConnected -> "SyncConnects", + AuthFailed -> "AuthFailures", + ConnectedReadOnly -> "ReadOnlyConnects", + SaslAuthenticated -> "SaslAuthentications", + Expired -> "Expires" + ) + stateToEventTypeMap.map { case (state, eventType) => + state -> newMeter(s"ZooKeeper${eventType}PerSec", eventType.toLowerCase(Locale.ROOT), TimeUnit.SECONDS) + } + } + @throws(classOf[Exception]) - def handleStateChanged(state: KeeperState) {} + override def handleStateChanged(state: KeeperState) { + stateToMeterMap.get(state).foreach(_.mark()) + } - /** - * Called after the zookeeper session has expired and a new session has been created. You would have to re-create - * any ephemeral nodes here. - * - * @throws Exception - * On any error. - */ @throws(classOf[Exception]) - def handleNewSession() { + override def handleNewSession() { info("re-registering broker info in ZK for broker " + brokerId) register() info("done re-registering broker") info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath)) } - override def handleSessionEstablishmentError(error: Throwable): Unit = { + override def handleSessionEstablishmentError(error: Throwable) { fatal("Could not establish session with zookeeper", error) } + } } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 36b52fdc4b798..2832ebc6266ed 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -96,7 +96,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr private var shutdownLatch = new CountDownLatch(1) private val jmxPrefix: String = "kafka.server" - private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses + private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses reporters.add(new JmxReporter(jmxPrefix)) // This exists because the Metrics package from clients has its own Time implementation. @@ -239,7 +239,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr else (protocol, endpoint) } - kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack, config.interBrokerProtocolVersion) + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack, + config.interBrokerProtocolVersion) kafkaHealthcheck.startup() // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it diff --git a/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala b/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala new file mode 100644 index 0000000000000..4ffb18936fab7 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.api.ApiVersion +import kafka.utils.ZkUtils +import org.I0Itec.zkclient.ZkClient +import org.apache.zookeeper.Watcher +import org.easymock.EasyMock +import org.junit.{Assert, Before, Test} +import Assert._ +import com.yammer.metrics.Metrics +import com.yammer.metrics.core.{Meter, Metric, MetricName} +import org.apache.kafka.common.utils.MockTime +import scala.collection.JavaConverters._ + +class SessionExpireListenerTest { + + private var time = new MockTime + private val brokerId = 1 + + @Test + def testSessionExpireListenerMetrics() { + + val metrics = Metrics.defaultRegistry + + def checkMeterCount(name: String, expected: Long) { + val meter = metrics.allMetrics.asScala.collectFirst { + case (metricName, meter: Meter) if metricName.getName == name => meter + }.getOrElse(sys.error(s"Unable to find meter with name $name")) + assertEquals("Unexpected meter count", expected, meter.count) + } + + val zkClient = EasyMock.mock(classOf[ZkClient]) + val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false) + import Watcher._ + val healthcheck = new KafkaHealthcheck(brokerId, Map.empty, zkUtils, None, ApiVersion.latestVersion) + + val expiresPerSecName = "ZooKeeperExpiresPerSec" + val disconnectsPerSecName = "ZooKeeperDisconnectsPerSec" + checkMeterCount(expiresPerSecName, 0) + checkMeterCount(disconnectsPerSecName, 0) + + healthcheck.sessionExpireListener.handleStateChanged(Event.KeeperState.Expired) + checkMeterCount(expiresPerSecName, 1) + checkMeterCount(disconnectsPerSecName, 0) + + healthcheck.sessionExpireListener.handleStateChanged(Event.KeeperState.Disconnected) + checkMeterCount(expiresPerSecName, 1) + checkMeterCount(disconnectsPerSecName, 1) + } + +} From bc2f7af3233916da6d6bac27bb3a0740009241f6 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sun, 24 Apr 2016 13:07:26 -0700 Subject: [PATCH 2/3] Use `timeMs` instead of `time.milliseconds` in `Sensor.record` --- .../src/main/java/org/apache/kafka/common/metrics/Sensor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index 0c5bcb74e6e46..098bfa85e5d16 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -96,7 +96,7 @@ public void record(double value) { * bound */ public void record(double value, long timeMs) { - this.lastRecordTime = time.milliseconds(); + this.lastRecordTime = timeMs; synchronized (this) { // increment all the stats for (int i = 0; i < this.stats.size(); i++) From 58462ef721befc85545b1c334a1d9ef2982c4514 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sun, 24 Apr 2016 13:14:48 -0700 Subject: [PATCH 3/3] Several test clean-ups --- core/src/main/scala/kafka/utils/ZkUtils.scala | 2 +- .../kafka/api/AdminClientTest.scala | 2 +- .../kafka/api/ProducerBounceTest.scala | 16 ++----- .../test/scala/other/kafka/DeleteZKPath.scala | 44 ------------------- .../kafka/integration/PrimitiveApiTest.scala | 2 +- .../ZookeeperConsumerConnectorTest.scala | 2 +- .../kafka/server/ReplicaManagerTest.scala | 2 +- .../kafka/utils/CommandLineUtilsTest.scala | 2 +- .../test/scala/unit/kafka/zk/ZKPathTest.scala | 27 +++++------- .../unit/kafka/zk/ZooKeeperTestHarness.scala | 13 +++--- 10 files changed, 28 insertions(+), 84 deletions(-) delete mode 100755 core/src/test/scala/other/kafka/DeleteZKPath.scala diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 83ff51773ebba..81eb24ad105c4 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -924,7 +924,7 @@ object ZkPath { isNamespacePresent = true } - def resetNamespaceCheckedState { + def resetNamespaceCheckedState() { isNamespacePresent = false } diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala index ade1911b14b59..7fae81e8622dd 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala @@ -84,7 +84,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging { !consumers(0).assignment().isEmpty }, "Expected non-empty assignment") - val group= client.describeGroup(groupId) + val group = client.describeGroup(groupId) assertEquals("consumer", group.protocolType) assertEquals("range", group.protocol) assertEquals("Stable", group.state) diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala index 369c3b7cd60c5..5994a1d6f58d6 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala @@ -51,16 +51,11 @@ class ProducerBounceTest extends KafkaServerTestHarness { .map(KafkaConfig.fromProps(_, overridingProps)) } - private var consumer1: SimpleConsumer = null - private var consumer2: SimpleConsumer = null - private var producer1: KafkaProducer[Array[Byte],Array[Byte]] = null private var producer2: KafkaProducer[Array[Byte],Array[Byte]] = null private var producer3: KafkaProducer[Array[Byte],Array[Byte]] = null - private var producer4: KafkaProducer[Array[Byte],Array[Byte]] = null private val topic1 = "topic-1" - private val topic2 = "topic-2" @Before override def setUp() { @@ -76,7 +71,6 @@ class ProducerBounceTest extends KafkaServerTestHarness { if (producer1 != null) producer1.close if (producer2 != null) producer2.close if (producer3 != null) producer3.close - if (producer4 != null) producer4.close super.tearDown() } @@ -102,9 +96,8 @@ class ProducerBounceTest extends KafkaServerTestHarness { Thread.sleep(2000) } - // Make sure the producer do not see any exception - // in returned metadata due to broker failures - assertTrue(scheduler.failed == false) + // Make sure the producer do not see any exception in returned metadata due to broker failures + assertFalse(scheduler.failed) // Make sure the leader still exists after bouncing brokers (0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, partition)) @@ -114,7 +107,7 @@ class ProducerBounceTest extends KafkaServerTestHarness { // Make sure the producer do not see any exception // when draining the left messages on shutdown - assertTrue(scheduler.failed == false) + assertFalse(scheduler.failed) // double check that the leader info has been propagated after consecutive bounces val newLeaders = (0 until numPartitions).map(i => TestUtils.waitUntilMetadataIsPropagated(servers, topic1, i)) @@ -132,8 +125,7 @@ class ProducerBounceTest extends KafkaServerTestHarness { assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize) } - private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) - { + private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) { val numRecords = 1000 var sent = 0 var failed = false diff --git a/core/src/test/scala/other/kafka/DeleteZKPath.scala b/core/src/test/scala/other/kafka/DeleteZKPath.scala deleted file mode 100755 index 202bf4309f80b..0000000000000 --- a/core/src/test/scala/other/kafka/DeleteZKPath.scala +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package kafka - -import consumer.ConsumerConfig -import utils.ZkUtils -import org.I0Itec.zkclient.ZkClient -import org.apache.kafka.common.utils.Utils - -object DeleteZKPath { - def main(args: Array[String]) { - if(args.length < 2) { - println("USAGE: " + DeleteZKPath.getClass.getName + " consumer.properties zk_path") - System.exit(1) - } - - val config = new ConsumerConfig(Utils.loadProps(args(0))) - val zkPath = args(1) - val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) - - try { - zkUtils.deletePathRecursive(zkPath); - System.out.println(zkPath + " is deleted") - } catch { - case e: Exception => System.err.println("Path not deleted " + e.printStackTrace()) - } - - } -} diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index beb5d0eafdb11..85e9cad3d248f 100755 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -35,7 +35,7 @@ import java.util.Properties * End to end tests of the primitive apis against a local server */ @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0") -class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness { +class PrimitiveApiTest extends ProducerConsumerTestHarness { val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index e4c46973b5d76..83cce77ce3b05 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -37,7 +37,7 @@ import org.apache.log4j.{Level, Logger} import org.junit.Assert._ @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0") -class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with ZooKeeperTestHarness with Logging { +class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging { val numNodes = 2 val numParts = 2 val topic = "topic1" diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index c2c670e6b69cd..2cdf924ad5e72 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -58,7 +58,7 @@ class ReplicaManagerTest { @After def tearDown() { - metrics.close(); + metrics.close() } @Test diff --git a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala index 6cc868dbc3998..50023f8046407 100644 --- a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala @@ -23,7 +23,7 @@ import org.junit.Test class CommandLineUtilsTest { - @Test (expected = classOf[java.lang.IllegalArgumentException]) + @Test(expected = classOf[java.lang.IllegalArgumentException]) def testParseEmptyArg() { val argArray = Array("my.empty.property=") CommandLineUtils.parseKeyValueArgs(argArray, false) diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala index 92fae022a728c..7ef45505dfeed 100644 --- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala @@ -25,7 +25,7 @@ import org.junit.Test class ZKPathTest extends ZooKeeperTestHarness { - val path: String = "/some_dir" + val path = "/some_dir" val zkSessionTimeoutMs = 1000 def zkConnectWithInvalidRoot: String = zkConnect + "/ghost" @@ -33,7 +33,7 @@ class ZKPathTest extends ZooKeeperTestHarness { def testCreatePersistentPathThrowsException { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1")) - var zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, + val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) try { ZkPath.resetNamespaceCheckedState @@ -49,7 +49,7 @@ class ZKPathTest extends ZooKeeperTestHarness { @Test def testCreatePersistentPath { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) - var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) + val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) try { ZkPath.resetNamespaceCheckedState zkUtils.createPersistentPath(path) @@ -63,10 +63,8 @@ class ZKPathTest extends ZooKeeperTestHarness { @Test def testMakeSurePersistsPathExistsThrowsException { - val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, - "test", "1")) - var zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, - config.zkConnectionTimeoutMs, false) + val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1")) + val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) try { ZkPath.resetNamespaceCheckedState zkUtils.makeSurePersistentPathExists(path) @@ -81,7 +79,7 @@ class ZKPathTest extends ZooKeeperTestHarness { @Test def testMakeSurePersistsPathExists { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) - var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) + val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) try { ZkPath.resetNamespaceCheckedState zkUtils.makeSurePersistentPathExists(path) @@ -95,10 +93,8 @@ class ZKPathTest extends ZooKeeperTestHarness { @Test def testCreateEphemeralPathThrowsException { - val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, - "test", "1")) - var zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, - config.zkConnectionTimeoutMs, false) + val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1")) + val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) try { ZkPath.resetNamespaceCheckedState zkUtils.createEphemeralPathExpectConflict(path, "somedata") @@ -113,7 +109,7 @@ class ZKPathTest extends ZooKeeperTestHarness { @Test def testCreateEphemeralPathExists { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) - var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) + val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) try { ZkPath.resetNamespaceCheckedState zkUtils.createEphemeralPathExpectConflict(path, "somedata") @@ -129,8 +125,7 @@ class ZKPathTest extends ZooKeeperTestHarness { def testCreatePersistentSequentialThrowsException { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1")) - var zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, - config.zkConnectionTimeoutMs, false) + val zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) try { ZkPath.resetNamespaceCheckedState zkUtils.createSequentialPersistentPath(path) @@ -145,7 +140,7 @@ class ZKPathTest extends ZooKeeperTestHarness { @Test def testCreatePersistentSequentialExists { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) - var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) + val zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false) var actualPath: String = "" try { diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index 95f4e350954c7..0de11cdbe12f9 100755 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -24,18 +24,19 @@ import org.scalatest.junit.JUnitSuite import org.apache.kafka.common.security.JaasUtils trait ZooKeeperTestHarness extends JUnitSuite with Logging { - var zookeeper: EmbeddedZookeeper = null - var zkPort: Int = -1 - var zkUtils: ZkUtils = null + val zkConnectionTimeout = 6000 val zkSessionTimeout = 6000 - def zkConnect: String = "127.0.0.1:" + zkPort - def confFile: String = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "") + + var zkUtils: ZkUtils = null + var zookeeper: EmbeddedZookeeper = null + + def zkPort: Int = zookeeper.port + def zkConnect: String = s"127.0.0.1:$zkPort" @Before def setUp() { zookeeper = new EmbeddedZookeeper() - zkPort = zookeeper.port zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled()) }