diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index e05b837298..e10d09ee85 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -38,7 +38,7 @@ jodaTimeVersion = "2.2" joptSimpleVersion = "5.0.4" junitVersion = "4.12" - kafkaVersion = "2.0.1" + kafkaVersion = "2.3.1" log4jVersion = "1.2.17" log4j2Version = "2.12.0" metricsVersion = "2.2.0" diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java index 329073cc3b..8fd21fe92b 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -28,14 +28,13 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import kafka.common.TopicAndPartition; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.TopicPartition; import org.apache.samza.Partition; import org.apache.samza.SamzaException; import org.apache.samza.system.IncomingMessageEnvelope; @@ -358,7 +357,6 @@ protected int getRecordSize(ConsumerRecord r) { } private void updateMetrics(ConsumerRecord r, TopicPartition tp) { - TopicAndPartition tap = KafkaSystemConsumer.toTopicAndPartition(tp); SystemStreamPartition ssp = new SystemStreamPartition(systemName, tp.topic(), new Partition(tp.partition())); Long lag = latestLags.get(ssp); @@ -374,11 +372,11 @@ private void updateMetrics(ConsumerRecord r, TopicPartition tp) { long highWatermark = recordOffset + currentSSPLag; // derived value for the highwatermark int size = getRecordSize(r); - kafkaConsumerMetrics.incReads(tap); - kafkaConsumerMetrics.incBytesReads(tap, size); - kafkaConsumerMetrics.setOffsets(tap, recordOffset); + kafkaConsumerMetrics.incReads(tp); + kafkaConsumerMetrics.incBytesReads(tp, size); + kafkaConsumerMetrics.setOffsets(tp, recordOffset); kafkaConsumerMetrics.incClientBytesReads(metricName, size); - kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark); + kafkaConsumerMetrics.setHighWatermarkValue(tp, highWatermark); } private void moveMessagesToTheirQueue(SystemStreamPartition ssp, List envelopes) { @@ -437,7 +435,7 @@ private void refreshLagMetrics() { for (Map.Entry e : nextOffsets.entrySet()) { SystemStreamPartition ssp = e.getKey(); Long offset = e.getValue(); - TopicAndPartition tp = new TopicAndPartition(ssp.getStream(), ssp.getPartition().getPartitionId()); + TopicPartition tp = new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId()); Long lag = latestLags.get(ssp); LOG.trace("Latest offset of {} is {}; lag = {}", ssp, offset, lag); if (lag != null && offset != null && lag >= 0) { diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java index 27bd63855e..093baf599d 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java @@ -25,10 +25,9 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import kafka.common.TopicAndPartition; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.KafkaConfig; @@ -277,7 +276,7 @@ public void register(SystemStreamPartition systemStreamPartition, String offset) topicPartitionsToOffset.put(topicPartition, offset); } - metrics.registerTopicAndPartition(toTopicAndPartition(topicPartition)); + metrics.registerTopicPartition(topicPartition); } /** @@ -309,10 +308,6 @@ public Map> poll( return super.poll(systemStreamPartitions, timeout); } - protected static TopicAndPartition toTopicAndPartition(TopicPartition topicPartition) { - return new TopicAndPartition(topicPartition.topic(), topicPartition.partition()); - } - protected static TopicPartition toTopicPartition(SystemStreamPartition ssp) { return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId()); } diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala index 59a8854474..ac6f099824 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala @@ -21,22 +21,22 @@ package org.apache.samza.system.kafka import java.util.concurrent.ConcurrentHashMap -import kafka.common.TopicAndPartition +import org.apache.kafka.common.TopicPartition import org.apache.samza.metrics._ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper { - val offsets = new ConcurrentHashMap[TopicAndPartition, Counter] - val bytesRead = new ConcurrentHashMap[TopicAndPartition, Counter] - val reads = new ConcurrentHashMap[TopicAndPartition, Counter] - val lag = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]] - val highWatermark = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]] + val offsets = new ConcurrentHashMap[TopicPartition, Counter] + val bytesRead = new ConcurrentHashMap[TopicPartition, Counter] + val reads = new ConcurrentHashMap[TopicPartition, Counter] + val lag = new ConcurrentHashMap[TopicPartition, Gauge[Long]] + val highWatermark = new ConcurrentHashMap[TopicPartition, Gauge[Long]] val clientBytesRead = new ConcurrentHashMap[String, Counter] val clientReads = new ConcurrentHashMap[String, Counter] val clientSkippedFetchRequests = new ConcurrentHashMap[String, Counter] val topicPartitions = new ConcurrentHashMap[String, Gauge[Int]] - def registerTopicAndPartition(tp: TopicAndPartition) = { + def registerTopicPartition(tp: TopicPartition) = { if (!offsets.contains(tp)) { offsets.put(tp, newCounter("%s-%s-offset-change" format(tp.topic, tp.partition))) bytesRead.put(tp, newCounter("%s-%s-bytes-read" format(tp.topic, tp.partition))) @@ -59,12 +59,12 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr topicPartitions.get(clientName).set(value) } - def setLagValue(topicAndPartition: TopicAndPartition, value: Long) { - lag.get((topicAndPartition)).set(value); + def setLagValue(topicPartition: TopicPartition, value: Long) { + lag.get((topicPartition)).set(value); } - def setHighWatermarkValue(topicAndPartition: TopicAndPartition, value: Long) { - highWatermark.get((topicAndPartition)).set(value); + def setHighWatermarkValue(topicPartition: TopicPartition, value: Long) { + highWatermark.get((topicPartition)).set(value); } // Counters @@ -72,12 +72,12 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr clientReads.get(clientName).inc } - def incReads(topicAndPartition: TopicAndPartition) { - reads.get(topicAndPartition).inc; + def incReads(topicPartition: TopicPartition) { + reads.get(topicPartition).inc; } - def incBytesReads(topicAndPartition: TopicAndPartition, inc: Long) { - bytesRead.get(topicAndPartition).inc(inc); + def incBytesReads(topicPartition: TopicPartition, inc: Long) { + bytesRead.get(topicPartition).inc(inc); } def incClientBytesReads(clientName: String, incBytes: Long) { @@ -88,8 +88,8 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr clientSkippedFetchRequests.get(clientName).inc() } - def setOffsets(topicAndPartition: TopicAndPartition, offset: Long) { - offsets.get(topicAndPartition).set(offset) + def setOffsets(topicPartition: TopicPartition, offset: Long) { + offsets.get(topicPartition).set(offset) } override def getPrefix = systemName + "-" diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java index a28b23ea97..1d85f7b62f 100644 --- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java @@ -19,6 +19,7 @@ package org.apache.samza.system.kafka; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -173,6 +174,11 @@ public void close(long timeout, TimeUnit timeUnit) { new FlushRunnable(0).run(); } + @Override + public void close(Duration timeout) { + close(timeout.toMillis(), TimeUnit.MILLISECONDS); + } + public void open() { this.closed = false; openCount++; diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java index 5cc6f84cc9..b5c6600f57 100644 --- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java @@ -20,7 +20,7 @@ import java.util.HashMap; import java.util.Map; -import kafka.common.TopicAndPartition; +import org.apache.kafka.common.TopicPartition; import org.apache.samza.metrics.Metric; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.metrics.ReadableMetricsRegistry; @@ -32,8 +32,8 @@ public class TestKafkaSystemConsumerMetrics { @Test public void testKafkaSystemConsumerMetrics() { String systemName = "system"; - TopicAndPartition tp1 = new TopicAndPartition("topic1", 1); - TopicAndPartition tp2 = new TopicAndPartition("topic2", 2); + TopicPartition tp1 = new TopicPartition("topic1", 1); + TopicPartition tp2 = new TopicPartition("topic2", 2); String clientName = "clientName"; // record expected values for further comparison @@ -43,8 +43,8 @@ public void testKafkaSystemConsumerMetrics() { KafkaSystemConsumerMetrics metrics = new KafkaSystemConsumerMetrics(systemName, registry); // initialize the metrics for the partitions - metrics.registerTopicAndPartition(tp1); - metrics.registerTopicAndPartition(tp2); + metrics.registerTopicPartition(tp1); + metrics.registerTopicPartition(tp2); // initialize the metrics for the host:port metrics.registerClientProxy(clientName); diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java index 6770f93a2d..b062758db1 100755 --- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java @@ -19,9 +19,7 @@ package org.apache.samza.sql.client.impl; -import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.exception.ZkTimeoutException; import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.type.RelDataTypeField; @@ -58,7 +56,6 @@ import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.collection.JavaConversions; import java.io.File; import java.io.IOException; @@ -84,6 +81,7 @@ public class SamzaExecutor implements SqlExecutor { // The maximum number of rows of data we keep when user pauses the display view and data accumulates. private static final int RANDOM_ACCESS_QUEUE_CAPACITY = 5000; private static final int DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT = 20000; + private static final String ZOOKEEPER_BROKERS_TOPICS_PATH = "/brokers/topics"; private static RandomAccessQueue outputData = new RandomAccessQueue<>(OutgoingMessageEnvelope.class, RANDOM_ACCESS_QUEUE_CAPACITY); @@ -120,9 +118,8 @@ public List listTables(ExecutionContext context) throws ExecutorExceptio address = DEFAULT_SERVER_ADDRESS; } try { - ZkUtils zkUtils = new ZkUtils(new ZkClient(address, DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT), - new ZkConnection(address), false); - return JavaConversions.seqAsJavaList(zkUtils.getAllTopics()) + ZkClient zkClient = new ZkClient(address, DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT); + return zkClient.getChildren(ZOOKEEPER_BROKERS_TOPICS_PATH) .stream() .map(x -> SAMZA_SYSTEM_KAFKA + "." + x) .collect(Collectors.toList()); diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java index 1e5e24c826..9dd0b3f768 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java @@ -262,15 +262,14 @@ private void initConsumer(String bootstrapServer) { bootstrapServer, "group", "earliest", - 4096L, - "org.apache.kafka.clients.consumer.RangeAssignor", - 30000, + true, + false, + 500, SecurityProtocol.PLAINTEXT, Option$.MODULE$.empty(), Option$.MODULE$.empty(), new StringDeserializer(), - new ByteArrayDeserializer(), - Option$.MODULE$.empty()); + new ByteArrayDeserializer()); } private void initProcessorListener() { @@ -291,14 +290,16 @@ private void initProducer(String bootstrapServer) { 60 * 1000L, 1024L * 1024L, 0, - 0L, - 5 * 1000L, + 30 * 1000, + 0, + 16384, + "none", + 20 * 1000, SecurityProtocol.PLAINTEXT, null, Option$.MODULE$.apply(new Properties()), new StringSerializer(), - new ByteArraySerializer(), - Option$.MODULE$.apply(new Properties())); + new ByteArraySerializer()); } } } diff --git a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractKafkaServerTestHarness.scala b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractKafkaServerTestHarness.scala index ae07a9fc55..e60409f90f 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractKafkaServerTestHarness.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractKafkaServerTestHarness.scala @@ -106,7 +106,7 @@ abstract class AbstractKafkaServerTestHarness extends AbstractZookeeperTestHarne case e: Exception => println("Exception in setup") println(e) - TestUtils.fail(e.getMessage) + throw new AssertionError(e.getMessage) } }.toBuffer brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol) diff --git a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractZookeeperTestHarness.scala b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractZookeeperTestHarness.scala index 69fe68cc10..75420d0d43 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractZookeeperTestHarness.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractZookeeperTestHarness.scala @@ -18,10 +18,9 @@ */ package org.apache.samza.test.harness -import kafka.utils.{CoreUtils, Logging, ZkUtils} -import kafka.zk.{EmbeddedZookeeper, KafkaZkClient, ZkFourLetterWords} +import kafka.utils.{CoreUtils, Logging} +import kafka.zk.{EmbeddedZookeeper, ZkFourLetterWords} import org.junit.{After, Before} -import org.apache.kafka.common.security.JaasUtils import javax.security.auth.login.Configuration /** * Zookeeper test harness. @@ -33,7 +32,6 @@ abstract class AbstractZookeeperTestHarness extends Logging { val zkConnectionTimeout = 60000 val zkSessionTimeout = 60000 - var zkUtils: ZkUtils = null var zookeeper: EmbeddedZookeeper = null def zkPort: Int = zookeeper.port @@ -49,13 +47,10 @@ abstract class AbstractZookeeperTestHarness extends Logging { */ zookeeper.zookeeper.setMinSessionTimeout(120000) zookeeper.zookeeper.setMaxSessionTimeout(180000) - zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled) } @After def tearDown() { - if (zkUtils != null) - CoreUtils.swallow(zkUtils.close(), null) if (zookeeper != null) CoreUtils.swallow(zookeeper.shutdown(), null)