Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-2832: Add a consumer config option to exclude internal topics #932

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ public class ConsumerConfig extends AbstractConfig {
public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll().";

/** <code>exclude.internal.topics</code> */
public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics";
private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether records from internal topics (such as offsets) should be exposed to the consumer. "
+ "If set to <code>true</code> the only way to receive records from an internal topic is subscribing to it.";

static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
Expand Down Expand Up @@ -316,6 +321,11 @@ public class ConsumerConfig extends AbstractConfig {
atLeast(1),
Importance.MEDIUM,
MAX_POLL_RECORDS_DOC)
.define(EXCLUDE_INTERNAL_TOPICS_CONFIG,
Type.BOOLEAN,
true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the default be false for compatibility?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji I have a comment higher up in this PR that is relevant. The old behavior (old consumer) was to exclude them by default. The thought was this was likely the most common use case for users and it may be worth changing now as many users likely just haven't run into the issue yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. Probably the risk of users accidentally subscribing to the offsets topic outweighs any inconvenience for users who are actually trying to do so.

Importance.MEDIUM,
EXCLUDE_INTERNAL_TOPICS_DOC)

// security support
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,8 @@ private KafkaConsumer(ConsumerConfig config,
new ConsumerCoordinator.DefaultOffsetCommitCallback(),
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors);
this.interceptors,
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.CommonDefs;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
Expand Down Expand Up @@ -69,6 +70,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private final boolean autoCommitEnabled;
private final AutoCommitTask autoCommitTask;
private final ConsumerInterceptors interceptors;
private final boolean excludeInternalTopics;

/**
* Initialize the coordination manager.
Expand All @@ -87,7 +89,8 @@ public ConsumerCoordinator(ConsumerNetworkClient client,
OffsetCommitCallback defaultOffsetCommitCallback,
boolean autoCommitEnabled,
long autoCommitIntervalMs,
ConsumerInterceptors interceptors) {
ConsumerInterceptors interceptors,
boolean excludeInternalTopics) {
super(client,
groupId,
sessionTimeoutMs,
Expand All @@ -110,6 +113,7 @@ public ConsumerCoordinator(ConsumerNetworkClient client,
this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null;
this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
this.interceptors = interceptors;
this.excludeInternalTopics = excludeInternalTopics;
}

@Override
Expand Down Expand Up @@ -140,7 +144,8 @@ public void onMetadataUpdate(Cluster cluster) {
final List<String> topicsToSubscribe = new ArrayList<>();

for (String topic : cluster.topics())
if (subscriptions.getSubscribedPattern().matcher(topic).matches())
if (subscriptions.getSubscribedPattern().matcher(topic).matches() &&
!(excludeInternalTopics && CommonDefs.INTERNAL_TOPICS.contains(topic)))
topicsToSubscribe.add(topic);

subscriptions.changeSubscription(topicsToSubscribe);
Expand Down
31 changes: 31 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/CommonDefs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
* Common definitions used in client-side tools
*/
public final class CommonDefs {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of the overly generic CommonDefs, why not call this Topics or something like that?

/**
 * Name of the topic that backs consumer group state.
 */
// TODO: we store both group metadata and offset data here despite the topic name being offsets only
public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
/**
 * Read-only set of internal topic names; currently it contains only
 * {@link #GROUP_METADATA_TOPIC_NAME}. The set is wrapped with
 * {@link Collections#unmodifiableSet}, so attempts to modify it throw
 * {@link UnsupportedOperationException}.
 */
public static final Set<String> INTERNAL_TOPICS = Collections.unmodifiableSet(new HashSet<String>(Arrays.asList(GROUP_METADATA_TOPIC_NAME)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want anything related to internal topics to be client side? This could change in brokers from version to version and the clients should still work. I understand that for now we have no way to get that information, but we will soon (KAFKA-3306). I imagine removing the client-side list would be part of the cleanup once that's available. So whatever exists in the meantime should be private so we don't need a deprecation cycle.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this should be in an internal package (e.g. common.internals).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma I assume you mean moving the whole class under common.internals. I can do that.

@granthenke Sorry for the naive question. INTERNAL_TOPICS is being used in ConsumerCoordinator; so making it private would break how it's currently being used there. Making it private could also cause issues with implementing @gwenshap's comment to remove Topic.InternalTopics and use this new INTERNAL_TOPICS instead. Could you please clarify? Thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vahidhashemian, yes, that's what I mean.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gwenshap meant that kafka.common.Topic.InternalTopics should be removed in favour of the INTERNAL_TOPICS defined in this PR.

}
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,8 @@ private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssi
defaultOffsetCommitCallback,
autoCommitEnabled,
autoCommitIntervalMs,
null);
null,
false);
}

private Struct consumerMetadataResponse(Node node, short error) {
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/admin/TopicCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package kafka.admin

import java.util.Properties

import joptsimple._
import kafka.common.{AdminCommandFailedException, Topic, TopicExistsException}
import kafka.consumer.{ConsumerConfig, Whitelist}
Expand All @@ -30,9 +29,9 @@ import kafka.utils._
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Utils

import scala.collection.JavaConversions._
import scala.collection._
import org.apache.kafka.common.CommonDefs


object TopicCommand extends Logging {
Expand Down Expand Up @@ -136,7 +135,7 @@ object TopicCommand extends Logging {
}

if(opts.options.has(opts.partitionsOpt)) {
if (topic == GroupCoordinator.GroupMetadataTopicName) {
if (topic == CommonDefs.GROUP_METADATA_TOPIC_NAME) {
throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
}
println("WARNING: If partitions are increased for a topic that has a key, the partition " +
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/common/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package kafka.common

import util.matching.Regex
import kafka.coordinator.GroupCoordinator
import org.apache.kafka.common.CommonDefs

object Topic {
val legalChars = "[a-zA-Z0-9\\._\\-]"
private val maxNameLength = 255
private val rgx = new Regex(legalChars + "+")

val InternalTopics = Set(GroupCoordinator.GroupMetadataTopicName)
val InternalTopics = Set(CommonDefs.GROUP_METADATA_TOPIC_NAME)

def validate(topic: String) {
if (topic.length <= 0)
Expand Down
3 changes: 0 additions & 3 deletions core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -725,9 +725,6 @@ object GroupCoordinator {
val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)

// TODO: we store both group metadata and offset data here despite the topic name being offsets only
val GroupMetadataTopicName = "__consumer_offsets"

def apply(config: KafkaConfig,
zkUtils: ZkUtils,
replicaManager: ReplicaManager,
Expand Down
30 changes: 14 additions & 16 deletions core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package kafka.coordinator

import java.util.concurrent.locks.ReentrantReadWriteLock

import kafka.utils.CoreUtils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
Expand All @@ -40,14 +39,13 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.common.TopicAndPartition
import kafka.common.MessageFormatter
import kafka.server.ReplicaManager

import scala.collection._
import java.io.PrintStream
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.TimeUnit

import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.CommonDefs

case class DelayedStore(messageSet: Map[TopicPartition, MessageSet],
callback: Map[TopicPartition, PartitionResponse] => Unit)
Expand Down Expand Up @@ -147,9 +145,9 @@ class GroupMetadataManager(val brokerId: Int,
val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
timestamp = timestamp, magicValue = magicValue)

val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
val partitionOpt = replicaManager.getPartition(CommonDefs.GROUP_METADATA_TOPIC_NAME, groupPartition)
partitionOpt.foreach { partition =>
val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
val appendPartition = TopicAndPartition(CommonDefs.GROUP_METADATA_TOPIC_NAME, groupPartition)

trace("Marking group %s as deleted.".format(group.groupId))

Expand Down Expand Up @@ -177,7 +175,7 @@ class GroupMetadataManager(val brokerId: Int,
timestamp = timestamp,
magicValue = magicValue)

val groupMetadataPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId))
val groupMetadataPartition = new TopicPartition(CommonDefs.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))

val groupMetadataMessageSet = Map(groupMetadataPartition ->
new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
Expand Down Expand Up @@ -263,7 +261,7 @@ class GroupMetadataManager(val brokerId: Int,
)
}.toSeq

val offsetTopicPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(groupId))
val offsetTopicPartition = new TopicPartition(CommonDefs.GROUP_METADATA_TOPIC_NAME, partitionFor(groupId))

val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
Expand Down Expand Up @@ -351,7 +349,7 @@ class GroupMetadataManager(val brokerId: Int,
*/
def loadGroupsForPartition(offsetsPartition: Int,
onGroupLoaded: GroupMetadata => Unit) {
val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
val topicPartition = TopicAndPartition(CommonDefs.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)

def loadGroupsAndOffsets() {
Expand Down Expand Up @@ -470,7 +468,7 @@ class GroupMetadataManager(val brokerId: Int,
*/
def removeGroupsForPartition(offsetsPartition: Int,
onGroupUnloaded: GroupMetadata => Unit) {
val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
val topicPartition = TopicAndPartition(CommonDefs.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)

def removeGroupsAndOffsets() {
Expand Down Expand Up @@ -507,10 +505,10 @@ class GroupMetadataManager(val brokerId: Int,
}

if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
.format(numOffsetsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
.format(numOffsetsRemoved, TopicAndPartition(CommonDefs.GROUP_METADATA_TOPIC_NAME, offsetsPartition)))

if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower transition."
.format(numGroupsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
.format(numGroupsRemoved, TopicAndPartition(CommonDefs.GROUP_METADATA_TOPIC_NAME, offsetsPartition)))
}
}

Expand Down Expand Up @@ -566,9 +564,9 @@ class GroupMetadataManager(val brokerId: Int,
// Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
// if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
val partitionOpt = replicaManager.getPartition(CommonDefs.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
partitionOpt.map { partition =>
val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
val appendPartition = TopicAndPartition(CommonDefs.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
val messages = tombstones.map(_._2).toSeq

trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
Expand All @@ -593,7 +591,7 @@ class GroupMetadataManager(val brokerId: Int,
}

private def getHighWatermark(partitionId: Int): Long = {
val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, partitionId)
val partitionOpt = replicaManager.getPartition(CommonDefs.GROUP_METADATA_TOPIC_NAME, partitionId)

val hw = partitionOpt.map { partition =>
partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
Expand Down Expand Up @@ -621,7 +619,7 @@ class GroupMetadataManager(val brokerId: Int,
* If the topic does not exist, the configured partition count is returned.
*/
private def getOffsetsTopicPartitionCount = {
val topic = GroupCoordinator.GroupMetadataTopicName
val topic = CommonDefs.GROUP_METADATA_TOPIC_NAME
val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
if (topicData(topic).nonEmpty)
topicData(topic).size
Expand All @@ -630,7 +628,7 @@ class GroupMetadataManager(val brokerId: Int,
}

private def getMessageFormatVersionAndTimestamp(partition: Int): (Byte, Long) = {
val groupMetadataTopicAndPartition = new TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partition)
val groupMetadataTopicAndPartition = new TopicAndPartition(CommonDefs.GROUP_METADATA_TOPIC_NAME, partition)
val messageFormatVersion = replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
throw new IllegalArgumentException(s"Message format version for partition $groupMetadataTopicPartitionCount not found")
}
Expand Down
13 changes: 6 additions & 7 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package kafka.server

import java.nio.ByteBuffer
import java.lang.{Long => JLong, Short => JShort}

import kafka.admin.AdminUtils
import kafka.api._
import kafka.cluster.Partition
Expand All @@ -44,9 +43,9 @@ OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchRespon
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, Node}

import scala.collection._
import scala.collection.JavaConverters._
import org.apache.kafka.common.CommonDefs

/**
* Logic to handle the various Kafka requests
Expand Down Expand Up @@ -128,11 +127,11 @@ class KafkaApis(val requestChannel: RequestChannel,
// this callback is invoked under the replica state change lock to ensure proper order of
// leadership changes
updatedLeaders.foreach { partition =>
if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
if (partition.topic == CommonDefs.GROUP_METADATA_TOPIC_NAME)
coordinator.handleGroupImmigration(partition.partitionId)
}
updatedFollowers.foreach { partition =>
if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
if (partition.topic == CommonDefs.GROUP_METADATA_TOPIC_NAME)
coordinator.handleGroupEmigration(partition.partitionId)
}
}
Expand Down Expand Up @@ -623,9 +622,9 @@ class KafkaApis(val requestChannel: RequestChannel,
if (topics.size > 0 && topicResponses.size != topics.size) {
val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
if (topic == GroupCoordinator.GroupMetadataTopicName || config.autoCreateTopicsEnable) {
if (topic == CommonDefs.GROUP_METADATA_TOPIC_NAME || config.autoCreateTopicsEnable) {
try {
if (topic == GroupCoordinator.GroupMetadataTopicName) {
if (topic == CommonDefs.GROUP_METADATA_TOPIC_NAME) {
val aliveBrokers = metadataCache.getAliveBrokers
val offsetsTopicReplicationFactor =
if (aliveBrokers.length > 0)
Expand Down Expand Up @@ -769,7 +768,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)

// get metadata (and create the topic if necessary)
val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), request.securityProtocol).head
val offsetsTopicMetadata = getTopicMetadata(Set(CommonDefs.GROUP_METADATA_TOPIC_NAME), request.securityProtocol).head
val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).flatMap {
partitionMetadata => partitionMetadata.leader
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import java.net.Socket
import java.nio.ByteBuffer
import java.util.concurrent.ExecutionException
import java.util.{ArrayList, Collections, Properties}

import kafka.cluster.EndPoint
import kafka.common.TopicAndPartition
import kafka.coordinator.GroupCoordinator
Expand All @@ -34,10 +33,10 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{BrokerEndPoint, TopicPartition, requests}
import org.junit.Assert._
import org.junit.{After, Assert, Before, Test}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.Buffer
import org.apache.kafka.common.CommonDefs

class AuthorizerIntegrationTest extends KafkaServerTestHarness {
val topic = "topic"
Expand Down Expand Up @@ -143,7 +142,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)

// create the consumer offset topic
TestUtils.createTopic(zkUtils, GroupCoordinator.GroupMetadataTopicName,
TestUtils.createTopic(zkUtils, CommonDefs.GROUP_METADATA_TOPIC_NAME,
1,
1,
servers,
Expand Down