Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2617: Upgrade Kafka Client to 2.3.1 #1462

Merged
merged 1 commit into from
Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
jodaTimeVersion = "2.2"
joptSimpleVersion = "5.0.4"
junitVersion = "4.12"
kafkaVersion = "2.0.1"
kafkaVersion = "2.3.1"
log4jVersion = "1.2.17"
log4j2Version = "2.12.0"
metricsVersion = "2.2.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kafka.common.TopicAndPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.system.IncomingMessageEnvelope;
Expand Down Expand Up @@ -358,7 +357,6 @@ protected int getRecordSize(ConsumerRecord<K, V> r) {
}

private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) {
TopicAndPartition tap = KafkaSystemConsumer.toTopicAndPartition(tp);
SystemStreamPartition ssp = new SystemStreamPartition(systemName, tp.topic(), new Partition(tp.partition()));

Long lag = latestLags.get(ssp);
Expand All @@ -374,11 +372,11 @@ private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) {
long highWatermark = recordOffset + currentSSPLag; // derived value for the highwatermark

int size = getRecordSize(r);
kafkaConsumerMetrics.incReads(tap);
kafkaConsumerMetrics.incBytesReads(tap, size);
kafkaConsumerMetrics.setOffsets(tap, recordOffset);
kafkaConsumerMetrics.incReads(tp);
kafkaConsumerMetrics.incBytesReads(tp, size);
kafkaConsumerMetrics.setOffsets(tp, recordOffset);
kafkaConsumerMetrics.incClientBytesReads(metricName, size);
kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark);
kafkaConsumerMetrics.setHighWatermarkValue(tp, highWatermark);
}

private void moveMessagesToTheirQueue(SystemStreamPartition ssp, List<IncomingMessageEnvelope> envelopes) {
Expand Down Expand Up @@ -437,7 +435,7 @@ private void refreshLagMetrics() {
for (Map.Entry<SystemStreamPartition, Long> e : nextOffsets.entrySet()) {
SystemStreamPartition ssp = e.getKey();
Long offset = e.getValue();
TopicAndPartition tp = new TopicAndPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
TopicPartition tp = new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
Long lag = latestLags.get(ssp);
LOG.trace("Latest offset of {} is {}; lag = {}", ssp, offset, lag);
if (lag != null && offset != null && lag >= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.common.TopicAndPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
Expand Down Expand Up @@ -277,7 +276,7 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
topicPartitionsToOffset.put(topicPartition, offset);
}

metrics.registerTopicAndPartition(toTopicAndPartition(topicPartition));
metrics.registerTopicPartition(topicPartition);
}

/**
Expand Down Expand Up @@ -309,10 +308,6 @@ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
return super.poll(systemStreamPartitions, timeout);
}

protected static TopicAndPartition toTopicAndPartition(TopicPartition topicPartition) {
return new TopicAndPartition(topicPartition.topic(), topicPartition.partition());
}

protected static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@ package org.apache.samza.system.kafka

import java.util.concurrent.ConcurrentHashMap

import kafka.common.TopicAndPartition
import org.apache.kafka.common.TopicPartition
import org.apache.samza.metrics._

class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
val offsets = new ConcurrentHashMap[TopicAndPartition, Counter]
val bytesRead = new ConcurrentHashMap[TopicAndPartition, Counter]
val reads = new ConcurrentHashMap[TopicAndPartition, Counter]
val lag = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
val highWatermark = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
val offsets = new ConcurrentHashMap[TopicPartition, Counter]
val bytesRead = new ConcurrentHashMap[TopicPartition, Counter]
val reads = new ConcurrentHashMap[TopicPartition, Counter]
val lag = new ConcurrentHashMap[TopicPartition, Gauge[Long]]
val highWatermark = new ConcurrentHashMap[TopicPartition, Gauge[Long]]

val clientBytesRead = new ConcurrentHashMap[String, Counter]
val clientReads = new ConcurrentHashMap[String, Counter]
val clientSkippedFetchRequests = new ConcurrentHashMap[String, Counter]
val topicPartitions = new ConcurrentHashMap[String, Gauge[Int]]

def registerTopicAndPartition(tp: TopicAndPartition) = {
def registerTopicPartition(tp: TopicPartition) = {
if (!offsets.contains(tp)) {
offsets.put(tp, newCounter("%s-%s-offset-change" format(tp.topic, tp.partition)))
bytesRead.put(tp, newCounter("%s-%s-bytes-read" format(tp.topic, tp.partition)))
Expand All @@ -59,25 +59,25 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr
topicPartitions.get(clientName).set(value)
}

def setLagValue(topicAndPartition: TopicAndPartition, value: Long) {
lag.get((topicAndPartition)).set(value);
def setLagValue(topicPartition: TopicPartition, value: Long) {
lag.get((topicPartition)).set(value);
}

def setHighWatermarkValue(topicAndPartition: TopicAndPartition, value: Long) {
highWatermark.get((topicAndPartition)).set(value);
def setHighWatermarkValue(topicPartition: TopicPartition, value: Long) {
highWatermark.get((topicPartition)).set(value);
}

// Counters
def incClientReads(clientName: String) {
clientReads.get(clientName).inc
}

def incReads(topicAndPartition: TopicAndPartition) {
reads.get(topicAndPartition).inc;
def incReads(topicPartition: TopicPartition) {
reads.get(topicPartition).inc;
}

def incBytesReads(topicAndPartition: TopicAndPartition, inc: Long) {
bytesRead.get(topicAndPartition).inc(inc);
def incBytesReads(topicPartition: TopicPartition, inc: Long) {
bytesRead.get(topicPartition).inc(inc);
}

def incClientBytesReads(clientName: String, incBytes: Long) {
Expand All @@ -88,8 +88,8 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr
clientSkippedFetchRequests.get(clientName).inc()
}

def setOffsets(topicAndPartition: TopicAndPartition, offset: Long) {
offsets.get(topicAndPartition).set(offset)
def setOffsets(topicPartition: TopicPartition, offset: Long) {
offsets.get(topicPartition).set(offset)
}

override def getPrefix = systemName + "-"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.samza.system.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -173,6 +174,11 @@ public void close(long timeout, TimeUnit timeUnit) {
new FlushRunnable(0).run();
}

@Override
public void close(Duration timeout) {
close(timeout.toMillis(), TimeUnit.MILLISECONDS);
}

public void open() {
this.closed = false;
openCount++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.util.HashMap;
import java.util.Map;
import kafka.common.TopicAndPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.samza.metrics.Metric;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.ReadableMetricsRegistry;
Expand All @@ -32,8 +32,8 @@ public class TestKafkaSystemConsumerMetrics {
@Test
public void testKafkaSystemConsumerMetrics() {
String systemName = "system";
TopicAndPartition tp1 = new TopicAndPartition("topic1", 1);
TopicAndPartition tp2 = new TopicAndPartition("topic2", 2);
TopicPartition tp1 = new TopicPartition("topic1", 1);
TopicPartition tp2 = new TopicPartition("topic2", 2);
String clientName = "clientName";

// record expected values for further comparison
Expand All @@ -43,8 +43,8 @@ public void testKafkaSystemConsumerMetrics() {
KafkaSystemConsumerMetrics metrics = new KafkaSystemConsumerMetrics(systemName, registry);

// initialize the metrics for the partitions
metrics.registerTopicAndPartition(tp1);
metrics.registerTopicAndPartition(tp2);
metrics.registerTopicPartition(tp1);
metrics.registerTopicPartition(tp2);

// initialize the metrics for the host:port
metrics.registerClientProxy(clientName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@

package org.apache.samza.sql.client.impl;

import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataTypeField;
Expand Down Expand Up @@ -58,7 +56,6 @@
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

import java.io.File;
import java.io.IOException;
Expand All @@ -84,6 +81,7 @@ public class SamzaExecutor implements SqlExecutor {
// The maximum number of rows of data we keep when user pauses the display view and data accumulates.
private static final int RANDOM_ACCESS_QUEUE_CAPACITY = 5000;
private static final int DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT = 20000;
private static final String ZOOKEEPER_BROKERS_TOPICS_PATH = "/brokers/topics";

private static RandomAccessQueue<OutgoingMessageEnvelope> outputData =
new RandomAccessQueue<>(OutgoingMessageEnvelope.class, RANDOM_ACCESS_QUEUE_CAPACITY);
Expand Down Expand Up @@ -120,9 +118,8 @@ public List<String> listTables(ExecutionContext context) throws ExecutorExceptio
address = DEFAULT_SERVER_ADDRESS;
}
try {
ZkUtils zkUtils = new ZkUtils(new ZkClient(address, DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT),
new ZkConnection(address), false);
return JavaConversions.seqAsJavaList(zkUtils.getAllTopics())
ZkClient zkClient = new ZkClient(address, DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT);
return zkClient.getChildren(ZOOKEEPER_BROKERS_TOPICS_PATH)
.stream()
.map(x -> SAMZA_SYSTEM_KAFKA + "." + x)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,15 +262,14 @@ private void initConsumer(String bootstrapServer) {
bootstrapServer,
"group",
"earliest",
4096L,
"org.apache.kafka.clients.consumer.RangeAssignor",
30000,
true,
false,
500,
SecurityProtocol.PLAINTEXT,
Option$.MODULE$.<File>empty(),
Option$.MODULE$.<Properties>empty(),
new StringDeserializer(),
new ByteArrayDeserializer(),
Option$.MODULE$.<Properties>empty());
new ByteArrayDeserializer());
}

private void initProcessorListener() {
Expand All @@ -291,14 +290,16 @@ private void initProducer(String bootstrapServer) {
60 * 1000L,
1024L * 1024L,
0,
0L,
5 * 1000L,
30 * 1000,
0,
16384,
"none",
20 * 1000,
SecurityProtocol.PLAINTEXT,
null,
Option$.MODULE$.<Properties>apply(new Properties()),
new StringSerializer(),
new ByteArraySerializer(),
Option$.MODULE$.<Properties>apply(new Properties()));
new ByteArraySerializer());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ abstract class AbstractKafkaServerTestHarness extends AbstractZookeeperTestHarne
case e: Exception =>
println("Exception in setup")
println(e)
TestUtils.fail(e.getMessage)
throw new AssertionError(e.getMessage)
}
}.toBuffer
brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
*/
package org.apache.samza.test.harness

import kafka.utils.{CoreUtils, Logging, ZkUtils}
import kafka.zk.{EmbeddedZookeeper, KafkaZkClient, ZkFourLetterWords}
import kafka.utils.{CoreUtils, Logging}
import kafka.zk.{EmbeddedZookeeper, ZkFourLetterWords}
import org.junit.{After, Before}
import org.apache.kafka.common.security.JaasUtils
import javax.security.auth.login.Configuration
/**
* Zookeeper test harness.
Expand All @@ -33,7 +32,6 @@ abstract class AbstractZookeeperTestHarness extends Logging {
val zkConnectionTimeout = 60000
val zkSessionTimeout = 60000

var zkUtils: ZkUtils = null
var zookeeper: EmbeddedZookeeper = null

def zkPort: Int = zookeeper.port
Expand All @@ -49,13 +47,10 @@ abstract class AbstractZookeeperTestHarness extends Logging {
*/
zookeeper.zookeeper.setMinSessionTimeout(120000)
zookeeper.zookeeper.setMaxSessionTimeout(180000)
zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled)
}

@After
def tearDown() {
if (zkUtils != null)
CoreUtils.swallow(zkUtils.close(), null)
if (zookeeper != null)
CoreUtils.swallow(zookeeper.shutdown(), null)

Expand Down