Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
a31a7aa
reduce debugging from info to debug in KafkaCheckpointManager.java
Oct 16, 2017
410ce78
Merge branch 'master' of https://github.com/apache/samza
sborya Oct 17, 2017
d4620d6
Merge branch 'master' of https://github.com/apache/samza
sborya Oct 25, 2017
bbffb79
Merge branch 'master' of https://github.com/apache/samza
sborya Oct 25, 2017
010fa16
Merge branch 'master' of https://github.com/apache/samza
sborya Oct 25, 2017
5e6f5fb
Merge branch 'master' of https://github.com/apache/samza
Oct 25, 2017
06b1ac3
Merge branch 'master' of https://github.com/sborya/samza
Oct 25, 2017
1ad58d4
Merge branch 'master' of https://github.com/apache/samza
sborya Oct 31, 2017
dd39d08
Merge branch 'master' of https://github.com/apache/samza
sborya Nov 22, 2017
67e611e
Merge branch 'master' of https://github.com/apache/samza
sborya Jan 10, 2018
0edf343
Merge branch 'master' of https://github.com/apache/samza
sborya Jun 8, 2018
88f8559
Merge branch 'master' of https://github.com/apache/samza
sborya Aug 2, 2018
78ad578
Merge branch 'master' of https://github.com/apache/samza
sborya Aug 8, 2018
7887d88
Merge branch 'master' of https://github.com/apache/samza
sborya Aug 13, 2018
afb34d9
Merge branch 'master' of https://github.com/sborya/samza
sborya Aug 13, 2018
57fca52
Merge branch 'master' of https://github.com/apache/samza
sborya Aug 15, 2018
7f7b559
Merge branch 'master' of https://github.com/apache/samza
sborya Aug 22, 2018
8ab04b2
Merge branch 'master' of https://github.com/apache/samza
sborya Aug 30, 2018
add733b
Merge branch 'master' of https://github.com/apache/samza
sborya Sep 5, 2018
728dc18
Merge branch 'master' of https://github.com/apache/samza
sborya Sep 7, 2018
952dbbe
Merge branch 'master' of https://github.com/apache/samza
sborya Sep 11, 2018
927adff
Merge branch 'master' of https://github.com/apache/samza
sborya Sep 20, 2018
ceebdc3
Merge branch 'master' of https://github.com/apache/samza
sborya Sep 25, 2018
4f0a746
Merge branch 'master' of https://github.com/apache/samza
sborya Sep 26, 2018
cdd811d
Merge branch 'master' of https://github.com/apache/samza
Sep 26, 2018
f6966a8
Merge branch 'master' of https://github.com/apache/samza
Sep 27, 2018
450ea2e
Merge branch 'master' of https://github.com/apache/samza
Oct 1, 2018
5f5d7a3
Merge branch 'master' of https://github.com/apache/samza
Oct 2, 2018
058217e
Merge branch 'master' of https://github.com/apache/samza
Oct 4, 2018
7821072
Merge branch 'master' of https://github.com/apache/samza
Oct 8, 2018
74dd138
Merge branch 'master' of https://github.com/apache/samza
Oct 10, 2018
92180fd
Merge branch 'master' of https://github.com/apache/samza
Oct 11, 2018
712e68a
Merge branch 'master' of https://github.com/apache/samza
Oct 12, 2018
824e19a
restore deprecated kafka consumer classes
Oct 17, 2018
f1f4eba
restore KafkaSystemFactory
Oct 17, 2018
1e40147
import
Oct 17, 2018
abb215d
KafkaSystemConsumer
Oct 17, 2018
d56feee
Merge branch 'master' of https://github.com/apache/samza
Oct 17, 2018
de5385e
BrokerProxy
Oct 18, 2018
b3b4183
KafkaSystemConsumerMetrics
Oct 18, 2018
8964955
Merge branch 'master' of https://github.com/apache/samza
Oct 18, 2018
57bf054
Merge branch 'master' into OldKafkaConsumer
Oct 18, 2018
353f903
delete java file
sborya Oct 18, 2018
ee95637
changed to new AdminClient
Oct 18, 2018
15f62c4
Merge branch 'master' of https://github.com/apache/samza
Oct 23, 2018
11c4dba
Merge branch 'master' into JavaAdminClient
Oct 23, 2018
21a16a3
use java AdminClient to create and remove topics
Oct 23, 2018
7d60510
Merge branch 'master' of https://github.com/apache/samza
Oct 23, 2018
8410445
added describe after create
Oct 24, 2018
70c6f2b
Merge branch 'master' of https://github.com/apache/samza
Oct 24, 2018
fcb7839
Merge branch 'master' of https://github.com/sborya/samza into OldKafk…
sborya Oct 25, 2018
bc7def0
Merge branch 'JavaAdminClient' of https://github.com/sborya/samza int…
sborya Oct 25, 2018
83c2a85
Merge branch 'master' into JavaAdminClient
Oct 25, 2018
31474da
Merge branch 'JavaAdminClient' of https://github.com/sborya/samza int…
sborya Oct 25, 2018
7ba6061
Merge branch 'master' into JavaAdminClient
sborya Oct 25, 2018
6f6cf92
Merge branch 'master' of https://github.com/apache/samza
Oct 29, 2018
e40adda
provide configs with the topic
Oct 31, 2018
84062e1
Merge branch 'JavaAdminClient' of https://github.com/sborya/samza int…
Oct 31, 2018
b23f31b
manually remove replication.factor from create stream configs
Oct 31, 2018
3b30f86
remove scala create/clear streams
Oct 31, 2018
8aa77a1
Merge branch 'master' of https://github.com/apache/samza
Oct 31, 2018
1016240
Merge branch 'master' of https://github.com/sborya/samza
Oct 31, 2018
f554c32
Merge branch 'master' into JavaAdminClient
Oct 31, 2018
d57a878
cleanup
Oct 31, 2018
c7d94c8
Merge branch 'master' of https://github.com/apache/samza
Nov 1, 2018
1fe59fd
use the correct AdminClient
Nov 1, 2018
e2bbb0a
Merge branch 'master' into JavaAdminClient
Nov 2, 2018
bdc69ae
Update samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaS…
shanthoosh Nov 2, 2018
1445e5d
Merge branch 'JavaAdminClient' of https://github.com/sborya/samza int…
Nov 2, 2018
3b5b3dc
review
Nov 2, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,29 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kafka.admin.AdminClient;
import kafka.utils.ZkUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.StreamValidationException;
Expand Down Expand Up @@ -73,16 +81,17 @@ public class KafkaSystemAdmin implements SystemAdmin {
protected static final long DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS = 10000;
protected static final int MAX_RETRIES_ON_EXCEPTION = 5;
protected static final int DEFAULT_REPL_FACTOR = 2;
private static final int KAFKA_ADMIN_OPS_TIMEOUT_MS = 50000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Do we need to make this configurable by the user?
  2. Why do we need to hardcode this to 50 seconds? Isn't that too high?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think users need to configure it. But I think you are right - it is too high. I will change it to 10 seconds (in case brokers are loaded), to match today's max delay.


// used in TestRepartitionJoinWindowApp TODO - remove SAMZA-1945
@VisibleForTesting
public static volatile boolean deleteMessageCalled = false;

protected final String systemName;
protected final Consumer metadataConsumer;
protected final Config config;
protected final Config config;

protected AdminClient adminClient = null;
protected kafka.admin.AdminClient adminClientForDelete = null;

// Custom properties to create a new coordinator stream.
private final Properties coordinatorStreamProperties;
Expand All @@ -99,6 +108,9 @@ public class KafkaSystemAdmin implements SystemAdmin {
// used for intermediate streams
protected final boolean deleteCommittedMessages;

// admin client for create/remove topics
final AdminClient adminClient;

private final AtomicBoolean stopped = new AtomicBoolean(false);

public KafkaSystemAdmin(String systemName, Config config, Consumer metadataConsumer) {
Expand All @@ -111,6 +123,10 @@ public KafkaSystemAdmin(String systemName, Config config, Consumer metadataConsu
}
this.metadataConsumer = metadataConsumer;

Properties props = createAdminClientProperties();
LOG.info("New admin client with props:" + props);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to use two different adminClients?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because deleteMessages() is only provided by the old kafka.admin.AdminClient. We will get rid of it when we move to the new Kafka (1.0)

adminClient = AdminClient.create(props);

KafkaConfig kafkaConfig = new KafkaConfig(config);
coordinatorStreamReplicationFactor = Integer.valueOf(kafkaConfig.getCoordinatorReplicationFactor());
coordinatorStreamProperties = KafkaSystemAdminUtilsScala.getCoordinatorTopicProperties(kafkaConfig);
Expand Down Expand Up @@ -167,13 +183,17 @@ public void stop() {
LOG.warn("metadataConsumer.close for system " + systemName + " failed with exception.", e);
}
}
if (adminClient != null) {
if (adminClientForDelete != null) {
try {
adminClient.close();
adminClientForDelete.close();
} catch (Exception e) {
LOG.warn("adminClient.close for system " + systemName + " failed with exception.", e);
LOG.warn("AdminClient.close() for system {} failed with exception {}.", systemName, e);
}
}

if (adminClient != null) {
adminClient.close();
}
}

/**
Expand Down Expand Up @@ -481,18 +501,62 @@ public Integer offsetComparator(String offset1, String offset2) {
@Override
public boolean createStream(StreamSpec streamSpec) {
LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
final String REPL_FACTOR = "replication.factor";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It would be better to move these static final constants to the beginning of the class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually yes, but this is kind of an unfortunate hack, and it should be used only in this method. So I'd prefer to keep it here.


KafkaStreamSpec kSpec = toKafkaSpec(streamSpec);
String topicName = kSpec.getPhysicalName();

// create topic.
NewTopic newTopic = new NewTopic(topicName, kSpec.getPartitionCount(), (short) kSpec.getReplicationFactor());

// specify the configs
Map<String, String> streamConfig = new HashMap(streamSpec.getConfig());
// HACK - replication.factor is invalid config for AdminClient.createTopics
if (streamConfig.containsKey(REPL_FACTOR)) {
String repl = streamConfig.get(REPL_FACTOR);
LOG.warn("Configuration {}={} for topic={} is invalid. Using kSpec repl factor {}",
REPL_FACTOR, repl, kSpec.getPhysicalName(), kSpec.getReplicationFactor());
streamConfig.remove(REPL_FACTOR);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should preserve the replicationFactor value defined in the configuration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should pass it in the spec.
You cannot pass it to the configs - it will fail config validation.

}
newTopic.configs(new MapConfig(streamConfig));
CreateTopicsResult result = adminClient.createTopics(ImmutableSet.of(newTopic));
try {
result.all().get(KAFKA_ADMIN_OPS_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (Exception e) {
if (e instanceof TopicExistsException || e.getCause() instanceof TopicExistsException) {
LOG.info("Topic {} already exists.", topicName);
return false;
}

return KafkaSystemAdminUtilsScala.createStream(toKafkaSpec(streamSpec), getZkConnection());
throw new SamzaException(String.format("Creation of topic %s failed.", topicName), e);
}
LOG.info("Successfully created topic {}", topicName);
DescribeTopicsResult desc = adminClient.describeTopics(ImmutableSet.of(topicName));
try {
TopicDescription td = desc.all().get(KAFKA_ADMIN_OPS_TIMEOUT_MS, TimeUnit.MILLISECONDS).get(topicName);
LOG.info("Topic {} created with {}", topicName, td);
return true;
} catch (Exception e) {
LOG.error("'Describe after create' failed for topic " + topicName, e);
return false;
}
}

@Override
public boolean clearStream(StreamSpec streamSpec) {
LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());

KafkaSystemAdminUtilsScala.clearStream(streamSpec, getZkConnection());
String topicName = streamSpec.getPhysicalName();

Map<String, List<PartitionInfo>> topicsMetadata = getTopicMetadata(ImmutableSet.of(streamSpec.getPhysicalName()));
return topicsMetadata.get(streamSpec.getPhysicalName()).isEmpty();
try {
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(ImmutableSet.of(topicName));
deleteTopicsResult.all().get(KAFKA_ADMIN_OPS_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOG.error("Failed to delete topic {} with exception {}.", topicName, e);
return false;
}

return true;
}

/**
Expand Down Expand Up @@ -566,10 +630,10 @@ Map<String, List<PartitionInfo>> getTopicMetadata(Set<String> topics) {
@Override
public void deleteMessages(Map<SystemStreamPartition, String> offsets) {
if (deleteCommittedMessages) {
if (adminClient == null) {
adminClient = AdminClient.create(createAdminClientProperties());
if (adminClientForDelete == null) {
adminClientForDelete = kafka.admin.AdminClient.create(createAdminClientProperties());
}
KafkaSystemAdminUtilsScala.deleteMessages(adminClient, offsets);
KafkaSystemAdminUtilsScala.deleteMessages(adminClientForDelete, offsets);
deleteMessageCalled = true;
}
}
Expand All @@ -594,7 +658,6 @@ protected Properties createAdminClientProperties() {
}
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);


// kafka.admin.AdminUtils requires zkConnect
// this will change after we move to the new org.apache..AdminClient
String zkConnect =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,18 @@ package org.apache.samza.system.kafka
import java.util
import java.util.Properties

import kafka.admin.{AdminClient, AdminUtils}
import kafka.utils.{Logging, ZkUtils}
import kafka.admin.AdminClient
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TopicExistsException
import org.apache.samza.config.ApplicationConfig.ApplicationMode
import org.apache.samza.config.{ApplicationConfig, Config, KafkaConfig, StreamConfig}
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.system.{StreamSpec, SystemStreamMetadata, SystemStreamPartition}
import org.apache.samza.util.ExponentialSleepStrategy
import org.slf4j.{Logger, LoggerFactory}
import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition}

import scala.collection.JavaConverters._

/**
* A helper class that is used to construct the changelog stream specific information
* A helper class for KafkaSystemAdmin
*
* @param replicationFactor The number of replicas for the changelog stream
* @param kafkaProps The kafka specific properties that need to be used for changelog stream creation
Expand All @@ -47,88 +44,6 @@ case class ChangelogInfo(var replicationFactor: Int, var kafkaProps: Properties)
// TODO move to org.apache.kafka.clients.admin.AdminClient from the kafka.admin.AdminClient
object KafkaSystemAdminUtilsScala extends Logging {

// Retry budget for clearStream: how many failed topic deletions are retried before the failure is rethrown.
val CLEAR_STREAM_RETRIES = 3
// Retry budget for createStream: how many failed topic creations are retried before the failure is rethrown.
val CREATE_STREAM_RETRIES = 10

/**
 * Deletes the Kafka topic that backs the given stream.
 *
 * Deleting topics works only when the broker is configured with "delete.topic.enable=true".
 * Otherwise it's a no-op.
 *
 * Every attempt obtains a fresh ZkUtils from `connectZk` and closes it once the delete call
 * returns or fails. Failed attempts are retried through an ExponentialSleepStrategy up to
 * CLEAR_STREAM_RETRIES times; after that the failure handler rethrows the exception.
 *
 * @param spec      the stream whose physical topic should be removed
 * @param connectZk supplier of a ZooKeeper connection, invoked once per attempt
 */
def clearStream(spec: StreamSpec, connectZk: java.util.function.Supplier[ZkUtils]): Unit = {
  info("Deleting topic %s for system %s".format(spec.getPhysicalName, spec.getSystemName))
  val kafkaSpec = KafkaStreamSpec.fromSpec(spec)
  var attemptsLeft = CLEAR_STREAM_RETRIES

  // One deletion attempt; the ZooKeeper handle is always released.
  def deleteOnce(): Unit = {
    val zk = connectZk.get()
    try AdminUtils.deleteTopic(zk, kafkaSpec.getPhysicalName)
    finally zk.close()
  }

  new ExponentialSleepStrategy().run(
    loop => {
      deleteOnce()
      loop.done
    },

    (exception, loop) => {
      if (attemptsLeft <= 0) {
        // Retry budget exhausted: stop looping and surface the failure.
        warn("Fail to delete topic %s.".format(spec.getPhysicalName), exception)
        loop.done
        throw exception
      } else {
        warn("Exception while trying to delete topic %s. Retrying.".format(spec.getPhysicalName), exception)
        attemptsLeft -= 1
      }
    })
}


/**
 * Creates the Kafka topic described by the given spec.
 *
 * Every attempt obtains a fresh ZkUtils from `connectZk` and closes it once the create call
 * returns or fails. A TopicExistsException ends the loop without retrying. Any other
 * exception is retried through an ExponentialSleepStrategy (initial delay 500 ms) up to
 * CREATE_STREAM_RETRIES times; after that the failure handler rethrows the exception.
 *
 * @param kSpec     the topic name, partition count, replication factor and properties to use
 * @param connectZk supplier of a ZooKeeper connection, invoked once per attempt
 * @return true if this call created the topic, false if the topic already existed
 */
def createStream(kSpec: KafkaStreamSpec, connectZk: java.util.function.Supplier[ZkUtils]): Boolean = {
  info("Creating topic %s for system %s".format(kSpec.getPhysicalName, kSpec.getSystemName))
  var created = false
  var attemptsLeft = CREATE_STREAM_RETRIES

  // One creation attempt; the ZooKeeper handle is always released.
  def createOnce(): Unit = {
    val zk = connectZk.get()
    try {
      AdminUtils.createTopic(
        zk,
        kSpec.getPhysicalName,
        kSpec.getPartitionCount,
        kSpec.getReplicationFactor,
        kSpec.getProperties)
    } finally {
      zk.close()
    }
  }

  new ExponentialSleepStrategy(initialDelayMs = 500).run(
    loop => {
      createOnce()
      created = true
      loop.done
    },

    (exception, loop) => exception match {
      case _: TopicExistsException =>
        // The topic is already there: not created by us, nothing to retry.
        created = false
        loop.done
      case _: Exception if attemptsLeft > 0 =>
        warn("Failed to create topic %s. Retrying.".format(kSpec.getPhysicalName), exception)
        attemptsLeft -= 1
      case _: Exception =>
        error("Failed to create topic %s. Bailing out.".format(kSpec.getPhysicalName), exception)
        throw exception
    })

  created
}

/**
* A helper method that takes oldest, newest, and upcoming offsets for each
* system stream partition, and creates a single map from stream name to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@

import com.google.common.collect.ImmutableSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.api.TopicMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.samza.Partition;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
Expand Down Expand Up @@ -221,10 +220,10 @@ public void testCreateChangelogStreamHelp(final String topic) {
@Test
public void testCreateStream() {
StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8);

KafkaSystemAdmin admin = systemAdmin();
assertTrue("createStream should return true if the stream does not exist and then is created.",
systemAdmin().createStream(spec));
systemAdmin().validateStream(spec);
admin.createStream(spec));
admin.validateStream(spec);

assertFalse("createStream should return false if the stream already exists.", systemAdmin().createStream(spec));
}
Expand Down Expand Up @@ -259,16 +258,28 @@ public void testValidateStreamWrongName() {
systemAdmin().validateStream(spec2);
}

//@Test //TODO - currently the connection to ZK fails, but since it checks for empty, the tests succeeds. SAMZA-1887
@Test
public void testClearStream() {
StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8);

assertTrue("createStream should return true if the stream does not exist and then is created.",
systemAdmin().createStream(spec));
assertTrue(systemAdmin().clearStream(spec));
KafkaSystemAdmin admin = systemAdmin();
String topicName = spec.getPhysicalName();

assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec));
// validate topic exists
assertTrue(admin.clearStream(spec));

ImmutableSet<String> topics = ImmutableSet.of(spec.getPhysicalName());
Map<String, List<PartitionInfo>> metadata = systemAdmin().getTopicMetadata(topics);
assertTrue(metadata.get(spec.getPhysicalName()).isEmpty());
// validate that topic was removed
DescribeTopicsResult dtr = admin.adminClient.describeTopics(ImmutableSet.of(topicName));
try {
TopicDescription td = dtr.all().get().get(topicName);
Assert.fail("topic " + topicName + " should've been removed. td=" + td);
} catch (Exception e) {
if (e.getCause() instanceof org.apache.kafka.common.errors.UnknownTopicOrPartitionException) {
// expected
} else {
Assert.fail("topic " + topicName + " should've been removed. Expected UnknownTopicOrPartitionException.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
kcm1.createResources
kcm1.start
kcm1.stop

// check that start actually creates the topic with log compaction enabled
val zkClient = ZkUtils(zkConnect, 6000, 6000, JaasUtils.isZkSecurityEnabled())
val topicConfig = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, checkpointTopic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
var systemAdmin: KafkaSystemAdmin = null

override def generateConfigs(): Seq[KafkaConfig] = {
val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, true)
val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, enableDeleteTopic = true)
props.map(KafkaConfig.fromProps)
}

Expand Down