SAMZA-2617: Upgrade Kafka Client to 2.3.1 (#1462)
perkss committed Mar 3, 2021
1 parent 93c41df commit 89b71ed
Showing 10 changed files with 53 additions and 61 deletions.
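Most of the diff is mechanical: the connector stops keying its bookkeeping on kafka.common.TopicAndPartition, a Scala class from the broker's internals, and uses the public Java org.apache.kafka.common.TopicPartition everywhere instead. A minimal sketch of the swap (the topic name here is invented for illustration):

import org.apache.kafka.common.TopicPartition;

public class TopicPartitionExample {
  public static void main(String[] args) {
    // Old (broker-internal Scala API): new TopicAndPartition("events", 0)
    // New (public Java client API):
    TopicPartition tp = new TopicPartition("events", 0); // "events" is a made-up topic
    System.out.println(tp.topic() + "-" + tp.partition()); // accessors, prints "events-0"
  }
}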
2 changes: 1 addition & 1 deletion gradle/dependency-versions.gradle
@@ -38,7 +38,7 @@
jodaTimeVersion = "2.2"
joptSimpleVersion = "5.0.4"
junitVersion = "4.12"
kafkaVersion = "2.0.1"
kafkaVersion = "2.3.1"
log4jVersion = "1.2.17"
log4j2Version = "2.12.0"
metricsVersion = "2.2.0"
@@ -28,14 +28,13 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import kafka.common.TopicAndPartition;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.system.IncomingMessageEnvelope;
@@ -358,7 +357,6 @@ protected int getRecordSize(ConsumerRecord<K, V> r) {
}

private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) {
-TopicAndPartition tap = KafkaSystemConsumer.toTopicAndPartition(tp);
SystemStreamPartition ssp = new SystemStreamPartition(systemName, tp.topic(), new Partition(tp.partition()));

Long lag = latestLags.get(ssp);
@@ -374,11 +372,11 @@ private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) {
long highWatermark = recordOffset + currentSSPLag; // derived value for the highwatermark

int size = getRecordSize(r);
-kafkaConsumerMetrics.incReads(tap);
-kafkaConsumerMetrics.incBytesReads(tap, size);
-kafkaConsumerMetrics.setOffsets(tap, recordOffset);
+kafkaConsumerMetrics.incReads(tp);
+kafkaConsumerMetrics.incBytesReads(tp, size);
+kafkaConsumerMetrics.setOffsets(tp, recordOffset);
kafkaConsumerMetrics.incClientBytesReads(metricName, size);
-kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark);
+kafkaConsumerMetrics.setHighWatermarkValue(tp, highWatermark);
}

private void moveMessagesToTheirQueue(SystemStreamPartition ssp, List<IncomingMessageEnvelope> envelopes) {
@@ -437,7 +435,7 @@ private void refreshLagMetrics() {
for (Map.Entry<SystemStreamPartition, Long> e : nextOffsets.entrySet()) {
SystemStreamPartition ssp = e.getKey();
Long offset = e.getValue();
-TopicAndPartition tp = new TopicAndPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
+TopicPartition tp = new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
Long lag = latestLags.get(ssp);
LOG.trace("Latest offset of {} is {}; lag = {}", ssp, offset, lag);
if (lag != null && offset != null && lag >= 0) {
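Two things to note in this consumer-proxy file: the metrics calls now take the TopicPartition handed back by poll() directly, with no intermediate conversion, and the high watermark is still a derived value, namely the offset of the record just read plus the latest lag observed for that partition. A worked sketch of that derivation (values invented):

public class HighWatermarkExample {
  public static void main(String[] args) {
    long recordOffset = 100L;  // offset of the record just polled
    long currentSSPLag = 12L;  // latest lag reported for this partition
    // Derived as in updateMetrics above: broker high watermark ~= offset + lag.
    long highWatermark = recordOffset + currentSSPLag;
    System.out.println(highWatermark); // 112
  }
}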
@@ -25,10 +25,9 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import kafka.common.TopicAndPartition;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
@@ -277,7 +276,7 @@ public void register(SystemStreamPartition systemStreamPartition, String offset) {
topicPartitionsToOffset.put(topicPartition, offset);
}

-metrics.registerTopicAndPartition(toTopicAndPartition(topicPartition));
+metrics.registerTopicPartition(topicPartition);
}

/**
@@ -309,10 +308,6 @@ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
return super.poll(systemStreamPartitions, timeout);
}

-protected static TopicAndPartition toTopicAndPartition(TopicPartition topicPartition) {
-return new TopicAndPartition(topicPartition.topic(), topicPartition.partition());
-}

protected static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
}
@@ -21,22 +21,22 @@ package org.apache.samza.system.kafka

import java.util.concurrent.ConcurrentHashMap

-import kafka.common.TopicAndPartition
+import org.apache.kafka.common.TopicPartition
import org.apache.samza.metrics._

class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
-val offsets = new ConcurrentHashMap[TopicAndPartition, Counter]
-val bytesRead = new ConcurrentHashMap[TopicAndPartition, Counter]
-val reads = new ConcurrentHashMap[TopicAndPartition, Counter]
-val lag = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
-val highWatermark = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
+val offsets = new ConcurrentHashMap[TopicPartition, Counter]
+val bytesRead = new ConcurrentHashMap[TopicPartition, Counter]
+val reads = new ConcurrentHashMap[TopicPartition, Counter]
+val lag = new ConcurrentHashMap[TopicPartition, Gauge[Long]]
+val highWatermark = new ConcurrentHashMap[TopicPartition, Gauge[Long]]

val clientBytesRead = new ConcurrentHashMap[String, Counter]
val clientReads = new ConcurrentHashMap[String, Counter]
val clientSkippedFetchRequests = new ConcurrentHashMap[String, Counter]
val topicPartitions = new ConcurrentHashMap[String, Gauge[Int]]

-def registerTopicAndPartition(tp: TopicAndPartition) = {
+def registerTopicPartition(tp: TopicPartition) = {
if (!offsets.contains(tp)) {
offsets.put(tp, newCounter("%s-%s-offset-change" format(tp.topic, tp.partition)))
bytesRead.put(tp, newCounter("%s-%s-bytes-read" format(tp.topic, tp.partition)))
@@ -59,25 +59,25 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
topicPartitions.get(clientName).set(value)
}

-def setLagValue(topicAndPartition: TopicAndPartition, value: Long) {
-lag.get((topicAndPartition)).set(value);
+def setLagValue(topicPartition: TopicPartition, value: Long) {
+lag.get((topicPartition)).set(value);
}

-def setHighWatermarkValue(topicAndPartition: TopicAndPartition, value: Long) {
-highWatermark.get((topicAndPartition)).set(value);
+def setHighWatermarkValue(topicPartition: TopicPartition, value: Long) {
+highWatermark.get((topicPartition)).set(value);
}

// Counters
def incClientReads(clientName: String) {
clientReads.get(clientName).inc
}

-def incReads(topicAndPartition: TopicAndPartition) {
-reads.get(topicAndPartition).inc;
+def incReads(topicPartition: TopicPartition) {
+reads.get(topicPartition).inc;
}

-def incBytesReads(topicAndPartition: TopicAndPartition, inc: Long) {
-bytesRead.get(topicAndPartition).inc(inc);
+def incBytesReads(topicPartition: TopicPartition, inc: Long) {
+bytesRead.get(topicPartition).inc(inc);
}

def incClientBytesReads(clientName: String, incBytes: Long) {
@@ -88,8 +88,8 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
clientSkippedFetchRequests.get(clientName).inc()
}

-def setOffsets(topicAndPartition: TopicAndPartition, offset: Long) {
-offsets.get(topicAndPartition).set(offset)
+def setOffsets(topicPartition: TopicPartition, offset: Long) {
+offsets.get(topicPartition).set(offset)
}

override def getPrefix = systemName + "-"
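Although the map keys change type, the emitted metric names keep the same "topic-partition-*" shape, so dashboards keyed on those names should be unaffected. A hedged illustration of the naming convention visible in registerTopicPartition (the topic here is hypothetical):

import org.apache.kafka.common.TopicPartition;

public class MetricNameExample {
  public static void main(String[] args) {
    TopicPartition tp = new TopicPartition("orders", 2); // made-up topic and partition
    // Mirrors the "%s-%s-offset-change" format string above.
    String name = String.format("%s-%s-offset-change", tp.topic(), tp.partition());
    System.out.println(name); // orders-2-offset-change
  }
}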
@@ -19,6 +19,7 @@

package org.apache.samza.system.kafka;

+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -173,6 +174,11 @@ public void close(long timeout, TimeUnit timeUnit) {
new FlushRunnable(0).run();
}

+@Override
+public void close(Duration timeout) {
+close(timeout.toMillis(), TimeUnit.MILLISECONDS);
+}

public void open() {
this.closed = false;
openCount++;
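The 2.x clients introduced Duration-based close(...) overloads alongside the older close(long, TimeUnit), so this mock now has to implement both; the new method simply delegates to the legacy one. The same delegation pattern in isolation, assuming a class with an existing (long, TimeUnit) close:

import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class CloseDelegationExample {
  public void close(long timeout, TimeUnit timeUnit) {
    // legacy close logic lives here
  }

  // New-style overload delegating to the legacy form, as in the diff above.
  public void close(Duration timeout) {
    close(timeout.toMillis(), TimeUnit.MILLISECONDS);
  }
}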
@@ -20,7 +20,7 @@

import java.util.HashMap;
import java.util.Map;
-import kafka.common.TopicAndPartition;
+import org.apache.kafka.common.TopicPartition;
import org.apache.samza.metrics.Metric;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.ReadableMetricsRegistry;
@@ -32,8 +32,8 @@ public class TestKafkaSystemConsumerMetrics {
@Test
public void testKafkaSystemConsumerMetrics() {
String systemName = "system";
-TopicAndPartition tp1 = new TopicAndPartition("topic1", 1);
-TopicAndPartition tp2 = new TopicAndPartition("topic2", 2);
+TopicPartition tp1 = new TopicPartition("topic1", 1);
+TopicPartition tp2 = new TopicPartition("topic2", 2);
String clientName = "clientName";

// record expected values for further comparison
@@ -43,8 +43,8 @@ public void testKafkaSystemConsumerMetrics() {
KafkaSystemConsumerMetrics metrics = new KafkaSystemConsumerMetrics(systemName, registry);

// initialize the metrics for the partitions
-metrics.registerTopicAndPartition(tp1);
-metrics.registerTopicAndPartition(tp2);
+metrics.registerTopicPartition(tp1);
+metrics.registerTopicPartition(tp2);

// initialize the metrics for the host:port
metrics.registerClientProxy(clientName);
@@ -19,9 +19,7 @@

package org.apache.samza.sql.client.impl;

-import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataTypeField;
@@ -58,7 +56,6 @@
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;

import java.io.File;
import java.io.IOException;
@@ -84,6 +81,7 @@ public class SamzaExecutor implements SqlExecutor {
// The maximum number of rows of data we keep when user pauses the display view and data accumulates.
private static final int RANDOM_ACCESS_QUEUE_CAPACITY = 5000;
private static final int DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT = 20000;
+private static final String ZOOKEEPER_BROKERS_TOPICS_PATH = "/brokers/topics";

private static RandomAccessQueue<OutgoingMessageEnvelope> outputData =
new RandomAccessQueue<>(OutgoingMessageEnvelope.class, RANDOM_ACCESS_QUEUE_CAPACITY);
@@ -120,9 +118,8 @@ public List<String> listTables(ExecutionContext context) throws ExecutorException {
address = DEFAULT_SERVER_ADDRESS;
}
try {
-ZkUtils zkUtils = new ZkUtils(new ZkClient(address, DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT),
-new ZkConnection(address), false);
-return JavaConversions.seqAsJavaList(zkUtils.getAllTopics())
+ZkClient zkClient = new ZkClient(address, DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT);
+return zkClient.getChildren(ZOOKEEPER_BROKERS_TOPICS_PATH)
.stream()
.map(x -> SAMZA_SYSTEM_KAFKA + "." + x)
.collect(Collectors.toList());
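kafka.utils.ZkUtils was internal to the broker and drops out of the upgraded dependency set, so listTables now reads topic names straight out of ZooKeeper: every Kafka topic is registered as a child znode of /brokers/topics. A rough standalone equivalent, assuming a reachable ZooKeeper and that SAMZA_SYSTEM_KAFKA is the string "kafka":

import java.util.List;
import java.util.stream.Collectors;
import org.I0Itec.zkclient.ZkClient;

public class ListTopicsExample {
  public static void main(String[] args) {
    ZkClient zkClient = new ZkClient("localhost:2181", 20000); // example address and timeout
    List<String> tables = zkClient.getChildren("/brokers/topics") // one child znode per topic
        .stream()
        .map(topic -> "kafka." + topic) // assumed SAMZA_SYSTEM_KAFKA prefix
        .collect(Collectors.toList());
    tables.forEach(System.out::println);
  }
}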
@@ -262,15 +262,14 @@ private void initConsumer(String bootstrapServer) {
bootstrapServer,
"group",
"earliest",
-4096L,
-"org.apache.kafka.clients.consumer.RangeAssignor",
-30000,
+true,
+false,
+500,
SecurityProtocol.PLAINTEXT,
Option$.MODULE$.<File>empty(),
Option$.MODULE$.<Properties>empty(),
new StringDeserializer(),
-new ByteArrayDeserializer(),
-Option$.MODULE$.<Properties>empty());
+new ByteArrayDeserializer());
}

private void initProcessorListener() {
@@ -291,14 +290,16 @@ private void initProducer(String bootstrapServer) {
60 * 1000L,
1024L * 1024L,
0,
-0L,
-5 * 1000L,
+30 * 1000,
+0,
+16384,
+"none",
+20 * 1000,
SecurityProtocol.PLAINTEXT,
null,
Option$.MODULE$.<Properties>apply(new Properties()),
new StringSerializer(),
-new ByteArraySerializer(),
-Option$.MODULE$.<Properties>apply(new Properties()));
+new ByteArraySerializer());
}
}
}
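The extra positional arguments threaded through the kafka.utils.TestUtils helpers here track knobs that became first-class in the 2.x test utilities; my reading of the values (not verified against the exact 2.3.1 Scala signatures) is 30 * 1000 for delivery.timeout.ms, 0 for linger.ms, 16384 for batch.size, "none" for compression.type, and 20 * 1000 for request.timeout.ms. The same producer settings expressed against the public client API:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerConfigExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example broker
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30 * 1000); // added by KIP-91 in the 2.x line
    props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 20 * 1000);
    KafkaProducer<String, byte[]> producer =
        new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
    producer.close();
  }
}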
@@ -106,7 +106,7 @@ abstract class AbstractKafkaServerTestHarness extends AbstractZookeeperTestHarness
case e: Exception =>
println("Exception in setup")
println(e)
-TestUtils.fail(e.getMessage)
+throw new AssertionError(e.getMessage)
}
}.toBuffer
brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol)
@@ -18,10 +18,9 @@
*/
package org.apache.samza.test.harness

-import kafka.utils.{CoreUtils, Logging, ZkUtils}
-import kafka.zk.{EmbeddedZookeeper, KafkaZkClient, ZkFourLetterWords}
+import kafka.utils.{CoreUtils, Logging}
+import kafka.zk.{EmbeddedZookeeper, ZkFourLetterWords}
import org.junit.{After, Before}
-import org.apache.kafka.common.security.JaasUtils
import javax.security.auth.login.Configuration
/**
* Zookeeper test harness.
@@ -33,7 +32,6 @@ abstract class AbstractZookeeperTestHarness extends Logging {
val zkConnectionTimeout = 60000
val zkSessionTimeout = 60000

-var zkUtils: ZkUtils = null
var zookeeper: EmbeddedZookeeper = null

def zkPort: Int = zookeeper.port
@@ -49,13 +47,10 @@
*/
zookeeper.zookeeper.setMinSessionTimeout(120000)
zookeeper.zookeeper.setMaxSessionTimeout(180000)
-zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled)
}

@After
def tearDown() {
-if (zkUtils != null)
-CoreUtils.swallow(zkUtils.close(), null)
if (zookeeper != null)
CoreUtils.swallow(zookeeper.shutdown(), null)

